Beeeam

코루틴 공식 문서 읽기 (Asynchronous Flow 2) 본문

Kotlin

코루틴 공식 문서 읽기 (Asynchronous Flow 2)

Beamjun 2023. 4. 13. 22:51

https://kotlinlang.org/docs/flow.html

 

Asynchronous Flow | Kotlin

 

kotlinlang.org

 

Buffering

오래 걸리는 비동기 작업이 관련된 플로우의 경우 해당 플로우의 로직을 다른 코루틴에서 실행하는 것이 플로우 수집에 걸리는 전체 시간의 관점에서 도움이 된다.

예를 들어 방출하는데 100ms가 걸리고 플로우의 각 요소를 수집하는데 300ms씩 걸린다고 하자

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) 
        emit(i) 
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().collect { value ->
            delay(300) 
            println(value)
        }
    }
    println("Collected in $time ms")
}

실행 결과

1
2
3
Collected in 1220 ms

실행을 하게 되면 숫자 하나 당 대략 400ms(방출: 100ms + 수집: 300ms)씩 걸려서 총 1200ms가 소모됐다.

하지만 buffer를 사용하여 방출과 수집이 동시에 발생하게 하면 시간을 단축할 수 있다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) 
        emit(i) 
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            .buffer() 
            .collect { value -> 
                delay(300) 
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

실행 결과

1
2
3
Collected in 1082 ms

첫 번째 숫자에 대해서만 방출에 100ms를 기다린 후 다음 숫자들을 처리할 때는 방출에 100ms를 기다리지 않기 때문에 시간 단축이 되었다.

 

Conflation(컨플레이션)

플로우가 연산의 일부 또는 연산 상태 업데이트를 방출하는 경우 모든 값들을 각각 방출하는 것은 비효율적이고 가장 최신 값만 처리하여 방출하는 것이 효율적일 것이다. 이러한 경우 conflation 연산자를 사용하면 중간 값들을 건너뛸 수 있다.

fun simple(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        emit(i) 
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple()
            .conflate() 
            .collect { value ->
                delay(300) 
                println(value)
            }
    }
    println("Collected in $time ms")
}

실행 결과

1
3
Collected in 776 ms

위의 코드를 실행하면 1이 방출되어 수집되는 동안 2, 3도 방출이 된다. 그리고 중간 숫자인 2는 건너뛰어서 3만 수집이 된다. 그래서 대략 700ms가 걸린다. (방출되는 시간: 100ms + 1 수집 되는 시간: 300ms + 3 수집 되는 시간: 300ms)

 

Processing the latest value

Conflation은 수집기와 방출기 둘 다 느릴 때 처리 속도를 높이는 방법 중 하나이다. 이것은 위에서 언급한 것 처럼 중간 값들을 삭제하여 수행한다.

또 다른 방법으로 새로운 값이 방출 될 때 마다 느린 수집기를 취소하고 다시 시작하는 방법이 있다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        simple()
            .collectLatest { value -> 
                println("Collecting $value")
                delay(300) 
                println("Done $value")
            }
    }
    println("Collected in $time ms")
}

실행 결과

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 772 ms

플로우의 각 요소들은 방출되는데 100ms가 소요 되고, collectLatest 블록은 수행하는데 300ms가 소요된다. 위의 결과를 보면 대략 700ms가 나온다.

이는 각 수집기에서 값이 방출되고, 수집기가 취소 됐다가 재시작 되기 때문에 Collecting을 3번 출력하는데 300ms가 걸리고, 모든 값들이 방출되면 collectLatest 블록의 delay 300 때문에 300ms가 추가적으로 소요가 되기 때문이다. 그러면 100ms가 남는데 이는 time을 측정하는 과정에서 생기는 딜레이인 것 같다.

 

Composing multiple flows

다중 플로우를 합성하는 방법은 여러 가지가 있다.

Zip

코틀린 표준 라이브러리의 Sequence.zip 확장 함수 처럼 플로우에도 두 플로우를 결합하는 zip 연산자가 있다.

fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
}

실행 결과

1 -> one
2 -> two
3 -> three

위의 결과처럼 nums 플로우와 strs 플로우를 zip으로 결합하여 a -> b 형식으로 출력한 것을 확인할 수 있다.

 

Combine

플로우가 변수나 연산의 가장 최근 값을 나타내는 경우 해당 플로우의 최근 값에 추가 연산을 하거나 별도의 업스트림 플로우가 값을 방출할 때 마다 다시 그 추가 연산을 해야 할 수 있다. 이와 관련된 연산자들을 combine라고 부른다.

예를 들어 위에서 들었던 예시에 nums 플로우를 업데이트 하는데 300ms가 소요되고, strs 플로우를 업데이트 하는데 400ms가 소요되도록 변경한 수 zip 연산자를 사용하여 값을 결합하여 출력하면 위의 예시와 같은 결과를 400ms마다 출력한다.

fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow().onEach { delay(300) }
    val strs = flowOf("one", "two", "three").onEach { delay(400) } 
    val startTime = currentTimeMillis()
    nums.zip(strs) { a, b -> "$a -> $b" } 
        .collect { value ->
            println("$value at ${currentTimeMillis() - startTime} ms from start")
        }
}

실행 결과

1 -> one at 487 ms from start
2 -> two at 888 ms from start
3 -> three at 1311 ms from start

그런데 위의 예제에서 zip 대신에 combine 연산자를 사용하면?

fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow().onEach { delay(300) } 
    val strs = flowOf("one", "two", "three").onEach { delay(400) } 
    val startTime = currentTimeMillis() 
    nums.combine(strs) { a, b -> "$a -> $b" }
        .collect { value -> 
            println("$value at ${currentTimeMillis() - startTime} ms from start")
        }
}

실행 결과

1 -> one at 518 ms from start
2 -> one at 734 ms from start
2 -> two at 919 ms from start
3 -> two at 1045 ms from start
3 -> three at 1326 ms from start

zip을 사용했을 때와는 다른 결과를 확인할 수 있다. 결과를 보면 nums, strs 플로우로 부터 값이 방출될 때마다 각 플로우의 최근 값과 결합되어 값이 출력 되는 것을 확인할 수 있다.

 

Flattening flows

플로우는 비동기적으로 수신되는 값들의 시퀀스를 나타낸다. 그러므로 어떤 플로우에서 수신되는 일련의 값들이 다른 값들의 시퀀스를 요청하는 플로우가 되는 일은 자주 발생한다.

플로우 플래트닝이란? → 중첩된 데이터 스트림을 플랫하게 만들어서 처리하기 쉽게 만들어주는 기능이다.

예를 들어 두 개의 문자열을 500ms 간격으로 방출하는 플로우를 다음과 같이 정의 할 수 있다.

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) 
    emit("$i: Second")
}

세 개의 Int를 방출하는 플로우를 가지고 각 정수를 방출할 때 마다 requestFlow 함수를 호출하는 경우

(1..3).asFlow().map { requestFlow(it) }

위의 작업을 하면 (Flow<Flow<String>>) 형태의 플로우를 얻게 되는데 이러한 형식은 추가 처리를 위해서 플래트닝이 필요하다.

컬렉션들과 시퀀스들은 이를 위해 flatten과 flatMap 연산자들을 가지고 있다. 하지만 플로우는 비동기적 특성 때문에 플로우를 위한 플래트닝 연산자들이 별도로 정의 되어있다.

 

flatMapConcat

플로우간의 연결은 flatMapConcat과 flattenConcat 연산자에 의해 구현이 된다. 이 연산자들은 다음 플로우의 수집을 시작하기 전에 현재 플로우가 완료될 때까지 기다린다.

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) 
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = currentTimeMillis() 
    (1..3).asFlow().onEach { delay(100) }
        .flatMapConcat { requestFlow(it) }
        .collect { value -> 
            println("$value at ${currentTimeMillis() - startTime} ms from start")
        }
}

실행 결과

1: First at 190 ms from start
1: Second at 700 ms from start
2: First at 808 ms from start
2: Second at 1323 ms from start
3: First at 1432 ms from start
3: Second at 1932 ms from start

결과를 통해서 flatMapConcat 함수의 순차적인 속성을 확인할 수 있다.

requestFlow 함수에서 값을 2번 방출하는데 각 방출 사이에는 500ms가 소요된다. flatMapConcat을 사용하였기 때문에 두 개의 값이 모두 방출될 때까지 다음 플로우의 수집을 하지 않기 때문에 위와 같은 결과가 나왔다.

 

flatMapMerge

또 다른 플래트닝 연산은 모든 플로우를 동시에 수집하고, 해당 값을 한 개의 플로우로 병합하여 가능한 빠르게 방출하는 것이 있다.

이것은 flatMapMerge 또는 flattenMapMerge 연산자에 의해 구현된다.

둘 다 concurrency 파라미터를 지원하는데 이는 선택적으로 사용하는 것이고, 이를 사용하여 동시에 수집되는 플로우의 개수를 제한할 수 있다.

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapMerge { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${currentTimeMillis() - startTime} ms from start") 
        } 
}

실행 결과

1: First at 194 ms from start
2: First at 284 ms from start
3: First at 396 ms from start
1: Second at 709 ms from start
2: Second at 785 ms from start
3: Second at 914 ms from start

결과를 통해서 flatMapMerge 연산자의 동시성 특징을 확인할 수 있다.

flatMapConcat을 사용했을 때와는 달리 requestFlow 중간의 500ms의 delay에도 불구하고 처음 방출되는 값들을 바로 병합하여 출력하는 결과가 나오는데 이는 flatMapMerge를 사용했기 때문이다.

 

flatMapLatest

위에서 설명한 collectLatest 연산자와 비슷한 동작을 하는 플래트닝 모드가 있다. 이는 flatMapLatest로 구현되며, 새 플로우가 방출되면 직전의 플로우의 수집을 취소한다.

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = currentTimeMillis() // remember the start time 
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapLatest { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${currentTimeMillis() - startTime} ms from start") 
        } 
}

실행 결과

1: First at 181 ms from start
2: First at 289 ms from start
3: First at 395 ms from start
3: Second at 910 ms from start

위에서 설명했던 다른 플래트닝 모드들과는 달리 1: Second ~~, 2: Second ~~ 를 볼 수 없다. 이는 flatMapLatest를 사용하였기 때문이다. requestFlow 함수 내의 delay를 기다리는 동안 다른 플로우의 수집이 시작되면서 직전 플로우의 수집을 취소 했기 때문에 이러한 결과가 나오게 된다.

 

Flow exceptions

플로우 수집은 방출 로직 이나 연산자 안의 코드가 예외를 발생 시키면 예외 상태로 종료될 수 있는데 이러한 예외를 다룰 수 있는 여러 방법들이 있다.

Collector try and catch

수집기는 코틀린의 try/catch를 사용하여 예외를 다룰 수 있다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}

실행 결과

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

결과를 통해서 collect에서 예외를 성공적으로 잡아내었고, 예외 처리를 하여 이후에 값이 방출되지 않는 것을 확인 가능하다.

Everything is caught

위의 예제는 발생하는 모든 예외를 잡아낼 수 있다.

Exception transparency

방출 코드의 예외 처리 동작을 어떻게 캡슐화 할 수 있을까?

 

플로우는 예외에 있어서 반드시 투명해야 한다. try/catch 블록 내부에서 flow 빌더의 값을 방출하는 것은 예외 투명성을 위반하는 것이다.

방출 로직은 이러한 예외 투명성을 지키기 위해서 catch 연산자를 사용할 수 있고, 이를 통해서 해당 예외 처리 로직을 캡슐화 할 수 있다. catch 연산자는 예외에 따라 다양한 방식으로 대응이 가능하다.

  • 예외는 throw를 사용하여 다시 예외를 throw 할 수 있다.
  • 예외는 catch 본문의 방출을 사용하여 값의 방출로 변환될 수 있다.
  • 예외는 다른 코드에 의해 무시, 기록, 처리될 수 있다.
fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
        .map { value ->
            check(value <= 1) { "Crashed on $value" }
            "string $value"
        }

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
}

실행 결과

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

catch 연산자를 사용하면 수집기에서 try/catch로 방출을 감싸지 않아도 위의 예제와 같은 출력을 낼 수 있다.

 

Transparent catch

예외 투명성을 지키는 catch 연산자는 업스트림 예외만 잡아낸다. 다운스트림에서 발생한 예외에 대해서는 처리하지 않는다.

업스트림: 데이터를 생산하고 소비하지 않음

다운스트림: 데이터를 소비하고 생산하지 않음

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } 
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}

실행 결과

Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2 at MainKt$main$1$invokeSuspend$$inlined$collect$1.emit(Collect.kt:135) at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$$inlined$collect$1.emit(Collect.kt:136) at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15) …

실행 결과를 보면 catch 연산자가 있음에도 불구하고 Caught ~~ 라는 메시지를 확인할 수 없다.

 

Catching declaratively

수집 연산자의 본문을 onEach안으로 이동하고, catch 연산자 이전에 놓음으로 catch 연산자의 모든 예외를 처리하려는 속성과 선언적 속성을 결합할 수 있다.

이 플로우의 수집은 collect에 아무런 매개변수 없이 호출하여 시작해야 한다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
        .catch { e -> println("Caught $e") }
        .collect()
}

실행 결과

Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2

이제 Caught ~~ 라는 메시지를 확인할 수 있다. 이를 통해서 try / catch를 명시적으로 사용하지 않고 모든 예외를 잡아낼 수 있음을 알 수 있다.

 

Flow completion

플로우의 수집이 완료(정상적인 OR 예외 발생)되면 다른 작업을 실행해야 할 수 있다. 이것은 두 가지 방법으로 수행할 수 있다. (명령형, 선언형)

Imperative finally block (명령형)

try / catch에 추가적으로 수집기는 finally 블록을 사용하여 수집이 완료 된 후의 작업을 실행할 수 있다.

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}

실행 결과

1
2
3
Done

simple 플로우의 수집이 완료되자 “Done” 문자열이 출력 되는 것을 확인할 수 있다.

Declarative handling (선언형)

선언형 방법에는 플로우의 수집이 완료 되었을 때 실행 될 작업을 정의할 수 있는 onCompletion 중간 연산자가 있다.

이전 예제를 onCompletion 연산자를 사용하면 밑의 코드처럼 작성할 수 있고, 동일한 결과를 출력한다.

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
}

실행 결과는 이전 예제와 동일

onCompletion을 사용하면 람다에 nullable로 정의되는 Throwable 파라미터를 통해 플로우가 정상적으로 수집 되었는지 아니면 예외가 발생했는지 알 수 있는 장점이 있다.

밑의 예제는 플로우에서 1을 방출하고, 예외를 발생시키는 코드이다.

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally (${cause})") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}

실행 결과

1
Flow completed exceptionally (java.lang.RuntimeException)
Caught exception

실행 결과를 보면 onCompletion을 통해서 예외 발생 여부를 확인할 수 있다. 하지만 Caught ~~ 문자열이 출력된 것을 봄으로써 예외 처리는 하지 않음을 알 수 있다. 예외는 catch 연산자가 처리 하였다.

 

Successful completion

catch 연산자와의 다른 차이점은 onCompletion은 모든 예외를 보고 업스트림 플로우가 성공적으로 완료된 경우에만 null 예외를 수신한다는 것이다. (취소 또는 예외가 없는 경우)

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}

실행 결과

1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2 at ….

Imperative versus declarative

우리는 이제 어떻게 플로우를 수집하는지, 이것의 완료와 예외를 명령적, 선언적 방법으로 다루는지 알았다.

그럼 선언적, 명령적 중 어떤 방법이 더 선호 될까?

라이브러리는 어떤게 더 낫다고 말할 수 없고, 두 가지 방법 모두 유효 하기 때문에 자신의 환경이나 스타일에 따라서 선택하여 사용해도 된다.

 

Launching flow

어떤 소스로부터 들어오는 비동기 이벤트는 플로우를 사용하여 쉽게 표현할 수 있다. 이러한 경우 들어오는 이벤트들에 대응하는 코드를 addEventListener를 통해 등록하고 이후 필요한 일을 진행하는 방식을 사용하곤 한다.

플로우에서는 onEach 연산자가 해당 역할을 한다. 하지만 onEach 연산자는 중간 연산자라서 플로우의 수집을 시작하는 종단 연산자가 없이는 호출하여도 아무 의미 없다.

만약 onEach 이후에 수집 종단 연산자를 사용하면 그 이후 코드들은 플로우가 수집될 때 까지 대기할 것이다.

fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() 
    println("Done")
}

실행 결과

Event: 1
Event: 2
Event: 3
Done

결과를 보면 실제로 collect 연산자 이후의 코드는 플로우의 수집이 완료 된 이후에 실행된 것을 확인할 수 있다.

그런데 collect 연산자 대신에 launchIn 연산자를 사용하면 별도의 코루틴에서 플로우 수집을 시작할 수 있어서 이후의 코드를 바로 실행할 수 있다.

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) 
    println("Done")
}

실행 결과

Done
Event: 1
Event: 2
Event: 3

collect 연산자를 사용했을 때와는 달리 Done이 제일 먼저 출력된 것을 확인할 수 있다.

 

launchIn 연산자를 사용할 때는 파라미터로 플로우를 수집할 코루틴의 범위를 넣어야 한다. 위의 예제 코드의 경우는 this를 사용하여 scope를 runBlocking 코루틴 빌더로 부터 받아왔다. 이로 인해 플로우가 수집되는 동안 runBlocking 스코프는 자식 코루틴의 완료를 기다리고, main 함수가 이 예제를 반환하고 종료 하지 않도록 한다.

 

실제 앱에서 스코프는 수명주기를 갖는 엔티티들에서 전달될 수 있다. 이 엔티티의 수명이 종료되는 즉시 해당 스코프는 취소가 되고, 그 스코프 안의 플로우 수집 동작들도 같이 취소가 된다. 이러한 방식으로 onEach{ … }.launchIn(scope) 쌍이 addEventListener 처럼 동작한다.

 

하지만 removeEventListener는 필요하지 않는데 이는 취소와 구조화된 동시성이 역할을 대신 하기 때문이다.

launchIn도 job 객체를 반환하는데 이는 전체 스코프를 취소하거나 특정 job에 대한 join을 하지 않고 해당 플로우 수집 코루틴을 취소하는 곳에만 사용할 수 있다.

 

Flow cancellation checks

편의를 위해서 플로우 빌더는 방출된 각 값에 대한 취소에 추가적인 ensureActive 검사를 수행한다. 이는 플로우 { … }에서 사용 중인 루프를 취소할 수 있음을 의미한다.

fun foo(): Flow<Int> = flow {
    for (i in 1..5) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}

실행 결과

Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@394e1a0f

실행 결과를 보면 플로우의 수집을 값이 3이면 취소하라고 했기 때문에 4가 방출 되자 예외가 발생하면서 플로우 수집이 종료 됐음을 확인할 수 있다.

 

하지만 대부분의 다른 플로우 연산자들은 그들의 성능적인 이유로 추가적인 취소 확인을 하지 않는다. 예를 들어 (정수 범위).asFlow 연산자(ex (1 .. 5).asFlow)를 사용하고 위의 예제 처럼 collect 안에서 cancel를 하게 하면 다음과 취소가 되지 않는다.

fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

실행 결과

1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@72b6cbcc

실행 결과를 보면 1 ~ 5까지 값들이 방출되고 취소 때문에 예외가 발생하는 것을 확인할 수 있다.

 

Making busy flow cancellable

이와 같은 플로우에서 수집을 취소할 수 있는 방법이 있다. 이는 cancellable 연산자를 사용하는 것이다.

fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}

실행 결과

1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutine{Cancelled}@1f554b06

실행 결과처럼 cancellable 함수를 사용하면 플로우가 수집 되는 중에 원하는 상황에 취소를 시킬 수 있다.