資料流

suspend fun很方便,可是suspend fun得要呼叫才有回應,沒有辦法接收即時回傳的資料。在Coroutine裡面就有Flow跟Channel這兩種可以接受即時資料變動的工具,而我們只需要在Coroutine裡面訂閱一次,就可以監聽裡面發生的變化。接下來我們就來認識他們吧!

Flow

fun main() = runBlocking<Unit> {
    simpleFlow().collect { value ->
        println(value)
    }
}

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..100) {
        delay(100)
        emit(i)
    }
}

要創建一個Flow最簡單的方式是使用flow函數,在flow函數裡面我們可以使用emit函數來丟出資料,最後再使用collect函數來接受資料。

冷流(Cold Flow)

Cold Flow的意思是指必須要有觀測者的存在,才會運作並且發送資料的流,一旦collect存在的Coroutine取消,觀測者不存在了,Flow也會停止運作

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) {
        simpleFlow().collect { value ->
            println(value)
        }
    }
    println("Done")
}

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..100) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

Flow除了可以使用flow函數建立,也可以對集合使用.asFlow()讓集合轉成流

中間操作

跟惰性集合一樣Flow也有區分中間操作與終端操作,中間操作例如map、filter、take等等跟集合是一樣的,這邊介紹另一個transform操作,可以把讓各個元素依序運算轉換成更多元素

fun main() = runBlocking<Unit> {
    val duration = measureTime {
        simpleFlow().transform {
            emit("$it")
            delay(200)
            emit("complete $it")
        }.collect { value ->
            println(value)
        }
    }
    println("Done $duration")
}

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

終端操作

Flow的終端操作為suspend fun,除了前面介紹的collect之外,還有toList、toSet、first、single、reduce、fold。有了終端操作,Flow就會開始運作傳出資料,執行終端操作的Coroutine被取消的時候Flow就會停止運作。

fun main() = runBlocking<Unit> {
    val duration = measureTime {
        simpleFlow().reduce { a, b -> a + b }.run(::println)
    }
    println("Done $duration")
}

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

跟惰性集合一樣,Flow執行的順序是各個元素依次執行。

Flow的運作環境

Flow在沒有特別設定的情形下,會跟呼叫終端操作的Coroutine使用一樣的環境如果我們想要切換執行的環境可以使用flowOn函數

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main() = runBlocking<Unit> {
    val duration = measureTime {
        simpleFlow()
            .map {
                log("map $it")
                it * it
            }
            .reduce { a, b -> a + b }.run(::println)
    }
    println("Done $duration")
}

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        log("Emitting $i")
        emit(i)
    }
}.flowOn(Dispatchers.Default)

緩衝

數值的運算、處理需要時間,如果處理的速度無法消耗流的進入速度的時候就需要使用緩衝來決定要怎麼處理這樣的情況

buffer

fun main() = runBlocking<Unit> {
    val duration = measureTime {
        simpleFlow()
            .buffer()
            .collect {
                delay(400)
                log(it)
            }
    }
    println("Done $duration")
}

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

conflate

fun main() = runBlocking<Unit> {
    val duration = measureTime {
        simpleFlow()
            .conflate()
            .collect {
                delay(400)
                log("Done $it")
            }
    }
    println("Done $duration")
}

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

collectLatest

fun main() = runBlocking<Unit> {
    val duration = measureTime {
        simpleFlow()
            .collectLatest {
                log("Collect $it")
                delay(400)
                log("Done $it")
            }
    }
    println("Done $duration")
}

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

合併

我們常常會需要合併兩個資料流,而合併也會有很多種不一樣的形式,下面我們來看看有什麼樣的合併方式

zip

zip會把兩個流依照順序合併

fun main() = runBlocking<Unit> {
    val duration = measureTime {
        numberFlow().zip(textFlow()) { number, text ->
            "$number -> $text"
        }.collect {
            log(it)
        }
    }
    println("Done $duration")
}

fun numberFlow(): Flow<Int> = (1..5).asFlow().onEach { delay(300) }
fun textFlow(): Flow<String> = flowOf("one", "two", "three", "four", "five").onEach { delay(400) }

combine

combine會在其中一個流有出現新值的時候,把每個流的最新值拿來合併

fun main() = runBlocking<Unit> {
    val duration = measureTime {
        combine(numberFlow(), textFlow()) { number, text ->
            "$number -> $text"
        }.collect {
            log(it)
        }
    }
    println("Done $duration")
}

fun numberFlow(): Flow<Int> = (1..5).asFlow().onEach { delay(300) }
fun textFlow(): Flow<String> = flowOf("one", "two", "three", "four", "five").onEach { delay(400) }

攤平

有些時候我們會把Flow裡面的物件運算成另一個Flow,這個時候就需要把這個“流中流”攤開來變成線性的流

flatMapConcat

flatMapConcat會依序執行每個Flow

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking<Unit> {
    val duration = measureTime {
        numberFlow(5)
            .flatMapConcat { numberFlow(it) }
            .collect {
                log(it)
            }
    }
    println("Done $duration")
}

fun numberFlow(number: Int): Flow<Int> = (1..number).asFlow().onEach { delay(300) }

flatMapMerge

flatMapMerge會同時執行各個Flow

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking<Unit> {
    val duration = measureTime {
        numberFlow(5)
            .flatMapMerge { numberFlow(it) }
            .collect {
                log(it)
            }
    }
    println("Done $duration")
}

fun numberFlow(number: Int): Flow<Int> = (1..number).asFlow().onEach { delay(300) }

flatMapLatest

flatMapLatest會在有新的Flow產生的時候取消舊的Flow

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking<Unit> {
    val duration = measureTime {
        numberFlow(5)
            .flatMapLatest { numberFlow(it) }
            .collect {
                log(it)
            }
    }
    println("Done $duration")
}

fun numberFlow(number: Int): Flow<Int> = (1..number).asFlow().onEach { delay(300) }

Flow的錯誤處理

我們除了可以使用try/catch把Flow整個包起來捕捉錯誤之外,Flow裡面也有提供catch函數,可以讓我們處理錯誤

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> emit("Caught $e") }
        .collect { value -> println(value) }
}

fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i)
        }
    }
        .map { value ->
            check(value <= 1) { "Crashed on $value" }
            "string $value"
        }

開啟另一個Coroutine執行Flow

我們可以使用launchIn來開啟一個新的Coroutine來執行Flow,有點類似launch的感覺

fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this)
    println("Done")
}            

Flow的取消

跟一般的Coroutine一樣,在產生Flow的時候如果沒有使用suspend fun,會沒辦法取消Flow,這時候我們需要使用cancellable函數來讓Flow可以被取消

fun main() = runBlocking<Unit> {
    simple()
        .collect { value ->
            if (value > 2) cancel()
            println(value)
        }
}

fun simple(): Flow<Int> = (1..5).asFlow().cancellable()

Channel

Channel是一個可以在Coroutine裡運作的序列(Queue),使用send函數把東西放進去,並且使用receive取出我們放進去的東西

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
    }
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

不過通常我們在使用Channel的時候並不會知道Channel會被放進去多少東西,我們可以使用for迴圈在channel有東西的時候取出並執行,知道Channel被結束的時候

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) {
            channel.send(x * x)
            delay(200)
        }
        channel.close()
    }
    for (y in channel) println(y)
    println("Done!")
}

使用Channel運算

使用produce函數可以產生一個Channel,我們可以在裡面使用for迴圈取出另一個Channel的值做完運算再丟出

fun main() = runBlocking {
    val numbers = produceNumbers()
    val squares = square(numbers)
    repeat(5) {
        println(squares.receive())
    }
    println("Done!")
    coroutineContext.cancelChildren()
}

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++)
}

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}

練習

使用produce函數產生Channel運算出20個質數

Channel緩衝

使用Channel函數產生的Channel也有緩衝的特性,就像Flow一樣可以選擇Channel塞滿後要怎麼處理多出來的物件

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>(4)
    val sender = launch {
        repeat(10) {
            println("Sending $it")
            channel.send(it)
        }
    }
    delay(1000)
    for (x in channel) {
        println(x)
        delay(100)
        if (x >= 9) break
    }
    sender.cancel()
}

Last updated