Reactive Streams 라이브러리들
Reactive Streams 구현체 라이브러리들
Reactive Streams 를 구현한 구현체 라이브러리들은 Project Reactor, RxJava, Mutiny 가 있습니다.
- Project Reactor
- RxJava
- Mutiny
Project Reactor
Pivotal 사에서 개발한 라이브러리입니다. Spring Reactor 내에 포함되어 있습니다. Mono, Flux 라는 이름의 Publisher 클래스가 있습니다.
RxJava
Netflix에서 개발한 프레임워크입니다. 닷넷 프레임워크에서 지원되던 Reactive Extensions를 Java 버전으로 포팅한 프레임워크입니다. Flowable, Observable, Single, Maybe, Completable 라는 이름의 퍼블리셔 클래스가 있습니다.
Mutiny
Hibernate Reactive 에서 비동기 라이브러리로 제공하는 라이브러리입니다. 대표적인 자료형 및 클래스는 Multi, Uni 라고 하는 퍼블리셔 클래스가 있습니다.
Project Reactor
Pivotal 사에서 개발한 라이브러리입니다. Spring Reactor 내에 포함되어 있습니다. Mono, Flux 라는 이름의 Publisher 클래스가 있습니다.
Project Reactor 의 Official Website 는 projectreactor.io (opens in a new tab)이고
Reference Guide 는 projectreactor.io - reference (opens in a new tab) 입니다.
gradle 의존성
// reactor-core
implementation("io.projectreactor:reactor-core:3.6.2")
Mono 와 Flux
Mono 는 Optional<T> 를 상속받은(extends) 자료형입니다. (Mono<T> : Optional<T>
)
값이 없는 경우 또는 하나의 값을 명시적으로 의미하고자 할 때 Mono 를 사용합니다. 흔히 Mono<Void> 를 사용하는 경우는 특정 작업이 완료되는 파이프라인이라는 것을 명시적으로 표현하고자 할 때 사용합니다.
Flux 는 List<T> 를 상속받은(extends) 자료형입니다. (Flux<T>: List<T>)
무한한 값을 가리키거나 사이즈가 정해져있는 유한한 여러개의 요소들을 가리킬 때 Flux 를 사용합니다.
Flux
참고 : prjectreactor.io - reference#flux (opens in a new tab)
Flux 는 0 ~ n 개의 요소들을 전달합니다. 에러가 발생할경우에는 Error 시그널을 전달한 후에 종료를 합니다. 만약 모든 요소를 전달했을 경우에는 Complete 시그널이 전달되면서 종료됩니다. Flux 는 Backpressure 를 지원합니다.
Flux 를 Mono 로 바꿔야할 때가 있습니다. 아래의 두가지 방식으로 변환이 가능합니다.
Mono.from(Flux) 를 사용할 경우
- Flux 의 첫번째 요소만 Mono 에 전달됩니다.
Flux.collectList() 를 사용
- Flux.collectList() 를 사용해서 List 로 변환해서 이것을 Mono<List<T>> 자료형으로 변환한다면 Mono 타입의 List 로 변환이 가능합니다.
예제 1) request size 가 큰 Subscriber
한번 subscribe 요청 시에 한번에 request(Integer.MAX_VALUE) 를 하는 커스텀 Subscriber 를 만들고
500 이라는 숫자열을 담고 있는 Flux 가
이 Subscriber 를 subscribe 하는 예제를 만들어봅니다.
이 예제는 결국 적은 사이즈의 Flux 에 대해 subscribe 할 때 한번에 Integer.MAX_VALUE 만큼을request() 해서 onComlete() 가 발생하게끔 하는 결과를 만들어냅니다.
BigRequestSizeSubscriber
간단한 Subscriber 입니다. 1초에 한개의 요소를 구독(request()) 합니다.
package io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.bigsize_request;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@Slf4j
@RequiredArgsConstructor
public class BigRequestSizeSubscriber<T> implements Subscriber<T> {
private final Integer requestSize;
@Override
public void onSubscribe(Subscription s) {
log.info("(subscribe) --- ");
s.request(requestSize);
log.info(" >>> subscriber.request({})", requestSize);
}
@SneakyThrows
@Override
public void onNext(T t) {
log.info("(next) item : {}", t);
Thread.sleep(1000);
}
@Override
public void onError(Throwable t) {
log.error("error : {}", t.getMessage());
}
@Override
public void onComplete() {
log.info("=== (complete) ===");
}
}
FiniteFluxClient
package io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.bigsize_request;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class BigRequestSizeSubscribing_FluxClient {
public static void main(String[] args) {
log.info("main function started --- ");
getItems().subscribe(new BigRequestSizeSubscriber<>(Integer.MAX_VALUE));
log.info("main function end --- ");
}
private static Flux<Integer> getItems(){
return Flux.fromIterable(List.of(100,200,300,400,500));
}
}
출력결과
08:24:50.463 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.bigsize_request.BigRequestSizeSubscribing_FluxClient -- main function started ---
08:24:50.749 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.bigsize_request.BigRequestSizeSubscriber -- (subscribe) ---
08:24:50.749 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.bigsize_request.BigRequestSizeSubscriber -- >>> subscriber.request(2147483647)
08:24:50.754 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.bigsize_request.BigRequestSizeSubscriber -- (next) item : 100
08:24:51.762 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.bigsize_request.BigRequestSizeSubscriber -- (next) item : 200
08:24:52.774 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.bigsize_request.BigRequestSizeSubscriber -- (next) item : 300
08:24:53.783 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.bigsize_request.BigRequestSizeSubscriber -- (next) item : 400
08:24:54.793 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.bigsize_request.BigRequestSizeSubscriber -- (next) item : 500
08:24:55.808 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.bigsize_request.BigRequestSizeSubscriber -- === (complete) ===
08:24:55.808 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.bigsize_request.BigRequestSizeSubscribing_FluxClient -- main function end ---
Process finished with exit code 0
예제 2) request size 를 적절하게 지정한 Backpressure Subscriber 예제
한번 subscribe 시에 처음에는 request(1) 을 하고 그 다음부터는 onNext() 에서 1초에 1개씩 request(1) 하는 커스텀 Subscriber 를 만들고
500 이라는 숫자열을 담고 있는 Flux 가
이 Subscriber 를 subscribe 하는 예제를 만들어봅니다.
이 예제는 결국 처음 subscribe 시에는 request(1) 을 해서 그 이후부터는 onNext 에서 200,300,400,500 를 차례로 request 하면서 마지막에 도달 시 onComlete() 가 발생하게끔 하는 결과를 만들어냅니다.
SmallSizeBackpressureSubscriber.java
package io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.smallsize_request_backpressure;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@Slf4j
public class SmallSizeBackpressureSubscriber<T> implements Subscriber<T> {
private final Integer requestSize = 1;
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
log.info("(subscribe) --- ");
s.request(requestSize);
log.info(" >>> subscriber.request({})", requestSize);
}
@SneakyThrows
@Override
public void onNext(T t) {
log.info("(next) item : {}", t);
Thread.sleep(1000);
// BigRequestSizeSubscriber 예제에서는 onNext() 내부에서 아래와 같이 request 를 하지 않았다는 점과 비교해보셔야 합니다.
subscription.request(requestSize);
log.info("requestSize : {}", requestSize);
}
@Override
public void onError(Throwable t) {
log.error("error : {}", t.getMessage());
}
@Override
public void onComplete() {
log.info("=== (complete) ===");
}
}
SmallSizeBackpressureSubscribing_FluxClient.java
package io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.smallsize_request_backpressure;
import java.util.List;
import reactor.core.publisher.Flux;
public class SmallSizeBackpressureSubscribing_FluxClient {
public static void main(String[] args) {
getItems().subscribe(
new SmallSizeBackpressureSubscriber<>()
);
}
public static Flux<Integer> getItems(){
return Flux.fromIterable(List.of(1,2,3,4,5));
}
}
출력결과
08:56:18.125 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.smallsize_request_backpressure.SmallSizeBackpressureSubscriber -- (subscribe) ---
08:56:18.132 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.smallsize_request_backpressure.SmallSizeBackpressureSubscriber -- >>> subscriber.request(1)
08:56:18.138 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.smallsize_request_backpressure.SmallSizeBackpressureSubscriber -- (next) item : 1
08:56:19.150 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.smallsize_request_backpressure.SmallSizeBackpressureSubscriber -- requestSize : 1
08:56:19.150 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.smallsize_request_backpressure.SmallSizeBackpressureSubscriber -- (next) item : 2
08:56:20.167 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.smallsize_request_backpressure.SmallSizeBackpressureSubscriber -- requestSize : 1
08:56:20.168 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.smallsize_request_backpressure.SmallSizeBackpressureSubscriber -- (next) item : 3
08:56:21.174 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.smallsize_request_backpressure.SmallSizeBackpressureSubscriber -- requestSize : 1
08:56:21.174 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.smallsize_request_backpressure.SmallSizeBackpressureSubscriber -- (next) item : 4
08:56:22.175 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.smallsize_request_backpressure.SmallSizeBackpressureSubscriber -- requestSize : 1
08:56:22.175 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.smallsize_request_backpressure.SmallSizeBackpressureSubscriber -- (next) item : 5
08:56:23.185 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.smallsize_request_backpressure.SmallSizeBackpressureSubscriber -- requestSize : 1
08:56:23.188 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.smallsize_request_backpressure.SmallSizeBackpressureSubscriber -- === (complete) ===
Process finished with exit code 0
예제 3) error 발생하면 곧바로 중지
onComplete() 호출이 되지 않고 바로 중지되는 것을 확인 가능합니다.
FluxErrorClient1.java
package io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.error;
import io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class FluxErrorClient1 {
public static void main(String[] args) {
log.info("main function started");
getItems()
.subscribe(
new SimpleSubscriber<>(Integer.MAX_VALUE)
);
log.info("main function end");
}
public static Flux<Integer> getItems(){
return Flux.create(fluxSink -> {
fluxSink.next(0);
fluxSink.next(1);
var error = new IllegalStateException("Error 발생");
fluxSink.error(error);
});
}
}
출력결과
09:33:15.405 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.error.FluxErrorClient1 -- main function started
09:33:15.519 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- (subscribe) ---
09:33:15.519 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- >>> subscriber.request(2147483647)
09:33:15.523 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- (next) item : 0
09:33:16.529 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- (next) item : 1
09:33:17.547 [main] ERROR io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- error : Error 발생
09:33:17.547 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.error.FluxErrorClient1 -- main function end
SimpleSubscriber.java
package io.chagchagchag.example.foobar.reactive_streams_libraries.reactor;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@Slf4j
@RequiredArgsConstructor
public class SimpleSubscriber<T> implements Subscriber<T> {
private final Integer requestSize;
@Override
public void onSubscribe(Subscription s) {
log.info("(subscribe) --- ");
s.request(requestSize);
log.info(" >>> subscriber.request({})", requestSize);
}
@SneakyThrows
@Override
public void onNext(T t) {
log.info("(next) item : {}", t);
Thread.sleep(1000);
}
@Override
public void onError(Throwable t) {
log.error("error : {}", t.getMessage());
}
@Override
public void onComplete() {
log.info("=== (complete) ===");
}
}
예제 4) complete - complete 시그널 발생시 종료
package io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.complete;
import io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class FluxCompleteClient {
public static void main(String[] args) {
log.info("main function started");
getItems()
.subscribe(
new SimpleSubscriber<>(Integer.MAX_VALUE)
);
log.info("main function end");
}
public static Flux<Integer> getItems(){
return Flux.create(fluxSink -> {
fluxSink.complete();
});
}
}
출력결과
09:39:21.752 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.complete.FluxCompleteClient -- main function started
09:39:21.845 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- (subscribe) ---
09:39:21.846 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- >>> subscriber.request(2147483647)
09:39:21.851 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- === (complete) ===
09:39:21.851 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.complete.FluxCompleteClient -- main function end
SimpleSubscriber.java
package io.chagchagchag.example.foobar.reactive_streams_libraries.reactor;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@Slf4j
@RequiredArgsConstructor
public class SimpleSubscriber<T> implements Subscriber<T> {
private final Integer requestSize;
@Override
public void onSubscribe(Subscription s) {
log.info("(subscribe) --- ");
s.request(requestSize);
log.info(" >>> subscriber.request({})", requestSize);
}
@SneakyThrows
@Override
public void onNext(T t) {
log.info("(next) item : {}", t);
Thread.sleep(1000);
}
@Override
public void onError(Throwable t) {
log.error("error : {}", t.getMessage());
}
@Override
public void onComplete() {
log.info("=== (complete) ===");
}
}
예제 5) Flux → Mono (Mono.from(Flux))
Mono.from(Flux) 를 사용할 경우
- Flux 의 첫번째 요소만 Mono 에 전달됩니다.
예제
package io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.flux_to_mono;
import io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class FluxToMonoClient_MonoFrom {
public static void main(String[] args) {
log.info("main function started");
Mono
.from(getItems())
.subscribe(
new SimpleSubscriber<>(Integer.MAX_VALUE)
);
log.info("main function end");
}
public static Flux<Integer> getItems(){
return Flux.fromIterable(List.of(1,2,3,4,5));
}
}
예제 6) Flux → Mono (Mono.from(Flux.collectList()))
Flux.collectList() 를 사용
- Flux.collectList() 를 사용해서 List 로 변환해서 이것을 Mono<List<T>> 자료형으로 변환한다면 Mono 타입의 List 로 변환이 가능합니다.
package io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.flux_to_mono;
import io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class FluxToMonoClient_Flux_collectList {
public static void main(String[] args) {
log.info("main function started");
Mono
.from(getItems().collectList())
.subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("main function end");
}
public static Flux<Integer> getItems(){
return Flux.fromIterable(List.of(1,2,3,4,5));
}
}
Mono
참고 : prjectreactor.io - reference#mono (opens in a new tab)
Mono 는 0개 또는 1개의 요소를 전달합니다. 에러가 발생할 경우 Error 시그널을 전달한 후에 종료를 합니다. 만약 모든 요소를 전달했을 경우에는 Complete 시그널이 전달되면서 종료됩니다.
Mono 를 Flux 로 바꿔야 할 때가 있습니다. 이 경우 아래의 두 가지 방식으로 변환 가능합니다.
Mono::flux() 함수를 사용할 경우
- Mono<List<T>> 타입일 경우 이것을 flux() 함수를 이용해서 Flux<List<T>> 로 변환 가능합니다.
- e.g.
getListItems().flux().subscribe(...)
Mono::flatMapMany() 함수를 사용할 경우
- Mono<List<T>> 타입일 경우 이것을 flatMapMany() 를 사용해서 Flux<T> 로 풀어서 사용가능합니다.
- e.g.
getListItems().flatMapMany(v -> Flux.fromIterable(v)).subscribe(...)
예제 1) Subscriber
Mono 는 1개의 item만 전달하므로 next() 가 한번만 실행되면 바로 complete 가 호출되는 것이 보장됩니다. 그리고 값을 전달하지 않고도 complete 를 할 경우 값이 없다는 것을 의미합니다.
package io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.mono;
import io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
public class SimpleMonoClient {
@SneakyThrows
public static void main(String[] args) {
log.info("main function started");
getITems().subscribe(
new SimpleSubscriber<>(Integer.MAX_VALUE)
);
log.info("main function end");
}
public static Mono<Integer> getITems(){
return Mono.create(monoSink -> {
monoSink.success(1);
});
}
}
SimpleSubscriber.java
package io.chagchagchag.example.foobar.reactive_streams_libraries.reactor;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@Slf4j
@RequiredArgsConstructor
public class SimpleSubscriber<T> implements Subscriber<T> {
private final Integer requestSize;
@Override
public void onSubscribe(Subscription s) {
log.info("(subscribe) --- ");
s.request(requestSize);
log.info(" >>> subscriber.request({})", requestSize);
}
@SneakyThrows
@Override
public void onNext(T t) {
log.info("(next) item : {}", t);
Thread.sleep(1000);
}
@Override
public void onError(Throwable t) {
log.error("error : {}", t.getMessage());
}
@Override
public void onComplete() {
log.info("=== (complete) ===");
}
}
출력결과
10:40:03.253 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.mono.SimpleMonoClient -- main function started
10:40:03.355 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- (subscribe) ---
10:40:03.356 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- >>> subscriber.request(2147483647)
10:40:03.360 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- (next) item : 1
10:40:04.386 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- === (complete) ===
10:40:04.387 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.mono.SimpleMonoClient -- main function end
Process finished with exit code 0
예제 2) Mono → Flux (flux())
package io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.mono_to_flux;
import io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class MonoToFluxClient_flux {
public static void main(String[] args) {
log.info("main function started");
getItems()
.flux()
.subscribe(new SimpleSubscriber<>(Integer.MAX_VALUE));
log.info("main function end");
}
public static Mono<List<Integer>> getItems(){
return Mono.just(List.of(1,2,3,4,5));
}
}
출력결과
10:36:36.575 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.mono_to_flux.MonoToFluxClient_flux -- main function started
10:36:36.685 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- (subscribe) ---
10:36:36.685 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- >>> subscriber.request(2147483647)
10:36:36.687 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- (next) item : [1, 2, 3, 4, 5]
10:36:37.691 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- === (complete) ===
10:36:37.691 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.mono_to_flux.MonoToFluxClient_flux -- main function end
Process finished with exit code 0
예제 3) Mono → Flux (flatMapMany())
package io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.mono_to_flux;
import io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class MonoToFluxClient_flatMapMany {
public static void main(String[] args) {
log.info("main function started");
getItems()
.flatMapMany(
list -> Flux.fromIterable(list)
)
.subscribe(
new SimpleSubscriber<>(Integer.MAX_VALUE)
);
log.info("main function end");
}
public static Mono<List<Integer>> getItems(){
return Mono.just(List.of(1,2,3,4,5));
}
}
출력결과
10:37:03.180 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.mono_to_flux.MonoToFluxClient_flatMapMany -- main function started
10:37:03.297 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- (subscribe) ---
10:37:03.298 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- >>> subscriber.request(2147483647)
10:37:03.299 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- (next) item : 1
10:37:04.302 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- (next) item : 2
10:37:05.316 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- (next) item : 3
10:37:06.320 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- (next) item : 4
10:37:07.320 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- (next) item : 5
10:37:08.335 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.SimpleSubscriber -- === (complete) ===
10:37:08.335 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.reactor.mono_to_flux.MonoToFluxClient_flatMapMany -- main function end
Process finished with exit code 0
RxJava
Netflix에서 개발한 프레임워크입니다. 닷넷 프레임워크에서 지원되던 Reactive Extensions를 Java 버전으로 포팅한 프레임워크입니다. Flowable, Observable, Single, Maybe, Completable 라는 이름의 퍼블리셔 클래스가 있습니다.
Flowable
Observable
Single
Maybe
Completable
Mutiny
Mutiny 공식 도큐먼트 : https://smallrye.io/smallrye-mutiny/latest/ (opens in a new tab)
Hibernate Reactive 에서 비동기 라이브러리로 제공하는 라이브러리입니다. 대표적인 자료형 및 클래스는 Multi, Uni 라고 하는 퍼블리셔 클래스가 있습니다.
Multi 와 Uni
Multi 는 Reactor 의 Flux 와 유사한 특징을 가지는 Publisher 클래스입니다. Multi 에는 0 ~ n 개의 item 을 전달 가능하고 에러가 발생할 경우 Error 시그널을 전달하면서 종료를 합니다. 만약 모든 item 을 전달했을 경우에는 Complete 시그널을 전달하면서 종료합니다. Multi 역시 Backpressure 를 지원합니다.
Uni 는 Reactor 의 Mono 와 유사한 특징을 가지는 Publisher 클래스입니다. Uni 에는 0 ~ 1 개의 item 을 전달 가능하고 에러가 발생할 경우 Error 시그널을 전달하면서 종료를 합니다. 만약 모든 item 을 전달했을 경우에는 Complete 시그널을 전달하면서 종료합니다.
gradle 의존성
// mutiny
implementation("io.smallrye.reactive:mutiny:2.5.7")
Multi
MultiClient.java
package io.chagchagchag.example.foobar.reactive_streams_libraries.mutiny.multi;
import io.smallrye.mutiny.Multi;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MultiClient {
public static void main(String[] args) {
getItems()
.subscribe()
.withSubscriber(new SimpleMultiSubscriber<>(1024));
}
public static Multi<Integer> getItems(){
return Multi.createFrom().items(1,2,3,4,5);
}
}
SimpleMultiSubscriber.java
package io.chagchagchag.example.foobar.reactive_streams_libraries.mutiny.multi;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import java.util.concurrent.Flow.Subscription;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
public class SimpleMultiSubscriber <T> implements MultiSubscriber<T> {
private final Integer requestSize;
@Override
public void onItem(T t) {
log.info("item == {}", t);
}
@Override
public void onFailure(Throwable throwable) {
log.error("fail, mesage = {}", throwable.getMessage());
}
@Override
public void onCompletion() {
log.info("complete");
}
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(requestSize);
log.info("subscribe");
}
}
출력결과
10:54:17.883 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.mutiny.multi.SimpleMultiSubscriber -- item == 1
10:54:17.898 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.mutiny.multi.SimpleMultiSubscriber -- item == 2
10:54:17.898 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.mutiny.multi.SimpleMultiSubscriber -- item == 3
10:54:17.898 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.mutiny.multi.SimpleMultiSubscriber -- item == 4
10:54:17.898 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.mutiny.multi.SimpleMultiSubscriber -- item == 5
10:54:17.898 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.mutiny.multi.SimpleMultiSubscriber -- complete
10:54:17.898 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.mutiny.multi.SimpleMultiSubscriber -- subscribe
Process finished with exit code 0
Uni
UniClient.java
package io.chagchagchag.example.foobar.reactive_streams_libraries.mutiny.uni;
import io.smallrye.mutiny.Uni;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class UniClient {
public static void main(String[] args) {
getItems()
.subscribe()
.withSubscriber(
new SimpleUniSubscriber<>(1024)
);
}
public static Uni<Integer> getItems(){
return Uni.createFrom().item(-1);
}
}
SimpleUniSubscriber.java
package io.chagchagchag.example.foobar.reactive_streams_libraries.mutiny.uni;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
public class SimpleUniSubscriber<T> implements UniSubscriber<T> {
private final Integer count;
private UniSubscription subscription;
@Override
public void onSubscribe(UniSubscription uniSubscription) {
this.subscription = uniSubscription;
subscription.request(1);
log.info("subscribe");
}
@Override
public void onItem(T t) {
log.info("item : {}", t);
}
@Override
public void onFailure(Throwable throwable) {
log.error("error, message = {}", throwable.getMessage());
}
}
출력결과
11:01:35.633 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.mutiny.uni.SimpleUniSubscriber -- subscribe
11:01:35.639 [main] INFO io.chagchagchag.example.foobar.reactive_streams_libraries.mutiny.uni.SimpleUniSubscriber -- item : -1
Process finished with exit code 0