Spring Webflux 에서 Backpressure 핸들링
우선 결론부터 이야기하면, spring webflux 에서는 Backpressure 를 지원하지만, 효율적으로 제공하지는 않습니다. TCP 요청/응답을 처리하기 위한 Backpressure 로직이기에 비즈니스로직의 요구사항에 따라서 커스텀한 backpressure 처리 또는 I/O 처리가 중요한 경우 별도의 backpressure 처리로직을 작성해야 합니다.
이글의 하단부의 예제에서 나오는 BaseSubscriber (opens in a new tab) 와 같은 Subscriber 구현체들을 잘 활용하면 목적에 따라 Subscriber 를 구현해서 backpressure 를 실무에서도 관리할 수 있습니다.
Subscriber 와 Publiser 를 처음부터 모두 구현하는 예제였던 /reactive-programming/publisher-subscriber-subscription-backpressure/ 글의 예제와 이 글의 하단부의 BaseSubscriber 예제를 잘 활용해서 Spring Webflux 내에서 컴포넌트 화 한다면 특정 부하가 높은 IO 처리를 이벤트 지향적으로 관리할 수 있게 될 것 같습니다.
참고자료
- Backpressure Mechanism in Spring Webflux (opens in a new tab)
- Spring Webflux 의 배압 메커니즘 (opens in a new tab)
오늘 문서는 Backpressure Mechanism in Spring Webflux (opens in a new tab) 을 요약하면서 Spring Webflux 의 배압 메커니즘 (opens in a new tab) 의 내용을 참고해서 요약한 내용입니다.
backpressure(배압) 이란?
일반적인 backpressure 의 뜻 은 소프트웨어 시스템에서는 트래픽 통신에 부하를 주는 기능을 의미합니다.
그런데 이 단어는 정반대의 의미로도 쓰입니다. backpressure 는 트래픽 통신에 부하를 주는 기능을 처리하는 메커니즘이라는 의미로도 사용합니다. 조금 더 자세히 설명하면, "시스템이 다운 스트림을 제어하고 처리하는 데에 취하는 보호조치" 를 backpressure 메커니즘으로도 부릅니다.
오늘 이 문서에서 정리하는 내용은 "다운스트림의 부하를 제어하고 처리하는 데에 취하는 보호조치라는 의미에서의 backpressure" 의 개념입니다.
그리고 backpressure 라는 용어와는 혼선을 피하기 위해 backpressure 를 제어한다, backpressure 를 관리한다라는 말을 따로 쓰고, backpressure 단어 자체는 그 자체로 시스템 부하 상황에 대해서만 사용하겠습니다.
e.g. backpressure 가 발생하는상황
- Publisher, Consumer, GUI 가 있는 시스템이 있습니다.
- Publisher 는 10000/s 의 이벤트를 Consumer 로 보냅니다.
- Consumer 는 이것을 처리해야 하고 결과를 GUI 에 보냅니다.
- GUI 는 이 결과를 표시합니다.
- 소비자는 7500/s 의 이벤트만 처리 가능합니다.
이미지 출처 : https://www.baeldung.com/spring-webflux-backpressure (opens in a new tab)
이 속도로는 Consumer 가 backpressure 를 처리할 수 없습니다. 결국 시스템은 붕괴되고 사용자는 결과를 볼 수 없게 됩니다.
일반적인 backpressure 전략
일반적인 backpressure 를 처리할 때에는 아래와 같은 방식으로 제어를 하게 됩니다.
- 첫번째 옵션 : 전송된 데이터 스트림 제어
- 이 방식에서는 Publisher 가 이벤트 속도를 늦춰야 합니다. 이렇게 Publisher 에서 속도를 늦추면 Consumer는 과부하(overload)가 발생하지 않습니다. 이 방식은 모든 경우에 사용할 수 있는 방식이 아니기에 사용가능한 다른 옵션을 찾아야 할 수 있습니다.
- 두번째 옵션 : 여분의 데이터를 버퍼링
- 이 방식에서는 소비자는 나머지 이벤트를 처리할 수 있을 때 까지 임시로 데이터를 저장합니다. 즉, 버퍼링을 하는 방식입니다. 이 방식의 단점은 메모리의 충돌을 일으키는 버퍼 바인딩을 해제하는 것입니다.
- 세번째 옵션 : 추적하지 못하는 추가 이벤트 삭제
- 너무 오래된 이벤트일경우 삭제하는 방식. 이상적인 방식은 아닙니다. 이 기술을 사용하면 시스템이 붕괴되지는 않습니다.
이미지 출처 : https://www.baeldung.com/spring-webflux-backpressure (opens in a new tab)
이벤트 스트림 기반의 배압 제어
이 방식은 Publisher 가 보낸 이벤트를 제어하는 데에 중점을 두는 방식입니다.
- request : Subscriber 가 요청할 경우에만 새로운 이벤트를 전송
- emitter 요청시 엘리먼트 들을 수집하는 Pull 전략입니다.
- limit : Client 측에서 수신할 이벤트 수를 제한
- 위에서 정리한 제한된 푸시 전략으로 작동하며, Publisher 는 한번에 클라이언트에게 최대 항목 수를 보낼 수 있습니다.
- cancel : Consumer 가 더 아싱 이벤트를 처리할 수 없을 때 데이터 스트리밍을 취소합니다.
- Consumer 는 언제든지 전송을 중단하고 다시 스트림을 구독할 수 있습니다.
이미지 출처 : https://www.baeldung.com/spring-webflux-backpressure (opens in a new tab)
Spring Webflux 의 backpressure 처리
Spring Webflux 에서는 Project Reactor (opens in a new tab) 가 배압의 처리를 담당하고 있습니다. 내부적으로는 Flux 의 개념을(On Backpressure and Ways to Reshape Requests) (opens in a new tab) 사용해서 emitter 에서 생성된 이벤트를 제어하는 역할을 수행합니다.
webflux 는 TCP 흐름제어를 이용해서 Backpressure 를 바이트 단위로 조절합니다. 하지만 소비자가 받을 수 있는 논리적인 요소까지는 처리하지 않습니다.
내부 동작은 아래와 같이 동작합니다.
- webflux 프레임워크는 TCP 를 통해 이벤트 전송/수신을 위해 이벤트를 바이트로 변환하는 역할을 합니다.
- 다음 논리적 요소를 요청하기 전에 소비자가 시작하고 장기 실행 작업이 발생할 수 있습니다.
- 수신자가 이벤트를 처리하는 동안 WebFlux는 새로운 이벤트에 대한 요구가 없기 때문에 확인 없이 바이트를 큐에 넣습니다.
- TCP 프로토콜의 특성으로 인해 새 이벤트가 있으면 게시자가 계속해서 네트워크로 보냅니다.
이미지 출처 : https://www.baeldung.com/spring-webflux-backpressure (opens in a new tab)
그림을 자세히 보면 Consumer 는 일정하게 TCP 계층에 request(1), onNext() 를 하고 있지만, TCP 계층에서 Publisher 에 요청할 때에는 request 시에는 request(10)을 하고 onNext 시에는 onNext(!) 을 합니다.
webflux 는 단순히 TCP/IP 를 효율적으로 수행하는 계층이고, 사용자 영역에서의 논리적인 데이터(비즈니스로직)을 효율적으로 처리하기 위한 것은 사용자 레벨에서 직접 작성해야 한다는 사실을 알 수 있습니다.
webflux 에서 사용자 정의 backpressure 처리 로직 구현
위의 이벤트 스트림 기반의 배압 제어
에서 정리한 아래의 세가지 요소들을 정의하는 세가지 예제를 살펴봅니다.
- request(n)
- limitRate()
- cancel()
request(n)
첫번째 원칙인 request : Subscriber 가 요청할 경우에만 새로운 이벤트를 전송
에 대한 예제를 살펴보면 아래와 같습니다.
package io.chagchagchag.example.foobar.reactive_streams.webflux_backpressure;
import java.util.concurrent.Flow.Subscription;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
public class WebFluxBackpressureTests {
@DisplayName("REQUEST_CHUNK")
@Test
public void TEST_REQUEST_CHUNK(){
// given
Flux<Integer> request = Flux.range(1,50);
// when
request
.subscribe(
System.out::println,
throwable -> throwable.printStackTrace(),
() -> System.out.println("50개 요청 모두 완료"),
subscription -> {
for(int i=0; i<5; i++){
System.out.println(">>> request 10 element ");
subscription.request(10);
}
}
);
// then
StepVerifier.create(request)
.expectSubscription()
.thenRequest(10)
.expectNext(1,2,3,4,5,6,7,8,9,10)
.thenRequest(10)
.expectNext(11,12,13,14,15,16,17,18,19,20)
.thenRequest(10)
.expectNext(21,22,23,24,25,26,27,28,29,30)
.thenRequest(10)
.expectNext(31,32,33,34,35,36,37,38,39,40)
.thenRequest(10)
.expectNext(41,42,43,44,45,46,47,48,49,50)
.verifyComplete();
}
}
given
영역에서는 Flux 를 통해 데이터를 결정합니다. when
에서는 Flux 를 subscribe 하는데, 일반적인 subscribe 와는 다르게 한번 요청시마다 10개씩 요청하고 있습니다. Subscription (opens in a new tab), Subscription.java (opens in a new tab) 의 request 메서드는 backpressure 를 처리하기 위해 제공되는 메서드 입니다.
limitRate(limitSize)
두번째 원칙인 limit : Client 측에서 수신할 이벤트 수를 제한
에 대한 예제를 살펴보면 아래와 같습니다.
package io.chagchagchag.example.foobar.reactive_streams.webflux_backpressure;
import java.util.concurrent.Flow.Subscription;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
public class WebFluxBackpressureTests {
// ...
@DisplayName("LIMIT_RATE")
@Test
public void TEST_LIMIT_RATE(){
// given
Flux<Integer> flux = Flux.range(1, 25); // (1)
// when
flux.limitRate(10);
flux.subscribe(
System.out::println,
err -> err.printStackTrace(),
() -> System.out.println("Finished"),
subscription -> subscription.request(15)
);
// then
StepVerifier.create(flux)
.expectSubscription()
.thenRequest(15)
.expectNext(1,2,3,4,5,6,7,8,9,10) // limit 10 수행
.expectNext(11,12,13,14,15) // 15 중 남은 5개 수행
.thenRequest(10) // limit 10 수행
.expectNext(16,17,18,19,20,21,22,23,24,25)
.verifyComplete();
}
}
위의 코드를 보면 Publisher 인 Flux 를 subscribe 시에 Subscription 객체를 이용해서 backpressure 처리 메서드인 request() 메서드를 수행하고 있는데 15개씩의 element 를 요청하고 있습니다. 그런데 (1) 에서 Flux 를 생성하면서 이미 Publisher 의 유량을 limitRate (10) 을 통해 limit 을 걸어두었습니다.
따라서 request(n)을 통해 backpressure 시에 n 만큼의 데이터를 읽어오도록 요청을 하더라도 Publisher 에 걸려있는 limit 만큼 잘라서 응답받습니다. 그리고 잘린데이터는 다음 시퀀스에 받아옵니다.
출력결과
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Process finished with exit code 0
cancel
세번째 원칙인 cancel : Consumer 가 더 아싱 이벤트를 처리할 수 없을 때 데이터 스트리밍을 취소합니다.
에 대한 예제를 살펴보면 아래와 같습니다.
package io.chagchagchag.example.foobar.reactive_streams.webflux_backpressure;
import java.util.concurrent.Flow.Subscription;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
public class WebFluxBackpressureTests {
// ...
@DisplayName("CANCEL")
@Test
public void TEST_CANCEL(){
// given
Flux<Integer> flux = Flux.range(1,10).log();
// when
flux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnNext(Integer value) {
request(3);
System.out.println(value);
cancel();
}
});
// then
StepVerifier.create(flux)
.expectNext(1,2,3)
.thenCancel()
.verify();
}
}
3개의 element 를 요청한 후에 cancel() 을 통해서 구독을 취소하고 있습니다. 그리고 정상적으로 취소가 되어서 3개의 element 까지만 데이터를 구독했음을 알 수 있습니다.
출력결과
16:19:38.229 [main] INFO reactor.Flux.Range.1 -- | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
16:19:38.235 [main] INFO reactor.Flux.Range.1 -- | request(unbounded)
16:19:38.236 [main] INFO reactor.Flux.Range.1 -- | onNext(1)
16:19:38.236 [main] INFO reactor.Flux.Range.1 -- | request(3)
1
16:19:38.236 [main] INFO reactor.Flux.Range.1 -- | cancel()
16:19:38.259 [main] INFO reactor.Flux.Range.1 -- | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
16:19:38.261 [main] INFO reactor.Flux.Range.1 -- | request(unbounded)
16:19:38.261 [main] INFO reactor.Flux.Range.1 -- | onNext(1)
16:19:38.261 [main] INFO reactor.Flux.Range.1 -- | onNext(2)
16:19:38.261 [main] INFO reactor.Flux.Range.1 -- | onNext(3)
16:19:38.261 [main] INFO reactor.Flux.Range.1 -- | cancel()
Process finished with exit code 0