Spring Cloud Stream

Spring Cloud Stream

참고


Spring Cloud Stream 이란?

Spring Cloud Stream 은 추상화된 binder 를 제공합니다. 그리고 애플리케이션은 binder 를 통해서 input, output 을 주고 받습니다. kafka 를 사용할 경우 kafka 로부터의 메시지를 Consume 할 때에는 kafka-binder 를 이용해서 input을 통해서 접근 가능하고, 메시지를 Produce 할 경우에는 kafka-binder 의 output 기능을 통해서 데이터를 접근 가능합니다.


의존성

// ...
 
repositories {
  mavenCentral()
}
 
extra["springCloudVersion"] = "2023.0.0"
 
dependencies {
  implementation("org.springframework.cloud:spring-cloud-stream")
  testImplementation("org.springframework.boot:spring-boot-starter-test")
  testImplementation("org.springframework.cloud:spring-cloud-stream-test-binder")
}
 
dependencyManagement {
  imports {
    mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
  }
}
 
// ...
 

spring.cloud.function 등록

application-stream-function.yaml

spring:
  cloud:
    function:
      definition: increment;livenessCheck;stringToBigDecimal

input, output bind 컨벤션

위에서 살펴본 spring.cloud.function 에 등록된 함수에 입력인자명, 출력값은 아래의 컨벤션에 따라 binding 이 생성됩니다.

입력(input) 인자값

  • {cloud function bean 이름}-in-{argument index}
  • e.g. consumeMessage-in-0
    • spring.cloud.function 에 등록한 consumeMessage 함수의 0번째 입력인자 를 의미합니다.

출력(output) 컨벤션

  • {cloud function bean 이름}-out-{return index}
  • e.g. supplyReady-out-0
    • spring.cloud.function 에 등록한 supplyReady 함수의 0번째 출력(return)값을 의미합니다.

예를 들면 위와 같은 binding 컨벤션으로 생성한 입력, 출력 명세에 따라 아래와 같은 방식으로 데이터를 주고 받을 수 있습니다.

package io.chagchagchag.example.foobar.spring_cloud_stream;
 
// ...
 
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class SpringCloudFunctionTest {
 
  @Autowired
  InputDestination inputDestination;
 
  @Autowired
  OutputDestination outputDestination;
 
  @Autowired
  ConcurrentHashMap counterMap;
 
  @BeforeEach
  public void reset(){
    counterMap.clear();
  }
 
  @DisplayName("COUNTER_STREAM_MESSAGING")
  @Test
  public void TEST_COUNTER_STREAM_MESSAGING(){
    // given
    var ticker = "MSFT";
    var input = new GenericMessage<>(ticker);
    var inputBinding = "increment-in-0";
 
    // when
    inputDestination.send(input, inputBinding);
 
    // then
    assertThat(counterMap.getOrDefault(ticker, 0)).isEqualTo(1);
  }
  
  // ...
 
}

간단한 블랙박스 테스트입니다. Spring Cloud Function 의 내용이 무엇인지 전혀 모르는 상태로 메시지를 InputDestination 을 이용해서 Message를 보낼 경우 정상적으로 상태가 반영되었는지를 체크합니다.

Spring Cloud Function 으로 등록한 increment Bean 의 Consumer 의 입력값 인자값에 해당하는 increment-in-0 을 inputBinding 으로 지정해줬기에 해당 Consumer 를 잘 찾아서 원하는 상태값으로 변경시켜준 것을 확인 가능합니다.


InputDestination, OutputDestination

참고로 InputDestination, OutputDestination 클래스들은 test 패키지 아래에 있는 클래스 들이기에 Application 레벨에서 호출하는 것이 불가능합니다. 보통 통합 테스트를 위해 InputDestination, OutputDestination 기반 코드를 작성하는 편입니다.

InputDestination

InputDestination 클래스는 아래와 같이 정의되어 있습니다.

package org.springframework.cloud.stream.binder.test;
 
import org.springframework.messaging.Message;
 
public class InputDestination extends AbstractDestination {
  public InputDestination() {
  }
 
  public void send(Message<?> message) {
    this.getChannel(0).send(message);
  }
 
  public void send(Message<?> message, String destinationName) {
    this.getChannelByName(destinationName).send(message);
  }
}

InputDestination 에서 사용되는 Message (opens in a new tab) 는 interface 이고, InputDestination 에 주로 바인딩하는 구현체는 GenericMessage (opens in a new tab) 인데, GenericMessage 의 정의는 아래와 같습니다.

package org.springframework.messaging.support;
 
// ...
public class GenericMessage<T> implements Message<T>, Serializable {
  private static final long serialVersionUID = 4268801052358035098L;
  private final T payload;
  private final MessageHeaders headers;
 
  public GenericMessage(T payload) {
    this(payload, new MessageHeaders((Map)null));
  }
 
  public GenericMessage(T payload, Map<String, Object> headers) {
    this(payload, new MessageHeaders(headers));
  }
 
  public GenericMessage(T payload, MessageHeaders headers) {
    Assert.notNull(payload, "Payload must not be null");
    Assert.notNull(headers, "MessageHeaders must not be null");
    this.payload = payload;
    this.headers = headers;
  }
    
  // ...
}

OutputDestination

package org.springframework.cloud.stream.binder.test;
 
// ...
 
public class OutputDestination extends AbstractDestination {
  private final Log log = LogFactory.getLog(OutputDestination.class);
  private final ConcurrentHashMap<String, BlockingQueue<Message<byte[]>>> messageQueues = new ConcurrentHashMap();
 
  public OutputDestination() {
  }
 
  public Message<byte[]> receive(long timeout, String bindingName) {
    try {
      bindingName = bindingName.endsWith(".destination") ? bindingName : bindingName + ".destination";
      return (Message)this.outputQueue(bindingName).poll(timeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException var5) {
      Thread.currentThread().interrupt();
      return null;
    }
  }
    
  // ...
 
  public Message<byte[]> receive() {
    return this.receive(0L, 0);
  }
 
  public Message<byte[]> receive(long timeout) {
    return this.receive(timeout, 0);
  }
 
  // ...
}

Consumer, Supplier, Function


Consumer 구현, 테스트

ConcurrentHashMap<String, Integer> 타입의 counterMap 을 Bean 으로 등록하고 이것을 기반으로 여러가지 키가 몇번 조회되었는지 카운트하는 간단한 예제를 살펴보겠습니다.

설정) StreamFunctionsConfig.java

package io.chagchagchag.example.foobar.spring_cloud_stream.config;
// ...
 
@Slf4j
@Configuration
public class StreamFunctionsConfig {
  @Bean
  public ConcurrentHashMap<String, Integer> counterMap(){
    return new ConcurrentHashMap<>();
  }
 
  @Bean
  public Consumer<Flux<String>> increment(ConcurrentHashMap<String, Integer> counterMap){
    return fluxKey -> {
      fluxKey.subscribe(key -> {
        counterMap.computeIfPresent(key, (k, v) ->v+1);
        counterMap.computeIfAbsent(key, v -> 1);
      });
    };
  }
  
  // ...
 
}
 

그리고 위의 설정을 통해서 카운팅이 잘 이뤄지는지 테스트하는 코드는 아래와 같습니다.

단위테스트) IncrementTest.java

package io.chagchagchag.example.foobar.spring_cloud_stream.config;
 
// ...
 
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class IncrementTest {
 
  StreamFunctionsConfig streamFunctionsConfig = new StreamFunctionsConfig();
 
  ConcurrentHashMap<String, Integer> counterMap = new ConcurrentHashMap<>();
 
  @DisplayName("INCREMENT_TEST")
  @Test
  public void TEST_INCREMENT_TEST(){
    // given
    var tickersFlux = Flux.just("NVDA","SMCI","MSFT", "NVDA");
 
    // when
    streamFunctionsConfig
        .increment(counterMap)
        .accept(tickersFlux);
 
    // then
    Assertions.assertEquals(counterMap.get("NVDA"), 2);
    Assertions.assertEquals(counterMap.get("SMCI"), 1);
    Assertions.assertEquals(counterMap.get("MSFT"), 1);
  }
 
}

통합테스트) SpringCloudFunctionTest.java

참고로 InputDestination, OutputDestination 클래스들은 test 패키지 아래에 있는 클래스 들이기에 Application 레벨에서 호출하는 것이 불가능합니다. 보통 통합 테스트를 위해 InputDestination, OutputDestination 기반 코드를 작성하는 편입니다.

이번에는 블랙박스 테스트입니다. Spring Cloud Function 의 내용이 무엇인지 전혀 모르는 상태로 메시지를 InputDestination 을 이용해서 Message를 보낼 경우 정상적으로 상태가 반영되었는지를 체크합니다.

Spring Cloud Function 으로 등록한 increment Bean 의 Consumer 의 입력값 인자값에 해당하는 increment-in-0 을 inputBinding 으로 지정해줬기에 해당 Consumer 를 잘 찾아서 원하는 상태값으로 변경시켜준 것을 확인 가능합니다.

package io.chagchagchag.example.foobar.spring_cloud_stream;
 
// ...
 
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class SpringCloudFunctionTest {
 
  @Autowired
  InputDestination inputDestination;
 
  @Autowired
  OutputDestination outputDestination;
 
  @Autowired
  ConcurrentHashMap counterMap;
 
  @BeforeEach
  public void reset(){
    counterMap.clear();
  }
 
  @DisplayName("COUNTER_STREAM_MESSAGING")
  @Test
  public void TEST_COUNTER_STREAM_MESSAGING(){
    // given
    var ticker = "MSFT";
    var input = new GenericMessage<>(ticker);
    var inputBinding = "increment-in-0";
 
    // when
    inputDestination.send(input, inputBinding);
 
    // then
    assertThat(counterMap.getOrDefault(ticker, 0)).isEqualTo(1);
  }
  
  // ...
 
}

Supplier 구현, 테스트

설정) StreamFunctionsConfig.java

package io.chagchagchag.example.foobar.spring_cloud_stream.config;
 
// ...
 
@Slf4j
@Configuration
public class StreamFunctionsConfig {
  // ...
  
  @Bean
  public Supplier<Flux<String>> livenessCheck(){
    return () -> Flux.just("OK");
  }
    
  // ...
 
}

단위테스트) LivenessCheckTest.java

package io.chagchagchag.example.foobar.spring_cloud_stream.config;
 
// ...
 
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class LivenessCheckTest {
  StreamFunctionsConfig streamFunctionsConfig = new StreamFunctionsConfig();
 
  @DisplayName("LIVENESS_CHECK")
  @Test
  public void TEST_LIVENESS_CHECK(){
    // given
 
    // when
    var livenessCheckFlux = streamFunctionsConfig.livenessCheck().get();
 
    // then
    StepVerifier.create(livenessCheckFlux)
        .expectNext("OK")
        .verifyComplete();
  }
 
}

통합테스트) SpringCloudFunctionTest.java

참고로 InputDestination, OutputDestination 클래스들은 test 패키지 아래에 있는 클래스 들이기에 Application 레벨에서 호출하는 것이 불가능합니다. 보통 통합 테스트를 위해 InputDestination, OutputDestination 기반 코드를 작성하는 편입니다.

package io.chagchagchag.example.foobar.spring_cloud_stream;
// ...
 
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class SpringCloudFunctionTest {
 
  @Autowired
  InputDestination inputDestination;
 
  @Autowired
  OutputDestination outputDestination;
 
  @Autowired
  ConcurrentHashMap counterMap;
 
  @BeforeEach
  public void reset(){
    counterMap.clear();
  }
  
  // ...
  
  // Supplier (livenessCheck)
  @DisplayName("LIVENESS_CHECK_STREAM_MESSAGING")
  @Test
  public void TEST_LIVENESS_CHECK_STREAM_MESSAGING(){
    // given
    var outputBinding = "livenessCheck-out-0";
    var expectedMsg = List.of("OK");
 
    for(var name: expectedMsg){
      // when
      var received = outputDestination.receive(300, outputBinding);
      String outputMessage = new String(received.getPayload());
 
      // then
      assertThat(outputMessage.equals(name)).isTrue();
    }
  }
  
  // ...
 
}

Function 구현, 테스트

설정) StreamFunctionsConfig.java

package io.chagchagchag.example.foobar.spring_cloud_stream.config;
// ...
 
@Slf4j
@Configuration
public class StreamFunctionsConfig {
  // ...
  @Bean
  public Function<Flux<String>, Flux<BigDecimal>> stringToBigDecimal(){
    return fluxString -> fluxString.handle((str, sink) -> {
      try{
        Number parse = NumberFormat.getNumberInstance(Locale.US).parse(str);
        sink.next(new BigDecimal(parse.toString()));
      }
      catch (ParseException e){
        e.printStackTrace();
        sink.error(new IllegalStateException("Number Format is not supported."));
      }
    });
  }
 
}

단위테스트) StringToBigDecimalTest

package io.chagchagchag.example.foobar.spring_cloud_stream.config;
// ...
 
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class StringToBigDecimalTest {
  StreamFunctionsConfig streamFunctionsConfig = new StreamFunctionsConfig();
 
  @DisplayName("STRING_TO_BIG_DECIMAL")
  @Test
  public void TEST_STRING_TO_BIG_DECIMAL(){
    // given
    var strNumbers = Flux.just("28.39");
    var expected = BigDecimal.valueOf(28.39);
 
    Predicate<BigDecimal> equals = d -> {
      if(d.equals(expected)) return true;
      else return false;
    };
 
    // when
    var bigDeciamlFlux = streamFunctionsConfig.stringToBigDecimal().apply(strNumbers);
 
    // then
    StepVerifier.create(bigDeciamlFlux)
        .expectNextMatches(equals)
        .verifyComplete();
  }
 
}

통합테스트) SpringCloudFunctionTest.java

참고로 InputDestination, OutputDestination 클래스들은 test 패키지 아래에 있는 클래스 들이기에 Application 레벨에서 호출하는 것이 불가능합니다. 보통 통합 테스트를 위해 InputDestination, OutputDestination 기반 코드를 작성하는 편입니다.

package io.chagchagchag.example.foobar.spring_cloud_stream;
// ...
 
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class SpringCloudFunctionTest {
 
  @Autowired
  InputDestination inputDestination;
 
  @Autowired
  OutputDestination outputDestination;
 
  @Autowired
  ConcurrentHashMap counterMap;
 
  // ...
  
  // Function (stringToBigDecimal)
  @DisplayName("STRING_TO_BIG_DECIMAL")
  @Test
  public void TEST_STRING_TO_BIG_DECIMAL() throws ParseException {
    // given
    var inputBinding = "stringToBigDecimal-in-0";
    var outputBinding = "stringToBigDecimal-out-0";
    var input = new GenericMessage<>("28.39");
    var expected = BigDecimal.valueOf(28.39);
 
    // when
    // 먼저 값을 보낸다.
    inputDestination.send(input, inputBinding);
 
    // then
    // 치리되어 반환하는 값을 받는다.
    var received = outputDestination.receive(30, outputBinding);
    var receivedStr = new String(received.getPayload());
    var receivedDecimal = new BigDecimal(receivedStr);
    assertThat(receivedDecimal).isEqualTo(expected);
  }
 
}

StreamBridge

지금까지 위에서 살펴봤던 InputDestination, OutputDestination 은 모두 org.springframework.cloud.stream.binder.test 패키지 아래에 있는 클래스였습니다. InputDestination, OutputDestination 클래스들은 test 패키지 아래에 있는 클래스 들이기에 Application 레벨에서 호출하는 것이 불가능합니다.

만약 Spring Cloud Function 을 애플리케이션 레벨에서 호출해서 사용해야 할 경우에는 StreamBridge 를 사용합니다.

이번 예제는 새로 Spring Cloud Function 을 작성하고 REST API 에 요청이 왔을 때 실제로 Spring Cloud Function 이 호출되고 recevie 하는 측에서는 올바른 결과를 받는지를 체크하는 기능을 테스트해봅니다. 결과를 리턴받아야하므로 Function<T,R> 타입의 Spring Cloud Function 을 작성합니다.

application-stream-function.yaml

이 파일은 src/test/resources, srs/main/resources 에 모두 추가해줍니다. 조금 전의 예제와 달라진 점은 ;toLengthList 가 추가되었다는 점 입니다.

spring:
  cloud:
    function:
      definition: increment;livenessCheck;stringToBigDecimal;toLengthList

StreamFunctionsConfig.java

StreamFunctionsConfig 에는 아래와 같이 Function<T,R> 타입의 Bean 을 추가해주었습니다. 꼭 Bean 이 아니어도 @Component 로 추가해주어도 됩니다.

package io.chagchagchag.example.foobar.spring_cloud_stream.config;
 
// ...
 
@Slf4j
@Configuration
public class StreamFunctionsConfig {
  // ...
  @Bean
  public Function<Flux<String>, Flux<Integer>> toLengthList(){
    return strFlux -> strFlux.map(String::length);
  }
}

NumbersController.java

NumbersController 는 아래와 같이 추가해줬습니다. 특정 문자열을 받으면 이 문자열에 대해 streamBridge 를 이용해서 toLengthList 함수를 호출하는 기능입니다. input Binding 과 입력값을 연결해준 모습이 보입니다.

package io.chagchagchag.example.foobar.spring_cloud_stream;
 
// ...
 
@RequiredArgsConstructor
@RequestMapping("/numbers")
@RestController
public class NumbersController {
  private final StreamBridge streamBridge;
 
  @GetMapping("/to-list")
  public void toList(@RequestParam("word") String word){
    streamBridge.send("toLengthList-in-0", word);
  }
}

단위테스트) ToLengthListTest.java

실제로 기능의 내부는 정상적으로 동작하는지 단위테스트를 작성했습니다.

package io.chagchagchag.example.foobar.spring_cloud_stream.config;
 
import io.chagchagchag.example.foobar.spring_cloud_stream.SpringCloudStreamApplication;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.ActiveProfiles;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
 
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class ToLengthListTest {
  StreamFunctionsConfig streamFunctionsConfig = new StreamFunctionsConfig();
 
  @DisplayName("TO_LENGTH_LIST")
  @Test
  public void TEST_TO_LENGTH_LIST(){
    // given
    var strWords = Flux.just("hello", "world", "java");
 
    // when
    var resultFlux = streamFunctionsConfig.toLengthList().apply(strWords);
 
    // then
    StepVerifier.create(resultFlux)
        .expectNext("hello".length())
        .expectNext("world".length())
        .expectNext("java".length())
        .verifyComplete();
  }
}

통합테스트) NumbersControllerTest.java

이번에는 REST API 로 locahost:8080/numbers/to-list?word=hello 호출 시에 내부적으로 Stream Function 이 호출되어서 receive 시에 돌려받는 값이 실제로 hello 라는 단어의 길이를 리턴하는지를 확인하는 테스트 코드입니다.

package io.chagchagchag.example.foobar.spring_cloud_stream;
 
import java.util.concurrent.ConcurrentHashMap;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.web.reactive.server.WebTestClient;
 
@AutoConfigureWebTestClient
@Import(TestChannelBinderConfiguration.class)
@ActiveProfiles("stream-function")
@SpringBootTest(classes = SpringCloudStreamApplication.class)
public class NumbersControllerTest {
  @Autowired
  InputDestination inputDestination;
 
  @Autowired
  OutputDestination outputDestination;
 
  @Autowired
  ConcurrentHashMap counterMap;
 
  @Autowired
  WebTestClient webTestClient;
 
  @DisplayName("TO_LIST_FUNCTION")
  @Test
  public void TEST_TO_LIST_FUNCTION(){
    // given
    var expected = "hello".length();
    var outputBinding = "toLengthList-out-0";
 
    // when
    webTestClient.get()
        .uri("/numbers/to-list?word="+"hello")
        .exchange()
        .expectStatus().isOk();
 
    // then
    var result = outputDestination.receive(30, outputBinding);
    String resultMessage = new String(result.getPayload());
    Assertions.assertThat(expected).isEqualTo(Integer.parseInt(resultMessage));
  }
 
}