Python学习-并发编程之进程池
博客专区 > D7 的博客 > 博客详情
Python学习-并发编程之进程池
D7 发表于6个月前
Python学习-并发编程之进程池
  • 发表于 6个月前
  • 阅读 12
  • 收藏 0
  • 点赞 0
  • 评论 0

新睿云服务器60天免费使用,快来体验!>>>   

Python学习-并发编程之进程池

进程池概述

多进程的目的是实现程序的并发,每个进程可以执行不同的任务,以提高程序的效率。如果进程需要执行的任务都是类似的,那么我们可以给这些进程创建一个池子,并设置最小进程数和最大进程数,这样可以高效率的执行任务。例如:httpd服务的进程模式。创建进程池可以通过multiprocessing模块的Pool类,也可以通过Python的并发库concurrent.futures的ProcessPoolExecutor类来创建。

利用multiprocessing模块实现进程池

Pool类概述

# 这里只列举一些常用方法
class Pool(object):
  	'''
  	numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值;
	nitializer:是每个工作进程启动时要执行的可调用对象,默认为None;
	initargs:是要传给initializer的参数组。
  	'''
  	def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
      	pass
  
    def apply(self, func, args=(), kwds={}):
      	'''
      	向进程池中添加一个同步的进程,进程执行func(*args,**kwargs),然后返回结果。
      	注意:用apply()方法向进程池中添加的进程是串行的,
      	即只有拿到这个进程的返回值(即func的返回值)才会继续执行下一个进程。
      	'''
        pass
      
	def apply_async(self, func, args=(), kwds={}, callback=None,
                    error_callback=None):
      	'''
      	向进程池中在添加一个异步的进程,进程执行func(*args,**kwargs),然后返回结果。
      	此方法返回的结果是AsyncResult类的实例,callback是可调用对象,可以有参数。
      	当func产生返回值时,将返回值传递给callback(callback即是回调函数),
      	callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
      	'''
        pass

    def close(self):
      	'''
      	关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成。
      	'''
        pass

    def terminate(self):
      	'''
      	立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。
      	如果进程池被垃圾回收,将自动调用此函数。
      	'''
        pass

    def join(self):
      	'''等待所有工作进程退出,此方法只能在close()或teminate()之后调用'''
        pass

Pool类创建进程池的方法

# 创建同步进程池
import os,time,random
from multiprocessing import Pool

def Calculation(value, count):		# 创建一个处理函数
    if value == 1:
        print("pid:%s count:%d" %((os.getpid(), count)))
        return count
    if value % 2 == 0:
        count += 1
        value = value / 2
        time.sleep(1)               # 模拟执行时间
    else:
        value = value * 3 + 1
    return Calculation(value, count)

if __name__ == '__main__':
  	# 进程池中从无到有创建四个进程,以后一直是这四个进程在循环执行任务
    P_Pool = Pool(4)		# 默认是cpu核数
    P_List=[]
    count = 0
    for i in range(10):	# 创建10个进程
        # 同步运行,阻塞、直到本次任务执行完毕拿到返回值ret
        ret = P_Pool.apply(Calculation, args=(random.randint(i+10, i+100), count,))
        P_List.append(ret)
    print(P_List)
# 创建异步进程池
import os, time, random
from multiprocessing import Pool

def Calculation(value, count):		# 创建一个处理函数
    if value == 1:
        print("pid:%s count:%d" %((os.getpid(), count)))
        return count
    if value % 2 == 0:
        count += 1
        value = value / 2
        time.sleep(1)               # 模拟执行时间
    else:
        value = value * 3 + 1
    return Calculation(value, count)

if __name__ == '__main__':
  	# 进程池中从无到有创建四个进程,以后一直是这四个进程在循环执行任务
    P_Pool = Pool(4)
    P_List=[]
    count = 0
    for i in range(10):
      	# 异步运行,同时4个进程,运行完成后返回一个AsyncResult类的实例
        ret = P_Pool.apply_async(Calculation, args=(random.randint(i+1, i+10), count,))
        P_List.append(ret)
    # 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成。
    P_Pool.close()
    # 调用join之前,先调用close函数,否则会出错。
    # 执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束。
    # 不join的话,还没等子进程执行,主进程就结束了。
    P_Pool.join()
    for ret in P_List:
        print(ret.get())	# 使用AsyncResult对象的get()方法来获取结果

利用concurrent.futures模块创建进程池

ProcessPoolExecutor类概述

class ProcessPoolExecutor(_base.Executor):
  	def __init__(self, max_workers=None):
      	'''max_workers进程池中最大进程数'''
      	pass
  
  	def submit(self, fn, *args, **kwargs):
      	'''
      	向进程池中提交进程,默认是异步的,
      	fn:进程处理的对象,函数的地址,
      	*args:以位置传参的方式给处理对象传递参数,
      	**kwargs:以关键字传参的方式给对象传递参数,
      	返回一个Future类型的对象。
      	'''
    	pass
    
    def map(self, fn, *iterables, timeout=None, chunksize=1):
      	'''
      	向进程池中提交一个类似如map()函数功能的进程
      	fn:进程处理的对象,函数的地址,
      	*iterables:传递一个可迭代的对象,
      	map()方法返回值为一个迭代器。
      	'''
    	pass
    
    def shutdown(self, wait=True):
      	'''关闭进程池'''
      	pass

ProcessPoolExecutor类创建进程池的方法

# 用ProcessPoolExecutor类创建的进程池默认都是异步的
import os,time,random
from concurrent.futures import  ProcessPoolExecutor

def Calculation(value, count):		# 创建一个处理函数
    if value == 1:
        print("pid:%s count:%d" %((os.getpid(), count)))
        return count
    if value % 2 == 0:
        count += 1
        value = value / 2
        time.sleep(1)				# 模拟执行时间
    else:
        value = value * 3 + 1
    return Calculation(value, count)

if __name__ == '__main__':
  	# ProcessPoolExecutor类支持with上下文管理,因此可以省略shutdown()方法
    with ProcessPoolExecutor(4) as P_Pool:
        P_List = []
        count = 0
        for i in range(10):
            ret = P_Pool.submit(Calculation, random.randint(i+1, i+10), count)
            # 如果想实现同步进程池,可用下述方法,返回的就直接就是进程的值
            # ret = P_Pool.submit(Calculation, random.randint(i+1, i+10), count).result()
            P_List.append(ret)
    # 同Pool类获取返回值的方法不同,这里使用result()方法,而不是get()方法
    print([ret.result() for ret in P_List])

进程池的回调函数

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程。主进程则调用一个函数去处理该结果,该函数即回调函数。例如:可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数,这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。注意:只有异步的进程池才有回调函数。

Pool类实现进程池的回调函数

from multiprocessing import Pool
import os,requests

def get_page(url):				# 定义一个获取网页的函数
    print('<进程%s> get %s' %(os.getpid(), url))
    respone = requests.get(url)		# 获取网页
    if respone.status_code == 200:	# 查看获取状态
        return {'url':url,'text':respone.text}	# 返回获取的内容

def pasrse_page(res):		# 定义处理网页的函数
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res = 'url:<%s> size:[%s]\n' %(res['url'], len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)		# 保存至文件

if __name__ == '__main__':

    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p = Pool(4)				# 定义4个进程的进程池
    res_l = []
    for url in urls:
      	# 定义异步的进程,并指定回调函数,回调函数接收get_page函数的返回值
        res = p.apply_async(get_page,args=(url,),callback=pasrse_page)
        res_l.append(res)

ProcessPoolExecutor实现进程池的回调函数

from concurrent.futures import ProcessPoolExecutor
import os,requests

def get_page(url):				# 定义一个获取网页的函数
    print('<进程%s> get %s' %(os.getpid(), url))
    respone = requests.get(url)		# 获取网页
    if respone.status_code == 200:	# 查看获取状态
        return {'url':url,'text':respone.text}	# 返回获取的内容

def pasrse_page(res):		# 定义处理网页的函数
    res = res.result()      # 注意:此时接受的res是Future类型的对象,通过result() 方法拿到值
    print('<进程%s> parse %s' %(os.getpid(), res['url']))
    parse_res = 'url:<%s> size:[%s]\n' %(res['url'], len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)		# 保存至文件

if __name__ == '__main__':

    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    with ProcessPoolExecutor(4) as P_Pool:
        P_List = []
        for url in urls:
            # 注意:add_done_callback()方法接受进程返回的Future类型的对象,而不进程的结果值
            res = P_Pool.submit(get_page, url).add_done_callback(pasrse_page)
            P_List.append(res)
  • 打赏
  • 点赞
  • 收藏
  • 分享
共有 人打赏支持
粉丝 0
博文 5
码字总数 5326
×
D7
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: