asyncio — 异步 I/O

踩坑 进行总结。tmd。只记录最新版的Python能够使用的协程方式。

对象

Eventloop

Eventloop可以说是asyncio应用的核心,是中央总控。Eventloop实例提供了注册、取消和执行任务和回调的方法。

把一些异步函数(就是任务,Task,一会就会说到)注册到这个事件循环上,事件循环会循环执行这些函数(但同时只能执行一个),当执行到某个函数时,如果它正在等待I/O返回,事件循环会暂停它的执行去执行其他的函数;当某个函数完成I/O后会恢复,下次循环到它的时候继续执行。因此,这些异步函数可以协同(Cooperative)运行:这就是事件循环的目标。

Coroutine

协程(Coroutine)本质上是一个函数,特点是在代码块中可以将执行权交给其他协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
❯ cat coro1.py
import asyncio


async def a():
print('Suspending a')
await asyncio.sleep(0)
print('Resuming a')


async def b():
print('In b')


async def main():
await asyncio.gather(a(), b())


if __name__ == '__main__':
asyncio.run(main())
  1. 协程要用async def声明,Python 3.5时的装饰器写法已经过时,我就不列出来了。
  2. asyncio.gather用来并发运行任务,在这里表示协同的执行a和b2个协程
  3. 在协程a中,有一句await asyncio.sleep(0),await表示调用协程,sleep 0并不会真的sleep(因为时间为0),但是却可以把控制权交出去了。
  4. **非常关键:**只要在await操作的时候。才会将执行权交给其他协程。否则会一直执行该协程。

Future

它代表了一个「未来」对象,异步操作结束后会把最终结果设置到这个Future对象上。Future是对协程的封装,不过日常开发基本是不需要直接用这个底层Future类的。

可以对这个Future实例添加完成后的回调(add_done_callback)、取消任务(cancel)、设置最终结果(set_result)、设置异常(如果有的话,set_exception)等。现在我们让Future完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
In : for a in dir(future):
...: if not a.startswith('_'):
...: print(a)
...:
add_done_callback
cancel
cancelled
done
exception
get_loop
remove_done_callback
result
set_exception
set_result

Task

Eventloop除了支持协程,还支持注册Future和Task2种类型的对象,那为什么要存在Future和Task这2种类型呢?

先回忆前面的例子,Future是协程的封装,Future对象提供了很多任务方法(如完成后的回调、取消、设置任务结果等等),但是开发者并不需要直接操作Future这种底层对象,而是用Future的子类Task协同的调度协程以实现并发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 或者用task = loop.create_task(a())
In : task = asyncio.ensure_future(a())

In : task
Out: <Task pending coro=<a() running at /Users/dongwm/mp/2019-05-22/coro1.py:4>>

In : task.done()
Out: False

In : await task
Suspending a
Resuming a

In : task
Out: <Task finished coro=<a() done, defined at /Users/dongwm/mp/2019-05-22/coro1.py:4> result=None>

In : task.done()
Out: True

asyncio 正确使用

  1. async函数本身不会并发。调用async函数会顺序执行。除非一组并行执行的async函数中有一部分被await协程阻塞了。
  2. 强烈注意。即是await堵塞可以导致进程间切换。但如果await占用的资源加锁,不支持多协程访问,同样会导致,切换到其他线程的时候,堵塞在锁上。

asyncio.create_task()/asyncio.ensure_future()

  • 创建task和future,不执行内容,只创建待执行的task和future。
  • 配合await关键字可以并发执行
1
2
3
4
5
async def c3():
task1 = asyncio.create_task(a())
task2 = asyncio.create_task(b())
await task1
await task2

asyncio.gather()

  • gather多个异步函数可以并发执行
  • asyncio.gather 会创建 2 个子任务,当出现 await 的时候,程序会在这 2 个子任务之间进行调度。
1
2
async def c1():
await asyncio.gather(a(), b())

asyncio.wait()

  • wait()多个异步task或异步函数构成的列表,可以并发执行
1
await asyncio.wait([a(), b()])

asyncio.run()

  • 堵塞执行一个异步任务直到结束
  • 同步执行。不能并发
1
asyncio.run(main())

最贱实践

一个比较好的实践

总结

  1. async只是声明了一个函数是可异步的。
  2. await关键字是核心,所有的await点可以共同调度。await关键字是核心,所有的await点可以共同调度。await 调用相当于 协程调度的切入点。只有async函数可以被await调用。
  3. 本质上是select/epoll机制与java asyncio一样,都是将多个io操作阻塞在同一个进程上,通过事件响应处理。
  4. asyncio.gather() asyncio.run()用于启动多协程。所以异步函数必须有统一的入口。这两个函数是并发的启动点。如果asyncio 只run了一个async函数(即协程)则不存在并发。所以一般都是跟循环结合起来进行并发。
  5. 异步只有用在io操作上才有意义。async函数表示其内部的执行过程中存在io操作。await调用表示允许在调用过程中终端执行。所以调用的起点必然是asyncio.run()运行一组协程(运行时已经决定了启动的协程的数量)。调用的终点必然是一个可以await的io操作。在示例程序中一般是asyncio.sleep()模拟一个可以await的io操作
  6. 一个优势是避免了回调函数。那我们来拆分下,回调函数包括几部分:回调前的同步操作,执行IO操作(可以异步并发),IO操作后执行回调函数(可以异步并发)。那么async await关键字是如何避免回调函数呢?使用await调用前的部分,可以理解为IO操作前的同步操作。使用await完成了一个比较耗时的IO操作。await之后的代码,则相当于回调函数的部分。用于在IO操作完成时进行回调。与直接调用相比,就是await调用执行了一个 IO操作,并且运行在这个点上进行并发调度。

有哪些方式可以实现并发

Python 中执行一个 async 函数(异步函数或协程函数)通常使用以下方法:

  1. asyncio.run: 是 Python 3.7 新增加的高级 API,用于运行最高级别的异步函数入口点。
1
2
3
4
5
6
7
8
import asyncio

async def async_function():
# 异步操作
pass

# 运行异步函数
asyncio.run(async_function())
  1. event_loop.run_until_complete: 这是在 asyncio.run 出现之前的传统方式,你需要手动获取事件循环,然后在事件循环中运行协程。
1
2
3
4
5
6
7
8
9
10
11
12
import asyncio

async def async_function():
# 异步操作
pass

# 获取当前事件循环
loop = asyncio.get_event_loop()
# 在事件循环中执行异步函数
loop.run_until_complete(async_function())
# 关闭事件循环(在使用完后需要关闭,尤其在生产环境中)
loop.close()
  1. asyncio.create_task 或 asyncio.ensure_future: 这些函数可以把一个异步函数封装成一个任务(task),这个任务会被安排在事件循环中运行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio

async def async_function():
# 异步操作
pass

async def main():
task = asyncio.create_task(async_function()) # Python 3.7+
# task = asyncio.ensure_future(async_function()) # 兼容性更强的方法
await task

asyncio.run(main())
在异步上下文中使用 await: 可以在一个异步函数内部使用 await 来运行其他的异步函数。
import asyncio

async def async_function():
# 异步操作
pass

async def main():
# 在此调用异步函数
await async_function()

asyncio.run(main())
  1. 并发运行多个异步函数(使用 async.gather 或 asyncio.wait 等): 当你想同时运行多个异步任务时,可以使用 asyncio.gather() 或 asyncio.wait()。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio

async def async_function_one():
# 异步操作
pass

async def async_function_two():
# 异步操作
pass

async def main():
# 同时运行两个异步函数
await asyncio.gather(
async_function_one(),
async_function_two(),
)

asyncio.run(main())
  1. 使用异步上下文管理器(async with): 如果需要自动管理资源(如打开和关闭连接),可以在 async with 语句中运行异步函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

class AsyncContextManager:
async def __aenter__(self):
# 设置资源
pass
async def __aexit__(self, exc_type, exc, tb):
# 清理资源
pass

async def async_function():
# 异步操作
pass

async def main():
async with AsyncContextManager(): # 可以省略 as var,如果不需要变量的话
await async_function()

asyncio.run(main())

请注意,asyncio.run() 实际上是一个方便的函数,它创建了一个新的事件循环,运行传递给它的协程,然后关闭事件循环。但是,如果你已经在一个异步环境中(比如已经运行在一个事件循环内),你就不能再次使用 asyncio.run()。在那种情况下,你需要使用 await,配合 asyncio.create_task() 或者直接等待异步函数。