Reactor 프로그래밍 3 (Error 핸들링)

Reactor 프로그래밍 3 (Error 핸들링)

Error

onError 이벤트

Reactive Streams 에서는 Error 가 발생하면 Error 이벤트를 전달하며, onError 이벤트가 발생하면 더 이상 onNext, onComplete 이벤트가 생산되지 않고 종료됩니다.

Reactor 에서도 onError 이벤트가 발생하면 그 이후부터는 더 이상 데이터의 흐름(Stream, Pipe)을 처리하지 않고 onError 이벤트만 뒤로 계속해서 전파하게 됩니다.


onError 이벤트를 처리할 때에는 아래의 방식으로 처리할 수 있습니다.

  • 에러 발생시 해야할 작업을 따로 지정
  • 고정된 값을 반환
  • publisher 를 반환
  • onComplete 이벤트로 변환
  • 다른 에러로 변환

에러 핸들링이 없을 때는 어떻게 동작할까요?

아래와 같이 에러 핸들링이 없을 때는 아래와 같은 문구 처럼 onErrorDropped 를 호출합니다.

  • Operator called default onErrorDropped

예제

package io.chagchagchag.example.foobar.spring_webflux.error_handling;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class NoErrorHanding_Example {
  public static void main(String[] args) {
    Flux.create(fluxSink -> {
      try{
        Thread.sleep(1000);
      } catch (InterruptedException e){
        throw new RuntimeException(e);
      }
      fluxSink.error(new RuntimeException("에러가 발생했어요"));
    }).subscribe();
  }
}
 

출력결과를 보면, 가장 처음으로 Operator called default onErrorDropped 문구가 나타난 것을 확인 가능합니다.

출력결과

14:18:47.591 [main] ERROR reactor.core.publisher.Operators -- Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: 에러가 발생했어요
Caused by: java.lang.RuntimeException: 에러가 발생했어요
	at io.chagchagchag.example.foobar.spring_webflux.error_handling.NoErrorHanding_Example.lambda$main$0(NoErrorHanding_Example.java:15)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:97)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8777)
	at reactor.core.publisher.Flux.subscribeWith(Flux.java:8898)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8742)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8666)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8584)
	at io.chagchagchag.example.foobar.spring_webflux.error_handling.NoErrorHanding_Example.main(NoErrorHanding_Example.java:16)

Process finished with exit code 0

onErrorDropped

onErrorDropped 에서는 어떻게 로그가 출력되는 걸까요? onErrorDropped 내부에서 로그를 출력하도록 되어 있기 때문입니다.

public abstract class Operators {
    
    // ... 
    
	public static void onErrorDropped(Throwable e, Context context) {
		Consumer<? super Throwable> hook = context.getOrDefault(Hooks.KEY_ON_ERROR_DROPPED,null);
		if (hook == null) {
			hook = Hooks.onErrorDroppedHook;
		}
		if (hook == null) {
			log.error("Operator called default onErrorDropped", e);
			return;
		}
		hook.accept(e);
	}
}

에러 발생시 해야 할 작업을 따로 지정

onErrorConsumer 로 컨슈머 처리

아래 코드에서는 여러가지 subscribe() 함수 중 subscribe (consumer, errorConsumer) (opens in a new tab) 를 사용했습니다.

package io.chagchagchag.example.foobar.spring_webflux.error_handling;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class ErrorConsumer_Example {
 
  public static void main(String[] args) {
    Flux.error(new RuntimeException("error"))
        .subscribe(
            v -> {
              log.info("v ::: " + v);
            },
            error -> {
              log.info("error ::: " + error);
            }
        );
  }
}

출력결과

16:21:37.048 [main] INFO io...spring_webflux.error_handling.ErrorConsumer_Example -- error ::: java.lang.RuntimeException: error

Process finished with exit code 0

doOnError

Error 를 변환하지 않고 별도로 로깅을 한다거나 하는 작업만 수행하려 할 때 사용합니다. 파이프라인 흐름에 영향을 주지 않고 수행 가능합니다.

package io.chagchagchag.example.foobar.spring_webflux.error_handling;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class DoOnError_Example {
  public static void main(String[] args) {
    Flux.error(new RuntimeException("에러가 났어요"))
        .doOnError(error -> {
          log.info("doOnError ::: " + error);
        })
        .subscribe(
            v -> {
              log.info("v ::: " + v);
            },
            error -> {
              log.info("error ::: " + error);
            }
        );
  }
}

출력결과

16:20:58.209 [main] INFO io...spring_webflux.error_handling.DoOnError_Example -- doOnError ::: java.lang.RuntimeException: 에러가 났어요
16:20:58.211 [main] INFO io...spring_webflux.error_handling.DoOnError_Example -- error ::: java.lang.RuntimeException: 에러가 났어요

Process finished with exit code 0

고정된 값을 반환하도록 처리

onErrorReturn

onErrorReturn 함수를 사용하면 onError 발생시 onErrorReturn 에 지정한 값이 이후 부터 데이터 흐름을 통해서 전달됩니다.

e.g 1. 정상적으로 onErrorReturn 을 사용하는 경우

package io.chagchagchag.example.foobar.spring_webflux.error_handling;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class OnErrorReturn_Example1 {
  public static void main(String[] args) {
    Flux.error(new RuntimeException("error"))
        .onErrorReturn("에러가 발생했어요")
        .subscribe(v -> {
          log.info("v ::: " + v);
        });
  }
}

에러가 발생했을때 onErrorReturn(값) 에서 리턴값으로 지정한 값이 출력되는 것을 확인 가능합니다.

출력결과

15:00:08.926 [main] INFO io...spring_webflux.error_handling.OnErrorReturn_Example1 -- v ::: 에러가 발생했어요

Process finished with exit code 0

e.g 2. 함수 호출을 onErrorReturn 의 인자로 사용되는 경우 - error 가 아니어도 호출된다. 주의!

onErrorReturn 내에 값을 넣는 것이 아니라 함수 호출을 넣으면 함수가 무조건 실행되게 됩니다. error 가 아니어도 호출되기에 주의가 필요합니다.

package io.chagchagchag.example.foobar.spring_webflux.error_handling;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class OnErrorReturn_Example2 {
  public static void main(String[] args) {
    Flux.error(new RuntimeException("error"))
        .onErrorReturn(errorMessage())
        .subscribe(v -> {
          log.info("v ::: " + v);
        });
  }
  
  public static String errorMessage(){
    return "에러가 발생했어요";
  }
}

출력결과

15:04:30.788 [main] INFO io...spring_webflux.error_handling.OnErrorReturn_Example2 -- v ::: 에러가 발생했어요

Process finished with exit code 0

publisher 를 반환하도록 처리

onErrorResume

참고

onErrorResume 은 onError 이벤트에 대해서 Publisher 를 반환하는 Function 을 실행하는 함수입니다.


e.g 1.

package io.chagchagchag.example.foobar.spring_webflux.error_handling;
 
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
 
@Slf4j
public class OnErrorResume_Example1 {
  public static void main(String[] args) {
    Flux.error(new RuntimeException("error"))
        .onErrorResume(new Function<Throwable, Publisher<?>>() {
          @Override
          public Publisher<?> apply(Throwable throwable) {
            return Flux.just("배고파요", "밥먹어요", "배불러요");
          }
        })
        .subscribe(v -> {
          log.info("v ::: " + v);
        });
  }
}
 

출력결과

15:10:46.555 [main] INFO io...spring_webflux.error_handling.OnErrorResume_Example1 -- v ::: 배고파요
15:10:46.556 [main] INFO io...spring_webflux.error_handling.OnErrorResume_Example1 -- v ::: 밥먹어요
15:10:46.556 [main] INFO io...spring_webflux.error_handling.OnErrorResume_Example1 -- v ::: 배불러요

Process finished with exit code 0

e.g 2

onErrorReturn 과는 다르게 onErrorResume 은 에러가 발생하지 않으면, 에러에 대한 값을 subscribe 하지 않습니다.

package io.chagchagchag.example.foobar.spring_webflux.error_handling;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
@Slf4j
public class OnErrorResume_Example2 {
  public static void main(String[] args) {
    Flux.just("배고파요", "밥먹어요", "배불러요")
        .onErrorResume(e -> Mono.just(errorMessage()))
        .subscribe(v -> log.info("v ::: " + v));
  }
 
  public static String errorMessage(){
    return "에러가 발생했어요";
  }
}

출력결과

15:20:36.654 [main] INFO io...spring_webflux.error_handling.OnErrorResume_Example2 -- v ::: 배고파요
15:20:36.655 [main] INFO io...spring_webflux.error_handling.OnErrorResume_Example2 -- v ::: 밥먹어요
15:20:36.656 [main] INFO io...spring_webflux.error_handling.OnErrorResume_Example2 -- v ::: 배불러요

Process finished with exit code 0

onComplete 이벤트로 변환하도록 처리

onErrorComplete

onErrorComplete 메서드를 사용하면 onError 이벤트를 onComplete 이벤트로 변경합니다.

error 이벤트가 complete 이벤트로 변경되었기 때문에 errorConsumer 가 동작하지 않는 것을 출력결과에서 확인 가능합니다.

package io.chagchagchag.example.foobar.spring_webflux.error_handling;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class OnErrorComplete_Example1 {
  public static void main(String[] args) {
    Flux.create(fluxSink -> {
      fluxSink.next(1);
      fluxSink.next(2);
      fluxSink.error(new RuntimeException("에러가 발생했어요"));
    }).onErrorComplete()
        .subscribe(
            v -> {log.info("v ::: " + v);},
            e -> {log.info("e ::: " + e);},
            () -> {log.info("complete");}
        );
  }
}

출력결과

15:26:43.964 [main] INFO io...spring_webflux.error_handling.OnErrorComplete_Example1 -- v ::: 1
15:26:43.966 [main] INFO io...spring_webflux.error_handling.OnErrorComplete_Example1 -- v ::: 2
15:26:43.968 [main] INFO io...spring_webflux.error_handling.OnErrorComplete_Example1 -- complete

Process finished with exit code 0

다른 에러로 변환하도록 처리

onErrorResume + Flux.error

onErrorResume 과 Flux.error 를 활용하면 여러가지 Exception 을 비즈니스 요구사항에 맞도록 변환하는 것이 가능합니다.

package io.chagchagchag.example.foobar.spring_webflux.error_handling;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class ErrorToAnotherError_Example1 {
  public static void main(String[] args) {
    Flux.error(new IllegalArgumentException("잘못된 요청이에요"))
        .onErrorResume(
            e -> Flux.error(new ArgumentAmbiguousException("필수파라미터가 누락되었습니다.", e))
        )
        .subscribe(
            v -> {
              log.info("v ::: " + v);
            },
            err -> {
              log.info("error ::: " + err);
            }
        );
  }
 
  static class ArgumentAmbiguousException extends IllegalArgumentException{
 
    public ArgumentAmbiguousException(String message, Throwable cause) {
      super(message, cause);
    }
  }
}

출력결과

16:01:40.059 [main] INFO io...spring_webflux.error_handling.ErrorToAnotherError_Example1 -- error ::: io.chagchagchag.example.foobar.spring_webflux.error_handling.ErrorToAnotherError_Example1$ArgumentAmbiguousException: 필수파라미터가 누락되었습니다.

Process finished with exit code 0

onErrorMap

onError 이벤트 발생시 다른 에러로 변환할 수 있습니다. 저수준의 에러를 고수준의 에러, 비즈니스 요구사항에 맞는 에러로 변환 가능합니다. 변환만 해주기에 추가적인 에러 핸들링 관련 로직은 직접 작성해야 합니다.

package io.chagchagchag.example.foobar.spring_webflux.error_handling;
 
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
 
@Slf4j
public class ErrorToAnotherError_Example2 {
  public static void main(String[] args) {
    Flux.error(new IllegalArgumentException("잘못된 요청이에요"))
        .onErrorMap(e -> new ArgumentAmbiguousException("필수파라미터가 누락되었습니다.", e))
        .subscribe(
            v -> {
              log.info("v ::: " + v);
            },
            err -> {
              log.info("error ::: " + err);
            }
        );
  }
 
  static class ArgumentAmbiguousException extends IllegalArgumentException{
 
    public ArgumentAmbiguousException(String message, Throwable cause) {
      super(message, cause);
    }
  }
}

출력결과

16:12:57.817 [main] INFO io...spring_webflux.error_handling.ErrorToAnotherError_Example2 -- error ::: io.chagchagchag.example.foobar.spring_webflux.error_handling.ErrorToAnotherError_Example2$ArgumentAmbiguousException: 필수파라미터가 누락되었습니다.

Process finished with exit code 0