Reactor 프로그래밍 (스압 주의)

Reactor 프로그래밍 (스압 주의)

Spring Webflux 는 pivotal 사에서 개발한 project reactor 를 기반으로 한 서버 애플리케이션을 개발하기 위한 Web 프레임워크입니다.

Flux, Mono

참고


  • Flux :
    • Java 의 List 와 유사한 데이터의 흐름입니다. 두개 이상의 데이터의 흐름입니다.
    • projectreactor.io - Flux (opens in a new tab) 을 보면 알 수 있듯 Publisher 를 implements 한 클래스이기에 자료구조라기보다는, 데이터의 흐름이라는 사실을 기억해주시기 바랍니다.
  • Mono :
    • 하나만 존재하는 데이터를 의미합니다.
    • projectreactor.io - Mono (opens in a new tab) 을 보면 알 수 있듯 Publisher 를 implements 한 클래스이기에 자료구조라기보다는 데이터의 흐름이라는 사실을 기억해주시기 바랍니다.

Flux 하나만 쓰면 될것 같은데 Mono 가 존재하는 이유에 대해 생각이 들 수 있습니다. Mono 라는 자료형이 존재하는 것으로 인한 장점은 아래와 같습니다.

  • 하나만 존재하는 데이터의 흐름(Response, Count 결과값 등) 일 경우 onNext 이후에 바로 onComplete 를 하면 되기 때문에 구현이 더 명확해지게 됩니다.
  • Mono 라는 Publisher 를 받는 Subscriber 측 역시 1개의 요소만 처리해야 한다는 사실을 알 수 있기 때문에 조금 더 구현이 명확해집니다.

sequence

Mono,Flux 를 이용해서 다양한 데이터의 흐름을 만들어봅니다. Mono, Flux 를 통해서 만들어내는 데이터의 흐름을 일반적으로 sequence 라고 부릅니다.


just

Mono.just(), Flux.just() 메서드를 사용하면 시퀀스를 생성할 수 있습니다. 자세한 내용은 예제와 출력결과를 확인해주시기 바랍니다.

JustExample.java

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
@Slf4j
public class JustExample {
  public static void main(String[] args) {
    Mono.just("안녕하세요")
        .subscribe(v -> {
          log.info(">>> {}", v);
        });
 
    Flux.just("MSFT", "NVDA", "SMCI")
        .subscribe(v -> {
          log.info(">>> {}", v);
        });
  }
}

출력결과

15:18:07.066 [main] INFO io...sequence.JustExample -- >>> 안녕하세요
15:18:07.228 [main] INFO io...sequence.JustExample -- >>> MSFT
15:18:07.228 [main] INFO io...sequence.JustExample -- >>> NVDA
15:18:07.228 [main] INFO io...sequence.JustExample -- >>> SMCI

Process finished with exit code 0

error

Mono.error, Flux.error 를 사용하면 subscriber 에게 onError 이벤트를 전달할 수 있습니다.

반드시 **"subscriber 에게 onError 이벤트를 전달한다."**라는 의미를 꼭 기억해주셨으면 합니다.

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
@Slf4j
public class ErrorExample {
 
  public static void main(String[] args) {
    Mono.error(new IllegalArgumentException("어머, 에러에요"))
        .subscribe(
            v -> {log.info("v ::: " + v);},
            error -> {log.error("error ::: " + error);}
        );
 
    Flux.error(new IllegalArgumentException("어머, 에러에요"))
        .subscribe(
            v -> {log.info("value ::: " + v);},
            error -> {log.error("error ::: " + error);}
        );
  }
}
 

출력결과

15:27:13.589 [main] ERROR io...sequence.ErrorExample -- error ::: java.lang.IllegalArgumentException: 어머, 에러에요
15:27:13.699 [main] ERROR io...sequence.ErrorExample -- error ::: java.lang.IllegalArgumentException: 어머, 에러에요

Process finished with exit code 0

empty

Mono.empty, Flux.empty 를 써보는 예제입니다. empty 를 사용하면 Mono 또는 Flux 에게 onComplete 이벤트만 전달되게 됩니다.

package io...sequence;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
@Slf4j
public class EmptyExample {
  public static void main(String[] args) {
    Mono.empty()
        .subscribe(
            v -> {log.info("value === " +v);},
            null,
            () -> {log.info("complete");}
        );
 
    Flux.empty()
        .subscribe(
            v -> {log.info("value === " + v);},
            null,
            () -> {log.info("complete");}
        );
  }
}

출력결과

15:31:46.735 [main] INFO io...sequence.EmptyExample -- complete
15:31:46.792 [main] INFO io...sequence.EmptyExample -- complete

Process finished with exit code 0

Mono.fromOOO()

Mono.from 으로 시작하는 함수들은 아래와 같습니다.

  • Mono.fromCallable
    • Callable 함수형 인터페이스를 람다로 실행 후 반환값은 onNext 로 전달됩니다.
  • Mono.formFuture
    • Future 를 받아서 done 상태가 되었을 때 반환값을 onNext 로 Subscriber 에 전달합니다.
  • Mono.fromSupplier
    • Supplier 함수형 인터페이스를 람다로 실행 후 반환값은 onNext 로 Subscriber 에 전달합니다.
  • Mono.fromRunnable
    • Runnable 함수형 인터페이스를 람다로 실행 후, onComplete 를 Subscriber 에 전달합니다.

import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
 
@Slf4j
public class MonoFromExample {
  public static void main(String[] args) {
    Mono.fromCallable(() -> {
      return "삼성전자";
    }).subscribe(v -> {
      log.info("fromCallable 에서 받은 value === " + v);
    });
 
    Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
      return "삼성전자";
    })).subscribe(v -> {
      log.info("fromFuture 로부터 받은 value === " + v);
    });
 
    Mono.fromSupplier(() -> {
      return "삼성전자";
    }).subscribe(v -> {
      log.info("fromSupplier 로부터 받은 value === " + v);
    });
 
    Mono.fromRunnable(() -> {
      log.info("do some runnable");
    }).subscribe(
        null, null, () -> {log.info("fromRunnable Complete.");}
    );
  }
}

출력결과

15:45:03.043 [main] INFO io...sequence.MonoFromExample -- fromCallable 에서 받은 value === 삼성전자
15:45:03.049 [main] INFO io...sequence.MonoFromExample -- fromFuture 로부터 받은 value === 삼성전자
15:45:03.051 [main] INFO io...sequence.MonoFromExample -- fromSupplier 로부터 받은 value === 삼성전자
15:45:03.052 [main] INFO io...sequence.MonoFromExample -- do some runnable
15:45:03.052 [main] INFO io...sequence.MonoFromExample -- fromRunnable Complete.

Process finished with exit code 0

Flux.fromOOO()

Flux.from 으로 시작하는 함수들은 아래와 같습니다.

  • Flux.fromIterable
  • Flux.fromStream
  • Flux.fromArray
  • Flux.range(start, n)
import java.util.List;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class FluxFromExample {
  public static void main(String[] args) {
    Flux.fromIterable(List.of("MSFT", "NVDA", "SMCI"))
        .subscribe(v -> {log.info("value ==> " + v);});
 
    Flux.fromStream(IntStream.range(1,10).boxed())
        .subscribe(v -> {log.info("value ==> " + v);});
 
    Flux.fromArray(new Integer[]{1,2,3,4,5,6,7,8,9,10})
        .subscribe(v -> {log.info("value ==> " + v);});
 
    Flux.range(1,10)
        .subscribe(v -> {log.info("value ==> " + v);});
  }
}

출력결과

15:53:24.737 [main] INFO io...sequence.FluxFromExample -- value ==> MSFT
15:53:24.740 [main] INFO io...sequence.FluxFromExample -- value ==> NVDA
15:53:24.740 [main] INFO io...sequence.FluxFromExample -- value ==> SMCI
15:53:24.746 [main] INFO io...sequence.FluxFromExample -- value ==> 1
15:53:24.746 [main] INFO io...sequence.FluxFromExample -- value ==> 2
15:53:24.746 [main] INFO io...sequence.FluxFromExample -- value ==> 3
15:53:24.747 [main] INFO io...sequence.FluxFromExample -- value ==> 4
15:53:24.747 [main] INFO io...sequence.FluxFromExample -- value ==> 5
15:53:24.747 [main] INFO io...sequence.FluxFromExample -- value ==> 6
15:53:24.747 [main] INFO io...sequence.FluxFromExample -- value ==> 7
15:53:24.747 [main] INFO io...sequence.FluxFromExample -- value ==> 8
15:53:24.747 [main] INFO io...sequence.FluxFromExample -- value ==> 9
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 1
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 2
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 3
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 4
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 5
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 6
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 7
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 8
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 9
15:53:24.749 [main] INFO io...sequence.FluxFromExample -- value ==> 10
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 1
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 2
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 3
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 4
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 5
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 6
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 7
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 8
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 9
15:53:24.751 [main] INFO io...sequence.FluxFromExample -- value ==> 10

Process finished with exit code 0

Flux.generate

generate 는 동기적으로 Flux 를 생성합니다.

Flux.java(github) (opens in a new tab) 내에 선언된 generate() 메서드는 세 종류의 메서드가 있는데 그 중 한 가지는 아래와 같습니다.

public abstract class Flux<T> implements CorePublisher<T>{
    // ...
    public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) {
		return onAssembly(new FluxGenerate<>(stateSupplier, generator));
	}
    // ...
 
}

stateSupplier

  • 초기값을 제공하는 역할의 callable 입니다.

generator

  • 첫번째 인자로는 state 를 제공합니다. 그리고 이 state 에 대해 변경된 state 를 반환합니다.
  • 두번째 인자로 SynchronousSink 를 제공합니다. 이 sink 객체의 next, error, complete 메서드를 이용해서 Subscriber 에게 onNext, onError, onComplete 이벤트를 전달하는 것이 가능합니다.
  • 한번의 generator 에서는 최대 한번만 next 를 호출 가능합니다.

1) sink.next()

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class FluxGenerateExample1 {
  public static void main(String[] args) {
    Flux.generate(
        () -> 0,
        (state, sink) -> {
          sink.next(state);
          if(state == 9) sink.complete();
          return state + 1;
        }
    ).subscribe(
        v -> {log.info("value === " + v);},
        error -> {log.error("error === " + error);},
        () -> {log.info("complete !!");}
    );
  }
}

초기값은 0 으로 세팅해줬습니다. 매 순간 다음 스텝의 연산을 수행하는 것은 state + 1 을 통해서 다음 스텝의 연산을 합니다. 그리고 state 가 9 가 되었을 때는 sink 객체의 complete() 메서드를 호출해서 onComplete 신호를 방출합니다.

자세히 보면 FSM, CPS 연산과 닮아있음을 알 수 있습니다.


출력결과

16:07:48.178 [main] INFO io...sequence.FluxGenerateExample1 -- value === 0
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 1
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 2
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 3
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 4
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 5
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 6
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 7
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 8
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- value === 9
16:07:48.180 [main] INFO io...sequence.FluxGenerateExample1 -- complete !!

Process finished with exit code 0

2) sink.next()를 두번 호출하면? 에러발생

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class FluxGenerateExample2 {
  public static void main(String[] args) {
    Flux.generate(
        () -> 0,
        (state, sink) -> {
          sink.next(state);
          sink.next(state);
          if(state == 9) sink.complete();
 
          return state + 1;
        }
    ).subscribe(
        v -> { log.info("value === " + v); },
        error -> { log.error("error === " + error); },
        () -> { log.info("complete"); }
    );
  }
}

generate 에서 sink 객체를 이용해서 next() 함수를 통해 onNext 이벤트를 두번 호출해보면 에러가 나는 것을 확인 가능합니다.

출력결과

16:14:41.992 [main] INFO io.....sequence.FluxGenerateExample2 -- value === 0
16:14:41.994 [main] ERROR io.chagchagchag...sequence.FluxGenerateExample2 -- error === java.lang.IllegalStateException: More than one call to onNext

Process finished with exit code 0

Flux.create()

Flux.java(github) (opens in a new tab) 내에 선언된 create() 메서드는 아래와 같습니다.

public abstract class Flux<T> implements CorePublisher<T>{
    // ...
    public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {
	    return create(emitter, OverflowStrategy.BUFFER);
    }
    // ...
}

Flux.craete() 는 Flux 를 비동기적으로 생성합니다. 중간에 있는 인자를 보면 FluxSink 가 Consumer 의 인자로 되는 것을 알 수 있습니다. 이 FluxSink 를 이용해서 next(), error(), complete() 메서드를 호출해서 Subscriber 에게 onNext, onError, onComplete 신호를 발생시킬 수 있습니다.

위에서 살펴본 Flux.generate() 에서 사용하던 SynchronousSink 와는 다르게 여러번 next() 를 호출하는 것이 가능합니다. 그리고 Flux.create() 는 여러 스레드에서 동시에 호출가능합니다.

예제

import java.util.concurrent.CompletableFuture;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class FluxCreateExample {
  @SneakyThrows
  public static void main(String[] args) {
    Flux.create(fluxSink -> {
      var task1 = CompletableFuture.runAsync(() -> {
        for (int i=0; i<5; i++){
          fluxSink.next(i);
        }
      });
 
      var task2 = CompletableFuture.runAsync(() -> {
        for (int i=5; i<10; i++){
          fluxSink.next(i);
        }
      });
 
      CompletableFuture.allOf(task1, task2)
          .thenRun(fluxSink::complete);
    }).subscribe(
        v -> {log.info("value === " + v);},
        error -> {log.error("error === " + error);},
        () -> {log.info("complete");}
    );
 
    Thread.sleep(1000);
  }
}

0~4 까지의 숫자를 sink.next 하는 스레드, 5 ~ 9 까지의 숫자를 sink.next 하는 스레드 이렇게 두개의 스레드를 이용해서 sink.next 를 수행하며, CompletableFuture.allOf 를 통해서 두 개의 CompletableFuture 객체 task1, task2 작업이 끝난 시점에 fluxSink 객체를 이용해서 Subscriber 에게 complete 신호를 내보냅니다.

출력결과

16:24:55.472 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 0
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 5
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 6
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 7
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 8
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 9
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 1
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 2
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 3
16:24:55.474 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- value === 4
16:24:55.475 [ForkJoinPool.commonPool-worker-1] INFO io.....sequence.FluxCreateExample -- complete

Process finished with exit code 0

Flux.handle()

Flux.java(github) (opens in a new tab) 내에 선언된 handle() 메서드는 아래와 같습니다.

public abstract class Flux<T> implements CorePublisher<T>{
    // ...
    public final <R> Flux<R> handle(BiConsumer<? super T, SynchronousSink<R>> handler) {
		if (this instanceof Fuseable) {
			return onAssembly(new FluxHandleFuseable<>(this, handler));
		}
		return onAssembly(new FluxHandle<>(this, handler));
	}
    // ...
}

Flux.handle() 메서드는 현재 존재하는 source 에 대해 handle 작업을 처리하는 함수입니다. 위 코드에서 handler 라는 BiConcumer 의 인자값에 대한 설명은 아래와 같습니다.

? super T

  • source 에서 제공하는 데이터흐름에 대한 각 요소를 의미하는 item 입니다.

SynchronousSink<R>

  • sink 의 next 연산을 이용해서 현재 주어진 item 을 전달할지 말지 결정합니다.

source 의 item 을 필터링 하는 등 interceptor 같은 역할을 하게끔 구현가능합니다.


예제

import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class FluxHandleExample {
  public static void main(String[] args) {
    Flux.fromStream(IntStream.range(0,11).boxed())
        .handle((v, sink) -> {
          if(v%2 == 0) sink.next(v);
        }).subscribe(
            v -> {log.info("value === " + v);},
            error -> {log.error("error === " + error);},
            () -> {log.info("complete");}
        );
  }
}
 

짝수 숫자에 대해서만 다음으로 넘어갈 수 있도록 sink 를 이용해서 next() 함수를 호출해서 Subscriber 에게 onNext 신호를 방출합니다.

출력결과

16:37:34.735 [main] INFO io.....sequence.FluxHandleExample -- value === 0
16:37:34.737 [main] INFO io.....sequence.FluxHandleExample -- value === 2
16:37:34.737 [main] INFO io.....sequence.FluxHandleExample -- value === 4
16:37:34.737 [main] INFO io.....sequence.FluxHandleExample -- value === 6
16:37:34.737 [main] INFO io.....sequence.FluxHandleExample -- value === 8
16:37:34.737 [main] INFO io.....sequence.FluxHandleExample -- value === 10
16:37:34.738 [main] INFO io.....sequence.FluxHandleExample -- complete

Process finished with exit code 0

subscribe(), Subscriber

Flux, Mono 와 같은 Publisher 는 subscribe 를 하지 않으면 아무 일도 일어나지 않습니다. 예를 들면 아래와 같은 코드는 아무 일도 일어나지 않으며 그냥 표현식일 뿐입니다.

Flux.fromIterable(List.of("배고파요", "밥먹어요", "배불러요"))
    .doOnNext(v -> {log.info(">>> " + v);});

subscribe()

예를 들어 Flux 의 코드를 보면, subscribe() 함수는 아래와 같은 메서드들이 있습니다. 오버로딩 된 여러가지 메서드들이 있지만, 가장 대표적인 메서드 들은 아래와 같습니다.

package reactor.core.publisher;
 
// ...
 
public abstract class Flux<T> implements CorePublisher<T> {
    public final Disposable subscribe() {
		return subscribe(null, null, null);
	}
    
    public final Disposable subscribe(
			@Nullable Consumer<? super T> consumer,
			@Nullable Consumer<? super Throwable> errorConsumer,
			@Nullable Runnable completeConsumer,
			@Nullable Context initialContext) {
		return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,
				completeConsumer,
				null,
				initialContext));
	}
    
    @Override
	@SuppressWarnings("unchecked")
	public final void subscribe(Subscriber<? super T> actual) {
		// ...
	}
}

  • subscribe()
    • 별도의 Consumer 를 전달 받지 않은 채로 subscribe 합니다.
  • subscribe (consumer, errorConsumer, completeConsumer, initialContext)
    • 함수형 인터페이스를 이용해서 subscribe 를 합니다. Disposable 을 반환해서 이 Disposable 을 이용해서 언제든지 연결을 종료하는 것이 가능합니다.
  • subscribe (Subscriber)
    • 별도의 Subscriber 를 전달받아서 subscribe 합니다.
    • 예를 들면 BaseSubscriber (opens in a new tab) 같은 Subscriber 를 인자로 전달하거나, 비즈니스 로직에 따라 특수하게 BaseSubscriber (opens in a new tab) 를 별도로 확장(extends)한 커스텀한 Subscriber 를 넘겨줘서 subscribe 하는 경우도 있습니다.
    • BaseSubscriber (opens in a new tab) 는 project reactor 에서 제공하는 배압관리를 위해 필요한 것들이 잘 갖춰져 있는 클래스입니다.
    • 이때 Subscriber 는 subscription 을 Publisher 로부터 전달 받습니다. 그리고 이 subscription 객체를 이용해서 request를 통해서 backpressure 를 조절하거나, cancel 을 통해서 연결 종료할 수 있습니다.
    • Subscriber 를 직접 작성할 경우에 어떻게 작성하는지 궁금하시다면 Publisher, Subscriber, Supscription, Backpressure 문서 (opens in a new tab) 를 참고해주시기 바랍니다.

subscribe()

Publiser 에서 데이터의 흐름을 만들긴 하지만, 이 데이터들을 Subscriber 측에서 받아서 별도의 처리를 할 필요가 없이 뒷단에서만 동작하게끔 해야 할 경우 단순한 형태의 subscribe() 메서드를 사용합니다.

@Slf4j
public class Subscribe1Example {
  public static void main(String[] args) {
    Flux.fromIterable(List.of("배고파요", "밥먹어요", "배불러요"))
        .doOnNext(v -> { save(v);})
        .subscribe();
  }
 
  public static void save(String s){
    log.info("SAVE DATA >>> " + s);
  }
}

doOnNext()

  • 결과를 확인하고 싶을 때나, 지나가는 값들을 통해서 별도의 작업을 확인하고 싶을 때 사용합니다.

출력결과

17:26:42.596 [main] INFO io.....subscribe.Subscribe1Example -- SAVE DATA >>> 배고파요
17:26:42.598 [main] INFO io.....subscribe.Subscribe1Example -- SAVE DATA >>> 밥먹어요
17:26:42.598 [main] INFO io.....subscribe.Subscribe1Example -- SAVE DATA >>> 배불러요

Process finished with exit code 0

subscribe (consumer, errorConsumer, completeConsumer, initialContext)

정상일때, 에러일때, onComplete 일때 어떤것을 할지, initialContext 를 정의합니다.

import java.util.List;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.util.context.Context;
 
@Slf4j
public class Subscribe2Example {
  public static void main(String[] args) {
    Flux.fromIterable(List.of("배고파요", "밥먹어요", "배불러요"))
        .subscribe(
            v -> {log.info("value === " + v);},
            error -> {log.error("error === " + error);},
            () -> {log.info("complete");},
            Context.empty()
        );
  }
}
 

consumer

  • 인자값을 받아서 해야 할 일을 Consumer 함수형 인터페이스에 맞게 구현합니다.

errorConsumer

  • errorConsumer 에는 에러를 인자로 받아서 처리하는 Consumer 를 넘겨주면 됩니다.

completeConsumer

  • onComplete 신호가 발생했을 때 실행할 Runnable 인터페이스의 람다 바디를 이곳에 정의해줍니다.

initialContext

  • upstream 에 전달할 context 입니다.

출력결과

17:31:30.686 [main] INFO io.....subscribe.Subscribe2Example -- value === 배고파요
17:31:30.688 [main] INFO io.....subscribe.Subscribe2Example -- value === 밥먹어요
17:31:30.688 [main] INFO io.....subscribe.Subscribe2Example -- value === 배불러요
17:31:30.689 [main] INFO io.....subscribe.Subscribe2Example -- complete

Process finished with exit code 0

subscribe (Subscriber)

Publisher 가 Subscription 에 어떤 Subscriber 를 등록할지를 직접 지정합니다.

package io.chagchagchag...subscribe;
 
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
 
@Slf4j
public class Subscribe3Example {
  public static void main(String[] args) {
    var subscriber = new BaseSubscriber<String>(){
      @Override
      protected void hookOnNext(String value) {
        log.info("value === " + value);
      }
 
      @Override
      protected void hookOnComplete() {
        log.info("complte");
      }
    };
 
    Flux.fromIterable(List.of("배고파요", "밥먹어요", "배불러요"))
        .subscribe(subscriber);
  }
}

출력결과

18:47:08.696 [main] INFO io.....subscribe.Subscribe3Example -- value === 배고파요
18:47:08.699 [main] INFO io.....subscribe.Subscribe3Example -- value === 밥먹어요
18:47:08.699 [main] INFO io.....subscribe.Subscribe3Example -- value === 배불러요
18:47:08.699 [main] INFO io.....subscribe.Subscribe3Example -- complte

Process finished with exit code 0

BaseSubscriber

예제에서 사용한 BaseSubscriber (opens in a new tab) 는 projectreactor 에서 제공하는 BaseSubscriber (opens in a new tab) 를 사용했습니다.

BaseSubscriber 내의 hookOnNext(), hookOnComplete(), hookOnError(), hookOnSubscribe() 를 override 해서 cancel, request 를 별도로 호출할 수 있도록 정의할 수 있습니다.


backpressure 관리함수 : request(n)

Publisher 에게 "아이템을 n 만큼씩 주세요" 하면서 그 속도를 조절하는 함수입니다.

Subscriber 외부에서 request(n) 을 통해 요청할 수 있어서 backpressure 의 속도를 조절하는 등의 작업을 할 때 유용하게 쓰입니다.

예를 들어 아래의 코드는 Subscriber interface 에서 제공하는 backpressure 함수인 request () 함수를 이용했습니다. hookOnSubscribe() 에서 request(1) 을 통해서 1만큼 읽어들이겠다고 하는 방식으로 backpressure 관리 코드를 작성했습니다.

import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
 
@Slf4j
public class Subscribe4Example {
  public static void main(String[] args) {
    var subscriber = new BaseSubscriber<String>(){
      @Override
      protected void hookOnSubscribe(Subscription subscription) {
        request(1); // backpressure 함수인 reuest(n) 호출
      }
 
      @Override
      protected void hookOnNext(String value) {
        log.info("value === " + value);
      }
 
      @Override
      protected void hookOnComplete() {
        log.info("complte");
      }
    };
 
    Flux.fromIterable(List.of("배고파요", "밥먹어요", "배불러요"))
        .subscribe(subscriber);
  }
}

출력결과

19:04:26.426 [main] INFO io.....subscribe.Subscribe4Example -- value === 배고파요

Process finished with exit code 0

backpressure 관리함수 : requestUnbounded()

requestUnbounded() 는 request(Long.MAX_VALUE) 로 실행되는데,

Publisher 에게 "가능한 빠르게 아에템을 전달해주세요"와 같은 의미의 요청입니다.

Subscriber 외부에서 request(n) 을 통해 요청할 수 있어서 backpressure 의 속도를 조절하는 등의 작업을 할 때 유용하게 쓰입니다.

request 를 거의 무한대에 가까운 숫자로 요청하기에 가지고 있는 것들을 모두 한번에 보내주세요 라는 의미와 같습니다. 또한 backpressure 의 속도를 조절하지 않고 한번에 요청하므로 거의 backpressure 를 비활성화 한것과 같은 효과를 냅니다.

BaseSubscriber 의 기본 전략은 requestUnbounded() 입니다.


requestUnbounded(), request(Long.MAX_VALUE) 가 발생하는 경우

아래와 같은 경우에 request(Long.MAX_VALUE) 와 같은 요청이 발생됩니다.

  • 아무 인자 없이 사용되는 subscribe() 함수
  • BaseSubscriber 의 hookOnSubscribe() 를 그대로 사용할 경우
  • BaseSubscriber 를 재정의 없이 그냥 사용할 경우
  • block(), blockFirst(), blockLast() 등의 blocking 연산자를 사용할 경우
  • toIterable(), toStream() 등의 toCallect 연산자를 사용할 경우

BasePublisher 는 위에서 살펴봤던 request(n) 과 같은 backpressure 를 관리하는 함수를 조금 더 다양하게 사용할 수 있는 함수를 제공합니다.


buffer(n)

BasePublisher 는 위에서 살펴봤던 request(n) 과 같은 backpressure 를 관리하는 함수를 조금 더 다양하게 사용할 수 있는 함수 인 buffer(n) 을 제공합니다.

import java.util.List;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
 
@Slf4j
public class Buffer1Example {
  public static void main(String[] args) {
    var subscriber = new BaseSubscriber<List<Integer>>(){
      @Override
      protected void hookOnSubscribe(Subscription subscription) {
        request(2);
      }
 
      @Override
      protected void hookOnNext(List<Integer> value) {
        log.info("value = " + value);
      }
 
      @Override
      protected void hookOnComplete() {
        log.info("complete");
      }
    };
 
    Flux.fromStream(IntStream.range(0,20).boxed())
        .buffer(3)
        .subscribe(subscriber);
  }
}

(request = 2) x (buffer = 3) = 6 입니다.

한번의 요청에 size 가 3 인 List 를 buffer 로 전달하고 이것을 requet(2) 를 통해 한번의 엘리먼트에서 (3개짜리 리스트) 를 2회 요청하는 효과를 냅니다.

출력결과는 아래와 같습니다.

19:38:07.365 [main] INFO io.....subscribe.Buffer1Example -- value = [0, 1, 2]
19:38:07.367 [main] INFO io.....subscribe.Buffer1Example -- value = [3, 4, 5]

Process finished with exit code 0

take(n, limitRequest)

BasePublisher 는 위에서 살펴봤던 request(n) 과 같은 backpressure 를 관리하는 함수를 조금 더 다양하게 사용할 수 있는 함수인 take(n, limitRequest) 를 제공합니다.

Publisher 에는 데이터가 수천만개가 있을 수 있습니다. 하지만 데이터를 SQL의 limit 문 처럼 원하는 갯수만 구독하려 할 경우에는 take(n, limitRequest) 를 사용합니다.

그런데 take(n, limitRequest) 함수는 n개를 지정했더라도 정확하게 n개만큼 딱 떨어지게 끊지 못하는 경우도 있는데 limitRequest == true 로 설정할 경우에는 정확하게 n개 만큼을 take 할 수 있게 됩니다. 만약 limitRequest == false 일 경우 정확히 n 개를 take 하진 않더라도 그 비슷한 시간 대에 반환됩니다.

import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
 
@Slf4j
public class Subscribe5Example {
  public static void main(String[] args) {
    var subscriber = new BaseSubscriber<Integer>(){
      @Override
      protected void hookOnNext(Integer value) {
        log.info("value == " + value);
      }
 
      @Override
      protected void hookOnComplete() {
        log.info("completed");
      }
    };
 
    Flux.fromStream(IntStream.range(0, 20).boxed())
        .take(11, true)
        .subscribe(subscriber);
  }
}

출력결과

19:54:18.272 [main] INFO io.....subscribe.Subscribe5Example -- value == 0
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 1
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 2
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 3
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 4
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 5
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 6
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 7
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 8
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 9
19:54:18.274 [main] INFO io.....subscribe.Subscribe5Example -- value == 10
19:54:18.275 [main] INFO io.....subscribe.Subscribe5Example -- completed

Process finished with exit code 0

delayElements()

onNext 이벤트 발행 시에 최소 delay 만큼의 간격을 두어서 delay 를 수행합니다. 만약 onNext 이벤트가 발행된 후에 delay 보다 더 늦게 다음 onNext 이벤트가 전달되면 바로 전파됩니다.

출처 : https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#delayElements-java.time.Duration- (opens in a new tab)


e.g 1

for 문 안에서는 10ms 만큼 지연이 발생하고, delayElements 에서는 최소 500ms 만큼 기다려주고 있습니다.

import java.time.Duration;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
 
@Slf4j
public class DelayElementsExample1 {
  @SneakyThrows
  public static void main(String[] args) {
    Flux.create(fluxSink -> {
      for(int i=1; i<=5; i++){
        try{
          Thread.sleep(10);
        }
        catch (InterruptedException e){
          e.printStackTrace();
          throw new IllegalStateException(e);
        }
        fluxSink.next(i);
      }
      fluxSink.complete();
    })
    .delayElements(Duration.ofMillis(500))
    .doOnNext(v -> {
      log.info("doOnNext = " + v);
    })
    .subscribeOn(Schedulers.single())
    .subscribe();
 
    Thread.sleep(5000);
  }
}

출력결과

20:44:50.730 [parallel-1] INFO io.....delay_elements.DelayElementsExample1 -- doOnNext = 1
20:44:51.240 [parallel-2] INFO io.....delay_elements.DelayElementsExample1 -- doOnNext = 2
20:44:51.756 [parallel-3] INFO io.....delay_elements.DelayElementsExample1 -- doOnNext = 3
20:44:52.269 [parallel-4] INFO io.....delay_elements.DelayElementsExample1 -- doOnNext = 4
20:44:52.782 [parallel-5] INFO io.....delay_elements.DelayElementsExample1 -- doOnNext = 5

Process finished with exit code 0

로그에 찍힌 시간을 보면 500ms 마다 한번씩 doOnNext= 로그가 찍혔음을 확인 가능합니다.


e.g 2

이번에는 for 문안에서 1000ms 를 지정합니다. delayElements 에서는 100ms 를 지정했습니다. 결과는 delayElements 의 100ms 는 최소로 기다려야 하는 시간이기에 for loop 의 1000ms 를 거친 후, delayElements 는 어이쿠 최소로 기다려야 하는 시간(100ms)이 넘어버렸네? 하면서 delayElements 는 넘어가게 됩니다.

import java.time.Duration;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
 
@Slf4j
public class DelayElementsExample2 {
  @SneakyThrows
  public static void main(String[] args) {
    Flux.create(fluxSink -> {
          for(int i=1; i<=5; i++){
            try{
              Thread.sleep(1000);
            }
            catch (InterruptedException e){
              e.printStackTrace();
              throw new IllegalStateException(e);
            }
            fluxSink.next(i);
          }
          fluxSink.complete();
        })
        .delayElements(Duration.ofMillis(700))
        .doOnNext(v -> {
          log.info("doOnNext = " + v);
        })
        .subscribeOn(Schedulers.single())
        .subscribe();
 
    Thread.sleep(7000);
  }
}

출력결과

20:50:17.131 [parallel-1] INFO io.....delay_elements.DelayElementsExample2 -- doOnNext = 1
20:50:18.142 [parallel-2] INFO io.....delay_elements.DelayElementsExample2 -- doOnNext = 2
20:50:19.145 [parallel-3] INFO io.....delay_elements.DelayElementsExample2 -- doOnNext = 3
20:50:20.155 [parallel-4] INFO io.....delay_elements.DelayElementsExample2 -- doOnNext = 4
20:50:21.154 [parallel-5] INFO io.....delay_elements.DelayElementsExample2 -- doOnNext = 5

Process finished with exit code 0

concat(), merge(), mergeSequential()

concat()

출처 : https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#concat-java.lang.Iterable- (opens in a new tab)


현재 Publisher 를 다른 Publisher 와 합치는 연산을 수행합니다. 위의 그림을 설명해보면, 바로 앞의 Publisher 가 onComplete 를 전파하면 바로 다음 Publisher 를 subscribe() 합니다.

그림을 보면 Publisher 두개를 합치는 도중에 각각의 Publiser 의 onNext 연산에 대한 순서가 각각의 요소에 대해 지켜지고 있는 것을 확인 가능합니다.

import java.time.Duration;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class ConcatExample1 {
  @SneakyThrows
  public static void main(String[] args) {
    var flux1 = Flux.range(1,3)
        .doOnSubscribe(v -> {
          log.info("doOnSubscribe flux#1");
        })
        .delayElements(Duration.ofMillis(100));
 
    var flux2 = Flux.range(11, 3)
        .doOnSubscribe(v -> {
          log.info("doOnSubscribe flux#2");
        })
        .delayElements(Duration.ofMillis(100));
 
    Flux.concat(flux1, flux2)
        .doOnNext(v -> {log.info("doOnNext " + v);})
        .subscribe();
 
    Thread.sleep(2000);
  }
}

출력결과를 보면 doOnSubscribe flux#1, doOnSubscribe flux#2 를 통해 두개의 Publisher 를 순서대로 받음을 알수 있고, 데이터의 순서 역시 로그를 확인해보면 순서대로 들어온다는 것이 보장됩니다.

출력결과

00:22:07.475 [main] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnSubscribe flux#1
00:22:07.596 [parallel-1] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnNext 1
00:22:07.704 [parallel-2] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnNext 2
00:22:07.814 [parallel-3] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnNext 3
00:22:07.814 [parallel-3] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnSubscribe flux#2
00:22:07.922 [parallel-4] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnNext 11
00:22:08.029 [parallel-5] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnNext 12
00:22:08.138 [parallel-6] INFO io.....concat_merge_mergesequential.ConcatExample1 -- doOnNext 13

Process finished with exit code 0

merge()

출처 : https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#merge-org.reactivestreams.Publisher...- (opens in a new tab)


그림을 보면 가로 축 화살표가 두개인 것을 확인 가능합니다. 두개의 Publisher 가 흐르고 있는데, 이 것을 공통 축에 합치고 있습니다.순서는 보장되지 않습니다.


import java.time.Duration;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class MergeExample1 {
  @SneakyThrows
  public static void main(String[] args) {
    var flux1 = Flux.range(1,3)
        .doOnSubscribe(v -> {
          log.info("doOnSubscribe #1");
        })
        .delayElements(Duration.ofMillis(100));
 
    var flux2 = Flux.range(11, 3)
        .doOnSubscribe(v -> {
          log.info("doOnSubscribe #2");
        })
        .delayElements(Duration.ofMillis(100));
 
    Flux.merge(flux1, flux2)
        .doOnNext(v -> {log.info("doOnNext : " + v);})
        .subscribe();
 
    Thread.sleep(2000);
  }
}

출력결과를 보면 doOnSubscribe#1, doOnSubscribe#2 가 근소한 차이로 시작되었고, 각 요소들의 순서도 지켜지지 않은 것을 확인 가능합니다.

출력결과

00:37:08.038 [main] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnSubscribe #1
00:37:08.048 [main] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnSubscribe #2
00:37:08.157 [parallel-2] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnNext : 11
00:37:08.161 [parallel-1] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnNext : 1
00:37:08.267 [parallel-4] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnNext : 2
00:37:08.267 [parallel-4] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnNext : 12
00:37:08.377 [parallel-5] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnNext : 13
00:37:08.377 [parallel-5] INFO io.....concat_merge_mergesequential.MergeExample1 -- doOnNext : 3

Process finished with exit code 0

mergeSequential()

Publisher 를 다른 Publisher 와 합치는 연산을 수행하는 함수입니다. 모든 Publisher 를 바로 subscribe 합니다. Publisher 의 onNext 이벤트가 동시에 도달하는데, merge 와는 다르게 내부에서 재정렬하기에 순서를 보장합니다.

출처 : https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#mergeSequential-org.reactivestreams.Publisher...- (opens in a new tab)


import java.time.Duration;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class MergeSequentialExample1 {
  @SneakyThrows
  public static void main(String[] args) {
    var flux1 = Flux.range(1,3)
        .doOnSubscribe(v -> {
          log.info("doOnSubscribe #1");
        })
        .delayElements(Duration.ofMillis(100));
 
    var flux2 = Flux.range(11,3)
        .doOnSubscribe(v -> {
          log.info("doOnSubscribe #2");
        })
        .delayElements(Duration.ofMillis(100));
 
    Flux.mergeSequential(flux1, flux2)
        .doOnNext(v -> {
          log.info("doOnNext : " + v);
        })
        .subscribe();
 
    Thread.sleep(2000);
  }
}

출력결과를 보면 doOnSubscribe #1, doOnSubscribe #2 가 거의 동시에 subscribe 되었다는 사실을 알 수 있습니다. 그리고 각각의 요소들은 각 Publisher 내에서의 순서에 맞게 Publisher 별로 이어져서 나오는 것을 확인 가능합니다.

출력결과

00:43:52.235 [main] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnSubscribe #1
00:43:52.243 [main] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnSubscribe #2
00:43:52.360 [parallel-1] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnNext : 1
00:43:52.468 [parallel-4] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnNext : 2
00:43:52.578 [parallel-6] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnNext : 3
00:43:52.578 [parallel-6] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnNext : 11
00:43:52.578 [parallel-6] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnNext : 12
00:43:52.578 [parallel-6] INFO io.....concat_merge_mergesequential.MergeSequentialExample1 -- doOnNext : 13

Process finished with exit code 0

앞에서 살펴봤던 concat 예제와는 다르게 doOnSubscribe #1, doOnSubscribe #2 가 모두 제일 처음에 나타났다는 사실에 주목해주세요. 먼저 subscribe 를 하고, flux 별로 순서를 내부적으로 지켜주면서 merge 를 한다는 사실을 알 수 있습니다.


다양한 함수들

  • map(), mapNotNull()
  • doOn---()
  • flatMap()
  • filter()
  • take(), takeLast()
  • skip(), skipLast()
  • collectList()
  • cache()

map(), mapNotNull()

@Slf4j
public class Map_MapNotNull_Example {
  public static void main(String[] args) {
    Flux.range(1,3)
        .map(v -> v*100)
        .doOnNext(v -> {
          log.info("doOnNext : " + v);
        })
        .subscribe();
 
    Flux.range(1,3)
        .mapNotNull(v -> {
          if(v % 2 == 0) return v;
          return null;
        })
        .doOnNext(v -> {
          log.info("doOnNext : " + v);
        })
        .subscribe();
  }
}

출력결과

10:24:21.178 [main] INFO io...various_functions.Map_MapNotNull_Example -- doOnNext : 100
10:24:21.180 [main] INFO io...various_functions.Map_MapNotNull_Example -- doOnNext : 200
10:24:21.180 [main] INFO io...various_functions.Map_MapNotNull_Example -- doOnNext : 300
10:24:21.183 [main] INFO io...various_functions.Map_MapNotNull_Example -- doOnNext : 2

Process finished with exit code 0

doOn---()

onComplete, onNext, onSubscribe, onError 와 같은 이벤트에 대한 메서드로 doOnComplete, doOnNext, doOnSubscribe, doOnError 가 있습니다. 이 함수들은 이벤트의 흐름에 영향을 주지 않고 원하는 작업을 추가할 수 있습니다.

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class DoOnFunctions_Example {
  public static void main(String[] args) {
    Flux.range(1,3)
        .map(v -> v*2)
        .doOnNext(v -> {
          log.info("doOnNext : " + v);
        })
        .doOnComplete(() -> {
          log.info("doOnComplete");
        })
        .doOnSubscribe(subscription -> {
          log.info("doOnSubscribe");
        })
        .doOnRequest(v -> {
          log.info("doOnRequest : " + v);
        })
        .map(v -> v/2)
        .subscribe();
  }
}

출력결과

10:31:38.016 [main] INFO io...spring_webflux.various_functions.DoOnFunctions_Example -- doOnSubscribe
10:31:38.020 [main] INFO io...spring_webflux.various_functions.DoOnFunctions_Example -- doOnRequest : 9223372036854775807
10:31:38.020 [main] INFO io...spring_webflux.various_functions.DoOnFunctions_Example -- doOnNext : 2
10:31:38.020 [main] INFO io...spring_webflux.various_functions.DoOnFunctions_Example -- doOnNext : 4
10:31:38.020 [main] INFO io...spring_webflux.various_functions.DoOnFunctions_Example -- doOnNext : 6
10:31:38.020 [main] INFO io...spring_webflux.various_functions.DoOnFunctions_Example -- doOnComplete

Process finished with exit code 0

flatMap() (중요)

여러개의 Publisher 를 조합할 때 사용됩니다. onNext 이벤트를 받아서 Publisher 를 반환합니다.

package io.chagchagchag.example.foobar.spring_webflux.various_functions;
 
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
 
@Slf4j
public class FlatMap_Example {
  @SneakyThrows
  public static void main(String[] args) {
    Flux.range(1,3)
        .flatMap(v1 -> {
          return Flux.range(4,3)
              .map(v2 -> v1 + ", " + v2)
              .publishOn(Schedulers.parallel());
        })
        .doOnNext(v -> {
          log.info("doOnNext : " + v);
        })
        .subscribe();
 
    Thread.sleep(1000);
  }
}

출력결과

11:03:21.028 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 1, 4
11:03:21.029 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 1, 5
11:03:21.030 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 1, 6
11:03:21.030 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 2, 4
11:03:21.030 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 2, 5
11:03:21.030 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 2, 6
11:03:21.030 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 3, 4
11:03:21.030 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 3, 5
11:03:21.030 [parallel-1] INFO io...various_functions.FlatMap_Example -- doOnNext : 3, 6

Process finished with exit code 0

filter()

Stream 에서의 filter 와 유사한 연산을 합니다.

package io.chagchagchag.example.foobar.spring_webflux.various_functions;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class Filter_Example {
  public static void main(String[] args) {
    Flux.range(1,30)
        .filter(v -> v%3 == 0)
        .doOnNext(v -> {
          log.info("doOnNext : " + v);
        })
        .subscribe();
  }
}

출력결과

11:21:19.135 [main] INFO io...various_functions.Filter_Example -- doOnNext : 3
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 6
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 9
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 12
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 15
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 18
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 21
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 24
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 27
11:21:19.137 [main] INFO io...various_functions.Filter_Example -- doOnNext : 30

Process finished with exit code 0

take(n), takeLast(n)

take(n)

  • 대략 n 개 까지 onNext 이벤트를 전파합니다. n개에 도달하면 onComplete 이벤트가 발생합니다.

takeLast(n)

  • onComplete 이벤트가 발생하기 직전의 n개의 아이템만 onNext 이벤트를 전파합니다.
package io.chagchagchag.example.foobar.spring_webflux.various_functions;
 
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class Take_TakeLast_Example {
  @SneakyThrows
  public static void main(String[] args) {
    Flux.range(1,100000)
        .take(5)
        .doOnNext(v -> {
          log.info("taken item = " + v);
        })
        .subscribe();
 
    Flux.range(1, 100)
        .takeLast(10)
        .doOnNext(v -> {
          log.info("taken item = " + v);
        })
        .subscribe();
 
    Thread.sleep(2000);
  }
}

출력결과

11:26:56.569 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 1
11:26:56.572 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 2
11:26:56.572 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 3
11:26:56.572 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 4
11:26:56.572 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 5
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 91
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 92
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 93
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 94
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 95
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 96
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 97
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 98
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 99
11:26:56.575 [main] INFO io...various_functions.Take_TakeLast_Example -- taken item = 100

Process finished with exit code 0

skip(), skipLast()

package io.chagchagchag.example.foobar.spring_webflux.various_functions;
 
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class Skip_SkipLast_Example {
  @SneakyThrows
  public static void main(String[] args) {
    // 처음부터 n 번째까지를 스킵한 결과를 순회
    Flux.range(1, 10)
        .skip(3)
        .doOnNext(v -> {
          log.info("skip 하지 않은 요소 : " + v);
        })
        .subscribe();
    // 맨 끝에서부터 n 개를 skip 한 결과를 순회
    Flux.range(1, 1000)
        .skipLast(997)
        .doOnNext(v -> {
          log.info("skip 하지 않은 요소 " + v);
        })
        .subscribe();
 
    Thread.sleep(2000);
  }
}

출력결과

11:32:33.028 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 : 4
11:32:33.030 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 : 5
11:32:33.030 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 : 6
11:32:33.030 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 : 7
11:32:33.030 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 : 8
11:32:33.030 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 : 9
11:32:33.030 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 : 10
11:32:33.032 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 1
11:32:33.032 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 2
11:32:33.032 [main] INFO io...various_functions.Skip_SkipLast_Example -- skip 하지 않은 요소 3

Process finished with exit code 0

collectList()

참고 : https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#collectList-- (opens in a new tab)


onComplete 이벤트가 발생하기 전까지 내부에 item 을 저장해둡니다. 그리고 시퀀스가 완료(onComplete)될 때 Mono 에 의해 방출되는 List 로 수집합니다.

시퀀스가 비어있으면 비어있는 List 가 방출됩니다.

package io.chagchagchag.example.foobar.spring_webflux.various_functions;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class CollectList_Example {
  public static void main(String[] args) {
    Flux.range(1, 10)
        .doOnNext(v -> {
          log.info("(before) doOnNext >> " + v);
        })
        .collectList()
        .doOnNext(v -> {
          log.info("(after) doOnNext >> " + v);
        })
        .subscribe();
  }
}

출력결과

12:16:45.442 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 1
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 2
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 3
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 4
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 5
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 6
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 7
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 8
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 9
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (before) doOnNext >> 10
12:16:45.444 [main] INFO io...various_functions.CollectList_Example -- (after) doOnNext >> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Process finished with exit code 0

cache()

처음 subscribe 시에만 publisher 를 실행하고, 이후 subscribe 시에는 저장된 이벤트들을 흘려보내줍니다.

package io.chagchagchag.example.foobar.spring_webflux.various_functions;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class CacheExample {
  public static void main(String[] args) {
    // (1)
    Flux<Object> flux = Flux.create(fluxSink -> {
      for(int i=0; i<3; i++){
        fluxSink.next(i);
        log.info("(created) " + i);
      }
      log.info("sink next 완료 (at Publisher)");
      fluxSink.complete();
    }).cache();
 
    // (2)
    flux.subscribe(
        v -> {log.info("v ::: " + v);},
        null,
        () -> {log.info("complete");}
    );
 
    // (3)
    flux.subscribe(
        v -> {log.info("v ::: " + v);},
        null,
        () -> {log.info("complete");}
    );
  }
}

(1)

  • (1) 에서 Flux.create 를 수행한 것은 subscribe 를 하지 않으면 아무 일도 일어나지 않습니다.
  • (1) 의 구문은 단순한 식(Statement) 일 뿐이고, Subscribe 할 때 Publisher 의 구독이 시작됩니다.

(2)

  • (1) 을 subscribe() 해서 첫번째 Subscribe 를 수행하는데, 이때 Flux 에 의해 Publisher 의 아이템들이 생성됩니다.
  • (1) 에서 Flux.create() 한 후 마무리로 cache() 를 했기 때문에 내부에 결과가 저장됩니다.

(3)

  • (1) 을 subscribe() 해서 Flux 를 새로 create 하지 않고, (2) 에서 저장해둔 결과를 그대로 읽어들입니다.
  • 따라서 Sink 이벤트 없이 구독만 하기에 onNext, onComplete 이벤트만 발생합니다.

출력결과

12:31:10.022 [main] INFO io...various_functions.CacheExample -- v ::: 0
12:31:10.026 [main] INFO io...various_functions.CacheExample -- (created) 0
12:31:10.027 [main] INFO io...various_functions.CacheExample -- v ::: 1
12:31:10.027 [main] INFO io...various_functions.CacheExample -- (created) 1
12:31:10.027 [main] INFO io...various_functions.CacheExample -- v ::: 2
12:31:10.027 [main] INFO io...various_functions.CacheExample -- (created) 2
12:31:10.027 [main] INFO io...various_functions.CacheExample -- sink next 완료 (at Publisher)
12:31:10.027 [main] INFO io...various_functions.CacheExample -- complete
12:31:10.028 [main] INFO io...various_functions.CacheExample -- v ::: 0
12:31:10.028 [main] INFO io...various_functions.CacheExample -- v ::: 1
12:31:10.028 [main] INFO io...various_functions.CacheExample -- v ::: 2
12:31:10.028 [main] INFO io...various_functions.CacheExample -- complete

Process finished with exit code 0