Reactor 프로그래밍 2 (Thread, Scheduler, defer())

Reactor 프로그래밍 2 (Thread, Scheduler, Context, defer())

참고자료


Scheduler, Thread

아무 설정을 하지 않으면 Publisher 는 subscribe 를 호출한 caller 가 속했던 스레드에서 실행됩니다. subscribe() 내에 제공된 lambda 또는 Scheduler 역시 caller 가 속해있던 스레드에서 실행됩니다.

publish 또는 subscribe 에서 어떤 Scheduler 를 적용했는지에 따라 task 가 실행될 스레드 풀이 달라지게 됩니다.


Scheduler

  • ImmediateScheduler
  • SingleScheduler
  • ParallelScheduler
  • BoundedElasticScheduler


Scheduler (opens in a new tab) 를 implements 하고 있는 클래스는 ImmediateScheduler, SingleScheduler, ParallelScheduler, BoundedElasticScheduler 가 있습니다.

각각의 Scheduler 객체는 아래에서 설명하게 될 Schedulers.immediate(), Schedulers.single(), Schedulers.parallel(), Schedulers.boundedElastic() 메서드를 통해서 생성할 수 있습니다.

  • Schedulers.immediate()
  • Schedulers.single()
  • Schedulers.parallel()
  • Schedulers.boundedElastic()

Schedulers

Schedulers.immediate()

subscribe 를 호출한 caller 스레드에서 즉시 실행합니다.

Flux, Mono 등의 Publisher 코드를 subscribe 하는 코드에서 별도로 Schedulers 를 지정하지 않으면 기본적으로 Schedulers.immediate 를 사용하게 됩니다.

package io.chagchagchag.example.foobar.spring_webflux.schedulers;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
 
@Slf4j
public class SchedulersImmediate_Example1 {
  public static void main(String[] args) {
    Flux.create(fluxSink -> {
      for(int i=0; i<=3; i++){
        log.info("i == " + i);
        fluxSink.next(i);
      }
    })
    .subscribeOn(Schedulers.immediate())
    .subscribe(
        v -> {
          log.info("v == " + v);
        }
    );
  }
}

출력결과

18:17:03.980 [main] INFO io...schedulers.SchedulersImmediate_Example1 -- i == 0
18:17:03.982 [main] INFO io...schedulers.SchedulersImmediate_Example1 -- v == 0
18:17:03.983 [main] INFO io...schedulers.SchedulersImmediate_Example1 -- i == 1
18:17:03.983 [main] INFO io...schedulers.SchedulersImmediate_Example1 -- v == 1
18:17:03.983 [main] INFO io...schedulers.SchedulersImmediate_Example1 -- i == 2
18:17:03.983 [main] INFO io...schedulers.SchedulersImmediate_Example1 -- v == 2
18:17:03.983 [main] INFO io...schedulers.SchedulersImmediate_Example1 -- i == 3
18:17:03.983 [main] INFO io...schedulers.SchedulersImmediate_Example1 -- v == 3

Process finished with exit code 0

Schedulers.single()

1개의 스레드를 사용하는 스레드 풀이며, 캐싱된 스레드 풀입니다. Schedulers.single 을 사용하면 publish, subscribe 가 하나의 스레드에서 실행됩니다.

package io.chagchagchag.example.foobar.spring_webflux.schedulers;
 
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
 
@Slf4j
public class SchedulersSingle_Example1 {
  @SneakyThrows
  public static void main(String[] args) {
    for(int i=0; i<15; i++){
      final int index = i;
      Flux.create(fluxSink -> {
        log.info("index == " + index);
        fluxSink.next(index);
      }).subscribeOn(
          Schedulers.single()
      ).subscribe(
          v -> {
            log.info("v ::: " + v);
          }
      );
    }
 
    Thread.sleep(2000);
  }
}

출력결과를 보면 모두 같은 스레드인 single-1 에서 실행되었음을 확인 가능합니다.


출력결과

18:26:36.501 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- index == 0
18:26:36.503 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- v ::: 0
18:26:36.503 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- index == 1
18:26:36.503 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- v ::: 1
18:26:36.503 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- index == 2
18:26:36.503 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- v ::: 2
18:26:36.503 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- index == 3
18:26:36.503 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- v ::: 3
18:26:36.503 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- index == 4
18:26:36.503 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- v ::: 4
18:26:36.503 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- index == 5
18:26:36.503 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- v ::: 5
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- index == 6
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- v ::: 6
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- index == 7
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- v ::: 7
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- index == 8
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- v ::: 8
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- index == 9
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- v ::: 9
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- index == 10
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- v ::: 10
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- index == 11
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- v ::: 11
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- index == 12
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- v ::: 12
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- index == 13
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- v ::: 13
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- index == 14
18:26:36.504 [single-1] INFO io...schedulers.SchedulersSingle_Example1 -- v ::: 14

Process finished with exit code 0

Schedulers.parallel()

n 개의 캐싱된 스레드를 가진 스레드 풀을 생성하는 메서드 입니다. 스레드 풀은 CPU 코어 수 만큼의 크기를 가집니다.

package io.chagchagchag.example.foobar.spring_webflux.schedulers;
 
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
 
@Slf4j
public class SchedulersParallel_Example1 {
  @SneakyThrows
  public static void main(String[] args) {
    for(int i=0; i<15; i++){
      final int index = i;
      Flux.create(fluxSink -> {
        log.info("index = " + index);
        fluxSink.next(index);
      })
      .subscribeOn(Schedulers.parallel())
      .subscribe(v -> {
        log.info("v ::: " + v);
      });
    }
 
    Thread.sleep(2000);
  }
}

출력결과

18:36:59.042 [parallel-9] INFO io...schedulers.SchedulersParallel_Example1 -- index = 8
18:36:59.042 [parallel-5] INFO io...schedulers.SchedulersParallel_Example1 -- index = 4
18:36:59.042 [parallel-4] INFO io...schedulers.SchedulersParallel_Example1 -- index = 3
18:36:59.042 [parallel-2] INFO io...schedulers.SchedulersParallel_Example1 -- index = 1
18:36:59.042 [parallel-12] INFO io...schedulers.SchedulersParallel_Example1 -- index = 11
18:36:59.042 [parallel-14] INFO io...schedulers.SchedulersParallel_Example1 -- index = 13
18:36:59.042 [parallel-7] INFO io...schedulers.SchedulersParallel_Example1 -- index = 6
18:36:59.042 [parallel-1] INFO io...schedulers.SchedulersParallel_Example1 -- index = 0
18:36:59.042 [parallel-10] INFO io...schedulers.SchedulersParallel_Example1 -- index = 9
18:36:59.042 [parallel-15] INFO io...schedulers.SchedulersParallel_Example1 -- index = 14
18:36:59.042 [parallel-8] INFO io...schedulers.SchedulersParallel_Example1 -- index = 7
18:36:59.048 [parallel-12] INFO io...schedulers.SchedulersParallel_Example1 -- v ::: 11
18:36:59.048 [parallel-10] INFO io...schedulers.SchedulersParallel_Example1 -- v ::: 9
18:36:59.042 [parallel-13] INFO io...schedulers.SchedulersParallel_Example1 -- index = 12
18:36:59.042 [parallel-3] INFO io...schedulers.SchedulersParallel_Example1 -- index = 2
18:36:59.042 [parallel-11] INFO io...schedulers.SchedulersParallel_Example1 -- index = 10
18:36:59.048 [parallel-3] INFO io...schedulers.SchedulersParallel_Example1 -- v ::: 2
18:36:59.042 [parallel-6] INFO io...schedulers.SchedulersParallel_Example1 -- index = 5
18:36:59.048 [parallel-4] INFO io...schedulers.SchedulersParallel_Example1 -- v ::: 3
18:36:59.048 [parallel-5] INFO io...schedulers.SchedulersParallel_Example1 -- v ::: 4
18:36:59.048 [parallel-7] INFO io...schedulers.SchedulersParallel_Example1 -- v ::: 6
18:36:59.048 [parallel-6] INFO io...schedulers.SchedulersParallel_Example1 -- v ::: 5
18:36:59.048 [parallel-1] INFO io...schedulers.SchedulersParallel_Example1 -- v ::: 0
18:36:59.048 [parallel-15] INFO io...schedulers.SchedulersParallel_Example1 -- v ::: 14
18:36:59.048 [parallel-2] INFO io...schedulers.SchedulersParallel_Example1 -- v ::: 1
18:36:59.048 [parallel-14] INFO io...schedulers.SchedulersParallel_Example1 -- v ::: 13
18:36:59.048 [parallel-8] INFO io...schedulers.SchedulersParallel_Example1 -- v ::: 7
18:36:59.048 [parallel-9] INFO io...schedulers.SchedulersParallel_Example1 -- v ::: 8
18:36:59.048 [parallel-13] INFO io...schedulers.SchedulersParallel_Example1 -- v ::: 12
18:36:59.048 [parallel-11] INFO io...schedulers.SchedulersParallel_Example1 -- v ::: 10

Process finished with exit code 0

Schedulers.boundedElastic()

고정되지 않은 크기의 스레드 풀이며, 캐싱된 스레드 풀입니다. 재사용할 수 있는 스레드가 있으면 재사용하고, 재사용할 수 있는 스레드가 없으면 새로 스레드를 생성합니다. 생성 가능한 스레드 수는 cpu 코어수 x 10 입니다. I/O Blocking 작업의 수행에 적합하고, 특정시간 (기본 60초) 동안 사용하지 않으면 사용되지 않는 스레드는 스레드 풀에서 제거되게끔 관리됩니다.

package io.chagchagchag.example.foobar.spring_webflux.schedulers;
 
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
 
@Slf4j
public class SchedulersBoundedElastic_Example {
  @SneakyThrows
  public static void main(String[] args) {
    for(int i=0; i<300; i++){
      final int index = i;
      Flux.create(fluxSink -> {
        log.info("index == " + index);
        fluxSink.next(index);
      })
      .subscribeOn(Schedulers.boundedElastic())
      .subscribe(
          v -> {
            log.info("v ::: " + v);
          }
      );
    }
 
    Thread.sleep(2000);
  }
}

출력결과

 ...
18:41:16.822 [boundedElastic-117] INFO io...schedulers.SchedulersBoundedElastic_Example -- index == 276
18:41:16.822 [boundedElastic-117] INFO io...schedulers.SchedulersBoundedElastic_Example -- v ::: 276
18:41:16.822 [boundedElastic-118] INFO io...schedulers.SchedulersBoundedElastic_Example -- index == 277
18:41:16.822 [boundedElastic-119] INFO io...schedulers.SchedulersBoundedElastic_Example -- index == 278
18:41:16.822 [boundedElastic-118] INFO io...schedulers.SchedulersBoundedElastic_Example -- v ::: 277
18:41:16.822 [boundedElastic-119] INFO io...schedulers.SchedulersBoundedElastic_Example -- v ::: 278
...

Schedulers.new---

위에서 살펴본 Schedulers.immediate(), Schedulers.single(), Schedulers.parallel(), Schedulers.boundedElastic() 메서드 들은 캐싱되지 않는 스레드 풀이었지만, Schedulers.new--- 과 같은 이름을 가진 메서드들인 Schedulers.newSingle(), Schedulers.newParallel(), Schedulers.newBoundedElastic() 은 캐싱된 스레드 풀을 사용합니다.

  • Schedulers.newSingle()
  • Schedulers.newParallel()
  • Schedulers.newBoundedElastic()

스레드 풀의 해제는 dispose() 로 해제합니다.


package io.chagchagchag.example.foobar.spring_webflux.schedulers;
 
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
 
@Slf4j
public class SchedulersNew_Example1 {
  @SneakyThrows
  public static void main(String[] args) {
    for(int i=0; i<100; i++){
      var newSingle = Schedulers.newSingle("싱글스레드");
      final int index = i;
      Flux.create(fluxSink -> {
        log.info("index == " + index);
        fluxSink.next(index);
      })
      .subscribeOn(newSingle)
      .subscribe(v -> {
        log.info("v ::: " + v);
        newSingle.dispose();
      });
    }
  }
}

출력결과

...

18:54:50.336 [싱글스레드-80] INFO io...schedulers.SchedulersNew_Example1 -- index == 79
18:54:50.336 [싱글스레드-61] INFO io...schedulers.SchedulersNew_Example1 -- index == 60
18:54:50.336 [싱글스레드-96] INFO io...schedulers.SchedulersNew_Example1 -- index == 95
18:54:50.335 [싱글스레드-8] INFO io...schedulers.SchedulersNew_Example1 -- index == 7
... 

Schedulers.fromExecutorService(executorService)

커스텀하게 직접 생성한 ExecutorService 를 이용해서 Scheduler 를 생성합니다.

package io.chagchagchag.example.foobar.spring_webflux.schedulers;
 
import java.util.concurrent.Executors;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
 
@Slf4j
public class SchedulersFromExecutorService_Example1 {
  @SneakyThrows
  public static void main(String[] args) {
    var executorService = Executors.newSingleThreadExecutor();
    for(int i=0; i<100; i++){
      final int index = i;
      Flux.create(fluxSink -> {
        log.info("index == " + index);
        fluxSink.next(index);
      })
      .subscribeOn(Schedulers.fromExecutorService(executorService))
      .subscribe(v -> {
        log.info("v == " + v);
      });
    }
 
    Thread.sleep(2000);
    executorService.shutdown();
  }
}

출력결과

...

18:59:27.838 [pool-1-thread-1] INFO io...schedulers.SchedulersFromExecutorService_Example1 -- index == 84
18:59:27.838 [pool-1-thread-1] INFO io...schedulers.SchedulersFromExecutorService_Example1 -- v == 84
18:59:27.838 [pool-1-thread-1] INFO io...schedulers.SchedulersFromExecutorService_Example1 -- index == 85
18:59:27.838 [pool-1-thread-1] INFO io...schedulers.SchedulersFromExecutorService_Example1 -- v == 85
18:59:27.838 [pool-1-thread-1] INFO io...schedulers.SchedulersFromExecutorService_Example1 -- index == 86
18:59:27.838 [pool-1-thread-1] INFO io...schedulers.SchedulersFromExecutorService_Example1 -- v == 86
18:59:27.838 [pool-1-thread-1] INFO io...schedulers.SchedulersFromExecutorService_Example1 -- index == 87
18:59:27.838 [pool-1-thread-1] INFO io...schedulers.SchedulersFromExecutorService_Example1 -- v == 87

...

publishOn(), subscribeOn()

publishOn(Scheduler)

publishOn(Scheduler) 는 어느 위치에서 사용했는지가 중요합니다. 만약 A→B→C 순으로 연산자가 체이닝되어 있을 경우 publishOn(Scheduler) 를 선언하지 않으면, A를 실행한 스레드가 B를 수행하고, B를 실행한 스레드는 C를 실행합니다. 체이닝된 연산 수행시 스레드 역시 체이닝되어 수행됩니다.

publishOn(Scheduler) 는 Publisher 가 데이터를 공급할 때 사용되는 연산이기 때문에 Publisher 코드의 체이닝 코드 중간의 어디에서든 publishOn(Scheduler) 를 선언가능하고 데이터 흐름의 중간에 실행 스레드를 변경할 수 있습니다.


subscribeOn(Scheduler)

subscribeOn(Scheduler)는 어느 위치에서 사용했는지가 중요하지 않습니다. Subscriber 가 Publisher를 subscribe 할 때 사용될 스레드이기에 subscrieOn(Scheduler)는 데이터 소스가 실행될 스레드에 영향을 줍니다.


publishOn(Scheduler)

publishOn(Scheduler) 코드 다음에 실행되는 스레드는 publishOn(Scheduler) 에서 지정한 스레드 풀에 의해 변경됩니다. 즉 실행되는 중간에 publishOn(Scheduler)를 통해 다른 설정을 사용하면 실행되는 스레드를 변경할 수 있습니다. 스레드 풀 중 하나의 스레드만 지속적으로 사용되는 성격이 있습니다.

publishOn(Scheduler) 는 Publisher 가 데이터를 공급할 때 사용되는 연산이기 때문에 Publisher 코드의 체이닝 코드 중간의 어디에서든 publishOn(Scheduler) 를 선언가능하고 데이터 흐름의 중간에 실행 스레드를 변경할 수 있습니다.


package io.chagchagchag.example.foobar.spring_webflux.publish_on_subscribe_on;
 
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
 
@Slf4j
public class PublishOn_Example1 {
  @SneakyThrows
  public static void main(String[] args) {
    Flux.create(fluxSink -> {
      for(int i=0; i<7; i++){
        log.info("i == " + i);
        fluxSink.next(i);
      }
    })
    .publishOn(Schedulers.single())
    .doOnNext(element -> {
      log.info("doOnNext1 :: " + element);
    })
    .publishOn(Schedulers.boundedElastic())
    .doOnNext(element -> {
      log.info("doOnNext2 :: " + element);
    })
    .subscribe(v -> {
      log.info("v ::: " + v);
    });
 
    Thread.sleep(2000);
  }
}

실행 결과를 보면 doOnNext1 은 single-1 스레드에서 실행되고 doOnNext2 는 boundedElastic-1 에서 실행됩니다. 그리고 각각의 스레드는 개별적으로 수행되면서 가끔은 single-1 이 먼저 수행하고, 가끔은 boundedElastic-1 이 먼저 수행되기도 하는 것을 확인 가능합니다.


출력결과

19:17:27.086 [main] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- i == 0
19:17:27.089 [main] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- i == 1
19:17:27.089 [main] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- i == 2
19:17:27.089 [main] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- i == 3
19:17:27.089 [single-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- doOnNext1 :: 0
19:17:27.089 [main] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- i == 4
19:17:27.089 [single-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- doOnNext1 :: 1
19:17:27.089 [main] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- i == 5
19:17:27.089 [single-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- doOnNext1 :: 2
19:17:27.089 [main] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- i == 6
19:17:27.089 [boundedElastic-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- doOnNext2 :: 0
19:17:27.090 [single-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- doOnNext1 :: 3
19:17:27.090 [single-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- doOnNext1 :: 4
19:17:27.090 [single-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- doOnNext1 :: 5
19:17:27.090 [boundedElastic-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- v ::: 0
19:17:27.090 [single-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- doOnNext1 :: 6
19:17:27.090 [boundedElastic-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- doOnNext2 :: 1
19:17:27.090 [boundedElastic-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- v ::: 1
19:17:27.090 [boundedElastic-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- doOnNext2 :: 2
19:17:27.090 [boundedElastic-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- v ::: 2
19:17:27.090 [boundedElastic-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- doOnNext2 :: 3
19:17:27.090 [boundedElastic-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- v ::: 3
19:17:27.090 [boundedElastic-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- doOnNext2 :: 4
19:17:27.090 [boundedElastic-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- v ::: 4
19:17:27.090 [boundedElastic-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- doOnNext2 :: 5
19:17:27.090 [boundedElastic-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- v ::: 5
19:17:27.090 [boundedElastic-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- doOnNext2 :: 6
19:17:27.090 [boundedElastic-1] INFO io...publish_on_subscribe_on.PublishOn_Example1 -- v ::: 6

Process finished with exit code 0

subscribeOn(Scheduler)

subscribeOn(Scheduler)는 어느 위치에서 사용했는지가 중요하지 않습니다. Scheduler 를 인자로 받으며, Subscriber 가 Publisher를 subscribe 할 때 사용될 스레드이기에 subscrieOn(Scheduler)는 데이터 소스가 실행될 스레드에 영향을 줍니다.

package io.chagchagchag.example.foobar.spring_webflux.publish_on_subscribe_on;
 
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
 
@Slf4j
public class SubscribeOn_Example1 {
  @SneakyThrows
  public static void main(String[] args) {
    Flux.create(fluxSink -> {
      for(int i=0; i<8; i++){
        log.info("i == " + i);
        fluxSink.next(i);
      }
    })
    .doOnNext(element -> {
      log.info("doOnNext1 :: " + element);
    })
    .doOnNext(element -> {
      log.info("doOnNext2 :: " + element);
    })
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe(v -> {
      log.info("v ::: " + v);
    });
 
    Thread.sleep(2000);
  }
}

출력결과

19:25:28.579 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- i == 0
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext1 :: 0
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext2 :: 0
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- v ::: 0
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- i == 1
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext1 :: 1
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext2 :: 1
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- v ::: 1
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- i == 2
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext1 :: 2
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext2 :: 2
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- v ::: 2
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- i == 3
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext1 :: 3
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext2 :: 3
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- v ::: 3
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- i == 4
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext1 :: 4
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext2 :: 4
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- v ::: 4
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- i == 5
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext1 :: 5
19:25:28.582 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext2 :: 5
19:25:28.583 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- v ::: 5
19:25:28.583 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- i == 6
19:25:28.583 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext1 :: 6
19:25:28.583 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext2 :: 6
19:25:28.583 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- v ::: 6
19:25:28.583 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- i == 7
19:25:28.583 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext1 :: 7
19:25:28.583 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- doOnNext2 :: 7
19:25:28.583 [boundedElastic-1] INFO io...publish_on_subscribe_on.SubscribeOn_Example1 -- v ::: 7

Process finished with exit code 0

Context, ContextView

참고


Context 는 파이프라인 내부의 어디에서든 접근 가능한 key, value 저장소입니다. subscribeOn, publishOn 등으로 인해 실행되는 스레드가 달라질 때 Context 를 통해서 특정 key 에 대한 value 를 조회 또는 수정할 수 있습니다. Map 과 유사한 형식이며, Context 의 종류로는 읽기전용인 ContextView, 쓰기가능한 Context 가 있습니다.

Project Reactor 의 Publisher 에서는 Context 에 관련된 아래의 함수들을 제공합니다.

  • contextWrite() : 컨텍스트 쓰기
  • contextView() : 컨텍스트 읽기
  • deferContextual()

Project Reactor 의 Context (opens in a new tab) interface 는 아래의 메서드 또는 default 메서드 들이 정의되어 있습니다. 일반적인 Map 에서 제공하는 비슷한 연산들을 제공합니다. 자세한 코드는 Context - github.com/reactor (opens in a new tab) 를 참고해주시기 바랍니다.

  • put(k, v)
  • putNonNull(key, value)
  • delete(key)
  • putAll(contextView)
  • putAllMap(Map)
  • putAll(Context) : Deprecated
public interface Context extends ContextView {
    // ...
}

Context interface 가 extends 하고 있는 ContextView 는 아래와 같은 메서드 또는 default 메서드 들을 제공합니다. 자세한 코드는 ContextView - github.com/reactor (opens in a new tab) 를 참고해주시기 바랍니다.

  • get(key)
  • getOrDefault(key, defaultValue)
  • getOrEmpty(key)
  • hasKey(key)
  • isEmpty()
  • size
public interface ContextView {
    // ...
}

Publisher, Subscriber 코드에서 ThreadLocal 을 사용가능할까?

Context 에서는 ThreadLocal 을 사용불가능합니다. ThreadLocal 은 하나의 스레드에 값을 저장하고 이 스레드 안에서 어디서든지 접근 가능한 key,value 쌍을 의미합니다.

Project Reactor 의 Publisher, Subscriber 는 어떤 Thread 에서 실행되다가 다시 다른 Thread에서 실행되고 하는 작업들이 내부적으로 반복되기 때문에 ThreadLocal 에 접근하는 것이 불가능하고, Project Reactor 에서도 ThreadLocal 에 접근할 수 있는 방법을 따로 제공해주지 않고 있습니다. 아래의 예제는 Publisher, Subscriber 내에서 ThreadLocal 내의 값에 접근할 수 있는지를 체크하는 예제입니다.

출력결과를 보면 Publishser, Subscriber 에서는 ThreadLocal 값을 제대로 읽어오지 못한다는 사실을 알수 있습니다. main 스레드가 아닌 다른 스레드에서 실행되기 때문입니다. 출력결과의 로그에서 보이는 single-1 , parallel-1 , boundedElastic-1 은 스레드 명인데, 이렇게 ThreadLocal 이 생성된 스레드가 아닌 다른 스레드에서 실행되기 때문에 ThreadLocal 값을 제대로 읽어오지 못합니다.

package io.chagchagchag.example.foobar.spring_webflux.context;
 
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
 
@Slf4j
public class JavaThreadLocal_Example {
  @SneakyThrows
  public static void main(String[] args) {
    ThreadLocal<String> threadLocal = new ThreadLocal<>();
    threadLocal.set("배고파요");
 
    Flux.create(fluxSink -> {
      log.info("threadLocal : " + threadLocal.get());
      fluxSink.next(5);
    })
    .publishOn(Schedulers.parallel())
    .map(v -> {
      log.info("threadLocal : " + threadLocal.get());
      return v;
    })
    .publishOn(Schedulers.boundedElastic())
    .map(v -> {
      log.info("threadLocal : " + threadLocal.get());
      return v;
    })
    .subscribeOn(Schedulers.single())
    .subscribe();
 
    Thread.sleep(2000);
  }
}

출력결과

22:12:23.929 [single-1] INFO io...context.JavaThreadLocal_Example -- threadLocal : null
22:12:23.931 [parallel-1] INFO io...context.JavaThreadLocal_Example -- threadLocal : null
22:12:23.931 [boundedElastic-1] INFO io...context.JavaThreadLocal_Example -- threadLocal : null

Process finished with exit code 0

Context 초기화

Flux::subscribe(a,b,c,d) 에 initialContext 전달

참고

Flux::subscribe(consumer, errorConsumer, completeConsumer, initialContext) (opens in a new tab) 의 4번째 인자에는 context 의 초기값을 전달 가능합니다. 이 경우 역시 context 는 subscribe() 에서부터 위로 전파된다는 사실에 유념하셔야 합니다.

package io.chagchagchag.example.foobar.spring_webflux.context;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
 
@Slf4j
public class SubscribeOnContext_Example1 {
  public static void main(String[] args) {
    var initialContext = Context.of("message", "배불러요");
    Flux.just("hello")
        .flatMap(v -> contextLog(v, "(1)"))
        .contextWrite(context -> context.put("message", "배고파요"))
        .flatMap(v -> contextLog(v, "(2)"))
        .subscribe(null, null, null, initialContext);
  }
 
  public static <T> Mono<T> contextLog(T t, String v){
    return Mono.deferContextual(c -> {
      log.info("v === {} , context === {}", v, c);
      return Mono.just(t);
    });
  }
}

출력결과를 보면, context 는 초기값인 "배불러요" 가 아닌 "배고파요"부터 시작하고 있습니다.

출력결과

23:19:42.858 [main] INFO io...context.SubscribeOnContext_Example1 -- v === (1) , context === Context1{message=배고파요}
23:19:42.861 [main] INFO io...context.SubscribeOnContext_Example1 -- v === (2) , context === Context1{message=배불러요}

Process finished with exit code 0

Context Read

fluxSink.contextView() 로 ContextView 접근

Sink 데이터에서 ContextView 객체에 접근 가능한데, sink.contextView() 메서드를 통해 접근 가능합니다.

package io.chagchagchag.example.foobar.spring_webflux.context;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.util.context.Context;
 
@Slf4j
public class FromSinkGetContextView_Example {
  public static void main(String[] args) {
    var initialContext = Context.of("message", "배고파요");
 
    Flux.create(fluxSink -> {
      var name = fluxSink.contextView().get("message");
      log.info("Publisher 의 첫 Create (Sink) 시의 message = " + name);
      fluxSink.next("aaa");
    })
    .contextWrite(context -> context.put("message", "배불러요"))
    .subscribe(null, null, null, initialContext);
  }
}

출력결과

23:34:37.542 [main] INFO io...context.FromSinkGetContextView_Example -- Publisher 의 첫 Create (Sink) 시의 message = 배불러요

Process finished with exit code 0

Mono.deferContextual(Fucntion<ContextView, Mono<T>>)

참고 : Mono.deferContextual(Funciton<ContextView, Mono>) (opens in a new tab)


Mono.deferContexetual() 은 ContextView 를 인자로 받아서 Mono<T> 로 변환해서 반환하는 Function 을 인자로 받습니다. 아래 예제에서 flatMap 을 사용한 이유는 Flux 를 Mono 로 바꾼 후 conetxtWrite() 를 수행하기 위해서 flatMap 을 사용했습니다. 실제로 Subscriber 가 subscribe() 할때 context 를 읽는 순서는 역순인 맨 뒤에서부터 시작합니다.

package io.chagchagchag.example.foobar.spring_webflux.context;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
@Slf4j
public class ContextRead_MonoDeferContextual_Example {
  public static void main(String[] args) {
    Flux.just("누구시죠")
        .flatMap(v -> {
          return Mono.deferContextual(contextView -> {
            String message = contextView.get("message");
            log.info("message ::: " + message);
            return Mono.just(v);
          });
        })
        .contextWrite(context -> context.put("message", "안녕하세요"))
        .subscribe();
  }
}

출력결과

23:44:39.279 [main] INFO io...context.ContextRead_MonoDeferContextual_Example -- message ::: 안녕하세요

Process finished with exit code 0

ContextWrite

Flux::contextWrite(Function)

Flux.java 내에 정의된 contextWrite(Function) 함수는 Context 입력에 대해 Context 출력을 리턴하는 함수형 인터페이스 Funciton<Context, Context> contextModifier 를 인자로 사용하는 함수입니다.

Function.contextWrite (Function<Context, Context> contextModifier) (opens in a new tab)

public abstract class Flux<T> implements CorePublisher<T> {
    // ...
    
	public final Flux<T> contextWrite(Function<Context, Context> contextModifier) {
		if (ContextPropagationSupport.shouldPropagateContextToThreadLocals()) {
			return onAssembly(new FluxContextWriteRestoringThreadLocals<>(
					this, contextModifier
			));
		}
		return onAssembly(new FluxContextWrite<>(this, contextModifier));
	}
    
	// ...
}

예제를 살펴보겠습니다.

package io.chagchagchag.example.foobar.spring_webflux.context;
 
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
@Slf4j
public class ContextWrite_Example {
  @SneakyThrows
  public static void main(String[] args) {
    Flux.just(1)
        .flatMap(v -> contextLog(v, "(1)"))
        .contextWrite(context -> context.put("message", "배고파요"))
        .flatMap(v -> contextLog(v, "(2)"))
        .contextWrite(context -> context.put("message", "배불러요"))
        .flatMap(v -> contextLog(v, "(3)"))
        .subscribe();
  }
 
  public static <T> Mono<T> contextLog(T t, String v){
    return Mono.deferContextual(c -> {
      log.info("v === {} , context === {}", v, c);
      return Mono.just(t);
    });
  }
}

출력결과를 보면 이해가 쉽지 않습니다.

일반 데이터 흐름과는 context 의 순서는 역순입니다. 먼저, subscribe() 부터 시작합니다. 그리고 새로운 contextWrite 를 만났을 때는 contextWrite 를 실행해서 새로운 context 를 생성하거나 수정한 후에 바로 위의 연산자로 이동합니다. 그리고 또 contextWrite 를 만나면 새로운 context 를 생성하거나 업데이트 후에 위의 연산자로 이동합니다. 이 과정을 위로 전파하면서 반복합니다.

출력결과

23:05:00.322 [main] INFO io...context.ContextWrite_Example -- v === (1) , context === Context1{message=배고파요}
23:05:00.327 [main] INFO io...context.ContextWrite_Example -- v === (2) , context === Context1{message=배불러요}
23:05:00.328 [main] INFO io...context.ContextWrite_Example -- v === (3) , context === Context0{}

Process finished with exit code 0

Mono.defer()

Mono.defer() (opens in a new tab) 는 아래와 같이 정의되어 있습니다.

public abstract class Mono<T> implements CorePublisher<T> {
    // ...
    
	public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier) {
		return onAssembly(new MonoDefer<>(supplier));
	}
    
    // ...
}

인자값으로 Supplier 를 받고 있습니다. 이 말은 값이 아닌 식을 전달받는다는 의미이고 지연된(lazy) 초기화가 가능하다는 의미가 됩니다.

Cold Publisher 는 실행중인 스레드가 Cold Publisher 를 구독할 때에만 Cold Publisher 에 대해 연산(evaluate)을 하게 되는데, 이런 연산을 Lazy Evaluation 이라고 부릅니다. Hot Publisher 와 비교해보면, Hot Publisher 는 구독하기 전에 이미 연산(evaluate)을 즉시해서 데이터를 제공합니다. 이렇게 Hot Publisher 처럼 즉시 연산을 하는 것을 Eager Evaluation 이라고 이야기합니다.

Mono.defer() 는 Cold Publisher 연산을 수행합니다.

Mono.defer() 내에 Supplier 를 건네주었다는 것은 그 Supplier 를 지연된 처리를 할 것이며, 별도의 다른 Publisher 를 defer() 를 통해 생성하고 이것을 현재 Subscriber 로 구독하는 작업을 통해 그 Supplier 를 실행시키겠다는 의미가 됩니다.

출처 : https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#defer-java.util.function.Supplier- (opens in a new tab)

위 그림을 보면, Subscriber 에게 Mono.defer 의 실행연산이 박스에 담긴채로 넘어가는 것이 강조되어 표현되어 있습니다.

아래는 비교적 단순한 Mono.defer() 의 사용법을 알 수 있는 예제입니다.

package io.chagchagchag.example.foobar.spring_webflux.defer;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
 
@Slf4j
public class MonoDefer_Example {
  public static void main(String[] args) {
    Mono.defer(() -> {
      return Mono.just("안녕하세요");
    }).subscribe(message -> {
      log.info("message == " + message);
    });
  }
}

출력결과

10:41:14.477 [main] INFO io...defer.MonoDefer_Example -- message == 안녕하세요

Process finished with exit code 0

Mono.defer() 를 flatMap() 으로

하나의 Publisher 흐름에서 map() 등의 연산을 통해 Mono.defer() 를 호출하고 있을 때, 첫번째 Mono.defer() 내의 Supplier 람다바디 안에서 또 다시 별도의 Mono Publisher 를 생성하고 이 Publisher 를 전달해주려 할 경우가 있습니다.

I/O 처리 작업, Database 연산 들 처럼 응답지연이 많은 현대적인 애플리케이션의 특성상 이런 작업들이 자주 발생합니다. 이 경우 아래의 예제처럼 flatMap() 으로 Publisher 를 풀어서 처리할 수 있습니다.

package io.chagchagchag.example.foobar.spring_webflux.defer;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
 
@Slf4j
public class MonoDeferFlatMap_Example {
  public static void main(String[] args) {
    var userName = "Mono.defer";
    Mono.just(userName)
        .flatMap(v -> Mono.defer(() -> {
          return Mono.just(userName + "님, 안녕하세요. 반가워요");
        })).subscribe(v -> {
          log.info("message === " + v);
        });
  }
}

출력결과

11:03:00.426 [main] INFO io...defer.MonoDeferFlatMap_Example -- message === Mono.defer님, 안녕하세요. 반가워요

Process finished with exit code 0

Mono.deferContextual(Function<T,R>)

Mono.deferContextual(Function) 의 정확한 입출력, 반환형 명세는 Mono.deferContextual(Fucntion<ContextView, Mono<T>>) 입니다.

참고 : Mono.deferContextual(Funciton<ContextView, Mono>) (opens in a new tab)


Mono.deferContexetual() 은 ContextView 를 인자로 받아서 Mono<T> 로 변환해서 반환하는 Function 을 인자로 받습니다. 아래 예제에서 flatMap 을 사용한 이유는 Flux 를 Mono 로 바꾼 후 conetxtWrite() 를 수행하기 위해서 flatMap 을 사용했습니다. 실제로 Subscriber 가 subscribe() 할때 context 를 읽는 순서는 역순인 맨 뒤에서부터 시작합니다.

package io.chagchagchag.example.foobar.spring_webflux.context;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
@Slf4j
public class ContextRead_MonoDeferContextual_Example {
  public static void main(String[] args) {
    Flux.just("누구시죠")
        .flatMap(v -> {
          return Mono.deferContextual(contextView -> {
            String message = contextView.get("message");
            log.info("message ::: " + message);
            return Mono.just(v);
          });
        })
        .contextWrite(context -> context.put("message", "안녕하세요"))
        .subscribe();
  }
}

출력결과

23:44:39.279 [main] INFO io...context.ContextRead_MonoDeferContextual_Example -- message ::: 안녕하세요

Process finished with exit code 0