文档章节

Python学习-并发编程之进程池

D7
 D7
发布于 2017/08/30 20:23
字数 1916
阅读 71
收藏 0

Python学习-并发编程之进程池

进程池概述

多进程的目的是实现程序的并发,每个进程可以执行不同的任务,以提高程序的效率。如果进程需要执行的任务都是类似的,那么我们可以给这些进程创建一个池子,并设置最小进程数和最大进程数,这样可以高效率的执行任务。例如:httpd服务的进程模式。创建进程池可以通过multiprocessing模块的Pool类,也可以通过Python的并发库concurrent.futures的ProcessPoolExecutor类来创建。

利用multiprocessing模块实现进程池

Pool类概述

# 这里只列举一些常用方法
class Pool(object):
  	'''
  	numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值;
	nitializer:是每个工作进程启动时要执行的可调用对象,默认为None;
	initargs:是要传给initializer的参数组。
  	'''
  	def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
      	pass
  
    def apply(self, func, args=(), kwds={}):
      	'''
      	向进程池中添加一个同步的进程,进程执行func(*args,**kwargs),然后返回结果。
      	注意:用apply()方法向进程池中添加的进程是串行的,
      	即只有拿到这个进程的返回值(即func的返回值)才会继续执行下一个进程。
      	'''
        pass
      
	def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
      	'''
      	向进程池中在添加一个异步的进程,进程执行func(*args,**kwargs),然后返回结果。
      	此方法返回的结果是AsyncResult类的实例,callback是可调用对象,可以有参数。
      	当func产生返回值时,将返回值传递给callback(callback即是回调函数),
      	callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
      	'''
        pass

    def close(self):
      	'''
      	关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成。
      	'''
        pass

    def terminate(self):
      	'''
      	立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。
      	如果进程池被垃圾回收,将自动调用此函数。
      	'''
        pass

    def join(self):
      	'''等待所有工作进程退出,此方法只能在close()或teminate()之后调用'''
        pass

Pool类创建进程池的方法

# 创建同步进程池
import os,time,random
from multiprocessing import Pool

def Calculation(value, count):		# 创建一个处理函数
    if value == 1:
        print("pid:%s count:%d" %((os.getpid(), count)))
        return count
    if value % 2 == 0:
        count += 1
        value = value / 2
        time.sleep(1)               # 模拟执行时间
    else:
        value = value * 3 + 1
    return Calculation(value, count)

if __name__ == '__main__':
  	# 进程池中从无到有创建四个进程,以后一直是这四个进程在循环执行任务
    P_Pool = Pool(4)		# 默认是cpu核数
    P_List=[]
    count = 0
    for i in range(10):	# 创建10个进程
        # 同步运行,阻塞、直到本次任务执行完毕拿到返回值ret
        ret = P_Pool.apply(Calculation, args=(random.randint(i+10, i+100), count,))
        P_List.append(ret)
    print(P_List)
# 创建异步进程池
import os, time, random
from multiprocessing import Pool

def Calculation(value, count):		# 创建一个处理函数
    if value == 1:
        print("pid:%s count:%d" %((os.getpid(), count)))
        return count
    if value % 2 == 0:
        count += 1
        value = value / 2
        time.sleep(1)               # 模拟执行时间
    else:
        value = value * 3 + 1
    return Calculation(value, count)

if __name__ == '__main__':
  	# 进程池中从无到有创建四个进程,以后一直是这四个进程在循环执行任务
    P_Pool = Pool(4)
    P_List=[]
    count = 0
    for i in range(10):
      	# 异步运行,同时4个进程,运行完成后返回一个AsyncResult类的实例
        ret = P_Pool.apply_async(Calculation, args=(random.randint(i+1, i+10), count,))
        P_List.append(ret)
    # 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成。
    P_Pool.close()
    # 调用join之前,先调用close函数,否则会出错。
    # 执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束。
    # 不join的话,还没等子进程执行,主进程就结束了。
    P_Pool.join()
    for ret in P_List:
        print(ret.get())	# 使用AsyncResult对象的get()方法来获取结果

利用concurrent.futures模块创建进程池

ProcessPoolExecutor类概述

class ProcessPoolExecutor(_base.Executor):
  	def __init__(self, max_workers=None):
      	'''max_workers进程池中最大进程数'''
      	pass
  
  	def submit(self, fn, *args, **kwargs):
      	'''
      	向进程池中提交进程,默认是异步的,
      	fn:进程处理的对象,函数的地址,
      	*args:以位置传参的方式给处理对象传递参数,
      	**kwargs:以关键字传参的方式给对象传递参数,
      	返回一个Future类型的对象。
      	'''
    	pass
    
    def map(self, fn, *iterables, timeout=None, chunksize=1):
      	'''
      	向进程池中提交一个类似如map()函数功能的进程
      	fn:进程处理的对象,函数的地址,
      	*iterables:传递一个可迭代的对象,
      	map()方法返回值为一个迭代器。
      	'''
    	pass
    
    def shutdown(self, wait=True):
      	'''关闭进程池'''
      	pass

ProcessPoolExecutor类创建进程池的方法

# 用ProcessPoolExecutor类创建的进程池默认都是异步的
import os,time,random
from concurrent.futures import  ProcessPoolExecutor

def Calculation(value, count):		# 创建一个处理函数
    if value == 1:
        print("pid:%s count:%d" %((os.getpid(), count)))
        return count
    if value % 2 == 0:
        count += 1
        value = value / 2
        time.sleep(1)				# 模拟执行时间
    else:
        value = value * 3 + 1
    return Calculation(value, count)

if __name__ == '__main__':
  	# ProcessPoolExecutor类支持with上下文管理,因此可以省略shutdown()方法
    with ProcessPoolExecutor(4) as P_Pool:
        P_List = []
        count = 0
        for i in range(10):
            ret = P_Pool.submit(Calculation, random.randint(i+1, i+10), count)
            # 如果想实现同步进程池,可用下述方法,返回的就直接就是进程的值
            # ret = P_Pool.submit(Calculation, random.randint(i+1, i+10), count).result()
            P_List.append(ret)
    # 同Pool类获取返回值的方法不同,这里使用result()方法,而不是get()方法
    print([ret.result() for ret in P_List])

进程池的回调函数

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程。主进程则调用一个函数去处理该结果,该函数即回调函数。例如:可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数,这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。注意:只有异步的进程池才有回调函数。

Pool类实现进程池的回调函数

from multiprocessing import Pool
import os,requests

def get_page(url):				# 定义一个获取网页的函数
    print('<进程%s> get %s' %(os.getpid(), url))
    respone = requests.get(url)		# 获取网页
    if respone.status_code == 200:	# 查看获取状态
        return {'url':url,'text':respone.text}	# 返回获取的内容

def pasrse_page(res):		# 定义处理网页的函数
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res = 'url:<%s> size:[%s]\n' %(res['url'], len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)		# 保存至文件

if __name__ == '__main__':

    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p = Pool(4)				# 定义4个进程的进程池
    res_l = []
    for url in urls:
      	# 定义异步的进程,并指定回调函数,回调函数接收get_page函数的返回值
        res = p.apply_async(get_page,args=(url,),callback=pasrse_page)
        res_l.append(res)

ProcessPoolExecutor实现进程池的回调函数

from concurrent.futures import ProcessPoolExecutor
import os,requests

def get_page(url):				# 定义一个获取网页的函数
    print('<进程%s> get %s' %(os.getpid(), url))
    respone = requests.get(url)		# 获取网页
    if respone.status_code == 200:	# 查看获取状态
        return {'url':url,'text':respone.text}	# 返回获取的内容

def pasrse_page(res):		# 定义处理网页的函数
    res = res.result()      # 注意:此时接受的res是Future类型的对象,通过result() 方法拿到值
    print('<进程%s> parse %s' %(os.getpid(), res['url']))
    parse_res = 'url:<%s> size:[%s]\n' %(res['url'], len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)		# 保存至文件

if __name__ == '__main__':

    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    with ProcessPoolExecutor(4) as P_Pool:
        P_List = []
        for url in urls:
            # 注意:add_done_callback()方法接受进程返回的Future类型的对象,而不进程的结果值
            res = P_Pool.submit(get_page, url).add_done_callback(pasrse_page)
            P_List.append(res)

© 著作权归作者所有

共有 人打赏支持
D7

D7

粉丝 0
博文 5
码字总数 5326
作品 0
海淀
技术主管
Python高级编程和异步IO并发编程

Python高级编程和异步IO并发编程 网盘地址:https://pan.baidu.com/s/1eB-BsUacBRhKxh7qXwndMQ 密码: tgba 备用地址(腾讯微云):https://share.weiyun.com/5Z3x9V0 密码:7cdnb2 针对Pytho...

人气王子333
04/23
0
0
《Python分布式计算》 第3章 Python的并行计算 (Distributed Computing with Python)

序言 第1章 并行和分布式计算介绍 第2章 异步编程 第3章 Python的并行计算 第4章 Celery分布式应用 第5章 云平台部署Python 第6章 超级计算机群使用Python 第7章 测试和调试分布式应用 第8章...

seancheney
2017/10/13
0
0
Python3 与 C# 并发编程之~ 进程篇

上次说了很多Linux下进程相关知识,这边不再复述,下面来说说Python的并发编程,如有错误欢迎提出~ 如果遇到听不懂的可以看上一次的文章:https://www.cnblogs.com/dotnetcrazy/p/9363810.h...

鲲逸鹏
08/16
0
0
使用 Python 进行线程编程

对于 Python 来说,并不缺少并发选项,其标准库中包括了对线程、进程和异步 I/O 的支持。在许多情况下,通过创建诸如异步、线程和子进程之类的高层模块,Python 简化了各种并发方法的使用。除...

丰圣谋
2013/08/22
0
0
Tornado实现多线程、多进程HTTP服务

背景 线上有一个相关百科的服务,返回一个query中提及的百科词条。该服务是用python实现的,以前通过thrift接口访问,现要将其改为通过HTTP访问。之前没有搭建HTTPServer的经验,因此想用pyt...

LUIS1983
09/04
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Coding and Paper Letter(三十九)

资源整理。 1 Coding: 1.Python库benchmark rio s3,用于在访问S3上的文件时对Rasterio / GDAL的多线程性能进行基准测试的工具。 benchmark rio s3 2.Pangeo-Binder Cookiecutter模板。 cook...

胖胖雕
39分钟前
2
0
Promise 对象

Promise(承诺) 的含义 Promise 是异步编程的一种解决方案,比传统的解决方案——回调函数和事件——更合理和更强大。它由社区最早提出和实现,ES6 将其写进了语言标准,统一了用法,原生提供...

简心
42分钟前
1
0
让UI设计师崩溃的瞬间,你经历过哪些?

隔行如隔山,这句话人人耳熟能详,但其实隔行并不可怕,大家各谋其事,各尽其职,倒也互不打扰,真正可怕的是,是内行还要受外行指点江山,而最难的部分,便是那沟通。流畅的沟通,和声细语,...

mo311
43分钟前
3
0
python进制转换

#进制转换print(bin(10)) #十进制转换成二进制print(oct(10)) #十进制转换成八进制print(hex(10)) #十进制转换成十六进制print(int('1010',2)) #二进制转十进制print(int(...

fadsaa
54分钟前
5
0
syntax error near unexpected token

最近不断重复在虚拟机CentOS测试安装gitlab,因为gitlab有一个脚本需要饭强才能下载,于是我先在windows下载好再上传到虚拟机,可是执行脚本的时候提示“syntax error near unexpected toke...

W_Lu
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部