利用Queue和managers实现分布式进程
博客专区 > acutesun 的博客 > 博客详情
利用Queue和managers实现分布式进程
acutesun 发表于5个月前
利用Queue和managers实现分布式进程
  • 发表于 5个月前
  • 阅读 3
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云 新注册用户 域名抢购1元起>>>   

multiprocessing模块中的managers子模块支持把多进程分布到多台机器上。
首先编写服务进程,服务进程负责把任务写入task_queue, 并接收工作进程返回的result_queue
serverManager.py:

from multiprocessing.managers import BaseManager,Queue

# 1. 建立task_queue和result_queue, 用来存放任务和结果
task_queue = Queue()
result_queue = Queue()


class QueueManager(BaseManager):
    pass

# 2. 把创建的队列注册在网络上,利用register方法, callable 参数关联类Queue对象
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)

# 3.绑定端口8000, 验证口令'123'
manager = QueueManager(address=('', 8000), authkey='123'.encode('utf-8'))
manager.start()

# 4. 通过manager获取网络注册的Queue对象
task = manager.get_task_queue()
result = manager.get_result_queue()

# 5.添加任务
for url in ["imageUrl_" + str(i) for i in range(10)]:
    print('put task %s...' % url)
    task.put(url)

# 6.获取返回结果
print('get result...')

for i in range(10):
    print('result is %s' % result.get(timeout=10))

manager.shutdown()

工作进程通过网络获取任务队列task_queue, 对任务队列中的数据进行处理后写入result_queue

workerManager.py:

import time
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass

# 1.注册获取queue
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 2.连接服务器
server_addr = '127.0.0.1'
print('connect to server')
mng = QueueManager(address=(server_addr, 8000), authkey='123'.encode('utf-8'))

# 连接
mng.connect()
# 3. 获取queue对象
task = mng.get_task_queue()
result = mng.get_result_queue()

# 4.从task获取任务,并把结果写入result
while not task.empty():
    image_url = task.get(True, timeout=5)
    print('run task download %s ...' % image_url)
    time.sleep(1)
    result.put('%s--->success' % image_url)

print('worker finish!')

首先运行服务进程,得到结果:

put task imageUrl_0...
put task imageUrl_1...
put task imageUrl_2...
put task imageUrl_3...
put task imageUrl_4...
put task imageUrl_5...
put task imageUrl_6...
put task imageUrl_7...
put task imageUrl_8...
put task imageUrl_9...
get result...

然后运行工作进程

connect to server
run task download imageUrl_0 ...
run task download imageUrl_1 ...
run task download imageUrl_2 ...
run task download imageUrl_3 ...
run task download imageUrl_4 ...
run task download imageUrl_5 ...
run task download imageUrl_6 ...
run task download imageUrl_7 ...
run task download imageUrl_8 ...
run task download imageUrl_9 ...
worker finish!

最后服务进程输出

result is imageUrl_0--->success
result is imageUrl_1--->success
result is imageUrl_2--->success
result is imageUrl_3--->success
result is imageUrl_4--->success
result is imageUrl_5--->success
result is imageUrl_6--->success
result is imageUrl_7--->success
result is imageUrl_8--->success
result is imageUrl_9--->success

共有 人打赏支持
粉丝 0
博文 65
码字总数 83152
×
acutesun
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: