文档章节

使用Python处理地理数据文件-多进程处理类

JasonSE
 JasonSE
发布于 2016/05/14 20:32
字数 1032
阅读 128
收藏 7
点赞 2
评论 0

最近在研究使用ArcGIS制作地图,ArcGIS软件本身提供了丰富完备的地图文件处理工具,但是这些工具都需要手工操作,非常耗费时间。幸好ArcGIS还提供了一套Python库(arcpy),可以基于这个库编写脚本进行数据文件批处理。一般在进行批处理的时候,使用多进程处理任务会加快处理速度,所以打算先封装一个多进程处理的基础类,便于后面写具体功能时使用。

思路大致是这样:

  • 先初始化一个任务队列
  • 将待处理的任务方法和参数打包添加到任务队列中
  • 启动一个固定大小的进程池循环从任务队列中领取任务包
  • 拿出任务包中的方法和参数执行任务

开始编写了一版,源码如下:

from multiprocessing import Manager,Pool
import os, logging, timeit, time

class BatchBase(object):
       
    def __init__(self):
        '''
        Constructor
        '''
        
        
    def __del__(self):
        '''
        Destructor
        '''
    
    ##消费者
    def __task_consumer(self,q):
        while not q.empty():
            try:
                task = q.get(block=True,timeout=2)
                
                function = task.get('function')
                callback = task.get('callback')
                args     = task.get('args')
                kwargs   = task.get('kwargs')
                name     = task.get('name')
                pid      = os.getpid()
                try:
                    print '%d start job %s [%s]!\n' % (pid,name,self.getNowTime())
                    if callback:
                        callback(function(*args, **kwargs))
                    else:
                        function(*args, **kwargs)
                    
                    print '%d complete job %s success [%s]!\n' % (pid,name,self.getNowTime())
                except Exception as ex:
                    if callback:
                        callback(ex)
                    print '%d complete job %s fail [%s]\n' % (pid,name,self.getNowTime())
                    print ex
                finally:
                    q.task_done()
            except Exception as ex:
                logging.error(ex)   
    
    def getNowTime(self):
        return time.strftime("%H:%M:%S",time.localtime(time.time()))
    
    ##初始化任务队列
    def batch_init(self):
        _manager = Manager()
        _queue   = _manager.Queue()
        return _queue
    
    ##加入job
    def batch_join(self, q, name, function, callback, *args, **kwargs):       
        q.put({
            'name': name,
            'function': function,
            'callback': callback,
            'args': args,
            'kwargs': kwargs
        }) 
        
    ##执行任务
    def batch_run(self,q,num_consumer):
        _start_time = timeit.default_timer()
        _pool       = Pool(processes = num_consumer)
        print 'pool initialized with %d workers!' % num_consumer
        for i in xrange(num_consumer):
            _pool.apply_async(self.__task_consumer,args=(q,))
            print 'worker %d started' % i
        
        print 'wait for all workers'
        _pool.close()
        _pool.join()
        print 'all jobs are completed!'
        print 'Elapsed Time: %s seconds' % (timeit.default_timer() - _start_time)


##测试代码...      

调试执行时直接Exception

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

查询了一下,这个错误大意是说“不能序列化实例方法”。

详细了解了下,问题出在进程池的apply_async方法上,这个方法传入的参数需要被序列化,而第一个参数是这个类实例的方法,不能被序列化

_pool.apply_async(self.__task_consumer,args=(q,))

这个问题从stackoverflow上找到一种解决办法,Python中有种机制,只要定义类型的时候,实现了__call__函数,这个类型就成为可调用的。所以在BatchBase类中实现__call__,在__call__中再调用__task_consumer,而后在进程池申请使用方法中将self作为参数传入,进程在执行时会通过__call__调用__task_consumer,从而达到“曲线救国”。

改进后的完整代码如下:

# -*- coding:utf-8 -*- 
##========================
'''
Created on 2016年5月14日
@author: Jason
'''
from multiprocessing import Manager,Pool
import os, logging, timeit, time

class BatchBase(object):
       
    def __init__(self):
        '''
        Constructor
        '''
        
        
    def __del__(self):
        '''
        Destructor
        '''
        
    def __call__(self,q):
        return self.__task_consumer(q)
    
    ##消费者
    def __task_consumer(self,q):
        while not q.empty():
            try:
                task = q.get(block=True,timeout=2)
                
                function = task.get('function')
                callback = task.get('callback')
                args     = task.get('args')
                kwargs   = task.get('kwargs')
                name     = task.get('name')
                pid      = os.getpid()
                try:
                    print '%d start job %s [%s]!\n' % (pid,name,self.getNowTime())
                    if callback:
                        callback(function(*args, **kwargs))
                    else:
                        function(*args, **kwargs)
                    
                    print '%d complete job %s success [%s]!\n' % (pid,name,self.getNowTime())
                except Exception as ex:
                    if callback:
                        callback(ex)
                    print '%d complete job %s fail [%s]\n' % (pid,name,self.getNowTime())
                    print ex
                finally:
                    q.task_done()
            except Exception as ex:
                logging.error(ex)   
    
    def getNowTime(self):
        return time.strftime("%H:%M:%S",time.localtime(time.time()))
    
    ##初始化任务队列
    def batch_init(self):
        _manager = Manager()
        _queue   = _manager.Queue()
        return _queue
    
    ##加入job
    def batch_join(self, q, name, function, callback, *args, **kwargs):       
        q.put({
            'name': name,
            'function': function,
            'callback': callback,
            'args': args,
            'kwargs': kwargs
        }) 
        
    ##执行任务
    def batch_run(self,q,num_consumer):
        _start_time = timeit.default_timer()
        _pool       = Pool(processes = num_consumer)
        print 'pool initialized with %d workers!' % num_consumer
        for i in xrange(num_consumer):
            _pool.apply_async(self,args=(q,))
            print 'worker %d started' % i
        
        print 'wait for all workers'
        _pool.close()
        _pool.join()
        print 'all jobs are completed!'
        print 'Elapsed Time: %s seconds' % (timeit.default_timer() - _start_time)


##############for test##############################
def test_task(name):
    print 'Run task %s (%s)...' % (name, os.getpid())
    start_time = timeit.default_timer()
    time.sleep(3)
    end_time = timeit.default_timer()
    print 'Task %s runs %0.2f seconds.' % (name, (end_time - start_time))


if __name__=='__main__':

    #类实例
    batch = BatchBase()

    #初始化job队列
    jobs  = batch.batch_init()

    #循环加入job
    for i in range(4):
        batch.batch_join(jobs, str(i), test_task, None, str(i))

    #执行job(使用3个进程)
    batch.batch_run(jobs, 3)
    
    print '任务执行完成.'        

执行成功!

© 著作权归作者所有

共有 人打赏支持
JasonSE
粉丝 32
博文 54
码字总数 15572
作品 0
朝阳
程序员
5本必读Python入门书籍,你都看过吗?(附福利)

今天技术学派为大家准备了5本Python入门书籍,除了书籍小编还整理了3个常用的资源网站分享给大家。 1.Python基础教程 《Python基础教程》是经典的Python入门教程书籍,本书层次鲜明,结构严谨...

Python燕大侠
06/07
0
0
python学习笔记 | Python中的线程与进程简介

近日,我开始对代码的各个部分进行计时,以了解我是否可以加快速度。 令我惊讶的是,我发现数据增强是最大的瓶颈。我使用的方法:旋转,翻转,缩放。依靠Numpy并在CPU上运行。Numpy在某些情况...

跨界的聚能
05/25
0
0
王老板Python面试(10):17道python笔试面试真题

1、一行代码实现1--100之和 利用sum()函数求和 2、如何在一个函数内部修改全局变量 利用global 修改全局变量 3、列出5个python标准库 os:提供了不少与操作系统相关联的函数 sys: 通常用于命...

程序员八阿哥
05/22
0
0
Python 2.6 亮点:multiprocessing模块

本来以为Python 2.6只是Python 3.0的过渡版本,不会有太多的新功能。但看到这个2.6的重大改动列表,才发现自己挺落后的。在2.6中新增的multiprocessing模块也绝对是Python 2.6的杀手级应用(...

索隆
2012/05/02
0
0
用Python脚本实现对Linux服务器的监控

一、前言 二、概述 三、Python 版本说明 四、/proc 文件系统 五、对CPU监测 六、对系统负载监测 七、对内存信息的获取 八、对网络接口的监测 九、监控apache服务器进程的Python脚本 十、总结...

陈明乾
07/02
0
0
python--多进程的用法详解实例

想让python实现多进程(multiprocessing),我们要先区分不同的操作系统的不同之处。 Linux操作系统下提供了一个fork()系统调用,普通函数调用一次返回一次,fork()调用一次返回两次,因为操作...

山有木兮有木兮
05/14
0
0
全面解读python web 程序的9种部署方式

python有很多web 开发框架,代码写完了,部署上线是个大事,通常来说,web应用一般是三层结构 web server ---->application -----> DB server 主流的web server 一个巴掌就能数出来,apache,...

不必在乎朕是谁
2013/11/22
0
1
GIScript2015的第一个入门教程-使用UbuntuKylin15.04

GIScript2015是一个通用的GIS脚本库,可以帮助进行地理空间数据的处理和分析,提高数据处理的效率,帮助进行地理科学的研究。GIScript2015是一个开源工程,已建立Git版本库和虚拟Team、微信群...

openthings
2015/07/27
0
0
教程 | 如何使用Docker、TensorFlow目标检测API和OpenCV实现实时目标检测和视频处理

  选自TowardsDataScience   作者:Léo Beaucourt   机器之心编译   参与:李诗萌、路雪      本文展示了如何使用 Docker 容器中的 TensorFlow 目标检测 API,通过网络摄像头执...

机器之心
04/20
0
0
geopy使用详解

geopy使用详解   由于专业需要,经常接触一些地理处理的工具包,文档都是英文的,自己看的同时将其翻译一下,一方面自己学习的同时有个记录,要是能同时给一起的学习的童鞋们一些帮助,想想...

openthings
2016/01/12
383
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

idea tomcat 远程调试

tomcat 配置 编辑文件${tomcat_home}/bin/catalina.sh,在文件开头添加如下代码。    CATALINA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=7829" Idea端配......

qwfys
今天
1
0
遍历目录下的文件每250M打包一个文件

#!/usr/bin/env python # -*- utf-8 -*- # @Time : 2018/7/20 0020 下午 10:16 # @Author : 陈元 # @Email : abcmeabc@163.com # @file : tarFile.py import os import tarfile import thr......

寻爱的小草
今天
1
0
expect同步文件&expect指定host和要同步的文件&构建文件分发系统&批量远程执行命令

20.31 expect脚本同步文件 expect通过与rsync结合,可以在一台机器上把文件自动同步到多台机器上 编写脚本 [root@linux-5 ~]# cd /usr/local/sbin[root@linux-5 sbin]# vim 4.expect#!/...

影夜Linux
今天
1
0
SpringBoot | 第九章:Mybatis-plus的集成和使用

前言 本章节开始介绍数据访问方面的相关知识点。对于后端开发者而言,和数据库打交道是每天都在进行的,所以一个好用的ORM框架是很有必要的。目前,绝大部分公司都选择MyBatis框架作为底层数...

oKong
今天
13
0
win10 上安装解压版mysql

1.效果 2. 下载MySQL 压缩版 下载地址: https://downloads.mysql.com/archives/community/ 3. 配置 3.1 将下载的文件解压到合适的位置 我最终将myql文件 放在:D:\develop\mysql 最终放的位...

Lucky_Me
今天
2
0
linux服务器修改mtu值优化cpu

一、jumbo frames 相关 1、什么是jumbo frames Jumbo frames 是指比标准Ethernet Frames长的frame,即比1518/1522 bit大的frames,Jumbo frame的大小是每个设备厂商规定的,不属于IEEE标准;...

问题终结者
今天
2
0
expect脚本同步文件expect脚本指定host和要同步的文件 构建文件分发系统批量远程执行命令

expect脚本同步文件 在一台机器上把文件同步到多台机器上 自动同步文件 vim 4.expect [root@yong-01 sbin]# vim 4.expect#!/usr/bin/expectset passwd "20655739"spawn rsync -av ro...

lyy549745
今天
1
0
36.rsync下 日志 screen

10.32/10.33 rsync通过服务同步 10.34 linux系统日志 10.35 screen工具 10.32/10.33 rsync通过服务同步: rsync还可以通过服务的方式同步。那需要开启一个服务,他的架构是cs架构,客户端服务...

王鑫linux
今天
1
0
matplotlib 保存图片时的参数

简单绘图 import matplotlib.pyplot as pltplt.plot(range(10)) 保存为csv格式,放大后依然很清晰 plt.savefig('t1.svg') 普通保存放大后会有点模糊文件大小20多k plt.savefig('t5.p...

阿豪boy
今天
3
0
java 8 复合Lambda 表达式

comparator 比较器复合 //排序Comparator.comparing(Apple::getWeight);List<Apple> list = Stream.of(new Apple(1, "a"), new Apple(2, "b"), new Apple(3, "c")) .collect(......

Canaan_
昨天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部