文档章节

Python 通用任务分发器

wil
 wil
发布于 2015/07/30 22:33
字数 622
阅读 274
收藏 0

应用场景

把一个大任务分解成N个子任务并行执行,全部完成后可进行汇总统计。
例如:

  • 消息分发:将一条消息发送给100w用户,可拆分成100个子任务,每个子任务发送1w用户;
  • 数据处理:对1000w条数据记录进行某种操作,可拆分成N个子任务并行执行;

特点

  • 一键运行,无须手动分解任务
  • 支持动态加入新的worker进程
  • 任务分发是负载均衡的
  • 子任务完成后可进行汇总统计

进程视图

输入图片说明

使用方法

任务调用器 task_dispatcher.py

#!/usr/bin/python
#coding: utf-8

# Task dispatcher.
# Binds PUSH socket to tcp://localhost:5557
# Sends batch of tasks to workers via that socket
# Start after all workers started
#
# Author: wilzhang
# Created: 2015-07-29


import os
# Change path to current file path
os.chdir(os.path.split(os.path.realpath(__file__))[0])

import sys
import json
import time
import zmq


def get_one_task():
    """Get one task each time.
    Return a str.
    """
    #TODO
    return 'task A'


def main():
    context = zmq.Context()

    # Socket to send messages on
    sender = context.socket(zmq.PUSH)
    sender.bind("tcp://*:5557")

    # Wait all clients connected
    time.sleep(3)

    # Send tasks
    taskn = 0
    while True:
        task = get_one_task()
        print 'get task', str(task)
        if task is None:
            break

        sender.send_string(str(task))
        taskn += 1

    # Give 0MQ time to deliver
    print 'finish. task number:', taskn


if __name__ == '__main__':
    main()

工作进程 worker.py

#!/usr/bin/python
#coding: utf-8

# Task worker.
# Connects PULL socket to tcp://localhost:5557
# Collects tasks from dispatcher via that socket
# Connects PUSH socket to tcp://localhost:5558
# Sends results to collector via that socket
#
# Author: wilzhang
# Created: 2015-07-29


import os
# Change path to current file path
os.chdir(os.path.split(os.path.realpath(__file__))[0])

import sys
import json
import time
import zmq
import random
import traceback


def do_work(task):
    """Do task and return the result.
    Return a str.
    """
    #TODO
    return 'result'


def main():
    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")

    # Socket to send messages to
    sender = context.socket(zmq.PUSH)
    sender.connect("tcp://localhost:5558")

    # Process tasks forever
    while True:
        try:
            print 'waiting work'
            s = receiver.recv()
            print 'receive msg', s

            # Do the work
            result = do_work(s)

            # Send results to collector
            if result:
                sender.send(str(result))
            print 'finish work'
        except Exception, e:
            print traceback.format_exc()
            continue    
            

if __name__ == '__main__':
    main()

汇总统计进程 collector.py

#!/usr/bin/python
#coding: utf-8

# Result collcetor.
# Binds PULL socket to tcp://localhost:5558
# Collects results from workers via that socket
#
# Author: wilzhang
# Created: 2015-07-29


import os
# Change path to current file path
os.chdir(os.path.split(os.path.realpath(__file__))[0])

import json
import time
import zmq
import random
import sys


def collect_result(result):
    """Collect result and do some calculation.
    """
    #TODO
    print 'finsh N tasks, cost time: 3600s'


def main():
    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.bind("tcp://*:5558")

    # Process 
    while True:
        s = receiver.recv()
        print 'receive msg', s
        collect_result(s)

    print 'finish'


if __name__ == '__main__':
    main()

修改步骤:

  1. 实现task_dispatcher.py中的get_one_task函数,每次分配一个任务
  2. 实现worker.py中的do_work函数,处理任务
  3. 实现collector.py中的collect_result函数,统计结果

启动顺序:

  1. 启动collector.py
  2. 启动N个worker.py
  3. 启动task_dispatcher.py

© 著作权归作者所有

wil

wil

粉丝 0
博文 2
码字总数 690
作品 0
深圳
私信 提问
156个Python网络爬虫资源,妈妈再也不用担心你找不到资源了

本列表包含Python网页抓取和数据处理相关的库。 前几天有私信小编要Python的学习资料,小编整理了一些有深度的Python教程和参考资料,从入门到高级的都有,文件已经打包好了,正在学习Pytho...

雁横
2018/05/02
0
0
Python 爬虫的工具列表 附Github代码下载链接

这个列表包含与网页抓取和数据处理的Python库 1、网络 通用 urllib -网络库(stdlib)。 requests -网络库。 grab – 网络库(基于pycurl)。 pycurl – 网络库(绑定libcurl)。 urllib3 – ...

大数据之路
2012/07/07
2.9K
0
python开源工具列表【持续更新】

以下是个人在工作中整理的一些python wheel,供参考。 这个列表包含与网页抓取和数据处理的Python库 网络 通用urllib -网络库(stdlib)。 requests -网络库。 grab – 网络库(基于pycurl)。...

武耀文
2018/04/25
0
0
python模块介绍-gearman:程序排程 概述

python模块介绍-gearman:程序排程 概述 Gearman是一套用来把程序需求委派给机器,提供通用的程序框架来将任务分发在机器运算。它同时具备并行工作的能力、负载均衡处理的能力,以及在不同程...

磁针石
2014/04/09
440
0
斯坦福大学发布 StanfordNLP,支持多种语言

雷锋网(公众号:雷锋网) AI 科技评论按,近日,斯坦福大学发布了一款用于 NLP 的 Python 官方库,这个库可以适用于多种语言,其地址是:https://stanfordnlp.github.io/stanfordnlp/,githu...

王雪佩
02/11
0
0

没有更多内容

加载失败,请刷新页面

加载更多

golang-字符串-地址分析

demo package mainimport "fmt"func main() {str := "map.baidu.com"fmt.Println(&str, str)str = str[0:5]fmt.Println(&str, str)str = "abc"fmt.Println(&s......

李琼涛
今天
4
0
Spring Boot WebFlux 增删改查完整实战 demo

03:WebFlux Web CRUD 实践 前言 上一篇基于功能性端点去创建一个简单服务,实现了 Hello 。这一篇用 Spring Boot WebFlux 的注解控制层技术创建一个 CRUD WebFlux 应用,让开发更方便。这里...

泥瓦匠BYSocket
今天
6
0
从0开始学FreeRTOS-(列表与列表项)-3

FreeRTOS列表&列表项的源码解读 第一次看列表与列表项的时候,感觉很像是链表,虽然我自己的链表也不太会,但是就是感觉很像。 在FreeRTOS中,列表与列表项使用得非常多,是FreeRTOS的一个数...

杰杰1号
今天
4
0
Java反射

Java 反射 反射是框架设计的灵魂(使用的前提条件:必须先得到代表的字节码的 Class,Class 类 用于表示.class 文件(字节码)) 一、反射的概述 定义:JAVA 反射机制是在运行状态中,对于任...

zzz1122334
今天
5
0
聊聊nacos的LocalConfigInfoProcessor

序 本文主要研究一下nacos的LocalConfigInfoProcessor LocalConfigInfoProcessor nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/config/impl/LocalConfigInfoProcessor.java p......

go4it
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部