Reactive Streams - Publisher, Subscriber 개념, 예제
참고
-
reactive streams github (opens in a new tab), reactive-streams-jvm (opens in a new tab)
-
Publisher (opens in a new tab) , Subscriber (opens in a new tab), Subscription (opens in a new tab)
-
Publisher.java (opens in a new tab) , Subscriber.java (opens in a new tab) , Subscription.java (opens in a new tab)
예제 코드
Publisher, Subscriber, Subscription
참고
Publisher 는 Subscriber 를 등록할 수 있습니다. Subscriber 는 Publisher 에 자기 자신을 등록하기 위해 Publisher::subscribe(Subscriber) 메서드를 실행해서 자기 자신을 등록합니다.
Publisher.java (opens in a new tab) 의 코드는 아래와 같이 되어있습니다.
package org.reactivestreams;
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Subscriber 는 Subscriber::onSubscribe(Subscrition) 을 통해서 Publisher 가 수락한 Subscription 객체를 수신합니다.
Subscriber.java (opens in a new tab) 의 코드는 아래와 같습니다.
package org.reactivestreams;
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Subscriber 객체는 onNext(T) 를 통해서 계속해서 구독하고 있는 이벤트를 수신합니다. 그리고 onError(Throwable) 과 onComplete() 는 최종적으로 단 한번 호출되는데, 예외가 발생했을 경우에는 onError(Throwable) 을 통해 종료가 되고, 정상적으로 종료될 때에는 onComplete() 를 통해서 종료됩니다.
Subscription 객체는 Subscriber 가 Publisher 에게 자신을 subscribe(Subscriber) 할 때 Publisher 가 생성하는 객체이며, 이 것을 Subscriber의 onSubscribe(Subscription) 의 인자값으로 전달해주는 객체입니다.
Subscription.java (opens in a new tab) 의 코드는 아래와 같습니다.
package org.reactivestreams;
public interface Subscription {
public void request(long n);
public void cancel();
}
request 함수는 backpressure 를 조절하는 데에 사용됩니다. 그리고 cancel 은 onNext() 작업을 중단(취소)할수 있도록 Subscrition 객체를 이용해서 이벤트의 흐름을 취소할 때 사용되는 메서드입니다.
Hot Publisher, Cold Publisher
Hot Publisher 는 subscriber 가 없는 상태에서도 데이터를 생성해서 stream 에 이벤트를 push 하는 것을 의미합니다. 모든 subscriber 들에게 같은 데이터를 전달합니다. 예를 들면 facebook 의 타임라인이 다른 사람들에게도 전파되는 것과 같은 현상을 예로 들 수 있습니다.
Cold Publisher 는 subscribe 를 하는 순간부터 stream 이벤트가 시작되는 것을 의미합니다. subscriber 에 맞춰서 데이터 스트림을 제공하는 것이 가능합니다. 예를 들면 API 요청 처리, 파일 읽기 작업을 예로 들 수 있습니다.
e.g Publisher, Subscriber, Subscription 예제
예제 코드는 github (opens in a new tab) 에 남겨두었습니다.
간단한 메시지를 Publisher 에서 발행하고, Subscription 내에는 executorService 를 통해서 동시성제어를 수행하고 Subscriber 는 Publisher 가 만든 Subscription 객체를 이용해 지속적으로 request()
함수를 호출하는 예제입니다.
자세한 설명은 추후 시간이 된다면 설명을 추가하도록 하겠습니다.
Message
데이터를 담는 용도의 record
객체입니다.
package io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message;
public record Message(
String message,
Integer requestedCnt
) {
}
NSizedMessageSubscription<Message>
백프레셔 함수인 request() 함수의 정의와, 동시성 제어시 필요한 스레드 풀의 갯수를 간단하게 정의한 executorService 등을 정의해둔 예제 용도의 단순한 기능을 가진 Subscription 클래스 예제입니다.
package io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class NSizedMessageSubscription implements Flow.Subscription{
private final Subscriber<? super Message> subscriber;
private final Iterator<String> messages;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
// 백프레셔 요청 횟수 기록
private final AtomicInteger requestCnt = new AtomicInteger(1);
private final AtomicBoolean isCompleted = new AtomicBoolean(false);
@Override
public void request(long requestSize) {
executorService.submit(()->{
// requestSize 만큼 데이터를 처리
for(int i=0; i<requestSize; i++){
if(messages.hasNext()){
String message = messages.next();
subscriber.onNext(new Message(message, requestCnt.get()));
}
else{ // 더 이상 보낼 데이터가 없다. 종료 진행
// 현재 isCompleted 가 false 일때 true 로 바꿔준다.
var isChanged = isCompleted.compareAndSet(false, true);
if(isChanged){
executorService.shutdown(); // executorService 회수
subscriber.onComplete(); // subscriber 에 onComplete 이벤트 emit
isCompleted.set(true); // isCompleted 를 true 로 세팅
}
break;
}
}
requestCnt.incrementAndGet();
});
}
@Override
public void cancel() {
// cancel 시에는 complete 수행
subscriber.onComplete();
}
}
NSizedMessagePublisher<Message>
간단한 7개 정도의 메시지를 새로 등록하는 Subscriber 에게 통지하는 간단한 예제 용도의 Publisher 입니다. Subscriber 가 subscribe(Subscriber) 메서드를 통해 자기 자신을 Publisher 에 등록하려 할 때 Publisher 는 Subscription 내에 메시지 데이터를 iterator 로 바인딩하고, subscriber 객체 역시 등록합니다.
그리고 이 작업이 끝나면 Subscriber 의 onSubscribe(Subscription) 메서드를 호출해서 Subscriber 에게도 Subscription 객체를 전달해줍니다.
package io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NSizedMessagePublisher implements Flow.Publisher<Message>{
@Override
public void subscribe(Subscriber<? super Message> subscriber) {
var messages = Collections.synchronizedList(
new ArrayList<>(
List.of(
"1.배고파요",
"2.편의점가요",
"3.먹을게 없네요",
"4.다이어트하자요",
"5.산책해요",
"6.잠자요",
"7.퇴근해요")
)
);
Iterator<String> iterator = messages.iterator();
var subscription = new NSizedMessageSubscription(subscriber, iterator);
subscriber.onSubscribe(subscription);
}
}
NSizedMessageSubscriber
Subscriber 는 Publisher 가 onSubscribe(Subscription) 을 통해 전달해준 subscription 객체를 자기 자신의 멤버 필드 subscription
에도 바인딩해줍니다. 그리고 이 subscription 객체를 이용해서 backpressure 제어 메서드인 request() 메서드를 호출해서 데이터를 계속해서 소비하며 onComplete() 메시지가 발생할 때 까지 계속해서 소비하게 됩니다.
package io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@RequiredArgsConstructor
public class NSizedMessageSubscriber<T> implements Flow.Subscriber<T>{
private final Integer requestSize;
private Flow.Subscription subscription;
private int firstRequestSize = 1;
private int nextCnt = 0;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(firstRequestSize);
}
@Override
public void onNext(T item) {
log.info("(onNext) item = {}", item);
// nextCnt++ 연산을 수행하고, requestSize 만큼 데이터가 한 차례 들어왔음을 판정
if(nextCnt++ % requestSize == 0){
// 지정한 requestSize 에 도달해서 onNext 를 수행
log.info(">>> onNext : request 를 publisher 의 subscription 에 전송합니다.");
this.subscription.request(requestSize);
}
}
@Override
public void onError(Throwable throwable) {
log.error("error : {}", throwable.getMessage());
}
@Override
public void onComplete() {
log.info("complete");
}
}
출력결과
12:31:01.664 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- (onNext) item = Message[message=1.배고파요, requestedCnt=1]
12:31:01.689 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- >>> onNext : request 를 publisher 의 subscription 에 전송합니다.
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- (onNext) item = Message[message=2.편의점가요, requestedCnt=2]
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- (onNext) item = Message[message=3.먹을게 없네요, requestedCnt=2]
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- (onNext) item = Message[message=4.다이어트하자요, requestedCnt=2]
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- (onNext) item = Message[message=5.산책해요, requestedCnt=2]
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- (onNext) item = Message[message=6.잠자요, requestedCnt=2]
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- >>> onNext : request 를 publisher 의 subscription 에 전송합니다.
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- (onNext) item = Message[message=7.퇴근해요, requestedCnt=3]
12:31:01.690 [pool-1-thread-1] INFO io.chagchagchag.example.foobar.reactive_streams.publisher_subscriber.n_sized_message.NSizedMessageSubscriber -- complete
Process finished with exit code 0