Kotlin协程之Flow的使用与原理解析

 更新时间:2023年09月07日 10:31:01   作者:淘淘养乐多  
Flow是一种数据流,可以用于协程间的通信,具有冷、懒、响应式等特点,Flow是基于协程构建的,可以提供多个值,本文就来给大家讲讲Kotlin Flow使用与原理,需要的朋友可以参考下

Flow的定义和特点

Flow是一种数据流,可以用于协程间的通信,具有冷、懒、响应式等特点。Flow是基于协程构建的,可以提供多个值。Flow在概念上类似于一个数据序列,但它可以使用挂起函数来异步地产生和消费值。这意味着,例如,Flow可以安全地发起网络请求来产生下一个值,而不会阻塞主线程。

Flow的特点主要有以下几点:

  • :Flow是冷的,也就是说,它不会在没有收集器的情况下开始执行。只有当有收集器订阅了Flow时,它才会开始发射值。这与热的数据流不同,热的数据流会在没有收集器的情况下也产生值。
  • :Flow是懒的,也就是说,它只会在需要时才计算值。每个收集器都会触发Flow的重新计算,而不是共享之前计算的结果。这与惰性序列类似,惰性序列只会在迭代时才计算元素。
  • 响应式:Flow是响应式的,也就是说,它可以根据收集器的需求来调整发射速度。如果收集器不能及时处理值,Flow可以暂停或取消发射。这与反应式流规范(Reactive Streams Specification)中定义的背压(backpressure)机制类似。

Flow的创建和操作

Flow可以通过多种方式创建,最简单的方式是使用flow{}构建器函数,在其中使用emit函数手动发射值。例如,下面的代码创建了一个发射1到3的整数值的Flow:

// 创建一个Flow<Int>
fun simple(): Flow<Int> = flow {
    // 发射1到3
    for (i in 1..3) {
        emit(i) // 发射下一个值
    }
}

除了flow{}构建器函数外,还有一些其他方式可以创建Flow,例如:

  • 使用flowOf()函数创建一个包含固定元素的Flow。
  • 使用asFlow()扩展函数将各种集合和序列转换为Flow。
  • 使用channelFlow()构建器函数创建一个基于通道(Channel)的Flow。
  • 使用callbackFlow()构建器函数创建一个基于回调(Callback)的Flow。

创建好Flow后,可以使用各种操作符对数据进行处理。操作符分为两类:

  • 中间操作符:中间操作符用于对数据进行转换、过滤、组合等操作,但不会终止流。中间操作符返回一个新的Flow,可以链式调用多个中间操作符。例如,filter、map、take等操作符都是中间操作符。
  • 终止操作符:终止操作符用于对数据进行收集、聚合、统计等操作,并终止流。终止操作符返回一个非Flow类型的结果,并触发流的执行。例如,collect、first、toList等操作符都是终止操作符。

例如,下面的代码使用了map和filter两个中间操作符对simple()函数返回的Flow进行了转换和过滤,并使用了collect终止操作符对结果进行了打印:

// 对simple()返回的Flow进行处理
fun main() = runBlocking<Unit> {
    // 启动并发协程以验证主线程并未阻塞
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // 收集这个流
    simple()
        .map { it * it } // 数字求平方
        .filter { it % 2 == 0 } // 过滤偶数
        .collect { value -> // 终止操作符
            println(value) // 打印结果
        }
}

输出结果为:

I'm not blocked 1
4
I'm not blocked 2
I'm not blocked 3

可以看到,Flow的执行是在协程中进行的,不会阻塞主线程。同时,Flow的操作符也是挂起函数,可以在其中进行异步操作,例如:

// 对simple()返回的Flow进行处理
fun main() = runBlocking<Unit> {
    // 收集这个流
    simple()
        .map { request(it) } // 模拟异步请求
        .collect { value -> // 终止操作符
            println(value) // 打印结果
        }
}
// 模拟异步请求,返回字符串
suspend fun request(i: Int): String {
    delay(1000) // 延迟1秒
    return "response $i"
}

输出结果为:

response 1
response 2
response 3

可以看到,每个请求都延迟了1秒,但是不会阻塞主线程或其他请求。

Flow的生命周期和异常处理

Flow提供了一些回调函数来监听流的生命周期,例如:

  • onStart:在流开始收集之前调用,可以用于执行一些初始化操作,例如打开文件或数据库连接等。
  • onEach:在每个元素被发射之后调用,可以用于执行一些通用操作,例如日志记录或更新UI等。
  • onCompletion:在流完成收集之后调用,无论是正常完成还是异常终止。可以用于执行一些清理操作,例如关闭文件或数据库连接等。onCompletion可以接收一个可空的Throwable参数,表示流终止的原因,如果为null,则表示正常完成。

例如,下面的代码使用了onStart和onCompletion两个回调函数来打印流的开始和结束时间:

// 对simple()返回的Flow进行处理
fun main() = runBlocking<Unit> {
    // 收集这个流
    simple()
        .onStart { println("Flow started at ${System.currentTimeMillis()}") } // 开始回调
        .onCompletion { println("Flow completed at ${System.currentTimeMillis()}") } // 结束回调
        .collect { value -> // 终止操作符
            println(value) // 打印结果
        }
}

输出结果为:

Flow started at 1632828678656
1
2
3
Flow completed at 1632828678657

可以看到,流的开始和结束时间都被打印出来了。

Flow也提供了一些方式来处理异常,例如:

  • catch:在流发生异常时调用,可以用于捕获和处理异常,并决定是否继续或终止流。catch操作符必须放在可能发生异常的操作符之后,否则无法捕获异常。
  • try-catch:在收集流时使用try-catch块包裹collect操作符,可以用于捕获和处理异常,并决定是否继续或终止程序。try-catch块可以捕获任何位置发生的异常。

例如,下面的代码使用了catch和try-catch两种方式来处理异常:

// 创建一个可能发生异常的Flow<Int>
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // 发射下一个值
    }
    throw RuntimeException() // 抛出异常
}
// 对foo()返回的Flow进行处理
fun main() = runBlocking<Unit> {
    // 使用catch操作符捕获异常,并打印错误信息,然后继续发射-1作为错误标识
    foo()
        .catch { e -> println("Caught $e") } // 捕获异常
        .emit(-1) // 发射错误标识
        .collect { value ->
            println(value)
        }
    println("Done")
// 使用try-catch块捕获异常,并打印错误信息,然后终止程序
    try {
        foo().collect { value ->
            println(value)
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
    println("Done")
}

输出结果为:

Emitting 1
1
Emitting 2
2
Emitting 3
3
Caught java.lang.RuntimeException
-1
Done
Emitting 1
1
Emitting 2
2
Emitting 3
3
Caught java.lang.RuntimeException
Done

可以看到,catch操作符可以在流中处理异常,并继续发射值,而try-catch块可以在程序中处理异常,并终止程序。

Flow的线程切换

Flow提供了一些操作符来切换上游和下游的上下文,例如:

  • flowOn:flowOn操作符用于切换上游的上下文,也就是说,它会影响flow{}构建器函数和之前的中间操作符的执行上下文。flowOn操作符可以用于将耗时的计算操作放在后台线程中执行,而不影响主线程。
  • launchIn:launchIn操作符用于切换下游的上下文,也就是说,它会影响collect操作符和之后的中间操作符的执行上下文。launchIn操作符可以用于将收集操作放在协程中执行,而不阻塞当前线程。

例如,下面的代码使用了flowOn和launchIn两个操作符来切换上下文:

// 创建一个Flow<Int>
fun simple(): Flow<Int> = flow {
    // 发射1到3,并打印当前线程名
    for (i in 1..3) {
        Thread.sleep(100) // 假装我们以消耗 CPU 的方式进行计算
        log("Emitting $i")
        emit(i) // 发射下一个值
    }
}.flowOn(Dispatchers.Default) // 在流构建器中改变消耗 CPU 代码上下文
// 对simple()返回的Flow进行处理
fun main() = runBlocking<Unit> {
    // 收集这个流,并打印当前线程名
    simple()
        .collect { value ->
            log("Collected $value")
        }
    println("Done")
    // 使用launchIn操作符将收集操作放在协程中执行,并打印当前线程名
    simple()
        .onEach { value ->
            log("Collected $value")
        }
        .launchIn(this) // 在单独的协程中收集并打印结果
    println("Done")
}

输出结果为:

[DefaultDispatcher-worker-1] Emitting 1
[main] Collected 1
[DefaultDispatcher-worker-1] Emitting 2
[main] Collected 2
[DefaultDispatcher-worker-1] Emitting 3
[main] Collected 3
Done
Done
[DefaultDispatcher-worker-2] Emitting 1
[main] Collected 1
[DefaultDispatcher-worker-2] Emitting 2
[main] Collected 2
[DefaultDispatcher-worker-2] Emitting 3
[main] Collected 3

可以看到,flowOn操作符将发射操作放在了DefaultDispatcher线程中执行,而collect操作仍然在主线程中执行。launchIn操作符将收集操作放在了一个单独的协程中执行,而不阻塞主线程。

以上就是Kotlin协程之Flow的使用与原理解析的详细内容,更多关于Kotlin Flow使用与原理的资料请关注脚本之家其它相关文章!

相关文章

最新评论