Python异步库asyncio、aiohttp详解

 更新时间:2024年06月27日 10:38:50   作者:IT.BOB  
这篇文章主要介绍了Python异步库asyncio、aiohttp使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

asyncio

版本支持

  • asyncio 模块在 Python3.4 时发布。
  • async 和 await 关键字最早在 Python3.5 中引入。
  • Python3.3 之前不支持。

关键概念

  • event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数(协程)注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
  • coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
  • future 对象: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别
  • task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。Task 对象是 Future 的子类,它将 coroutine 和 Future 联系在一起,将 coroutine 封装成一个 Future 对象。
  • async/await 关键字:python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。其作用在一定程度上类似于yield。

工作流程

  • 定义/创建协程对象
  • 将协程转为task任务
  • 定义事件循环对象容器
  • 将task任务放到事件循环对象中触发
import asyncio

async def hello(name):
    print('Hello,', name)

# 定义协程对象
coroutine = hello("World")

# 定义事件循环对象容器
loop = asyncio.get_event_loop()

# 将协程转为task任务
# task = asyncio.ensure_future(coroutine)
task = loop.create_task(coroutine)

# 将task任务扔进事件循环对象中并触发
loop.run_until_complete(task)

并发

1. 创建多个协程的列表 tasks:

import asyncio


async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

tasks = [do_some_work(1), do_some_work(2), do_some_work(4)]

2. 将协程注册到事件循环中:

  • 方法一:使用 asyncio.wait()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
  • 方法二:使用 asyncio.gather()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))

3. 查看 return 结果:

for task in tasks:
    print('Task ret: ', task.result())

4. asyncio.wait()asyncio.gather() 的区别:

接收参数不同:

  • asyncio.wait():必须是一个 list 对象,list 对象里存放多个 task 任务。
# 使用 asyncio.ensure_future 转换为 task 对象
tasks=[
       asyncio.ensure_future(factorial("A", 2)),
       asyncio.ensure_future(factorial("B", 3)),
       asyncio.ensure_future(factorial("C", 4))
]

# 也可以不转为 task 对象
# tasks=[
#        factorial("A", 2),
#        factorial("B", 3),
#        factorial("C", 4)
# ]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
  • asyncio.gather():比较广泛,注意接收 list 对象时 * 不能省略。
tasks=[
       asyncio.ensure_future(factorial("A", 2)),
       asyncio.ensure_future(factorial("B", 3)),
       asyncio.ensure_future(factorial("C", 4))
]

# tasks=[
#        factorial("A", 2),
#        factorial("B", 3),
#        factorial("C", 4)
# ]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop = asyncio.get_event_loop()

group1 = asyncio.gather(*[factorial("A" ,i) for i in range(1, 3)])
group2 = asyncio.gather(*[factorial("B", i) for i in range(1, 5)])
group3 = asyncio.gather(*[factorial("B", i) for i in range(1, 7)])

loop.run_until_complete(asyncio.gather(group1, group2, group3))

返回结果不同:

  • asyncio.wait():返回 dones(已完成任务) 和 pendings(未完成任务)
dones, pendings = await asyncio.wait(tasks)

for task in dones:
    print('Task ret: ', task.result())
  • asyncio.gather():直接返回结果
results = await asyncio.gather(*tasks)

for result in results:
    print('Task ret: ', result)

aiohttp

ClientSession 会话管理

import aiohttp
import asyncio


async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/get') as resp:
            print(resp.status)
            print(await resp.text())

asyncio.run(main())

其他请求:

session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

URL 参数传递

async def main():
    async with aiohttp.ClientSession() as session:
        params = {'key1': 'value1', 'key2': 'value2'}
        async with session.get('http://httpbin.org/get', params=params) as r:
            expect = 'http://httpbin.org/get?key1=value1&key2=value2'
            assert str(r.url) == expect
async def main():
    async with aiohttp.ClientSession() as session:
        params = [('key', 'value1'), ('key', 'value2')]
        async with session.get('http://httpbin.org/get', params=params) as r:
            expect = 'http://httpbin.org/get?key=value2&key=value1'
            assert str(r.url) == expect

获取响应内容

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/get') as r:
            # 状态码
            print(r.status)
            # 响应内容,可以自定义编码
            print(await r.text(encoding='utf-8'))
            # 非文本内容
            print(await r.read())
            # JSON 内容
            print(await r.json())

自定义请求头

headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36"
    }


async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/get', headers=headers) as r:
            print(r.status)

为所有会话设置请求头:

headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36"
    }


async def main():
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get('http://httpbin.org/get') as r:
            print(r.status)

自定义 cookies

async def main():
    cookies = {'cookies_are': 'working'}
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/cookies', cookies=cookies) as resp:
            assert await resp.json() == {"cookies": {"cookies_are": "working"}}

为所有会话设置 cookies:

async def main():
    cookies = {'cookies_are': 'working'}
    async with aiohttp.ClientSession(cookies=cookies) as session:
        async with session.get('http://httpbin.org/cookies') as resp:
            assert await resp.json() == {"cookies": {"cookies_are": "working"}}

设置代理

注意:只支持 http 代理。

async def main():
    async with aiohttp.ClientSession() as session:
        proxy = "http://127.0.0.1:1080"
        async with session.get("http://python.org", proxy=proxy) as r:
            print(r.status)

需要用户名密码授权的代理:

async def main():
    async with aiohttp.ClientSession() as session:
        proxy = "http://127.0.0.1:1080"
        proxy_auth = aiohttp.BasicAuth('username', 'password')
        async with session.get("http://python.org", proxy=proxy, proxy_auth=proxy_auth) as r:
            print(r.status)

也可以直接传递:

async def main():
    async with aiohttp.ClientSession() as session:
        proxy = "http://username:password@127.0.0.1:1080"
        async with session.get("http://python.org", proxy=proxy) as r:
            print(r.status)

异步爬虫示例

import asyncio
import aiohttp

from lxml import etree
from datetime import datetime

headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36"}


async def get_movie_url():
    req_url = "https://movie.douban.com/chart"
    async with aiohttp.ClientSession() as session:
        async with session.get(url=req_url, headers=headers) as response:
            result = await response.text()
            result = etree.HTML(result)
        return result.xpath("//*[@id='content']/div/div[1]/div/div/table/tr/td/a/@href")


async def get_movie_content(movie_url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url=movie_url, headers=headers) as response:
            result = await response.text()
            result = etree.HTML(result)
        movie = dict()
        name = result.xpath('//*[@id="content"]/h1/span[1]//text()')
        author = result.xpath('//*[@id="info"]/span[1]/span[2]//text()')
        movie["name"] = name
        movie["author"] = author
    return movie


def run():
    start = datetime.now()
    loop = asyncio.get_event_loop()
    movie_url_list = loop.run_until_complete(get_movie_url())
    tasks = [get_movie_content(url) for url in movie_url_list]
    movies = loop.run_until_complete(asyncio.gather(*tasks))
    print(movies)
    print("异步用时为:{}".format(datetime.now() - start))


if __name__ == '__main__':
    run()

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Python操作PDF实现制作数据报告

    Python操作PDF实现制作数据报告

    Python操作PDF的库有很多,比如PyPDF2、pdfplumber、PyMuPDF等等。本文将利用FPDF模块操作PDF实现制作数据报告,感兴趣的小伙伴可以尝试一下
    2022-12-12
  • Python魔术方法详解

    Python魔术方法详解

    这篇文章主要介绍了Python魔术方法详解,本文讲解了构造和初始化、用于比较的魔术方法、数值处理的魔术方法、普通算数操作符等内容,需要的朋友可以参考下
    2015-02-02
  • Python实现远程调用MetaSploit的方法

    Python实现远程调用MetaSploit的方法

    这篇文章主要介绍了Python实现远程调用MetaSploit的方法,是很有借鉴价值的一个技巧,需要的朋友可以参考下
    2014-08-08
  • Python GUI Tkinter简单实现个性签名设计

    Python GUI Tkinter简单实现个性签名设计

    这篇文章主要为大家详细介绍了Python GUI Tkinter简单实现个性签名设计,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-06-06
  • 关于pandas.date_range()的用法及说明

    关于pandas.date_range()的用法及说明

    这篇文章主要介绍了关于pandas.date_range()的用法及说明,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-07-07
  • python关于倒排列的知识点总结

    python关于倒排列的知识点总结

    在本篇文章里小编给大家分享的是一篇关于python关于倒排列的知识点总结,有需要的朋友们可以参考下。
    2020-10-10
  • 解决json中ensure_ascii=False的问题

    解决json中ensure_ascii=False的问题

    这篇文章主要介绍了解决json中ensure_ascii=False的问题,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-04-04
  • Python绘制折线图可视化神器pyecharts案例

    Python绘制折线图可视化神器pyecharts案例

    这篇文章主要介绍了Python绘制折线图可视化神器pyecharts,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-07-07
  • python基于搜索引擎实现文章查重功能

    python基于搜索引擎实现文章查重功能

    这篇文章主要介绍了python基于搜索引擎实现文章查重功能,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-05-05
  • 利用Pygame绘制圆环的示例代码

    利用Pygame绘制圆环的示例代码

    这篇文章主要介绍了利用Python中的Pygame模块绘制一个彩色的圆环,文中的示例代码讲解详细,对我们学习Pygame有一定帮助,需要的可以参考一下
    2022-01-01

最新评论