RxJava 3.0

RxJava란 비동기적인 이벤트 기반의 프로그램을 만들 때 사용하는 라이브러리로써 모든 것을 스트림으로 처리한다.

이번 3.0.0 릴리즈에는 코드 전반적으로 많은 정리와 성능 개선이 이루어졌다고 한다. 어떤 점들이 개선되고 변화되었는지 알아보자.

메이저 버전넘버가 수정됨에 따라 아티팩트 아이디도 변경이 되었다. 그레이들에 RxJava3를 추가하기 위해서는 모듈레벨의 build.gradle에 다음과 같은 내용을 추가해야한다.

dependencies {
    implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
}

Java8

오랜시간동안 Android 진영에서 Java 6 API로 제한해왔으나 곧 출시할 안드로이드 스튜디오 4.0 부터는 디슈거링(desugaring)이라 불리는 프로세스 덕분에 Java7, Java8 기능들을 API제한 없이 사용할 수 있게 된다. 이런 변화는 다음과 같은 기능들을 사용할 수 있게 된다는 뜻이다.

  • Stream : java.util.stream.Stream을 소스로 사용하거나 블라킹된 스트림들을 시퀀스로 노출한다.
  • Stream Collectors : 표준 변환(standard transformations)으로 지정된 컬렉션으로 아이템들을 합친다.
  • Optional: RxJava에서 요구하는 non-null 조건을 돕는다.
  • CompletableFutre :  CompletableFutures를 비-차단적으로 사용하거나 단일 결과를 CompletableFutures로 나타낸다.
  • non-null 애노테이션의 사용 : 특정 상황에서 일부 functional 타입이 null을 반환할 수 있도록 한다.

하지만 몇가지 기능은 지원하지 않는다

  • java.time.Duration : 과부하가 걸릴 수 있으므로 time+ unit으로 분리 하여 사용한다.
  • java.util.function : 이것들은 Throwable들을 던질 수 있으므로 과부하가 걸리거나 애매모호한 경우를 만들게 된다.

안드로이드 프로젝트에서는 다음 문서를 참조하여 Java8언어 지원을 활성화 할 수 있다.

 

패키지 구조

RxJava 3 컴포넌트는 io.reactivex.rxjava3 패키지로 구성된다. (RxJava 1에는 rx가 있고 RxJava 2는 io.reactivex다). 이런 특징은 RxJava3가 이전 버전들과 같이 사용될 수 있게 한다. 추가적으로 RxJava의 핵심 타입인 Flowable, Observer 등은 io.reactivex.rxjava3.core로 이동되었다.

동작 변경 사항

때로는 컴포넌트나 연산자의 설계가 일부 상황에서 부적절하거나 너무 제한적이기 때문에 이번 메이저 업데이트에서 이를 개선할 수 있다.

에러 전달 문제

RxJava 2.x에서는 에러를 손실 없이 전달하도록 목표를 설정했다. 그럼에도 불구하고 다양한 경쟁 조건(race condition)에서 에러의 발행이 누락될 수 있었다. 2.x 패치에서 이 문제를 해결하면 너무 많은 문제가 발생했을 수 있으므로 3.0에서 이를 수정했다. 이제 내부적으로 에러를 지연시키는 연산자를 취소하면 RxJavaPlugins.onError ()를 통해 해당 오류를 전역 오류 처리기에 알리게 된다.

오류가 전달 되지 않는 샘플 코드를 확인하자

Connectable 소스에 리셋 기능 추가

Connectable 타입의 목적은 connect()를 호출 할 때 실제 업스트림이 스트리밍 되기 전에 하나 이상의 소비자를 준비 할 수 있도록 하는 것이다. 처음에는 올바르게 동작해도 업스트림이 끊기는 것 대신 종료되면 문제가 발생했었다. 이런 경우 Connectable 타입이 replay() 또는 publish()로 작성되었는지 여부에 따라 새로운 소비자는 발행되는 아이템들을 수신 할 수 없거는 아이템들을 놓치게 된다. 

RxJava3 부터는  Connectable 타입에서 reset()을 호출하면 새로운 소비자가 구독하고, connect()를 호출 하더라도 새로운 아이템들을 발행하게 된다.

Publish 리셋 예제와 Replay 리셋 예제를 확인하자

Flowable.publish 중지

Flowable.publish의 구현은 내부 큐를 호스팅하여 다운 스트림으로부터의 배압을 지원한다. 만약 ConnectableFlowable의 소비자들이 취소된 경우,  2.x에서는 이러한 큐 그리고 그에 따른 결과 생성되는 업스트림 소스는 느리게 소모된다. 이로 인해 일시적으로 소비자가 부족할 때 예상치 못한 아이템 손실이 발생한다.

RxJava 3부터는 구현이 일시 중지되고 이미 내부 큐에있는 항목을 나중에 구독한 소비자가 즉시 사용할 수 있게 된다.

Processor.offer null 체크

PublishProcessor.offer (), BehaviorProcessor.offer () 또는 MulticastProcessor.offer()의 인자로 null을 넣고 호출하면 onError를 통해 신호를 보내지 않고 프로세서를 종료하는 대신 NullPointerException이 발생한다. 이것은 이제 Reactive Streams 사양에 필요한 onNext 메소드의 동작과 일치한다 .

MulticastProcessor.offer 결합 체크

MulticastProcessor는 Flowable.publish 연산자와 같이 배압을 조정하는 프로세서로 설계되었다. 올바른 소스에 구독 할 때 연산자 결합과 같은 내부 최적화가 포함된다. 사용자는 프로세서 자체에 대한 참조를 유지할 수 있으므로 개념상 onXXX 메소드를 호출하여 문제를 일으킬 수 있다. 위에서 언급한 결합이 발생하는 동안 호출 될 때 2.x에서 정의되지 않은 동작을 유발하는 offer 메소드도 마찬가지다. 3.x에서 offer 메소드는 IllegalStateException을 발생시키고 프로세서의 내부 상태를 방해하지 않는다.

groupBy에서 그룹 폐지

groupBy 연산자는 반응 소스를 주요 출력으로 신호하는 특이한 연산자 중 하나다. 소비자는 이러한 내부 소스를 구독해야 한다. 따라서 기본 시퀀스가 ​​취소 된 경우 (즉, Flowable <GroupedFlowable <T >> 자체) 소비자는 여전히 그룹에 항목을 계속 수신해야하지만 새 그룹을 만들지 않아야 한다. 그런 다음 이러한 내부 소비자가 모두 취소 된 경우에만 원본 소스를 취소 할 수 있습니다.

그러나 2.x에서는 내부 소스의 소비를 강요하는 것이 없으므로 그룹을 완전히 무시하여 원래 소스의 취소를 방지하고 리소스 누수로 이어질 수 있다.

3.x에서는 groupBy의 동작이 변경되어 그룹을 생성 할 때 다운 스트림이 동기적으로 구독해야 한다. 그렇지 않으면 그룹은 “폐지된(abandoned)”것으로 간주되어 종료된다. 이렇게하면 버려진 그룹이 원래 소스의 취소를 막을 수 없다. 늦은 소비자가 여전히 그룹에 가입 한 경우 그룹 생성을 트리거 한 항목을 계속 사용할 수 있다. 

groupBy 그룹 폐지 예제를 확인하자.

groupBy에서의 배압

Flowable.groupBy 연산자는 내부 그룹 소비자의 배압을 조정하고 원래 Flowable의 요청을 조정하는 방식이 훨씬 더 독특하다. 문제는 그러한 요청으로 인해 새로운 그룹, 자체 요청한 그룹의 새 항목 또는 완전히 다른 그룹의 새 항목이 생성 될 수 있다. 따라서 그룹은 서로의 항목 수신 능력에 영향을 미치고 특히 일부 그룹이 전혀 소비되지 않는 경우 시퀀스를 중단시킬 수 있다.

후자의 경우 개별 그룹의 수가 flatMap의 동시성 레벨 (기본 128)보다 큰 flatMap을 통해 그룹을 병합 할 때 발생할 수 있으므로 신선한 그룹이 구독되지 않고 오래된 그룹이 공간을 만들기 위해 완료되지 않을 수 있다. concatMap을 사용하면 동일한 문제가 즉시 나타날 수 있습니다.

RxJava는 논-블라킹이기 때문에 이러한 자동 정지는 감지 및 진단하기가 어렵다 (즉, groupBy 또는 flatMap에서 스레드가 차단되지 않음). 따라서 3.x에서는 groupBy의 동작을 변경하여 즉시 다운 스트림에서 새 그룹을 수신 할 수없는 경우 시퀀스가 ​​MissingBackpressureException으로 종료된다.

groupBy 배압 예제을 확인하자

window에서 Window 폐기

groupBy와 유사하게, window연산자는 외부 시퀀스가 ​​취소 될 때 (즉, 제한된 윈도우 세트로만 작업 할 때) 아이템을 계속 수신해야하는 내부 리액티브 시퀀스를 방출한다. 마찬가지로 모든 window 소비자가 취소하면 원본 소스도 취소해야 한다.

그러나 2.x에서는 내부 소스의 소비를 강요하는 것이 없으므로 window를 완전히 무시하여 원래 소스의 취소를 막고 리소스 누수로 이어질 수 있다.

3.x에서는 모든 window 연산자의 동작이 변경되어 그룹을 생성 할 때 다운 스트림을 동기적으로 구독해야 한다. 그렇지 않으면 window는 “폐지된”것으로 간주되고 종료된다. 이렇게하면 버려진 창문으로 인해 원본 소스가 취소되지 않는다. 늦은 소비자가 여전히 창에 가입 한 경우 창 생성을 트리거 한 항목을 계속 사용할 수 있다.

window 폐기 예제를 확인하자.

CompositeException cause generation

1.x 및 2.x에서 CompositeException.getCause () 메소드를 호출하면 내부 집계된 예외 목록에서 예외 체인이 생성되었다. 이런 일들이 수행된 주된 이유는 Java6의 부족함이 예외기능이 Java 7이상의 예외들을 억제해왔기 때문이다. 그러나 구현시 예외사항이 변경되었거나 체인을 전혀 설정할 수 없는 경우가 있었으나 2.x에서 문제를 해결하는 것은 위험했다.

3.x를 사용하면, 해당 메소드는 스택 추적을 요청하면 집계된 예외를 건드리지 않고 출력을 생성하는 Cause Exception을 구성한다.

매개변수 유효성 예외사항 변경

2.x의 일부 표준 연산자는 해당 매개변수가 유효하지 않은 경우 IndexOutOfBoundsException을 발생시킨다. 다른 매개 변수 유효성 검사 예외와 일관성을 유지하기 위해 다음 연산자는 대신 IllegalArgumentException을 발생시킨다.

  • skip
  • skipLast
  • takeLast
  • takeLastTimed

콜백에서 선행 취소

2.x에서 fromRunnable 및 fromAction을 통해 생성 된 취소 시퀀스는 다운 스트림이 시퀀스를 즉시 취소 / 삭제했을 때 다른 fromX 시퀀스와 일치하지 않았다.

3.x에서 이러한 사전 취소는 지정된 콜백을 실행하지 않는다.

Runnable run = mock(Runnable.class);

Completable.fromRunnable(run)
.test(true); // cancel upfront

verify(run, never()).run();

Using cleanup 순서

연산자 using은 자원을 정리해야하는 시기를 결정하는 eager 매개 변수를 갖는다. true는 종료전을 의미하고 false는 종료후를 의미한다. 불행히도 이 설정은 donwstream 취소시 정리 순서에 영향을 미치지 않으며, 업스트림을 취소하기 전에 항상 리소스를 정리했다.

3.x에서 정리 순서는 이제 시퀀스가 ​​종료되거나 취소 될 때로 일관된다. true는 종료 전 또는 업스트림을 취소하기 전을 의미하고, false는 종료 후 또는 업스트림을 취소 한 후를 의미한다.

API의 변경

주요 릴리스를 통해 요소를 추가, 변경 또는 제거하여 API 표면을 정리하고 개선 할 수 있다. 3.x에서는 몇 가지 설명이 필요한 변경 사항이 있다.

 

Functional interface

RxJava 2.x는 Java 6 및 Java 8에서 동일한 유형으로 라이브러리를 사용할 수 있도록 io.reactivex.functions에 사용자 정의 Functional 인터페이스 세트를 도입했다. 이러한 사용자 정의 유형의 두 번째 이유는 표준 Java 8 function 타입들이 어떤 확인된 예외사항을 지원하지 않으므로 RxJava 연산자를 사용할 때 다소 불편을 겪을 수 있다.

RxJava 3가 Java 8을 기반으로하고 있음에도 불구하고,  표준 Java 8 Functional 인터페이스 관련 문제는 지속되고, 현재는 안드로이드에서  디슈거링이 가능한 이슈들과 그리고 예외처리가 불가하다.
따라서 3.x에서는 사용자 정의 인터페이스를 유지하지만 @FunctionalInterface 애노테이션이 적용되었다 (Android에서는 안전하고 이는 무시된다)

@FunctionalInterface
interface Function<@NonNull T, @NonNull R> {
    R apply(T t) throws Throwable;
}

추가적으로 Java8은 타입 인수에 애노테이션을 선언하는것을 허용하고 타입 인자가 독립적으로 사용되는것을 허용한다. 그리고 모든 functional 인터페이스들은 nullability 애노테이션을 수신하는것을 허용한다.

throws의 확장

함수형 인터페이스에서 커스텀 Exception을 던지는 것에 대한 작은 문제점은 몇몇 써드파티 API가 Exception의 서브클래스가 아닌 예외사항을 던지거나 그냥 Throwable을 던지기 때문이다.

그렇기 떄문에 3.x버전에서는 함수형 인터페이스의 예외 처리를 확장했다. 이 확장은 람다  또는 클래스 구현 에 대해 중요하지 않을 수 있지만 그냥 알아 두자. 자세한 내용은 이곳에서 확인하자

새로운 타입!

Supplier

RxJava 2.x는 call() 메소드가 선언되어있고 기본적으로 Exception을 발생시키는 표준 java.util.concurrent.Callable을 이미 지원했다. 불행히도, 우리의 커스텀 functional 인터페이스가 Throwable을 던지기 위해 확장되었을 때, Java에서는throws 절을 넓히거나 좁히거나 버릴 수 없기 때문에 Callable을 확장하는것이 불가능했다. 따라서 3.x에는 가능한 가장 넓은 범위를 정의하는 io.reactivex.rxjava3.functions.Supplier 인터페이스가 도입되었다.

자세한 내용은 문서에서 확인하자

Converters

2.x에서 to() 연산자는 일반 함수를 사용하여 플로우의 어셈블리 시간을 임의 유형으로 변환 할 수있었습니다. 이 접근의 단점은 각 기본 반응 유형이 메소드 시그니처에서 동일한 기능 인터페이스를 가졌기 때문에 동일한 클래스 내에서 다른 반응 유형에 대해 여러 컨버터를 구현하는 것이 불가능했다. 이 문제를 해결하기 위해 as 연산자와 XConverter 인터페이스가 2.x에 도입되었으며,이 인터페이스는 고유하며 동일한 클래스에서 구현 될 수 있다. 라이브러리의 이진 호환성으로 인해 2.x에서로 시그니처를 변경할 수 없다.

3.x부터 as() 메소드가 제거되었으며 to() 메소드는 이제 각각의 XConverter 인터페이스 (패키지 io.reactivex.rxjava3.core에서 호스팅)와 함께 작동한다

Moved components

Disposables

Java 8 및 Android의 desugaring 도구로 별도의 팩토리 클래스 대신 정적 인터페이스 메소드를 사용할 수 있다. 지원 클래스 io.reactivex.disposables.Disposables는 모든 메소드를 Disposable 인터페이스 자체 (io.reactivex.rxjava3.disposables.Disposable)로 옮기는 것이 주요 후보였다.

기존 팩토리 메서드를 사용할 때 

Disposable d = Disposables.empty();

지금은 이렇게 사용 가능하다

Disposable d = Disposable.empty();

DisposableContainer

내부적으로 RxJava 2.x는 어디에서나 CompositeDisposable을 사용하는 대신 Disposable 컨테이너의 추상화를 사용하므로 보다 적합한 컨테이너 유형을 사용할 수 있다. 이는 CompositeDisposable 및 기타 내부 컴포넌트로 구현된 내부 DisposableContainer를 통해 처리된다. 불행하게도, 공용 클래스는 내부 인터페이스를 참조했기 때문에 OSGi 환경에서 RxJava가 경고를 발생시킨다. RxJava 3에서 DisposableContainer는 이제 io.reactivex.rxjava3.disposables.DisposableContainer에있는 공용 API의 일부이며 더 이상 OSGi 문제를 일으키지 않는다.

API 프로모션

RxJava 2.2.x 는 여전히 실험적인 연산자들을 가지고 있었다. 이러한 연산자들이 3.x에 포함되었다.

Flowable
Observable
Maybe
Single
Completable

추가된 API

RxJava3는 꽤 많은 양의 새로운 연산자가 추가되었다. 추가된 연산자들에 대해서 살펴보려면 문서를 참조하자.

자바 8 추가사항

이제는 API기준이 Java8에 맞춰졌다. RxJava는 직접적으로 자바8의 새로운 타입들을 지원한다. RxJavaJdk8Interop라이브러리 등 외부 라이브러리가 필요 없다.

안드로이드의 desugar는 자바8 API들을 모두 지원하지 않을 수도 있다. 그래도 딱히 문제는 되지 않는다.

자바8 Functional 인터페이스 

RxJava3 자바 8의 functional 인터페이스를 직접적으로는 지원하지 않는다. 왜냐하면 RxJava는 RxJava내에 포함된 funtional 인터페이스를 사용하는 것을 선호한다. 그래야 더 많은 플랫폼을 지원하고 에러를 핸들링하기 쉽기 때문이다.

RxJava3 에서는 자바8 패키지에 포함된 타입을 Flowable, Observable 등으로 변환하는 기능을 지원하고 있으며 mapOptional 연산자를 지원한다. 자세한 내용은 문서를 확인하자

 

변경된 API 이름

startWith

startWith 메소드 이름이 애매모호한 경우가 있었는데,  다음과 같이 수정되었다.

Flowable, Observable
  • startWithArray
  • startWithItem
  • startWithIterable

 

onErrorResumeNext

onErrorResumeNext가 이름이 애매모호해서 onErrorResumeWith로 변경되었다.

zipIterable

zipIterable을 zip으로 변경

배열을 인자로 갖는 combineLatest

combineLatestArray와 combineLatestArrayDelayError로 이름을 변경했다.

Single.equals

sequenceEqual로 이름을 변경

Maybe.flatMapSingleElement

이 연산자는 혼란스러웠고, 이름을 flatMapSingle로 변경했다.

API 시그니처 변경 

Callable 에서 Supplier로 변경

에러처리를 위해 Callable 대신 Supplier를 통해 콜백을 받는 것으로 변경되었고, 그로 인해 내부 메소드 이름도 call()에서 get()으로 변경 되었다.

Maybe.defaultIfEmpty

아이템 발행의 성공과 에러를 보장하기 위해 반환 타입을 Single로 수정했다.

concatMapDelayError parameter order

prefetch / maxConcurrency 전에 boolean 매개 변수를 사용하는 다른 연산자와 일치하도록 concatMapDelayError 및 concatMapEagerDelayError에서 tillTheEnd 인수의 순서를 변경했다.

Flowable.buffer with boundary source

Flowable에 시그니처가 잘못 선언되어 있었고, Publisher로 변경됨

Maybe.flatMapSingle

Maybe가 empty일 때 flatMap연사자들이 기대하는 방식대로 동작하지 않기 때문에 flatMapSingleElement연사자는 flatMapSingle 연산자로 이름이 변경되었다.

제거된 API들

  • getValue(hot sources)
  • Maybe.toSingle(T)
  • subscribe(인자 4개 짜리)
  • Single.toCompletable()
  • Completable.blockingGet()
  • 기타 등등

자세한 정보는 문서를 확인하자

상호 운용성

Reactive Streams

RxJava 3은 여전히 ​​Reactive Streams 사양을 따르므로 io.reactivex.rxjava3.core.Flowable은 org.reactivestreams.Publisher를 입력으로 받아들이는 모든 3rd party 솔루션에 대해 호환 가능한 소스다.

RxJava1

RxJava는 7년이상 되었고, 많은 사용들이 여전히 이를 이용하고 있다. 이런 상황때문에 1.x에서 3.x로 마이그레이션을 돕는 외부 라이브러리를 제공 하고 있다. 

RxJava2

2.x에서 3.x로 넘어가는것은 성가실 수 있다. 왜냐하면 2.x만의 생태계가 있기 때문이다. RxJava3는 RxJava2와 같이 사용될 수 있도록 설계되었지만 한계점은 분명히 있다. Flowable 과 Publisher사이의 상호운용을 제한하고 있고, 2.x의 Observable을 3.x의 Observable과 상호운용할 수 있는 방법이 없다. 이 둘은 완전이 다른 타입이다.

이런 문제를 해결하기 위해 점진적인 마이그레이션을 돕기 위한 라이브러리를 제공하고 있다.

Java 9 , Swing, Project  Loom

안드로이드에 해당 사항 없음

 

 

 

카테고리: JavaRxJava

2개의 댓글

이기정 · 2020년 3월 4일 11:26 오전

안녕하세요. 지금까지 본 블로그 중 RxJava3.0과 관련하여 가장 잘 쓴 글이라고 생각이 들어 이렇게 댓글 남기고 갑니다.
감사드립니다! 그리고, 해당 글을 참고하여 추후 제 블로그 및 프레젠테이션에도 내용을 넣고자하는데, 출처만 밝히고 사용해도 괜찮을까요?

감사합니다.

    Charlezz · 2020년 3월 6일 7:20 오후

    저도 RxJava 3.0 관련 페이지 내용을 번역한 글입니다.
    자유롭게 사용하셔도 됩니다. 감사합니다

답글 남기기

Avatar placeholder

이메일은 공개되지 않습니다. 필수 입력창은 * 로 표시되어 있습니다.