协程
此笔记记录于《流畅的 python》,大部分为其中的摘要,少部分为笔者自己的理解;笔记为 jupyter 转的 markdown,原始版 jupyter 笔记在这个仓库
字典为动词“to yield”给出了两个释义:产出和让步。对于 Python 生成器中的 yield 来说,这两个含义都成立。yield item 这行代码会产出一个值,提供给next(...)
的调用方;此外,还会作出让步,暂停执行生成器,让调用方继续工作,直到需要使用另一个值时再调用next()
。调用方会从生成器中拉取值。
从句法上看,协程与生成器类似,都是定义体中包含 yield 关键字的函数。可是,在协程中,yield 通常出现在表达式的右边(例如,datum=yield
),可以产出值,也可以不产出——如果 yield 关键字后面没有表达式,那么生成器产出 None。协程可能会从调用方接收数据,不过调用方把数据提供给协程使用的是.send(datum)
方法,而不是next(...)
函数。通常,调用方会把值推送给协程。
yield 关键字甚至还可以不接收或传出数据。不管数据如何流动,yield 都是一种流程控制工具,使用它可以实现协作式多任务:协程可以把控制器让步给中心调度程序,从而激活其他的协程。
一些改动进化:
- 生成器的调用方可以使用
.send(...)
方法发送数据,发送的数据会成为生成器函数中 yield 表达式的值。因此,生成器可以作为协程使用。协程是指一个过程,这个过程与调用方协作,产出由调用方提供的值。 - 现在,生成器可以返回一个值;以前,如果在生成器中给 return 语句提供值,会抛出 SyntaxError 异常。
- 新引入了 yield from 句法,使用它可以把复杂的生成器重构成小型的嵌套生成器,省去了之前把生成器的工作委托给子生成器所需的大量样板代码。
用作协程的生成器的基本行为
def simple_coroutine():
print('-> coroutine started')
x = yield
print('-> coroutine received:', x)
my_coro = simple_coroutine()
my_coro
<generator object simple_coroutine at 0x00000196104F4660>
next(my_coro)
-> coroutine started
my_coro.send(42) # 调用这个方法后,协程定义体中的 yield 表达式会计算出 42;现在,协程会恢复,一直运行到下一个 yield 表达式,或者终止。
-> coroutine received: 42
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
Cell In[4], line 1
----> 1 my_coro.send(42)
StopIteration:
这里,控制权流动到协程定义体的末尾,导致生成器像往常一样抛出 StopIteration 异常。
注意:send 方法的参数会成为暂停的 yield 表达式的值,所以,仅当协程处于暂停状态时才能调用 send 方法,例如my_coro.send(42)
。不过,如果协程还没激活(即,状态是'GEN_CREATED'),情况就不同了。因此,始终要调用next(my_coro)
激活协程——也可以调用my_coro.send(None)
,效果一样。
# 产出两个值的协程
def simple_coro2(a):
print('-> Started: a =', a)
b = yield a
print('-> Received: b =', b)
c = yield a + b
print('-> Received: c =', c)
my_coro2 = simple_coro2(14)
from inspect import getgeneratorstate
getgeneratorstate(my_coro2)
# GEN_CREATED:等待开始执行;GEN_RUNNING:解释器正在执行;GEN_SUSPENDED:在 yield 表达式处暂停;GEN_CLOSED:执行结束
'GEN_CREATED'
next(my_coro2) # 这里 next(my_coro2)本身值是 14,即 yield 后面 a 的值
-> Started: a = 14
14
getgeneratorstate(my_coro2)
'GEN_SUSPENDED'
my_coro2.send(28) # # 这里 next(my_coro2)本身值是 14+28,即 yield 后面 a+b 的值
-> Received: b = 28
42
my_coro2.send(99)
-> Received: c = 99
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
Cell In[10], line 1
----> 1 my_coro2.send(99)
StopIteration:
getgeneratorstate(my_coro2)
'GEN_CLOSED'
总的来说,yield 会把后面表达式的值暴露出来,而 send 会把值赋值给 yeild 表达式,到里面继续执行
示例:使用协程计算移动平均值
前面使用了闭包在多次调用之前跟踪了 total 和 count 的值,这里使用协程实现了同样的功能
# 这个无限循环表明,只要调用方不断把值发给这个协程,它就会一直接收值,然后生成结果。
# 仅当调用方在协程上调用.close()方法,或者没有对协程的引用而被垃圾回收程序回收时,这个协程才会终止。
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count
coro_avg = averager()
next(coro_avg) # 激活协程
coro_avg.send(10)
10.0
coro_avg.send(30)
20.0
coro_avg.send(5)
15.0
预激活协程的装饰器
from functools import wraps
def coroutine(func):
"""装饰器:向前执行到第一个`yield`表达式,预激`func`"""
@wraps(func) # 这样做可以保持原函数 func 的名称和文档字符串不变。
def primer(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen)
return gen
return primer
# 使用
@coroutine
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total/count
getgeneratorstate(averager()) # 这个协程已经准备好了
'GEN_SUSPENDED'
终止协程和异常处理
协程中未处理的异常会向上冒泡,传给 next 函数或 send 方法的调用方(即触发协程的对象)
coro_avg = averager()
coro_avg.send(40)
40.0
coro_avg.send(50)
45.0
coro_avg.send('spam') # 发送的值不是数字,导致协程内部有异常抛出
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In[22], line 1
----> 1 coro_avg.send('spam')
Cell In[18], line 9, in averager()
7 while True:
8 term = yield average
----> 9 total += term
10 count += 1
11 average = total/count
TypeError: unsupported operand type(s) for +=: 'float' and 'str'
# 由于在协程内没有处理异常,协程会终止。如果试图重新激活协程,会抛出 StopIteration 异常。
coro_avg.send(60)
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
Cell In[23], line 1
----> 1 coro_avg.send(60)
StopIteration:
上例中暗示了终止协程的一种方式:发送某个哨符值,让协程退出。内置的 None 和 Ellipsis 等常量经常用作哨符值。Ellipsis 的优点是,数据流中不太常有这个值。我还见过有人把 StopIteration 类(类本身,而不是实例,也不抛出)作为哨符值;也就是说,是像这样使用的:my_coro.send(StopIteration)
。
从 Python 2.5 开始,客户代码可以在生成器对象上调用两个方法,显式地把异常发给协程。
generator.throw(exc_type[, exc_value[, traceback]])
:致使生成器在暂停的 yield 表达式处抛出指定的异常。generator.close()
:致使生成器在暂停的 yield 表达式处抛出 GeneratorExit 异常。
class DemoException(Exception):
"""为这次演示定义的异常类型。"""
def demo_exc_handling():
print('-> coroutine started')
while True:
try:
x = yield
except DemoException:
print('*** DemoException handled. Continuing...')
else:
print('-> coroutine received: {!r}'.format(x))
# 最后一行代码不会执行,因为只有未处理的异常才会中止那个无限循环,而一旦出现未处理的异常,协程会立即终止。
raise RuntimeError('This line should never run.')
exc_coro = demo_exc_handling()
next(exc_coro)
-> coroutine started
exc_coro.send(11)
-> coroutine received: 11
exc_coro.send(22)
-> coroutine received: 22
exc_coro.close()
from inspect import getgeneratorstate
getgeneratorstate(exc_coro)
'GEN_CLOSED'
传入异常也不会导致协程终止
exc_coro = demo_exc_handling()
next(exc_coro)
-> coroutine started
exc_coro.send(11)
-> coroutine received: 11
exc_coro.throw(DemoException)
*** DemoException handled. Continuing...
getgeneratorstate(exc_coro)
'GEN_SUSPENDED'
如果无法处理传入的异常,协程会终止
exc_coro = demo_exc_handling()
next(exc_coro)
-> coroutine started
exc_coro.send(11)
-> coroutine received: 11
exc_coro.throw(ZeroDivisionError) # 协程内部没有处理 ZeroDivisionError 异常,导致协程终止
---------------------------------------------------------------------------
ZeroDivisionError Traceback (most recent call last)
Cell In[47], line 1
----> 1 exc_coro.throw(ZeroDivisionError)
Cell In[33], line 9, in demo_exc_handling()
7 while True:
8 try:
----> 9 x = yield
10 except DemoException:
11 print('*** DemoException handled. Continuing...')
ZeroDivisionError:
getgeneratorstate(exc_coro)
'GEN_CLOSED'
# 使用 try/finally 块在协程终止时执行操作
class DemoException(Exception):
"""为这次演示定义的异常类型。"""
def demo_finally():
print('-> coroutine started')
try:
while True:
try:
x = yield
except DemoException:
print('*** DemoException handled. Continuing...')
else:
print('-> coroutine received: {!r}'.format(x))
finally:
print('-> coroutine ending')
让协程返回值
from collections import namedtuple
Result = namedtuple('Result', 'count average')
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break # 为了返回值,协程必须正常终止;因此,这一版 averager 中有个条件判断,以便退出累计循环
total += term
count += 1
average = total/count
return Result(count, average)
coro_avg = averager()
next(coro_avg)
coro_avg.send(10)
coro_avg.send(30)
coro_avg.send(6.5)
coro_avg.send(None)
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
Cell In[55], line 1
----> 1 coro_avg.send(None)
StopIteration: Result(count=3, average=15.5)
这一版不产出值,但 return 表达式的值会偷偷传给调用方,赋值给 StopIteration 异常的一个属性。这样做有点不合常理,却能保留生成器对象的常规行为——耗尽时抛出 StopIteration 异常。
## 捕获异常,返回值
coro_avg = averager()
next(coro_avg)
coro_avg.send(10)
coro_avg.send(30)
coro_avg.send(6.5)
try:
coro_avg.send(None)
except StopIteration as exc:
result = exc.value
result
Result(count=4, average=14.125)
使用 yield from
ield from 结构会在内部自动捕获 StopIteration 异常。这种处理方式与 for 循环处理 StopIteration 异常的方式一样:循环机制使用用户易于理解的方式处理异常。对 yield from 结构来说,解释器不仅会捕获 StopIteration 异常,还会把 value 属性的值变成 yield from 表达式的值。
def gen():
for c in 'AB':
yield c
for i in range(1, 3):
yield i
# 使用 yield fromj 简化 for 循环
def gen():
yield from 'AB'
yield from range(1, 3)
使用 yield from 链接可迭代的对象
def chain(*iterables):
for it in iterables:
yield from it
s = 'ABC'
t = tuple(range(3))
list(chain(s, t))
['A', 'B', 'C', 0, 1, 2]
yield from x
表达式对 x 对象所做的第一件事是,调用iter(x)
,从中获取迭代器。因此,x 可以是任何可迭代的对象。
可是,如果 yield from 结构唯一的作用是替代产出值的嵌套 for 循环,这个结构很有可能不会添加到 Python 语言中。yield from 结构的本质作用无法通过简单的可迭代对象说明,而要发散思维,使用嵌套的生成器。因此,引入 yield from 结构的 PEP 380 才起了“Syntax for Delegating to a Subgenerator”(“把职责委托给子生成器的句法”)这个标题。
yield from 的主要功能是打开双向通道,把最外层的调用方与最内层的子生成器连接起来,这样二者可以直接发送和产出值,还可以直接传入异常,而不用在位于中间的协程中添加大量处理异常的样板代码。有了这个结构,协程可以通过以前不可能的方式委托职责。
术语:
- 委派生成器:包含
yield from <iterable>
表达式的生成器函数 - 子生成器:从 yield from 表达式中
<iterable>
部分获取的生成器 - 调用方:调用委派生成器的客户端代码
yield from
可以从生成器中获取值,并且可以把值传递给生成器,相当于多了一层
from collections import namedtuple
Result = namedtuple('Result', 'count average')
# 子生成器
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break
total += term
count += 1
average = total/count
return Result(count, average)
# 委派生成器
def grouper(results, key):
while True:
results[key] = yield from averager()
# 客户端代码,即调用方
def main(data):
results = {}
for key, values in data.items():
group = grouper(results, key)
next(group)
for value in values:
group.send(value)
group.send(None) # 如果需要,传入 None,结束协程
# print(results) #如果要调试,去掉注释
report(results)
# 输出报告
def report(results):
for key, result in sorted(results.items()):
group, unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(
result.count, group, result.average, unit))
data = {
'girls;kg':
[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m':
[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':
[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':
[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}
if __name__ == '__main__':
main(data)
9 boys averaging 40.42kg
9 boys averaging 1.39m
10 girls averaging 42.04kg
10 girls averaging 1.43m
yield from 的意义
上例中的行为表现:
- 子生成器产出的值都直接传给委派生成器的调用方(即客户端代码)。
- 使用
send( )
方法发给委派生成器的值都直接传给子生成器。如果发送的值是 None,那么会调用子生成器的__next__( )
方法。如果发送的值不是 None,那么会调用子生成器的send( )
方法。如果调用的方法抛出 StopIteration 异常,那么委派生成器恢复运行。任何其他异常都会向上冒泡,传给委派生成器。 - 生成器退出时,生成器(或子生成器)中的
return expr
表达式会触发StopIteration(expr)
异常抛出。 yield from
表达式的值是子生成器终止时传给 StopIteration 异常的第一个参数。
yield from
结构的另外两个特性与异常和终止有关:
- 传入委派生成器的异常,除了 GeneratorExit 之外都传给子生成器的
throw( )
方法。如果调用throw( )
方法时抛出 StopIteration 异常,委派生成器恢复运行。StopIteration 之外的异常会向上冒泡,传给委派生成器。 - 如果把 GeneratorExit 异常传入委派生成器,或者在委派生成器上调用
close( )
方法,那么在子生成器上调用close( )
方法,如果它有的话。如果调用close( )
方法导致异常抛出,那么异常会向上冒泡,传给委派生成器;否则,委派生成器抛出 GeneratorExit 异常。
略一些例子...