파란하늘의 지식창고
article thumbnail
Published 2020. 1. 2. 08:44
Reactor 레퍼런스 문서 공부 Study/Java
반응형

Reactor 레퍼런스 문서 공부.


3.2 Asynchronicity to the Rescue?

JVM에서 비동기 코드를 어떻게 개발할 수 있을까?

자바는 비동기 프로그래밍을 위한 두 가지 모델을  제공한다.

Callbacks

비동기 메서드는 반환 값을 가질 수 없지만 외부 callback 매개 변수 (람다나 익명 클래스)를 통해 결과를 호출받을 수 있다. 

(ex : Swing의 EventListener) 

Futures 

비동기 메서드는 Future를 즉시 리턴한다. 비동기 프로세스는 T value를 계산하지만 Future 개체는 이에 대한 access를 래핑 한다. 값을 즉시 사용할 수 없으며 사용할 수 있을 때까지 값을 polling 할 수 있다. 

(ex: Callable 작업을 실행하는 ExecutorService는 Future 개체를 사용한다.) 

하지만 여러 문제를 가지고 있다.

Callback Hell

Java 8에서 callback보다 조금 나은, 향상된 CompletableFuture를 제공하고 있지만 여전히 구성이 좋지 않다.

여러 Future object를 함께 조율하는 것은 가능하지만 복잡하다.

  • get() 메소드 호출로 인한 다른 blocking 상황으로 종료되기 쉬움
  • 지연 결합을 지원하지 않음
  • 복합 값에 대한 지원과 향상된 에러 핸들링 지원이 부족 

3.3 From Imperative to Reactive Programming

Reactor와 같은 reactive 라이브러리는 JVM의 classic 비동기 방식의 여러 단점을 해결하는 것을 목표로 하며 몇 가지 추가적인 측면을 중점으로 한다.

  • 결합성과 가독성
  • 풍부한 연산자를 사용하여 데이터를 flow로 처리
  • 구독(subscribe) 하기 전까진 아무 일도 발생하지 않음
  • 제공자에게 구독에 대한 상황을 전달하는 backpressure를 제공 (그래야 제공에 대한 부가 처리를 할 수 있으니까?)
  • 동시성에 구애받지 않는 높은 수준의 추상화

3.3.6 hot, cold 개념

subscribe(구독) 개념으로 인해 사용되는 개념

Cold 

Flux나 Mono를 subscribe 할 때마다 매번 독립적으로 새로 데이터를 생성해서 동작, subscribe 호출 전까지 아무런 동작도 하지 않고, subscribe를 호출하면 새로운 데이터를 생성. 

기본적으로 특별하게 hot을 취급하는 연산자가 아닌 이상 Flux나 Mono는 Cold로 동작함 

Hot  이미 데이터가 생성된 상테에서 구독을 하면 구독한 시점 이후에 발생하는 데이터부터 시퀀스를 받음. 

4.1 Flux, 0-N개 Items의 비동기 시퀀스

Reactor에서 제공하는 여러 메서드의 javadoc에는 대부분 아래와 같은 그림으로 어떤 역할을 하는지 설명하고 있다.

Flux에 대한 그림은 다음과 같다.

Flux

  • 첫 줄은 Flux의 timeline을 의미하고 왼쪽에서 오른쪽으로 시간이 흐른다.
  • 첫 줄의 동그라미 6개는 Flux에서 방출된 아이템이다.
  • 첫 줄의 마지막 수직 라인은 Flux가 성공적으로 종료되었음을 나타낸다.
  • 점선과 중간의 box는 Flux에 변환이 적용되고 있음을 나타내며 박스 안의 본문은 해당 변환이 어떤 것인지 나타낸다.
  • 마지막 줄은 변환된 결과 Flux이다.
  • 만약 변환 과정 중 어떤 이유로 인해 Flux가 중단된 경우 수직 라인 대신 X로 표시를 한다.

Flux<T>는 0에서 N개의 방출된 항목의 비동기 시퀀스를 나타낸다.

선택적으로 완료 신호 또는 오류로 종료가 되며 onNext, onComplete 및 onError 메서드를 제공하여 각각 상황에 대해 구현할 수 있게 한다.

쉽게 생각하면 item을 가진 collection과 유사하다.

4.2 Mono, 0-1개의 비동기 결과

Mono

Flux가 0~N개의 시퀀스를 가지는 Publisher라면 Mono는 0~1개의 시퀀스를 가진다.

이미 Flux에 대한 그림에서 대강 설명을 했기 때문에 Mono의 그림은 별다른 설명을 하지 않아도 알 수 있다.

Mono의 경우 다수의 item이 없기 때문에 onComplete와 onError 메서드를 제공한다. (onNext는 없음)

Flux에서 사용할 수 있는 연산자 중 일부만 제공하고 일부 연산자 (특히 Mono를 다른 Publisher와 결합한 연산자)는 Flux로 전환한다.

예를 들어 Mono#concatWith(Publisher)는 Flux를 반환하고 Mono#then(Mono)는 다른 Mono를 반환한다.

Mono를 사용하면 완료 개념만 있는 값이 없는 비동기 프로세스를 나타낼 수 있다. 하나를 만들려면 empty Mono<Void>를 사용할 수 있다.

쉽게 생각하면 Optional과 유사하고 null을 사용하지 않아야 하기 때문에 Void를 제공하여 empty 객체를 쓰게 한다.

4.3 Flux와 Mono를 만들고 Subscribe 하는 간단한 방법

String 시퀀스로 이루어진 Flux는 다음과 같은 여러 방식으로 만들 수 있다.

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

factory method를 사용한 예제는 다음과 같다.

// Reactor가 제공하는 empty Mono 객체
Mono<String> noData = Mono.empty(); 

Mono<String> data = Mono.just("foo");

// 5에서 3개를 순차적으로 생성
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); 

생성한 Publisher (Flux/Mono)는 subscribe method를 통해 구독이 된다.

// 3개의 값을 가진 Flux
Flux<Integer> ints = Flux.range(1, 3); 
// 해당 Flux를 Subscribe 함
ints.subscribe(); 

실제 실행하면 console에는 아무 응답이 표시되지 않는다.

만약 output을 확인하고 싶은 경우 subscribe에 consumer를 추가하면 된다.

Flux<Integer> ints = Flux.range(1, 3); 
ints.subscribe(i -> System.out.println(i)); 
1
2
3

위 예제처럼 Reactor는 여러 매개 변수의 subscribe method를 제공하며 Flux의 subscribe method는 대략 다음과 같다.

// Subscribe and trigger the sequence
subscribe(); 

// 제공된 comsumer를 수행하는 Subscribe
subscribe(Consumer<? super T> consumer); 

// + errorConsumer
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); 

// + completeConsumer
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); 

// + subscriptionConsumer
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); 

에러가 발생한 경우의 예제는 다음과 같다

Flux<Integer> ints = Flux.range(1, 4) 
      .map(i -> { 
        if (i <= 3) return i; 
        throw new RuntimeException("Got to 4"); 
      });
ints.subscribe(i -> System.out.println(i), 
      error -> System.err.println("Error: " + error));
1
2
3
Error: java.lang.RuntimeException: Got to 4

subscribe에 error handler와 종료 이벤트에 대한 handler를 다음처럼 넘겨줄 수 있다.

Flux<Integer> ints = Flux.range(1, 4); 
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done")); 

이 경우 앞의 예제와 달리 에러가 발생하는 경우가 없으므로 종료 이벤트 handler가 처리되는 것을 볼 수 있다.

1
2
3
4
Done

마지막으로 subscribe를 포함한 경우는 다음과 같다.

// 구독을 하면 수신되며 소스에서 최대 10개의 요소를 원함 
// (실제 4개의 요소만 가지고 있기 때문에 4개만 방출하고 완료됨)
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done"),
    sub -> sub.request(10)); 

위처럼 추가 구독을 하는 경우에 대한 subscription을 설정할 수 있고 이와 같이 추가 구독을 하는 경우를 위해 Reactor는 BaseSubscriber를 제공하여 구현할 수 있게 한다.

SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> {System.out.println("Done");},
    s -> s.request(10));
ints.subscribe(ss);
package io.projectreactor.samples;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;

public class SampleSubscriber<T> extends BaseSubscriber<T> {

	public void hookOnSubscribe(Subscription subscription) {
		System.out.println("Subscribed");
		request(1);
	}

	public void hookOnNext(T value) {
		System.out.println(value);
		request(1);
	}
}

위와 같이 subscribe 한 이후 sampleSubscriber로 구현된 추가 구독이 발생하면 다음과 같이 추가 구독에 대한 이벤트가 발생한다.

1
2
3
4
Done
Subscribed
1
2
3
4

4.3.4. On Backpressure and Ways to Reshape Requests

Reactor에서 backpressure 구현은 request를 upstream operator로 요청을 보내 consumer pressure를 source로 전파한다.

현재 request의 합은 때때로 현재 'demand' 또는 'pending request'로 참조된다. 수요가 Long.MAX_VALUE로 설정된 경우 무제한으로 요청을 처리한다.

가장 간단한 방법은 다음 예제와 같이 hookOnSubscribe method를 override 하여 BaseSubscriber를 등록하는 방법이다.

Flux.range(1, 10)
    .doOnRequest(r -> System.out.println("request of " + r))
    .subscribe(new BaseSubscriber<Integer>() {

      @Override
      public void hookOnSubscribe(Subscription subscription) {
        request(1);
      }

      @Override
      public void hookOnNext(Integer integer) {
        System.out.println("Cancelling after having received " + integer);
        cancel();
      }
});
request of 1
Cancelling after having received 1

backpressure 부분은 나중에 좀 더 봐야 할 듯하다.

4.4. Programmatically creating a sequence

4.4.1. Synchronous generate

Flux를 프로그래밍 방식으로 작성하는 가장 간단한 형태는 generator function을 사용한 generate method를 사용하는 것이다.

이는 동기로 1:1 방출을 하기 위해 사용되며 sink는 synchronousSink이며 next() method는 callback  호출당 최대 한 번만 호출할 수 있다.

이후 error(Throwable)이나 complete()를 추가로 호출할 수 있지만 이는 선택사항이다.

아래는 상태 기반으로 generate method를 사용한 예제이다.

Flux<String> flux = Flux.generate(() -> 0, (state, sink) -> {
	sink.next("3 x " + state + " = " + 3 * state);
	if (state == 10)
		sink.complete();
	return state + 1;
});

다음과 같은 sequence를 가지게 된다.

3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30

또한 변경 가능한 state를 사용할 수도 있다.

아래 예에서는 단일 AtomicLong을 state로 사용해 각 라운드에서 변경시킨다.

Flux<String> flux = Flux.generate(AtomicLong::new, (state, sink) -> {
	long i = state.getAndIncrement();
	sink.next("3 x " + i + " = " + 3 * i);
	if (i == 10)
		sink.complete();
	return state;
});

아래 예는 cunsumer를 포함한 generate method의 경우이다.

Flux<String> flux = Flux.generate(AtomicLong::new, (state, sink) -> {
	long i = state.getAndIncrement();
	sink.next("3 x " + i + " = " + 3 * i);
	if (i == 10)
		sink.complete();
	return state;
}, (state) -> System.out.println("state: " + state));

프로세스가 끝난 후 database 연결 종료 처리나 기타 자원이 포함된 상태인 경우 자원 종료 처리 등의 작업을 이와 같이 할 수 있다.

4.4.2. Asynchronous and Multi-threaded: create

create는 Flux의 보다 향상된 프로그래밍 방식의 생성 방식으로 여러 thread의 round 별 다중 방출 처리에 적합하다.

next, error 및 complete method와 함께 FluxSink를 노출한다.

generate와 달리 상태 기반 변형이 없다.

반면 콜백에서 다중 thread event를 trigger 할 수 있다.

create는 기존 API를 리스너 기반 비동기 API와 같이 reactive 세계에 브리지하는데 매우 유용하다.

리스너 기반 API를 사용한다고 가정하면 chunk로 데이터를 처리하고 다음과 같은 두가지 이벤트가 있다.

  1. data chunk가 준비되고
  2. MyEventListener Interface에 표현된 대로 처리가 완료
interface MyEventListener<T> {
    void onDataChunk(List<T> chunk);
    void processComplete();
}

create를 사용하여 이를 Flux<T>에 연결할 수 있다.

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( 
      new MyEventListener<String>() { 

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); 
          }
        }

        public void processComplete() {
            sink.complete(); 
        }
    });
});

또한 create는 비동기 API를 연결하고 OverflowStrategy를 사용하여 backpressure를 관리할 수 있다.

4.4.3. Asynchronous but single-threaded: push

push는 single producer의 이벤트 처리에 적합한 generate와 create의 중간 지점이다.

create가 지원하는 overflowstrategy를 사용하여 비동기 일 수도 있고 backpressure를 관리할 수 있다는 점에서 create와 유사하다.

그러나 한번에 하나의 생성 thread만 next, complete 또는 error를 호출할 수 있다.

Flux<String> bridge = Flux.push(sink -> {
    myEventProcessor.register(
      new SingleThreadEventListener<String>() { 

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); 
          }
        }

        public void processComplete() {
            sink.complete(); 
        }

        public void processError(Throwable e) {
            sink.error(e); 
        }
    });
});

(예제의 myEventProcessor, SingleThreadEventListener가 정의가 되어 있지 않아 동작을 확인하지 못함)

A hybrid push/pull model

create와 같은 대부분의 Reactor operator는 hybrid push/pull 모델을 사용한다.

대부분의 처리가 비동기적이지만 (push 접근 방식을 제안하지만) 요청에 대한 작은 pull 구성 요소가 있다.

consumer가 처음 요청할 때까지 아무것도 방출하지 않는다는 의미에서 source에서 data를 가져(pull)온다.

소스는 데이터가 사용가능할 때마다 request amount의 범위 내에서 consumer에게 데이터를 push 한다.

push()와 create()는 request amount을 관리하고 보류 중인 요청이 있을 때만 sink를 통해 data가 push되도록 onRequest consumer를 설정할 수 있다.

Flux<String> bridge = Flux.create(sink -> {
    myMessageProcessor.register(
      new MyMessageListener<String>() {

        public void onMessage(List<String> messages) {
          for(String s : messages) {
            sink.next(s); 
          }
        }
    });
    sink.onRequest(n -> {
        List<String> messages = myMessageProcessor.getHistory(n); 
        for(String s : message) {
           sink.next(s); 
        }
    });
});

(예제의 myEventProcessormyMessageProcessor, SingleThreadEventListenerMyMessageListener가 정의가 되어 있지 않아 동작을 확인하지 못함)

Cleaning up after push() or create()

onDispose와 onCancel 두 콜백은 취소 또는 종료 시 수행된다.

Flux가 complete, error 또는 cancel의 경우 onDispose를 사용하여 수행할 수 있다.

onCancel은 onDispose가 수행되기 전에 cancel과 관련된 모든 작업을 수행하는데 사용된다.

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel()) 
        .onDispose(() -> channel.close())  
    });

(예제의 channel이 정의되어 있지 않아 동작을 확인하지 못함)

4.4.4. Handle

handle method는 약간 다르다.

instance method이고 일반적인 연산자와 마찬가지로 기존 소스에 연결되어 있다.

Mono와 Flux에 모두 존재한다.

SynchronousSink를 사용하고 1:1 방출만 허용한다는 점에서 generate에 가깝다.

그러나 handle을 사용하여 각 소스 요소에서 임의의 값을 생성하여 일부 요소를 건너 뛸 수 있다.

이런 식으로 map과 filter의 조합으로 사용할 수 있다.

Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);

reactive stream은 시퀀스에 null 값을 허용하지 않는다.

map을 수행 시 기존 함수를 맵 함수로 사용하려고 하면 해당 메소드가 때때로 null을 반환하게 되는 경우 다음처럼 안전하게 적용할 수 있다

public String alphabet(int letterNumber) {
	if (letterNumber < 1 || letterNumber > 26) {
		return null;
	}
	int letterIndexAscii = 'A' + letterNumber - 1;
	return "" + (char) letterIndexAscii;
}

handle을 사용하여 null을 제거할 수 있다.

Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
    .handle((i, sink) -> {
        String letter = alphabet(i); 
        if (letter != null) 
            sink.next(letter); 
    });

alphabet.subscribe(System.out::println);

결과는 아래와 같다.

M
I
T

4.5. Threading and Schedulers

Flux 또는 Mono를 획득한다고 반드시 전용 Thread에서 실행되는 것은 아니다.

대신 대부분의 operator는 이전 operator가 실행된 Thread에서 계속 작업한다.

지정하지 않으면 최상위 operator (the source) 자체는 subscribe() 호출이 수행된 Thread에서 실행된다.

다음 예제는 새 Thread에서 Mono를 실행한다.

public static void main(String[] args) throws InterruptedException {
  final Mono<String> mono = Mono.just("hello "); 

  Thread t = new Thread(() -> mono
      .map(msg -> msg + "thread ")
      .subscribe(v -> 
          System.out.println(v + Thread.currentThread().getName()) 
      )
  )
  t.start();
  t.join();

}

결과는 아래와 같다.

hello thread Thread-0

Reactor에서 실행 모델과 실행 위치는 사용된 Schueduler에 의해 결정된다.

Scheduler는 ExecutorService와 유사한 스케줄링 책임을 가지고 있지만 전용 추상화를 통해 더 많은 작업을 수행할 수 있고 특히 시계와 관련한 더 광범위한 구현(테스트를 위한 가상 시간, 트램폴린 또는 즉각적인 스케줄링 등)을 가능하게 한다.

Schedulers class는 다음 execution context에 엑세스할 수 있는 static method를 가지고 있다.

  • 현재 Thread (Schedulers.immediate())
  • 재사용 가능한 단일 Thread (Schedulers.single())
    스케줄러가 종료될 때까지 모든 호출에 대해 동일한 Thread를 재사용함
    호출 별 전용 Thread를 원하는 경우 각 호출에 대해 Schedules.newSingle()을 사용해야 한다.
  • 무제한 탄성 Thread pool (Schedulers.elastic())
    이 경우 backpressure 문제를 숨기고 너무 많은 Thread를 유발하는 경향이 있어 Schedules.boundedElastic() 도입 이 후 더이상 선호되지 않는다.
  • 제한 탄성 Thread pool (Schedulers.boundedElastic())
    이전의 elastic()과 마찬가지로 필요에 따라 새 worker pool을 만들고 유휴 상태인 풀을 재사용한다.
    너무 오래 유휴 상태인 worker pool (기본 값은 60초)은 폐기된다.
    elastic() 과 달리 만들 수 있는 지원 Thread 수에 제한이 있다. (기본 값은 CPU 코어 수 X 10)
    한도에 도달한 후 제출된 최대 10만 개의 작업이 대기열에 추가되고 Thread를 사용할 수 있게 되면 다시 스케쥴이 조정된다. (지연으로 예약하면 Thread가 사용가능해지면 지연이 시작됨)
    이것은 I/O blocking work에 더 나은 선택이다.
    Schedulers.boundedElastic()은 다른 리소스를 묶지 않도록 차단 프로세스에 자체 Thread를 제공하는 편리한 방법이다.
  • 병렬 작업에 맞게 조정된 고정 worker pool (Schedulers.parallel())

또한 Schedulers.fromExecutorService(ExecutorService)를 사용하여 기존의 모든 ExecutorService에서 Scheduler를 작성할 수 있다. (권장하지는 않지만 Executor로 부터 하나를 만들 수도 있다.)

newXXX method를 사용하여 다양한 Scheduler 유형의 새 instance를 작성할 수도 있다.

예를 들어 Schedulers.newParallel(yourSchedueName)은 yourScheduleName이라는 새 병렬 스케줄러를 만든다.

boundedElastic 은 피할 수 없는 레거시 blocking code를 돕기 위해 만들어졌지만 single 및 parallel은 불가능하다.

따라서 Reactor blocking API (block(), blockFirst(), blockLast() 및 toIterable() 또는 toStream())을 반복하여 사용할 경우 FormalStateException이 발생하게 된다.

custom scheduler는 non blocking marker interface를 구현하는 Thread instance를 생성하여 "Non Blocking only"로 표시할 수도 있다.

일부 operator는 기본적으로 스케줄러의 특정 스케줄러를 사용하여 일반적으로 다른 스케줄러를 제공하는 옵션을 제공한다.

예를 들어 Flux.interval(Duration.ofMillis(300)) factory method를 호출하면 300ms마다 틱을 하는 Flulx<Long>이 생성된다.

기본적으로 이것은 Schedulers.parallel()에 의해 활성화된다.

다음 행은 스케줄러를 Schedulers.single()과 유사한 새 instance로 변경한다.

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

Reactor는 reactive chain에서 execution context (또는 scheduler)를 전환하는 두 가지 방법 인 publishOn 및 subscribeOn을 제공한다.

둘 다 Scheduler를 사용하여 execution context를 해당 scheduler로 전환할 수 있다.

그러나 chain에서 publishOn의 배치는 중요하지만 subscribeOn의 배치는 중요하지 않다. (구독 할 때까지 아무런 변화가 없기 때문)

Reactor에서 operator를 연결할 때 필요한 만큼 Flux 및 Mono 구현을 서로 wrapping 할 수 있다.

subscribe하면 subscriber 객체 체인이 첫 번째 publisher의 뒤로 만들어진다.

4.5.1. The publishOn Method

publishOn은 다른 operator 와 동일하게 subscriber 체인 중간에 적용할 수 있다.

연결된 Scheduler에서 worker에 callback 이 실행되는 동안 upstream으로 부터 신호를 받아 downstream으로 전달한다.

결과적으로 다음 operator가 후속 operator가 실행되는 위치에 영향을 준다.

  • execution context를 Scheduler가 선택한 하나의 Thread로 변경한다.
  • 사양에 따라 onNext 호출은 순차적으로 발생하므로 단일 Thread를 사용한다.
  • 특정 Scheduler에서 작업하지 않는 한 publishOn 후 operator는 동일한 Thread에서 실행된다.
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); 

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)  
    .publishOn(s)  
    .map(i -> "value " + i);  

new Thread(() -> flux.subscribe(System.out::println));  

4.5.2. The subscribeOn Method

subscribeOn은 backward chain이 구성 될 때 subscription process에 적용된다.

결과적으로 chain의 subscribeOn 위치에 관계없이 source 방출의 context에 항상 영향을 미친다.

publishOn에 대한 후속 호출의 동작에는 영향을 미치지 않는다.

  • 전체 operator chain이 구독하는 thread를 변경한다.
  • Scheduler에서 Thread 하나를 선택한다.
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); 

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)  
    .subscribeOn(s)  
    .map(i -> "value " + i);  

new Thread(() -> flux.subscribe(System.out::println));  

4.6. Handling Errors

Reactive Stream에서 error는 terminal event이다.

오류가 발생하면 즉시 시퀀스를 중지하고 operator chain을 마지막 단계, 정의한 Subscriber 및 onError method로 전파한다.

이러한 오류는 여전히 application level에서 처리해야한다.

예를 들어 UI에 오류 알림을 표시하거나 rest endpoint에 의미있는 오류 payload를 보낼 수 있다.

따라서 subscriber의 onError method는 항상 정의해야 한다.

정의되지 않은 경우 onError는 UnsupportedOperationException을 발생시킨다.

Exceptions.isErrorCallbackNotImplemented 메소드를 사용하여 이를 추가로 감지하고 심사할 수 있다.

Reactor는 또한 error-handling operator로 chain 중간의 오류를 처리하는 대체 수단을 제공한다.

Flux.just(1, 2, 0)
    .map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
    .onErrorReturn("Divided by zero :("); // error handling example

4.6.1. Error Handling Operators

try-catch block에서 예외를 처리하는 여러 가지 방법에 익숙할 수 있다. 

  • catch 한 후 static default vale를 반환
  • catch 한 후 fallback method로 alternative path를 실행
  • catch 한 후 fallback value를 동적으로 계산
  • catch 한 후 BusinessException을 다시 감싸서 re-throw
  • catch 한 후 오류 별 메세지를 처리 후 re-throw
  • finally block 을 사용하여 자원을 정리하거나 Java 7 "try-with-resource" 구성을 사용

이 모든 것은 Reactor에서 error-handling operator의 형태로 동등하다.

이러한 operator를 살펴보기 전에 먼저 reactive chain과 try-catch block 간에 병렬을 설정하려고 한다.

subscribe 할 때 chain 끝의 onError callback은 catch 블록과 유사하다.

다음 예제와 같이 예외가 발생하면 실행이 catch로 건너뛴다.

Flux<String> s = Flux.range(1, 10)
    .map(v -> doSomethingDangerous(v)) 
    .map(v -> doSecondTransform(v)); 
s.subscribe(value -> System.out.println("RECEIVED " + value), 
            error -> System.err.println("CAUGHT " + error) 
);

앞의 예제는 개념적으로 다음 try-catch 블록과 유사하다.

try {
    for (int i = 1; i < 11; i++) {
        String v1 = doSomethingDangerous(i); 
        String v2 = doSecondTransform(v1); 
        System.out.println("RECEIVED " + v2);
    }
} catch (Throwable t) {
    System.err.println("CAUGHT " + t); 
}

Static Fallback Value

"catch 한 후 static default value를 반환"하는 것과 동등한 것은 onErrorReturn이다.

다음 예제와 같다.

try {
  return doSomethingDangerous(10);
}
catch (Throwable error) {
  return "RECOVERED";
}

앞의 예제는 Reactor에서 다음과 같다.

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn("RECOVERED");

또한 다음 예제처럼 복구 여부를 결정하기 위해 예외에 대해 Predicate를 적용할 수도 있다.

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10"); 

Fallback Method

단일 default value 이상을 원하고 데이터를 처리하는 다른 (유의적인) 방법이 있는 경우 onErrorResume을 사용할 수 있다.

이는 "대체 방법으로 대체 경로를 잡아서 실행하는" 것과 같다.

예를 들어, 명목상 프로세스가 신뢰할 수 없는 외부 서비스에서 데이터를 가져오는 중이지만 좀 더 오래되었지만 신뢰할 수 있는 동일한 데이터의 로컬 캐시를 유지하는 경우 다음과 예제와 같다.

String v1;
try {
  v1 = callExternalService("key1");
}
catch (Throwable error) {
  v1 = getFromCache("key1");
}

String v2;
try {
  v2 = callExternalService("key2");
}
catch (Throwable error) {
  v2 = getFromCache("key2");
}

앞의 예제는 Reactor에서 다음과 같다.

Flux.just("key1", "key2")
    .flatMap(k -> callExternalService(k) 
        .onErrorResume(e -> getFromCache(k)) 
    );

onErrorReturn과 마찬가지로 onErrorResome에는 예외 클래스 또는 Predicate를 기반으로 fallback 할 exception을 필터링 할 수 있다.

또한 발생한 오류에 따라 전환할 다른 fallback 시퀀스를 선택할 수도 있다.

Flux.just("timeout1", "unknown", "key2")
    .flatMap(k -> callExternalService(k)
        .onErrorResume(error -> { 
            if (error instanceof TimeoutException) 
                return getFromCache(k);
            else if (error instanceof UnknownKeyException)  
                return registerNewEntry(k, "DEFAULT");
            else
                return Flux.error(error); 
        })
    );

Dynamic Fallback Value

데이터를 처리하는 다른 방법이 없는 경우에도 전달받은 예외에서 대체 값을 계산할 수 있다.

이는 "대체 값을 catch하고 동적으로 계산"하는 것과 같다.

예를 들어 return type (MyWrapper)에 예외를 가지기 위한 전용 변형(Future.complete(T success) 대 Future.completeExceptionally(Throwable error))이 있는 경우  오류 보유 변형을 인스턴스화하고 예외를 전달할 수 있다.

try {
  Value v = erroringMethod();
  return MyWrapper.fromValue(v);
}
catch (Throwable error) {
  return MyWrapper.fromError(error);
}

다음처럼 onErrorResome을 사용하여 fallback method solution과 동일한 방식으로 이 작업을 reactive하게 처리할 수 있다.

erroringFlux.onErrorResume(error -> Mono.just( 
        MyWrapper.fromError(error) 
));

Catch and Rethrow

try {
  return callExternalService(k);
}
catch (Throwable error) {
  throw new BusinessException("oops, SLA exceeded", error);
}

아래처럼 사용한다.

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorResume(original -> Flux.error(
            new BusinessException("oops, SLA exceeded", original))
    );

onErrorMap을 사용할 수도 있다.

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));

Log or React on the Side

오류가 계속 전파되길 원하지만 시퀀스를 수정하지 않고(ex: 로그처리) 오류에 계속 반응하려면 doOnError operator를 사용할 수 있다.

이 것은 "catch 한 후 오류 별 메세지를 처리 후 re-throw" 패턴과 동일하다.

try {
  return callExternalService(k);
}
catch (RuntimeException error) {
  //make a record of the error
  log("uh oh, falling back, service failed for key " + k);
  throw error;
}

doOnError operator와 doOn 접두사가 붙은 모든 연산자는 "side-effect"라고 부를ㄴ다.

시퀀스 이벤트를 수정하지 않고 동작(peek)할 수 있다.

LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
    .flatMap(k -> callExternalService(k) 
        .doOnError(e -> {
            failureStat.increment();
            log("uh oh, falling back, service failed for key " + k); 
        })
        
    );

Using Resources and the Finally Block

명령형 프로그래밍을 사용하는 마지막 방법은 "finally block 을 사용하여 자원을 정리하거나 Java 7 "try-with-resource" 구성을 사용"하는 방법이다.

Stats stats = new Stats();
stats.startTimer();
try {
  doSomethingDangerous();
}
finally {
  stats.stopTimerAndRecordTiming();
}
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
  return disposableInstance.toString();
}

doFinally와 using을 사용한다.

doFinally는 시퀀스가 종료되거나 (onComplete 또는 onError) 취소 될 때마다 실행된다.

Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();

Flux<String> flux =
Flux.just("foo", "bar")
    .doOnSubscribe(s -> stats.startTimer())
    .doFinally(type -> { 
        stats.stopTimerAndRecordTiming();
        if (type == SignalType.CANCEL) 
          statsCancel.increment();
    })
    .take(1); 

다음 예에서는 "try-with-resource"의 AutoCloseable interface를 Disposable로 바꾼다.

AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
    @Override
    public void dispose() {
        isDisposed.set(true); 
    }

    @Override
    public String toString() {
        return "DISPOSABLE";
    }
};

"try-with-resource"에 상응하는 reactive 구문은 다음과 같다.

Flux<String> flux =
Flux.using(
        () -> disposableInstance, 
        disposable -> Flux.just(disposable.toString()), 
        Disposable::dispose 
);

Demonstrating the Terminal Aspect of onError

오류가 발생할 때 이러한 모든 operator가 upstream original 시퀀스를 종료시킨다는 것을 보여주기 위해 Flux.interval과 함게 보다 시각적인 예를 사용할 수 있다.

interval operator는 증가하는 long 값으로 x 단위마다 틱을 표시한다.

Flux<String> flux =
Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .onErrorReturn("Uh oh");

flux.subscribe(System.out::println);
Thread.sleep(2100); 

결과는 다음과 같다.

tick 0
tick 1
tick 2
Uh oh

Retrying

retry는 오류 생성 순서를 재 시도 할 수 있게 한다.

명심해야 할 것은 upstream flux에 다시 re-subscribe 하여 동작한다는 것이다.

이것은 실제로 다른 순서이고 원래 순서는 여전히 종료된다.

이를 확인하기 위해 이전 예제를 다시 사용하고 onErrorReturn을 사용하는 대신 retry(1)을 추가하여 한번 재시도 할 수 있다.

Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .retry(1)
    .elapsed() 
    .subscribe(System.out::println, System.err::println); 

Thread.sleep(2100); 

결과는 다음과 같다.

259,tick 0
249,tick 1
251,tick 2
506,tick 0 
248,tick 1
253,tick 2
java.lang.RuntimeException: boom

0 부터 다시 새 interval이 시작된다.

앞의 예제에서 볼 수 있듯이 retry(1)은 원래 간격으로 한 번만 다시 subscribe하여 0에서 틱을 다시 시작한다.

두 번째로 예외가 계속 발생하므로 오류가 downstream으로 전달되고 전파된다.

"companion" Flux를 사용하여 특정 장애가 재 시도되어야 하는지 여부를 알려주는 advanced version(retryWhen)이 있다.

이 companion Flux는 operator가 작성하지만 재시도 조건을 사용자 정의하기 위해 사용자가 설정해야 한다.

companion Flux는 retryWhen의 Function에 전달하는 유일한 매개 변수인 Flux<Throwable>이 있다.

사용자는 해당 Function을 정의하고 새 Publisher<?>를 반환하게 한다.

재시도 주기는 다음과 같다.

  1. 오류가 발생할 때마다 (재시도 가능) 오류는 함수에 의해 선언된 companion Flux로 방출된다.
    지금까지의 모든 시도를 확인할 수 있다.
  2. companion Flux가 값을 방출하면 재시도가 발생한다.
  3. companion Flux가 완료되면 오류를 삼키고 재시도 주기가 중지되고 그 결과 시퀀스도 완료된다.
  4. companion flux에서 오류가 발생하면 재시도 주기가 중지되고 e로 시퀀스 오류가 발생한다.
Flux<String> flux = Flux
    .<String>error(new IllegalArgumentException()) 
    .doOnError(System.out::println) 
    .retryWhen(companion -> companion.take(3)); 

위 예제는 무조건 오류가 발생하는 flux에 대해 3번더 재시도 한 후 종료를 한다.

retry(3)의 경우 오류로 종결되지만 retryWhen의 경우 empty Flux를 생성하고 success로 완료된다.

만약 retry와 동일하게 처리를 하려면 다음과 같이 추가 처리가 필요하다.

Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
    .retryWhen(companion -> companion
    .zipWith(Flux.range(1, 4), 
          (error, index) -> { 
            if (index < 4) return index; 
            else throw Exceptions.propagate(error); 
          })
    );

4.6.2. Handling Exceptions in Operators or Functions

일반적으로 모든 operator는 예외를 trigger할 가능성이 있는 코드를 포함하거나 유사하게 실패 할 수 있는 사용자 정의 callback을 호출할 수 있으므로 모든 형식의 오류 처리가 포함된다.

일반적으로 검사되지 않은 예외는 항상 onError를 통해 전파된다.

예를 들어, map function 내에 RuntimeException을 발생 시키면 다음 코드와 같이 onError 이벤트로 변환된다.

Flux.just("foo")
    .map(s -> { throw new IllegalArgumentException(s); })
    .subscribe(v -> System.out.println("GOT VALUE"),
               e -> System.out.println("ERROR: " + e));

결과는 다음과 같다.

ERROR: java.lang.IllegalArgumentException: foo

Reactor는 항상 치명적인 것으로 간주되는 예외 세트(ex: OutofMemoryError)를 정의한다.

Exceptions.throwIfFatal method를 참고

이러한 오류는 Reactor가 계속 작동할 수 없으며 전파되지 않고 throw 처리를 한다.

checked Exception은 어떻게 처리를 해야할까?

예를 들어 예뢰를 발생시키는 것으로 선언된 일부 method를 호출해야 하는 경우 try-catch block에서 해당 예외를 처리해야 한다.

  1. exception을 catch하고 복구하는 경우.
    시퀀스는 정상 진행한다.
  2. exception을 catch하고 unchecked exception으로 wrapping하는 경우
    시퀀스는 중단된다.
    Exceptions utility class가 도움이 될 수 있다. (다음 단계로 진행)
  3. Flux를 반환해야 하는 경우 (ex: flatMap) 다음과 같이 오류를 발생시키는 Flux에서 예외를 wrapping 한다.
    return Flux.error(checkedException)
    (시퀀스도 종료됨)

Reactor에는 예외가 검사된 경우에만 wrapping되도록 하는데 사용할 수 있는 Exceptions utility class가 있다.

  • 필요한 경우 Exceptions.propagate method를 사용하여 예외를 wrapping한다.
    또한 throwIfFatal을 먼저 호출하고 RuntimeException을 wrapping하지 않는다.
  • wrapping되지 않은 원래 예외를 가져오려면 Exceptions.unwrap method를 사용한다.

IOException을 발생시킬 수 있는 변환 method를 사용하는 map의 경우를 보자

public String convert(int i) throws IOException {
    if (i > 3) {
        throw new IOException("boom " + i);
    }
    return "OK " + i;
}

이제 map에서 해당 method를 사용한다고 가정하자.

예외를 명시적으로 잡아야 하고 map 함수가 다시 re-throw 할수 없다.

따라서 다음과 같이 RuntimeException으로 map의 onError method에 전파할 수 있다.

Flux<String> converted = Flux
    .range(1, 10)
    .map(i -> {
        try { return convert(i); }
        catch (IOException e) { throw Exceptions.propagate(e); }
    });

나중에 이전 Flux를 subscribe하고 error (ex: UI)에 반응할 때 IOException에 대해 특별한 작업을 수행하려는 경우 원래 예외로 되돌릴 수 있다.

converted.subscribe(
    v -> System.out.println("RECEIVED: " + v),
    e -> {
        if (Exceptions.unwrap(e) instanceof IOException) {
            System.out.println("Something bad happened with I/O");
        } else {
            System.out.println("Something bad happened");
        }
    }
);

4.7. Processors

Processor는 Publisher이기도 한 특별한 종류의 Subscriber이다.

즉 Processor를 subscribe 할 수 있지만 (일반적으로 Flulx를 구현함) 데이터를 시퀀스에 수동으로 주입하거나 종료하는 method를 호출할 수도 있다.

4.7.1. Do I Need a Processor?

대부분의 경우 Processor 사용을 피해야 한다.

올바르게 사용하기 어렵고 일부 corner case가 발생하기 쉽다

Processor가 사용 사례에 적합하다고 생각되면 다음 두가지 대안을 시도해봐야한다.

  1. operator 또는 operator 조합이 적합한지 (필요한 operator가 있는지)
  2. "generator" operator 가 대신 동작하는지
    (일반적으로 이러한 operator는 반응이 없는 API를 브리지하여 processor와 개념적으로 유사한 sink르 ㄹ제공하므로 시퀀스를 데이터로 수동으로 채우거나 종료할 수 있음)

위의 대안을 고려한 후에도 여전히 Processor가 필요하다고 생각되면 사용 가능한 Processor 에 대해 알아보면 된다.

4.7.2. Safely Produce from Multiple Threads by Using the Sink Facade

Reator Processor를 직접 사용하는 대신 sink()를 호출하여 Processor에 대한 sink를 얻는 것이 좋다.

FluxProcessor sink는 multi thread producer를 안전하게 게이트하고 여러 thread에서 동시에 데이터를 생성하는 application에서 사용할 수 있다.

예를 들어 다음처럼 UnicateProcessor에 대한 thread safe serialized sink를 만들 수 있다.

UnicastProcessor<Integer> processor = UnicastProcessor.create();
FluxSink<Integer> sink = processor.sink(overflowStrategy);

여러 producer thread가 다음을 수행하여 serialized sink에서 동시에 데이터를 생성할 수 있다.

sink.next(n);

FluxSink는 Processor의 multi thread manual 공급에 적합하지만 subscriber와 sink 접근 방식을 같이 사용할 수 없다.

FluxProcessor를 source Publisher에 subscribe하거나 FluxSink를 통해 수동으로 공급해야한다.

overflow는 processor와 구성에 따라 두 가지 가능한 방식으로 동작한다.

  • unbounded processor는 삭제 또는 버퍼링을 통해 overflow 자체를 처리한다.
  • bound processor는 IGNORE strategy를 차단하거나 "spin"하거나 sink에 지정된 overflowStragegy 동작을 적용한다.
반응형
profile

파란하늘의 지식창고

@Bluesky_

내용이 유익했다면 광고 배너를 클릭 해주세요