Reactor 프로그래밍 3 (Error 핸들링)
Error
onError 이벤트
Reactive Streams 에서는 Error 가 발생하면 Error 이벤트를 전달하며, onError 이벤트가 발생하면 더 이상 onNext, onComplete 이벤트가 생산되지 않고 종료됩니다.
Reactor 에서도 onError 이벤트가 발생하면 그 이후부터는 더 이상 데이터의 흐름(Stream, Pipe)을 처리하지 않고 onError 이벤트만 뒤로 계속해서 전파하게 됩니다.
onError 이벤트를 처리할 때에는 아래의 방식으로 처리할 수 있습니다.
- 에러 발생시 해야할 작업을 따로 지정
- 고정된 값을 반환
- publisher 를 반환
- onComplete 이벤트로 변환
- 다른 에러로 변환
에러 핸들링이 없을 때는 어떻게 동작할까요?
아래와 같이 에러 핸들링이 없을 때는 아래와 같은 문구 처럼 onErrorDropped 를 호출합니다.
Operator called default onErrorDropped
예제
package io.chagchagchag.example.foobar.spring_webflux.error_handling;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class NoErrorHanding_Example {
public static void main(String[] args) {
Flux.create(fluxSink -> {
try{
Thread.sleep(1000);
} catch (InterruptedException e){
throw new RuntimeException(e);
}
fluxSink.error(new RuntimeException("에러가 발생했어요"));
}).subscribe();
}
}
출력결과를 보면, 가장 처음으로 Operator called default onErrorDropped
문구가 나타난 것을 확인 가능합니다.
출력결과
14:18:47.591 [main] ERROR reactor.core.publisher.Operators -- Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: 에러가 발생했어요
Caused by: java.lang.RuntimeException: 에러가 발생했어요
at io.chagchagchag.example.foobar.spring_webflux.error_handling.NoErrorHanding_Example.lambda$main$0(NoErrorHanding_Example.java:15)
at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
at reactor.core.publisher.Flux.subscribe(Flux.java:8777)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:8898)
at reactor.core.publisher.Flux.subscribe(Flux.java:8742)
at reactor.core.publisher.Flux.subscribe(Flux.java:8666)
at reactor.core.publisher.Flux.subscribe(Flux.java:8584)
at io.chagchagchag.example.foobar.spring_webflux.error_handling.NoErrorHanding_Example.main(NoErrorHanding_Example.java:16)
Process finished with exit code 0
onErrorDropped
onErrorDropped 에서는 어떻게 로그가 출력되는 걸까요? onErrorDropped 내부에서 로그를 출력하도록 되어 있기 때문입니다.
public abstract class Operators {
// ...
public static void onErrorDropped(Throwable e, Context context) {
Consumer<? super Throwable> hook = context.getOrDefault(Hooks.KEY_ON_ERROR_DROPPED,null);
if (hook == null) {
hook = Hooks.onErrorDroppedHook;
}
if (hook == null) {
log.error("Operator called default onErrorDropped", e);
return;
}
hook.accept(e);
}
}
에러 발생시 해야 할 작업을 따로 지정
onErrorConsumer 로 컨슈머 처리
아래 코드에서는 여러가지 subscribe() 함수 중 subscribe (consumer, errorConsumer) (opens in a new tab) 를 사용했습니다.
package io.chagchagchag.example.foobar.spring_webflux.error_handling;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class ErrorConsumer_Example {
public static void main(String[] args) {
Flux.error(new RuntimeException("error"))
.subscribe(
v -> {
log.info("v ::: " + v);
},
error -> {
log.info("error ::: " + error);
}
);
}
}
출력결과
16:21:37.048 [main] INFO io...spring_webflux.error_handling.ErrorConsumer_Example -- error ::: java.lang.RuntimeException: error
Process finished with exit code 0
doOnError
Error 를 변환하지 않고 별도로 로깅을 한다거나 하는 작업만 수행하려 할 때 사용합니다. 파이프라인 흐름에 영향을 주지 않고 수행 가능합니다.
package io.chagchagchag.example.foobar.spring_webflux.error_handling;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class DoOnError_Example {
public static void main(String[] args) {
Flux.error(new RuntimeException("에러가 났어요"))
.doOnError(error -> {
log.info("doOnError ::: " + error);
})
.subscribe(
v -> {
log.info("v ::: " + v);
},
error -> {
log.info("error ::: " + error);
}
);
}
}
출력결과
16:20:58.209 [main] INFO io...spring_webflux.error_handling.DoOnError_Example -- doOnError ::: java.lang.RuntimeException: 에러가 났어요
16:20:58.211 [main] INFO io...spring_webflux.error_handling.DoOnError_Example -- error ::: java.lang.RuntimeException: 에러가 났어요
Process finished with exit code 0
고정된 값을 반환하도록 처리
onErrorReturn
onErrorReturn 함수를 사용하면 onError 발생시 onErrorReturn 에 지정한 값이 이후 부터 데이터 흐름을 통해서 전달됩니다.
e.g 1. 정상적으로 onErrorReturn 을 사용하는 경우
package io.chagchagchag.example.foobar.spring_webflux.error_handling;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class OnErrorReturn_Example1 {
public static void main(String[] args) {
Flux.error(new RuntimeException("error"))
.onErrorReturn("에러가 발생했어요")
.subscribe(v -> {
log.info("v ::: " + v);
});
}
}
에러가 발생했을때 onErrorReturn(값) 에서 리턴값으로 지정한 값이 출력되는 것을 확인 가능합니다.
출력결과
15:00:08.926 [main] INFO io...spring_webflux.error_handling.OnErrorReturn_Example1 -- v ::: 에러가 발생했어요
Process finished with exit code 0
e.g 2. 함수 호출을 onErrorReturn 의 인자로 사용되는 경우 - error 가 아니어도 호출된다. 주의!
onErrorReturn 내에 값을 넣는 것이 아니라 함수 호출을 넣으면 함수가 무조건 실행되게 됩니다. error 가 아니어도 호출되기에 주의가 필요합니다.
package io.chagchagchag.example.foobar.spring_webflux.error_handling;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class OnErrorReturn_Example2 {
public static void main(String[] args) {
Flux.error(new RuntimeException("error"))
.onErrorReturn(errorMessage())
.subscribe(v -> {
log.info("v ::: " + v);
});
}
public static String errorMessage(){
return "에러가 발생했어요";
}
}
출력결과
15:04:30.788 [main] INFO io...spring_webflux.error_handling.OnErrorReturn_Example2 -- v ::: 에러가 발생했어요
Process finished with exit code 0
publisher 를 반환하도록 처리
onErrorResume
참고
onErrorResume 은 onError 이벤트에 대해서 Publisher 를 반환하는 Function 을 실행하는 함수입니다.
e.g 1.
package io.chagchagchag.example.foobar.spring_webflux.error_handling;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@Slf4j
public class OnErrorResume_Example1 {
public static void main(String[] args) {
Flux.error(new RuntimeException("error"))
.onErrorResume(new Function<Throwable, Publisher<?>>() {
@Override
public Publisher<?> apply(Throwable throwable) {
return Flux.just("배고파요", "밥먹어요", "배불러요");
}
})
.subscribe(v -> {
log.info("v ::: " + v);
});
}
}
출력결과
15:10:46.555 [main] INFO io...spring_webflux.error_handling.OnErrorResume_Example1 -- v ::: 배고파요
15:10:46.556 [main] INFO io...spring_webflux.error_handling.OnErrorResume_Example1 -- v ::: 밥먹어요
15:10:46.556 [main] INFO io...spring_webflux.error_handling.OnErrorResume_Example1 -- v ::: 배불러요
Process finished with exit code 0
e.g 2
onErrorReturn 과는 다르게 onErrorResume 은 에러가 발생하지 않으면, 에러에 대한 값을 subscribe 하지 않습니다.
package io.chagchagchag.example.foobar.spring_webflux.error_handling;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class OnErrorResume_Example2 {
public static void main(String[] args) {
Flux.just("배고파요", "밥먹어요", "배불러요")
.onErrorResume(e -> Mono.just(errorMessage()))
.subscribe(v -> log.info("v ::: " + v));
}
public static String errorMessage(){
return "에러가 발생했어요";
}
}
출력결과
15:20:36.654 [main] INFO io...spring_webflux.error_handling.OnErrorResume_Example2 -- v ::: 배고파요
15:20:36.655 [main] INFO io...spring_webflux.error_handling.OnErrorResume_Example2 -- v ::: 밥먹어요
15:20:36.656 [main] INFO io...spring_webflux.error_handling.OnErrorResume_Example2 -- v ::: 배불러요
Process finished with exit code 0
onComplete 이벤트로 변환하도록 처리
onErrorComplete
onErrorComplete 메서드를 사용하면 onError 이벤트를 onComplete 이벤트로 변경합니다.
error 이벤트가 complete 이벤트로 변경되었기 때문에 errorConsumer 가 동작하지 않는 것을 출력결과에서 확인 가능합니다.
package io.chagchagchag.example.foobar.spring_webflux.error_handling;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class OnErrorComplete_Example1 {
public static void main(String[] args) {
Flux.create(fluxSink -> {
fluxSink.next(1);
fluxSink.next(2);
fluxSink.error(new RuntimeException("에러가 발생했어요"));
}).onErrorComplete()
.subscribe(
v -> {log.info("v ::: " + v);},
e -> {log.info("e ::: " + e);},
() -> {log.info("complete");}
);
}
}
출력결과
15:26:43.964 [main] INFO io...spring_webflux.error_handling.OnErrorComplete_Example1 -- v ::: 1
15:26:43.966 [main] INFO io...spring_webflux.error_handling.OnErrorComplete_Example1 -- v ::: 2
15:26:43.968 [main] INFO io...spring_webflux.error_handling.OnErrorComplete_Example1 -- complete
Process finished with exit code 0
다른 에러로 변환하도록 처리
onErrorResume + Flux.error
onErrorResume 과 Flux.error 를 활용하면 여러가지 Exception 을 비즈니스 요구사항에 맞도록 변환하는 것이 가능합니다.
package io.chagchagchag.example.foobar.spring_webflux.error_handling;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class ErrorToAnotherError_Example1 {
public static void main(String[] args) {
Flux.error(new IllegalArgumentException("잘못된 요청이에요"))
.onErrorResume(
e -> Flux.error(new ArgumentAmbiguousException("필수파라미터가 누락되었습니다.", e))
)
.subscribe(
v -> {
log.info("v ::: " + v);
},
err -> {
log.info("error ::: " + err);
}
);
}
static class ArgumentAmbiguousException extends IllegalArgumentException{
public ArgumentAmbiguousException(String message, Throwable cause) {
super(message, cause);
}
}
}
출력결과
16:01:40.059 [main] INFO io...spring_webflux.error_handling.ErrorToAnotherError_Example1 -- error ::: io.chagchagchag.example.foobar.spring_webflux.error_handling.ErrorToAnotherError_Example1$ArgumentAmbiguousException: 필수파라미터가 누락되었습니다.
Process finished with exit code 0
onErrorMap
onError 이벤트 발생시 다른 에러로 변환할 수 있습니다. 저수준의 에러를 고수준의 에러, 비즈니스 요구사항에 맞는 에러로 변환 가능합니다. 변환만 해주기에 추가적인 에러 핸들링 관련 로직은 직접 작성해야 합니다.
package io.chagchagchag.example.foobar.spring_webflux.error_handling;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class ErrorToAnotherError_Example2 {
public static void main(String[] args) {
Flux.error(new IllegalArgumentException("잘못된 요청이에요"))
.onErrorMap(e -> new ArgumentAmbiguousException("필수파라미터가 누락되었습니다.", e))
.subscribe(
v -> {
log.info("v ::: " + v);
},
err -> {
log.info("error ::: " + err);
}
);
}
static class ArgumentAmbiguousException extends IllegalArgumentException{
public ArgumentAmbiguousException(String message, Throwable cause) {
super(message, cause);
}
}
}
출력결과
16:12:57.817 [main] INFO io...spring_webflux.error_handling.ErrorToAnotherError_Example2 -- error ::: io.chagchagchag.example.foobar.spring_webflux.error_handling.ErrorToAnotherError_Example2$ArgumentAmbiguousException: 필수파라미터가 누락되었습니다.
Process finished with exit code 0