文档章节

分分钟玩转多进程编程

乐搏学院
 乐搏学院
发布于 2017/02/27 11:13
字数 2647
阅读 2
收藏 0
点赞 0
评论 0

简单介绍:

此模块主要为了解决Python非真正多线程导致无法充分利用多核CPU资源问题,提供了Process,Lock,Semaphore,Event,Queue,Pipe,Pool等组件实现子进程,通信,共享数据,同步方式等

 

快速安装:

pip install multiprocessing

 

公共属性:

multiprocessing.current_process() -> Process

说明: 返回当前运行的子进程对象

multiprocessing.cpu_count() -> int

说明: 返回宿主机CPU核心数

multiprocessing.active_children() -> list

说明: 返回存活的子进程列表

 

多线程类:

1. Process类,主要用于创建管理子进程

p = multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}) -> Process

说明: 创建子进程对象,target表示调用对象,name表示子进程名称,args表示调用对象的位置参数元组,kwargs表示调用对象的参数字典

p.daemon -> boolean

说明: 设置或返回子进程是否随主进程结束,默认为false,主进程必须等待所有子进程结束后才结束,一旦设置为true,则一旦主进程执行完毕后,即使子进程还没执行完毕也强制结束,必须在start之前设置,可设置p.join来强制主进程等待子进程执行完毕

p.join(timeout=None)

说明: 等待此子进程返回后再执行其它子进程/主进程,timeout为等待时间

p.pid -> int/None

说明: 返回子进程pid

p.exitcode -> int/None

说明: 运行时为None,-N表示信号N结束

p.is_alive() -> boolean

说明: 返回进程是否存活

p.start() -> None

说明: 启动子进程,会自动调用子类中的run方法

p.terminate() -> None

说明: 终止子进程

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

#!/usr/bin/env python

# -*- coding: utf-8 -*-

"""

#

# Authors: limanman

# OsChina: http://xmdevops.blog.51cto.com/

# Purpose:

#

"""

# 说明: 导入公共模块

import time

import multiprocessing

# 说明: 导入其它模块

# 方式一: 任务处理类

class TaskHandler(multiprocessing.Process):

    def __init__(self, interval, *args, **kwargs):

        super(TaskHandler, self).__init__(*args, **kwargs)

        self.interval = interval

    # 调用p.start()时自动调用子类run方法

    def run(self):

        for in xrange(10):

            time.sleep(self.interval)

# 方式二: 任务处理函数

def taskhandler(interval):

    for in xrange(10):

        time.sleep(interval)

if __name__ == '__main__':

    processes = []

    for in xrange(5):

        processes.append(TaskHandler(1))

    for in processes:

        p.start()

    print 'cpu number is:', multiprocessing.cpu_count()

    for in multiprocessing.active_children():

        print 'process pid:', p.pid

        print 'process name:', p.name

    time.sleep(10)

    print '-----------------------------------------'

    processes = []

    for in xrange(5):

        processes.append(multiprocessing.Process(target=taskhandler, args=(1,)))

    for in processes:

        p.daemon = True

        p.start()

        p.join()

    # 思考: 此处为何没有打印任何子进程信息?

    for in multiprocessing.active_children():

        print 'process pid:', p.pid

        print 'process name:', p.name

2. Lock类,主要用于多个进程互斥访问共享资源,避免冲突

l = multiprocessing.Lock() -> Lock

说明: 创建互斥锁对象,推荐使用with写法来代替acquire()和release()来手动创建释放锁.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

#!/usr/bin/env python

# -*- coding: utf-8 -*-

"""

#

# Authors: limanman

# OsChina: http://xmdevops.blog.51cto.com/

# Purpose:

#

"""

# 说明: 导入公共模块

import os

import time

import multiprocessing

# 说明: 导入其它模块

class TaskHandler(multiprocessing.Process):

    def __init__(self, lock, fpath, *args, **kwargs):

        super(TaskHandler, self).__init__(*args, **kwargs)

        self.lock = lock

        self.fpath = fpath

    def run(self):

        with self.lock:

            with open(self.fpath, 'a+b') as f:

                data = ''.join([str(time.time()), os.linesep])

                f.write(data)

if __name__ == '__main__':

    proceses = []

    lock = multiprocessing.Lock()

    fpath = 'multiprocessing.log'

    for in xrange(10):

        proceses.append(TaskHandler(lock, fpath))

    for in proceses:

        p.start()

3. Semaphore类,主要用于控制同时对共享资源访问子进程数,如池的最大连接数限定

s = multiprocessing.Semaphore(value=1) -> Semaphore

说明: 创建信号量对象,value表示同时对共享资源访问的子进程数

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

#!/usr/bin/env python

# -*- coding: utf-8 -*-

"""

#

# Authors: limanman

# OsChina: http://xmdevops.blog.51cto.com/

# Purpose:

#

"""

# 说明: 导入公共模块

import os

import time

import multiprocessing

# 说明: 导入其它模块

class TaskHandler(multiprocessing.Process):

    def __init__(self, s, *args, **kwargs):

        super(TaskHandler, self).__init__(*args, **kwargs)

        self.semaphore = s

    def run(self):

        # 限制同时只能有5个子进程访问共享资源

        with self.semaphore:

            time.sleep(5)

if __name__ == '__main__':

    = multiprocessing.Semaphore(5)

    for in xrange(20):

        = TaskHandler(s)

        p.daemon = True

        p.start()

    while True:

        processes = multiprocessing.active_children()

        if not len(processes):

            break

        print 'running process => num: %s list: %s' % (len(processes), processes)

        time.sleep(1)

4. Event类,主要用于控制进程间同步通信

e = multiprocessing.Event() -> Event

说明: 创建信号对象,主要用于子进程之间同步通信

e.set() -> None

说明: 设置标志位

e.clear() -> None

说明: 清除标志位

e.is_set() -> boolean

说明: 判断是否设置了标志位

e.wait(self, timeout=None) -> None

说明: 阻塞当前子进程直到标志位被设置

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

#!/usr/bin/env python

# -*- coding: utf-8 -*-

"""

#

# Authors: limanman

# OsChina: http://xmdevops.blog.51cto.com/

# Purpose:

#

"""

# 说明: 导入公共模块

import os

import time

import multiprocessing

# 说明: 导入其它模块

def task001(e):

    for in xrange(0100):

        print _

    e.set()

def task002(e):

    e.wait()

    print 'found notice: event is set...'

    for in xrange(100200):

        print _

if __name__ == '__main__':

    = multiprocessing.Event()

    p001 = multiprocessing.Process(target=task001, args=(e,))

    p002 = multiprocessing.Process(target=task002, args=(e,))

    p001.start()

    p002.start()

说明: 通过Evant类可以实现很方便的实现子进程与子进程,子进程与主进程之间的通信,甚至可以将所有子进程daemon设置为True,最后e.wait()阻塞,子进程中去设置此标识位来控制主进程的执行流程.

5. Pipe类,主要用于两个子进程之间的数据传递

p = multiprocessing.Pipe(duplex=True) -> tuple

说明: 创建通道对象,主要用于两个子进程之间的数据传递,返回管道的两个端对象1/2,如果duplex为true则全双工可以互相收发,否则1端只能接受消息,2端只能发送消息

p[0/1].send(picklable) -> None

说明: 发送数据支持任意可序列化对象

p[0/1].recv() -> picklable

说明: 如果没有消息可接收,recv会一直阻塞直至管道被关闭抛出EOFError异常

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

#!/usr/bin/env python

# -*- coding: utf-8 -*-

"""

#

# Authors: limanman

# OsChina: http://xmdevops.blog.51cto.com/

# Purpose:

#

"""

# 说明: 导入公共模块

import time

import multiprocessing

from Queue import Empty, Full

# 说明: 导入其它模块

def producer(pipe):

    while True:

        data = {

            'thread': multiprocessing.current_process().name,

            'value': time.time()

        }

        try:

            pipe.send(data)

        except EOFError, e:

            break

        time.sleep(1)

def consumer(pipe):

    while True:

        try:

            print 'producer: %(thread)s current value: %(value)s' % pipe.recv()

        except EOFError, e:

            break

        time.sleep(1)

if __name__ == '__main__':

    # 半双工模式下pipe[0]负责接收消息,pipe[1]负责发送消息

    pipe = multiprocessing.Pipe(duplex=False)

    = multiprocessing.Process(target=producer, args=(pipe[1],))

    = multiprocessing.Process(target=consumer, args=(pipe[0],))

    p.start()

    c.start()

6. Queue类,主要用于多个子进程之间的数据传递

q = multiprocessing.Queue(maxsize=0) -> Queue

说明: 创建队列对象,主要用于多个进程之间的数据传递

q.full() -> boolean

说明: 判断队列是否已满

q.close() -> None

说明: 关闭队列

q.empty() -> boolean

说明: 判断队列是否已空

q.put(obj, block=True, timeout=None) -> None

说明: 插入队列,block为False会立即抛出Queue.Full异常,否则会阻塞timeout时间,直到队列有剩余的空间,如果超时会抛出Queue.Full异常,还有一个同类方法q.put_nowait(obj)非阻塞插入立即抛Queue.Full异常

q.get(block=True, timeout=None) -> None

说明: 取出队列,block为false会立即抛出Queue.Empty异常,否则会阻塞timeout时间,直到队列有新对象插入,如果超时会抛出Queue.Empty异常,还有一个同类方法q.get_nowait()非阻塞读取立抛Queue.Empty异常

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

#!/usr/bin/env python

# -*- coding: utf-8 -*-

"""

#

# Authors: limanman

# OsChina: http://xmdevops.blog.51cto.com/

# Purpose:

#

"""

# 说明: 导入公共模块

import time

import multiprocessing

from Queue import Empty, Full

# 说明: 导入其它模块

def producer(q):

    while True:

        data = {

            'thread': multiprocessing.current_process().name,

            'value': time.time()

        }

        try:

            q.put(data, block=False)

        except Full, e:

            continue

        time.sleep(1)

def consumer(q):

    while True:

        try:

            print 'producer: %(thread)s current value: %(value)s' % q.get(block=False)

        except Empty, e:

            continue

        time.sleep(1)

if __name__ == '__main__':

    = multiprocessing.Queue()

    = multiprocessing.Process(target=producer, args=(q,))

    = multiprocessing.Process(target=consumer, args=(q,))

    p.start()

    c.start()

7. Pool类,主要用于以进程池的形式自动管控进程池内子进程数目

p = multiprocessing.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None) -> Pool

说明: 创建包含规定数目子进程池对象,并向这些工作进程传递作业,直到没有更多作业为止,processes表示初始化状态下的子进程数,maxtasksperchild表示为每个进程执行N个作业数后重新启动一个工作子进程防止运行时间过长导致消耗太多系统资源.

p.close() -> None

说明: 禁止新的子进程加入,所以必须放在p.join()前面

p.join() -> None

说明: 主进程阻塞等待子进程退出,必须出现在p.close()和p.terminate() 的后面

p.terminate() -> None

说明: 结束工作进程,不再处理未处理的任务.

p.apply(self, func, args=(), kwds={}) -> obj

说明: 同内置函数apply,默认等待进程池中子进程返回结果

p.apply_async(self, func, args=(), kwds={}, callback=None) -> ApplyResult

说明: 同内置函数apply,默认不等待子进程返回结果直接返回,结果使用返回对象get()方法回调获取

p.map(self, func, iterable, chunksize=None) -> list

p.map_async(self, func, iterable, chunksize=None, callback=None) -> MapResult

说明: 同上,但是支持接受iterable序列化对象,简化进程池调用,而且速度更快,推荐使用,结果使用返回对象get()方法回调获取

 

p.imap(self, func, iterable, chunksize=None) -> IMapIterator

p.imap_unordered(self, func, iterable, chunksize=1) -> IMapUnorderedIterator

说明: 同上,但是imap返回的是序列对象,而imap_unordered返回的是未排序的结果,也就是按照原始执行顺序返回

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

#!/usr/bin/env python

# -*- coding: utf-8 -*-

"""

#

# Authors: limanman

# OsChina: http://xmdevops.blog.51cto.com/

# Purpose:

#

"""

# 说明: 导入公共模块

import os

import pprint

import multiprocessing

# 说明: 导入其它模块

def read_filelist(p):

    result = []

    if not os.path.isdir(p):

        result.append(p)

        return result

    for root, dirs, files in os.walk(p):

        for item in files:

            fpath = os.path.join(root, item)

            result.append(fpath)

    return result

def read_filesize(f):

    return os.path.getsize(f), f

if __name__ == '__main__':

    file_list = read_filelist('C:\Users\Administrator\Desktop')

    pool = multiprocessing.Pool(20)

    file_size = pool.map_async(read_filesize, file_list)

    pool.close()

    pool.join()

    pprint.pprint(file_size.get())

说明: 如上例子先获取文件列表,然后通过异步回调获取所有文件大小,相对于使用apply或是apply_async需要每次append到一个列表中,此方法更加简化了多进程池的使用.推荐使用

 

登录乐搏学院官网http://www.learnbo.com/

或关注我们的官方微博微信,还有更多惊喜哦~

 

本文出自 “满满李 - 运维开发之路” 博客,请务必保留此出处http://xmdevops.blog.51cto.com/11144840/1861632

© 著作权归作者所有

共有 人打赏支持
乐搏学院
粉丝 6
博文 526
码字总数 707467
作品 0
丰台
程序员
怎样才能成为一个电玩程序员(转)

电玩游戏广受青少年们欢迎,其中不少年轻人专注于此,甚至想到要自己编写一个游戏。另外一方面电玩游戏工程师被大多数人认为是大有“钱”途的。如果你做的游戏走红了,成为百万富翁也是分分钟...

咲晚杍 ⋅ 2014/07/28 ⋅ 0

多进程与多线程的区别

进程是程序在计算机上的一次执行活动。当你运行一个程序,你就启动了一个进程。显然,程序是死的(静态的),进程是活的(动态的)。进程可以分为系统进程和用户进程。凡是用于完成操作系统的各种...

技术mix呢 ⋅ 2017/11/09 ⋅ 0

haproxy实现的web反向代理,动静分离,以及基于keepalived实现的haproxy的高可用

haproxy于Nginx一样都是做反向代理,但是与其相比,haproxy更专注于web代理。HAProxy是单进程多请求,也支持多进程,HAProxy运行在当前的硬件上,完全可以支持数以万计的并发连接。 haproxy功...

chinahaike ⋅ 2014/05/02 ⋅ 0

5天玩转C#并行和多线程编程 —— 第二天 并行集合和PLinq

5天玩转C#并行和多线程编程系列文章目录 5天玩转C#并行和多线程编程 —— 第一天 认识Parallel 5天玩转C#并行和多线程编程 —— 第二天 并行集合和PLinq 5天玩转C#并行和多线程编程 —— 第三...

雲霏霏 ⋅ 2014/09/28 ⋅ 0

5天玩转C#并行和多线程编程 —— 第五天 多线程编程大总结

5天玩转C#并行和多线程编程系列文章目录 5天玩转C#并行和多线程编程 —— 第一天 认识Parallel 5天玩转C#并行和多线程编程 —— 第二天 并行集合和PLinq 5天玩转C#并行和多线程编程 —— 第三...

雲霏霏 ⋅ 2014/11/26 ⋅ 0

并发 并行 同步 异步 多线程的区别

并发:在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行。其中两种并发关系分别是同步和互斥 互斥:进程间相互排斥的使用临...

多态生命 ⋅ 2016/05/12 ⋅ 0

都说Djnago框架重,那就让哥用15行代码写个django web程序!

很多初学django的小伙伴都会了解到,django是个大而全的网络框架,本身集成了ORM、模型绑定、模板引擎、缓存、Session等诸多功能。要学这么多内容,要学到猴年马月啊!? 不过世界真是奇妙,...

alex3714 ⋅ 2016/06/14 ⋅ 0

天才程序员12岁就创业 新公司融资1000万美元

   在25岁的年龄,Mitchell Hashimoto就已是一家颇有前途的年轻公司HashiCorp的联合创始人兼CEO,而且已拥有了13年的从业经验。   在12岁的时候,他就开始了首次创业。“我希望教别人如...

fnnn99 ⋅ 2015/01/02 ⋅ 4

5天玩转C#并行和多线程编程 —— 第三天 认识和使用Task

5天玩转C#并行和多线程编程系列文章目录 5天玩转C#并行和多线程编程 —— 第一天 认识Parallel 5天玩转C#并行和多线程编程 —— 第二天 并行集合和PLinq 5天玩转C#并行和多线程编程 —— 第三...

雲霏霏 ⋅ 2014/11/19 ⋅ 0

2016年 10月 06日 星期四 08:43:51 CST

1)版本控制:Git、GitHub 注:推荐这个交互式的 Git 入门资源,号称 15 分钟就够了。 2)正则表达式 注:推荐《55分钟学会正则表达式》 3)awk 译注:《「sed & awk」读书笔记之 awk 》 4)...

四明狂客 ⋅ 2016/10/06 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

mysql in action / alter table

change character set ALTER SCHEMA `employees` DEFAULT CHARACTER SET utf8mb4 DEFAULT COLLATE utf8mb4_general_ci ;ALTER TABLE `employees`.`t2` CHARACTER SET = utf8mb4 , COLLAT......

qwfys ⋅ 今天 ⋅ 0

Java 开发者不容错过的 12 种高效工具

Java 开发者常常都会想办法如何更快地编写 Java 代码,让编程变得更加轻松。目前,市面上涌现出越来越多的高效编程工具。所以,以下总结了一系列工具列表,其中包含了大多数开发人员已经使用...

jason_kiss ⋅ 昨天 ⋅ 0

Linux下php访问远程ms sqlserver

1、安装freetds(略,安装在/opt/local/freetds 下) 2、cd /path/to/php-5.6.36/ 进入PHP源码目录 3、cd ext/mssql进入MSSQL模块源码目录 4、/opt/php/bin/phpize生成编译配置文件 5、 . ./...

wangxuwei ⋅ 昨天 ⋅ 0

如何成为技术专家

文章来源于 -- 时间的朋友 拥有良好的心态。首先要有空杯心态,用欣赏的眼光发现并学习别人的长处,包括但不限于工具的使用,工作方法,解决问题以及规划未来的能力等。向别人学习的同时要注...

长安一梦 ⋅ 昨天 ⋅ 0

Linux vmstat命令实战详解

vmstat命令是最常见的Linux/Unix监控工具,可以展现给定时间间隔的服务器的状态值,包括服务器的CPU使用率,内存使用,虚拟内存交换情况,IO读写情况。这个命令是我查看Linux/Unix最喜爱的命令...

刘祖鹏 ⋅ 昨天 ⋅ 0

MySQL

查看表相关命令 - 查看表结构    desc 表名- 查看生成表的SQL    show create table 表名- 查看索引    show index from  表名 使用索引和不使用索引 由于索引是专门用于加...

stars永恒 ⋅ 昨天 ⋅ 0

easyui学习笔记

EasyUI常用控件禁用方法 combobox $("#id").combobox({ disabled: true }); ----- $("#id").combobox({ disabled: false}); validatebox $("#id").attr("readonly", true); ----- $("#id").r......

miaojiangmin ⋅ 昨天 ⋅ 0

金山WPS发布了Linux WPS Office

导读 近日,金山WPS发布了Linux WPS Office中文社区版新版本,支持大部分主流Linux系统,功能更加完善,兼容性、稳定性大幅度提升。本次更新WPS将首次在Linux提供专业办公文件云存储服务,实...

问题终结者 ⋅ 昨天 ⋅ 0

springboot2输出metrics到influxdb

序 本文主要研究一下如何将springboot2的metrics输出到influxdb maven <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-bo......

go4it ⋅ 昨天 ⋅ 0

微信小程序 - 选择图片显示操作菜单

之前我分享过选择图片这个文章,但是我在实际开发测试使用中发现一个问题在使用 wx.chooseImage 选择照片显示出第一格是拍照,后面是相册里的图片。这种实现之前说过了,效果如下。 但是你从...

hello_hp ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部