코루틴(Coroutines)이란?

코루틴은 비동기적으로 실행되는 코드를 간소화하기 위해 Android에서 사용할 수 있는 동시 실행 설계 패턴이다.

코루틴은 다음과 같은 특징을 갖는다.

  • 경량화
  • 메모리 누수 감소
  • 계층적 구조를 통한 취소 전달
  • Jetpack과의 통합

Suspend function

suspend(유예하다, 연기하다) vs block(막다, 차단하다)

blocking과 suspending의 차이에 대해서 먼저 알아보자.

BLOCKING : 함수B를 수행하기 위해서는 함수A가 먼저 끝나야 한다.

단일 쓰레드에서 네트워크 요청(FUNCTION A)과 같은 작업을 수행하다 보면 응답을 얻는데까지 긴 시간이 필요할 수 있다. 이 동안 해당 쓰레드는 블록상태(blocked)가 되고 다음 작업(FUNCTION B)은 네트워크 작업이 끝날 때까지 기다리게 된다. 그래서 일반적으로 다중 스레드를 사용하여 비동기적으로 작업을 수행한다.

SUSPENDING: 함수A가 시작 된 후 정지(suspended) 될 수 있으며, 함수B가 수행되고 끝난 뒤 함수A가 재개(resume)될 수 있다. 즉 이 쓰레드는 함수A에 의해 블락되지 않는다.

코루틴의 suspend 함수는 네트워크 요청(FUNCTION A)을 수행하고 쓰레드가 블록되는 대신 하던 작업을 정지(suspended) 시킨다. 그리고 다른 작업이 해당 쓰레드를 사용할 수 있도록 한다. 하나의 쓰레드에서 여러 작업을 수행할 수 있으니 쓰레드를 만들 필요가 없다. 이러한 이유로 코루틴을 경량화 된 쓰레드(Light-weighted Thread)라고 표현하기도 한다.

Suspend 함수는 시작(Started)되고 중단(paused) 될 수 있으며, 다시 재게(Resume)할 수 있는 함수를 의미 한다.

CoroutineContext

CoroutineContext가 가지고 있는 정보를 간단히 도식화한 이미지

CoroutineContext는 개념적으로는 map 또는 set과 유사하다. CoroutineContext는 Job, CoroutineName, CorotuineDispatcher 등과 같은 Element 인스턴스를 순서가 있는 Set(indexed set)으로 관리 한다. 특이한 점은 각 Element 또한 CoroutineContext 이기 때문에 컬렉션 내에 있는 모든 element들 또한 컬렉션이 된다. 그리고 모든 element에는 이를 식별하기 위한 고유한 Key를 가지고 있다. 이 부분이 구조화된 동시성(Structured Concurrency)을 구현하기 위한 기반 요소가 된다.

Dispatchers

Dispatcher를 사용하여 코루틴 실행에 사용되는 스레드를 지정할 수 있다. 기본적으로 제공하는 Dispatcher는 다음과 같다.

  • Default: CPU 코어 수에 비례하는 스레드 풀에서 수행한다. CPU 집약적인 작업에 적합하다.
  • IO: 일반적으로 IO 작업은 CPU에 영향을 미치지 않으므로, CPU 코어 수보다 훨씬 많은 스레드를 가지는 스레드 풀에서 수행한다(최대 64개). Default dispatcher와 스레드를 공유하기 때문에 Context switching으로 인한 오버헤드가 없다.
  • Unconfined: 해당 코루틴을 호출한 부모의 쓰레드에서 실행 또는 재게된다.
  • newSingleThreadContext: 새로운 쓰레드를 생성한다.

CoroutineScope

코루틴의 범위를 정의한다. launch, async 등과 같은 모든 코루틴 빌더는 CoroutineScope의 확장함수이며, 해당 CoroutineContext를 상속하여 모든 element와 cancellation(취소)를 자동으로 전파한다.

일반적으로 CoroutineScope의 직접적인 구현은 권장하지 않으며, 델리게이션에 의한 구현을 권장한다. 관례에 따라 Scope내의 Context에는 Cancellation의 전파를 포함한 구조화 된 동시성 원칙을 적용하기 위한 Job 인스턴스가 포함되어야 한다.

기본적인 코루틴 빌더의 사용법과 그 외에 다양한 경우에 사용되는 코루틴 빌더에 대해 알아보자.

기본적인 Coroutine builder 종류

  • launch
  • runBlocking
  • async

launch builder

현재 쓰레드를 블로킹하지 않고 새 코루틴을 시작하며, 이에 대한 참조를 Job 객체로 반환한다. 이 Job이 취소되면 코루틴이 취소 된다. CoroutineContext는 해당 CoroutineScope의 것을 상속하며, launch 함수의 매개변수로 추가적인 Context를 지정할 수도 있다. 해당 Context에 Dispatcher나 다른 ContinuationInterceptor가 없으면 Dispatchers.Default가 지정된다.

기본적으로 코루틴은 즉시 실행되도록 스케쥴링된다. 다른 시작 옵션은 start 매개변수를 통해 지정할 수 있으며, 선택적으로 CoroutineStart.Lazy로 설정하여 코루틴을 느리게 시작할 수 있다. 이 경우 코루틴 Job은 새로운 상태로 생성된다. start의 기본값은 CoroutineStart.DEFAULT 다.

이 코루틴의 catch 되지 않은 예외들은 기본적으로 해당 컨텍스트의 상위 작업을 취소한다. 이는 launch가 다른 코루틴의 컨텍스트와 함께 사용 될 때 포착되지 않은 예외가 상위 코루틴의 취소로 이어진다는 것을 의미 한다.

runBlocking builder

새 코루틴을 실행하고 완료될 때까지 현재 사용하고 있는 스레드의 인터럽트를 차단한다. 이 함수는 코루틴에서 사용하지 않는 것이 좋다. 이 함수는 단지 main 함수나 테스트에서 사용되기 위해 설계되었기 때문이다.

이 코루틴 빌더의 기본적인 CoroutineDispatcher는 이 코루틴이 완료될 때까지 차단된 스레드에서 Continuation을 처리하는 내부 이벤트 루프의 구현이다.

CoroutineDispatcher가 컨텍스트에서 명시적으로 지정되면 새 코루틴은 현재 스레드가 차단되는 동안 지정된 디스패처의 컨텍스트에서 실행된다. 이 차단된 스레드가 인터럽트 되면 코루틴 job이 취소되고 interruptedException이 발생한다.

async builder

코루틴을 만들고 Deffered의 구현으로 해당 결과를 반환한다. 실행 중인 코루틴은 deferred의 결과가 취소되면 취소된다. 결과 코루틴은 다른 언어 및 프레임워크의 유사한 기초 요소와 비교하여 중요한 차이점이 있다. 구조화 된 동시성 패러다임을 시행하지 못하면 상위 작업을 취소한다. 해당 동작을 변경하려면 SupervisorJob 또는 supervisorScope를 사용할 수 있다.

코루틴 컨텍스트는 CoroutineScope에서 상속되며 context 인자와 함께 추가적인 컨텍스트를 지정할 수 있다. 컨텍스트에 dispatcher나 다른 ContinuationInterceptor가 없으면 Dispatchers.Default가 사용된다. 부모 작업도 CoroutineScope에서 상속되지만 해당 컨텍스트 요소로 오버라이드 될 수 있다.

launch와 마찬가지로 기본적으로 코루틴은 즉시 실행되도록 스케쥴링되며, 시작옵션을 매개변수로 지정할 수 있다.

취소 및 타임아웃

launch 함수의 반환으로 얻은 Job에 대해 cancel() 함수를 호출하여 취소를 할 수 있다.

fun main() = runBlocking {
    val job1 = launch {
        delay(1000L)
        println("1")
    }

    val job2 = launch { println("2") }
    val job3 = launch { println("3") }
    
    delay(500L)
    job1.cancel()
    job2.cancel()
}
//실행 결과
// 2
// 3

코루틴 내부에서 작업이 취소 되었는지 확인하는 방법은 isActive 플래그를 활용하는 것이다.

val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    while (isActive) { // 취소 여부 확인
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // 작업이 끝날 때 까지 조금 기다림
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 작업을 취소하고 완료될 때까지 기다린다.
println("main: Now I can quit.")

// 실행결과 
// job: I'm sleeping 0 ...
// job: I'm sleeping 1 ...
// job: I'm sleeping 2 ...
// main: I'm tired of waiting!
// main: Now I can quit.

suspend 함수들은 JobCancellationException이 발생하기 때문에 이를 try catch 블록으로 예외처리 하는 방법도 있다.

어떤 코드들은 취소가 불가해야한다. 그럴 때는 withContext(NonCancellable)를 사용할 수 있다.

fun main() = runBlocking {
    val job1 = launch {
        withContext(NonCancellable) {
            delay(1000)
            println("I survived!")
        }
        delay(1000)
        print("job1: end")
    }

    delay(500L)
    job1.cancel()
}

// 실행결과
// I survived!

일정 시간이 지난 다음 종료하고 싶다면 withTimeout을 사용할 수 있다.

suspend fun doCount() = coroutineScope {
    val job1 = launch(Dispatchers.Default) {
        var i = 1
        var nextTime = System.currentTimeMillis() + 100L

        while (i <= 10 && isActive) {
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextTime) {
                println(i)
                nextTime = currentTime + 100L
                i++
            }
        }
    }
}

fun main() = runBlocking {
    withTimeout(500L) {
        doCount()
    }
}

// 실행결과
// 1
// 2
// 3
// 4
// Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 500 ms

취소가 되면 TimeoutCancellationException 예외가 발생한다. 만약 에러 핸들링하는게 귀찮고 null을 반환하여 이를 처리하고 싶다면 withTimeoutOrNull을 사용할 수 있다.

CoroutineExceptionHandler 및 SupervisorJob

예외를 가장 체계적으로 다루는 방법은 CEH (Coroutine Exception Handler, 코루틴 익셉션 핸들러)를 사용하는 것이다. 커스텀 CEH를 만든 다음 상위 코루틴 빌더의 컨텍스트에 등록하면 하위의 모든 예외까지 종합적으로 핸들링 가능하다.

val ceh = CoroutineExceptionHandler { coroutineContext, exception ->
    println("에러 내용: $exception")
}

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(Dispatchers.IO)
    val job = scope.launch (ceh) {
        launch { 
            delay(1000L)
            println("Hello World") // 에러 때문에 실행되지 않음
        }
        launch { 
            delay(500L)
            throw Exception() // 에러 발생
        }
    }
    job.join()
}

// 실행 결과
// 에러 내용: java.lang.Exception

참고로 runBlocking에서는 CEH를 사용할 수 없다. runBlocking은 자식이 예외발생으로 인해 종료되면 그냥 종료된다.

SupervisorJob은 예외에 의한 취소를 하위 코루틴으로 전파한다. 위의 예제와 비슷한 내용을 SupervisorJob을 사용하여 다음과 같이 실행해보자.

val ceh = CoroutineExceptionHandler { coroutineContext, exception ->
    println("에러 내용: $exception")
}

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(Dispatchers.IO+ceh+SupervisorJob())
    val job1 = scope.launch {
        delay(1000L)
        println("Hello World") // 에러가 발생해도 실행됨
    }
    val job2 = scope.launch {
        delay(500L)
        throw Exception() 
    }
    joinAll(job1, job2)
}

// 실행 결과
// 에러 내용: java.lang.Exception
// Hello World

CoroutineScope와 SupervisorJob을 합친듯 한 SupervisorScope가 있다.

val ceh = CoroutineExceptionHandler { coroutineContext, exception ->
    println("에러 내용: $exception")
}

fun main() = runBlocking<Unit> {
	val scope = CoroutineScope(ceh)
    val job = scope.launch{
        supervisorScope {
            launch {
                delay(1000L)
                println("Hello World") 
            }
            launch {
                delay(500L)
                throw Exception() 
            }
        }
    }
    job.join()
}
// 실행결과 
// 에러 내용: java.lang.Exception
// Hello World

supervisorScope를 사용할 때 주의할 점은 무조건 자식 수준에서 예외를 핸들링 해야한다는 것이다. 자식의 실패가 부모에게 전달되지 않기 때문에 자식 수준에서 예외를 처리해야 하는 것을 명심하자.

actor

액터는 1973년에 칼 휴이트가 만든 개념으로 액터가 독점적으로 자료를 갖고, 그 자료를 다른 코루틴과 공유하지 않고 액터를 통해서만 접근하게 만든다.

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

fun main() = runBlocking<Unit> {
    val counter = counterActor() // CounterMsg만 다루는 액터 생성
    withContext(Dispatchers.Default) {
        massiveRun { // IntCounter를 100*1000 (10만번) 전송
            counter.send(IncCounter) 
        }
    }

    val response = CompletableDeferred<Int>() // 결과를 전달 받기 위한 CompletableDeffered
    counter.send(GetCounter(response)) // response를 전달하고,
    println("Counter = ${response.await()}") // await()하면 결과를 반환해준다.     
    counter.close()
}

// 대량 실행 작업 (10만번 반복)
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100
    val k = 1000
    val elapsed = measureTimeMillis {
        coroutineScope {
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("$elapsed ms동안 ${n * k}개의 액션을 수행했습니다.")  
}

// ConterMsg만 다루는 액터
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0
    for (msg in channel) {
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}
카테고리: Kotlin

2개의 댓글

김정원 · 2022년 3월 22일 2:17 오후

깔끔하게 정리된 글 잘 읽고 갑니다! 머리속에 잘 정리되네요 ㅎㅎ

    Charlezz · 2022년 3월 22일 6:13 오후

    잘 정리 되셨다니 뿌듯하네요

답글 남기기

Avatar placeholder

이메일은 공개되지 않습니다. 필수 입력창은 * 로 표시되어 있습니다.