python异步编程之asyncio低阶API的使用详解

 更新时间:2024年01月08日 08:44:43   作者:金色旭光  
asyncio中低阶API的种类很多,涉及到开发的5个方面,这篇文章主要为大家详细介绍了这些低阶API的具体使用,感兴趣的小伙伴可以学习一下

1.低阶API介绍

asyncio中低阶API的种类很多,涉及到开发的5个方面。包括:

  • 获取事件循环
  • 事件循环方法集
  • 传输
  • 协议
  • 事件循环策略

本篇中只讲解asyncio常见常用的函数,很多底层函数如网络、IPC、套接字、信号等不在本篇范围。

2.获取事件循环

事件循环是异步中重要的概念之一,用于驱动任务的执行。包含的低阶API如下:

函数功能
asyncio.get_running_loop()获取当前运行的事件循环首选函数。
asyncio.get_event_loop()获得一个事件循环实例
asyncio.set_event_loop()将策略设置到事件循环
asyncio.new_event_loop()创建一个新的事件循环

在asyncio初识这篇中提到过事件循环,可以把事件循环当做是一个while循环,在周期性的运行并执行一些任务。这个说法比较抽象,事件循环本质上其实是能调用操作系统IO模型的模块。以Linux系统为例,IO模型有阻塞,非阻塞,IO多路复用等。asyncio 常用的是IO多路复用模型的epool和 kqueue。事件循环原理涉及到异步编程的操作系统原理,后续更新一系列相关文章。

get_event_loop()创建一个事件循环,用于驱动协程的执行

import asyncio

async def demo(i):
    print(f"hello {i}")

def main():
    loop = asyncio.get_event_loop()
    print(loop._selector)
    task = loop.create_task(demo(1))
    loop.run_until_complete(task)

main()

结果:

<selectors.KqueueSelector object at 0x104eabe20>
hello 1

可以通过loop._selector属性获取到当前事件循环使用的是kqueue模型

获取循环

import asyncio

async def demo(i):
    res = asyncio.get_running_loop()
    print(res)
    print(f"hello {i}")


def main():
    loop = asyncio.get_event_loop()
    task = loop.create_task(demo(1))
    loop.run_until_complete(task)
main()

结果:

<_UnixSelectorEventLoop running=True closed=False debug=False>
hello 1

推荐使用asyncio.run 创建事件循环,底层API主要用于库的编写。

3.生命周期

生命周期是用于管理任务的启停的函数,如下:

函数功能
loop.run_until_complete()运行一个期程/任务/可等待对象直到完成。
loop.run_forever()一直运行事件循环,直到被显示停止
loop.stop()停止事件循环
loop.close()关闭事件循环
loop.is_running()返回 True , 如果事件循环正在运行
loop.is_closed()返回 True ,如果事件循环已经被关闭
await loop.shutdown_asyncgens()关闭异步生成器

run_until_complete

运行一个期程/任务/可等待对象直到完成。run_until_complete的参数是一个futrue对象。当传入一个协程,其内部会自动封装成task。run_until_complete()是会自动关闭事件循环的函数,区别于run_forever()是需要手动关闭事件循环的函数。

import asyncio 


async def demo(i):
    print(f"hello {i}")


def main():
    loop = asyncio.get_event_loop()
    
    task = loop.create_task(demo(1))

    # 传入的是一个任务
    loop.run_until_complete(task)

    # 传入的是一个协程也可以
    loop.run_until_complete(demo(20))


main()

结果:

hello 1
hello 20

4.调试

函数功能
loop.set_debug()开启或禁用调试模式
loop.get_debug()获取当前测试模式

5.调度回调函数

在异步编程中回调函数是一种很常见的方法,想要在事件循环中增加一些回调函数,可以有如下方法:

函数功能
loop.call_soon()尽快调用回调。
loop.call_soon_threadsafe()loop.call_soon() 方法线程安全的变体。
loop.call_later()在给定时间之后调用回调函数。
loop.call_at()在指定的时间调用回调函数。

这些回调函数既可以回调普通函数也可以回调协程函数。

call_soon函数原型:

loop.call_soon(callback, *args, context=None)

示例:

import asyncio

async def my_coroutine():
    print("协程被执行")

async def other_coro():
    print("非call_soon调用")

def callback_function():
    print("回调函数被执行")


# 创建一个事件循环
loop = asyncio.get_event_loop()

# 使用create_task包装协程函数,并调度执行
loop.call_soon(loop.create_task, my_coroutine())

# 调度一个常规函数以尽快执行
loop.call_soon(callback_function)

# 启动一个事件循环
task = loop.create_task(other_coro())
loop.run_until_complete(task)

结果:

回调函数被执行
非call_soon调用
协程被执行

结果分析:

call_soon调用普通函数直接传入函数名作为参数,调用协程函数需要讲协程通过loop.create_task封装成task。

6.线程/进程池

函数功能
await loop.run_in_executor()多线程中运行一个阻塞的函数
loop.set_default_executor()设置 loop.run_in_executor() 默认执行器

asyncio.run_in_executor 用于在异步事件循环中执行一个阻塞的函数或方法。它将阻塞的调用委托给一个线程池或进程池,以确保不阻塞主事件循环。可以用于在协程中调用一些不支持异步编程的方法,不支持异步编程的模块。

run_in_executor

import asyncio
import concurrent.futures

def blocking_function():
    # 模拟一个阻塞的操作
    import time
    time.sleep(2)
    return "阻塞函数返回"

async def async_function2():
    print("async_function2 start")
    await asyncio.sleep(1)
    print("async_function2 end")

async def async_function():
    print("异步函数开始执行。。。")

    print("调用同步阻塞函数")
    # 使用run_in_executor调度执行阻塞函数
    result = await loop.run_in_executor(None, blocking_function)

    print(f"获取同步函数的结果: {result}")

# 创建一个事件循环
loop = asyncio.get_event_loop()

# 运行异步函数
loop.run_until_complete(asyncio.gather(async_function(), async_function2()))

结果:

异步函数开始执行。。。
调用同步阻塞函数
async_function2 start
async_function2 end
获取同步函数的结果: 阻塞函数返回

结果分析:

通过事件循环执行任务async_function,在async_function中通过loop.run_in_executor调用同步阻塞函数blocking_function,该阻塞函数没有影响事件循环中另一个任务async_function2的执行。

await loop.run_in_executor(None, blocking_function)中None代表使用的是默认线程池,也可以替换成其他线程池。

使用自定义线程池和进程池

import asyncio
import concurrent.futures

def blocking_function():
    # 模拟一个阻塞的操作
    import time
    time.sleep(2)
    return "阻塞函数返回"

async def async_function():
    print("异步函数开始执行。。。")

    print("调用同步阻塞函数")

    # 线程池
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_function)
        print('线程池调用返回结果:', result)

    # 进程池
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_function)
        print('进程池调用返回结果:', result)

if __name__ == '__main__':
    # 创建一个事件循环
    loop = asyncio.get_event_loop()

    # 运行异步函数
    loop.run_until_complete(async_function())

结果:

异步函数开始执行。。。
调用同步阻塞函数
线程池调用返回结果: 阻塞函数返回
进程池调用返回结果: 阻塞函数返回

结果分析:

通过线程池concurrent.futures.ThreadPoolExecutor()和进程池concurrent.futures.ProcessPoolExecutor()执行阻塞函数。

7.任务与期程

函数功能
loop.create_future()创建一个 Future 对象。
loop.create_task()将协程当作 Task 一样调度。
loop.set_task_factory()设置 loop.create_task() 使用的工厂,它将用来创建 Tasks 。
loop.get_task_factory()获取 loop.create_task() 使用的工厂,它用来创建 Tasks 。

create_futurecreate_future 的功能是创建一个future对象。future对象通常不需要手动创建,因为task会自动管理任务结果。相当于task是全自动,创建future是半自动。创建的future就需要手动的讲future状态设置成完成,才能表示task的状态为完成。

import asyncio


def foo(future, result):
    print(f"此时future的状态:{future}")
    future.set_result(result)
    print(f"此时future的状态:{future}")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    # 手动创建future对象
    all_done = loop.create_future()

    # 设置一个回调函数用于修改设置future的结果
    loop.call_soon(foo, all_done, "Future is done!")

    result = loop.run_until_complete(all_done)

    print("返回结果", result)
    print("获取future的结果", all_done.result())

结果:

此时future的状态:<Future pending cb=[_run_until_complete_cb() at /Users/lib/python3.10/asyncio/base_events.py:184]>
此时future的状态:<Future finished result='Future is done!'>
返回结果 Future is done!
获取future的结果 Future is done!

结果分析:

future设置结果之后之后,future对象的状态就从pending变成finished状态。如果一个future没有手动设置结果,那么事件循环就不会停止。

create_task将协程封装成一个task对象,事件循环主要操作的是task对象。协程没有状态,而task是有状态的。

import asyncio 


async def demo(i):
    print(f"hello {i}")
    await asyncio.sleep(1)

def main():
    loop = asyncio.get_event_loop()

    # 将携程封装成task,给事件使用
    task = loop.create_task(demo(1))

    loop.run_until_complete(task)

main()
>>> 
hello 1

asyncio.create_task 和 loop.create_task的区别:

两者实现的功能都是一样的,将协程封装成一个task,让协程拥有了生命周期。区别仅仅在于使用的方法。asyncio.create_task 是高阶API,不需要创建事件循环,而loop.create_task需要先创建事件循环再使用该方法。

到此这篇关于python异步编程之asyncio低阶API的使用详解的文章就介绍到这了,更多相关python asyncio低阶API内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • python图形开发GUI库wxpython使用方法详解

    python图形开发GUI库wxpython使用方法详解

    这篇文章主要介绍了python GUI库wxpython使用方法详解,需要的朋友可以参考下
    2020-02-02
  • Python+Pillow+Pytesseract实现验证码识别

    Python+Pillow+Pytesseract实现验证码识别

    这篇文章主要为大家详细介绍了如何利用pillow和pytesseract来实现验证码的识别,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一下
    2022-05-05
  • Python入门教程(三十六)Python的文件写入

    Python入门教程(三十六)Python的文件写入

    这篇文章主要介绍了Python入门教程(三十六)Python的文件写入,open()函数可以打开一个文件供读取或写入,如果这个函数执行成功,会回传文件对象,需要的朋友可以参考下
    2023-05-05
  • python实现决策树分类算法

    python实现决策树分类算法

    这篇文章主要为大家详细介绍了python实现决策树分类算法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-12-12
  • Pandas缺失值填充 df.fillna()的实现

    Pandas缺失值填充 df.fillna()的实现

    本文主要介绍了Pandas缺失值填充 df.fillna()的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-07-07
  • 5个Python使用F-String进行格式化的实用技巧分享

    5个Python使用F-String进行格式化的实用技巧分享

    F-String(格式化字符串字面值)是在Python 3.6中引入的,它是一种非常强大且灵活的字符串格式化方法,本文总结了5个实用的F-String技巧,相信一定能让你的代码输出更加的美观,快跟随小编一起学习起来吧
    2024-03-03
  • Python定时库APScheduler的原理以及用法示例

    Python定时库APScheduler的原理以及用法示例

    APScheduler的全称是Advanced Python Scheduler,它是一个轻量级的 Python 定时任务调度框架,下面这篇文章主要给大家介绍了关于Python定时库APScheduler的原理以及用法的相关资料,需要的朋友可以参考下
    2021-12-12
  • python中的exec()、eval()及complie()示例详解

    python中的exec()、eval()及complie()示例详解

    这篇文章主要介绍了python中的exec()、eval()及complie(),本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-08-08
  • 浅谈function(函数)中的动态参数

    浅谈function(函数)中的动态参数

    下面小编就为大家带来一篇浅谈function(函数)中的动态参数。小编觉得听不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-04-04
  • Python匹配中文的正则表达式

    Python匹配中文的正则表达式

    正则表达式是一个特殊的字符序列,它能帮助你方便的检查一个字符串是否与某种模式匹配。接下来通过本文给大家介绍Python匹配中文的正则表达式,感兴趣的朋友一起学习吧
    2016-05-05

最新评论