Reactor 프로그래밍 (스압 주의)
Spring Webflux 는 pivotal 사에서 개발한 project reactor 를 기반으로 한 서버 애플리케이션을 개발하기 위한 Web 프레임워크입니다.
Flux, Mono
참고
- Flux :
- Java 의 List 와 유사한 데이터의 흐름입니다. 두개 이상의 데이터의 흐름입니다.
- projectreactor.io - Flux (opens in a new tab) 을 보면 알 수 있듯 Publisher 를 implements 한 클래스이기에 자료구조라기보다는, 데이터의 흐름이라는 사실을 기억해주시기 바랍니다.
- Mono :
- 하나만 존재하는 데이터를 의미합니다.
- projectreactor.io - Mono (opens in a new tab) 을 보면 알 수 있듯 Publisher 를 implements 한 클래스이기에 자료구조라기보다는 데이터의 흐름이라는 사실을 기억해주시기 바랍니다.
Flux 하나만 쓰면 될것 같은데 Mono 가 존재하는 이유에 대해 생각이 들 수 있습니다. Mono 라는 자료형이 존재하는 것으로 인한 장점은 아래와 같습니다.
- 하나만 존재하는 데이터의 흐름(Response, Count 결과값 등) 일 경우 onNext 이후에 바로 onComplete 를 하면 되기 때문에 구현이 더 명확해지게 됩니다.
- Mono 라는 Publisher 를 받는 Subscriber 측 역시 1개의 요소만 처리해야 한다는 사실을 알 수 있기 때문에 조금 더 구현이 명확해집니다.
sequence
Mono,Flux 를 이용해서 다양한 데이터의 흐름을 만들어봅니다. Mono, Flux 를 통해서 만들어내는 데이터의 흐름을 일반적으로 sequence
라고 부릅니다.
just
Mono.just(), Flux.just() 메서드를 사용하면 시퀀스를 생성할 수 있습니다. 자세한 내용은 예제와 출력결과를 확인해주시기 바랍니다.
JustExample.java
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class JustExample {
public static void main(String[] args) {
Mono.just("안녕하세요")
.subscribe(v -> {
log.info(">>> {}", v);
});
Flux.just("MSFT", "NVDA", "SMCI")
.subscribe(v -> {
log.info(">>> {}", v);
});
}
}
출력결과
15:18:07.066 [main] INFO io...sequence.JustExample -- >>> 안녕하세요
15:18:07.228 [main] INFO io...sequence.JustExample -- >>> MSFT
15:18:07.228 [main] INFO io...sequence.JustExample -- >>> NVDA
15:18:07.228 [main] INFO io...sequence.JustExample -- >>> SMCI
Process finished with exit code 0
error
Mono.error, Flux.error 를 사용하면 subscriber 에게 onError 이벤트를 전달할 수 있습니다.
반드시 **"subscriber 에게 onError 이벤트를 전달한다."**라는 의미를 꼭 기억해주셨으면 합니다.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class ErrorExample {
public static void main(String[] args) {
Mono.error(new IllegalArgumentException("어머, 에러에요"))
.subscribe(
v -> {log.info("v ::: " + v);},
error -> {log.error("error ::: " + error);}
);
Flux.error(new IllegalArgumentException("어머, 에러에요"))
.subscribe(
v -> {log.info("value ::: " + v);},
error -> {log.error("error ::: " + error);}
);
}
}
출력결과
15:27:13.589 [main] ERROR io...sequence.ErrorExample -- error ::: java.lang.IllegalArgumentException: 어머, 에러에요
15:27:13.699 [main] ERROR io...sequence.ErrorExample -- error ::: java.lang.IllegalArgumentException: 어머, 에러에요
Process finished with exit code 0
empty
Mono.empty, Flux.empty 를 써보는 예제입니다. empty 를 사용하면 Mono 또는 Flux 에게 onComplete 이벤트만 전달되게 됩니다.
package io...sequence;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class EmptyExample {
public static void main(String[] args) {
Mono.empty()
.subscribe(
v -> {log.info("value === " +v);},
null,
() -> {log.info("complete");}
);
Flux.empty()
.subscribe(
v -> {log.info("value === " + v);},
null,
() -> {log.info("complete");}
);
}
}
출력결과
15:31:46.735 [main] INFO io...sequence.EmptyExample -- complete
15:31:46.792 [main] INFO io...sequence.EmptyExample -- complete
Process finished with exit code 0
Mono.fromOOO()
Mono.from 으로 시작하는 함수들은 아래와 같습니다.
- Mono.fromCallable
- Callable 함수형 인터페이스를 람다로 실행 후 반환값은 onNext 로 전달됩니다.
- Mono.formFuture
- Future 를 받아서 done 상태가 되었을 때 반환값을 onNext 로 Subscriber 에 전달합니다.
- Mono.fromSupplier
- Supplier 함수형 인터페이스를 람다로 실행 후 반환값은 onNext 로 Subscriber 에 전달합니다.
- Mono.fromRunnable
- Runnable 함수형 인터페이스를 람다로 실행 후, onComplete 를 Subscriber 에 전달합니다.
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
public class MonoFromExample {
public static void main(String[] args) {
Mono.fromCallable(() -> {
return "삼성전자";
}).subscribe(v -> {
log.info("fromCallable 에서 받은 value === " + v);
});
Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
return "삼성전자";
})).subscribe(v -> {
log.info("fromFuture 로부터 받은 value === " + v);
});
Mono.fromSupplier(() -> {
return "삼성전자";
}).subscribe(v -> {
log.info("fromSupplier 로부터 받은 value === " + v);
});
Mono.fromRunnable(() -> {
log.info("do some runnable");
}).subscribe(
null, null, () -> {log.info("fromRunnable Complete.");}
);
}
}
출력결과
15:45:03.043 [main] INFO io...sequence.MonoFromExample -- fromCallable 에서 받은 value === 삼성전자
15:45:03.049 [main] INFO io...sequence.MonoFromExample -- fromFuture 로부터 받은 value === 삼성전자
15:45:03.051 [main] INFO io...sequence.MonoFromExample -- fromSupplier 로부터 받은 value === 삼성전자
15:45:03.052 [main] INFO io...sequence.MonoFromExample -- do some runnable
15:45:03.052 [main] INFO io...sequence.MonoFromExample -- fromRunnable Complete.
Process finished with exit code 0
Flux.fromOOO()
Flux.from 으로 시작하는 함수들은 아래와 같습니다.
- Flux.fromIterable
- Flux.fromStream
- Flux.fromArray
- Flux.range(start, n)
import java.util.List;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class FluxFromExample {
public static void main(String[] args) {
Flux.fromIterable(List.of("MSFT", "NVDA", "SMCI"))
.subscribe(v -> {log.info("value ==> " + v);});
Flux.fromStream(IntStream.range(1,10).boxed())
.subscribe(v -> {log.info("value ==> " + v);});
Flux.fromArray(new Integer[]{1,2,3,4,5,6,7,8,9,10})
.subscribe(v -> {log.info("value ==> " + v);});
Flux.range(1,10)
.subscribe(v -> {log.info("value ==> " + v);});
}
}
출력결과
15:53:24.737 [main] INFO io...sequence.FluxFromExample -- value ==> MSFT
15:53:24.740 [main] INFO io...sequence.FluxFromExample -- value ==> NVDA
15:53:24.740 [main] INFO io...sequence.FluxFromExample -- value ==> SMCI
15:53:24.746 [main] INFO io...sequence.FluxFromExample -- value ==> 1
15:53:24.746 [main] INFO io...sequence.FluxFromExample -- value ==> 2
15:53:24.746 [main] INFO io...sequence.FluxFromExample -- value ==> 3
15:53:24.747 [main] INFO io...sequence.FluxFromExample -- value ==> 4
15:53:24.747 [main] INFO io...sequence.FluxFromExample -- value ==> 5
15:53:24.747 [main] INFO io...sequence.FluxFromExample -- value ==> 6
15:53:24.747 [main] INFO io...sequence.FluxFromExample -- value ==> 7
15:53:24.747 [main] INFO io...sequence.FluxFromExample -- value ==> 8
15:53:24.747 [main] INFO io...sequence.FluxFromExample -- value ==> 9
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 1
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 2
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 3
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 4
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 5
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 6
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 7
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 8
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 9
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 10
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 1
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 2
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 3
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 4
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 5
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 6
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 7
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 8
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 9
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 10
Process finished with exit code 0
Flux.generate
generate 는 동기적으로 Flux 를 생성합니다.
Flux.java(github) (opens in a new tab) 내에 선언된 generate() 메서드는 세 종류의 메서드가 있는데 그 중 한 가지는 아래와 같습니다.
public abstract class Flux<T> implements CorePublisher<T>{
// ...
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) {
return onAssembly(new FluxGenerate<>(stateSupplier, generator));
}
// ...
}
stateSupplier
- 초기값을 제공하는 역할의 callable 입니다.
generator
- 첫번째 인자로는 state 를 제공합니다. 그리고 이 state 에 대해 변경된 state 를 반환합니다.
- 두번째 인자로 SynchronousSink 를 제공합니다. 이 sink 객체의 next, error, complete 메서드를 이용해서 Subscriber 에게 onNext, onError, onComplete 이벤트를 전달하는 것이 가능합니다.
- 한번의 generator 에서는 최대 한번만 next 를 호출 가능합니다.
1) sink.next()
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class FluxGenerateExample1 {
public static void main(String[] args) {
Flux.generate(
() -> 0,
(state, sink) -> {
sink.next(state);
if(state == 9) sink.complete();
return state + 1;
}
).subscribe(
v -> {log.info("value === " + v);},
error -> {log.error("error === " + error);},
() -> {log.info("complete !!");}
);
}
}
초기값은 0 으로 세팅해줬습니다. 매 순간 다음 스텝의 연산을 수행하는 것은 state + 1 을 통해서 다음 스텝의 연산을 합니다. 그리고 state 가 9 가 되었을 때는 sink 객체의 complete() 메서드를 호출해서 onComplete 신호를 방출합니다.
자세히 보면 FSM, CPS 연산과 닮아있음을 알 수 있습니다.
출력결과
16:07:48.178 [main] INFO io...sequence.FluxGenerateExample1 -- value === 0
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 1
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 2
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 3
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 4
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 5
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 6
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 7
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 8
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 9
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- complete !!
Process finished with exit code 0
2) sink.next()를 두번 호출하면? 에러발생
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class FluxGenerateExample2 {
public static void main(String[] args) {
Flux.generate(
() -> 0,
(state, sink) -> {
sink.next(state);
sink.next(state);
if(state == 9) sink.complete();
return state + 1;
}
).subscribe(
v -> { log.info("value === " + v); },
error -> { log.error("error === " + error); },
() -> { log.info("complete"); }
);
}
}
generate 에서 sink 객체를 이용해서 next() 함수를 통해 onNext 이벤트를 두번 호출해보면 에러가 나는 것을 확인 가능합니다.
출력결과
16:14:41.992 [main] INFO io.....sequence.FluxGenerateExample2 -- value === 0
16:14:41.994 [main] ERROR io.chagchagchag...sequence.FluxGenerateExample2 -- error === java.lang.IllegalStateException: More than one call to onNext
Process finished with exit code 0
Flux.create()
Flux.java(github) (opens in a new tab) 내에 선언된 create() 메서드는 아래와 같습니다.
public abstract class Flux<T> implements CorePublisher<T>{
// ...
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {
return create(emitter, OverflowStrategy.BUFFER);
}
// ...
}
Flux.craete() 는 Flux 를 비동기적으로 생성합니다. 중간에 있는 인자를 보면 FluxSink 가 Consumer 의 인자로 되는 것을 알 수 있습니다. 이 FluxSink 를 이용해서 next(), error(), complete() 메서드를 호출해서 Subscriber 에게 onNext, onError, onComplete 신호를 발생시킬 수 있습니다.
위에서 살펴본 Flux.generate() 에서 사용하던 SynchronousSink 와는 다르게 여러번 next() 를 호출하는 것이 가능합니다. 그리고 Flux.create() 는 여러 스레드에서 동시에 호출가능합니다.
예제
import java.util.concurrent.CompletableFuture;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class FluxCreateExample {
@SneakyThrows
public static void main(String[] args) {
Flux.create(fluxSink -> {
var task1 = CompletableFuture.runAsync(() -> {
for (int i=0; i<5; i++){
fluxSink.next(i);
}
});
var task2 = CompletableFuture.runAsync(() -> {
for (int i=5; i<10; i++){
fluxSink.next(i);
}
});
CompletableFuture.allOf(task1, task2)
.thenRun(fluxSink::complete);
}).subscribe(
v -> {log.info("value === " + v);},
error -> {log.error("error === " + error);},
() -> {log.info("complete");}
);
Thread.sleep(1000);
}
}
0~4 까지의 숫자를 sink.next 하는 스레드, 5 ~ 9 까지의 숫자를 sink.next 하는 스레드 이렇게 두개의 스레드를 이용해서 sink.next 를 수행하며, CompletableFuture.allOf 를 통해서 두 개의 CompletableFuture 객체 task1, task2 작업이 끝난 시점에 fluxSink 객체를 이용해서 Subscriber 에게 complete 신호를 내보냅니다.
출력결과
16:24:55.472 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 0
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 5
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 6
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 7
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 8
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 9
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 1
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 2
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 3
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 4
16:24:55.475 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- complete
Process finished with exit code 0
Flux.handle()
Flux.java(github) (opens in a new tab) 내에 선언된 handle() 메서드는 아래와 같습니다.
public abstract class Flux<T> implements CorePublisher<T>{
// ...
public final <R> Flux<R> handle(BiConsumer<? super T, SynchronousSink<R>> handler) {
if (this instanceof Fuseable) {
return onAssembly(new FluxHandleFuseable<>(this, handler));
}
return onAssembly(new FluxHandle<>(this, handler));
}
// ...
}
Flux.handle() 메서드는 현재 존재하는 source 에 대해 handle 작업을 처리하는 함수입니다. 위 코드에서 handler
라는 BiConcumer 의 인자값에 대한 설명은 아래와 같습니다.
? super T
- source 에서 제공하는 데이터흐름에 대한 각 요소를 의미하는 item 입니다.
SynchronousSink<R>
- sink 의 next 연산을 이용해서 현재 주어진 item 을 전달할지 말지 결정합니다.
source 의 item 을 필터링 하는 등 interceptor 같은 역할을 하게끔 구현가능합니다.
예제
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class FluxHandleExample {
public static void main(String[] args) {
Flux.fromStream(IntStream.range(0,11).boxed())
.handle((v, sink) -> {
if(v%2 == 0) sink.next(v);
}).subscribe(
v -> {log.info("value === " + v);},
error -> {log.error("error === " + error);},
() -> {log.info("complete");}
);
}
}
짝수 숫자에 대해서만 다음으로 넘어갈 수 있도록 sink 를 이용해서 next() 함수를 호출해서 Subscriber 에게 onNext 신호를 방출합니다.
출력결과
16:37:34.735 [main] INFO io.....sequence.FluxHandleExample -- value === 0
16:37:34.737 [main] INFO io.....sequence.FluxHandleExample -- value === 2
16:37:34.737 [main] INFO io.....sequence.FluxHandleExample -- value === 4
16:37:34.737 [main] INFO io.....sequence.FluxHandleExample -- value === 6
16:37:34.737 [main] INFO io.....sequence.FluxHandleExample -- value === 8
16:37:34.737 [main] INFO io.....sequence.FluxHandleExample -- value === 10
16:37:34.738 [main] INFO io.....sequence.FluxHandleExample -- complete
Process finished with exit code 0
subscribe(), Subscriber
Flux, Mono 와 같은 Publisher 는 subscribe 를 하지 않으면 아무 일도 일어나지 않습니다. 예를 들면 아래와 같은 코드는 아무 일도 일어나지 않으며 그냥 표현식일 뿐입니다.
Flux.fromIterable(List.of("배고파요", "밥먹어요", "배불러요"))
.doOnNext(v -> {log.info(">>> " + v);});
subscribe()
예를 들어 Flux 의 코드를 보면, subscribe() 함수는 아래와 같은 메서드들이 있습니다. 오버로딩 된 여러가지 메서드들이 있지만, 가장 대표적인 메서드 들은 아래와 같습니다.
package reactor.core.publisher;
// ...
public abstract class Flux<T> implements CorePublisher<T> {
public final Disposable subscribe() {
return subscribe(null, null, null);
}
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer,
@Nullable Context initialContext) {
return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,
completeConsumer,
null,
initialContext));
}
@Override
@SuppressWarnings("unchecked")
public final void subscribe(Subscriber<? super T> actual) {
// ...
}
}
- subscribe()
- 별도의 Consumer 를 전달 받지 않은 채로 subscribe 합니다.
- subscribe (consumer, errorConsumer, completeConsumer, initialContext)
- 함수형 인터페이스를 이용해서 subscribe 를 합니다. Disposable 을 반환해서 이 Disposable 을 이용해서 언제든지 연결을 종료하는 것이 가능합니다.
- subscribe (Subscriber)
- 별도의 Subscriber 를 전달받아서 subscribe 합니다.
- 예를 들면 BaseSubscriber (opens in a new tab) 같은 Subscriber 를 인자로 전달하거나, 비즈니스 로직에 따라 특수하게 BaseSubscriber (opens in a new tab) 를 별도로 확장(extends)한 커스텀한 Subscriber 를 넘겨줘서 subscribe 하는 경우도 있습니다.
- BaseSubscriber (opens in a new tab) 는 project reactor 에서 제공하는 배압관리를 위해 필요한 것들이 잘 갖춰져 있는 클래스입니다.
- 이때 Subscriber 는 subscription 을 Publisher 로부터 전달 받습니다. 그리고 이 subscription 객체를 이용해서 request를 통해서 backpressure 를 조절하거나, cancel 을 통해서 연결 종료할 수 있습니다.
- Subscriber 를 직접 작성할 경우에 어떻게 작성하는지 궁금하시다면 Publisher, Subscriber, Supscription, Backpressure 문서 (opens in a new tab) 를 참고해주시기 바랍니다.
subscribe()
Publiser 에서 데이터의 흐름을 만들긴 하지만, 이 데이터들을 Subscriber 측에서 받아서 별도의 처리를 할 필요가 없이 뒷단에서만 동작하게끔 해야 할 경우 단순한 형태의 subscribe() 메서드를 사용합니다.
@Slf4j
public class Subscribe1Example {
public static void main(String[] args) {
Flux.fromIterable(List.of("배고파요", "밥먹어요", "배불러요"))
.doOnNext(v -> { save(v);})
.subscribe();
}
public static void save(String s){
log.info("SAVE DATA >>> " + s);
}
}
doOnNext()
- 결과를 확인하고 싶을 때나, 지나가는 값들을 통해서 별도의 작업을 확인하고 싶을 때 사용합니다.
출력결과
17:26:42.596 [main] INFO io.....subscribe.Subscribe1Example -- SAVE DATA >>> 배고파요
17:26:42.598 [main] INFO io.....subscribe.Subscribe1Example -- SAVE DATA >>> 밥먹어요
17:26:42.598 [main] INFO io.....subscribe.Subscribe1Example -- SAVE DATA >>> 배불러요
Process finished with exit code 0
subscribe (consumer, errorConsumer, completeConsumer, initialContext)
정상일때, 에러일때, onComplete 일때 어떤것을 할지, initialContext 를 정의합니다.
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.util.context.Context;
@Slf4j
public class Subscribe2Example {
public static void main(String[] args) {
Flux.fromIterable(List.of("배고파요", "밥먹어요", "배불러요"))
.subscribe(
v -> {log.info("value === " + v);},
error -> {log.error("error === " + error);},
() -> {log.info("complete");},
Context.empty()
);
}
}
consumer
- 인자값을 받아서 해야 할 일을 Consumer 함수형 인터페이스에 맞게 구현합니다.
errorConsumer
- errorConsumer 에는 에러를 인자로 받아서 처리하는 Consumer 를 넘겨주면 됩니다.
completeConsumer
- onComplete 신호가 발생했을 때 실행할 Runnable 인터페이스의 람다 바디를 이곳에 정의해줍니다.
initialContext
- upstream 에 전달할 context 입니다.
출력결과
17:31:30.686 [main] INFO io.....subscribe.Subscribe2Example -- value === 배고파요
17:31:30.688 [main] INFO io.....subscribe.Subscribe2Example -- value === 밥먹어요
17:31:30.688 [main] INFO io.....subscribe.Subscribe2Example -- value === 배불러요
17:31:30.689 [main] INFO io.....subscribe.Subscribe2Example -- complete
Process finished with exit code 0
subscribe (Subscriber)
Publisher 가 Subscription 에 어떤 Subscriber 를 등록할지를 직접 지정합니다.
package io.chagchagchag...subscribe;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
@Slf4j
public class Subscribe3Example {
public static void main(String[] args) {
var subscriber = new BaseSubscriber<String>(){
@Override
protected void hookOnNext(String value) {
log.info("value === " + value);
}
@Override
protected void hookOnComplete() {
log.info("complte");
}
};
Flux.fromIterable(List.of("배고파요", "밥먹어요", "배불러요"))
.subscribe(subscriber);
}
}
출력결과
18:47:08.696 [main] INFO io.....subscribe.Subscribe3Example -- value === 배고파요
18:47:08.699 [main] INFO io.....subscribe.Subscribe3Example -- value === 밥먹어요
18:47:08.699 [main] INFO io.....subscribe.Subscribe3Example -- value === 배불러요
18:47:08.699 [main] INFO io.....subscribe.Subscribe3Example -- complte
Process finished with exit code 0
BaseSubscriber
예제에서 사용한 BaseSubscriber (opens in a new tab) 는 projectreactor 에서 제공하는 BaseSubscriber (opens in a new tab) 를 사용했습니다.
BaseSubscriber 내의 hookOnNext(), hookOnComplete(), hookOnError(), hookOnSubscribe() 를 override 해서 cancel, request 를 별도로 호출할 수 있도록 정의할 수 있습니다.
backpressure 관리함수 : request(n)
Publisher 에게 "아이템을 n 만큼씩 주세요" 하면서 그 속도를 조절하는 함수입니다.
Subscriber 외부에서 request(n) 을 통해 요청할 수 있어서 backpressure 의 속도를 조절하는 등의 작업을 할 때 유용하게 쓰입니다.
예를 들어 아래의 코드는 Subscriber interface 에서 제공하는 backpressure 함수인 request () 함수를 이용했습니다. hookOnSubscribe() 에서 request(1) 을 통해서 1만큼 읽어들이겠다고 하는 방식으로 backpressure 관리 코드를 작성했습니다.
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
@Slf4j
public class Subscribe4Example {
public static void main(String[] args) {
var subscriber = new BaseSubscriber<String>(){
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1); // backpressure 함수인 reuest(n) 호출
}
@Override
protected void hookOnNext(String value) {
log.info("value === " + value);
}
@Override
protected void hookOnComplete() {
log.info("complte");
}
};
Flux.fromIterable(List.of("배고파요", "밥먹어요", "배불러요"))
.subscribe(subscriber);
}
}
출력결과
19:04:26.426 [main] INFO io.....subscribe.Subscribe4Example -- value === 배고파요
Process finished with exit code 0
backpressure 관리함수 : requestUnbounded()
requestUnbounded() 는 request(Long.MAX_VALUE) 로 실행되는데,
Publisher 에게 "가능한 빠르게 아에템을 전달해주세요"와 같은 의미의 요청입니다.
Subscriber 외부에서 request(n) 을 통해 요청할 수 있어서 backpressure 의 속도를 조절하는 등의 작업을 할 때 유용하게 쓰입니다.
request 를 거의 무한대에 가까운 숫자로 요청하기에 가지고 있는 것들을 모두 한번에 보내주세요 라는 의미와 같습니다. 또한 backpressure 의 속도를 조절하지 않고 한번에 요청하므로 거의 backpressure 를 비활성화 한것과 같은 효과를 냅니다.
BaseSubscriber 의 기본 전략은 requestUnbounded() 입니다.
requestUnbounded(), request(Long.MAX_VALUE) 가 발생하는 경우
아래와 같은 경우에 request(Long.MAX_VALUE) 와 같은 요청이 발생됩니다.
- 아무 인자 없이 사용되는 subscribe() 함수
- BaseSubscriber 의 hookOnSubscribe() 를 그대로 사용할 경우
- BaseSubscriber 를 재정의 없이 그냥 사용할 경우
- block(), blockFirst(), blockLast() 등의 blocking 연산자를 사용할 경우
- toIterable(), toStream() 등의 toCallect 연산자를 사용할 경우
BasePublisher 는 위에서 살펴봤던 request(n) 과 같은 backpressure 를 관리하는 함수를 조금 더 다양하게 사용할 수 있는 함수를 제공합니다.
buffer(n)
BasePublisher 는 위에서 살펴봤던 request(n) 과 같은 backpressure 를 관리하는 함수를 조금 더 다양하게 사용할 수 있는 함수 인 buffer(n) 을 제공합니다.
import java.util.List;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
@Slf4j
public class Buffer1Example {
public static void main(String[] args) {
var subscriber = new BaseSubscriber<List<Integer>>(){
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(2);
}
@Override
protected void hookOnNext(List<Integer> value) {
log.info("value = " + value);
}
@Override
protected void hookOnComplete() {
log.info("complete");
}
};
Flux.fromStream(IntStream.range(0,20).boxed())
.buffer(3)
.subscribe(subscriber);
}
}
(request = 2) x (buffer = 3) = 6 입니다.
한번의 요청에 size 가 3 인 List 를 buffer 로 전달하고 이것을 requet(2) 를 통해 한번의 엘리먼트에서 (3개짜리 리스트) 를 2회 요청하는 효과를 냅니다.
출력결과는 아래와 같습니다.
19:38:07.365 [main] INFO io.....subscribe.Buffer1Example -- value = [0, 1, 2]
19:38:07.367 [main] INFO io.....subscribe.Buffer1Example -- value = [3, 4, 5]
Process finished with exit code 0
take(n, limitRequest)
BasePublisher 는 위에서 살펴봤던 request(n) 과 같은 backpressure 를 관리하는 함수를 조금 더 다양하게 사용할 수 있는 함수인 take(n, limitRequest) 를 제공합니다.
Publisher 에는 데이터가 수천만개가 있을 수 있습니다. 하지만 데이터를 SQL의 limit 문 처럼 원하는 갯수만 구독하려 할 경우에는 take(n, limitRequest) 를 사용합니다.
그런데 take(n, limitRequest) 함수는 n개를 지정했더라도 정확하게 n개만큼 딱 떨어지게 끊지 못하는 경우도 있는데 limitRequest == true 로 설정할 경우에는 정확하게 n개 만큼을 take 할 수 있게 됩니다. 만약 limitRequest == false 일 경우 정확히 n 개를 take 하진 않더라도 그 비슷한 시간 대에 반환됩니다.
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
@Slf4j
public class Subscribe5Example {
public static void main(String[] args) {
var subscriber = new BaseSubscriber<Integer>(){
@Override
protected void hookOnNext(Integer value) {
log.info("value == " + value);
}
@Override
protected void hookOnComplete() {
log.info("completed");
}
};
Flux.fromStream(IntStream.range(0, 20).boxed())
.take(11, true)
.subscribe(subscriber);
}
}
출력결과
19:54:18.272 [main] INFO io.....subscribe.Subscribe5Example -- value == 0
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 1
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 2
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 3
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 4
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 5
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 6
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 7
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 8
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 9
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 10
19:54:18.275 [main] INFO io.....subscribe.Subscribe5Example -- completed
Process finished with exit code 0
delayElements()
onNext 이벤트 발행 시에 최소 delay 만큼의 간격을 두어서 delay 를 수행합니다. 만약 onNext 이벤트가 발행된 후에 delay 보다 더 늦게 다음 onNext 이벤트가 전달되면 바로 전파됩니다.
e.g 1
for 문 안에서는 10ms 만큼 지연이 발생하고, delayElements 에서는 최소 500ms 만큼 기다려주고 있습니다.
import java.time.Duration;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@Slf4j
public class DelayElementsExample1 {
@SneakyThrows
public static void main(String[] args) {
Flux.create(fluxSink -> {
for(int i=1; i<=5; i++){
try{
Thread.sleep(10);
}
catch (InterruptedException e){
e.printStackTrace();
throw new IllegalStateException(e);
}
fluxSink.next(i);
}
fluxSink.complete();
})
.delayElements(Duration.ofMillis(500))
.doOnNext(v -> {
log.info("doOnNext = " + v);
})
.subscribeOn(Schedulers.single())
.subscribe();
Thread.sleep(5000);
}
}
출력결과
20:44:50.730 [parallel-1] INFO io.....delay_elements.DelayElementsExample1 -- doOnNext = 1
20:44:51.240 [parallel-2] INFO io.....delay_elements.DelayElementsExample1 -- doOnNext = 2
20:44:51.756 [parallel-3] INFO io.....delay_elements.DelayElementsExample1 -- doOnNext = 3
20:44:52.269 [parallel-4] INFO io.....delay_elements.DelayElementsExample1 -- doOnNext = 4
20:44:52.782 [parallel-5] INFO io.....delay_elements.DelayElementsExample1 -- doOnNext = 5
Process finished with exit code 0
로그에 찍힌 시간을 보면 500ms 마다 한번씩 doOnNext=
로그가 찍혔음을 확인 가능합니다.
e.g 2
이번에는 for 문안에서 1000ms 를 지정합니다. delayElements 에서는 100ms 를 지정했습니다. 결과는 delayElements 의 100ms 는 최소로 기다려야 하는 시간이기에 for loop 의 1000ms 를 거친 후, delayElements 는 어이쿠 최소로 기다려야 하는 시간(100ms)이 넘어버렸네? 하면서 delayElements 는 넘어가게 됩니다.
import java.time.Duration;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@Slf4j
public class DelayElementsExample2 {
@SneakyThrows
public static void main(String[] args) {
Flux.create(fluxSink -> {
for(int i=1; i<=5; i++){
try{
Thread.sleep(1000);
}
catch (InterruptedException e){
e.printStackTrace();
throw new IllegalStateException(e);
}
fluxSink.next(i);
}
fluxSink.complete();
})
.delayElements(Duration.ofMillis(700))
.doOnNext(v -> {
log.info("doOnNext = " + v);
})
.subscribeOn(Schedulers.single())
.subscribe();
Thread.sleep(7000);
}
}
출력결과
20:50:17.131 [parallel-1] INFO io.....delay_elements.DelayElementsExample2 -- doOnNext = 1
20:50:18.142 [parallel-2] INFO io.....delay_elements.DelayElementsExample2 -- doOnNext = 2
20:50:19.145 [parallel-3] INFO io.....delay_elements.DelayElementsExample2 -- doOnNext = 3
20:50:20.155 [parallel-4] INFO io.....delay_elements.DelayElementsExample2 -- doOnNext = 4
20:50:21.154 [parallel-5] INFO io.....delay_elements.DelayElementsExample2 -- doOnNext = 5
Process finished with exit code 0
concat(), merge(), mergeSequential()
concat()
현재 Publisher 를 다른 Publisher 와 합치는 연산을 수행합니다. 위의 그림을 설명해보면, 바로 앞의 Publisher 가 onComplete 를 전파하면 바로 다음 Publisher 를 subscribe() 합니다.
그림을 보면 Publisher 두개를 합치는 도중에 각각의 Publiser 의 onNext 연산에 대한 순서가 각각의 요소에 대해 지켜지고 있는 것을 확인 가능합니다.
import java.time.Duration;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class ConcatExample1 {
@SneakyThrows
public static void main(String[] args) {
var flux1 = Flux.range(1,3)
.doOnSubscribe(v -> {
log.info("doOnSubscribe flux#1");
})
.delayElements(Duration.ofMillis(100));
var flux2 = Flux.range(11, 3)
.doOnSubscribe(v -> {
log.info("doOnSubscribe flux#2");
})
.delayElements(Duration.ofMillis(100));
Flux.concat(flux1, flux2)
.doOnNext(v -> {log.info("doOnNext " + v);})
.subscribe();
Thread.sleep(2000);
}
}
출력결과를 보면 doOnSubscribe flux#1
, doOnSubscribe flux#2
를 통해 두개의 Publisher 를 순서대로 받음을 알수 있고, 데이터의 순서 역시 로그를 확인해보면 순서대로 들어온다는 것이 보장됩니다.
출력결과
00:22:07.475 [main] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnSubscribe flux#1
00:22:07.596 [parallel-1] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnNext 1
00:22:07.704 [parallel-2] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnNext 2
00:22:07.814 [parallel-3] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnNext 3
00:22:07.814 [parallel-3] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnSubscribe flux#2
00:22:07.922 [parallel-4] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnNext 11
00:22:08.029 [parallel-5] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnNext 12
00:22:08.138 [parallel-6] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnNext 13
Process finished with exit code 0
merge()
그림을 보면 가로 축 화살표가 두개인 것을 확인 가능합니다. 두개의 Publisher 가 흐르고 있는데, 이 것을 공통 축에 합치고 있습니다.순서는 보장되지 않습니다.
import java.time.Duration;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class MergeExample1 {
@SneakyThrows
public static void main(String[] args) {
var flux1 = Flux.range(1,3)
.doOnSubscribe(v -> {
log.info("doOnSubscribe #1");
})
.delayElements(Duration.ofMillis(100));
var flux2 = Flux.range(11, 3)
.doOnSubscribe(v -> {
log.info("doOnSubscribe #2");
})
.delayElements(Duration.ofMillis(100));
Flux.merge(flux1, flux2)
.doOnNext(v -> {log.info("doOnNext : " + v);})
.subscribe();
Thread.sleep(2000);
}
}
출력결과를 보면 doOnSubscribe#1
, doOnSubscribe#2
가 근소한 차이로 시작되었고, 각 요소들의 순서도 지켜지지 않은 것을 확인 가능합니다.
출력결과
00:37:08.038 [main] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnSubscribe #1
00:37:08.048 [main] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnSubscribe #2
00:37:08.157 [parallel-2] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnNext : 11
00:37:08.161 [parallel-1] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnNext : 1
00:37:08.267 [parallel-4] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnNext : 2
00:37:08.267 [parallel-4] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnNext : 12
00:37:08.377 [parallel-5] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnNext : 13
00:37:08.377 [parallel-5] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnNext : 3
Process finished with exit code 0
mergeSequential()
Publisher 를 다른 Publisher 와 합치는 연산을 수행하는 함수입니다. 모든 Publisher 를 바로 subscribe 합니다. Publisher 의 onNext 이벤트가 동시에 도달하는데, merge 와는 다르게 내부에서 재정렬하기에 순서를 보장합니다.
import java.time.Duration;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class MergeSequentialExample1 {
@SneakyThrows
public static void main(String[] args) {
var flux1 = Flux.range(1,3)
.doOnSubscribe(v -> {
log.info("doOnSubscribe #1");
})
.delayElements(Duration.ofMillis(100));
var flux2 = Flux.range(11,3)
.doOnSubscribe(v -> {
log.info("doOnSubscribe #2");
})
.delayElements(Duration.ofMillis(100));
Flux.mergeSequential(flux1, flux2)
.doOnNext(v -> {
log.info("doOnNext : " + v);
})
.subscribe();
Thread.sleep(2000);
}
}
출력결과를 보면 doOnSubscribe #1
, doOnSubscribe #2
가 거의 동시에 subscribe 되었다는 사실을 알 수 있습니다. 그리고 각각의 요소들은 각 Publisher 내에서의 순서에 맞게 Publisher 별로 이어져서 나오는 것을 확인 가능합니다.
출력결과
00:43:52.235 [main] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnSubscribe #1
00:43:52.243 [main] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnSubscribe #2
00:43:52.360 [parallel-1] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnNext : 1
00:43:52.468 [parallel-4] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnNext : 2
00:43:52.578 [parallel-6] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnNext : 3
00:43:52.578 [parallel-6] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnNext : 11
00:43:52.578 [parallel-6] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnNext : 12
00:43:52.578 [parallel-6] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnNext : 13
Process finished with exit code 0
앞에서 살펴봤던 concat 예제와는 다르게 doOnSubscribe #1
, doOnSubscribe #2
가 모두 제일 처음에 나타났다는 사실에 주목해주세요. 먼저 subscribe 를 하고, flux 별로 순서를 내부적으로 지켜주면서 merge 를 한다는 사실을 알 수 있습니다.
다양한 함수들
- map(), mapNotNull()
doOn---()
- flatMap()
- filter()
- take(), takeLast()
- skip(), skipLast()
- collectList()
- cache()
map(), mapNotNull()
@Slf4j
public class Map_MapNotNull_Example {
public static void main(String[] args) {
Flux.range(1,3)
.map(v -> v*100)
.doOnNext(v -> {
log.info("doOnNext : " + v);
})
.subscribe();
Flux.range(1,3)
.mapNotNull(v -> {
if(v % 2 == 0) return v;
return null;
})
.doOnNext(v -> {
log.info("doOnNext : " + v);
})
.subscribe();
}
}
출력결과
10:24:21.178 [main] INFO io...various_functions.Map_MapNotNull_Example -- doOnNext : 100
10:24:21.180 [main] INFO io...various_functions.Map_MapNotNull_Example -- doOnNext : 200
10:24:21.180 [main] INFO io...various_functions.Map_MapNotNull_Example -- doOnNext : 300
10:24:21.183 [main] INFO io...various_functions.Map_MapNotNull_Example -- doOnNext : 2
Process finished with exit code 0
doOn---()
onComplete, onNext, onSubscribe, onError 와 같은 이벤트에 대한 메서드로 doOnComplete, doOnNext, doOnSubscribe, doOnError 가 있습니다. 이 함수들은 이벤트의 흐름에 영향을 주지 않고 원하는 작업을 추가할 수 있습니다.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class DoOnFunctions_Example {
public static void main(String[] args) {
Flux.range(1,3)
.map(v -> v*2)
.doOnNext(v -> {
log.info("doOnNext : " + v);
})
.doOnComplete(() -> {
log.info("doOnComplete");
})
.doOnSubscribe(subscription -> {
log.info("doOnSubscribe");
})
.doOnRequest(v -> {
log.info("doOnRequest : " + v);
})
.map(v -> v/2)
.subscribe();
}
}
출력결과
10:31:38.016 [main] INFO io...spring_webflux.various_functions.DoOnFunctions_Example -- doOnSubscribe
10:31:38.020 [main] INFO io...spring_webflux.various_functions.DoOnFunctions_Example -- doOnRequest : 9223372036854775807
10:31:38.020 [main] INFO io...spring_webflux.various_functions.DoOnFunctions_Example -- doOnNext : 2
10:31:38.020 [main] INFO io...spring_webflux.various_functions.DoOnFunctions_Example -- doOnNext : 4
10:31:38.020 [main] INFO io...spring_webflux.various_functions.DoOnFunctions_Example -- doOnNext : 6
10:31:38.020 [main] INFO io...spring_webflux.various_functions.DoOnFunctions_Example -- doOnComplete
Process finished with exit code 0
flatMap() (중요)
여러개의 Publisher 를 조합할 때 사용됩니다. onNext 이벤트를 받아서 Publisher 를 반환합니다.
package io.chagchagchag.example.foobar.spring_webflux.various_functions;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@Slf4j
public class FlatMap_Example {
@SneakyThrows
public static void main(String[] args) {
Flux.range(1,3)
.flatMap(v1 -> {
return Flux.range(4,3)
.map(v2 -> v1 + ", " + v2)
.publishOn(Schedulers.parallel());
})
.doOnNext(v -> {
log.info("doOnNext : " + v);
})
.subscribe();
Thread.sleep(1000);
}
}
출력결과
11:03:21.028 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 1, 4
11:03:21.029 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 1, 5
11:03:21.030 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 1, 6
11:03:21.030 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 2, 4
11:03:21.030 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 2, 5
11:03:21.030 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 2, 6
11:03:21.030 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 3, 4
11:03:21.030 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 3, 5
11:03:21.030 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 3, 6
Process finished with exit code 0
filter()
Stream 에서의 filter 와 유사한 연산을 합니다.
package io.chagchagchag.example.foobar.spring_webflux.various_functions;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class Filter_Example {
public static void main(String[] args) {
Flux.range(1,30)
.filter(v -> v%3 == 0)
.doOnNext(v -> {
log.info("doOnNext : " + v);
})
.subscribe();
}
}
출력결과
11:21:19.135 [main] INFO io...various_functions.Filter_Example -- doOnNext : 3
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 6
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 9
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 12
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 15
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 18
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 21
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 24
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 27
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 30
Process finished with exit code 0
take(n), takeLast(n)
take(n)
- 대략 n 개 까지 onNext 이벤트를 전파합니다. n개에 도달하면 onComplete 이벤트가 발생합니다.
takeLast(n)
- onComplete 이벤트가 발생하기 직전의 n개의 아이템만 onNext 이벤트를 전파합니다.
package io.chagchagchag.example.foobar.spring_webflux.various_functions;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class Take_TakeLast_Example {
@SneakyThrows
public static void main(String[] args) {
Flux.range(1,100000)
.take(5)
.doOnNext(v -> {
log.info("taken item = " + v);
})
.subscribe();
Flux.range(1, 100)
.takeLast(10)
.doOnNext(v -> {
log.info("taken item = " + v);
})
.subscribe();
Thread.sleep(2000);
}
}
출력결과
11:26:56.569 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 1
11:26:56.572 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 2
11:26:56.572 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 3
11:26:56.572 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 4
11:26:56.572 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 5
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 91
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 92
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 93
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 94
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 95
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 96
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 97
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 98
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 99
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 100
Process finished with exit code 0
skip(), skipLast()
package io.chagchagchag.example.foobar.spring_webflux.various_functions;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class Skip_SkipLast_Example {
@SneakyThrows
public static void main(String[] args) {
// 처음부터 n 번째까지를 스킵한 결과를 순회
Flux.range(1, 10)
.skip(3)
.doOnNext(v -> {
log.info("skip 하지 않은 요소 : " + v);
})
.subscribe();
// 맨 끝에서부터 n 개를 skip 한 결과를 순회
Flux.range(1, 1000)
.skipLast(997)
.doOnNext(v -> {
log.info("skip 하지 않은 요소 " + v);
})
.subscribe();
Thread.sleep(2000);
}
}
출력결과
11:32:33.028 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 : 4
11:32:33.030 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 : 5
11:32:33.030 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 : 6
11:32:33.030 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 : 7
11:32:33.030 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 : 8
11:32:33.030 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 : 9
11:32:33.030 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 : 10
11:32:33.032 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 1
11:32:33.032 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 2
11:32:33.032 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 3
Process finished with exit code 0
collectList()
onComplete 이벤트가 발생하기 전까지 내부에 item 을 저장해둡니다. 그리고 시퀀스가 완료(onComplete)될 때 Mono 에 의해 방출되는 List 로 수집합니다.
시퀀스가 비어있으면 비어있는 List 가 방출됩니다.
package io.chagchagchag.example.foobar.spring_webflux.various_functions;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class CollectList_Example {
public static void main(String[] args) {
Flux.range(1, 10)
.doOnNext(v -> {
log.info("(before) doOnNext >> " + v);
})
.collectList()
.doOnNext(v -> {
log.info("(after) doOnNext >> " + v);
})
.subscribe();
}
}
출력결과
12:16:45.442 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 1
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 2
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 3
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 4
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 5
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 6
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 7
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 8
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 9
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 10
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (after) doOnNext >> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Process finished with exit code 0
cache()
처음 subscribe 시에만 publisher 를 실행하고, 이후 subscribe 시에는 저장된 이벤트들을 흘려보내줍니다.
package io.chagchagchag.example.foobar.spring_webflux.various_functions;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class CacheExample {
public static void main(String[] args) {
// (1)
Flux<Object> flux = Flux.create(fluxSink -> {
for(int i=0; i<3; i++){
fluxSink.next(i);
log.info("(created) " + i);
}
log.info("sink next 완료 (at Publisher)");
fluxSink.complete();
}).cache();
// (2)
flux.subscribe(
v -> {log.info("v ::: " + v);},
null,
() -> {log.info("complete");}
);
// (3)
flux.subscribe(
v -> {log.info("v ::: " + v);},
null,
() -> {log.info("complete");}
);
}
}
(1)
- (1) 에서 Flux.create 를 수행한 것은 subscribe 를 하지 않으면 아무 일도 일어나지 않습니다.
- (1) 의 구문은 단순한
식(Statement)
일 뿐이고, Subscribe 할 때 Publisher 의 구독이 시작됩니다.
(2)
- (1) 을 subscribe() 해서 첫번째 Subscribe 를 수행하는데, 이때 Flux 에 의해 Publisher 의 아이템들이 생성됩니다.
- (1) 에서 Flux.create() 한 후 마무리로 cache() 를 했기 때문에 내부에 결과가 저장됩니다.
(3)
- (1) 을 subscribe() 해서 Flux 를 새로 create 하지 않고, (2) 에서 저장해둔 결과를 그대로 읽어들입니다.
- 따라서 Sink 이벤트 없이 구독만 하기에 onNext, onComplete 이벤트만 발생합니다.
출력결과
12:31:10.022 [main] INFO io...various_functions.CacheExample -- v ::: 0
12:31:10.026 [main] INFO io...various_functions.CacheExample -- (created) 0
12:31:10.027 [main] INFO io...various_functions.CacheExample -- v ::: 1
12:31:10.027 [main] INFO io...various_functions.CacheExample -- (created) 1
12:31:10.027 [main] INFO io...various_functions.CacheExample -- v ::: 2
12:31:10.027 [main] INFO io...various_functions.CacheExample -- (created) 2
12:31:10.027 [main] INFO io...various_functions.CacheExample -- sink next 완료 (at Publisher)
12:31:10.027 [main] INFO io...various_functions.CacheExample -- complete
12:31:10.028 [main] INFO io...various_functions.CacheExample -- v ::: 0
12:31:10.028 [main] INFO io...various_functions.CacheExample -- v ::: 1
12:31:10.028 [main] INFO io...various_functions.CacheExample -- v ::: 2
12:31:10.028 [main] INFO io...various_functions.CacheExample -- complete
Process finished with exit code 0