Publisher,Subscriber,Subscription,Backpressure

Reactive Streams - Publisher, Subscriber 개념, 예제

참고


예제 코드


Publisher, Subscriber, Subscription

참고


Publisher 는 Subscriber 를 등록할 수 있습니다. Subscriber 는 Publisher 에 자기 자신을 등록하기 위해 Publisher::subscribe(Subscriber) 메서드를 실행해서 자기 자신을 등록합니다.

Publisher.java (opens in a new tab) 의 코드는 아래와 같이 되어있습니다.

package org.reactivestreams;
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Subscriber 는 Subscriber::onSubscribe(Subscrition) 을 통해서 Publisher 가 수락한 Subscription 객체를 수신합니다.

Subscriber.java (opens in a new tab) 의 코드는 아래와 같습니다.

package org.reactivestreams;
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Subscriber 객체는 onNext(T) 를 통해서 계속해서 구독하고 있는 이벤트를 수신합니다. 그리고 onError(Throwable) 과 onComplete() 는 최종적으로 단 한번 호출되는데, 예외가 발생했을 경우에는 onError(Throwable) 을 통해 종료가 되고, 정상적으로 종료될 때에는 onComplete() 를 통해서 종료됩니다.


Subscription 객체는 Subscriber 가 Publisher 에게 자신을 subscribe(Subscriber) 할 때 Publisher 가 생성하는 객체이며, 이 것을 Subscriber의 onSubscribe(Subscription) 의 인자값으로 전달해주는 객체입니다.

Subscription.java (opens in a new tab) 의 코드는 아래와 같습니다.

package org.reactivestreams;
public interface Subscription {
    public void request(long n);
    public void cancel();
}

request 함수는 backpressure 를 조절하는 데에 사용됩니다. 그리고 cancel 은 onNext() 작업을 중단(취소)할수 있도록 Subscrition 객체를 이용해서 이벤트의 흐름을 취소할 때 사용되는 메서드입니다.


Hot Publisher, Cold Publisher

Hot Publisher 는 subscriber 가 없는 상태에서도 데이터를 생성해서 stream 에 이벤트를 push 하는 것을 의미합니다. 모든 subscriber 들에게 같은 데이터를 전달합니다. 예를 들면 facebook 의 타임라인이 다른 사람들에게도 전파되는 것과 같은 현상을 예로 들 수 있습니다.

Cold Publisher 는 subscribe 를 하는 순간부터 stream 이벤트가 시작되는 것을 의미합니다. subscriber 에 맞춰서 데이터 스트림을 제공하는 것이 가능합니다. 예를 들면 API 요청 처리, 파일 읽기 작업을 예로 들 수 있습니다.

e.g Publisher, Subscriber, Subscription 예제

예제 코드는 github (opens in a new tab) 에 남겨두었습니다.

간단한 메시지를 Publisher 에서 발행하고, Subscription 내에는 executorService 를 통해서 동시성제어를 수행하고 Subscriber 는 Publisher 가 만든 Subscription 객체를 이용해 지속적으로 request() 함수를 호출하는 예제입니다.

자세한 설명은 추후 시간이 된다면 설명을 추가하도록 하겠습니다.


Message

데이터를 담는 용도의 record 객체입니다.

package io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message;
 
public record Message(
    String message,
    Integer requestedCnt
) {
 
}

NSizedMessageSubscription<Message>

백프레셔 함수인 request() 함수의 정의와, 동시성 제어시 필요한 스레드 풀의 갯수를 간단하게 정의한 executorService 등을 정의해둔 예제 용도의 단순한 기능을 가진 Subscription 클래스 예제입니다.

package io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message;
 
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
 
@RequiredArgsConstructor
public class NSizedMessageSubscription implements Flow.Subscription{
  private final Subscriber<? super Message> subscriber;
  private final Iterator<String> messages;
  private final ExecutorService executorService = Executors.newSingleThreadExecutor();
 
  // 백프레셔 요청 횟수 기록
  private final AtomicInteger requestCnt = new AtomicInteger(1);
  private final AtomicBoolean isCompleted = new AtomicBoolean(false);
 
  @Override
  public void request(long requestSize) {
    executorService.submit(()->{
      // requestSize 만큼 데이터를 처리
      for(int i=0; i<requestSize; i++){
        if(messages.hasNext()){
          String message = messages.next();
          subscriber.onNext(new Message(message, requestCnt.get()));
        }
        else{ // 더 이상 보낼 데이터가 없다. 종료 진행
          // 현재 isCompleted 가 false 일때 true 로 바꿔준다.
          var isChanged = isCompleted.compareAndSet(false, true);
 
          if(isChanged){
            executorService.shutdown(); // executorService 회수
            subscriber.onComplete();  // subscriber 에 onComplete 이벤트 emit
            isCompleted.set(true);  // isCompleted 를 true 로 세팅
          }
          break;
        }
      }
 
      requestCnt.incrementAndGet();
    });
  }
 
  @Override
  public void cancel() {
    // cancel 시에는 complete 수행
    subscriber.onComplete();
  }
}

NSizedMessagePublisher<Message>

간단한 7개 정도의 메시지를 새로 등록하는 Subscriber 에게 통지하는 간단한 예제 용도의 Publisher 입니다. Subscriber 가 subscribe(Subscriber) 메서드를 통해 자기 자신을 Publisher 에 등록하려 할 때 Publisher 는 Subscription 내에 메시지 데이터를 iterator 로 바인딩하고, subscriber 객체 역시 등록합니다.

그리고 이 작업이 끝나면 Subscriber 의 onSubscribe(Subscription) 메서드를 호출해서 Subscriber 에게도 Subscription 객체를 전달해줍니다.

package io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message;
 
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
public class NSizedMessagePublisher implements Flow.Publisher<Message>{
 
  @Override
  public void subscribe(Subscriber<? super Message> subscriber) {
    var messages = Collections.synchronizedList(
        new ArrayList<>(
            List.of(
                "1.배고파요",
                "2.편의점가요",
                "3.먹을게 없네요",
                "4.다이어트하자요",
                "5.산책해요",
                "6.잠자요",
                "7.퇴근해요")
        )
    );
 
    Iterator<String> iterator = messages.iterator();
    var subscription = new NSizedMessageSubscription(subscriber, iterator);
    subscriber.onSubscribe(subscription);
  }
}

NSizedMessageSubscriber

Subscriber 는 Publisher 가 onSubscribe(Subscription) 을 통해 전달해준 subscription 객체를 자기 자신의 멤버 필드 subscription 에도 바인딩해줍니다. 그리고 이 subscription 객체를 이용해서 backpressure 제어 메서드인 request() 메서드를 호출해서 데이터를 계속해서 소비하며 onComplete() 메시지가 발생할 때 까지 계속해서 소비하게 됩니다.

package io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message;
 
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
@RequiredArgsConstructor
public class NSizedMessageSubscriber<T> implements Flow.Subscriber<T>{
  private final Integer requestSize;
  private Flow.Subscription subscription;
  private int firstRequestSize = 1;
  private int nextCnt = 0;
 
  @Override
  public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    this.subscription.request(firstRequestSize);
  }
 
  @Override
  public void onNext(T item) {
    log.info("(onNext) item = {}", item);
    // nextCnt++ 연산을 수행하고, requestSize 만큼 데이터가 한 차례 들어왔음을 판정
    if(nextCnt++ % requestSize == 0){
      // 지정한 requestSize 에 도달해서 onNext 를 수행
      log.info(">>> onNext : request 를 publisher 의 subscription 에 전송합니다.");
      this.subscription.request(requestSize);
    }
  }
 
  @Override
  public void onError(Throwable throwable) {
    log.error("error : {}", throwable.getMessage());
  }
 
  @Override
  public void onComplete() {
    log.info("complete");
  }
}

출력결과

12:31:01.664 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- (onNext) item = Message[message=1.배고파요, requestedCnt=1]
12:31:01.689 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- >>> onNext : request 를 publisher 의 subscription 에 전송합니다.
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- (onNext) item = Message[message=2.편의점가요, requestedCnt=2]
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- (onNext) item = Message[message=3.먹을게 없네요, requestedCnt=2]
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- (onNext) item = Message[message=4.다이어트하자요, requestedCnt=2]
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- (onNext) item = Message[message=5.산책해요, requestedCnt=2]
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- (onNext) item = Message[message=6.잠자요, requestedCnt=2]
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- >>> onNext : request 를 publisher 의 subscription 에 전송합니다.
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- (onNext) item = Message[message=7.퇴근해요, requestedCnt=3]
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- complete

Process finished with exit code 0