器→工具, 编程语言

Python协程与异步

钱魏Way · · 735 次浏览

协程简介

在了解协程、异步之前,我们首先得了解一些基础概念:,如阻塞和非阻塞、同步和异步、多进程和协程。

阻塞和非阻塞

  • 阻塞:阻塞状态指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续干别的事情,则称该程序在该操作上是阻塞的。 常见的阻塞形式有:网络 I/O 阻塞、磁盘 I/O 阻塞、用户输入阻塞等。阻塞是无处不在的,包括 CPU 切换上下文时,所有的进程都无法真正干事情,它们也会被阻塞。如果是多核 CPU 则正在执行上下文切换操作的核不可被利用。
  • 非阻塞:程序在等待某操作过程中,自身不被阻塞,可以继续运行干别的事情,则称该程序在该操作上是非阻塞的。 非阻塞并不是在任何程序级别、任何情况下都可以存在的。 仅当程序封装的级别可以囊括独立的子程序单元时,它才可能存在非阻塞状态。 非阻塞的存在是因为阻塞存在,正因为某个操作阻塞导致的耗时与效率低下,我们才要把它变成非阻塞的。

同步和异步

  • 同步:不同程序单元为了完成某个任务,在执行过程中需靠某种通信方式以协调一致,称这些程序单元是同步执行的。 例如购物系统中更新商品库存,需要用 “行锁” 作为通信信号,让不同的更新请求强制排队顺序执行,那更新库存的操作是同步的。 简言之,同步意味着有序。
  • 异步:为完成某个任务,不同程序单元之间过程中无需通信协调,也能完成任务的方式,不相关的程序单元之间可以是异步的。 例如,爬虫下载网页。调度程序调用下载程序后,即可调度其他任务,而无需与该下载任务保持通信以协调行为。不同网页的下载、保存等操作都是无关的,也无需相互通知协调。这些异步操作的完成时刻并不确定。简言之,异步意味着无序。

进程和线程

  • 进程(Process)是应用程序启动的实例,拥有代码、数据和文件和独立的内存空间,是操作系统最小资源管理单元。每个进程下面有一个或者多个线程(Thread),来负责执行程序的计算,是最小的执行单元。操作系统会负责进程的资源的分配;控制权主要在操作系统。
  • 线程做为任务的执行单元,有新建、可运行runnable(调用start方法,进入调度池,等待获取cpu使用权)、运行running(得到cpu使用权开始执行程序) 阻塞blocked(放弃了cpu 使用权,再次等待) 死亡dead5中不同的状态。线程的转态也是由操作系统进行控制。线程如果存在资源共享的情况下,就需要加锁,比如生产者和消费者模式,生产者生产数据多共享队列,消费者从共享队列中消费数据。

线程和进程在得到和放弃cpu使用权时,cpu使用权的切换都需损耗性能,因为某个线程为了能够在再次获得cpu使用权时能继续执行任务,必须记住上一次执行的所有状态。另外线程还有锁的问题。

并行和并发

并行和并发,听起来都像是同时执行不同的任务。但是这个同时的含义是不一样的。

  • 并行(Parallelism):多核CPU才有可能真正的同时执行,就是独立的资源来完成不同的任务,没有先后顺序。
  • 并发(concurrent):是看上去的同时执行,实际微观层面是顺序执行,是操作系统对进程的调度以及cpu的快速上下文切换,每个进程执行一会然后停下来,cpu资源切换到另一个进程,只是切换的时间很短,看起来是多个任务同时在执行。要实现大并发,需要把任务切成小的任务。

上面说的多核cpu可能同时执行,这里的可能是和操作系统调度有关,如果操作系统调度到同一个cpu,那就需要cpu进行上下文切换。当然多核情况下,操作系统调度会尽可能考虑不同cpu。这里的上下文切换可以理解为需要保留不同执行任务的状态和数据。所有的并发处理都有排队等候,唤醒,执行三个这样的步骤。

协程背景与定义

GIL: 全局解释器锁(英语:Global Interpreter Lock,缩写GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。即便在多核心处理器上,使用 GIL 的解释器也只允许同一时间执行一个线程。由于GIL的存在,导致Python多线程性能甚至比单线程更糟。

协程,英文叫做 Coroutine,又称微线程,纤程,协程是一种用户态的轻量级线程。 协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入时,就相当于进入上一次调用的状态。 协程本质上是个单进程,协程相对于多进程来说,无需线程上下文切换的开销,无需原子操作锁定及同步的开销,编程模型也非常简单。 我们可以使用协程来实现异步操作,比如在网络爬虫场景下,我们发出一个请求之后,需要等待一定的时间才能得到响应,但其实在这个等待过程中,程序可以干许多其他的事情,等到响应得到之后才切换回来继续处理,这样可以充分利用 CPU 和其他资源,这就是异步协程的优势。

如果你还无法理解协程的概念,那么可以这么简单的理解:

  • 进程/线程:操作系统提供的一种并发处理任务的能力。
  • 协程:程序员通过高超的代码能力,在代码执行流程中人为的实现多任务并发,是单个线程内的任务调度技巧。

多进程和多线程体现的是操作系统的能力,而协程体现的是程序员的流程控制能力。

协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。

子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:

def A():
    print('1')
    print('2')
    print('3')

def B():
    print('x')
    print('y')
    print('z')

假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:

1
2
x
y
3
z

但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。看起来A、B的执行有点像多线程,但协程的特点在于是一个线程执行,那和多线程比,协程有何优势?

  • 协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
  • 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

Python对协程的支持是通过generator实现的。在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回的下一个值。但是Python的yield不但可以返回一个值,它还可以接收调用者发出的参数。

传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

执行结果:

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK

注意到consumer函数是一个generator,把一个consumer传入produce后:

  • 首先调用send(None)启动生成器
  • 然后,一旦生产了东西,通过send(n)切换到consumer执行
  • consumer通过yield拿到消息,处理,又通过yield把结果传回
  • produce拿到consumer处理的结果,继续生产下一条消息
  • produce决定不生产了,通过close()关闭consumer,整个过程结束

整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。

Python对协程的支持

生成器(Generator)

我们这里主要讨论yield和yield from这两个表达式,这两个表达式和协程的实现息息相关。

  • Python 2.5中引入yield表达式,参见PEP342
  • Python 3.3中增加yield from语法,参见PEP380

方法中包含yield表达式后,Python会将其视作generator对象,不再是普通的方法。

yield表达式的使用

最早的时候,Python提供了yield关键字,用于制造生成器。也就是说,包含有yield的函数,都是一个生成器!

yield的语法规则是:在yield这里暂停函数的执行,并返回yield后面表达式的值(默认为None),直到被next()方法再次调用时,从上次暂停的yield代码处继续往下执行。当没有可以继续next()的时候,抛出异常,该异常可被for循环处理。

我们先来看该表达式的具体使用:

def test():
    print("generator start")
    n = 1
    while True:
        yield_expression_value = yield n
        print("yield_expression_value = %d" % yield_expression_value)
        n += 1


# 创建generator对象
generator = test()

# 启动generator
next_result = generator.__next__()
print("next_result = %d" % next_result)

# 发送值给yield表达式
send_result = generator.send(666)
print("send_result = %d" % send_result)

执行结果:

generator start
next_result = 1
yield_expression_value = 666
send_result = 2

方法说明:

  • __next__()方法: 作用是启动或者恢复generator的执行,相当于send(None)
  • send(value)方法:每个生成器都可以执行send()方法,作用是发送值给yield表达式。启动generator则是调用send(None)。此时yield语句不再只是yield xxxx的形式,还可以是var = yield xxxx的赋值形式。它同时具备两个功能,一是暂停并返回函数,二是接收外部send()方法发送过来的值,重新激活函数,并将这个值赋值给var变量。

执行结果的说明:

  • 创建generator对象:包含yield表达式的函数将不再是一个函数,调用之后将会返回generator对象
  • 启动generator:使用生成器之前需要先调用__next__或者send(None),否则将报错。启动generator后,代码将执行到yield出现的位置,也就是执行到yield n,然后将n传递到__next__()这行的返回值。(注意,生成器执行到yield n后将暂停在这里,直到下一次生成器被启动)
  • 发送值给yield表达式:调用send方法可以发送值给yield表达式,同时恢复生成器的执行。生成器从上次中断的位置继续向下执行,然后遇到下一个yield,生成器再次暂停,切换到主函数打印出send_result。

理解这个demo的关键是:生成器启动或恢复执行一次,将会在yield处暂停。上面的第2步仅仅执行到了yield n,并没有执行到赋值语句,到了第3步,生成器恢复执行才给yield_expression_value赋值。

协程可以处于下面四个状态中的一个。当前状态可以导入inspect模块,使用inspect.getgeneratorstate() 方法查看,该方法会返回下述字符串中的一个。

  • GEN_CREATED:等待开始执行
  • GEN_RUNNING:协程正在执行
  • GEN_SUSPENDED:在 yield 表达式处暂停
  • GEN_CLOSE:协程执行结束

示例代码:

from inspect import getgeneratorstate


def simple_coro():
    print("generator start")
    n = 1
    while True:
        yield_expression_value = yield n
        print("yield_expression_value = %d" % yield_expression_value)
        n += 1


my_coro = simple_coro()
print(getgeneratorstate(my_coro))
next(my_coro)
print(getgeneratorstate(my_coro))
my_coro.send(28)
print(getgeneratorstate(my_coro))
my_coro.close()
print(getgeneratorstate(my_coro))

输出:

GEN_CREATED
generator start
GEN_SUSPENDED
yield_expression_value = 28
GEN_SUSPENDED
GEN_CLOSED

因为send()方法的参数会成为暂停的yield表达式的值,所以,仅当协程处于暂停状态时才能调用 send()方法。不过,如果协程还没激活(状态是’GEN_CREATED’),就立即把None之外的值发给它,会出现TypeError。因此,始终要先调用next(my_coro)激活协程,这一过程被称作预激活。

除了send()方法,其实还有throw()和close()方法:

  • throw(exc_type[, exc_value[, traceback]]):使生成器在暂停的yield表达式处抛出指定的异常。如果生成器处理了抛出的异常,代码会向前执行到下一个yield表达式,而产出的值会成为调用generator.throw()方法得到的返回值。如果生成器没有处理抛出的异常,异常会向上冒泡,传到调用方的上下文中。
  • close():使生成器在暂停的yield表达式处抛出GeneratorExit异常。如果生成器没有处理这个异常,或者抛出了StopIteration异常(通常是指运行到结尾),调用方不会报错。如果收到GeneratorExit异常,生成器一定不能产出值,否则解释器会抛出RuntimeError异常。生成器抛出的其他异常会向上冒泡,传给调用方。

yield from表达式

Python 3.3版本新增yield from语法,新语法用于将一个生成器部分操作委托给另一个生成器。此外,允许子生成器(即yield from后的“参数”)返回一个值,该值可供委派生成器(即包含yield from的生成器)使用。并且在委派生成器中,可对子生成器进行优化。

我们先来看最简单的应用,例如:

# 子生成器
def test(n):
    i = 0
    while i < n:
        yield i
        i += 1


# 委派生成器
def test_yield_from(n):
    print("test_yield_from start")
    yield from test(n)
    print("test_yield_from end")


for i in test_yield_from(3):
    print(i)

输出:

test_yield_from start
0
1
2
test_yield_from end

如果上面的test_yield_from函数中有两个yield from语句,将串行执行。比如将上面的test_yield_from函数改写成这样:

# 子生成器
def test(n):
    i = 0
    while i < n:
        yield i
        i += 1


# 委派生成器
def test_yield_from(n):
    print("test_yield_from start")
    yield from test(n)
    print("test_yield_from doing")
    yield from test(n)
    print("test_yield_from end")


for i in test_yield_from(3):
    print(i)

则输出:

test_yield_from start
0
1
2
test_yield_from doing
0
1
2
test_yield_from end

协程(Coroutine)

  • Python 3.4 开始,新增了asyncio相关的API,语法使用@asyncio.coroutine和yield from`实现协程
  • Python 3.5 中引入async/await语法,参见PEP492

@asyncio.coroutine

Python 3.4中,使用@asyncio.coroutine装饰的函数称为协程。不过没有从语法层面进行严格约束。

对于Python原生支持的协程来说,Python对协程和生成器做了一些区分,便于消除这两个不同但相关的概念的歧义:

  • 标记了@asyncio.coroutine装饰器的函数称为协程函数,iscoroutinefunction()方法返回True
  • 调用协程函数返回的对象称为协程对象,iscoroutine()函数返回True

举例:

import asyncio


def test(n):
    i = 0
    while i < n:
        yield i
        i += 1


@asyncio.coroutine
def test_yield_from(n):
    print("test_yield_from start")
    yield from test(n)
    print("test_yield_from end")


if __name__ == "__main__":
    # 是否是协程函数
    print(asyncio.iscoroutinefunction(test_yield_from))
    # 是否是协程对象
    print(asyncio.iscoroutine(test_yield_from(3)))

可以看下@asyncio.coroutine的源码中查看其做了什么,将其源码简化下,大致如下:

import functools
import types
import inspect

def coroutine(func):
    # 判断是否是生成器
    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        # 将普通函数变成generator
        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)
            res = yield from res
            return res
    # 将generator转换成coroutine
    wrapper = types.coroutine(coro)
    # For iscoroutinefunction().
    wrapper._is_coroutine = True
    return wrapper

将这个装饰器标记在一个生成器上,就会将其转换成coroutine。然后,我们来实际使用下@asyncio.coroutine和yield from:

import asyncio


@asyncio.coroutine
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    yield from asyncio.sleep(1.0)
    return x + y


@asyncio.coroutine
def print_sum(x, y):
    result = yield from compute(x, y)
    print("%s + %s = %s" % (x, y, result))


loop = asyncio.get_event_loop()
print("start")
# 中断调用,直到协程执行结束
loop.run_until_complete(print_sum(1, 2))
print("end")
loop.close()

执行结果:

start
Compute 1 + 2 ...
1 + 2 = 3
end

print_sum这个协程中调用了子协程compute,它将等待compute执行结束才返回结果。这个demo点调用流程如下图:

EventLoop将会把print_sum封装成Task对象

流程图展示了这个demo的控制流程,不过没有展示其全部细节。比如其中“暂停”的1s,实际上创建了一个future对象, 然后通过BaseEventLoop.call_later()在1s后唤醒这个任务。

注意 @asyncio.coroutine将在Python 3.10版本中移除。

async/await

Python3.5开始引入async/await语法(PEP 492),用来简化协程的使用并且便于理解。

async/await实际上只是@asyncio.coroutine和yield from的语法糖:

  • 把@asyncio.coroutine替换为async
  • 把yield from替换为await
import asyncio


async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y


async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))


loop = asyncio.get_event_loop()
print("start")
loop.run_until_complete(print_sum(1, 2))
print("end")
loop.close()

我们再来看一个asyncio中Future的例子:

import asyncio

future = asyncio.Future()


async def coro1():
    print("wait 1 second")
    await asyncio.sleep(1)
    print("set_result")
    future.set_result('data')


async def coro2():
    result = await future
    print(result)


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([coro1(), coro2()]))
loop.close()

输出结果:

wait 1 second
set_result
data

这里await后面跟随的future对象,协程中yield from或者await后面可以调用future对象,其作用是:暂停协程,直到future执行结束或者返回result或抛出异常。

在例子中,await future必须要等待future.set_result(‘data’)后才能够结束。将coro2()作为第二个协程可能体现得不够明显,可以将协程的调用改成这样:

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([coro2(), coro1()]))
loop.close()

输出的结果仍旧与上面相同。

其实,async这个关键字的用法不止能用在函数上,还有async with异步上下文管理器,async for异步迭代。

asyncio详解

关于asyncio的一些关键字的说明:

  • event_loop 事件循环:程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数
  • coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
  • task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态
  • future: 代表将来执行或没有执行的任务的结果。它和task上没有本质上的区别
  • async/await 关键字:5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

asyncio 模块最大特点就是,只存在一个线程,跟 JavaScript 一样。由于只有一个线程,就不可能多个任务同时运行。asyncio 是”多任务合作”模式(cooperative multitasking),允许异步任务交出执行权给其他任务,等到其他任务完成,再收回执行权继续往下执行,这跟 JavaScript 也是一样的。

由于代码的执行权在多个任务之间交换,所以看上去好像多个任务同时运行,其实底层只有一个线程,多个任务分享运行时间。表面上,这是一个不合理的设计,明明有多线程多进程的能力,为什么放着多余的 CPU 核心不用,而只用一个线程呢?但是就像前面说的,单线程简化了很多问题,使得代码逻辑变得简单,写法符合直觉。

asyncio 模块在单线程上启动一个事件循环(event loop),时刻监听新进入循环的事件,加以处理,并不断重复这个过程,直到异步任务结束。事件循环的内部机制,和JavaScript 的模型是一样的。

创建协程

通过async关键字定义一个协程(coroutine),协程也是一种对象。下面say_after,main就是一个协程:

import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print("started ")
    s_time = time.time()
    await say_after(1, 'hello')
    await say_after(2, 'world')
    print("runtime : ", time.time() - s_time)
    print("finished ")


asyncio.run(main())

asyncio.run() 函数用来运行一个协程对象,这里我们将main()作为入口函数。await等待一个协程。上面代码段会在等待 1 秒后打印 “hello”,然后 再次 等待 2 秒后打印 “world”。asyncio.sleep表示阻塞多少秒,运行结果如下:

started 
hello
world
runtime :  3.001490592956543
finished

可以观察到上面的代码,是同步运行的,两个await say_after之间遇到了阻塞。因为asyncio.run() 只是单纯的运行一个协程,并不会并发运行。

运行协程

运行协程对象的方法主要有:

  • 通过run(main) 运行一个协程,同步的方式,主要用于运行入口协程(实际上参考源码asyncio.run本质也是获取loop,运行协程,即协程依靠loop运行)
  • 在另一个已经运行的协程中用 `await` 等待它,比如上面运行了main协程,其中等待的say_after协程也会运行
  • 将协程封装成task或future对象,然后挂到事件循环loop上,使用loop来运行。主要方法为run_until_complete。此方法可以异步的并发运行

并发协程

asyncio.create_task() 函数用来并发运行多个协程:

import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print("started ")
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    s_time = time.time()
    await task1
    await task2
    print("runtime : ", time.time() - s_time)
    print("finished ")


asyncio.run(main())

运行输出,比上面快一秒。这里我们使用create_task将协程封装成task对象(会自动的添加到事件循环中),然后我们在main这个入口协程中挂起task1和task2。使用run运行main入口协程,它会自动检测循环事件,并将等待task1和task2两个task执行完成

started 
hello
world
runtime :  2.0008769035339355
finished

运行输出,比上面快一秒。这里我们使用create_task将协程封装成task对象(会自动的添加到事件循环中),然后我们在main这个入口协程中挂起task1和task2。使用run运行main入口协程,它会自动检测循环事件,并将等待task1和task2两个task执行完成

asyncio.create_task方法实际是封装了获取事件循环asyncio.get_running_loop()与创建循环任务loop.create_task(coro)的一种高级方法。

可等待对象

await 关键字用于将程序控制权移交给事件循环并中断当前协程的执行。它有以下几个使用规则:

  • 只能用在由 async def 修饰的函数中,在普通函数中使用会抛出异常
  • 调用一个协程函数后,就必须等待其执行完成并返回结果
  • await func() 中的 func() 必须是一个 awaitable 对象。即一个协程函数或者一个在内部实现了 __await__() 方法的对象,该方法会返回一个生成器

Awaitable 对象包含协程、Task 和 Future :

  • 协程对象:async def 的函数对象
  • task任务:将协程包装成的一个任务(task)对象,用于注册到事件循环上
  • Future:是一种特殊的低层级可等待对象,表示一个异步操作的最终结果

可等待的意思就是跳转到等待对象,并将当前任务挂起。当等待对象的任务处理完了,才会跳回当前任务继续执行。实际上与yield from功能相同,不同的是await后面是awaitable,yield from后面是生成器对象。

对于协程中进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序,如下面我们使用time.sleep()替代asyncio.sleep(),会发现在time.sleep()协程时程序阻塞,最后时间为4s。

import asyncio
import time


async def say_after(delay, what):
    await timesleep(delay)
    return what


async def timesleep(delay):
    time.sleep(delay)


async def main():
    print("started ")
    task1 = asyncio.create_task(say_after(2, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    s_time = time.time()
    await task1
    await task2
    print(task1.result(), task2.result())
    print("runtime : ", time.time() - s_time)
    print("finished ")


asyncio.run(main())

输出:

started 
hello world
runtime :  4.001659870147705
finished

修改如下:

async def timesleep(delay):
    await asyncio.sleep(delay)

输出:

started 
hello world
runtime :  2.0003907680511475
finished

一个需要4秒,一个只要2秒。这是因为asyncio.sleep()不同于time.sleep(),它其实在内部实现了一个future对象,事件循环会异步的等待这个对象完成。所以,在事件循环中,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。对于task与future对象,await可以将他们挂在事件循环上,由于他们相比于协程对象增加了运行状态(Pending、Running、Done、Cancelled等),事件循环则可以读取他们的状态,实现异步的操作,比如上面并发的示例。同时对于阻塞的操作(没有实现异步的操作,如request就会阻塞,aihttp则不会),由于协程是单线程,会阻塞整个程序。

事件循环

事件循环是每个 asyncio 应用的核心。 事件循环会运行异步任务和回调,执行网络 IO 操作,以及运行子程序。

简单说我们将协程任务(task)注册到事件循环(loop)上,事件循环(loop)会循环遍历任务的状态,当任务触发条件发生时就会执行对应的任务。类似JavaScript事件循环,当onclick被触发时,就会执行对应的js脚本或者回调。同时当遇到阻塞,事件循环回去查找其他可运行的任务。所以事件循环被认为是一个循环,因为它在不断收集事件并遍历它们从而找到如何处理该事件。

通过以下伪代码理解:

while (1) {
    events = getEvents();
    for (e in events)
        processEvent(e);
}

所有的时间都在 while 循环中捕捉,然后经过事件处理者处理。事件处理的部分是系统唯一活跃的部分,当一个事件处理完成,流程继续处理下一个事件。如果遇到阻塞,循环会去执行其他任务,当阻塞任务完成后再回调(具体如何实现不太清楚,应该是将阻塞任务标记状态或者放进其它列来实现)

asyncio 中主要的事件循环方法有:

  • get_running_loop():返回当前 OS 线程中正在运行的事件循环对象。
  • get_event_loop():获取当前事件循环。 如果当前 OS 线程没有设置当前事件循环并且 set_event_loop() 还没有被调用,asyncio 将创建一个新的事件循环并将其设置为当前循环。
  • new_event_loop():创建一个新的事件循环。
  • run_until_complete():运行直到 future ( Future 的实例 ) 被完成。如果参数是 coroutine object ,将被隐式调度为 asyncio.Task 来运行。返回 Future 的结果 或者引发相关异常。
  • create_future():创建一个附加到事件循环中的 asyncio.Future 对象。
  • create_task(coro):安排一个 协程 的执行。返回一个 Task 对象。
  • run_forever():运行事件循环直到 stop() 被调用。
  • stop():停止事件循环
  • close():关闭事件循环。

上面的并发例子就可以改成下面形式:

import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


def main():
    print("started ")
    s_time = time.time()
    loop = asyncio.get_event_loop()  # 获取一个事件循环
    tasks = [
        asyncio.ensure_future(say_after(1, "hello")),  # asyncio.ensure_future()包装协程或可等待对象在将来等待。如果参数是Future,则直接返回。
        asyncio.ensure_future(say_after(2, "world")),
        loop.create_task(say_after(1, "hello")),  # loop.create_task()包装协程为task。
        loop.create_task(say_after(2, "world"))
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    print("runtime : ", time.time() - s_time)
    print("finished ")


main()

asyncio.get_event_loop方法可以创建一个事件循环,然后使用 run_until_complete 将协程注册到事件循环,并启动事件循环。asyncio.ensure_future(coroutine) 和loop.create_task(coroutine)都可以创建一个task,run_until_complete的参数是一个futrue对象。当传入一个协程,其内部会自动封装成task,task是Future的子类。asyncio.wait类似与await 不过它可以接受一个list,asyncio.wait()返回的是一个协程。

Task对象

Asyncio是用来处理事件循环中的异步进程和并发任务执行的。它还提供了asyncio.Task() 类,可以在任务中使用协程。它的作用是,在同一事件循环中,运行某一个任务的同时可以并发地运行多个任务。当协程被包在任务中,它会自动将任务和事件循环连接起来,当事件循环启动的时候,任务自动运行。这样就提供了一个可以自动驱动协程的机制。

如果被包裹的协程要等待一个 future 对象,那么任务会被挂起,等待future的计算结果。当future计算完成,被包裹的协程将会拿到future返回的结果或异常(exception)继续执行。另外,需要注意的是,事件循环一次只能运行一个任务,除非还有其它事件循环在不同的线程并行运行,此任务才有可能和其他任务并行。当一个任务在等待future执行的期间,事件循环会运行一个新的任务。

即Task对象封装协程(async标记的函数),将其挂到事件循环上运行,如果遇到等待 future 对象(await 后面等待的),那么该事件循环会运行其他 Task、回调或执行 IO 操作

相关的主要方法有:

  • create_task():高层级的方法,创建Task对象,并自动添加进loop,即get_running_loop()和loop.create_task(coro)的封装
  • Task():打包一个协程为Task对象
  • current_task(loop=None):返回当前运行的 Task 实例,如果没有正在运行的任务则返回 None。如果 loop 为 None 则会使用 get_running_loop() 获取当前事件循环
  • all_tasks(loop=None):返回事件循环所运行的未完成的 Task 对象的集合。
  • cancel():请求取消 Task 对象。这将安排在下一轮事件循环中抛出一个 CancelledError 异常给被封包的协程。
  • result():返回 Task 的结果。如果 Task 对象 已完成,其封包的协程的结果会被返回 (或者当协程引发异常时,该异常会被重新引发。)。如果 Task 对象 被取消,此方法会引发一个 CancelledError 异常。如果 Task 对象的结果还不可用,此方法会引发一个 InvalidStateError 异常。

通过task.result()获取返回的结果:

import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    return what


async def main():
    print("started ")
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    s_time = time.time()
    await task1
    await task2
    print(task1.result(), task2.result())
    print("runtime : ", time.time() - s_time)
    print("finished ")


asyncio.run(main())

建议使用高层级的 asyncio.create_task() 函数来创建 Task 对象,也可用低层级的 loop.create_task() 或 ensure_future() 函数。不建议手动实例化 asyncio.Task() 对象。

Future对象

Future如它的名字一样,是一种对未来的一种抽象,代表将来执行或没有执行的任务的结果。它和task上没有本质上的区别,task是Future的子类。实际上Future包裹协程,添加上各种状态,而task则是在Future上添加一些特性便于挂在事件循环上执行,所以Future就是一个内部底层的对象,平时我们只要关注task就可以了。Future可以通过回调函数处理结果

相关的主要方法有:

  • isfuture(obj):判断对象是不是future对象
  • ensure_future(obj,loop=None):接收一个协程或者future或者task对象,如果是future则直接返回future,其它则返回task
  • result():返回future结果
  • set_result(result):将 Future 标记为完成并设置结果
  • add_done_callback(callback, *, context=None) :添加一个在 Future 完成 时运行的回调函数。调用 callback 时,Future 对象是它的唯一参数。

官网的一个例子,体现的是Future的四个状态:Pending、Running、Done、Cancelled。创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done。

import asyncio


async def set_after(fut, delay, value):
    # Sleep for *delay* seconds.
    await asyncio.sleep(delay)

    # Set *value* as a result of *fut* Future.
    fut.set_result(value)


async def main():
    # Get the current event loop.
    loop = asyncio.get_running_loop()

    # Create a new Future object.
    fut = loop.create_future()

    # Run "set_after()" coroutine in a parallel Task.
    # We are using the low-level "loop.create_task()" API here because
    # we already have a reference to the event loop at hand.
    # Otherwise we could have just used "asyncio.create_task()".
    loop.create_task(
        set_after(fut, 1, '... world'))

    print('hello ...')

    # Wait until *fut* has a result (1 second) and print it.
    print(await fut)


asyncio.run(main())

如果注释掉fut.set_result(value),那么future永远不会done。

绑定回调,future与task都可以使用add_done_callback方法,因为task是future子类

import asyncio


async def say_after(delay, what):
    await asyncio.sleep(delay)
    return what


def callback(future):
    print('Callback: ', future.result())


coroutine = say_after(2, "hello")
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)

协程实战:网页抓取

绑定回调

import asyncio
import requests


async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status


def callback(task):
    print('Status:', task.result())


coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)

我们定义了一个 request () 方法,请求了百度,返回状态码,但是这个方法里面我们没有任何 print () 语句。随后我们定义了一个 callback () 方法,这个方法接收一个参数,是 task 对象,然后调用 print () 方法打印了 task 对象的结果。这样我们就定义好了一个 coroutine 对象和一个回调方法,我们现在希望的效果是,当 coroutine 对象执行完毕之后,就去执行声明的 callback () 方法。 那么它们二者怎样关联起来呢?很简单,只需要调用 add_done_callback () 方法即可,我们将 callback () 方法传递给了封装好的 task 对象,这样当 task 执行完毕之后就可以调用 callback () 方法了,同时 task 对象还会作为参数传递给 callback () 方法,调用 task 对象的 result () 方法就可以获取返回结果了。 运行结果:

Task: <Task pending coro=<request() running at D:/CodeHub/LearnPython/t1.py:5> cb=[callback() at D:/CodeHub/LearnPython/t1.py:11]>
Status: <Response [200]>
Task: <Task finished coro=<request() done, defined at D:/CodeHub/LearnPython/t1.py:5> result=<Response [200]>>

实际上不用回调方法,直接在 task 运行完毕之后也可以直接调用 result () 方法获取结果:

import asyncio
import requests


async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status


coroutine = request()
task = asyncio.ensure_future(coroutine)
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task Result:', task.result())

运行结果:

Task: <Task pending coro=<request() running at D:/CodeHub/LearnPython/t1.py:5>>
Task: <Task finished coro=<request() done, defined at D:/CodeHub/LearnPython/t1.py:5> result=<Response [200]>>
Task Result: <Response [200]>

多任务协程

上面的例子我们只执行了一次请求,如果我们想执行多次请求应该怎么办呢?我们可以定义一个 task 列表,然后使用 asyncio 的 wait () 方法即可执行:

import asyncio
import requests


async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status


tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:', tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task Result:', task.result())

这里我们使用一个 for 循环创建了五个 task,组成了一个列表,然后把这个列表首先传递给了 asyncio 的 wait () 方法,然后再将其注册到时间循环中,就可以发起五个任务了。最后我们再将任务的运行结果输出出来,运行结果如下:

Tasks: [<Task pending coro=<request() running at D:/CodeHub/LearnPython/t1.py:5>>, <Task pending coro=<request() running at D:/CodeHub/LearnPython/t1.py:5>>, <Task pending coro=<request() running at D:/CodeHub/LearnPython/t1.py:5>>, <Task pending coro=<request() running at D:/CodeHub/LearnPython/t1.py:5>>, <Task pending coro=<request() running at D:/CodeHub/LearnPython/t1.py:5>>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>

可以看到五个任务被顺次执行了,并得到了运行结果。

使用 aiohttp

aiohttp 是一个支持异步请求的库,利用它和 asyncio 配合我们可以非常方便地实现异步请求操作。

import asyncio
import aiohttp
import time

start = time.time()


async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    session.close()
    return result


async def request():
    url = 'https://www.baidu.com'
    print('Waiting for', url)
    result = await get(url)
    print('Get response from', url)


tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

参考链接:

  • asyncio– (Python standard library) Asynchronous I/O, event loop, coroutines and tasks.
  • trio– A friendly library for async concurrency and I/O.
  • Twisted– An event-driven networking engine.
  • uvloop– Ultra fast asyncio event loop.

One Reply to “Python协程与异步”

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注