Reactive Programing(1) – 리액티브 프로그래밍 개념잡기
Reactive Programing(2) – Reactive Operator
Reactive Programing(3) – Reactive Operator
Reactive Programing(4) – Scheduler
Reactive Programing(5) – 안드로이드에서의 RxJava 활용


Scheduler

스케줄러 개념 익히기

지금까지의 예제들은 대부분 MainThread에서 동작 했습니다. 이를 비동기로 동작하게 하기 위해 다른 쓰레드를 생성하거나 이용하는것이 바로 스케쥴러입니다.
예제를 살펴보도록 하겠습니다.

    Observable.just(1, 2, 3, 4, 5)
            .doOnNext { println("doOnNext:$it") }
            .subscribeOn(Schedulers.newThread())
//          .observeOn(Schedulers.newThread()) //주석을 풀면?
            .subscribe {
               println("$it")
            }
      Thread.sleep(1000)

결과:
doOnNext:1
1
doOnNext:2
2
doOnNext:3
3
doOnNext:4
4
doOnNext:5
5
 
주석을 풀었을 때

Observable.just(1, 2, 3, 4, 5)
      .doOnNext { println("doOnNext:$it") }
      .subscribeOn(Schedulers.newThread())
      .observeOn(Schedulers.newThread())
      .subscribe {
         println("$it")
      }
Thread.sleep(1000)

결과:
doOnNext:1
doOnNext:2
doOnNext:3
doOnNext:4
doOnNext:5
1
2
3
4
5
 
doOnNext는 onNext이벤트가 발생하면 실행되고 데이터값을 확인할 수 있습니다.
subscribeOn과 observeOn은 각각 구독과 발행시 사용하는 스레드를 지정할 수 있습니다.
Schdulers.newThread()는 새로운 쓰레드를 생성합니다.
데이터를 발행하는 스레드와 구독하는 스레드가 다르기 때문에 위의 예제처럼 데이터의 흐름이 바뀌는것을 확인 할 수 있습니다.
 

스케줄러의 종류

RxJava 2.x fromArray, fromCallable, fromFuture, fromIterable, fromPublisher
RxKotlin asObservable, from


 

1. newThread() 스케쥴러

 

       val array = arrayOf("1", "2", "3")
      array.toObservable()
            .doOnNext { println("doOnNext:$it") }
            .map { "($it)" }
//          .subscribeOn(Schedulers.newThread())
            .subscribe{
               println(it)
               Thread.sleep(100)
            }
      array.toObservable()
            .doOnNext { println("doOnNext:$it") }
            .map { "<$it>" }
//          .subscribeOn(Schedulers.newThread())
            .subscribe{
               println(it)
               Thread.sleep(100)
            }
      Thread.sleep(1000)

 
결과:
doOnNext:1
(1)
doOnNext:2
(2)
doOnNext:3
(3)
doOnNext:1
<1>
doOnNext:2
<2>
doOnNext:3
<3>
 
얼핏 보기에 새로운 스레드를 지정하나 안하나 결과가 똑같아 보입니다.
여기서 Thread.sleep을 빼보도록 하겠습니다.
 

val array = arrayOf("1", "2", "3")
      array.toObservable()
            .doOnNext { println("doOnNext:$it") }
            .map { "($it)" }
            .subscribeOn(Schedulers.newThread())
            .subscribe{
               println(it)
               Thread.sleep(100)
            }
      array.toObservable()
            .doOnNext { println("doOnNext:$it") }
            .map { "<$it>" }
            .subscribeOn(Schedulers.newThread())
            .subscribe{
               println(it)
               Thread.sleep(100)
            }
      Thread.sleep(1000)

 
결과:
doOnNext:1
doOnNext:1
<1>
(1)
doOnNext:2
doOnNext:2
<2>
(2)
doOnNext:3
doOnNext:3
<3>
(3)
같은 Observable이지만 서로다른 스레드로 구독을 하고 있기때문에 비동기적으로 처리를 하게 됩니다.
하지만 이는 추천하는 방법이 아니며 특수한 상황에서만 사용하시기 바랍니다.
다른 활용도가 좋은 스케쥴러 사용을 권장하고 있습니다.

2. computation() 스케쥴러

빠르게 계산하여 결과를 도출하는데 집중하는 스케줄러입니다. 계산 스케줄러는 내부적으로 스레드 풀을 생성하여 프로세서 개수만큼 스레드를 만들어 처리합니다.
스레드가 코어를 모두 점유하고 있는 경우에는 대기열에 들어갑니다.
 

val array = arrayOf("1", "2", "3")
val source = array.toObservable()
      .zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), BiFunction { number: String, _: Long ->
         number
      })
source.map { "<${it}>" }
      .subscribeOn(Schedulers.computation())
      .subscribe { println("${Thread.currentThread().name}:$it") }
source.map { "(${it})" }
      .subscribeOn(Schedulers.computation())
      .subscribe { println("${Thread.currentThread().name}:$it") }
Thread.sleep(1000)

결과:
RxComputationThreadPool-3:<1>
RxComputationThreadPool-4:(1)
RxComputationThreadPool-4:(2)
RxComputationThreadPool-3:<2>
RxComputationThreadPool-3:<3>
RxComputationThreadPool-4:(3)
 
간혹 첫번째 구독이 빠르게 끝나 아래와같이 동일한 스레드가 처리하는 경우도 있습니다.
RxComputationThreadPool-3:<1>
RxComputationThreadPool-3:(1)
RxComputationThreadPool-3:<2>
RxComputationThreadPool-3:(2)
RxComputationThreadPool-3:<3>
RxComputationThreadPool-3:(3)
 
사실 interval()함수가 이미 Computation스레드를 쓰고 있어 따로 스케줄러를 지정하지 않아도 결과는 같습니다.
3. Trampoline 스케줄러
새로운 스레드를 생성하지 않고 무한한 크기의 대기 Queue를 생성합니다.
 

val array = arrayOf("1", "2", "3")
val source = array.toObservable()
source.map { "<${it}>" }
      .subscribeOn(Schedulers.trampoline())
      .subscribe { println("${Thread.currentThread().name}:$it") }
source.map { "(${it})" }
      .subscribeOn(Schedulers.trampoline())
      .subscribe { println("${Thread.currentThread().name}:$it") }

결과:
main:<1>
main:<2>
main:<3>
main:(1)
main:(2)
main:(3)
 
스레드를 생성하지 않고 MainThread에서 모두 처리 합니다. 큐에 넣은 작업을 순차적으로 처리하므로 실행순서가 바뀌지도 않습니다.
 

4. SingleThread 스케줄러

내부에서 단일 스레드를 생성하여 구독을 처리합니다. 여러번 구독이 있어도 공통된 스레드를 사용합니다.

Observable.range(0,5)
      .subscribeOn(Schedulers.single())
      .subscribe { println("${Thread.currentThread().name}:$it") }
Observable.range(1000,5)
      .subscribeOn(Schedulers.single())
      .subscribe { println("${Thread.currentThread().name}:$it") }
Thread.sleep(1000)

결과:
RxSingleScheduler-1:0
RxSingleScheduler-1:1
RxSingleScheduler-1:2
RxSingleScheduler-1:3
RxSingleScheduler-1:4
RxSingleScheduler-1:1000
RxSingleScheduler-1:1001
RxSingleScheduler-1:1002
RxSingleScheduler-1:1003
RxSingleScheduler-1:1004
 
trampoline과 비교하면 메인스레드가 아닌 스레드에서 그리고 단일 스레드에서 구독을 처리한다는 사실을 알 수 있습니다.
 

5. Executor 변환 스케줄러

기존에 사용하던 Java Executor 클래스를 재사용할 때만 한정적쓸 수 있지만 동작방식이 다르기때문에 추천하지는 않습니다.

val list = arrayOf(1, 2, 3)
val executor = Executors.newFixedThreadPool(10)
list.toObservable().subscribeOn(Schedulers.from(executor))
      .subscribe { println(it) }
list.toObservable().subscribeOn(Schedulers.from(executor))
      .subscribe { println(it) }

결과:
1
1
2
3
2
3
executor는 10개의 스레드 풀을 생성하고 첫번째와 두번째 observable에서 변환스케줄러를 지정했습니다.

스케줄러를 활용한 콜백 탈출

이번 예제는 RxJava를 활용하여 어떤식으로 콜백을 다루는지에 대해서 알아보겠습니다.

연속적인 네트워크 요청

네트워크의 경우 OkHttp3를 이용하였습니다.
참조 링크 : http://square.github.io/okhttp/

  • 일반적인 경우
    val url1 = "https://raw.githubusercontent.com/Charlezz/RxJavaStudy/master/Sample/first.txt"
    val url2 = "https://raw.githubusercontent.com/Charlezz/RxJavaStudy/master/Sample/second.txt"
    val client = OkHttpClient()
    val request1 = Request.Builder().url(url1).build()
    val request2 = Request.Builder().url(url2).build()
    client.newCall(request1).enqueue(object : Callback {
        override fun onFailure(call: Call?, e: IOException?) {
            println(e?.message)
        }
        override fun onResponse(call: Call?, response: Response?) {
            println(response?.body()?.string())
            client.newCall(request2).enqueue(object : Callback {
                override fun onFailure(call: Call?, e: IOException?) {
                    println(e?.message)
                }
                override fun onResponse(call: Call?, response: Response?) {
                    println(response?.body()?.string())
                }
            })
        }
    })

    onResponse 콜백내에서 또 다른 call을 요청하고 그에 따른 또 다른 콜백을 볼 수 있습니다.
    문제는 없어보입니다. RxJava의 경우 어떻게 해결하는지 보도록 하겠습니다

 

  • RxJava의 경우미리 get 호출용 메소드를 만들었습니다.
    fun get(url: String): String? {
        val client = OkHttpClient()
        val request = Request.Builder()
                .url(url)
                .build()
        return try {
            client.newCall(request).execute().body()?.string()
        } catch (e: Exception) {
            e.message
        }
    }

    concat함수를 이용하여 두개의 Observable이 데이터를 순차적으로 발행하게 했습니다.

    val url1 = "https://raw.githubusercontent.com/Charlezz/RxJavaStudy/master/Sample/first.txt"
    val url2 = "https://raw.githubusercontent.com/Charlezz/RxJavaStudy/master/Sample/second.txt"
    val src1 = Observable.just(url1).subscribeOn(Schedulers.io()).map(::get)
    val src2 = Observable.just(url2).subscribeOn(Schedulers.io()).map(::get)
    val start = System.currentTimeMillis()
    Observable.concat(src1, src2)
            .subscribe {
                println(it)
            }

    일반적인 예제와 RxJava의 차이가 느껴지시나요?
    가독성 측면이나 스레드 관리적인 측면에서 장점이 있다는 것을 알 수 있습니다.
    두개의 url을 Observable객체로 만들어 io스케줄러를 활용하여 get메소드를 호출 하게금 했습니다.
    사실 위의 코드도 좀 아쉬운점이 있습니다. 첫번째 call을 하고 응답이 들어와야 그 다음 call을 할 수 있다는 것인데,
    동시에 call을 하고 둘다 응답이 들어올때까지 기다릴순 없을까요??
    zip()함수를 이용하면 가능합니다.

  • 동시 호출
    val url1 = "https://raw.githubusercontent.com/Charlezz/RxJavaStudy/master/Sample/first.txt"
    val url2 = "https://raw.githubusercontent.com/Charlezz/RxJavaStudy/master/Sample/second.txt"
    val src1 = Observable.just(url1).map(::get)
    val src2 = Observable.just(url2).map(::get)
    Observables.zip(src1, src2).subscribe {
        println(it.first)
        println(it.second)
    }

    첫번째 응답을 기다리지 않고 idle타임 없이 동시에 네트워크 요청을 하므로 성능향상을 기대할 수 있습니다.

실제 위예제를 활용하여 concat과 zip의 소요된 실행시간의 차이를 확인한 결과 950ms, 295ms로 3배 가까이 성능차이가 나는것을 확인할 수 있었습니다.

 

카테고리: RxJava

0개의 댓글

답글 남기기

Avatar placeholder

이메일은 공개되지 않습니다. 필수 입력창은 * 로 표시되어 있습니다.