파란하늘의 지식창고
반응형

reactor reference 문서의 Appendix A: Which operator do I need? 의 내용 요약

워낙 많은 operator를 제공하기 때문에 적절하게 사용하기 위해 상황별 목록을 요약해서 제공하고 있다.

시퀀스 생성 시 (Creating a New Sequence)

  • just
    • Optional<T> : Mono#JustOrEmpty(Optional<T>)
    • null도 존재할 경우 T: Mono#justOrEmpty(T)
  • 만약 늦은 처리가 필요한 경우 Mono#fromSupplier 또는 just 내 defer를 사용
  • iterate 관련
    • array : Flux#fromArray
    • collection 또는 iterable : Flux#fromIterable
    • 정수 범위 : Flux#range
    • 각 Subscription에 제공된 Stream : Flux#fromStream(Supplier<Stream>)
  • 다양한 단일 값 source
    • Supplier<T>: Mono#fromSupplier
    • task : Mono#fromCallable, Mono#fromRunnable
    • CompletableFuture<T>: Mono#fromFuture
  • 빈 값 생성 : empty
  • 에러 생성 : error
    • 늦은 처리가 필요한 경우 Throwable: error(Supplier<Throwable>)
  • 아무 것도 안하는 경우 : never
  • subscription 시 결정되는 경우 : defer
  • 상태에 따라 프로그래밍 방식으로 이벤트를 생성하는 경우
    • 동기적으로 하나씩 : Flux#generate
    • 비동기 (또는 동기)적으로 한번에 여러개 생성 : Flux#create (Mono#create)

기존 시퀀스를 변경하는 경우 (Transforming an Existing Sequence)

  • 기존 데이터를 변형하는 경우
    • 1 to 1 : map
      • 형 변형 : cast
      • 각 소스 값의 색인을 구체화 : Flux#index (Flux<Tuple2<Long, T>>로 변환되어 반환됨)
    • 1 to N : flatMap
    • 1 to N (각 source element 또는 state에 대한 프로그래밍 방식) : handle
    • 각 source item 에 대해 비동기 작업을 수행하려는 경우 (예를 들어 urls to http request) : flatMap + an async Publisher-returning method
      • 일부 데이터를 무시 : 해당 조건에 대해 flatMap lambda에서 Mono.empty()를 반환
      • 원래 sequence 순서를 유지 : Flux#flatMapSequential
      • Mono source에서 async task가 여러 값을 반환하려는 경우 : Mono#flatMapMany
  • 기존 sequence에 사전 설정된 요소를 추가하고 싶은 경우
    • 시작 부분에 : Flux#startWith(T...)
    • 끝 부분에 : Flux#concatWith(T...)
  • Flux를 합하고 싶은 경우
    • list로 : Flux#collectList, Flux#collectSortedList (Mono<List>로 변환되어 반환됨)
    • map으로 : Flux#collectMap, Flux#collectMultiMap (Mono<Map<K, T>>, Mono<Map<K, Collection>>로 변환되어 반환됨)
    • 임의 container : collect
    • sequence 수 : count
    • 각 element 사이에  function 적용 (예: 합계) : reduce
      • 중간에 값을 방출 : scan
    • 조건절로 boolean 값 호출
      • 모든 value에 대해 (AND) : all
      • 아무 value에 대해 (OR) : any
      • 아무 값이라도 있는지 : hasElements
      • 해당 값이 있는지 : hasElement
  • publisher 결합
    • 순서대로 결합 : Flux#concat 또는 .concatWith(other)
      • 남은 publisher가 방출될 때까지 오류를 지연시키는 경우 : Flux#concatDelayError
      • 다음 publisher를 subscribe : Flux#mergeSequential
    • 방출된 순서대로 결합 (기존 순서가 아님)  : Flux#merge / .mergeWith(other)
      • 다른 타입으로 변환 : Flux#zip / Flux#zipWith
    • 쌍으로 된 값
      • 2개의 Mono를 Tuple2로 변환 : Mono#zipWith
      • N개의 Mono를 모두 완료된 시점에 변환 : Mono#zip
    • 완료로 처리하는 경우
      • 1개의 Mono와 아무 source를 Mono<Void>로 : Mono#and
      • N개의 source를 모두 완료된 시점에 반환 : Mono#when
    • 임의의 container 유형
      • 모든 source가 하나의 요소를 방출하고 이런 요소를 Tuple2에 결합 : Flux#zip
      • 다른 side에 새 value가 도착할 때마다 : Flux#combineLatest
    • 처음 방출된 sequence만 고려 : Flux#first, Mono#first, mono.or(otherMono).or(thirdMono), flux.or(otherFlux).or(thridFlux)
    • source sequence의 요소에 의해 트리거 : switchMap
    • 다음 publisher의 시작에 의해 트리거 : switchOnNext
  • 기존 sequence를 반복 : repeat
    • 시간 간격으로 : Flux.interval(duration).flatMap(tick -> myExistingPublisher)
  • empty sequence를 가지고 있는 경우
    • 대체 값을 원하는 경우 : defaultIfEmpty
    • 다른 sequence를 원하는 경우 : switchIfEmpty
  • sequence를 가지고 있고 값에는 관심이 없는경우 : ignoreElements
    • Mono로 완료를 원하는 경우 : then
    • 마지막에 다른 작업의 완료를 기다리고 싶은 경우 : thenEmpty
    • 마지막에 다른 Mono로 변경하고 싶은 경우 : Mono#then(mono)
    • 마지막에 단일 값을 방출하고 싶은 경우 : Mono#thenReturn(T)
    • 마지막에 Flux로 변경하고 싶은 경우 : thenMany
  • 완료를 연기하고 싶은 Mono가 있는 경우
    • 이 값에서 파생된 다른 publisher가 완료될 때까지 : Mono#delayUntil(Function)
  •  반복적인 요소를 sequence graph로 확장하고 조합하고 싶은 경우
    • graph를 먼저 확장하기 : expand(Function)
    • depth를 먼저 확장하기 : expandDeep(Function)

시퀀스 엿보기 (Peeking into a Sequence)

  • 최종 sequence를 수정하지 않고 다음을 수행
    • 다음에 대한 추가 동작에 대한 알림을 받거나 실행 (부수 효과)
      • 방출 시 : doOnNext
      • 완료 시 : Flux#doOnComplete, Mono#doOnSuccess
      • 에러 종료 시 : doOnError
      • 취소 시 : doOnCancel
      • sequence의 시작 시 : doFirst
        • Publisher#subscribe(Subscriber)에 연결되어 있음
      • subscription 이후 : doOnSubscribe
        • Subscriber#onSubscribe(Subscription)에 연결되어 있음
      • 요청 시 : doOnRequest
      • 완료 또는 에러 시 : doOnTerminate (Mono의 경우 결과도 포함함)
      • downstream으로 전파 된 이후 : doAfterTerminate
    • Signal로 표현되는 모든 유형의 신호 : Flux#doOnEach
    • 종료 조건 (complette, error, cancel) 시 : doFinally
  • 모든 이벤트를 알고 싶을 때
    • Signal 객체로 표현되는 각각에 대해
      • sequence 외부의 콜백에서 : doOnEach
      • 기존 onNext 방출 대신 : materialize
        • onNexts로 다시 돌아가려고 할 때 : dematerialize
    • 로그를 보고 싶은 경우 : log

시퀀스 필터링 (Filtering a Sequence)

  • sequence를 필터링하고 싶은 경우
    • 임의 criteria를 사용할 때 : filter
      • 비동기로 계산하고 싶을 때 : filterWhen
    • 방출된 객체의 유형을 제한 : ofType
    • 모든 값을 무시 : ignoreElements
    • 중복을 무시
      • 전체 sequence에 대해 : Flux#distinct
      • 이후 방출된 item에 대해 중복 제거 : Flux#distinctUntilChanged
  • sequence의 subset을 획득
    • N 개 요소 획득
      • sequence의 시작에서 : Flux#take(long)
        • 기간 조건으로 : Flux#take(Duration)
        • 첫번째 요소만 Mono로 획득 : Flux#next()
        • 취소 대신 request(N) 사용 : Flux#limitRequest(long)
      • sequence의 끝에서 : Flux#takeLast
      • 기준이 충족될 때까지 (inclusive) : Flux#takeUntil (predicate-based), Flux#takeUntilOther (companion publisher-based)
      • 기준이 충족되는 동안 (exclusive) : Flux#takeWhile
    • 최대 1개 요소 획득
      • 특정 위치에서 : Flux#elementAt
      • 끝에서 : .takeLast(1)
        • 만약 empty인 경우 에러로 방출 : Flux#last()
        • 만약 empty인 경우 기본 값으로 방출 : Flux#last(T)
      • by skipping elements
        • sequence의 시작에서 : Flux#skip(long)
          • 기간 조건으로 : Flux#skip(Duration)
        • sequence의 끝에서 : Flux#skipLast
        • 기준이 충족될 때까지 (inclusive) : Flux#skipUntil (predicate-based), Flux#skipUntilOther (companion publisher-based)
        • 기준이 충족되는 동안 (exclusive) : Flux#skipWhile
      • by sampling items
        • 기간 조건으로 : Flux#sample(Duration)
          • 마지막 요소 대신 샘플링 창에서 첫번째 요소를 유지 : sampleFirst
        • by a publisher-based window : Flux#sample(Publisher)
        • 시간 초과 publisher를 기반으로 : Flux#sampleTimeout 
    • 최대 1개 요소
      • sequence가 empty인 경우 에러로 방출 : Flux#single()
      • sequence가 empty인 경우 기본 값으로 방출 : Flux#single(T)
      • empty sequence도 허용하는 경우 : Flux#singleOrEmpty

에러 처리 (Handling Errors)

  • 오류 sequence 만들기 : error
    • 성공한 flux의 완료로 대체 : .concat(Flux.error(e))
    • 성공한 Mono의 방출로 대체 : .then(Mono.error(e))
    • onNexts 에 너무 많은 시간이 경과한 경우 : timeout
    • lazily : error(Supplier<Throwable>)
  • try/catch에 해당하는 형태
    • throwing : error
    • catching an exception
      • 기본 값으로 반환 : onErrorReturn
      • 다른 Flux나 Mono로 반환 : onErrorResume
      • wrapping and re-throwing : .onErrorMap(t -> new RuntimeExceptionI(t))
    • finally block : doFinally
    • Java 7 의 사용 패턴 : using
  • 오류를 복구
    • by falling back
      • 값으로 : onErrorReturn
      • Publisher 나 Mono : Flux#onErrorResume, Mono#onErrorResume
    • 재시도 : retry
      • companion control Flux에 의해 trigger : retryWhen
      • 표준 backoff 전략 사용 : retryBackoff
    • backpressure 오류 처리 (upstream에서 최대 요청 및 downstream이 충분한 요청을 생성하지 않을 경우 전략 적용)
      • 특별한 IllegalStateException을 throwing : Flux#onBackpressureError
      • 초과 값을 삭제 : Flux#onBackpressureDrop
        • 마지막 본 것을 제외 : Flux#onBackpressureLatest
      • 초과 값을 버퍼링 (바운드 또는 언바운드) : Flux#onBackpressureBuffer
        • bounded buffer가 overflow 시 전략 적용 : Flux#onBackpressureBuffer with a BufferOverflowStrategy

시간 작업 (Working with Time)

  • 측정된 시간과 함께 연관 (Tuple2<Long, T>) 방출
    • subscription 이후: elapsed
    • 컴퓨터 시간 이후 : timestamp
  • 방출 사이에 지연이 너무 많으면 sequence가 중단되기를 원할 경우 : timeout
  • 일정한 시간 간격으로 호출 : Flux#interval
  • 초기 지연 후 단일 0을 방출 : Mono.delay
  • 지연을 유발하고 싶을 때
    • onNext 신호 사이마다 : Mono#delayElement, Flux#delayElements
    • subscription이 발생하기 전 : delaySubscription

Flux 나누기 (Splitting a Flux)

  • Flux<T>를 경계 기준에 따라 Flux<Flux<T>>로 나누고 싶을 때
    • 사이즈 별 : window(int)
      • 겹치거나 삭제하려는 경우 : window(int, int)
    • 시간 별 : window(Duration)
      • 겹치거나 삭제하려는 경우 : window(Duration, Duration)
    • 크기 또는 시간 (카운트에 도달하거나 시간 초과 시 window를 닫음) : windowTimeout(int, Duration)
    • 요소를 기준으로 : windowUtil
      • 다음 window에서 경계를 trigger 한 요소를 방출 : .windowUntil(predicate, true)
      • 요소가 일치하는 동안 window를 열린 상태로 유지 :  windowWhile (일치하지 않는 요소는 방출되지 않음)
    • control Publisher에서 onNexts로 표시되는 임의 경계에 의해 : window(Publisher), windowWhen
  • Flux<T>와 버퍼 요소를 경계 내에서 함께 나누고 싶을 때
    • List로
      • size 별 : buffer(int)
        • 겹치거나 삭제하려는 경우 : buffer(int, int)
      • 기간 별 : buffer(Duration)
        • 겹치거나 삭제하려는 경우 : buffer(Duration, Duration)
      • 크기나 기간 경계 기준으로 : bufferTimeout(int, Duration)
      • 임의의 기준 경계 별 : bufferUntil(Predicate)
        • 다음 버퍼에서 경계를 유발한 요소를 입력 : .bufferUntil(predicate, true)
        • 일치한 동안 버퍼링하고 경계를 유발한 요소를 삭제 : bufferWhile(Predicate)
      • control publisher에서 onNexts로 표시되는 임의 경계에 의해 : buffer(Publisher), bufferWhen
    • 임의의 collection 유형 C로 : buffer(int, Supplier<C>)
  • 특성을 공유하는 요소가 동일한 sub-flux로 끝나도록 Flux<T>를 분할 : groupBy(Function<T, K>)
    Flux<GroupedFlux<K, T>>를 반환하며 각 GroupdFlux는 key()를 통해 엑세스 할 수 있는 동일한 K key를 공유함

 

동기로 변환 (Going Back to the Synchronous World)

Note : Mono#toFuture 를 제외한 모든 메서드는 "non-blocking only"로 표시된 스케쥴러에서 호출되면 UnsupportedOperatorException을 발생시킨다.

  • Flux<T>를 가지고 있고
    • 첫번 째 요소를 획득 할 때 까지 block : Flux#blockFirst
      • timeout과 함께 : Flux#blockFirst(Duration)
    • 마지막 요소를 획득 할 때까지 block : Flux#blockLast
      • timeout과 함께 : Flux#blockLast(Duration)
    • Iterable<T> 동기로 변환 : Flux#toIterable
    • Stream<T> 동기로 변환 : Flux#toStream
  • Mono<T>를 가지고 있고
    • 값을 획득 할 때까지 block : Mono#block
      • timeout과 함께 : Mono#block(Duration)
    • CompletableFuture<T> 로 :  Mono#toFuture

각 method 사용 예

Flux merge, concat method

두 개의 publisher를 합칠 땐 merge, 순서대로 merge를 해야할 땐 concat

N개가 되므로 Flux로만 사용할 수 있음

Mono<String> a = Mono.just("TEST");
Mono<String> b = Mono.just("TEST");

Flux<String> mergeFlux = Flux.merge(a, b);
Flux<String> concatFlux = Flux.concat(a, b);

Exceptions utility class

publisher의 시퀀스 중간에 runtime exception이 아닌 exception이 발생한 경우 시퀀스가 중단된다.

중단되지 않고 의도된 대로 시퀀스의 에러 처리 과정이 진행되도록 하려면 runtime exception으로 wrapping하는 과정이 필요하고 이를 도와주는 헬퍼 클래스를 reactor에서 제공한다.

예를 들어 아래와 같은 method는 IOExcpeption을 발생시킬 수 있다.

public String convert(int i) throws IOException {
    if (i > 3) {
        throw new IOException("boom " + i);
    }
    return "OK " + i;
}

위 method를 아래처럼 사용해야 IOExcpeption으로 인한 중단을 막을 수 있다.

Flux<String> converted = Flux
    .range(1, 10)
    .map(i -> {
        try { return convert(i); }
        catch (IOException e) { throw Exceptions.propagate(e); }
    });

이후 subscribe하고 error에 반응 할 때 wrapping된 에러를 다시 꺼낼 수 있다.

converted.subscribe(
    v -> System.out.println("RECEIVED: " + v),
    e -> {
        if (Exceptions.unwrap(e) instanceof IOException) {
            System.out.println("Something bad happened with I/O");
        } else {
            System.out.println("Something bad happened");
        }
    }
);

doOn* method

시퀀스의 요소를 실제로 수정하지 않고 사용자 지정 작업을 수행하는 경우 doOn으로 시작하는 "side effect" method를 사용

Flux
	.range(1, 2)
	.doOnNext(System.out::println)
	.doOnError(System.out::println)
	.doOnComplete(System.out::println)
	.subscribe();    

RxJava를 Reactor에서 사용할 수도 있음

RxJava를 Reactor에서 쓰기 위한 add-on library가 있음 (io.projectreactor.addons:reactor-adapter)

zip method

여러 publisher를 묶어 새로운 publisher를 만들 때 사용

Mono<String> a = Mono.just("TEST");
Mono<String> b = Mono.just("TEST");

Mono<Tuple2<String, String>> zip = Mono.zip(a, b);
Flux<Tuple2<String, String>> zip2 = Flux.zip(a, b);

first method

first : 여러 publisher 중 제일 빠른 1개만 선별, flux의 경우 flux의 시퀀스 중 가장 빨리 도달한 시퀀스를 가진 flux를 선별함 (flux의 시퀀스중 젤 빠른 걸 선별하는게 아니고 그걸 가진 flux를 선별하는 것임)

아래 예에서는 Flux.first에 Mono를 넣었지만 Flux도 넣을 수 있음.

Mono.first는 Mono만 넣을 수 있음.

Mono<String> a = Mono.just("TEST");
Mono<String> b = Mono.just("TEST");

Mono<String> firstMono = Mono.first(a, b);
Flux<String> firstFlux = Flux.first(a, b);

ignoreElement / ignoreElements method

요소 반환은 하지 않지만 Mono, Flux의 타입 유형은 유지하려는 경우 사용함

Mono는 static method가 있어서 mono, flux를 mono로 반환해주는데 Flux는 static method가 없다. (어째서?)

Mono<String> mono = Mono.just("TEST");
Flux<String> flux = Flux.just("TEST");

Mono<String> ignoreElements = Mono.ignoreElements(mono);
Mono<String> ignoreElements2 = Mono.ignoreElements(flux);
Mono<String> ignoreElement = mono.ignoreElement();
Mono<String> ignoreElements3 = flux.ignoreElements();

then method

기존 값을 버리고 Void 또는 해당 매개 변수로 넘겨받은 Mono로 시퀀스를 이어가려고 할 경우 사용

Mono<String> strMono = Mono.just("TEST");
Mono<Integer> intMono = Mono.just(1);
Flux<String> flux = Flux.just("TEST");

Mono<Void> then = strMono.then();
Mono<Integer> then2 = strMono.then(intMono);
Mono<Void> then3 = flux.then();
Mono<String> then4 = flux.then(strMono);

justOrEmptry method

Mono 생성 시 null value인 경우 empty 객체로 처리할 경우 사용

Mono<String> justOrEmpty = Mono.justOrEmpty("");

defer method

지연된 instance를 publisher에 등록하기 위해 사용

Flux.defer(() -> Flux.fromIterable(repository.findAll()))
반응형
profile

파란하늘의 지식창고

@Bluesky_

도움이 되었다면 광고를 클릭해주세요