文档章节

Python学习-并发编程之进程池

D7
 D7
发布于 2017/08/30 20:23
字数 1916
阅读 53
收藏 0

Python学习-并发编程之进程池

进程池概述

多进程的目的是实现程序的并发,每个进程可以执行不同的任务,以提高程序的效率。如果进程需要执行的任务都是类似的,那么我们可以给这些进程创建一个池子,并设置最小进程数和最大进程数,这样可以高效率的执行任务。例如:httpd服务的进程模式。创建进程池可以通过multiprocessing模块的Pool类,也可以通过Python的并发库concurrent.futures的ProcessPoolExecutor类来创建。

利用multiprocessing模块实现进程池

Pool类概述

# 这里只列举一些常用方法
class Pool(object):
  	'''
  	numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值;
	nitializer:是每个工作进程启动时要执行的可调用对象,默认为None;
	initargs:是要传给initializer的参数组。
  	'''
  	def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
      	pass
  
    def apply(self, func, args=(), kwds={}):
      	'''
      	向进程池中添加一个同步的进程,进程执行func(*args,**kwargs),然后返回结果。
      	注意:用apply()方法向进程池中添加的进程是串行的,
      	即只有拿到这个进程的返回值(即func的返回值)才会继续执行下一个进程。
      	'''
        pass
      
	def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
      	'''
      	向进程池中在添加一个异步的进程,进程执行func(*args,**kwargs),然后返回结果。
      	此方法返回的结果是AsyncResult类的实例,callback是可调用对象,可以有参数。
      	当func产生返回值时,将返回值传递给callback(callback即是回调函数),
      	callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
      	'''
        pass

    def close(self):
      	'''
      	关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成。
      	'''
        pass

    def terminate(self):
      	'''
      	立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。
      	如果进程池被垃圾回收,将自动调用此函数。
      	'''
        pass

    def join(self):
      	'''等待所有工作进程退出,此方法只能在close()或teminate()之后调用'''
        pass

Pool类创建进程池的方法

# 创建同步进程池
import os,time,random
from multiprocessing import Pool

def Calculation(value, count):		# 创建一个处理函数
    if value == 1:
        print("pid:%s count:%d" %((os.getpid(), count)))
        return count
    if value % 2 == 0:
        count += 1
        value = value / 2
        time.sleep(1)               # 模拟执行时间
    else:
        value = value * 3 + 1
    return Calculation(value, count)

if __name__ == '__main__':
  	# 进程池中从无到有创建四个进程,以后一直是这四个进程在循环执行任务
    P_Pool = Pool(4)		# 默认是cpu核数
    P_List=[]
    count = 0
    for i in range(10):	# 创建10个进程
        # 同步运行,阻塞、直到本次任务执行完毕拿到返回值ret
        ret = P_Pool.apply(Calculation, args=(random.randint(i+10, i+100), count,))
        P_List.append(ret)
    print(P_List)
# 创建异步进程池
import os, time, random
from multiprocessing import Pool

def Calculation(value, count):		# 创建一个处理函数
    if value == 1:
        print("pid:%s count:%d" %((os.getpid(), count)))
        return count
    if value % 2 == 0:
        count += 1
        value = value / 2
        time.sleep(1)               # 模拟执行时间
    else:
        value = value * 3 + 1
    return Calculation(value, count)

if __name__ == '__main__':
  	# 进程池中从无到有创建四个进程,以后一直是这四个进程在循环执行任务
    P_Pool = Pool(4)
    P_List=[]
    count = 0
    for i in range(10):
      	# 异步运行,同时4个进程,运行完成后返回一个AsyncResult类的实例
        ret = P_Pool.apply_async(Calculation, args=(random.randint(i+1, i+10), count,))
        P_List.append(ret)
    # 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成。
    P_Pool.close()
    # 调用join之前,先调用close函数,否则会出错。
    # 执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束。
    # 不join的话,还没等子进程执行,主进程就结束了。
    P_Pool.join()
    for ret in P_List:
        print(ret.get())	# 使用AsyncResult对象的get()方法来获取结果

利用concurrent.futures模块创建进程池

ProcessPoolExecutor类概述

class ProcessPoolExecutor(_base.Executor):
  	def __init__(self, max_workers=None):
      	'''max_workers进程池中最大进程数'''
      	pass
  
  	def submit(self, fn, *args, **kwargs):
      	'''
      	向进程池中提交进程,默认是异步的,
      	fn:进程处理的对象,函数的地址,
      	*args:以位置传参的方式给处理对象传递参数,
      	**kwargs:以关键字传参的方式给对象传递参数,
      	返回一个Future类型的对象。
      	'''
    	pass
    
    def map(self, fn, *iterables, timeout=None, chunksize=1):
      	'''
      	向进程池中提交一个类似如map()函数功能的进程
      	fn:进程处理的对象,函数的地址,
      	*iterables:传递一个可迭代的对象,
      	map()方法返回值为一个迭代器。
      	'''
    	pass
    
    def shutdown(self, wait=True):
      	'''关闭进程池'''
      	pass

ProcessPoolExecutor类创建进程池的方法

# 用ProcessPoolExecutor类创建的进程池默认都是异步的
import os,time,random
from concurrent.futures import  ProcessPoolExecutor

def Calculation(value, count):		# 创建一个处理函数
    if value == 1:
        print("pid:%s count:%d" %((os.getpid(), count)))
        return count
    if value % 2 == 0:
        count += 1
        value = value / 2
        time.sleep(1)				# 模拟执行时间
    else:
        value = value * 3 + 1
    return Calculation(value, count)

if __name__ == '__main__':
  	# ProcessPoolExecutor类支持with上下文管理,因此可以省略shutdown()方法
    with ProcessPoolExecutor(4) as P_Pool:
        P_List = []
        count = 0
        for i in range(10):
            ret = P_Pool.submit(Calculation, random.randint(i+1, i+10), count)
            # 如果想实现同步进程池,可用下述方法,返回的就直接就是进程的值
            # ret = P_Pool.submit(Calculation, random.randint(i+1, i+10), count).result()
            P_List.append(ret)
    # 同Pool类获取返回值的方法不同,这里使用result()方法,而不是get()方法
    print([ret.result() for ret in P_List])

进程池的回调函数

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程。主进程则调用一个函数去处理该结果,该函数即回调函数。例如:可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数,这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。注意:只有异步的进程池才有回调函数。

Pool类实现进程池的回调函数

from multiprocessing import Pool
import os,requests

def get_page(url):				# 定义一个获取网页的函数
    print('<进程%s> get %s' %(os.getpid(), url))
    respone = requests.get(url)		# 获取网页
    if respone.status_code == 200:	# 查看获取状态
        return {'url':url,'text':respone.text}	# 返回获取的内容

def pasrse_page(res):		# 定义处理网页的函数
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res = 'url:<%s> size:[%s]\n' %(res['url'], len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)		# 保存至文件

if __name__ == '__main__':

    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p = Pool(4)				# 定义4个进程的进程池
    res_l = []
    for url in urls:
      	# 定义异步的进程,并指定回调函数,回调函数接收get_page函数的返回值
        res = p.apply_async(get_page,args=(url,),callback=pasrse_page)
        res_l.append(res)

ProcessPoolExecutor实现进程池的回调函数

from concurrent.futures import ProcessPoolExecutor
import os,requests

def get_page(url):				# 定义一个获取网页的函数
    print('<进程%s> get %s' %(os.getpid(), url))
    respone = requests.get(url)		# 获取网页
    if respone.status_code == 200:	# 查看获取状态
        return {'url':url,'text':respone.text}	# 返回获取的内容

def pasrse_page(res):		# 定义处理网页的函数
    res = res.result()      # 注意:此时接受的res是Future类型的对象,通过result() 方法拿到值
    print('<进程%s> parse %s' %(os.getpid(), res['url']))
    parse_res = 'url:<%s> size:[%s]\n' %(res['url'], len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)		# 保存至文件

if __name__ == '__main__':

    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    with ProcessPoolExecutor(4) as P_Pool:
        P_List = []
        for url in urls:
            # 注意:add_done_callback()方法接受进程返回的Future类型的对象,而不进程的结果值
            res = P_Pool.submit(get_page, url).add_done_callback(pasrse_page)
            P_List.append(res)

© 著作权归作者所有

共有 人打赏支持
D7

D7

粉丝 0
博文 5
码字总数 5326
作品 0
海淀
技术主管
Python高级编程和异步IO并发编程

Python高级编程和异步IO并发编程 网盘地址:https://pan.baidu.com/s/1eB-BsUacBRhKxh7qXwndMQ 密码: tgba 备用地址(腾讯微云):https://share.weiyun.com/5Z3x9V0 密码:7cdnb2 针对Pytho...

人气王子333
04/23
0
0
使用 Python 进行线程编程

对于 Python 来说,并不缺少并发选项,其标准库中包括了对线程、进程和异步 I/O 的支持。在许多情况下,通过创建诸如异步、线程和子进程之类的高层模块,Python 简化了各种并发方法的使用。除...

丰圣谋
2013/08/22
0
0
Python3 与 C# 并发编程之~ 进程篇

上次说了很多Linux下进程相关知识,这边不再复述,下面来说说Python的并发编程,如有错误欢迎提出~ 如果遇到听不懂的可以看上一次的文章:https://www.cnblogs.com/dotnetcrazy/p/9363810.h...

鲲逸鹏
昨天
0
0
Python 进程线程协程 GIL 闭包 与高阶函数(五)

1 GIL线程全局锁 线程全局锁(Global Interpreter Lock),即Python为了保证线程安全而采取的独立线程运行的限制,说白了就是一个核只能在同一时间运行一个线程.对于io密集型任务,python的多线程...

善良小郎君
06/20
0
0
多进程(multiprocessing module)

一、多进程 1.1 多进程的概念   由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的...

Hme
07/21
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Spring Cloud Gateway真的有那么差吗?

前言 Spring Cloud从一开始最受大家质疑的就是网关性能,那是由于Spring Cloud最初选择了使用Netflix几年前开源的Zuul作为基础,而高性能版的Zuul 2在经过了多次跳票之后,对于Spring这样的整...

Java小铺
32分钟前
1
0
SpringBoot远程调试,远程debug你的线上项目

开发环境中代码出错了,可以利用IDE的debug功能来进行调试。那线上环境出错呢? 一、假设我们的项目是部署在tomcat中,那我们就需要对tomcat进行一定对配置,配置如下。 1. windows系统中,找...

nonnetta
37分钟前
0
0
JAVA秒杀优化方向

秒杀优化方向 将请求尽量拦截在系统上游:传统秒杀系统之所以挂,请求都压倒了后端数据层,数据读写锁冲突严重,几乎所有请求都超时,流量虽大,下单成功的有效流量甚小,我们可以通过限流、...

小贱是个程序员
45分钟前
0
0
C# 统计字符串中大写字母和小写字母的个数

static void Main() { int count1 = 0; int count2 = 0; Console.WriteLine("请输入字符串"); string str = Convert.ToString(Consol......

熊二的爸爸是谁
47分钟前
0
0
分布式服务框架之远程通讯技术及原理分析

在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现远程通讯的技术,例如:RMI、MINA、ESB、Burlap、Hessian、SOAP、EJB和JMS等,这些名词之间到底是...

老道士
52分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部