文档章节

python gevent 模块

Bing-
 Bing-
发布于 2016/05/13 10:11
字数 3887
阅读 30
收藏 0

 Greenlet

    在gevent里面最多应用到的就是greenlet,一个轻量级的协程实现。在任何时间点,只有一个greenlet处于运行状态。Greenlet与multiprocessing 和 threading这两个库提供的真正的并行结构的区别在于这两个库会真正的切换进程,POSIX线程是由操作系统来负责调度,并且它们是真正并行的。

同步和异步

    应对并发的主要思路就是将一个大的任务分解成一个子任务的集合并且能够让它并行或者异步地执行,而不是一次执行一个或者同步执行。在两个子任务中的切换被称为上下文切换。

    gevent里面的上下文切换是非常平滑的。在下面的例子程序中,我们可以看到两个上下文通过调用 gevent.sleep()来互相切换。

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

    这段程序的执行结果如下:

Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

    从这个执行结果可以看出这个程序的执行过程,在这里的两个函数是交替执行的。

    gevent的真正威力是在处理网络和带有IO阻塞的功能时能够这些任务协调地运行。gevent来实现了这些具体的细节来保证在需要的时候greenlet上下文进行切换。在这里用一个例子来说明。

import time
import gevent
from gevent import select

start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)

def gr1():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: ', tic())
    select.select([], [], [], 2)
    print('Ended Polling: ', tic())

def gr2():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: ', tic())
    select.select([], [], [], 2)
    print('Ended Polling: ', tic())

def gr3():
    print("Hey lets do some stuff while the greenlets poll, at", tic())
    gevent.sleep(1)

gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
])

在上面的例子里,select() 通常是一个阻塞的调用。

程序的执行结果如下:

Started Polling:  at 0.0 seconds
Started Polling:  at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at at 0.0 seconds
Ended Polling:  at 2.0 seconds
Ended Polling:  at 2.0 seconds

接下来一个例子中可以看到gevent是安排各个任务的执行的。

import gevent
import random

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(random.randint(0,2)*0.001)
    print('Task', pid, 'done')

def synchronous():
    for i in range(1,10):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in xrange(10)]
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()

执行结果如下:

root@master:~# python two.py 
Synchronous:
('Task', 1, 'done')
('Task', 2, 'done')
('Task', 3, 'done')
('Task', 4, 'done')
('Task', 5, 'done')
('Task', 6, 'done')
('Task', 7, 'done')
('Task', 8, 'done')
('Task', 9, 'done')
Asynchronous:
('Task', 0, 'done')
('Task', 9, 'done')
('Task', 7, 'done')
('Task', 3, 'done')
('Task', 6, 'done')
('Task', 5, 'done')
('Task', 4, 'done')
('Task', 1, 'done')
('Task', 2, 'done')
('Task', 8, 'done')

在同步的情况下,任务是按顺序执行的,在执行各个任务的时候会阻塞主线程。

而gevent.spawn 的重要功能就是封装了greenlet里面的函数。初始化的greenlet放在了threads这个list里面,被传递给了 gevent.joinall 这个函数,它会阻塞当前的程序来执行所有的greenlet。

在异步执行的情况下,所有任务的执行顺序是完全随机的。每一个greenlet的都不会阻塞其他greenlet的执行。

在有时候需要异步地从服务器获取数据,gevent可以通过判断从服务器的数据载入情况来处理请求。

import gevent.monkey
gevent.monkey.patch_socket()

import gevent
import urllib2
import simplejson as json

def fetch(pid):
    response = urllib2.urlopen('http://json-time.appspot.com/time.json')
    result = response.read()
    json_result = json.loads(result)
    datetime = json_result['datetime']

    print 'Process ', pid, datetime
    return json_result['datetime']

def synchronous():
    for i in range(1,10):
        fetch(i)

def asynchronous():
    threads = []
    for i in range(1,10):
        threads.append(gevent.spawn(fetch, i))
    gevent.joinall(threads)

print 'Synchronous:'
synchronous()

print 'Asynchronous:'
asynchronous()

确定性

就像之前说的,greenlet是确定的。给每个greenlet相同的配置和相同的输入,得到的输出是相同的。我们可以用python 的多进程池和gevent池来作比较。下面的例子可以说明这个特点:

import time

def echo(i):
    time.sleep(0.001)
    return i

# Non Deterministic Process Pool

from multiprocessing.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print( run1 == run2 == run3 == run4 )

# Deterministic Gevent Pool

from gevent.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print( run1 == run2 == run3 == run4 )

下面是执行结果:

False
True

 从上面的例子可以看出,执行同一个函数,产生的greenlet是相同的,而产生的process是不同的。

 在处理并发编程的时候会碰到一些问题,比如竞争资源的问题。最简单的情况,当有两个线程或进程访问同一资源并且修改这个资源的时候,就会引发资源竞争的问题。那么这个资源最终的值就会取决于那个线程或进程是最后执行的。这是个问题,总之,在处理全局的程序不确定行为的时候,需要尽量避免资源竞争的问题

 最好的方法就是在任何时候尽量避免使用全局的状态。全局状态是经常会坑你的!

产生Greenlet

在gevent里面封装了一些初始化greenlet的方法,下面是几个最常用的例子:

import gevent
from gevent import Greenlet

def foo(message, n):
    """
    Each thread will be passed the message, and n arguments
    in its initialization.
    """
    gevent.sleep(n)
    print(message)

# Initialize a new Greenlet instance running the named function
# foo
thread1 = Greenlet.spawn(foo, "Hello", 1)

# Wrapper for creating and runing a new Greenlet from the named 
# function foo, with the passed arguments
thread2 = gevent.spawn(foo, "I live!", 2)

# Lambda expressions
thread3 = gevent.spawn(lambda x: (x+1), 2)

threads = [thread1, thread2, thread3]

# Block until all threads complete.
gevent.joinall(threads)

在上面的程序里使用 spawn 方法来产生greenlet。还有一种初始化greenlet的方法,就是创建Greenlet的子类,并且重写 _run 方法。

import gevent
from gevent import Greenlet

class MyGreenlet(Greenlet):

    def __init__(self, message, n):
        Greenlet.__init__(self)
        self.message = message
        self.n = n

    def _run(self):
        print(self.message)
        gevent.sleep(self.n)

g = MyGreenlet("Hi there!", 3)
g.start()
g.join()

Greenlet 的状态

就像其他的代码一样,greenlet在执行的时候也会出错。Greenlet有可能会无法抛出异常,停止失败,或者消耗了太多的系统资源。

greenlet的内部状态通常是一个依赖时间的参数。greenlet有一些标记来让你能够监控greenlet的状态。

  • started -- 标志greenlet是否已经启动
  • ready -- 标志greenlet是否已经被终止
  • successful() -- 标志greenlet是否已经被终止,并且没有抛出异常
  • value -- 由greenlet返回的值
  • exception -- 在greenlet里面没有被捕获的异常
import gevent

def win():
    return 'You win!'

def fail():
    raise Exception('You fail at failing.')

winner = gevent.spawn(win)
loser = gevent.spawn(fail)

print(winner.started) # True
print(loser.started)  # True

# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:
    gevent.joinall([winner, loser])
except Exception as e:
    print('This will never be reached')

print(winner.value) # 'You win!'
print(loser.value)  # None

print(winner.ready()) # True
print(loser.ready())  # True

print(winner.successful()) # True
print(loser.successful())  # False

# The exception raised in fail, will not propogate outside the
# greenlet. A stack trace will be printed to stdout but it
# will not unwind the stack of the parent.

print(loser.exception)

# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()

 这段代码的执行结果如下:

True
True
You win!
None
True
True
True
False
You fail at failing.

终止程序

在主程序收到一个SIGQUIT 之后会阻塞程序的执行让Greenlet无法继续执行。这会导致僵尸进程的产生,需要在操作系统中将这些僵尸进程清除掉。

import gevent
import signal

def run_forever():
    gevent.sleep(1000)

if __name__ == '__main__':
    gevent.signal(signal.SIGQUIT, gevent.shutdown)
    thread = gevent.spawn(run_forever)
    thread.join()

超时

gevent提供了对与代码运行时的时间限制功能,也就是超时功能。

import gevent
from gevent import Timeout

seconds = 10

timeout = Timeout(seconds)
timeout.start()

def wait():
    gevent.sleep(10)

try:
    gevent.spawn(wait).join()
except Timeout:
    print 'Could not complete'

也可以通过用with 上下文的方法来实现超时的功能:

import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)

gevent还提供了一些超时的参数以应对不同的状况:

import gevent
from gevent import Timeout

def wait():
    gevent.sleep(2)

timer = Timeout(1).start()
thread1 = gevent.spawn(wait)

try:
    thread1.join(timeout=timer)
except Timeout:
    print('Thread 1 timed out')

# --

timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)

try:
    thread2.get(timeout=timer)
except Timeout:
    print('Thread 2 timed out')

# --

try:
    gevent.with_timeout(1, wait)
except Timeout:
    print('Thread 3 timed out')

运行结果如下:

Thread 1 timed out
Thread 2 timed out
Thread 3 timed out

Monkeypatching

现在这是gevent里面的一个难点。下面一个例子里可能看到 monkey.patch_socket() 能够在运行时里面修改基础库socket:

import socket
print( socket.socket )

print "After monkey patch"
from gevent import monkey
monkey.patch_socket()
print( socket.socket )

import select
print select.select
monkey.patch_select()
print "After monkey patch"
print( select.select )

 运行结果如下:

class 'socket.socket'
After monkey patch
class 'gevent.socket.socket'

built-in function select
After monkey patch
function select at 0x1924de8

 Python的运行时里面允许能够大部分的对象都是可以修改的,包括模块,类和方法。这通常是一个坏主意,然而在极端的情况下,当有一个库需要加入一些Python基本的功能的时候,monkey patch就能派上用场了。在上面的例子里,gevent能够改变基础库里的一些使用IO阻塞模型的库比如socket,ssl,threading等等并且把它们改成协程的执行方式。

 

事件

事件是一种可以让greenlet进行异步通信的手段。

 

import gevent
from gevent.event import AsyncResult

a = AsyncResult()

def setter():
    """
    After 3 seconds set wake all threads waiting on the value of
    a.
    """
    gevent.sleep(3)
    a.set()

def waiter():
    """
    After 3 seconds the get call will unblock.
    """
    a.get() # blocking
    print 'I live!'

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])

AsyncResult 是 event对象的扩展能够让你来发送值并且带有一定延迟。这种功能被成为feature或deferred,当它拿到一个未来的值的引用时,能够在任意安排好的时间内让它起作用。

队列

 

队列是一个有序的数据集合,通常有 put/get 的操作,这样能让队列在有在有greenletJ进行操作的时候能够进行安全的管理。

例如,如果greenlet从队列中取出了一项数据,那么这份数据就不能被另一个greenlet取出。

 

import gevent
from gevent.queue import Queue

tasks = Queue()

def worker(n):
    while not tasks.empty():
        task = tasks.get()
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(0)

    print('Quitting time!')

def boss():
    for i in xrange(1,25):
        tasks.put_nowait(i)

gevent.spawn(boss).join()

gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])

 

执行的结果如下:

Worker steve got task 1
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker nancy got task 5
Worker john got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Worker steve got task 10
Worker nancy got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker nancy got task 15
Worker steve got task 16
Worker nancy got task 17
Worker john got task 18
Worker steve got task 19
Worker john got task 20
Worker nancy got task 21
Worker steve got task 22
Worker nancy got task 23
Worker john got task 24
Quitting time!
Quitting time!
Quitting time!

队列的 put/get 操作在需要的情况下也可以阻塞程序的执行。

put 和 get 操作都有非阻塞的副本,就是 put_nowait 和 get_nowait。

在下面代码的例子里,运行一个叫boss的方法,同时运行worker方法,并且对队列有一个限制:队列的子项不能超过3个。这个限制意味着 put 操作在队列里面有足够空间之前会阻塞。相反,如果队列里没有任何子项,get操作会阻塞,同时也需要超时的机制,当一个操作在阻塞超过一定时间后会抛出异常。

import gevent
from gevent.queue import Queue, Empty

tasks = Queue(maxsize=3)

def worker(n):
    try:
        while True:
            task = tasks.get(timeout=1) # decrements queue size by 1
            print('Worker %s got task %s' % (n, task))
            gevent.sleep(0)
    except Empty:
        print('Quitting time!')

def boss():
    """
    Boss will wait to hand out work until a individual worker is
    free since the maxsize of the task queue is 3.
    """

    for i in xrange(1,10):
        tasks.put(i)
    print('Assigned all work in iteration 1')

    for i in xrange(10,20):
        tasks.put(i)
    print('Assigned all work in iteration 2')

gevent.joinall([
    gevent.spawn(boss),
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'bob'),
])

代码的执行结果如下:

Worker steve got task 1
Worker john got task 2
Worker bob got task 3
Worker steve got task 4
Worker bob got task 5
Worker john got task 6
Assigned all work in iteration 1
Worker steve got task 7
Worker john got task 8
Worker bob got task 9
Worker steve got task 10
Worker bob got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker bob got task 15
Worker steve got task 16
Worker bob got task 17
Worker john got task 18
Assigned all work in iteration 2
Worker steve got task 19
Quitting time!
Quitting time!
Quitting time!

组和池

组是一个由greenlet组成的集合,并且能够被统一管理。

import gevent
from gevent.pool import Group

def talk(msg):
    for i in xrange(3):
        print(msg)

g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

group = Group()
group.add(g1)
group.add(g2)
group.join()

group.add(g3)
group.join()

这在管理一组异步任务的时候会很有用。

Group还提供了一个API来分配成组的greenlet任务,并且通过不同的方法来获取结果。

import gevent
from gevent import getcurrent
from gevent.pool import Group

group = Group()

def hello_from(n):
    print('Size of group', len(group))
    print('Hello from Greenlet %s' % id(getcurrent()))

group.map(hello_from, xrange(3))

def intensive(n):
    gevent.sleep(3 - n)
    return 'task', n

print('Ordered')

ogroup = Group()
for i in ogroup.imap(intensive, xrange(3)):
    print(i)

print('Unordered')

igroup = Group()
for i in igroup.imap_unordered(intensive, xrange(3)):
    print(i)

执行结果如下:

Size of group 3
Hello from Greenlet 10769424
Size of group 3
Hello from Greenlet 10770544
Size of group 3
Hello from Greenlet 10772304
Ordered
('task', 0)
('task', 1)
('task', 2)
Unordered
('task', 2)
('task', 1)
('task', 0)

池是用来处理当拥有动态数量的greenlet需要进行并发管理(限制并发数)时使用的。

这在处理大量的网络和IO操作的时候是非常需要的。

import gevent
from gevent.pool import Pool

pool = Pool(2)

def hello_from(n):
    print('Size of pool', len(pool))

pool.map(hello_from, xrange(3))
Size of pool 2
Size of pool 2
Size of pool 1

经常在创建gevent驱动程序的时候,整个服务需要围绕一个池的结构来执行。

锁和信号量

信号量是低级别的同步机制,能够让greenlet在执行的时候互相协调并且限制其并发数。信号量暴露了两个方法,acquire 和 release。如果信号量范围变成0,那么它会阻塞住直到另一个greenlet释放它的获得物。

from gevent import sleep
from gevent.pool import Pool
from gevent.coros import BoundedSemaphore

sem = BoundedSemaphore(2)

def worker1(n):
    sem.acquire()
    print('Worker %i acquired semaphore' % n)
    sleep(0)
    sem.release()
    print('Worker %i released semaphore' % n)

def worker2(n):
    with sem:
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    print('Worker %i released semaphore' % n)

pool = Pool()
pool.map(worker1, xrange(0,2))
pool.map(worker2, xrange(3,6))

一下是代码的执行结果:

Worker 0 acquired semaphore
Worker 1 acquired semaphore
Worker 0 released semaphore
Worker 1 released semaphore
Worker 3 acquired semaphore
Worker 4 acquired semaphore
Worker 3 released semaphore
Worker 4 released semaphore
Worker 5 acquired semaphore
Worker 5 released semaphore

 如果把信号量的数量限制为1那么它就成为了锁。它经常会在多个greenlet访问相同资源的时候用到。

本地线程

Gevent还能够让你给gevent上下文来指定那些数据是本地的。

import gevent
from gevent.local import local

stash = local()

def f1():
    stash.x = 1
    print(stash.x)

def f2():
    stash.y = 2
    print(stash.y)

    try:
        stash.x
    except AttributeError:
        print("x is not local to f2")

g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)

gevent.joinall([g1, g2])

 以下是执行结果:

1
2
x is not local to f2

 很多集成了gevent的框架把HTTP的session对象存在gevent 本地线程里面。比如下面的例子:

from werkzeug.local import LocalProxy
from werkzeug.wrappers import Request
from contextlib import contextmanager

from gevent.wsgi import WSGIServer

_requests = local()
request = LocalProxy(lambda: _requests.request)

@contextmanager
def sessionmanager(environ):
    _requests.request = Request(environ)
    yield
    _requests.request = None

def logic():
    return "Hello " + request.remote_addr

def application(environ, start_response):
    status = '200 OK'

    with sessionmanager(environ):
        body = logic()

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body]

WSGIServer(('', 8000), application).serve_forever()

 子进程

在gevent 1.0版本中,gevent.subprocess 这个库被添加上。这个库能够让子进程相互协调地执行。

import gevent
from gevent.subprocess import Popen, PIPE

def cron():
    while True:
        print "cron"
        gevent.sleep(0.2)

g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print out.rstrip()

 执行结果:

cron
cron
cron
cron
cron
Linux

© 著作权归作者所有

共有 人打赏支持
Bing-
粉丝 0
博文 25
码字总数 18863
作品 0
大连
Win7 + VS2010 + Python2.7.5 安装 gevent

昨天折腾了下gevent,做个简单的记录。具体环境本文标题已经说得很明白了, win7是32位的 1. 下载gevent安装包 去gevent官网下载个source包, 地址是这里 http://pypi.python.org/packages/so...

cardinalinux
2013/09/10
0
0
python --- 协程编程(第三方库gevent的使用)

1. 什么是协程?   协程(coroutine),又称微线程。协程不是线程也不是进程,它的上下文关系切换不是由CPU控制,一个协程由当前任务切换到其他任务由当前任务来控制。一个线程可以包含多个...

码农47
2017/11/19
0
0
Locust install on windows Locust 在win上安装 笔记

首先,安装python,2.7, Locust 需要的版本 (Locust requires Python 2.6+. It is not currently compatible with Python 3.x.) 然后把python配置到环境变量,再安装,pip。 python,pip...

menglinxi_a
2015/08/05
0
0
python模块介绍-gevent介绍:基于协程的网络库

python模块介绍-gevent介绍:基于协程的网络库 介绍 gevent是基于协程的Python网络库。特点: 基于libev的快速事件循环(Linux上epoll,FreeBSD上kqueue)。 基于greenlet的轻量级执行单元。 ...

磁针石
2014/01/13
0
2
Ubuntu 安装gevent

Gevent是一个基于greenlet的Python的并发框架,以赖于greenlet和libevent库,因此安装Gevent前,首先需要安装greenlet和libevent。 libevent的安装,这里就不介绍了,网上的安装教程一大箩,...

jackliu8722
2012/07/16
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

angular 解决其他电脑不能访问的问题。

ng serve --host 0.0.0.0 --disable-host-check

miaojiangmin
今天
1
0
优酷视频文件怎么转换格式

  以前在优酷上下载视频都只是在手机上观看,但随着科技的发展,对于视频的要求也逐渐增多,不再只是观看视频那么简单,在精彩的部分还会将其单独分割出来,然后进行视频剪辑,可以做出我们...

萤火的萤火
今天
0
0
数据结构:散列

在一个数据结构中查找key元素,用顺序查找、二分查找都需要经过一系列关键之比较才能查找到结果,平均查找长度与数据量有关,元素越多比较次数就越多。 如果根据元素的关键字就能知道元素的存...

京一
今天
1
0
Apache RocketMQ 正式开源分布式事务消息

近日,Apache RocketMQ 社区正式发布4.3版本。此次发布不仅包括提升性能,减少内存使用等原有特性增强,还修复了部分社区提出的若干问题,更重要的是该版本开源了社区最为关心的分布式事务消...

阿里云云栖社区
今天
33
0
使用JavaScript和MQTT开发物联网应用

如果说Java和C#哪个是最好的开发语言,无疑会挑起程序员之间的相互怒怼,那如果说JavaScript是动态性最好的语言,相信大家都不会有太大的争议。随着越来越多的硬件平台和开发板开始支持JavaS...

少年不搬砖老大徒伤悲
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部