文档章节

python中的Queue与多进程(multiprocessing)

Kevin_Yang
 Kevin_Yang
发布于 2014/07/30 00:53
字数 1574
阅读 47802
收藏 22

最近接触一个项目,要在多个虚拟机中运行任务,参考别人之前项目的代码,采用了多进程来处理,于是上网查了查python中的多进程

一、先说说Queue(队列对象)

Queue是python中的标准库,可以直接import 引用,之前学习的时候有听过著名的“先吃先拉”与“后吃先吐”,其实就是这里说的队列,队列的构造的时候可以定义它的容量,别吃撑了,吃多了,就会报错,构造的时候不写或者写个小于1的数则表示无限多

import Queue

q = Queue.Queue(10)

向队列中放值(put)

q.put(‘yang’)

q.put(4)

q.put([‘yan’,’xing’])

在队列中取值get()

默认的队列是先进先出的

>>> q.get()
'yang'
>>> q.get()
4
>>> q.get()
['yan', 'xing']
>>>

 

当一个队列为空的时候如果再用get取则会堵塞,所以取队列的时候一般是用到

get_nowait()方法,这种方法在向一个空队列取值的时候会抛一个Empty异常

所以更常用的方法是先判断一个队列是否为空,如果不为空则取值

队列中常用的方法

Queue.qsize() 返回队列的大小 
Queue.empty() 如果队列为空,返回True,反之False 
Queue.full() 如果队列满了,返回True,反之False
Queue.get([block[, timeout]]) 获取队列,timeout等待时间 
Queue.get_nowait() 相当Queue.get(False)
非阻塞 Queue.put(item) 写入队列,timeout等待时间 
Queue.put_nowait(item) 相当Queue.put(item, False)

 

二、multiprocessing中使用子进程概念

from multiprocessing import Process

可以通过Process来构造一个子进程

p = Process(target=fun,args=(args))

再通过p.start()来启动子进程

再通过p.join()方法来使得子进程运行结束后再执行父进程

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Process(target=run_proc, args=('test',))
    print 'Process will start.'
    p.start()
    p.join()
    print 'Process end.'

image

三、在multiprocessing中使用pool

如果需要多个子进程时可以考虑使用进程池(pool)来管理

from multiprocessing import Pool

 

from multiprocessing import Pool
import os, time

def long_time_task(name):
    print 'Run task %s (%s)...' % (name, os.getpid())
    start = time.time()
    time.sleep(3)
    end = time.time()
    print 'Task %s runs %0.2f seconds.' % (name, (end - start))

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print 'Waiting for all subprocesses done...'
    p.close()
    p.join()
    print 'All subprocesses done.'

pool创建子进程的方法与Process不同,是通过

p.apply_async(func,args=(args))实现,一个池子里能同时运行的任务是取决你电脑的cpu数量,如我的电脑现在是有4个cpu,那会子进程task0,task1,task2,task3可以同时启动,task4则在之前的一个某个进程结束后才开始

image

上面的程序运行后的结果其实是按照上图中1,2,3分开进行的,先打印1,3秒后打印2,再3秒后打印3

代码中的p.close()是关掉进程池子,是不再向里面添加进程了,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

当时也可以是实例pool的时候给它定义一个进程的多少

如果上面的代码中p=Pool(5)那么所有的子进程就可以同时进行

三、多个子进程间的通信

多个子进程间的通信就要采用第一步中说到的Queue,比如有以下的需求,一个子进程向队列中写数据,另外一个进程从队列中取数据,

#coding:gbk

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    for value in ['A', 'B', 'C']:
        print 'Put %s to queue...' % value
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print 'Get %s from queue.' % value
            time.sleep(random.random())
        else:
            break

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()    
    # 等待pw结束:
    pw.join()
    # 启动子进程pr,读取:
    pr.start()
    pr.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    print
    print '所有数据都写入并且读完'

四、关于上面代码的几个有趣的问题

if __name__=='__main__':    
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    p = Pool()
    pw = p.apply_async(write,args=(q,))    
    pr = p.apply_async(read,args=(q,))
    p.close()
    p.join()
    
    print
    print '所有数据都写入并且读完'

如果main函数写成上面的样本,本来我想要的是将会得到一个队列,将其作为参数传入进程池子里的每个子进程,但是却得到

RuntimeError: Queue objects should only be shared between processes through inheritance

的错误,查了下,大意是队列对象不能在父进程与子进程间通信,这个如果想要使用进程池中使用队列则要使用multiprocess的Manager类

if __name__=='__main__':
    manager = multiprocessing.Manager()
    # 父进程创建Queue,并传给各个子进程:
    q = manager.Queue()
    p = Pool()
    pw = p.apply_async(write,args=(q,))
    time.sleep(0.5)
    pr = p.apply_async(read,args=(q,))
    p.close()
    p.join()
    
    print
    print '所有数据都写入并且读完'

 

这样这个队列对象就可以在父进程与子进程间通信,不用池则不需要Manager,以后再扩展multiprocess中的Manager类吧

关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁

#coding:gbk

from multiprocessing import Process,Queue,Pool
import multiprocessing
import os, time, random

# 写数据进程执行的代码:
def write(q,lock):
    lock.acquire() #加上锁
    for value in ['A', 'B', 'C']:
        print 'Put %s to queue...' % value        
        q.put(value)        
    lock.release() #释放锁  

# 读数据进程执行的代码:
def read(q):
    while True:
        if not q.empty():
            value = q.get(False)
            print 'Get %s from queue.' % value
            time.sleep(random.random())
        else:
            break

if __name__=='__main__':
    manager = multiprocessing.Manager()
    # 父进程创建Queue,并传给各个子进程:
    q = manager.Queue()
    lock = manager.Lock() #初始化一把锁
    p = Pool()
    pw = p.apply_async(write,args=(q,lock))    
    pr = p.apply_async(read,args=(q,))
    p.close()
    p.join()
    
    print
    print '所有数据都写入并且读完'

参考文章:

http://blog.csdn.net/yatere/article/details/6668006

http://www.liaoxuefeng.com/wiki/001374738125095c955c1e6d8bb493182103fac9270762a000/0013868323401155ceb3db1e2044f80b974b469eb06cb43000

© 著作权归作者所有

共有 人打赏支持
Kevin_Yang
粉丝 253
博文 34
码字总数 34570
作品 0
通州
私信 提问
加载中

评论(3)

j
jiangzx
不错,谢谢!
立雪听风
立雪听风

引用来自“吐槽的达达仔”的评论

q = manager.Queue()
Mulitprocess里面的Queue不是线程安全的吗?还用Lock,是不是多此一举了??
赞同: Queue 是FIFO。就像 流水线作业,que=manager.Queue()
工人A put-->[queue:流水线] get--> (工人B)
(1)工人A::que.put(“产品”) , 如果流水线满了,则会阻塞等待直到流水线有空间;
(2)工人B:que.get(“产品”): 相反,如果queue空了,会阻塞等待直到流水线有产品

假设,工人A 、B 分别是一个进程(线程也可以),虽然他们是异步的,因为分别执行了 que.put() 、que.get() 形成了 B 等待 A 的同步关系。
另外,lock,不是锁住某个资源(文件/对象),而是解析器指令。线程及进程 比喻为学生。班上只有一台电脑(lock),老师(解析器)把电脑给你玩,那其它人就等着。如果你一直玩,不交出电脑。那么其它永远等着。这就是死锁!! 所以,要 try 电脑给你 finally 你交出电脑
吐槽的达达仔
吐槽的达达仔
q = manager.Queue()
Mulitprocess里面的Queue不是线程安全的吗?还用Lock,是不是多此一举了??
python--多进程的用法详解实例

想让python实现多进程(multiprocessing),我们要先区分不同的操作系统的不同之处。 Linux操作系统下提供了一个fork()系统调用,普通函数调用一次返回一次,fork()调用一次返回两次,因为操作...

山有木兮有木兮
05/14
0
0
如何利用多进程优化Python视频应用

如果要用Python播放视频,或者打开摄像头获取视频流,我们可以用OpenCV Python。但是在视频帧获取的时候同时做一些图像识别和处理,可能会因为耗时多而导致卡顿。一般来说,我们首先会想到把...

yushulx
09/04
0
0
Python标准库10 多进程初步 (multiprocessing包)

作者:Vamei 出处:http://www.cnblogs.com/vamei 欢迎转载,也请保留这段声明。谢谢! 我们已经见过了使用subprocess包来创建子进程,但这个包有两个很大的局限性:1) 我们总是让subproces...

osDaniel
2014/09/21
0
0
Python 2.6 亮点:multiprocessing模块

本来以为Python 2.6只是Python 3.0的过渡版本,不会有太多的新功能。但看到这个2.6的重大改动列表,才发现自己挺落后的。在2.6中新增的multiprocessing模块也绝对是Python 2.6的杀手级应用(...

索隆
2012/05/02
0
0
python --- mulitprocessing(多进程)模块使用

1. 什么是进程?   进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进...

码农47
2017/11/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

First Bad Version(leetcode278)

You are a product manager and currently leading a team to develop a new product. Unfortunately, the latest version of your product fails the quality check. Since each version is......

woshixin
10分钟前
1
0
executor 和task 优先于线程(68)

java.util.concurrent 包里有一个Executor 框架 基于接口的任务执行工具 只需要一行代码 提交一个runnable 方法 优雅的终止(必须做到,不然虚拟机可能不会退出) 对于负载不重的服务 Execut...

Java搬砖工程师
11分钟前
1
0
一条SQL查询语句是如何执行的

123

writeademo
13分钟前
1
0
hive 集成sentry

环境 apache-hive-2.3.3-bin apache-sentry-2.1.0-bin 1 2 sentry是目前最新的版本,支持hive的最高版本为2.3.3,hive版本如果高于2.3.3,会出一些版本兼容问题[亲测] hive快速安装 wget htt...

hblt-j
13分钟前
1
0
CSS中position属性( absolute | relative | static | fixed )详解

四个属性的特点 static:无特殊定位,对象遵循正常文档流。top,right,bottom,left等属性不会被应用。 relative:对象遵循正常文档流,但将依据top,right,bottom,left等属性在正常文档流...

简心
18分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部