RUST异步流处理方法详细讲解

 更新时间:2022年12月16日 09:49:37   作者:上后左爱  
这篇文章主要介绍了RUST异步流处理方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧

Stream 特质

在同步Rust 中流的核心是Iterator 提供了一种在序列中产生项的方法,并在它们之间进行阻塞,通过迭代器传递给其他迭代器

在异步Rust中流的核心Stream, 允许其他任务在当前阻塞等待时允许

Read/Write, AsyncRead/AsyncWrite

fn main() {
    let f = file::create("E:\\foot.txt").await?;
    f.write_all(b"hello world").await?;
    let f = file::open("E:\\foot.txt").await?;
    let mut buffer = Vec::new();
    f.read_to_end(&mut buffer).await?;
}

Stream 经典子流

source: 可以生成数据流

Sink: 可以消费数据流

Through: 消费数据,对其进行操作生成新数据流

Duplex: 流可以生成数据,也可以独立消费数据(AsyncWrite/Read)

asyncread 和 Stream 区别

这两种对byte 进行操作,AsyncRead 只能对byte进行操作(生成未解析数据),Stream对任何类型的数据进行操作(生成解析数据)

使用for_each_concurrent, try_for_each_concurrent 进行并发的处理流,进行流的处理

yield 匿名流

在async 异步过程中使用yield 关键字, 类似于Python 迭代产生时候可以返回,下一次从上一次返回值在进行开始跌打

try_join

如果某个发生错误后会立即返回数据

使用try_join 需要函数返回结果,并且错误的类型,才能正常运行

use futures;
use tokio::runtime::Runtime;
use std::io::Result;
async fn func1() -> Result<()> {
    tokio::time::delay_for(tokio::time::Duration::from_secs(1)).await;
    println!("func1 finished!");
	Ok(())
}
async fn func2() -> Result<()> {
    println!("func2 finished!");
	Ok(())
}
async fn async_main() {
    let f1 = func1();
    let f2 = func2();

    if let Err(_) = futures::try_join!(f1, f2) {
		println!("Err!");
	}
}
fn main() {
    let mut runtime = Runtime::new().unwrap();
    runtime.block_on(async_main());
    println!("Hello, world!");
}

select

使用场景 有三个运行任务 ,只要其中一个完成后立马返回,使用select

在使用select启动使用pin_mut!(f1, f2), 使用select! 进行匹配

use futures::{select, future::FutureExt, pin_mut};
use tokio::runtime::Runtime;
use std::io::Result;
async fn func1() -> Result<()> {
	tokio::time::delay_for(tokio::time::Duration::from_secs(2)).await;
	println!("func1 finished!");
	Ok(())
}
async fn func2() -> Result<()> {
	println!("func2 finished!");
	Ok(())
}
async fn async_main() {
	let f1 = func1().fuse();
	let f2 = func2().fuse();
	pin_mut!(f1, f2);
	// 使用select 进行匹配
	select! {
		_ = f1 => println!("func1 finished++++++!"),
		_ = f2 => println!("func2 finished++++++!"),
	}
}
fn main() {
// 使用tokio的runtime()
	let mut runtime = Runtime::new().unwrap();
	runtime.block_on(async_main());
    println!("Hello, world!");
}

select! y与default/complete 一起联合使用

complete :表示两个都已经就绪,default表示两个都没有就绪

use futures::{future, select, executor};
async fn count() {
	let mut a_fut = future::ready(4);
	let mut b_fut = future::ready(6);
	let mut total = 0;	
	loop {
		select! {
			a = a_fut => total += a,
			b = b_fut => total += b,
			complete => break,   //表示所有的分支都已经完成,并且不会再取得进展的情况
			default => unreachable!(), //表示没有分支完成
		}
	}
	assert_eq!(total, 10);
}
fn main() {
	executor::block_on(count());
    println!("Hello, world!");
}

complete 表示所有分支都已经完成,并且不会取得进展的情况,如上所示,使用loop 第一次b分支准备好,下一次循环可能是a分支,最后两个分支都已经完成后 就break退出

complete 类似让所有分支都完成后直接退出

SELECT宏几个条件

  • select中使用Future必须首先UnPinFuture trait, Fused trait
  • 必须实现UnpinFuture原因在于select! 不是按照值获取,按照引用获取,这样能够在不获取future所有权条件下,未完成的future可以继续使用
  • 必须实现FusedFuture: select 完成后不在轮询future,因此需要实现FusedFuture 跟踪Future是否完成
  • 如果select使用stream,其stream 也是需要实现FusedStream

async 问号使用

如果返回类型有Result<T, E> 结果使用.await?

Send trait

在保证多线程安全时候 需要保证接口实现Send trait 、sync trait 才能保证多线程的安全

Send trait 表示数据能够在线程间安全的发送,sync trait 能够保证线程安全的引用

use std::rc::Rc;
#[derive(Default)]
struct NoSend(Rc<()>);
async fn bar() {}
async fn foo() {
	NoSend::default();
	//{
	//	let x = NoSend::default();
	//	//to do : xxxxx
	//}
	let _ = NoSend::default();
	bar().await;
}
//Send trait:如果所有的子类型都是实现Send trait的,那么它本身也是实现Send Trait的
// 如果内部没有定义 只是使用 是一个Send Trait 主要是在 生成 匿名结构体中 会进行解析
not let x: impl Send Trait
//struct Foo {
//	f: Future,
//}
let x: Not impl Send Trait
//struct Foo {
//	x: NoSend, //not impl Send Trait
//	f: Future, //impl Send Trait
//}
fn required_send(_: impl Send) {}
fn main() {
	required_send(foo());
    println!("Hello, world!");
}

到此这篇关于RUST异步流处理方法详细讲解的文章就介绍到这了,更多相关RUST异步流处理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:

相关文章

  • Rust可迭代类型迭代器正确创建自定义可迭代类型的方法

    Rust可迭代类型迭代器正确创建自定义可迭代类型的方法

    在 Rust 中, 如果一个类型实现了 Iterator, 那么它会被同时实现 IntoIterator, 具体逻辑是返回自身, 因为自身就是迭代器,这篇文章主要介绍了Rust可迭代类型迭代器正确创建自定义可迭代类型的方法,需要的朋友可以参考下
    2023-12-12
  • Rust语言之Prometheus系统监控工具包的使用详解

    Rust语言之Prometheus系统监控工具包的使用详解

    Prometheus 是一个开源的系统监控和警报工具包,最初是由SoundCloud构建的,随着时间的发展,Prometheus已经具有适用于各种使用场景的版本,为了开发者方便开发,更是有各种语言版本的Prometheus的开发工具包,本文主要介绍Rust版本的Prometheus开发工具包
    2023-10-10
  • C++的替代:微软如何使用rust?

    C++的替代:微软如何使用rust?

    这篇文章主要介绍了微软如何使用rust的,帮助大家了解c++和rust这两门编程语言的联系与区别,感兴趣的朋友可以了解下
    2020-09-09
  • 使用vscode配置Rust运行环境全过程

    使用vscode配置Rust运行环境全过程

    VS Code对Rust有着较完备的支持,这篇文章主要给大家介绍了关于使用vscode配置Rust运行环境的相关资料,文中通过图文介绍的非常详细,需要的朋友可以参考下
    2023-06-06
  • 深入了解Rust中引用与借用的用法

    深入了解Rust中引用与借用的用法

    这篇文章主要为大家详细介绍了Rust语言中引用与借用的使用,文中的示例代码讲解详细,具有一定的借鉴价值,需要的小伙伴可以了解一下
    2022-11-11
  • Rust语言之trait中的个方法可以重写吗

    Rust语言之trait中的个方法可以重写吗

    在Rust中,trait定义了一组方法,这些方法可以被一个或多个类型实现,当你为某个类型实现一个trait时,你可以为该trait中的每个方法提供自己的具体实现,本文将给大家介绍一下trait中的个方法是否可以重写,需要的朋友可以参考下
    2023-10-10
  • R语言ggplot2绘图安装与调试

    R语言ggplot2绘图安装与调试

    ggplot2是R语言中最常用的绘图包之一,它提供了一种基于图层的绘图语法,使得用户可以轻松地创建高质量的数据可视化图表。在使用ggplot2之前,需要先安装该包并进行调试。安装ggplot2可以通过CRAN或GitHub进行,调试则需要注意数据格式、语法错误等问题。
    2023-06-06
  • Rust文件 launch.json作用大全

    Rust文件 launch.json作用大全

    launch.json 是 Visual Studio Code(VSCode)中的一个配置文件,主要用于配置调试器,本文给大家介绍Rust文件 launch.json 有什么用,感兴趣的朋友跟随小编一起看看吧
    2024-05-05
  • Rust anyhow 简明示例教程

    Rust anyhow 简明示例教程

    anyhow 是 Rust 中的一个库,旨在提供灵活的、具体的错误处理能力,建立在 std::error::Error 基础上,主要用于那些需要简单错误处理的应用程序和原型开发中,本文给大家分享Rust anyhow 简明教程,一起看看吧
    2024-06-06
  • MacBook Pro安装rust编程环境的过程

    MacBook Pro安装rust编程环境的过程

    rustup是一个用于管理Rust版本和工具链的工具,这篇文章主要介绍了MacBook Pro安装rust编程环境的过程,感兴趣的朋友跟随小编一起看看吧
    2024-02-02

最新评论