Beeeam

코루틴 공식 문서 읽기 (Asynchronous Flow 1) 본문

Kotlin

코루틴 공식 문서 읽기 (Asynchronous Flow 1)

Beamjun 2023. 4. 13. 19:04

 

https://kotlinlang.org/docs/flow.html

 

Asynchronous Flow | Kotlin

 

kotlinlang.org

 

suspend 함수는 한 개의 값을 비동기적으로 실행하면서 반환한다. 그러면 여러 개의 값을 반환하려면 어떻게 해야 할까? 이 때 코루틴 플로우(Coroutine flow)를 사용하면 여러 개의 값을 반환할 수 있다.

 

Representing multiple values

코틀린에서는 컬렉션을 사용하여 다수의 값을 나타낼 수 있다.

3개의 숫자를 요소로 갖는 리스트를 반환하는 함수를 main 함수에서 forEach 함수를 사용하여 이 리스트의 모든 값들을 출력할 수 있다.

fun simple(): List<Int> = listOf(1, 2, 3)

fun main() {
    simple().forEach { value -> println(value) }
}

위의 코드를 실행하면 다음과 같은 결과가 나온다.

1
2
3

 

Sequences

CPU 연산(밑의 예의 Thread.sleep()) 이 필요한 어떤 일련의 수들은 sequence를 사용하여 나타낼 수 있다. 밑의 예제 코드에서는 각 연산을 하는데 100ms가 소요된다.

fun simple(): Sequence<Int> = sequence {
    for (i in 1..3) {
        Thread.sleep(100) 
        yield(i) 
    }
}

fun main() {
    simple().forEach { value -> println(value) }
}

위의 코드의 실행 결과는 바로 이전의 예제의 실행 결과와 같다. 하지만 각 숫자가 출력 되는데 약간의 시간 텀(100ms)이 생긴다.

 

Suspending functions

하지만 위의 코드는 코드를 실행하는 메인 스레드를 정지 시킨다. 이러한 연산이 비동기적으로 처리 되야 하는 경우에는 함수의 앞에 suspend 키워드를 붙여서 중단 함수로 만들어 사용한다. 그리고 suspend 키워드를 붙인 함수를 코루틴 스코프 내에서 호출하면 스레드의 정지 없이 코드를 실행 할 수 있고 리스트를 반환 받게 만들 수 있다.

suspend fun simple(): List<Int> {
    delay(1000) 
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } 
}

위의 코드도 이전 예제들의 실행 결과와 동일하다.

 

Flows

반환 값으로 List<Int> 타입을 사용하면 해당 함수의 모든 실행이 끝나고 반환 값을 한 번에 반환하게 된다. 비동기적으로 처리 될 값들의 스트림을 나타내기 위해서 Flow<Int> 타입을 사용할 수 있다.

즉 flow는 비동기적으로 동작하면서 여러 개의 값을 반환하는 함수를 만들 때 사용한다.

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(500) 
        emit(i) 
    }
}

fun main() = runBlocking<Unit> {
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(500)
        }
    }
    simple().collect { value -> println(value) } 
}

위의 코드를 실행하면 다음과 같은 결과가 나온다.

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

이 코드는 각 숫자를 출력하기 전에 메인 스레드를 정지하지 않고 500ms를 기다리고 출력한다. 이는 숫자가 출력 되기 전에 “I'm not blocked $k” 가 출력 되는 것을 통해서 알 수 있다.

이전 예제들과 비교하면 flow를 사용한 위의 코드는 다음과 같은 차이점을 가진다.

  • Flow 타입은 flow 빌더를 사용하여 생성한다.
  • flow 블럭 내에서는 일시정지(suspend)가 가능하다.
  • 위의 코드의 simple 함수는 더 이상 suspend 키워드가 필요 없다.
  • 결과 값들은 flow에서 emit() 함수를 통해서 방출된다.
  • flow에서 방출된 값들은 collect() 함수를 통해서 수집된다.

 

Flows are cold

플로우는 시퀀스와 유사한 콜드 스트림이다. 이는 flow{ … } 빌더 내부의 코드 블럭은 플로우가 수집되기 전까지는 실행되지 않는 것을 의미한다. 밑의 예시를 보면 알 수 있다.

fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}

위의 코드를 실행하면 다음과 같은 결과가 나온다.

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

결과를 보면 위에서 언급 한 것처럼 플로우가 수집 되기 전에는 값이 반환 되지 않고, 플로우가 수집되기 시작하면서 플로우 빌더 내부의 코드가 실행이 되고, 값을 반환하는 것을 확인 할 수 있다.

이것이 플로우를 반환하는 함수에 suspend 키워드를 붙이지 않는 주요한 이유이다. 플로우를 반환하는 함수는 호출 즉시 반환이 되며, 아무 것도 기다리지 않는다. 그리고 플로우는 수집(collect)가 될 때 마다 시작이 된다. 그래서 위의 실행 결과에서 Flow started 를 2번 확인 할 수 있다.

 

Flow cancellation basics

플로우는 코루틴의 일반적인 취소 규칙을 준수한다. 일시 중지 기능(ex: delay)에서 플로우가 일시중지되면 플로우의 수집을 취소 할 수 있다. 다음 예시에서 withTimeoutOrNull 블록에서 시간 초과시 플로우가 어떻게 취소되고, 해당 코드의 실행을 중지하는지 볼 수 있다.

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { 
        simple().collect { value -> println(value) } 
    }
    println("Done")
}

위의 코드를 실행하면 다음과 같은 결과가 나온다.

Emitting 1
1
Emitting 2
2
Done

메인 스레드는 withTimeoutOrNull(250) 때문에 250ms가 초과되면 종료된다. 그리고 flow 빌더 내부에는 delay(100)이 있어서 플로우 수집에는 최소 100ms가 소요 된다. 그래서 플로우 수집이 2번 진행되면 시간 초과로 인해 플로우 수집이 취소가 된다.

 

Flow builders

이전 예제들 에서는 플로우를 생성할 때 flow{ … } 빌더를 사용했다. 그런데 플로우를 생성하는 빌더는 더 있다.

  • flowOf 빌더는 고정된 값들을 방출하는 플로우를 정의한다.
  • .asFlow() 확장 함수를 사용해서 다양한 컬렉션 및 시퀀스를 플로우로 변환 할 수 있다.

이를 사용해서 플로우로 1 ~ 3을 출력하는 코드를 다음과 같이 만들 수 있다.

fun main() = runBlocking<Unit> {
    (1..3).asFlow().collect { value -> println(value) }
}

결과

1
2
3

 

Intermediate flow operators

시퀀스나 컬렉션을 변환하는 것 처럼 연산자를 통해서 플로우를 변환할 수 있다. 중간 연산자는 업스트림 플로우에 적용되어 다운스트림 플로우를 반환한다. 이 연산자 또한 플로우 처럼 cold 방식으로 동작한다. 이러한 중간 연산자들의 호출 그 자체는 중단 함수가 아니라서 변형된 플로우는 즉시 반환 된다.

(cold 방식이란? 플로우의 수집이 완료될 때 까지 이후의 동작을 하지 않는 것이다.

기본 연산자들은 우리에게 익숙한 map, filter 등의 이름을 가진다. 시퀀스와의 중요한 차이점은 이 연산자들을 통해서 실행되는 코드 블럭 내부에서 중단 함수를 호출할 수 있는 점이다. (밑의 예시 코드에서 확인 가능)

suspend fun performRequest(request: Int): String {
    delay(1000)
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() 
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

위의 코드를 실행하면 다음과 같은 결과가 나온다.

response 1
response 2
response 3

처음 플로우가 생성될 때는 요소가 1, 2, 3 이였는데 map함수를 통해서 각 요소들이 String 타입으로 변환되었다. 여기서 위에서 확인 가능한 점이 나오는데 delay와 같은 중단 함수를 호출해도 정상적으로 동작한다. 그리고 변환된 플로우는 수집 되어 정상적으로 출력이 된다.

 

Transform operator

플로우 변환 연산자 중 가장 일반적인 것은 transform 이다. 이 연산자는 map 이나 filter 처럼 간단한 변환이나 더 복잡한 변환에도 사용할 수 있다. transform 연산자를 사용하여 임의의 값을 임의의 횟수 만큼 방출할 수 있다.

예를 들어 transform 연산자를 사용하면 오래 걸리는 비동기 요청을 수행하기 전에 원하는 것을 방출 하고, 후에 수행 완료된 비동기 요청의 결과를 방출할 수 있다.

suspend fun performRequest(request: Int): String {
    delay(1000)
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow()
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
}

위의 코드를 실행하면 다음과 같은 결과가 나온다.

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

delay가 있어서 오래 걸리는 performRequest를 실행하기 전에 먼저 원하는 값을 방출하고, performRequest의 결과를 방출하였다.

 

Size-limiting operators

take와 같은 크기 제한 연산자는 정의된 제한치에 도달하면 플로우의 실행을 취소한다. 코루틴의 취소는 항상 예외를 발생 시켜서 모든 try { … } finally { … } 와 같은 리소스 관리 함수들이 정상적으로 실행이 된다.

fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2)
        .collect { value -> println(value) }
}

위의 코드를 실행시키면 다음과 같은 결과가 나온다.

1
2
Finally in numbers

플로우는 take 함수에 인자로 전달한 수 만큼 방출되고 예외를 발생 시키면서 종료하는 것을 확인할 수 있다.

 

Terminal flow operators

플로우 종단 연산자는 플로우 수집을 시작하는 중단 함수이다. collect 연산자가 기본적인 종단 연산자이지만 수집을 더 쉽게 할 수 있게 하는 다양한 연산자들도 있다.

  • toList , toSet 같은 다양한 컬렉션으로 변환
  • 첫 번째 값만 방출하며 플로우는 단일 값만 방출함을 보장
  • reduce 나 fold를 사용하여 플로우를 값으로 변환
fun main() = runBlocking<Unit> {
    val sum = (1..5).asFlow()
        .map { it * it }                          
        .reduce { a, b -> a + b } 
    println(sum)
}

위의 코드를 실행하면 다음과 같은 결과가 나온다.

55

map 함수를 통해서 각 요소들을 제곱하고, reduce 함수를 이용하여 값들을 수집할 때 다 더하게 하였다. 그래서 1 + 4+ 9+ 16+ 25의 결과로 55가 출력된다.

 

Flows are sequential

컬렉션은 종단 연산자를 호출한 코루틴에서 바로 작업한다. 기본적으로 새로운 코루틴은 실행되지 않는다. 각각의 방출된 값은 업스트림의 중간 연산자들에 의해 처리되고 다운스트림 전달되고 이후에 종단 연산자에 전달이 된다.

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0              
        }              
        .map { 
            println("Map $it")
            "string $it"
        }.collect { 
            println("Collect $it")
        }    
}

위의 코드를 실행하면 다음과 같은 결과가 나온다.

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

filter 함수를 통해서 짝수인지 확인하고 짝수이면 해당 값을 map 함수를 사용해서 매핑하고, 수집을 하는 함수이다. 그래서 짝수인 2, 4에서는 map과 collect 함수가 실행이 된다.

 

Flow context

플로우의 수집은 항상 호출된 코루틴의 컨텍스트에서 진행되는데 플로우의 세부 구현 내용에 관계없이 코드 작성자가 명시적으로 지정한 컨텍스트에서 실행된다.

이러한 플로우의 특성을 컨텍스트 보존(Context preservation)이라고 한다.

그래서 flow { … } 빌더의 코드는 해당 플로우의 수집을 호출한 컨텍스트에서 실행된다.

밑의 코드는 호출된 스레드와 함께 플로우에서 방출되는 수들을 출력한다.

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") }
}

위의 코드를 실행하면 다음과 같은 결과가 나온다.

[main] Started simple flow
[main] Collected 1
[main] Collected 2
[main] Collected 3

simple 의 수집이 main 스레드에서 실행되었기 때문에 simple { … } 빌더 내부의 코드도 main 스레드에서 실행이 된 것을 확인할 수 있다.

 

A common pitfall when using withContext

하지만 CPU 소비가 큰 코드는 Dispatchers.Default 같은 별도의 스레드에서 실행해야 하고, UI 업데이트 코드는 Dispatchers.Main 같은 UI 전용 스레드에서 실행해야 한다. 일반적으로 코틀린 코루틴에서는 컨텍스트를 변경하기 위해서 withContext를 사용하는데 flow { … } 빌더의 코드는 컨텍스트 보존 특성을 지켜야 해서 다른 컨텍스트에서 방출할 수 없다.

fun simple(): Flow<Int> = flow {
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) 
            emit(i) 
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) } 
}

그래서 이 코드를 실행하면 다음과 같은 오류가 발생한다.

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: Flow was collected in [BlockingCoroutine{Active}@74c8c892, BlockingEventLoop@5a5dcdf8], but emission happened in [DispatchedCoroutine{Active}@522f99ab, Dispatchers.Default]. Please refer to 'flow' documentation or use 'flowOn' instead at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:84) at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:88) …

simple 플로우를 main 스레드에서 호출하였지만 simple { … } 빌더 내부에서 withContext로 Dispatchers.Default 스레드에서 실행되게 했기 때문에 오류가 발생했다.

 

flowOn operator

flowOn 함수flow의 방출에서 컨텍스트 변환을 하는 올바른 방법이다. 밑의 코드는 현재 스레드의 이름을 출력하여 플로우의 값 방출이 어느 스레드에서 어떻게 동작 하는지를 보여준다.

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100)
        log("Emitting $i")
        emit(i) 
    }
}.flowOn(Dispatchers.Default)

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value") 
    } 
}

실행 결과

[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

simple { … } 빌더의 확장 함수로 flowOn을 사용하여 스레드를 Dispatchers.Default로 변경하였기 때문에 simple { … } 내부의 코드는 DefaultDispathcer에서 동작하는 것을 확인할 수 있다. 수집은 위의 예시처럼 했기 때문에 동일하게 main 스레드에서 동작한다.

 

여기서 하나 봐야 할 점은 flowOn 연산자가 플로우의 기본적인 특성인 순차성을 일부 변경했다는 점이다. 플로우의 수집은 coroutine#1 에서 발생하고 플로우의 방출은 coroutine#2 에서 발생한다. flowOn 연산자는 컨텍스트 내에서 CouroutineDispatcher를 변경 해야 하는 경우 업스트림 플로우를 위한 새로운 코루틴을 생성한다.

 

내용이 너무 길어서 여기까지만 한번 잘라야겠다.