对于协程(coroutine),你必须知道事情都在这里了(内附代码)

什么是协程

协程(coroutine) 的概念根据Donald Knuth的说法早在1958年就由Melvin Conway提出了,对应wikipedia的定义如下: > Coroutines are computer program components that generalize subroutines for non-preemptive multitasking, by allowing execution to be suspended and resumed. Coroutines are well-suited for implementing familiar program components such as cooperative tasks, exceptions, event loops, iterators, infinite lists and pipes. 这里_子例程(subroutine)是一个概括性的术语,子例程可以是整个程序中的一个代码区块,当它被主程序调用的时候就会进入运行。例如函数就是子例程中的一种。

1
c = max(a,b);
从wikipedia定义可以看出协程相比子例程更加的灵活,允许执行过程中_被挂起_和_恢复,多个协程可以一起相互协作执行任务。从协程(co + routine)名字上来拆解为支持 协作(cooperate) 的例程。

协程与子例程的执行区别

图中左边是子例程的执行过程,右边是协程的执行过程,可以很明显的看出来执行过程中的区别。 * 先说左边,子例程可以看成是某个函数,子例程的执行过程中通常是嵌套顺序执行的过程,子例程1和子例程2的关系(调用和被调用)不是完全平等的,子例程1能调用子例程2,但子例程2不能反过来调用子例程1。 * 再说右边,协程1和协程2的关系是完全对等的,协程1执行过程中可以中断挂起执行另外一个协程2,反之也是可以的,直到最终两个协程都执行完以后再返回回到主程序中,即协程1和协程2相互协作完成了整个任务。 接下来举一个协程实现生产者和消费者的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var q := new queue

coroutine produce
loop
while q is not full
create some new items
add the items to q
yield to consume

coroutine consume
loop
while q is not empty
remove some items from q
use the items
yield to produce
这里有一个队列queue,一个生产者produce,一个消费者consume,yield代表中断挂起当前协程,并恢复其他协程的操作。生产者生产物品以后加入到队列以后,中断挂起自身并恢复消费者,消费者从队列中消费完物品以后中断挂起自身并恢复生产者,不断来回切换直到达到最终条件(比如所有原料都生产成物品并全都被消费完成),程序终止。

进程、线程、协程的关系和比较

通常会提到 进程是资源分配的最小单位,线程是CPU调度的最小单位, 一个进程里可以有多个线程 ,这里直接画了个图来说明三者关系。

  • 进程是资源分配的最小单位,会拥有独立的地址空间以及对应的内存空间,还有网络和文件资源等,不同进程之间资源都是独立的,可以通过进程间通信(管道、共享内存、信号量等方式)来进行交互。
  • 线程为CPU调度的基本单位,除了拥有运行中必不可少的信息(如程序计数器、一组寄存器和栈)以外,本身并不拥有系统资源,所有线程会共享进程的资源,比如会共享堆资源。
  • 协程可以认为是运行在线程上的代码块,协程提供的挂起操作会使协程暂停执行,而不会导致线程阻塞。一个线程内部可以创建几千个协程都没有任何问题。
  • 进程的切换和线程切换中都包含了对应上下文的切换,这块都涉及到了内核来完成,即一次用户态到内核态的切换和一次内核态到用户态的切换。因为进程上下文切换保存的信息更多,所以进程切换代价会比线程切换代价更大。
  • 协程是一个纯用户态的并发机制,同一时刻只会有一个协程在运行,其他协程挂起等待;不同协程之间的切换不涉及内核,只用在用户态切换即可,所以切换代价更小,更轻量级,适合IO密集型的场景。

coroutine的python实现

  1. Python最初的版本里是包含了yield/send关键字,通过yield/send可以方便的实现一个协程的例子,这里还是以为生产者和消费者为例,具体实现方式如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-

    def consumer():
    """consumer定义"""
    ret = ''
    while True:
    # 挂起consumer,恢复producer, ret会传回给producer
    item_id = yield ret
    if not item_id:
    return
    print('consume item_id:{}'.format(item_id))
    ret = 'success'

    def producer(consumer):
    """producer定义"""
    # send一个None可以看成是启动consumer
    # consumer这里包含了yield关键字相当于是一个generator
    consumer.send(None)
    item_id = 0
    # 生产的item的总数
    item_total_count = 3
    while item_id < item_total_count:
    item_id = item_id + 1
    print('produce item:{}'.format(item_id))
    # 挂起producer,恢复consumer, item_id会传回给consumer
    ret = c.send(item_id)
    print('consumer return:{}'.format(ret))
    consumer.close()

    if __name__ == "__main__":
    c = consumer()
    producer(c)

    结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    produce item:1
    consume item_id:1
    consumer return:success
    produce item:2
    consume item_id:2
    consumer return:success
    produce item:3
    consume item_id:3
    consumer return:success

  2. python 3.5版本开始引入了async/await关键字给了我们另外一种实现的方法,还是以为生产者和消费者为例,具体实现方式如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-

    import asyncio
    import random

    async def producer(queue, item_total_count):
    """producer 定义"""
    for item_id in range(0, item_total_count):
    # 生产一个新的item
    print('produce item_id:{}'.format(item_id))
    # 模拟IO操作, 挂起producer,恢复consumer
    #await asyncio.sleep(random.random())
    # 把item放入队列, 挂起producer,恢复consumer
    await queue.put(item_id)
    # 放入一个None到队列表示生产全部结束
    await queue.put(None)


    async def consumer(queue):
    """consumer 定义"""
    while True:
    # 等待从队列中获得一个新的item, 等待过程中会挂起consumer,恢复producer
    item_id = await queue.get()
    if item_id is None:
    # 表示生产者都生产完了,没有待消费的请求了
    break
    print('consume item_id:{}'.format(item_id))
    # 模拟IO操作, 挂起consumer,恢复producer
    #await asyncio.sleep(random.random())

    if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue(loop=loop)
    producer_coro = producer(queue, 3)
    consumer_coro = consumer(queue)
    loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
    loop.close()

    结果:

    1
    2
    3
    4
    5
    6
    produce item_id:0
    produce item_id:1
    produce item_id:2
    consume item_id:0
    consume item_id:1
    consume item_id:2