Python的异步IO

协程

Python 的异步 I/O 基于协程实现。使用async关键字来创建一个异步函数,对异步函数的调用不会执行该函数,而是生成一个协程对象。
对每一个协程对象,都必须等待其结束(即使是没有启动的协程),否则会产生一个RuntimeWarning

示例 :

1# 创建一个异步函数
2async def say_hello():
3    print("hello world")
4
5# 创建协程
6coro = say_hello()
7print(coro)

运行结果 :

1<coroutine object say_hello at 0x109bf6170>
2sys:1: RuntimeWarning: coroutine 'say_hello' was never awaited

要启动一个协程,有三种方式 :

  • 通过asyncio.run运行一个协程
  • 使用await关键字,这种方法只能在另一个async函数中才能使用
  • 通过asyncio.create_task

await必须在async函数中才能使用,因此无法启动协程的顶层入口点,此时只能使用asyncio.run函数。

await让出当前协程并运行目标协程,当前协程直到目标目标的状态变为done时才会恢复就绪。 如果await的目标不是一个协程(例如Task和Future),让出当前协程后,事件循环(EventLoop)会从就绪队列中选择一个协程运行。

asyncio.create_task让出当前协程并运行目标协程,当前协程不会等待而是加入就绪队列。
只要目标协程让出,当前协程就有机会执行,从而将启动多个协程,实现并发执行。
返回的Task对象也可以在适当的时候使用await等待其结束。

简化的协程状态 :

协程状态

await的示例 :

1import asyncio
2import time
3
4async def say_hello():
5    print("hello", time.strftime('%X'))
6    await asyncio.sleep(1)
7    print("hello", time.strftime('%X'))
8
9async def say_world():
10    print("world", time.strftime('%X'))
11    await asyncio.sleep(1)
12    print("world", time.strftime('%X'))
13
14# 顶层入口点
15async def main():
16    await say_hello() # 启动say_hello()返回的协程,并等待其结束
17    await say_world() # 要等到前一个await结束后,才会启动
18
19# 启动顶层入口点
20asyncio.run(main())

运行结果 :

1hello 15:27:26
2hello 15:27:27
3world 15:27:27
4world 15:27:28

asyncio.create_task的示例 :

1import asyncio
2import time
3
4async def say_hello():
5    print("hello", time.strftime('%X'))
6    await asyncio.sleep(1)
7    print("hello", time.strftime('%X'))
8
9async def say_world():
10    print("world", time.strftime('%X'))
11    await asyncio.sleep(1)
12    print("world", time.strftime('%X'))
13
14# 顶层入口点
15async def main():
16    task_say_hello = asyncio.create_task(say_hello()) # 启动协程不等待
17    task_say_world = asyncio.create_task(say_world()) 
18
19    await task_say_hello
20    await task_say_world
21
22# 启动顶层入口点
23asyncio.run(main())

运行结果 :

1hello 15:29:41
2world 15:29:41
3hello 15:29:42
4world 15:29:42

通过上面两个示例打印的顺序和时间可以看出awaitasyncio.create_task的区别

本来准备介绍一下asyncio中的TCP和UDP接口,但是抄袭官方文档没有意义,而且我懒得写了,下面是一个TCP server的示例,旨在演示如何使用协程并发处理客户请求。

/block的请求处理函数中有一个延时10秒的操作(await asyncio.sleep(delay)),但是因为使用异步操作进行,所有不需要等待它结束就能相应其它请求。

  • await asyncio.sleep(delay)将当前协程让出,运行asyncio.sleep(delay)返回的协程。
  • asyncio.sleep(delay)返回的协程里,会创建一个Future对象,并在EventLoop中注册(EventLoop将在delay秒后将Future对象的状态设为done ),之后await future让出,等待future的状态变为done
  • 由于目标不是协程,EventLoop会从就绪队列中选取一个协程来运行,因此可以对新的请求做出相应。
1import asyncio
2import re
3
4class DemoProtocol(asyncio.Protocol):
5    # 获取url的正则
6    url_re = re.compile(b'GET (.*) HTTP/1.1')
7
8    # 连接创建时的回调函数
9    def connection_made(self, transport):
10        peername = transport.get_extra_info('peername')
11        print('Connection from {}'.format(peername))
12        self.transport = transport
13
14    # 收到数据时的回调函数
15    def data_received(self, data):
16        # 获取url
17        url = DemoProtocol.url_re.match(data).group(1)
18        print("GET", url)
19        # 根据url做不同的处理
20        if url == b"/block" :
21            # 10s后响应
22            asyncio.create_task(self.response_after(b'<h1>Are you block?</h1>', 10))
23        else:
24            asyncio.create_task(self.response(b'<h1>hello world</h1>'))
25
26    # 立刻返回响应
27    async def response(self, content):
28        self.transport.write(b"HTTP/1.1 200 OK\r\n")
29        self.transport.write(b"Content-Type: text/html\r\n")
30        self.transport.write(b"\r\n")
31        self.transport.write(content)
32        self.transport.write(b"\r\n")
33        self.transport.close()
34
35    # 延迟返回响应
36    async def response_after(self, content, delay):
37        await asyncio.sleep(delay)
38        await self.response(content)
39
40
41async def main():
42    # Get a reference to the event loop as we plan to use
43    # low-level APIs.
44    loop = asyncio.get_running_loop()
45
46    server = await loop.create_server(lambda: DemoProtocol(), '127.0.0.1', 8888)
47
48    async with server:
49        await server.serve_forever()
50
51asyncio.run(main())