asyncio — 异步 I/O
踩坑 进行总结。tmd。只记录最新版的Python能够使用的协程方式。
对象 Eventloop Eventloop可以说是asyncio应用的核心,是中央总控。Eventloop实例提供了注册、取消和执行任务和回调的方法。
把一些异步函数(就是任务,Task,一会就会说到)注册到这个事件循环上,事件循环会循环执行这些函数(但同时只能执行一个),当执行到某个函数时,如果它正在等待I/O返回,事件循环会暂停它的执行去执行其他的函数;当某个函数完成I/O后会恢复,下次循环到它的时候继续执行。因此,这些异步函数可以协同(Cooperative)运行:这就是事件循环的目标。
Coroutine 协程(Coroutine)本质上是一个函数,特点是在代码块中可以将执行权交给其他协程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ❯ cat coro1.py import asyncio async def a(): print('Suspending a') await asyncio.sleep(0) print('Resuming a') async def b(): print('In b') async def main(): await asyncio.gather(a(), b()) if __name__ == '__main__': asyncio.run(main())
协程要用async def声明,Python 3.5时的装饰器写法已经过时,我就不列出来了。
asyncio.gather用来并发运行任务,在这里表示协同的执行a和b2个协程
在协程a中,有一句await asyncio.sleep(0),await表示调用协程,sleep 0并不会真的sleep(因为时间为0),但是却可以把控制权交出去了。
**非常关键:**只要在await操作的时候。才会将执行权交给其他协程。否则会一直执行该协程。
Future 它代表了一个「未来」对象,异步操作结束后会把最终结果设置到这个Future对象上。Future是对协程的封装,不过日常开发基本是不需要直接用这个底层Future类的。
可以对这个Future实例添加完成后的回调(add_done_callback)、取消任务(cancel)、设置最终结果(set_result)、设置异常(如果有的话,set_exception)等。现在我们让Future完成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 In : for a in dir(future): ...: if not a.startswith('_'): ...: print(a) ...: add_done_callback cancel cancelled done exception get_loop remove_done_callback result set_exception set_result
Task Eventloop除了支持协程,还支持注册Future和Task2种类型的对象,那为什么要存在Future和Task这2种类型呢?
先回忆前面的例子,Future是协程的封装,Future对象提供了很多任务方法(如完成后的回调、取消、设置任务结果等等),但是开发者并不需要直接操作Future这种底层对象,而是用Future的子类Task协同的调度协程以实现并发。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 In : task = asyncio.ensure_future(a()) In : task Out: <Task pending coro=<a() running at /Users/dongwm/mp/2019 -05-22 /coro1.py:4 >> In : task.done() Out: False In : await task Suspending a Resuming a In : task Out: <Task finished coro=<a() done, defined at /Users/dongwm/mp/2019 -05-22 /coro1.py:4 > result=None > In : task.done() Out: True
asyncio 正确使用
async函数本身不会并发。调用async函数会顺序执行。除非一组并行执行的async函数中有一部分被await协程阻塞 了。
强烈注意。即是await堵塞可以导致进程间切换。但如果await占用的资源加锁 ,不支持多协程访问,同样会导致,切换到其他线程的时候,堵塞在锁上。
asyncio.create_task()/asyncio.ensure_future()
创建task和future ,不执行内容,只创建待执行的task和future。
配合await关键字可以并发执行
1 2 3 4 5 async def c3 (): task1 = asyncio.create_task(a()) task2 = asyncio.create_task(b()) await task1 await task2
asyncio.gather()
gather多个异步函数 可以并发执行
asyncio.gather 会创建 2 个子任务,当出现 await 的时候,程序会在这 2 个子任务之间进行调度。
1 2 async def c1(): await asyncio.gather(a(), b())
asyncio.wait()
wait()多个异步task或异步函数 构成的列表 ,可以并发执行
1 await asyncio.wait([a(), b()])
asyncio.run()
最贱实践
一个比较好的实践
总结
async只是声明了一个函数是可异步的。
await关键字是核心,所有的await点可以共同调度。await关键字是核心,所有的await点可以共同调度。await 调用相当于 协程调度的切入点。只有async函数可以被await调用。
本质上是select/epoll机制与java asyncio一样,都是将多个io操作阻塞在同一个进程上,通过事件响应处理。
asyncio.gather() asyncio.run()用于启动多协程。所以异步函数必须有统一的入口。这两个函数是并发的启动点。如果asyncio 只run了一个async函数(即协程)则不存在并发。所以一般都是跟循环结合起来进行并发。
异步只有用在io操作上才有意义。async函数表示其内部的执行过程中存在io操作。await调用表示允许在调用过程中终端执行。所以调用的起点 必然是asyncio.run()运行一组协程(运行时已经决定了启动的协程的数量)。调用的终点 必然是一个可以await的io操作。在示例程序中一般是asyncio.sleep()模拟一个可以await的io操作
一个优势是避免了回调函数。那我们来拆分下,回调函数包括几部分:回调前的同步操作,执行IO操作(可以异步并发),IO操作后执行回调函数(可以异步并发)。那么async await关键字是如何避免回调函数呢?使用await调用前的部分,可以理解为IO操作前的同步操作。使用await完成了一个比较耗时的IO操作。await之后的代码,则相当于回调函数的部分。用于在IO操作完成时进行回调。与直接调用相比,就是await调用执行了一个 IO操作,并且运行在这个点上进行并发调度。
有哪些方式可以实现并发 Python 中执行一个 async 函数(异步函数或协程函数)通常使用以下方法:
asyncio.run: 是 Python 3.7 新增加的高级 API,用于运行最高级别的异步函数入口点。
1 2 3 4 5 6 7 8 import asyncio async def async_function(): # 异步操作 pass # 运行异步函数 asyncio.run(async_function())
event_loop.run_until_complete: 这是在 asyncio.run 出现之前的传统方式,你需要手动获取事件循环,然后在事件循环中运行协程。
1 2 3 4 5 6 7 8 9 10 11 12 import asyncio async def async_function(): # 异步操作 pass # 获取当前事件循环 loop = asyncio.get_event_loop() # 在事件循环中执行异步函数 loop.run_until_complete(async_function()) # 关闭事件循环(在使用完后需要关闭,尤其在生产环境中) loop.close()
asyncio.create_task 或 asyncio.ensure_future: 这些函数可以把一个异步函数封装成一个任务(task),这个任务会被安排在事件循环中运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import asyncio async def async_function(): # 异步操作 pass async def main(): task = asyncio.create_task(async_function()) # Python 3.7+ # task = asyncio.ensure_future(async_function()) # 兼容性更强的方法 await task asyncio.run(main()) 在异步上下文中使用 await: 可以在一个异步函数内部使用 await 来运行其他的异步函数。 import asyncio async def async_function(): # 异步操作 pass async def main(): # 在此调用异步函数 await async_function() asyncio.run(main())
并发运行多个异步函数(使用 async.gather 或 asyncio.wait 等): 当你想同时运行多个异步任务时,可以使用 asyncio.gather() 或 asyncio.wait()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import asyncio async def async_function_one(): # 异步操作 pass async def async_function_two(): # 异步操作 pass async def main(): # 同时运行两个异步函数 await asyncio.gather( async_function_one(), async_function_two(), ) asyncio.run(main())
使用异步上下文管理器(async with): 如果需要自动管理资源(如打开和关闭连接),可以在 async with 语句中运行异步函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import asyncio class AsyncContextManager: async def __aenter__(self): # 设置资源 pass async def __aexit__(self, exc_type, exc, tb): # 清理资源 pass async def async_function(): # 异步操作 pass async def main(): async with AsyncContextManager(): # 可以省略 as var,如果不需要变量的话 await async_function() asyncio.run(main())
请注意,asyncio.run() 实际上是一个方便的函数,它创建了一个新的事件循环,运行传递给它的协程,然后关闭事件循环。但是,如果你已经在一个异步环境中(比如已经运行在一个事件循环内),你就不能再次使用 asyncio.run()。在那种情况下,你需要使用 await,配合 asyncio.create_task() 或者直接等待异步函数。