Spring Webflux 에서 Backpressure 핸들링

Spring Webflux 에서 Backpressure 핸들링

우선 결론부터 이야기하면, spring webflux 에서는 Backpressure 를 지원하지만, 효율적으로 제공하지는 않습니다. TCP 요청/응답을 처리하기 위한 Backpressure 로직이기에 비즈니스로직의 요구사항에 따라서 커스텀한 backpressure 처리 또는 I/O 처리가 중요한 경우 별도의 backpressure 처리로직을 작성해야 합니다.

이글의 하단부의 예제에서 나오는 BaseSubscriber (opens in a new tab) 와 같은 Subscriber 구현체들을 잘 활용하면 목적에 따라 Subscriber 를 구현해서 backpressure 를 실무에서도 관리할 수 있습니다.

Subscriber 와 Publiser 를 처음부터 모두 구현하는 예제였던 /reactive-programming/publisher-subscriber-subscription-backpressure/ 글의 예제와 이 글의 하단부의 BaseSubscriber 예제를 잘 활용해서 Spring Webflux 내에서 컴포넌트 화 한다면 특정 부하가 높은 IO 처리를 이벤트 지향적으로 관리할 수 있게 될 것 같습니다.


참고자료

오늘 문서는 Backpressure Mechanism in Spring Webflux (opens in a new tab) 을 요약하면서 Spring Webflux 의 배압 메커니즘 (opens in a new tab) 의 내용을 참고해서 요약한 내용입니다.


backpressure(배압) 이란?

일반적인 backpressure 의 뜻 은 소프트웨어 시스템에서는 트래픽 통신에 부하를 주는 기능을 의미합니다.

그런데 이 단어는 정반대의 의미로도 쓰입니다. backpressure 는 트래픽 통신에 부하를 주는 기능을 처리하는 메커니즘이라는 의미로도 사용합니다. 조금 더 자세히 설명하면, "시스템이 다운 스트림을 제어하고 처리하는 데에 취하는 보호조치" 를 backpressure 메커니즘으로도 부릅니다.

오늘 이 문서에서 정리하는 내용은 "다운스트림의 부하를 제어하고 처리하는 데에 취하는 보호조치라는 의미에서의 backpressure" 의 개념입니다.

그리고 backpressure 라는 용어와는 혼선을 피하기 위해 backpressure 를 제어한다, backpressure 를 관리한다라는 말을 따로 쓰고, backpressure 단어 자체는 그 자체로 시스템 부하 상황에 대해서만 사용하겠습니다.


e.g. backpressure 가 발생하는상황

  • Publisher, Consumer, GUI 가 있는 시스템이 있습니다.
  • Publisher 는 10000/s 의 이벤트를 Consumer 로 보냅니다.
  • Consumer 는 이것을 처리해야 하고 결과를 GUI 에 보냅니다.
  • GUI 는 이 결과를 표시합니다.
  • 소비자는 7500/s 의 이벤트만 처리 가능합니다.

이미지 출처 : https://www.baeldung.com/spring-webflux-backpressure (opens in a new tab)


이 속도로는 Consumer 가 backpressure 를 처리할 수 없습니다. 결국 시스템은 붕괴되고 사용자는 결과를 볼 수 없게 됩니다.


일반적인 backpressure 전략

일반적인 backpressure 를 처리할 때에는 아래와 같은 방식으로 제어를 하게 됩니다.

  • 첫번째 옵션 : 전송된 데이터 스트림 제어
    • 이 방식에서는 Publisher 가 이벤트 속도를 늦춰야 합니다. 이렇게 Publisher 에서 속도를 늦추면 Consumer는 과부하(overload)가 발생하지 않습니다. 이 방식은 모든 경우에 사용할 수 있는 방식이 아니기에 사용가능한 다른 옵션을 찾아야 할 수 있습니다.
  • 두번째 옵션 : 여분의 데이터를 버퍼링
    • 이 방식에서는 소비자는 나머지 이벤트를 처리할 수 있을 때 까지 임시로 데이터를 저장합니다. 즉, 버퍼링을 하는 방식입니다. 이 방식의 단점은 메모리의 충돌을 일으키는 버퍼 바인딩을 해제하는 것입니다.
  • 세번째 옵션 : 추적하지 못하는 추가 이벤트 삭제
    • 너무 오래된 이벤트일경우 삭제하는 방식. 이상적인 방식은 아닙니다. 이 기술을 사용하면 시스템이 붕괴되지는 않습니다.

이미지 출처 : https://www.baeldung.com/spring-webflux-backpressure (opens in a new tab)


이벤트 스트림 기반의 배압 제어

이 방식은 Publisher 가 보낸 이벤트를 제어하는 데에 중점을 두는 방식입니다.

  • request : Subscriber 가 요청할 경우에만 새로운 이벤트를 전송
    • emitter 요청시 엘리먼트 들을 수집하는 Pull 전략입니다.
  • limit : Client 측에서 수신할 이벤트 수를 제한
    • 위에서 정리한 제한된 푸시 전략으로 작동하며, Publisher 는 한번에 클라이언트에게 최대 항목 수를 보낼 수 있습니다.
  • cancel : Consumer 가 더 아싱 이벤트를 처리할 수 없을 때 데이터 스트리밍을 취소합니다.
    • Consumer 는 언제든지 전송을 중단하고 다시 스트림을 구독할 수 있습니다.

이미지 출처 : https://www.baeldung.com/spring-webflux-backpressure (opens in a new tab)


Spring Webflux 의 backpressure 처리

Spring Webflux 에서는 Project Reactor (opens in a new tab) 가 배압의 처리를 담당하고 있습니다. 내부적으로는 Flux 의 개념을(On Backpressure and Ways to Reshape Requests) (opens in a new tab) 사용해서 emitter 에서 생성된 이벤트를 제어하는 역할을 수행합니다.

webflux 는 TCP 흐름제어를 이용해서 Backpressure 를 바이트 단위로 조절합니다. 하지만 소비자가 받을 수 있는 논리적인 요소까지는 처리하지 않습니다.

내부 동작은 아래와 같이 동작합니다.

  • webflux 프레임워크는 TCP 를 통해 이벤트 전송/수신을 위해 이벤트를 바이트로 변환하는 역할을 합니다.
  • 다음 논리적 요소를 요청하기 전에 소비자가 시작하고 장기 실행 작업이 발생할 수 있습니다.
  • 수신자가 이벤트를 처리하는 동안 WebFlux는 새로운 이벤트에 대한 요구가 없기 때문에 확인 없이 바이트를 큐에 넣습니다.
  • TCP 프로토콜의 특성으로 인해 새 이벤트가 있으면 게시자가 계속해서 네트워크로 보냅니다.

이미지 출처 : https://www.baeldung.com/spring-webflux-backpressure (opens in a new tab)

그림을 자세히 보면 Consumer 는 일정하게 TCP 계층에 request(1), onNext() 를 하고 있지만, TCP 계층에서 Publisher 에 요청할 때에는 request 시에는 request(10)을 하고 onNext 시에는 onNext(!) 을 합니다.

webflux 는 단순히 TCP/IP 를 효율적으로 수행하는 계층이고, 사용자 영역에서의 논리적인 데이터(비즈니스로직)을 효율적으로 처리하기 위한 것은 사용자 레벨에서 직접 작성해야 한다는 사실을 알 수 있습니다.


webflux 에서 사용자 정의 backpressure 처리 로직 구현

위의 이벤트 스트림 기반의 배압 제어 에서 정리한 아래의 세가지 요소들을 정의하는 세가지 예제를 살펴봅니다.

  • request(n)
  • limitRate()
  • cancel()

request(n)

첫번째 원칙인 request : Subscriber 가 요청할 경우에만 새로운 이벤트를 전송 에 대한 예제를 살펴보면 아래와 같습니다.

package io.chagchagchag.example.foobar.reactive_streams.webflux_backpressure;
 
import java.util.concurrent.Flow.Subscription;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
 
public class WebFluxBackpressureTests {
  @DisplayName("REQUEST_CHUNK")
  @Test
  public void TEST_REQUEST_CHUNK(){
    // given
    Flux<Integer> request = Flux.range(1,50);
 
    // when
    request
        .subscribe(
            System.out::println,
            throwable -> throwable.printStackTrace(),
            () -> System.out.println("50개 요청 모두 완료"),
            subscription -> {
              for(int i=0; i<5; i++){
                System.out.println(">>> request 10 element ");
                subscription.request(10);
              }
            }
        );
 
    // then
    StepVerifier.create(request)
        .expectSubscription()
        .thenRequest(10)
        .expectNext(1,2,3,4,5,6,7,8,9,10)
        .thenRequest(10)
        .expectNext(11,12,13,14,15,16,17,18,19,20)
        .thenRequest(10)
        .expectNext(21,22,23,24,25,26,27,28,29,30)
        .thenRequest(10)
        .expectNext(31,32,33,34,35,36,37,38,39,40)
        .thenRequest(10)
        .expectNext(41,42,43,44,45,46,47,48,49,50)
        .verifyComplete();
  }
  
}

given 영역에서는 Flux 를 통해 데이터를 결정합니다. when 에서는 Flux 를 subscribe 하는데, 일반적인 subscribe 와는 다르게 한번 요청시마다 10개씩 요청하고 있습니다. Subscription (opens in a new tab), Subscription.java (opens in a new tab) 의 request 메서드는 backpressure 를 처리하기 위해 제공되는 메서드 입니다.


limitRate(limitSize)

두번째 원칙인 limit : Client 측에서 수신할 이벤트 수를 제한 에 대한 예제를 살펴보면 아래와 같습니다.

package io.chagchagchag.example.foobar.reactive_streams.webflux_backpressure;
 
import java.util.concurrent.Flow.Subscription;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
 
public class WebFluxBackpressureTests {
  // ...
  @DisplayName("LIMIT_RATE")
  @Test
  public void TEST_LIMIT_RATE(){
    // given
    Flux<Integer> flux = Flux.range(1, 25); // (1)
 
    // when
    flux.limitRate(10);
    flux.subscribe(
        System.out::println,
        err -> err.printStackTrace(),
        () -> System.out.println("Finished"),
        subscription -> subscription.request(15)
    );
 
    // then
    StepVerifier.create(flux)
        .expectSubscription()
        .thenRequest(15)
        .expectNext(1,2,3,4,5,6,7,8,9,10) // limit 10 수행
        .expectNext(11,12,13,14,15) // 15 중 남은 5개 수행
        .thenRequest(10) // limit 10 수행
        .expectNext(16,17,18,19,20,21,22,23,24,25)
        .verifyComplete();
  }
}

위의 코드를 보면 Publisher 인 Flux 를 subscribe 시에 Subscription 객체를 이용해서 backpressure 처리 메서드인 request() 메서드를 수행하고 있는데 15개씩의 element 를 요청하고 있습니다. 그런데 (1) 에서 Flux 를 생성하면서 이미 Publisher 의 유량을 limitRate (10) 을 통해 limit 을 걸어두었습니다.

따라서 request(n)을 통해 backpressure 시에 n 만큼의 데이터를 읽어오도록 요청을 하더라도 Publisher 에 걸려있는 limit 만큼 잘라서 응답받습니다. 그리고 잘린데이터는 다음 시퀀스에 받아옵니다.

출력결과

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

Process finished with exit code 0

cancel

세번째 원칙인 cancel : Consumer 가 더 아싱 이벤트를 처리할 수 없을 때 데이터 스트리밍을 취소합니다. 에 대한 예제를 살펴보면 아래와 같습니다.

package io.chagchagchag.example.foobar.reactive_streams.webflux_backpressure;
 
import java.util.concurrent.Flow.Subscription;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
 
public class WebFluxBackpressureTests {
  // ...
  @DisplayName("CANCEL")
  @Test
  public void TEST_CANCEL(){
    // given
    Flux<Integer> flux = Flux.range(1,10).log();
 
    // when
    flux.subscribe(new BaseSubscriber<Integer>() {
      @Override
      protected void hookOnNext(Integer value) {
        request(3);
        System.out.println(value);
        cancel();
      }
    });
 
    // then
    StepVerifier.create(flux)
        .expectNext(1,2,3)
        .thenCancel()
        .verify();
  }
}

3개의 element 를 요청한 후에 cancel() 을 통해서 구독을 취소하고 있습니다. 그리고 정상적으로 취소가 되어서 3개의 element 까지만 데이터를 구독했음을 알 수 있습니다.


출력결과

16:19:38.229 [main] INFO reactor.Flux.Range.1 -- | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
16:19:38.235 [main] INFO reactor.Flux.Range.1 -- | request(unbounded)
16:19:38.236 [main] INFO reactor.Flux.Range.1 -- | onNext(1)
16:19:38.236 [main] INFO reactor.Flux.Range.1 -- | request(3)
1
16:19:38.236 [main] INFO reactor.Flux.Range.1 -- | cancel()
16:19:38.259 [main] INFO reactor.Flux.Range.1 -- | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
16:19:38.261 [main] INFO reactor.Flux.Range.1 -- | request(unbounded)
16:19:38.261 [main] INFO reactor.Flux.Range.1 -- | onNext(1)
16:19:38.261 [main] INFO reactor.Flux.Range.1 -- | onNext(2)
16:19:38.261 [main] INFO reactor.Flux.Range.1 -- | onNext(3)
16:19:38.261 [main] INFO reactor.Flux.Range.1 -- | cancel()

Process finished with exit code 0