Spring Cloud Stream
참고
- Spring Cloud Stream Reference Guide (opens in a new tab)
- docs.spring.io - spring cloud stream / Spring Cloud Function Support (opens in a new tab)
- docs.spring.io - Producing and Consuming Messages (opens in a new tab)
- spring-cloud-stream/Spring Cloud Stream Reference Documentation/Testing (opens in a new tab)
- Streaming with Spring Cloud (opens in a new tab)
- Introduction to Spring Cloud Stream (opens in a new tab)
- Guide to Spring Cloud Stream with Kafka, Apache Avro and Confluent Schema Registry (opens in a new tab)
Spring Cloud Stream 이란?
Spring Cloud Stream 은 추상화된 binder 를 제공합니다. 그리고 애플리케이션은 binder 를 통해서 input, output 을 주고 받습니다. kafka 를 사용할 경우 kafka 로부터의 메시지를 Consume 할 때에는 kafka-binder 를 이용해서 input을 통해서 접근 가능하고, 메시지를 Produce 할 경우에는 kafka-binder 의 output 기능을 통해서 데이터를 접근 가능합니다.
의존성
// ...
repositories {
mavenCentral()
}
extra["springCloudVersion"] = "2023.0.0"
dependencies {
implementation("org.springframework.cloud:spring-cloud-stream")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.cloud:spring-cloud-stream-test-binder")
}
dependencyManagement {
imports {
mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
}
}
// ...
spring.cloud.function
등록
application-stream-function.yaml
spring:
cloud:
function:
definition: increment;livenessCheck;stringToBigDecimal
input, output bind 컨벤션
위에서 살펴본 spring.cloud.function
에 등록된 함수에 입력인자명, 출력값은 아래의 컨벤션에 따라 binding 이 생성됩니다.
입력(input) 인자값
{cloud function bean 이름}-in-{argument index}
- e.g.
consumeMessage-in-0
spring.cloud.function
에 등록한consumeMessage
함수의 0번째 입력인자 를 의미합니다.
출력(output) 컨벤션
{cloud function bean 이름}-out-{return index}
- e.g.
supplyReady-out-0
spring.cloud.function
에 등록한supplyReady
함수의 0번째 출력(return)값을 의미합니다.
예를 들면 위와 같은 binding 컨벤션으로 생성한 입력, 출력 명세에 따라 아래와 같은 방식으로 데이터를 주고 받을 수 있습니다.
package io.chagchagchag.example.foobar.spring_cloud_stream;
// ...
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class SpringCloudFunctionTest {
@Autowired
InputDestination inputDestination;
@Autowired
OutputDestination outputDestination;
@Autowired
ConcurrentHashMap counterMap;
@BeforeEach
public void reset(){
counterMap.clear();
}
@DisplayName("COUNTER_STREAM_MESSAGING")
@Test
public void TEST_COUNTER_STREAM_MESSAGING(){
// given
var ticker = "MSFT";
var input = new GenericMessage<>(ticker);
var inputBinding = "increment-in-0";
// when
inputDestination.send(input, inputBinding);
// then
assertThat(counterMap.getOrDefault(ticker, 0)).isEqualTo(1);
}
// ...
}
간단한 블랙박스 테스트입니다. Spring Cloud Function 의 내용이 무엇인지 전혀 모르는 상태로 메시지를 InputDestination 을 이용해서 Message를 보낼 경우 정상적으로 상태가 반영되었는지를 체크합니다.
Spring Cloud Function 으로 등록한 increment
Bean 의 Consumer 의 입력값 인자값에 해당하는 increment-in-0
을 inputBinding 으로 지정해줬기에 해당 Consumer 를 잘 찾아서 원하는 상태값으로 변경시켜준 것을 확인 가능합니다.
InputDestination, OutputDestination
참고로 InputDestination, OutputDestination 클래스들은 test
패키지 아래에 있는 클래스 들이기에 Application 레벨에서 호출하는 것이 불가능합니다. 보통 통합 테스트를 위해 InputDestination, OutputDestination 기반 코드를 작성하는 편입니다.
InputDestination
InputDestination 클래스는 아래와 같이 정의되어 있습니다.
package org.springframework.cloud.stream.binder.test;
import org.springframework.messaging.Message;
public class InputDestination extends AbstractDestination {
public InputDestination() {
}
public void send(Message<?> message) {
this.getChannel(0).send(message);
}
public void send(Message<?> message, String destinationName) {
this.getChannelByName(destinationName).send(message);
}
}
InputDestination 에서 사용되는 Message (opens in a new tab) 는 interface 이고, InputDestination 에 주로 바인딩하는 구현체는 GenericMessage (opens in a new tab) 인데, GenericMessage 의 정의는 아래와 같습니다.
package org.springframework.messaging.support;
// ...
public class GenericMessage<T> implements Message<T>, Serializable {
private static final long serialVersionUID = 4268801052358035098L;
private final T payload;
private final MessageHeaders headers;
public GenericMessage(T payload) {
this(payload, new MessageHeaders((Map)null));
}
public GenericMessage(T payload, Map<String, Object> headers) {
this(payload, new MessageHeaders(headers));
}
public GenericMessage(T payload, MessageHeaders headers) {
Assert.notNull(payload, "Payload must not be null");
Assert.notNull(headers, "MessageHeaders must not be null");
this.payload = payload;
this.headers = headers;
}
// ...
}
OutputDestination
package org.springframework.cloud.stream.binder.test;
// ...
public class OutputDestination extends AbstractDestination {
private final Log log = LogFactory.getLog(OutputDestination.class);
private final ConcurrentHashMap<String, BlockingQueue<Message<byte[]>>> messageQueues = new ConcurrentHashMap();
public OutputDestination() {
}
public Message<byte[]> receive(long timeout, String bindingName) {
try {
bindingName = bindingName.endsWith(".destination") ? bindingName : bindingName + ".destination";
return (Message)this.outputQueue(bindingName).poll(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException var5) {
Thread.currentThread().interrupt();
return null;
}
}
// ...
public Message<byte[]> receive() {
return this.receive(0L, 0);
}
public Message<byte[]> receive(long timeout) {
return this.receive(timeout, 0);
}
// ...
}
Consumer, Supplier, Function
Consumer 구현, 테스트
ConcurrentHashMap<String, Integer>
타입의 counterMap 을 Bean 으로 등록하고 이것을 기반으로 여러가지 키가 몇번 조회되었는지 카운트하는 간단한 예제를 살펴보겠습니다.
설정) StreamFunctionsConfig.java
package io.chagchagchag.example.foobar.spring_cloud_stream.config;
// ...
@Slf4j
@Configuration
public class StreamFunctionsConfig {
@Bean
public ConcurrentHashMap<String, Integer> counterMap(){
return new ConcurrentHashMap<>();
}
@Bean
public Consumer<Flux<String>> increment(ConcurrentHashMap<String, Integer> counterMap){
return fluxKey -> {
fluxKey.subscribe(key -> {
counterMap.computeIfPresent(key, (k, v) ->v+1);
counterMap.computeIfAbsent(key, v -> 1);
});
};
}
// ...
}
그리고 위의 설정을 통해서 카운팅이 잘 이뤄지는지 테스트하는 코드는 아래와 같습니다.
단위테스트) IncrementTest.java
package io.chagchagchag.example.foobar.spring_cloud_stream.config;
// ...
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class IncrementTest {
StreamFunctionsConfig streamFunctionsConfig = new StreamFunctionsConfig();
ConcurrentHashMap<String, Integer> counterMap = new ConcurrentHashMap<>();
@DisplayName("INCREMENT_TEST")
@Test
public void TEST_INCREMENT_TEST(){
// given
var tickersFlux = Flux.just("NVDA","SMCI","MSFT", "NVDA");
// when
streamFunctionsConfig
.increment(counterMap)
.accept(tickersFlux);
// then
Assertions.assertEquals(counterMap.get("NVDA"), 2);
Assertions.assertEquals(counterMap.get("SMCI"), 1);
Assertions.assertEquals(counterMap.get("MSFT"), 1);
}
}
통합테스트) SpringCloudFunctionTest.java
참고로 InputDestination, OutputDestination 클래스들은
test
패키지 아래에 있는 클래스 들이기에 Application 레벨에서 호출하는 것이 불가능합니다. 보통 통합 테스트를 위해 InputDestination, OutputDestination 기반 코드를 작성하는 편입니다.
이번에는 블랙박스 테스트입니다. Spring Cloud Function 의 내용이 무엇인지 전혀 모르는 상태로 메시지를 InputDestination 을 이용해서 Message를 보낼 경우 정상적으로 상태가 반영되었는지를 체크합니다.
Spring Cloud Function 으로 등록한 increment
Bean 의 Consumer 의 입력값 인자값에 해당하는 increment-in-0
을 inputBinding 으로 지정해줬기에 해당 Consumer 를 잘 찾아서 원하는 상태값으로 변경시켜준 것을 확인 가능합니다.
package io.chagchagchag.example.foobar.spring_cloud_stream;
// ...
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class SpringCloudFunctionTest {
@Autowired
InputDestination inputDestination;
@Autowired
OutputDestination outputDestination;
@Autowired
ConcurrentHashMap counterMap;
@BeforeEach
public void reset(){
counterMap.clear();
}
@DisplayName("COUNTER_STREAM_MESSAGING")
@Test
public void TEST_COUNTER_STREAM_MESSAGING(){
// given
var ticker = "MSFT";
var input = new GenericMessage<>(ticker);
var inputBinding = "increment-in-0";
// when
inputDestination.send(input, inputBinding);
// then
assertThat(counterMap.getOrDefault(ticker, 0)).isEqualTo(1);
}
// ...
}
Supplier 구현, 테스트
설정) StreamFunctionsConfig.java
package io.chagchagchag.example.foobar.spring_cloud_stream.config;
// ...
@Slf4j
@Configuration
public class StreamFunctionsConfig {
// ...
@Bean
public Supplier<Flux<String>> livenessCheck(){
return () -> Flux.just("OK");
}
// ...
}
단위테스트) LivenessCheckTest.java
package io.chagchagchag.example.foobar.spring_cloud_stream.config;
// ...
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class LivenessCheckTest {
StreamFunctionsConfig streamFunctionsConfig = new StreamFunctionsConfig();
@DisplayName("LIVENESS_CHECK")
@Test
public void TEST_LIVENESS_CHECK(){
// given
// when
var livenessCheckFlux = streamFunctionsConfig.livenessCheck().get();
// then
StepVerifier.create(livenessCheckFlux)
.expectNext("OK")
.verifyComplete();
}
}
통합테스트) SpringCloudFunctionTest.java
참고로 InputDestination, OutputDestination 클래스들은
test
패키지 아래에 있는 클래스 들이기에 Application 레벨에서 호출하는 것이 불가능합니다. 보통 통합 테스트를 위해 InputDestination, OutputDestination 기반 코드를 작성하는 편입니다.
package io.chagchagchag.example.foobar.spring_cloud_stream;
// ...
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class SpringCloudFunctionTest {
@Autowired
InputDestination inputDestination;
@Autowired
OutputDestination outputDestination;
@Autowired
ConcurrentHashMap counterMap;
@BeforeEach
public void reset(){
counterMap.clear();
}
// ...
// Supplier (livenessCheck)
@DisplayName("LIVENESS_CHECK_STREAM_MESSAGING")
@Test
public void TEST_LIVENESS_CHECK_STREAM_MESSAGING(){
// given
var outputBinding = "livenessCheck-out-0";
var expectedMsg = List.of("OK");
for(var name: expectedMsg){
// when
var received = outputDestination.receive(300, outputBinding);
String outputMessage = new String(received.getPayload());
// then
assertThat(outputMessage.equals(name)).isTrue();
}
}
// ...
}
Function 구현, 테스트
설정) StreamFunctionsConfig.java
package io.chagchagchag.example.foobar.spring_cloud_stream.config;
// ...
@Slf4j
@Configuration
public class StreamFunctionsConfig {
// ...
@Bean
public Function<Flux<String>, Flux<BigDecimal>> stringToBigDecimal(){
return fluxString -> fluxString.handle((str, sink) -> {
try{
Number parse = NumberFormat.getNumberInstance(Locale.US).parse(str);
sink.next(new BigDecimal(parse.toString()));
}
catch (ParseException e){
e.printStackTrace();
sink.error(new IllegalStateException("Number Format is not supported."));
}
});
}
}
단위테스트) StringToBigDecimalTest
package io.chagchagchag.example.foobar.spring_cloud_stream.config;
// ...
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class StringToBigDecimalTest {
StreamFunctionsConfig streamFunctionsConfig = new StreamFunctionsConfig();
@DisplayName("STRING_TO_BIG_DECIMAL")
@Test
public void TEST_STRING_TO_BIG_DECIMAL(){
// given
var strNumbers = Flux.just("28.39");
var expected = BigDecimal.valueOf(28.39);
Predicate<BigDecimal> equals = d -> {
if(d.equals(expected)) return true;
else return false;
};
// when
var bigDeciamlFlux = streamFunctionsConfig.stringToBigDecimal().apply(strNumbers);
// then
StepVerifier.create(bigDeciamlFlux)
.expectNextMatches(equals)
.verifyComplete();
}
}
통합테스트) SpringCloudFunctionTest.java
참고로 InputDestination, OutputDestination 클래스들은
test
패키지 아래에 있는 클래스 들이기에 Application 레벨에서 호출하는 것이 불가능합니다. 보통 통합 테스트를 위해 InputDestination, OutputDestination 기반 코드를 작성하는 편입니다.
package io.chagchagchag.example.foobar.spring_cloud_stream;
// ...
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class SpringCloudFunctionTest {
@Autowired
InputDestination inputDestination;
@Autowired
OutputDestination outputDestination;
@Autowired
ConcurrentHashMap counterMap;
// ...
// Function (stringToBigDecimal)
@DisplayName("STRING_TO_BIG_DECIMAL")
@Test
public void TEST_STRING_TO_BIG_DECIMAL() throws ParseException {
// given
var inputBinding = "stringToBigDecimal-in-0";
var outputBinding = "stringToBigDecimal-out-0";
var input = new GenericMessage<>("28.39");
var expected = BigDecimal.valueOf(28.39);
// when
// 먼저 값을 보낸다.
inputDestination.send(input, inputBinding);
// then
// 치리되어 반환하는 값을 받는다.
var received = outputDestination.receive(30, outputBinding);
var receivedStr = new String(received.getPayload());
var receivedDecimal = new BigDecimal(receivedStr);
assertThat(receivedDecimal).isEqualTo(expected);
}
}
StreamBridge
지금까지 위에서 살펴봤던 InputDestination, OutputDestination 은 모두 org.springframework.cloud.stream.binder.test
패키지 아래에 있는 클래스였습니다. InputDestination, OutputDestination 클래스들은 test
패키지 아래에 있는 클래스 들이기에 Application 레벨에서 호출하는 것이 불가능합니다.
만약 Spring Cloud Function 을 애플리케이션 레벨에서 호출해서 사용해야 할 경우에는 StreamBridge
를 사용합니다.
이번 예제는 새로 Spring Cloud Function 을 작성하고 REST API 에 요청이 왔을 때 실제로 Spring Cloud Function 이 호출되고 recevie 하는 측에서는 올바른 결과를 받는지를 체크하는 기능을 테스트해봅니다. 결과를 리턴받아야하므로 Function<T,R>
타입의 Spring Cloud Function 을 작성합니다.
application-stream-function.yaml
이 파일은 src/test/resources
, srs/main/resources
에 모두 추가해줍니다. 조금 전의 예제와 달라진 점은 ;toLengthList
가 추가되었다는 점 입니다.
spring:
cloud:
function:
definition: increment;livenessCheck;stringToBigDecimal;toLengthList
StreamFunctionsConfig.java
StreamFunctionsConfig
에는 아래와 같이 Function<T,R>
타입의 Bean 을 추가해주었습니다. 꼭 Bean 이 아니어도 @Component 로 추가해주어도 됩니다.
package io.chagchagchag.example.foobar.spring_cloud_stream.config;
// ...
@Slf4j
@Configuration
public class StreamFunctionsConfig {
// ...
@Bean
public Function<Flux<String>, Flux<Integer>> toLengthList(){
return strFlux -> strFlux.map(String::length);
}
}
NumbersController.java
NumbersController 는 아래와 같이 추가해줬습니다. 특정 문자열을 받으면 이 문자열에 대해 streamBridge
를 이용해서 toLengthList 함수를 호출하는 기능입니다. input Binding 과 입력값을 연결해준 모습이 보입니다.
package io.chagchagchag.example.foobar.spring_cloud_stream;
// ...
@RequiredArgsConstructor
@RequestMapping("/numbers")
@RestController
public class NumbersController {
private final StreamBridge streamBridge;
@GetMapping("/to-list")
public void toList(@RequestParam("word") String word){
streamBridge.send("toLengthList-in-0", word);
}
}
단위테스트) ToLengthListTest.java
실제로 기능의 내부는 정상적으로 동작하는지 단위테스트를 작성했습니다.
package io.chagchagchag.example.foobar.spring_cloud_stream.config;
import io.chagchagchag.example.foobar.spring_cloud_stream.SpringCloudStreamApplication;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.ActiveProfiles;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class ToLengthListTest {
StreamFunctionsConfig streamFunctionsConfig = new StreamFunctionsConfig();
@DisplayName("TO_LENGTH_LIST")
@Test
public void TEST_TO_LENGTH_LIST(){
// given
var strWords = Flux.just("hello", "world", "java");
// when
var resultFlux = streamFunctionsConfig.toLengthList().apply(strWords);
// then
StepVerifier.create(resultFlux)
.expectNext("hello".length())
.expectNext("world".length())
.expectNext("java".length())
.verifyComplete();
}
}
통합테스트) NumbersControllerTest.java
이번에는 REST API 로 locahost:8080/numbers/to-list?word=hello
호출 시에 내부적으로 Stream Function 이 호출되어서 receive 시에 돌려받는 값이 실제로 hello
라는 단어의 길이를 리턴하는지를 확인하는 테스트 코드입니다.
package io.chagchagchag.example.foobar.spring_cloud_stream;
import java.util.concurrent.ConcurrentHashMap;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.web.reactive.server.WebTestClient;
@AutoConfigureWebTestClient
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class NumbersControllerTest {
@Autowired
InputDestination inputDestination;
@Autowired
OutputDestination outputDestination;
@Autowired
ConcurrentHashMap counterMap;
@Autowired
WebTestClient webTestClient;
@DisplayName("TO_LIST_FUNCTION")
@Test
public void TEST_TO_LIST_FUNCTION(){
// given
var expected = "hello".length();
var outputBinding = "toLengthList-out-0";
// when
webTestClient.get()
.uri("/numbers/to-list?word="+"hello")
.exchange()
.expectStatus().isOk();
// then
var result = outputDestination.receive(30, outputBinding);
String resultMessage = new String(result.getPayload());
Assertions.assertThat(expected).isEqualTo(Integer.parseInt(resultMessage));
}
}