Reactive Programinng(Rx Java)
Reactive Programing(1) – 리액티브 프로그래밍 개념잡기
Reactive Programing(2) – Reactive Operator
Reactive Programing(3) – Reactive Operator
Reactive Programing(4) – Scheduler
Reactive Programing(5) – 안드로이드에서의 RxJava 활용
리액티브 프로그래밍이란?
-> 데이터의 흐름과 전달에 관한 프로그래밍 패러다임
명령형 프로그래밍 vs 리액티브 프로그래밍
명령형 프로그래밍 – 작성된 코드가 정해진 순서대로 실행됨
리액티브 프로그래밍 – 데이터 흐름을 먼저 정의하고 데이터가 변경되었을 때 연관되는 함수나 수식이 업데이트 되는 방식
자바와 리액티브 프로그래밍의 관계
- 기존 pull 방식의 개념을 push방식으로 바꿈
PUSH : 주제객체가 구독객체에게 상태를 보내는 방식
PULL : 구독객체가 주제객체에게서 상태를 가져가는(요청하는) 방식 - 함수형 프로그래밍의 지원을 받음
함수형 프로그래밍은 Side Effect(부수효과)가 없다.콜백이나 옵저버 패턴이 스레드에 안전하지 않은 이유는 같은 자원에 여러 스레드가 Race condition(경쟁조건)에 빠지게 되었을 때 알 수 없는 결과가 나오기 때문이다. 이를 Side Effect라 함.함수형 프로그래밍은 Side Effect가 없는 순수 함수를 지향하므로 스레드에 안전하다.
리액티브 프로그래밍의 개념(위키 펌)
컴퓨터 프로그램에는 세가지 종류가 있다. 첫째는 주어진 입력값을 바탕으로 결과를 계산하는 변환프로그램. 일반적으로 컴파일러와 수치 계산 프로그램입니다. 두 번째는 상호작용프로그램으로 프로그램이 주도하는 속도로 사용자 혹은 다른 프로그램과 상호작용을 합니다. 사용자의 관점에서 볼때 시분할 시스템은 상호작용프로그램입니다. 리액티브 프로그램은 주변의 환경과 끊임없이 상호작용을 하는데 프로그램이 주도하는 것이 아니라 환경이 변하면 이벤트를 받아 동작합니다. 상호작용 프로그램은 자신의 속도에 맞춰 일하고 대부분 통신을 담당한다. 반면에 리액티브 프로그램은 외부 요구에 끊임없이 반응하고 처리한다.
Android 에서 RxJava 설정하기
app레벨에서 build.gradle의 dependency에 아래의 내용을 추가
implementation "io.reactivex.rxjava2:rxjava:2.x.y"
x,y에 들어갈 최신버전은 아래에 링크에서 참조해주세요. (현재 2.1.12가 최신입니다.)
https://github.com/ReactiveX/RxJava
혹시라도 RxKotlin쓰실분은
implementation 'io.reactivex.rxjava2:rxkotlin:x.y.z'
현재 2.2.0이 최신입니다.
https://github.com/ReactiveX/RxKotlin
RxJava에 관한내용을 포스팅하고 있으나 샘플코드는 RxKotlin 입니다.
Observable
Observable은 데이터 흐름에 맞게 알림을 보내 구독자가 데이터를 처리할 수 있도록한다.
Observable은 옵저버패턴을 구현하며, 리액티브 프로그래밍은 Observable로 시작해서 Observable로 끝난다고 해도 과언이 아니다.
안드로이드에서 버튼을 클릭하면 이벤트를 받을수 있게 하는 OnClickListener가 대표적인 옵저버 패턴의 예라고 볼 수 있다.
RxJava에서 Observable은 세가지를 구독자에게 전달한다.
- onNext : Observable이 데이터의 발행을 알린다. 기존의 옵저버 패턴과 같다
- onComplete : 모든 데이터가 발행이 완료 되었음을 알린다. 그러므로 더 이상의 onNext는 발생하지않으며, 마지막에 딱 한번만 호출된다.
- onError : Observable에서 어떤 이유로 에러가 발생했음을 알린다. onError 이벤트가 발생하면 이후에 onNext 및 onComplete 이벤트가 발생하지 않는다. 즉 Observable의 실행을 종료한다.
Observable 클래스에는 옵저버블 객체를 생성하는 팩토리함수, 중간결과를 처리하는 함수, 디버그 및 예외 처리 함수가 모두 포함되어 있다.
Observable을 생성할 때는 인스턴스를 직접 만들지 않고 정적 팩토리 함수를 호출한다.
팩토리 함수 | 함수 |
---|---|
RxJava 1.x 기본 팩토리 함수 | create(), just(), from() |
RxJava 2.x 추가 팩토리 함수 | fromArray(), fromIterable(), fromCallable(), fromFuture(), fromPublisher() |
기타 팩토리 함수 | interval(), range(), timer(), defer() 등 |
1. Just()함수
just함수는 인자로 넣은 데이터를 차례로 발행하며, 한개의 값을 넣을수도 있고, 타입이 같은 여러개의 값을 넣을 수도 있습니다. 중앙의 원은 Observable에서 발행하는 데이터로 just()함수를 거치면 입력한 원을 그대로 발행한다. 파이프(|) 표시는 모든 데이터의 발행이 완료 되었음을 의미한다.
인자가 N개인 경우, 다음과 같이 6개의 원을 1개씩 발행하며 모두 발행한 뒤 완료(|)한다.
Observable.just("A", "B", "C").subscribe { print(it.toString()) }
결과 : ABC
2. subscribe함수와 Disposable 객체
RxJava에서는 내가 하고 싶은 내용을 먼저 정의하고, 그 내용을 실행하는 시점을 조절할 수 있다.
이게 바로 subscribe이다. Observable의 just() 등의 다양한 함수로 데이터 흐름을 정의한 후 subscribe()함수를 호출해야 실제로 데이터를 발행한다.
subscribe의 리턴형은 Disposable 인터페이스 객체이다.
dispose()는 Observable에게 데이터를 더 이상 발행하지 않도록 구독을 해지하는 함수이다.
onComplete 콜백이 이미 들어왔다면 별도로 dispose를 호출할 필요는 없다.
3. create() 함수
just()와 다르게 onNext, onComplete, onError를 개발자가 직접 호출해야한다.
val src = Observable.create<String> { it.onNext("A") it.onNext("B") it.onNext("C") } src.subscribe { t: String? -> print(t) }
결과 :ABC
Note: RxJava가 익숙하지 않은 개발자라면 create함수를 쓰는것을 추천하지 않습니다.
- Observable은 dispose되었을때 등록된 콜백을 모두 해제 하지 않으면 메모리 누수가 일어난다.
- 구독자가 구독하는 동안에만 onNext와 onComplete를 처리해야한다.
- 에러가 발생했을 때는 오직 onError 이벤트로만 에러를 전달해야한다.
- back pressure를 직접 처리해야한다.
4. From() 함수
From함수를 이용한다면 Observable을 쓰는경우 좀더 편하게 데이터들을 Observable객체로 바꿔 표현할 수있습니다.
RxJava와 RxKotlin이 지원하는 From계열의 함수
RxJava 2.x | fromArray, fromCallable, fromFuture, fromIterable, fromPublisher |
---|---|
RxKotlin | asObservable, from |
4-1. fromArray() 함수
데이터가 배열일경우 fromArray를 이용하여 처리할 수 있습니다.
Java
String[] arr = {"A", "B", "C"}; Observable<String> source = Observable.fromArray(arr); source.subscribe(System.out::println);
Kotlin
val array: Array<String> = arrayOf("A", "B", "C") val src: Observable<String> = array.toObservable() src.subscribe { System.out.println(it) }
kotlin에서는 fromArray를 할 필요 없이 배열을 Observable형태로 변환해주는 toObservable()이 있기때문에 비교해서 보기 위해 넣었습니다
(::) 이중콜론 연산자는 java8에서만 되므로
app레벨의 build.gradle에 다음과 같은 내용을 추가해주어야 합니다.
android { ... compileOptions { targetCompatibility 1.8 sourceCompatibility 1.8 } }
Note: Java에서 int[] 배열은 에러가 난다면?
Integer[] arr1 = {1, 2, 3};//Integer형 배열 Observable<Integer> source1 = Observable.fromArray(arr1);//문제없음 source1.subscribe(System.out::print);//결과 123 int[] arr2 = {1, 2, 3};//int형 배열 Observable<Integer> source2 = Observable.fromArray(arr2);//error source2.subscribe(System.out::print);
실제로 Integer 타입으로 코딩을 하지는 않는편입니다.
하지만 int[] 은 fromArray인자에 적합하지 않다고 나옵니다.
그러므로 int[]을 Integer[]로 바꿔줘야합니다.
private static Integer[] toIntegerArray(int[] intArray){ return IntStream.of(intArray).boxed().toArray(Integer[]::new); }
위함수는 Java8 으로 작성된 메소드 입니다.
이제 실제 적용하는 코드를 보겠습니다.
int[] arr2 = {1, 2, 3}; Observable<Integer> source2 = Observable.fromArray(toIntegerArray(arr2)); //int[] -> Integer[] everything is fine! source2.subscribe(System.out::print); // 결과 123
4-2. fromIterable() 함수
Iterator 인터페이스를 구현한 대표적인 클래스 ArrayList, Stack, LinkedList, HashSet 등에서 Observable을 만드는 방법입니다.
val friends = ArrayList<String>().apply{ add("Charles") add("Runa") add("Nick") } Observable.fromIterable(friends).subscribe { println(it) }
결과:
Charles
Runa
Nick
Note: Map은 배열도 아니고 Iterable 인터페이스를 구현하지 않았으므로 Map객체를 from함수로 Observable을 만드는것은 불가능합니다.
4-3. fromCallable() 함수
val callable = Callable<String>{ println("I am gonna sleep for 5 seconds") Thread.sleep(5000) "Hello world" } Observable.fromCallable(callable).subscribe { println(it) }
Callable인터페이스에는 비동기 실행 후 결과를 반환하는 call()가 있습니다.
Runnable의 run()과는 다르게 실행 결과를 리턴한다는 점에서 차이가 있습니다.
결과 :
I am gonna sleep for 5 seconds
Hello world
5초동안 쓰레드를 잠재운다는 메시지를 띄운뒤 5초후에
subscribe를 통해 Hello World를 출력하는 메시지를 볼수 있습니다.
4-4. fromFuture() 함수
Future 인터페이스 역시 비동기 계산의 결과를 구할 때 사용합니다. 보통 Executor 인터페이스를 구현한 클래스에 Callable 객체를 인자로 넣어 Future객체를 반환합니다. get() 메서드를 호출하면 Callable 객체에서 구현한 계산 결과가 나올 때까지 쓰레드가 블로킹 됩니다.
val future = Executors.newSingleThreadExecutor().submit(Callable { Thread.sleep(5000) "hello" }) Observable.fromFuture(future).subscribe { println(it) }
결과:
hello
Note:RxJava는 위와 같은 Excutors 클래스보다는 Rxjava에서 제공하는 스케줄러를 활용하길 권장하고 있습니다.
4-5. fromPublisher() 함수
Publisher는 자바 9의 표준인 Flow API의 한부분입니다.
Publisher 인터페이스의 패키지명은 org.reactivestreams인데 Observable은 io.reactivex입니다.
publisher객체는 onNext()와 onComplete() 호출이 가능합니다.
val publisher: Publisher<String> = Publisher { s: Subscriber<in String>? -> s?.onNext("Hello Publisher") s?.onComplete() } Observable.fromPublisher(publisher).subscribe{ println(it) }
결과:
Hello Publisher
Single 클래스
Observable클래스는 데이터를 1개이상 발행할 수 있지만 Single클래스는 이름 그대로 1개의 데이터만 발행하도록 한정합니다.
서버 API를 호출할 때 유용합니다.
Single의 가장 큰 특징은 데이터가 발행과 동시에 종료(onSuccess) 된다는 점입니다.
(어찌 보면 당연한것일수도 있지만..데이터를 하나만 발행하니까…)
그래서 Single클래스의 라이프 사이클은 onSuccess와 onError 함수로 구성됩니다.
Single.just("Hello World").subscribe { t1: String?, t2: Throwable? -> println("onSucess:${t1}") println("onError:${t2.toString()}") }
결과:
onSucess:Hello World
onError:null
Single은 Observable의 특수한 형태이므로 Observable을 Single 클래스에서 이용도 가능합니다.
아래의 예제들을 확인해 봅시다.
- Observable to Single 1
val source1 = Observable.just("First") Single.fromObservable(source1).subscribe { success: String?, error: Throwable? -> println(success) println(error) }
결과 :
First
null
- Observable to Single 2
val source2 = Observable.just("First","Second","Third") Single.fromObservable(source2).subscribe { success: String?, error: Throwable? -> println(success) println(error) }
결과:
null
java.lang.IllegalArgumentException: Sequence contains more than one element!
Observable객체를 활용하여 Single객체로서 데이터를 발행하는 예제입니다.
기존 Observable에서 첫번째 값을 발행하는데 성공을 한다면 onSuccess를 호출한 후 종료합니다. Observable 객체에 두개이상의 데이터가 들어가있는 경우 Error가 발생합니다.
- Calling single() to Make an object of Single
Observable.just("HelloWorld").single("default").subscribe { t: String? -> println(t) }
결과:
HelloWorld
Observable객체로부터 single()함수를 호출하고 기본값 “default’를 가집니다. Observable에서 값이 발행되지 않을 때도 인자로 넣은 default값을 대신 발행합니다.
- Pick first from observable
arrayOf("A", "B", "C").toObservable() .first("default") .subscribe { t1, error -> println(t1) println(error) }
결과:
A
null
배열을 갖는 Observable로 부터 first()함수를 호출하여 첫번째를 인자를 갖는 Single객체로 변환한뒤 출력합니다.
- Print default value from Single
Observable.empty<String>().single("default").subscribe { t1, error -> println(t1) println(error) }
결과:
default
null
empty()함수를 통해 비어있는 Single객체를 만들게 됩니다. 기본값을 “default”로 지정했으므로 default가 출력되는것을 확인할 수 있습니다.
- Take one to make an object of Single
Observable.just("First", "Second", "Third") .take(1) .single("default") .subscribe { t1, error -> println(t1) }
Observable로 부터 하나의 값을 take하여 single로 만든후 데이터를 발행하는 예제 입니다.
Maybe 클래스
Observable의 또 다른 형태입니다. Single처럼 하나의 데이터를 가질 수 있지만 데이터 발행 없이 데이터 발생을 완료할 수도 아닐수도 있습니다(그래서 메이비(아마도)입니다) .
√ : onSuccess 이벤트
X : onError 이벤트
| : onComplete 이벤트
Maybe 객체는 직접 생성자를 통해 만들어 질수도 있지만 보통 Observable의 연산자로부터 생성하는 경우가 더 많습니다.
Note:Maybe 객체를 생성할 수 있는 연산자
elementAt(), firstElement(), flatMapMaybe(), lastElement(), reduce(), singleElement()
Hot Observable vs Cold Ovservable
Hot Observable은 구독자의 존재와 관계 없이 데이터를 발행합니다. 따라서 여러 구독자를 고려할 수 있습니다. Hot Observable은 구독을 한 시점으로부터 데이터를 받으며, 구독자로서는 Observable에서 발행하는 데이터를 처음부터 수신한다고 보장할 수가 없습니다. (Cold Observable은 준비된 데이터를 처음부터 발행함)
Cold Observable은 just(), fromIterable()함수를 하더라도 subscribe함수를 호출 하지 않으면 데이터를 발행하지 않는 Lazy loading입니다. 보통 서버 API 호출, DB 쿼리, 파일 읽기 등에서 쓰입니다. 위에서 다룬 Observable은 모두 Cold Observable이였습니다.
Cold Observable의 예를 들었으니 HotObservable의 예도 들어보자면 키보드, 마우스 이벤트, 시스템이벤트, 센서값 이벤트 등이 있습니다.
Hot Observable을 처리할 때는 Back Pressure(배압)을 고려해야 하며 발행자와 구독자의 처리 속도가 차이가 클 때 발생합니다.
Cold Observable을 Hot Observable로 변환하는 방법은 Subject를 만들거나 Connectableobservable을 이용하는것입니다.
Subject 클래스
Subject 클래스 특성은 Observable의 속성과 구독자의 속성이 모두 있다는 점입니다.
AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject 등이 있습니다.
1. AsyncSubject 클래스
AsyncSubject 클래스는 Observable에서 발행한 마지막 데이터를 얻어올 수 있는 Subject클래스 입니다. 완료되기 전 마지막 데이터에만 관심이 있으며 이전 데이터는 무시합니다.
마블 다이어그램을 하나씩 뜯어 보도록 하겠습니다.
아래 구독자(subscribe)의 타임라인이 여러개(3개)인것이 보이네요. 흐름에 대한 순서는 이렇습니다
(1) 맨 처음 subscribe() 호출
(2) 빨강, 연두를 발행이 완료되고, 파랑이 발행되기전에 두번째 구독자가 subscribe()를 합니다.
(3) 첫번째, 두번째 구독자 모두 발행된 파랑을 받고 onComplete 이벤트를 받습니다.
val subject = AsyncSubject.create<String>() subject.subscribe{ color: String? -> println("subscriber1:${color}") } subject.onNext("Red") subject.onNext("Green") subject.subscribe{ color: String? -> println("subscriber2:${color}") } subject.onNext("Blue") subject.onComplete()
결과:
subscriber1:Blue
subscriber2:Blue
마지막으로 입력된 데이터가 구독자에게 최종 전달됩니다.
AsyncSubject는 구독자로도 동작 가능합니다.
val subject = AsyncSubject.create<String>() subject.subscribe{ println(it) } Observable.just("Red","Green","Blue") .subscribe(subject)
결과:
Blue
AsyncSubject에서 onComplete() 함수를 호출 한 후에도 구독을 할때도 살펴보겠습니다.
val subject = AsyncSubject.create<String>() subject.subscribe{ color: String? -> println("subscriber1:${color}") } subject.onNext("Red") subject.onNext("Green") subject.onComplete() subject.subscribe{ color: String? -> println("subscriber2:${color}") } subject.onNext("Blue") subject.subscribe{ color: String? -> println("subscriber3:${color}") }
결과 :
subscriber1:Green
subscriber2:Green
subscriber3:Green
발행을 완료(onComplete)한 이후의 onNext는 무시되는것을 확인 할 수 있습니다.
2. BehaviorSubject 클래스
구독을 하면 가장 최근 값 or 기본값을 넘겨주는 클래스
핑크는 초깃값이며 첫번째 subscribe에서는 초기값을 받고 빨강, 연두를 받습니다. 이어 두번째 subscribe가 일어나고 가장 최근값인 연두를 가져옵니다. 이후 두 구독자 모두 파랑을 같이 받습니다.
val subject = BehaviorSubject.createDefault("Pink") subject.subscribe { println("subscriber1:$it") } subject.onNext("Red") subject.onNext("Green") subject.subscribe{ println("subscriber2:$it") } subject.onNext("Blue") subject.onComplete()
결과:
subscriber1:Pink
subscriber1:Red
subscriber1:Green
subscriber2:Green
subscriber1:Blue
subscriber2:Blue
3. PublishSubject 클래스
가장 평범한 Subject 클래스. Subscribe() 시 호출값을 발행하기 시작합니다.
마지막 값만 발행하거나 기본값을 발행하지도 않고 딱 해당 시간에 발생한 데이터를 전달받습니다.
val subject = PublishSubject.create<String>() subject.subscribe{ println("subscriber1:${it}") } subject.onNext("Red") subject.onNext("Green") subject.subscribe{ println("subscriber2:${it}") } subject.onNext("Blue") subject.onComplete()
결과:
subscriber1:Red
subscriber1:Green
subscriber1:Blue
subscriber2:Blue
4. ReplaySubject 클래스
Subject의 클래스의 목적은 Hot Observable을 활용하는 것인데 Cold Observable 처럼 동작합니다.
새로운 구독자가 생기면 항상 데이터의 처음부터 끝까지 발행하는것을 보장합니다.
Note:모든 데이터를 저장해두는 과정에서 메모리 누수가 발생할 수 있습니다.
val subject = ReplaySubject.create<String>() subject.subscribe { println("subscriber1:$it") } subject.onNext("Red") subject.onNext("Green") subject.subscribe{ println("subscriber2:$it") } subject.onNext("Blue") subject.onComplete()
결과:
subscriber1:Red
subscriber1:Green
subscriber2:Red
subscriber2:Green
subscriber1:Blue
subscriber2:Blue
ConnectableObservable 클래스
Subject클래스처럼 Cold Observable을 Hot Observable로 변경합니다.
Observable을 여러 구독자에게 공유할 수 있으므로 데이터 하나를 여러 구독자에게 동시에 전달할 때 사용합니다. 특이한 점은 subscribe()를 호출해도 아무런 일이 일어나지 않고, connect() 함수를 호출한 시점부터 구독자에게 데이터를 발행합니다.
객체 생성을 위해서 Observable의 publish() 를 호출합니다. connect()함수를 호출하기 전까지 데이터 발행을 지연시킵니다.
val array = arrayOf("Red", "Green", "Blue") val colors = Observable.interval(100L, TimeUnit.MILLISECONDS) .map { t: Long -> array[t.toInt()] } .take(array.size.toLong()) val source: ConnectableObservable<String> = colors.publish() source.subscribe { println("subscribe1:${it}") } source.subscribe { println("subscribe2:${it}") } source.connect() Thread.sleep(250) source.subscribe { println("subscribe3:${it}") } Thread.sleep(100)
결과 :
subscribe1:Red
subscribe2:Red
subscribe1:Green
subscribe2:Green
subscribe1:Blue
subscribe2:Blue
subscribe3:Blue
위의 interval()함수는 100ms 단위로 데이터를 발행하는것을 의미합니다.
그러므로 connect() 한 뒤 250ms동안 쓰레드를 재우면 2개의 데이터가 발행 됩니다.
3번째 데이터인 Blue가 발행되기직전(300ms이전)에 또 다시 subscribe() 한 뒤 100ms동안 쓰레드를 또다시 재운다면 3번째 구독자는 Blue를 받을 수 있습니다.
4개의 댓글
홍정아 · 2019년 8월 16일 9:07 오후
진짜 rxjava 배울 곳이 마땅찮아서,, 유튜브에도 강의들도 별로고 udemy 들으려고 했던 찰나에
이런 좋은 문서를 발견해서 많이 공부하고 갑니다!!!!
Charlezz · 2019년 11월 15일 8:42 오전
감사합니다
Chu · 2019년 11월 15일 8:30 오전
찰스님!!! 공부하고 있다가… ReplaySubject 클래스 부분에서 귀여운 오타가 발생했습니다!
Subject의 클래서의 목적 -> 클래스!
감사합니당 🙂 😛
Charlezz · 2019년 11월 15일 8:42 오전
수정했습니다, 감사합니다!