Kotlin Channel处理多个数据组合的流

 更新时间:2022年11月25日 16:48:58   作者:wayne214  
最近项目中对 kotlin 的使用比较多。不得不说 kotlin 确实可以极大的提高 android 的开发效率,channel用于协程之间的通讯,使用send和receive往通道里写入或者读取数据,2个方法为非阻塞挂起函数,channel是热流,不管有没有订阅者都会发送

结论先行

Kotlin协程中的Channel用于处理多个数据组合的流,随用随取,时刻准备着,就像自来水一样,打开开关就有水了。

Channel使用示例

fun main() = runBlocking {
    logX("开始")
    val channel = Channel<Int> {  }
    launch {
        (1..3).forEach{
            channel.send(it)
            logX("发送数据: $it")
        }
        // 关闭channel, 节省资源
        channel.close()
    }
    launch {
        for (i in channel){
            logX("接收数据: $i")
        }
    }
    logX("结束")
}

示例代码 使用Channel创建了一组int类型的数据流,通过send发送数据,并通过for循环取出channel中的数据,最后channel是一种协程资源,使用结束后应该及时调用close方法关闭,以免浪费不必要的资源。

Channel的源码

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> {}
        CONFLATED -> {}
        UNLIMITED -> {}
        else -> {}
    }

可以看到Channel的构造函数包含了三个参数,分别是capacity、onBufferOverflow、onUndeliveredElement.

首先看capacity,这个参数代表了管道的容量,默认参数是RENDEZVOUS,取值是0,还有其他一些值:

  • UNLIMITED: Int = Int.MAX_VALUE,没有限量
  • CONFLATED: 容量为1,新的覆盖旧的值
  • BUFFERED: 添加缓冲容量,默认值是64,可以通过修改VM参数:kotlinx.coroutines.channels.defaultBuffer,进行修改

接下来看onBufferOverflow, 顾名思义就是管道容量满了,怎么办?默认是挂起,也就是suspend,一共有三种分别是:

SUSPNED、DROP_OLDEST以及DROP_LATEST

public enum class BufferOverflow {
    /**
     * Suspend on buffer overflow.
     */
    SUSPEND,
    /**
     * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
     */
    DROP_OLDEST,
    /**
     * Drop **the latest** value that is being added to the buffer right now on buffer overflow
     * (so that buffer contents stay the same), do not suspend.
     */
    DROP_LATEST
}
  • SUSPEND,当管道的容量满了以后,如果发送方还要继续发送,我们就会挂起当前的 send() 方法。由于它是一个挂起函数,所以我们可以以非阻塞的方式,将发送方的执行流程挂起,等管道中有了空闲位置以后再恢复,有点像生产者-消费者模型
  • DROP_OLDEST,顾名思义,就是丢弃最旧的那条数据,然后发送新的数据,有点像LRU算法。
  • DROP_LATEST,丢弃最新的那条数据。这里要注意,这个动作的含义是丢弃当前正准备发送的那条数据,而管道中的内容将维持不变。

最后一个参数是onUndeliveredElement,从名字看像是没有投递成功的回调,也确实如此,当管道中某些数据没有成功接收时,这个就会被调用。

综合这个参数使用一下

fun main() = runBlocking {
    println("开始")
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
        println("onUndeliveredElement = $it")
    }
    launch {
        (1..3).forEach{
            channel.send(it)
            println("发送数据: $it")
        }
        // 关闭channel, 节省资源
        channel.close()
    }
    launch {
        for (i in channel){
            println("接收数据: $i")
        }
    }
    println("结束")
}

输出结果如下:
开始
结束
发送数据: 1
发送数据: 2
发送数据: 3
接收数据: 2
接收数据: 3

安全的从Channel中取数据

先看一个例子

val channel: ReceiveChannel<Int> = produce {
        (1..100).forEach{
            send(it)
            println("发送: $it")
        }
    }
while (!channel.isClosedForReceive){
    val i = channel.receive();
    println("接收: $i")
}

输出报错信息:
Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed

可以看到使用isClosedForReceive判断是否关闭再使用receive方法接收数据,依然会报错,所以不推荐使用这种方式。

推荐使用上面for循环的方式取数据,还有kotlin推荐的consumeEach方式,看一下示例代码

val channel: ReceiveChannel<Int> = produce {
        (1..100).forEach{
            send(it)
            println("发送: $it")
        }
    }
channel.consumeEach {
    println("接收:$it")
}

所以,当我们想要获取Channel当中的数据时,我们尽量使用 for 循环,或者是channel.consumeEach {},不要直接调用channel.receive()。

热的数据流从何而来

先看一下代码

    println("开始")
    val channel = Channel<Int>(capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
        println("onUndeliveredElement = $it")
    }
    launch {
        (1..3).forEach{
            channel.send(it)
            println("发送数据: $it")
        }
    }
    println("结束")
}

输出:
开始
结束
发送数据: 1
发送数据: 2
发送数据: 3

可以看到上述代码中并没有 取channel中的数据,但是发送的代码正常执行了,这种“不管有没有接收方,发送方都会工作”的模式,就是我们将其认定为“热”的原因。

举个例子,就像去海底捞吃火锅一样,你不需要主动要求服务员加水,服务员看到你的杯子中水少了,会自动给你添加,你只管拿起水杯喝水就行了。

总的来说,不管接收方是否存在,Channel 的发送方一定会工作。

Channel能力的来源

通过源码可以看到Channel只是一个接口,它的能力来源于SendChannel和ReceiveChannel,一个发送管道,一个接收管道,相当于做了一个组合。

这也是一种良好的设计思想,“对读取开放,对写入封闭”的开闭原则。

到此这篇关于Kotlin Channel处理多个数据组合的流的文章就介绍到这了,更多相关Kotlin Channel内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Android实现选项菜单子菜单

    Android实现选项菜单子菜单

    这篇文章主要为大家详细介绍了Android实现选项菜单子菜单,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-12-12
  • Android开发Kotlin语言协程中的并发问题和互斥锁

    Android开发Kotlin语言协程中的并发问题和互斥锁

    Android开发Kotlin语言提供了多种机制来处理并发和同步,其中包括高层次和低层次的工具,对于常规的并发任务,可以利用 Kotlin 协程提供的结构化并发方式,而对于需要更低层次的锁定机制,可以使用Mutex(互斥锁)来实现对共享资源的线程安全访问
    2024-06-06
  • android查看网络图片的实现方法

    android查看网络图片的实现方法

    这篇文章主要为大家详细介绍了android查看网络图片的实现方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-04-04
  • Android自定义View实现钟摆效果进度条PendulumView

    Android自定义View实现钟摆效果进度条PendulumView

    这篇文章主要介绍了Android自定义View实现钟摆效果进度条PendulumView,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-09-09
  • php 异步调用方法实现示例

    php 异步调用方法实现示例

    客户端与服务器端是通过HTTP协议进行连接通讯,客户端发起请求,服务器端接收到请求后执行处理,并返回处理结果
    2014-01-01
  • Kotlin中单例模式和Java的对比浅析

    Kotlin中单例模式和Java的对比浅析

    目前java中的单例模式有多种写法,kotlin中的写法更多一点,下面这篇文章主要给大家介绍了关于Kotlin中单例模式和Java对比的相关资料,会总结全部的到单例模式写法,需要的朋友可以参考下
    2018-07-07
  • AndroidStudio中重载方法@Override的使用详解

    AndroidStudio中重载方法@Override的使用详解

    这篇文章主要介绍了AndroidStudio中重载方法@Override的使用详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-04-04
  • android判断phonegap是否联网且加载super.loadUrl网址

    android判断phonegap是否联网且加载super.loadUrl网址

    android判断phonegap是否联网动态加载super.loadUrl网址,接下来本文所提供的知识会帮助你解决以上问题,感兴趣的你可不要错过了哈
    2013-02-02
  • 深入浅出学习Android ListView基础

    深入浅出学习Android ListView基础

    这篇文章主要介绍了深入浅出的带领大家学习Android ListView基础,ListView是安卓里常用的控件,本文介绍一下常用用法,以及优化等方法,感兴趣的小伙伴们可以参考一下
    2016-01-01
  • Flutter绘图组件之CustomPaint使用详解

    Flutter绘图组件之CustomPaint使用详解

    CustomPaint是Flutter中用于自由绘制的一个widget,它与android原生的绘制规则基本一致,以当前Canves(画布)的左上角为原点进行绘制。本文将详细讲解CustomPaint的使用教程,需要的可以参考一下
    2022-03-03

最新评论