asyncio源码浅析(一)- 事件循环(EventLoop)

前言

这一篇笔记会主要从 asyncio 标准库源码的角度,来熟悉 Python 3.7 所提供的异步编程API。同时,尝试着理解和灵活运用 asyncio 所提供的功能。由于涉及到不同平台的实现不同,而我们使用最多的是Linux平台。因此,梳理过程中主要以Linux平台相关实现为主。并且,本篇以理解流程为重点,不会对每一行代码进行分析,相关细节可以在理解流程后再做深入。

概念

I/O模型

说到 异步编程 或者 asyncio ,首先需要对几种I/O模型有些基本的概念。这个在网上已经有了很多的博文,这里就不再赘述。在《Unix网络编程 Vol.1》和《The Linux Programming Interface》中,对I/O模型的特点和相关的系统调用也有详尽的介绍。5类I/O模型如下:

  • 阻塞I/O(blocking I/O)
  • 非阻塞I/O(nonblocking I/O)
  • I/O多路复用(I/O multiplexing)
  • 信号驱动I/O(signal driven I/O)
  • 异步I/O(asynchronous I/O)

使用asyncio 进行编程,应用的是I/O多路复用模型,即在单个进程(或线程)中,同时监听多个描述符,实现对进程(或线程)的复用。另外,I/O多路复用模型与非阻塞I/O模型并不是互斥的关系,事实上,它正是基于非阻塞I/O才得以实现的。

通常,我们会更多地在网络编程中用到,而且很多示例都是使用 socket 来介绍的(比如下文的 selectors),这让我产生了一种错觉:I/O多路复用模型是不是只能应用在网络编程或者socket编程中呢? 答案是否定的。事实上,通过学习与I/O多路复用相关的系统调用,可以了解到,devices(terminals ans pseudoterminals)、pipes、FIFOs类型,同样支持非阻塞模式。那么自然,I/O多路复用模型也可以用于这些类型的文件描述符监听。

selectors 简介

在总结 asyncio 之前,有必要先了解一下selectors标准库。selectors是在 Python 3.4 版本加入到Python标准库中,旨在提供高效的、high-level的I/O多路复用支持。它是依赖于更 low-level 的select模块实现的。

selectors中的类层级关系如下:

1
2
3
4
5
6
BaseSelector
+-- SelectSelector
+-- PollSelector
+-- EpollSelector
+-- DevpollSelector
+-- KqueueSelector

这其中不同的 “Selector” ,是基于不同平台的实现。当然,selectors 中提供了方便的 DefaultSelector 类,可以根据当前平台自动选择合适的 Selector 类,也是推荐大部分用户直接使用的类。

下面是官方文档中提供的标准示例,理解这个例子非常有助于进一步理解asyncio的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import selectors
import socket

sel = selectors.DefaultSelector() # <1> 示例化Selector,根据平台进行选择

def accept(sock, mask):
conn, addr = sock.accept() # Should be ready <7> 获得客户端连接的描述符
print('accepted', conn, 'from', addr)
conn.setblocking(False) # <8> 设置为非阻塞模式
sel.register(conn, selectors.EVENT_READ, read) # <9> 绑定事件和回调函数,将描述符注册到selector上

def read(conn, mask):
data = conn.recv(1000) # Should be ready
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data) # Hope it won't block
else:
print('closing', conn)
sel.unregister(conn)
conn.close()

sock = socket.socket() # <2> 获得socket实例
sock.bind(('localhost', 1234)) # <3> 服务端,绑定地址,监听链接
sock.listen(100)
sock.setblocking(False) # <4> 设置为非阻塞模式
sel.register(sock, selectors.EVENT_READ, accept) # <5> 将socket实例和回调函数,绑定事件,注册到selector中

while True: # <6> 开启事件循环,在每一轮循环中阻塞在select调用处,等待事件触发返回可用描述符
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask) # 调用回调函数

根据这个简单的例子,其中有几个关键的对象和步骤:

对象:

  • 文件描述符(file descriptor)- conn sock
  • 回调函数(callback) - accept() read()
  • 事件循环 (event loop) - while: ...

过程:

  • 设置非阻塞模式 - fd.setblocking(False)
  • 注册描述符 - sel.register(fd, event, callback)

这是一个最简单的I/O多路复用模式的雏形。在分析 asyncio 源码的过程中,始终记住这个模板,并且尝试的把不同角色对应起来,对理解asyncio复杂的调度会非常有帮助。那么接下来就进入正题。

asyncio 源码浅析

目录结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
asyncio
+ —— __init__.py
+ —— base_events.py
+ —— base_futures.py
+ —— base_subprocess.py
+ —— base_tasks.py
+ —— constants.py
+ —— coroutines.py
+ —— events.py
+ —— format_helpers.py
+ —— futures.py
+ —— locks.py
+ —— log.py
+ —— proactor_events.py
+ —— protocols.py
+ —— queues.py
+ —— runners.py
+ —— selector_events.py
+ —— sslproto.py
+ —— streams.py
+ —— subprocess.py
+ —— tasks.py
+ —— transports.py
+ —— unix_events.py
+ —— windows_events.py
+ —— windows_utils.py

为了快速的了解 asyncio 的基本工作模式,这里选择暂时忽略一些内容,只提取其中涉及到调度的核心部分来进行分析。

那么,挑挑拣拣,剩下一些主要模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
asyncio
+ —— __init__.py
+ —— base_events.py
+ —— base_futures.py
+ —— base_tasks.py
+ —— events.py
+ —— futures.py
+ —— proactor_events.py
+ —— runners.py
+ —— selector_events.py
+ —— tasks.py
+ —— unix_events.py
+ —— windows_events.py

核心类

EventLoop

相关文件: events.py, base_events.py, selector_events.py, windows_events.py, proactor_events.py

由上面 selectors 的示例可以看出,最终的 while 循环是使得整个逻辑运行起来的核心。在asyncio 中,事件循环被抽象为一系列的类,它负责了异步任务、回调函数的执行,进行网络I/O操作和子进程的运行等,是基于asyncio的应用的核心。根据不同平台,有不同的子类实现,继承关系如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
AbstractEventLoop
|
+ —— BaseEventLoop
|
+ —— BaseSelectorEventLoop
| |
| + —— _UnixSelectorEventLoop
| |
| + —— _WindowsSelectorEventLoop
|
+ —— BaseProactorEventLoop
|
+ —— ProactorEventLoop

首先,对于我们最常用的 loop.run_until_complete()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# base_events.py
class BaseEventLoop(events.AbstractEventLoop):

...

def run_until_complete(self, future):
self._check_closed()

new_task = not futures.isfuture(future)
future = tasks.ensure_future(future, loop=self)
if new_task:
future._log_destroy_pending = False

future.add_done_callback(_run_until_complete_cb)
try:
self.run_forever()
except:
if new_task and future.done() and not future.cancelled():
future.exception()
raise
finally:
future.remove_done_callback(_run_until_complete_cb)
if not future.done():
raise RuntimeError('Event loop stopped before Future completed.')

return future.result()

可以看到,run_until_complete()方法其实是通过 run_forever()方法和 _run_until_complete_cb() 这个回调函数来实现运行完成后终止的。
回调函数通过 futurn.add_done_callback() 来进行添加。

1
2
3
4
5
6
7
8
9
# base_events.py
def _run_until_complete_cb(fut):
if not fut.cancelled():
exc = fut.exception()
if isinstance(exc, BaseException) and not isinstance(exc, Exception):
# Issue #22429: run_forever() already finished, no need to
# stop it.
return
futures._get_loop(fut).stop()

这个回调函数,在 fut 执行结束后,最关键的一步,获取到它的事件循环,并调用 stop() 方法将其停止。

run_forever() 的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# base_events.py
class BaseEventLoop(events.AbstractEventLoop):
...

def run_forever(self):
"""Run until stop() is called."""
self._check_closed()
if self.is_running():
raise RuntimeError('This event loop is already running')
if events._get_running_loop() is not None:
raise RuntimeError(
'Cannot run the event loop while another loop is running')
self._set_coroutine_origin_tracking(self._debug)
self._thread_id = threading.get_ident()

old_agen_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
finalizer=self._asyncgen_finalizer_hook)
try:
events._set_running_loop(self)
while True:
self._run_once()
if self._stopping:
break
finally:
self._stopping = False
self._thread_id = None
events._set_running_loop(None)
self._set_coroutine_origin_tracking(False)
sys.set_asyncgen_hooks(*old_agen_hooks)

回想开篇 selectors 的示例,在这里首先对应上了第一个关键结构:事件循环 while ...

显然,如果我们调用 run_forever()方法,事件循环将会永远的运行下去,除非强制中断。在实际的应用中,我们就不能像使用 run_until_complete() 时,还将一部分代码功能留在“外部”,我们应该预先定义基于任务与回调的代码逻辑,让事件循环来完成调用执行工作,在运行代码的最后一步,调用run_forever()启动事件循环,使程序持续运转起来。

在这个核心的循环中,可以看到,当 _stopping 被设置成 False 时,循环立即退出;否则,每一轮循环,调用一次 _run_once()

1
2
3
4
while True:
self._run_once()
if self._stopping:
break

接下来进入到 _run_once() 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
class BaseEventLoop(events.AbstractEventLoop):

...

def _run_once(self):
"""Run one full iteration of the event loop.

This calls all currently ready callbacks, polls for I/O,
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.
"""

sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
# Remove delayed calls that were cancelled if their number
# is too high
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)

heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# Remove delayed calls that were cancelled from head of queue.
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False

timeout = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
# Compute the desired timeout.
when = self._scheduled[0]._when
timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

if self._debug and timeout != 0:
t0 = self.time()
event_list = self._selector.select(timeout)
dt = self.time() - t0
if dt >= 1.0:
level = logging.INFO
else:
level = logging.DEBUG
nevent = len(event_list)
if timeout is None:
logger.log(level, 'poll took %.3f ms: %s events',
dt * 1e3, nevent)
elif nevent:
logger.log(level,
'poll %.3f ms took %.3f ms: %s events',
timeout * 1e3, dt * 1e3, nevent)
elif dt >= 1.0:
logger.log(level,
'poll %.3f ms took %.3f ms: timeout',
timeout * 1e3, dt * 1e3)
else:
event_list = self._selector.select(timeout)
self._process_events(event_list)

# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)

# This is the only place where callbacks are actually *called*.
# All other places just add them to ready.
# Note: We run all currently scheduled callbacks, but not any
# callbacks scheduled by callbacks run this time around --
# they will be run the next time (after another I/O poll).
# Use an idiom that is thread-safe without using locks.
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
finally:
self._current_handle = None
else:
handle._run()
handle = None

从上到下看了一遍,有几个问题需要继续思考一下:

  1. 前面都没有遇到 self._scheduled, 为什么这里看上去需要 self._scheduled 里面有值才行? self_scheduled 到底是在什么位置被添加进了值呢?
  2. self._selector.select(timeout) 在这里直接等待事件触发返回描述符,并且后续调用 self._process_events(event_list) 处理获取到的描述符。那么,这些描述符,不论是Socket描述符,还是其他类型的描述符,又是在什么地方被注册到 self._selector 中的呢?

下一篇笔记,我们从一个实例出发,开始寻找这两个问题的答案。