Skip to content

协程

此笔记记录于《流畅的python》,大部分为其中的摘要,少部分为笔者自己的理解;笔记为jupyter转的markdown,原始版jupyter笔记在这个仓库

字典为动词“to yield”给出了两个释义:产出和让步。对于Python生成器中的yield来说,这两个含义都成立。yield item这行代码会产出一个值,提供给next(...)的调用方;此外,还会作出让步,暂停执行生成器,让调用方继续工作,直到需要使用另一个值时再调用next()。调用方会从生成器中拉取值。

从句法上看,协程与生成器类似,都是定义体中包含yield关键字的函数。可是,在协程中,yield通常出现在表达式的右边(例如,datum=yield),可以产出值,也可以不产出——如果yield关键字后面没有表达式,那么生成器产出None。协程可能会从调用方接收数据,不过调用方把数据提供给协程使用的是.send(datum)方法,而不是next(...)函数。通常,调用方会把值推送给协程。

yield关键字甚至还可以不接收或传出数据。不管数据如何流动,yield都是一种流程控制工具,使用它可以实现协作式多任务:协程可以把控制器让步给中心调度程序,从而激活其他的协程。

一些改动进化:

  • 生成器的调用方可以使用.send(...)方法发送数据,发送的数据会成为生成器函数中yield表达式的值。因此,生成器可以作为协程使用。协程是指一个过程,这个过程与调用方协作,产出由调用方提供的值。
  • 现在,生成器可以返回一个值;以前,如果在生成器中给return语句提供值,会抛出SyntaxError异常。
  • 新引入了yield from句法,使用它可以把复杂的生成器重构成小型的嵌套生成器,省去了之前把生成器的工作委托给子生成器所需的大量样板代码。

用作协程的生成器的基本行为

python
def simple_coroutine():
    print('-> coroutine started')
    x = yield
    print('-> coroutine received:', x)
python
my_coro = simple_coroutine()
my_coro
<generator object simple_coroutine at 0x00000196104F4660>
python
next(my_coro)
-> coroutine started
python
my_coro.send(42) # 调用这个方法后,协程定义体中的yield表达式会计算出42;现在,协程会恢复,一直运行到下一个yield表达式,或者终止。
-> coroutine received: 42



---------------------------------------------------------------------------

StopIteration                             Traceback (most recent call last)

Cell In[4], line 1
----> 1 my_coro.send(42)


StopIteration: 

这里,控制权流动到协程定义体的末尾,导致生成器像往常一样抛出StopIteration异常。

注意:send方法的参数会成为暂停的yield表达式的值,所以,仅当协程处于暂停状态时才能调用send方法,例如my_coro.send(42)。不过,如果协程还没激活(即,状态是'GEN_CREATED'),情况就不同了。因此,始终要调用next(my_coro)激活协程——也可以调用my_coro.send(None),效果一样。

python
# 产出两个值的协程
def simple_coro2(a):
    print('-> Started: a =', a)
    b = yield a
    print('-> Received: b =', b)
    c = yield a + b
    print('-> Received: c =', c)
python
my_coro2 = simple_coro2(14)
from inspect import getgeneratorstate
getgeneratorstate(my_coro2) 
# GEN_CREATED:等待开始执行;GEN_RUNNING:解释器正在执行;GEN_SUSPENDED:在yield表达式处暂停;GEN_CLOSED:执行结束
'GEN_CREATED'
python
next(my_coro2) # 这里next(my_coro2)本身值是14,即yield后面a的值
-> Started: a = 14





14
python
getgeneratorstate(my_coro2)
'GEN_SUSPENDED'
python
my_coro2.send(28) # # 这里next(my_coro2)本身值是14+28,即yield后面a+b的值
-> Received: b = 28





42
python
my_coro2.send(99)
-> Received: c = 99



---------------------------------------------------------------------------

StopIteration                             Traceback (most recent call last)

Cell In[10], line 1
----> 1 my_coro2.send(99)


StopIteration: 
python
getgeneratorstate(my_coro2)
'GEN_CLOSED'

总的来说,yield会把后面表达式的值暴露出来,而send会把值赋值给yeild表达式,到里面继续执行

示例:使用协程计算移动平均值

前面使用了闭包在多次调用之前跟踪了total和count的值,这里使用协程实现了同样的功能

python
# 这个无限循环表明,只要调用方不断把值发给这个协程,它就会一直接收值,然后生成结果。
# 仅当调用方在协程上调用.close()方法,或者没有对协程的引用而被垃圾回收程序回收时,这个协程才会终止。
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average = total/count
python
coro_avg = averager()
next(coro_avg) # 激活协程
python
coro_avg.send(10)
10.0
python
coro_avg.send(30)
20.0
python
coro_avg.send(5)
15.0

预激活协程的装饰器

python
from functools import wraps


def coroutine(func):
    """装饰器:向前执行到第一个`yield`表达式,预激`func`"""
    @wraps(func) # 这样做可以保持原函数func的名称和文档字符串不变。
    def primer(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return primer
python
# 使用
@coroutine
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average = total/count
python
getgeneratorstate(averager()) # 这个协程已经准备好了
'GEN_SUSPENDED'

终止协程和异常处理

协程中未处理的异常会向上冒泡,传给next函数或send方法的调用方(即触发协程的对象)

python
coro_avg = averager()
coro_avg.send(40)
40.0
python
coro_avg.send(50)
45.0
python
coro_avg.send('spam') # 发送的值不是数字,导致协程内部有异常抛出
---------------------------------------------------------------------------

TypeError                                 Traceback (most recent call last)

Cell In[22], line 1
----> 1 coro_avg.send('spam')


Cell In[18], line 9, in averager()
      7 while True:
      8     term = yield average
----> 9     total += term
     10     count += 1
     11     average = total/count


TypeError: unsupported operand type(s) for +=: 'float' and 'str'
python
# 由于在协程内没有处理异常,协程会终止。如果试图重新激活协程,会抛出StopIteration异常。
coro_avg.send(60)
---------------------------------------------------------------------------

StopIteration                             Traceback (most recent call last)

Cell In[23], line 1
----> 1 coro_avg.send(60)


StopIteration: 

上例中暗示了终止协程的一种方式:发送某个哨符值,让协程退出。内置的None和Ellipsis等常量经常用作哨符值。Ellipsis的优点是,数据流中不太常有这个值。我还见过有人把StopIteration类(类本身,而不是实例,也不抛出)作为哨符值;也就是说,是像这样使用的:my_coro.send(StopIteration)

从Python 2.5开始,客户代码可以在生成器对象上调用两个方法,显式地把异常发给协程。

  • generator.throw(exc_type[, exc_value[, traceback]]):致使生成器在暂停的yield表达式处抛出指定的异常。
  • generator.close():致使生成器在暂停的yield表达式处抛出GeneratorExit异常。
python
class DemoException(Exception):
    """为这次演示定义的异常类型。"""


def demo_exc_handling():
    print('-> coroutine started')
    while True:
        try:
            x = yield
        except DemoException:
            print('*** DemoException handled. Continuing...')
        else:
            print('-> coroutine received: {!r}'.format(x))
    # 最后一行代码不会执行,因为只有未处理的异常才会中止那个无限循环,而一旦出现未处理的异常,协程会立即终止。
    raise RuntimeError('This line should never run.')
python
exc_coro = demo_exc_handling()
next(exc_coro)
-> coroutine started
python
exc_coro.send(11)
-> coroutine received: 11
python
exc_coro.send(22)
-> coroutine received: 22
python
exc_coro.close()
python
from inspect import getgeneratorstate
getgeneratorstate(exc_coro)
'GEN_CLOSED'

传入异常也不会导致协程终止

python
exc_coro = demo_exc_handling()
next(exc_coro)
-> coroutine started
python
exc_coro.send(11)
-> coroutine received: 11
python
exc_coro.throw(DemoException)
*** DemoException handled. Continuing...
python
getgeneratorstate(exc_coro)
'GEN_SUSPENDED'

如果无法处理传入的异常,协程会终止

python
exc_coro = demo_exc_handling()
next(exc_coro)
-> coroutine started
python
exc_coro.send(11)
-> coroutine received: 11
python
exc_coro.throw(ZeroDivisionError) # 协程内部没有处理ZeroDivisionError异常,导致协程终止
---------------------------------------------------------------------------

ZeroDivisionError                         Traceback (most recent call last)

Cell In[47], line 1
----> 1 exc_coro.throw(ZeroDivisionError)


Cell In[33], line 9, in demo_exc_handling()
      7 while True:
      8     try:
----> 9         x = yield
     10     except DemoException:
     11         print('*** DemoException handled. Continuing...')


ZeroDivisionError: 
python
getgeneratorstate(exc_coro)
'GEN_CLOSED'
python
# 使用try/finally块在协程终止时执行操作
class DemoException(Exception):
    """为这次演示定义的异常类型。"""


def demo_finally():
    print('-> coroutine started')
    try:
        while True:
            try:
                x = yield
            except DemoException:
                print('*** DemoException handled. Continuing...')
            else:
                print('-> coroutine received: {!r}'.format(x))
    finally:
        print('-> coroutine ending')

让协程返回值

python
from collections import namedtuple
Result = namedtuple('Result', 'count average')


def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break # 为了返回值,协程必须正常终止;因此,这一版averager中有个条件判断,以便退出累计循环
        total += term
        count += 1
        average = total/count
    return Result(count, average)
python
coro_avg = averager()
next(coro_avg)
python
coro_avg.send(10)
python
coro_avg.send(30)
python
coro_avg.send(6.5)
python
coro_avg.send(None)
---------------------------------------------------------------------------

StopIteration                             Traceback (most recent call last)

Cell In[55], line 1
----> 1 coro_avg.send(None)


StopIteration: Result(count=3, average=15.5)

这一版不产出值,但return表达式的值会偷偷传给调用方,赋值给StopIteration异常的一个属性。这样做有点不合常理,却能保留生成器对象的常规行为——耗尽时抛出StopIteration异常。

python
## 捕获异常,返回值
coro_avg = averager()
next(coro_avg)
python
coro_avg.send(10)
coro_avg.send(30)
coro_avg.send(6.5)
python
try:
    coro_avg.send(None)
except StopIteration as exc:
    result = exc.value

result
Result(count=4, average=14.125)

使用yield from

ield from结构会在内部自动捕获StopIteration异常。这种处理方式与for循环处理StopIteration异常的方式一样:循环机制使用用户易于理解的方式处理异常。对yield from结构来说,解释器不仅会捕获StopIteration异常,还会把value属性的值变成yield from表达式的值。

python
def gen():
    for c in 'AB':
        yield c
    for i in range(1, 3):
        yield i


# 使用yield fromj简化for循环
def gen():
    yield from 'AB'
    yield from range(1, 3)

使用yield from链接可迭代的对象

python
def chain(*iterables):
    for it in iterables:
        yield from it
python
s = 'ABC'
t = tuple(range(3))
list(chain(s, t))
['A', 'B', 'C', 0, 1, 2]

yield from x表达式对x对象所做的第一件事是,调用iter(x),从中获取迭代器。因此,x可以是任何可迭代的对象。

可是,如果yield from结构唯一的作用是替代产出值的嵌套for循环,这个结构很有可能不会添加到Python语言中。yield from结构的本质作用无法通过简单的可迭代对象说明,而要发散思维,使用嵌套的生成器。因此,引入yield from结构的PEP 380才起了“Syntax for Delegating to a Subgenerator”(“把职责委托给子生成器的句法”)这个标题。

yield from的主要功能是打开双向通道,把最外层的调用方与最内层的子生成器连接起来,这样二者可以直接发送和产出值,还可以直接传入异常,而不用在位于中间的协程中添加大量处理异常的样板代码。有了这个结构,协程可以通过以前不可能的方式委托职责。

术语:

  • 委派生成器:包含yield from <iterable>表达式的生成器函数
  • 子生成器:从yield from表达式中<iterable>部分获取的生成器
  • 调用方:调用委派生成器的客户端代码

yield from可以从生成器中获取值,并且可以把值传递给生成器,相当于多了一层

python
from collections import namedtuple
Result = namedtuple('Result', 'count average')


# 子生成器
def averager():
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield
        if term is None:
            break
        total += term
        count += 1
        average = total/count
    return Result(count, average)


# 委派生成器
def grouper(results, key):
    while True:
        results[key] = yield from averager()


# 客户端代码,即调用方
def main(data):
    results = {}
    for key, values in data.items():
        group = grouper(results, key)
        next(group)
        for value in values:
            group.send(value)
        group.send(None)  # 如果需要,传入None,结束协程
    # print(results)  #如果要调试,去掉注释
    report(results)


# 输出报告
def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(
              result.count, group, result.average, unit))


data = {
    'girls;kg':
        [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
    'girls;m':
        [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
    'boys;kg':
        [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
    'boys;m':
        [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}
if __name__ == '__main__':
    main(data)
 9 boys  averaging 40.42kg
 9 boys  averaging 1.39m
10 girls averaging 42.04kg
10 girls averaging 1.43m

yield from的意义

上例中的行为表现:

  • 子生成器产出的值都直接传给委派生成器的调用方(即客户端代码)。
  • 使用send( )方法发给委派生成器的值都直接传给子生成器。如果发送的值是None,那么会调用子生成器的__next__( )方法。如果发送的值不是None,那么会调用子生成器的send( )方法。如果调用的方法抛出StopIteration异常,那么委派生成器恢复运行。任何其他异常都会向上冒泡,传给委派生成器。
  • 生成器退出时,生成器(或子生成器)中的return expr表达式会触发StopIteration(expr)异常抛出。
  • yield from表达式的值是子生成器终止时传给StopIteration异常的第一个参数。

yield from结构的另外两个特性与异常和终止有关:

  • 传入委派生成器的异常,除了GeneratorExit之外都传给子生成器的throw( )方法。如果调用throw( )方法时抛出StopIteration异常,委派生成器恢复运行。StopIteration之外的异常会向上冒泡,传给委派生成器。
  • 如果把GeneratorExit异常传入委派生成器,或者在委派生成器上调用close( )方法,那么在子生成器上调用close( )方法,如果它有的话。如果调用close( )方法导致异常抛出,那么异常会向上冒泡,传给委派生成器;否则,委派生成器抛出GeneratorExit异常。

略一些例子...