Reactive Programing(1) – 리액티브 프로그래밍 개념잡기
Reactive Programing(2) – Reactive Operator
Reactive Programing(3) – Reactive Operator
Reactive Programing(4) – Scheduler
Reactive Programing(5) – 안드로이드에서의 RxJava 활용
Reactive Operators(리액티브 연산자)
스케쥴러 | 설명 |
---|---|
newThread() | 새로운 스레드 생성 |
single() | 단일 스레드 생성 후 사용 |
computation() | 계산용 스레드 |
io() | 네트워크, 파일 입출력 스레드 |
trampoline() | 현제 스레드에 대기행렬 생성 |
지난 포스팅에서 생성연산자와, 변환연산자에 대해서 알아보았습니다.
이번 포스팅에서는 결합연산자, 조건연산자에 대해서 알아보겠습니다.
결합연산자
1. zip() 함수
2개 이상의 Observable을 결합하여 데이터를 발행합니다.
val shapes = listOf("Circle", "Pentagon", "Star") val colors = listOf("yellow", "Pink", "Cyan") Observable.zip( shapes.toObservable(), colors.toObservable(), BiFunction<String, String, String> { t1, t2 -> "$t1 $t2" } ).subscribe { println(it) }
결과:
Circle yellow
Pentagon Pink
Star Cyan
도형 데이터와 색깔 데이터 두 셋트를 준비합니다.
zip()으로 Observable을 하나 생성하고 인자로 도형, 색깔 데이터를 Observable형태로 넣습니다.
두 Observable에서 각각 데이터가 발행될때 처리할 수 있는 BiFunction 콜백 인터페이스를 구현합니다.
다음과 같이 코드를 조금 변형해보았습니다.
val shapes = listOf("Circle", "Pentagon", "Star", "Hello") val colors = listOf("yellow", "Pink", "Cyan") Observable.zip( shapes.toObservable(), Observable.interval(0, 500L, TimeUnit.MILLISECONDS).take(colors.size.toLong()).map { colors[it.toInt()] }, BiFunction<String, String, String> { t1, t2 -> "$t1 $t2" } ).subscribe { println(it) } Thread.sleep(2000)
도형 리스트에 ‘Hello’라는 데이터가 추가되었습니다. 아까는 도형 3개 색깔 3개로 비율이 1:1로 맞았으나 어느 한 Observable의 데이터가 더 많은 경우는 어떻게 될까요?
그리고 한가지 더,
데이터가 발행되는 시기를 다르게 하기 위해 interval로 색깔을 500ms 시간차를 두고 발행했습니다.
결과:
Circle yellow
Pentagon Pink
Star Cyan
실행해본다면 100ms 마다 이벤트를 처리하는것을 확인하실 수 있습니다. 그리고 어느 한 Observable이 데이터를 발행하지 않는경우 이벤트가 처리되지 않는것도 확인할 수 있습니다.
*zipWith()함수
zip함수와 동일 하지만 생성 연산자가 아니라 Observable객체가 가지고 있는 연산자 입니다. Observable을 다양한 함수와 조합하면서 중간중간 호출할 수 있는 장점이 있습니다
Observable.zip( Observable.just(100, 200, 300), Observable.just(10, 20, 30), BiFunction<Int, Int, Int> { t1, t2 -> t1 + t2 } ).zipWith(Observable.just(1, 2, 3), BiFunction { t1: Int, t2: Int -> t1 + t2 }) .subscribe { println(it) }
결과:
111
222
333
2. combineLatest() 함수
zip과 비슷하지만 다른점은 두개이상의 Observable객체를 합성할 때 서로다른 Observable끼리 데이터가 발행하는것을 기다렸다가 합성하는게 아니라 가장 최근에 발행된 데이터끼리 합성합니다.
즉, Observable은 아래의 다이어그램의 타임라인처럼 독립적으로 데이터를 발행하고, 발행했을때의 다른 Observable의 가장 최근값을 가져와 합성합니다.
val colors = listOf("Pink", "Orange", "Cyan", "Yellow") val shapes = listOf("Diamond", "Star", "Pentagon") val source = Observable.combineLatest( colors.toObservable().zipWith(Observable.interval(0L, 100L, TimeUnit.MILLISECONDS), BiFunction { color: String, _: Long -> color }), shapes.toObservable().zipWith(Observable.interval(50L,200L, TimeUnit.MILLISECONDS),BiFunction { shape: String, _: Long -> shape }), BiFunction { color: String, shape: String -> "$color $shape" } ) source.subscribe { println(it) } Thread.sleep(1000)
결과:
Pink Diamond
Orange Diamond
Cyan Diamond
Cyan Star
Yellow Star
Yellow Pentagon
3. merge() 함수
두개이상의 Observable에서 데이터 발행 순서여부에 상관없이 업스트림에서 먼저 입력되는 데이터를 그대로 받아 발행합니다.
val colorSet1 = listOf("Red", "Green") val colorSet2 = listOf("Yellow", "Cyan", "Pink") val colorSrc1 = Observable.zip( colorSet1.toObservable(), Observable.interval(0L, 200L, TimeUnit.MILLISECONDS).take(colorSet1.size.toLong()), BiFunction { color: String, _: Long -> color } ) val colorSrc2 = Observable.zip( colorSet2.toObservable(), Observable.interval(100L, 200L, TimeUnit.MILLISECONDS).take(colorSet2.size.toLong()), BiFunction { color: String, _: Long -> color } ) Observable.merge( colorSrc1, colorSrc2 ).subscribe { println(it) } Thread.sleep(2000)
결과:
Red
Yellow
Green
Cyan
Pink
4. concat() 함수
2개 이상의 Observable을 이어 붙이는 함수.
Note:첫번째 Observable의 onComplete 이벤트가 발생하지 않으면 메모리 누수가 발생합니다.
val colors1 = listOf("Red", "Green", "Blue") val colors2 = listOf("Yellow", "Blue", "Pink") Observable.concat(colors1.toObservable(), colors2.toObservable()) .subscribe { println(it) }
조건 연산자
조건 연산자는 Observable의 흐름을 제어합니다.
1. amb() 함수
가장 먼저 데이터를 발행하는 Observable을 택합니다. 나머지는 무시됩니다.
val colors1 = arrayOf("Red", "Green", "Blue") val colors2 = arrayOf("Yellow", "Cyan") Observable.amb( listOf(colors2.toObservable().delay(100L, TimeUnit.MILLISECONDS), colors1.toObservable()) ).subscribe { println(it) }
결과:
Red
Green
Blue
colors2 배열만 100ms 지연시켜서 발행한 결과,
colors1 이 채택되어 colors2는 무시되는것을 확인할 수 있습니다.
2. takeUntil() 함수
val colors = arrayOf("Red", "Yellow", "Green", "Cyan", "Blue", "Pink") colors.toObservable() .zipWith( Observable.interval(100L, TimeUnit.MILLISECONDS), BiFunction { t1: String, t2: Long -> t1 } ) .takeUntil(Observable.timer(550L, TimeUnit.MILLISECONDS)) .subscribe { println(it) } Thread.sleep(1000)
결과:
Red
Yellow
Green
Cyan
Blue
3. skipUntil() 함수
takeUntil과 반대로 Observable에서 데이터를 받을때까지 값을 건너 뜁니다.
val colors = arrayOf("Red", "Yellow", "Green", "Cyan", "Blue", "Pink") colors.toObservable() .zipWith( Observable.interval(100L, TimeUnit.MILLISECONDS), BiFunction { t1: String, t2: Long -> t1 } ) .skipUntil(Observable.timer(550L, TimeUnit.MILLISECONDS)) .subscribe { println(it) } Thread.sleep(1000)
결과:
Pink
4. all() 함수
모든 조건이 true일때만 데이터를 true를 발행합니다.
val colors = arrayOf("Red", "Yellow", "Green", "Cyan", "Blue", "Pink") colors.toObservable().all { t: String -> t == "Red" }.subscribe { t: Boolean? -> println(t) }
결과:
false
기타 연산자
1. delay() 함수
Observable의 데이터 발행을 지연시켜주는 역할을 합니다.
val startTime = System.currentTimeMillis() val colors = arrayOf("Red", "Orange", "Yellow") colors.toObservable() .delay(100L, TimeUnit.MILLISECONDS)//이 라인을 주석 후 실행하면 대략 100ms 만큼 빨라짐 .subscribe { println("$it - ${System.currentTimeMillis() - startTime}") } Thread.sleep(1000)
결과:
Red – 168
Orange – 168
Yellow – 168
2. timeInterval() 함수
어떤 값을 발행했을 때 이전 값을 발행한 후 얼마나 시간이 흘렀는지 알 수 있습니다.
val colors = arrayOf("Red", "Green", "Orange") colors.toObservable() .delay { Thread.sleep(Random().nextInt(1000).toLong()) Observable.just(it) } .timeInterval() .subscribe { println("$it") }
결과:
Timed[time=671, unit=MILLISECONDS, value=Red]
Timed[time=275, unit=MILLISECONDS, value=Green]
Timed[time=238, unit=MILLISECONDS, value=Orange]
1초내로Random하게 스레드를 sleep()하므로 실행할 때 마다 1초미만으로 time 값이 다르게 나오는 것을 확인 할 수 있습니다.
0개의 댓글