理解 Python 中的异步编程

2017/10/23 20:23
阅读数 373

让我们来写一些 Python 代码

你可以在这个 GitHub 仓库 下载所有的示例代码。

这篇文章中的所有例子都已经在 Python 3.6.1 环境下测试过,而且在代码示例中的这个 requirements.txt 文件包含了运行所有这些测试所需要的模块。

我强烈建议创建一个 Python 虚拟环境来运行这些代码,这样就不会和系统级别的 Python 产生耦合。

示例 1:同步编程

第一个例子展示的是一种有些刻意设计的方式,即有一个任务先从队列中拉取"工作"之后再执行这个工作。在这种情况下,这个工作的内容只是获取一个数字,然后任务会把这个数字叠加起来。在每个计数步骤中,它还打印了字符串表明该任务正在运行,并且在循环的最后还打印出了总的计数。我们设计的部分即这个程序为多任务处理在队列中的工作提供了很自然的基础。

"""
example_1.py

Just a short example showing synchronous running of 'tasks'
"""

import queue

def task(name, work_queue):
    if work_queue.empty():
        print(f'Task {name} nothing to do')
    else:
        while not work_queue.empty():
            count = work_queue.get()
            total = 0
            for x in range(count):
                print(f'Task {name} running')
                total += 1
            print(f'Task {name} total: {total}')


def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # create some tasks
    tasks = [
        (task, 'One', work_queue),
        (task, 'Two', work_queue)
    ]

    # run the tasks
    for t, n, q in tasks:
        t(n, q)

if __name__ == '__main__':
    main()

该程序中的"任务"就是一个函数,该函数可以接收一个字符串和一个队列作为参数。在执行时,它会去看队列里是否有任何需要处理的工作,如果有,它就会把值从队列中取出来,开启一个 for 循环来叠加这个计数值并且在最后打印出总数。它会一直这样运行直到队列里什么都没剩了才会结束离开。

当我们在执行这个任务时,我们会得到一个列表表明任务一(即代码中的 task One)做了所有的工作。它内部的循环消费了队列里的全部工作,并且执行这些工作。当退出任务一的循环后,任务二(即代码中的 task Two)有机会运行,但是它会发现队列是空的,因为这个影响,该任务会打印一段语句之后退出。代码中并没有任何地方可以让任务一和任务二协作的很好并且可以在它们之间切换。

示例 2: 简单的协作并发

程序(example_2.py)的下个版本通过使用生成器增加了两个任务可以跟好相互协作的能力。在任务函数中添加 yield 语句意味着循环会在执行到这个语句时退出,但是仍然保留当时的上下文,这样之后就可以恢复先前的循环。在程序后面 "run the tasks" 的循坏中当 t.next() 被调用时就可以利用这个。这条语句会在之前生成(即调用 yield 的语句处)的地方重新开始之前的任务。

这是一种协作并发的方式。这个程序会让出对它当前上下文的控制,这样其它的任务就可以运行。在这种情况下,它允许我们主要的 "run the tasks" 调度器可以运行任务函数的两个实例,每一个实例都从相同的队列中消费工作。这种做法虽然聪明一些,但是为了和第一个示例达成同样结果的同时做了更多的工作。

"""
example_2.py

Just a short example demonstrating a simple state machine in Python
"""

import queue

def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        for x in range(count):
            print(f'Task {name} running')
            total += 1
            yield
        print(f'Task {name} total: {total}')

def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # create some tasks
    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]

    # run the tasks
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True


if __name__ == '__main__':
    main()

当程序运行时,输出表明任务一和任务二都在运行,它们都从队列里消耗工作并且处理它。这就是我们想要的,两个任务都在处理工作,而且都是以处理从队列中的两个项目结束。但是再一次,需要做一点工作来实现这个结果。

这里的技巧在于使用 yield 语句,它将任务函数转变为生成器,来实现一个 "上下文切换"。这个程序使用这个上下文切换来运行任务的两个实例。

示例 3:通过阻塞调用来协作并发

程序(example_3.py)的下个版本和上一个版本几乎完全一样,除了在我们任务循环体内添加了一个 time.sleep(1) 调用。这使任务循环中的每次迭代都添加了一秒的延迟。这个添加的延迟是为了模拟在我们任务中出现缓慢 IO 操作的影响。

我还导入了一个简单的 Elapsed Time 类来处理报告中使用的开始时间/已用时间功能。

"""
example_3.py

Just a short example demonstraing a simple state machine in Python
However, this one has delays that affect it
"""

import time
import queue
from lib.elapsed_time import ET


def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        et = ET()
        for x in range(count):
            print(f'Task {name} running')
            time.sleep(1)
            total += 1
            yield
        print(f'Task {name} total: {total}')
        print(f'Task {name} total elapsed time: {et():.1f}')


def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)


    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]
    # run the scheduler to run the tasks
    et = ET()
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

    print()
    print('Total elapsed time: {}'.format(et()))


if __name__ == '__main__':
    main()

当该程序运行时,输出表明任务一和任务二都在运行,消费从队列里来的工作并像之前那样处理它们。随着增加的模拟 IO 操作延迟,我们发现我们协作式的并发并没有为我们做任何事,延迟会停止整个程序的运行,而 CPU 就只会等待这个 IO 延迟的结束。

这就是异步文档中 ”阻塞代码“的确切含义。注意运行整个程序所需要的时间,你会发现这就是所有 IO 延迟的累积时间。这再次意味着通过这种方式运行程序并不是胜利了。

示例 4:使用非阻塞调用来协作并发

程序(example_4.py)的下一个版本已经修改了不少代码。它在程序一开始就使用了 gevent 异步编程模块。该 模块以及另一个叫做 monkey 的模块被导入了。

之后 monkey 模块一个叫做 patch_all() 的方法被调用。这个方法是用来干嘛的呢?简单来说它配置了这个应用程序,使其它所有包含阻塞(同步)代码的模块都会被打上"补丁",这样这些同步代码就会变成异步的。

就像大多数简单的解释一样,这个解释对你并没有很大的帮助。在我们示例代码中与之相关的就是 time.sleep(1)(我们模拟的 IO 延迟)不会再"阻塞"整个程序。取而代之的是它让出程序的控制返回给系统。请注意,"example_3.py" 中的 "yield" 语句不再存在,它现在已经是 time.sleep(1) 函数调用内的一部分。

所以,如果 time.sleep(1) 已经被 gevent 打补丁来让出控制,那么这个控制又到哪里去了?使用 gevent 的一个作用是它会在程序中运行一个事件循环的线程。对于我们的目的来说,这个事件循环就像在 example_3.py 中 "run the tasks" 的循环。当 time.sleep(1) 的延迟结束时,它就会把控制返回给 time.sleep(1) 语句的下一条可执行语句。这样做的优点是 CPU 不会因为延迟被阻塞,而是可以有空闲去执行其它代码。

我们 "run the tasks" 的循环已经不再存在了,取而代之的是我们的任务队列包含了两个对 gevent.spawn(...) 的调用。这两个调用会启动两个 gevent 线程(叫做 greenlet),它们是相互协作进行上下文切换的轻量级微线程,而不是像普通线程一样由系统切换上下文。

注意在我们任务生成之后的 gevent.joinall(tasks) 调用。这条语句会让我们的程序会一直等待任务一和任务二都完成。如果没有这个的话,我们的程序将会继续执行后面打印的语句,但是实际上没有做任何事。

"""
example_4.py

Just a short example demonstrating a simple state machine in Python
However, this one has delays that affect it
"""

import gevent
from gevent import monkey
monkey.patch_all()

import time
import queue
from lib.elapsed_time import ET


def task(name, work_queue):
    while not work_queue.empty():
        count = work_queue.get()
        total = 0
        et = ET()
        for x in range(count):
            print(f'Task {name} running')
            time.sleep(1)
            total += 1
        print(f'Task {name} total: {total}')
        print(f'Task {name} total elapsed time: {et():.1f}')


def main():
    """
    This is the main entry point for the programWhen
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # run the tasks
    et = ET()
    tasks = [
        gevent.spawn(task, 'One', work_queue),
        gevent.spawn(task, 'Two', work_queue)
    ]
    gevent.joinall(tasks)
    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

当这个程序运行的时候,请注意任务一和任务二都在同样的时间开始,然后等待模拟的 IO 调用结束。这表明 time.sleep(1) 调用已经不再阻塞,其它的工作也正在被做。

在程序结束时,看下总的运行时间你就会发现它实际上是 example_3.py 运行时间的一半。现在我们开始看到异步程序的优势了。

在并发运行两个或者多个事件可以通过非阻塞的方式来执行 IO 操作。通过使用 gevent greenlets 和控制上下文切换,我们就可以在多个任务之间实现多路复用,这个实现并不会遇到太多麻烦。

示例 5:异步(阻塞)HTTP 下载

程序(example_5.py)的下一个版本有一点进步也有一点退步。这个程序现在处理的是有真正 IO 操作的工作,即向一个 URL 列表发起 HTTP 请求来获取页面内容,但是它仍然是以阻塞(同步)的方式运行的。

我们修改了这个程序导入了非常棒的 requests 模块 来创建真实的 HTTP 请求,而且我们把一份 URL 列表加入到队列中,而不是像之前一样只是数字。在这个任务中,我们也没有再用计数器,而是使用 requests 模块来获取从队列里得到 URL 页面的内容,并且我们打印了执行这个操作的时间。

"""
example_5.py

Just a short example demonstrating a simple state machine in Python
This version is doing actual work, downloading the contents of
URL's it gets from a queue
"""

import queue
import requests
from lib.elapsed_time import ET


def task(name, work_queue):
    while not work_queue.empty():
        url = work_queue.get()
        print(f'Task {name} getting URL: {url}')
        et = ET()
        requests.get(url)
        print(f'Task {name} got URL: {url}')
        print(f'Task {name} total elapsed time: {et():.1f}')
        yield


def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://shutterfly.com",
        "http://mypublisher.com",
        "http://facebook.com"
    ]:
        work_queue.put(url)

    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]
    # run the scheduler to run the tasks
    et = ET()
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

和这个程序之前版本一样,我们使用一个 yield 关键字来把我们的任务函数转换成生成器,并且为了让其他任务实例可以执行,我们执行了一次上下文切换。

每个任务都会从工作队列中获取到一个 URL,获取这个 URL 指向页面的内容并且报告获取这些内容花了多长时间。

和之前一样,这个 yield 关键字让我们两个任务都能运行,但是因为这个程序是以同步的方式运行的,每个 requests.get() 调用在获取到页面之前都会阻塞 CPU。注意在最后运行整个程序的总时间,这对于下一个示例会很有意义。

示例 6:使用 gevent 实现异步(非阻塞)HTTP 下载

这个程序(example_6.py)的版本修改了先前的版本再次使用了 gevent 模块。记得 gevent 模块的 monkey.patch_all() 调用会修改之后的所有模块,这样这些模块的同步代码就会变成异步的,其中也包括 requests 模块。

现在的任务已经改成移除了对 yield 的调用,因为 requests.get(url) 调用已经不会再阻塞了,反而是执行一次上下文切换让出控制给 gevent 的事件循环。在 “run the task” 部分我们使用 gevent 来产生两个任务生成器,之后使用 joinall() 来等待它们完成。

"""
example_6.py

Just a short example demonstrating a simple state machine in Python
This version is doing actual work, downloading the contents of
URL's it gets from a queue. It's also using gevent to get the
URL's in an asynchronous manner.
"""

import gevent
from gevent import monkey
monkey.patch_all()

import queue
import requests
from lib.elapsed_time import ET


def task(name, work_queue):
    while not work_queue.empty():
        url = work_queue.get()
        print(f'Task {name} getting URL: {url}')
        et = ET()
        requests.get(url)
        print(f'Task {name} got URL: {url}')
        print(f'Task {name} total elapsed time: {et():.1f}')

def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://shutterfly.com",
        "http://mypublisher.com",
        "http://facebook.com"
    ]:
        work_queue.put(url)

    # run the tasks
    et = ET()
    tasks = [
        gevent.spawn(task, 'One', work_queue),
        gevent.spawn(task, 'Two', work_queue)
    ]
    gevent.joinall(tasks)
    print()
    print(f'Total elapsed time: {et():.1f}')

if __name__ == '__main__':
    main()

在程序运行的最后,你可以看下总共的时间和获取每个 URL 分别的时间。你将会看到总时间会少于 requests.get() 函数调用的累计时间。

这是因为这些函数调用是异步运行的,所以我们可以同一时间发送多个请求,从而更好地发挥出 CPU的优势。

示例 7:使用 Twisted 实现异步(非阻塞)HTTP 下载

程序(example_7.py)的版本使用了 Twisted 模块 ,该模块本所做的质上和 gevent 模块一样,即以非阻塞的方式下载 URL 对应的内容。

Twisted是一个非常强大的系统,采用了和 gevent 根本上不一样的方式来创建异步程序。gevent 模块是修改其模块使它们的同步代码变成异步,Twisted 提供了它自己的函数和方法来达到同样的结果。

之前在 example_6.py 中使用被打补丁的 requests.get(url) 调用来获取 URL 内容的位置,现在我们使用 Twisted 函数 getPage(url)

在这个版本中,@defer.inlineCallbacks 函数装饰器和语句 yield getPage(url) 一起实现把上下文切换到 Twisted 的事件循环。

在 gevent 中这个事件循环是隐含的,但是在 Twisted 中,事件循环由位于程序底部的 reactor.run() 明确提供。

"""
example_7.py

Just a short example demonstrating a simple state machine in Python
This version is doing actual work, downloading the contents of
URL's it gets from a work_queue. This version uses the Twisted
framework to provide the concurrency
"""

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor, task

import queue
from lib.elapsed_time import ET


@defer.inlineCallbacks
def my_task(name, work_queue):
    try:
        while not work_queue.empty():
            url = work_queue.get()
            print(f'Task {name} getting URL: {url}')
            et = ET()
            yield getPage(url)
            print(f'Task {name} got URL: {url}')
            print(f'Task {name} total elapsed time: {et():.1f}')
    except Exception as e:
        print(str(e))


def main():
    """
    This is the main entry point for the program
    """
    # create the work_queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the work_queue
    for url in [
        b"http://google.com",
        b"http://yahoo.com",
        b"http://linkedin.com",
        b"http://shutterfly.com",
        b"http://mypublisher.com",
        b"http://facebook.com"
    ]:
        work_queue.put(url)

    # run the tasks
    et = ET()
    defer.DeferredList([
        task.deferLater(reactor, 0, my_task, 'One', work_queue),
        task.deferLater(reactor, 0, my_task, 'Two', work_queue)
    ]).addCallback(lambda _: reactor.stop())

    # run the event loop
    reactor.run()

    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

注意最后的结果和 gevent 版本一样,整个程序运行的时间会小于获取每个 URL 内容的累计时间。

示例8:使用 Twisted 回调函数实现异步(非阻塞)HTTP 下载

程序 (example_8.py)的这个版本也是使用 Twisted 库,但是是以更传统的方式使用 Twisted。

这里我的意思是不再使用 @defer.inlineCallbacks / yield 这种代码风格,这个版本会使用明确的回调函数。一个"回调函数"是一个被传递给系统的函数,该函数可以在之后的事件响应中被调用。在下面的例子中,success_callback() 被提供给 Twisted,用来在 getPage(url) 调用完成后被调用。

注意在这个程序中 @defer.inlineCallbacks 装饰器并没有在 my_task() 函数中使用。除此之外,这个函数产出一个叫做 d 的变量,该变量是延后调用的缩写,是调用函数 getPage(url) 得到的返回值。

延后是 Twisted 处理异步编程的方式,回调函数就附加在其之上。当这个延后"触发"(即当 getPage(url) 完成时),会以回调函数被附加时定义的变量作为参数,来调用这个回调函数。

"""
example_8.py

Just a short example demonstrating a simple state machine in Python
This version is doing actual work, downloading the contents of
URL's it gets from a queue. This version uses the Twisted
framework to provide the concurrency
"""

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor, task

import queue
from lib.elapsed_time import ET


def success_callback(results, name, url, et):
    print(f'Task {name} got URL: {url}')
    print(f'Task {name} total elapsed time: {et():.1f}')


def my_task(name, queue):
    if not queue.empty():
        while not queue.empty():
            url = queue.get()
            print(f'Task {name} getting URL: {url}')
            et = ET()
            d = getPage(url)
            d.addCallback(success_callback, name, url, et)
            yield d


def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for url in [
        b"http://google.com",
        b"http://yahoo.com",
        b"http://linkedin.com",
        b"http://shutterfly.com",
        b"http://mypublisher.com",
        b"http://facebook.com"
    ]:
        work_queue.put(url)

    # run the tasks
    et = ET()

    # create cooperator
    coop = task.Cooperator()

    defer.DeferredList([
        coop.coiterate(my_task('One', work_queue)),
        coop.coiterate(my_task('Two', work_queue)),
    ]).addCallback(lambda _: reactor.stop())

    # run the event loop
    reactor.run()

    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

运行这个程序的最终结果和先前的两个示例一样,运行程序的总时间小于获取 URLs 内容的总时间。

无论你使用 gevent 还是 Twisted,这只是个人的喜好和代码风格问题。这两个都是强大的库,提供了让程序员可以编写异步代码的机制

展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部