文档章节

分分钟玩转多进程编程

乐搏学院
 乐搏学院
发布于 2017/02/27 11:13
字数 2647
阅读 2
收藏 0

简单介绍:

此模块主要为了解决Python非真正多线程导致无法充分利用多核CPU资源问题,提供了Process,Lock,Semaphore,Event,Queue,Pipe,Pool等组件实现子进程,通信,共享数据,同步方式等

 

快速安装:

pip install multiprocessing

 

公共属性:

multiprocessing.current_process() -> Process

说明: 返回当前运行的子进程对象

multiprocessing.cpu_count() -> int

说明: 返回宿主机CPU核心数

multiprocessing.active_children() -> list

说明: 返回存活的子进程列表

 

多线程类:

1. Process类,主要用于创建管理子进程

p = multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}) -> Process

说明: 创建子进程对象,target表示调用对象,name表示子进程名称,args表示调用对象的位置参数元组,kwargs表示调用对象的参数字典

p.daemon -> boolean

说明: 设置或返回子进程是否随主进程结束,默认为false,主进程必须等待所有子进程结束后才结束,一旦设置为true,则一旦主进程执行完毕后,即使子进程还没执行完毕也强制结束,必须在start之前设置,可设置p.join来强制主进程等待子进程执行完毕

p.join(timeout=None)

说明: 等待此子进程返回后再执行其它子进程/主进程,timeout为等待时间

p.pid -> int/None

说明: 返回子进程pid

p.exitcode -> int/None

说明: 运行时为None,-N表示信号N结束

p.is_alive() -> boolean

说明: 返回进程是否存活

p.start() -> None

说明: 启动子进程,会自动调用子类中的run方法

p.terminate() -> None

说明: 终止子进程

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

#!/usr/bin/env python

# -*- coding: utf-8 -*-

"""

#

# Authors: limanman

# OsChina: http://xmdevops.blog.51cto.com/

# Purpose:

#

"""

# 说明: 导入公共模块

import time

import multiprocessing

# 说明: 导入其它模块

# 方式一: 任务处理类

class TaskHandler(multiprocessing.Process):

    def __init__(self, interval, *args, **kwargs):

        super(TaskHandler, self).__init__(*args, **kwargs)

        self.interval = interval

    # 调用p.start()时自动调用子类run方法

    def run(self):

        for in xrange(10):

            time.sleep(self.interval)

# 方式二: 任务处理函数

def taskhandler(interval):

    for in xrange(10):

        time.sleep(interval)

if __name__ == '__main__':

    processes = []

    for in xrange(5):

        processes.append(TaskHandler(1))

    for in processes:

        p.start()

    print 'cpu number is:', multiprocessing.cpu_count()

    for in multiprocessing.active_children():

        print 'process pid:', p.pid

        print 'process name:', p.name

    time.sleep(10)

    print '-----------------------------------------'

    processes = []

    for in xrange(5):

        processes.append(multiprocessing.Process(target=taskhandler, args=(1,)))

    for in processes:

        p.daemon = True

        p.start()

        p.join()

    # 思考: 此处为何没有打印任何子进程信息?

    for in multiprocessing.active_children():

        print 'process pid:', p.pid

        print 'process name:', p.name

2. Lock类,主要用于多个进程互斥访问共享资源,避免冲突

l = multiprocessing.Lock() -> Lock

说明: 创建互斥锁对象,推荐使用with写法来代替acquire()和release()来手动创建释放锁.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

#!/usr/bin/env python

# -*- coding: utf-8 -*-

"""

#

# Authors: limanman

# OsChina: http://xmdevops.blog.51cto.com/

# Purpose:

#

"""

# 说明: 导入公共模块

import os

import time

import multiprocessing

# 说明: 导入其它模块

class TaskHandler(multiprocessing.Process):

    def __init__(self, lock, fpath, *args, **kwargs):

        super(TaskHandler, self).__init__(*args, **kwargs)

        self.lock = lock

        self.fpath = fpath

    def run(self):

        with self.lock:

            with open(self.fpath, 'a+b') as f:

                data = ''.join([str(time.time()), os.linesep])

                f.write(data)

if __name__ == '__main__':

    proceses = []

    lock = multiprocessing.Lock()

    fpath = 'multiprocessing.log'

    for in xrange(10):

        proceses.append(TaskHandler(lock, fpath))

    for in proceses:

        p.start()

3. Semaphore类,主要用于控制同时对共享资源访问子进程数,如池的最大连接数限定

s = multiprocessing.Semaphore(value=1) -> Semaphore

说明: 创建信号量对象,value表示同时对共享资源访问的子进程数

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

#!/usr/bin/env python

# -*- coding: utf-8 -*-

"""

#

# Authors: limanman

# OsChina: http://xmdevops.blog.51cto.com/

# Purpose:

#

"""

# 说明: 导入公共模块

import os

import time

import multiprocessing

# 说明: 导入其它模块

class TaskHandler(multiprocessing.Process):

    def __init__(self, s, *args, **kwargs):

        super(TaskHandler, self).__init__(*args, **kwargs)

        self.semaphore = s

    def run(self):

        # 限制同时只能有5个子进程访问共享资源

        with self.semaphore:

            time.sleep(5)

if __name__ == '__main__':

    = multiprocessing.Semaphore(5)

    for in xrange(20):

        = TaskHandler(s)

        p.daemon = True

        p.start()

    while True:

        processes = multiprocessing.active_children()

        if not len(processes):

            break

        print 'running process => num: %s list: %s' % (len(processes), processes)

        time.sleep(1)

4. Event类,主要用于控制进程间同步通信

e = multiprocessing.Event() -> Event

说明: 创建信号对象,主要用于子进程之间同步通信

e.set() -> None

说明: 设置标志位

e.clear() -> None

说明: 清除标志位

e.is_set() -> boolean

说明: 判断是否设置了标志位

e.wait(self, timeout=None) -> None

说明: 阻塞当前子进程直到标志位被设置

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

#!/usr/bin/env python

# -*- coding: utf-8 -*-

"""

#

# Authors: limanman

# OsChina: http://xmdevops.blog.51cto.com/

# Purpose:

#

"""

# 说明: 导入公共模块

import os

import time

import multiprocessing

# 说明: 导入其它模块

def task001(e):

    for in xrange(0100):

        print _

    e.set()

def task002(e):

    e.wait()

    print 'found notice: event is set...'

    for in xrange(100200):

        print _

if __name__ == '__main__':

    = multiprocessing.Event()

    p001 = multiprocessing.Process(target=task001, args=(e,))

    p002 = multiprocessing.Process(target=task002, args=(e,))

    p001.start()

    p002.start()

说明: 通过Evant类可以实现很方便的实现子进程与子进程,子进程与主进程之间的通信,甚至可以将所有子进程daemon设置为True,最后e.wait()阻塞,子进程中去设置此标识位来控制主进程的执行流程.

5. Pipe类,主要用于两个子进程之间的数据传递

p = multiprocessing.Pipe(duplex=True) -> tuple

说明: 创建通道对象,主要用于两个子进程之间的数据传递,返回管道的两个端对象1/2,如果duplex为true则全双工可以互相收发,否则1端只能接受消息,2端只能发送消息

p[0/1].send(picklable) -> None

说明: 发送数据支持任意可序列化对象

p[0/1].recv() -> picklable

说明: 如果没有消息可接收,recv会一直阻塞直至管道被关闭抛出EOFError异常

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

#!/usr/bin/env python

# -*- coding: utf-8 -*-

"""

#

# Authors: limanman

# OsChina: http://xmdevops.blog.51cto.com/

# Purpose:

#

"""

# 说明: 导入公共模块

import time

import multiprocessing

from Queue import Empty, Full

# 说明: 导入其它模块

def producer(pipe):

    while True:

        data = {

            'thread': multiprocessing.current_process().name,

            'value': time.time()

        }

        try:

            pipe.send(data)

        except EOFError, e:

            break

        time.sleep(1)

def consumer(pipe):

    while True:

        try:

            print 'producer: %(thread)s current value: %(value)s' % pipe.recv()

        except EOFError, e:

            break

        time.sleep(1)

if __name__ == '__main__':

    # 半双工模式下pipe[0]负责接收消息,pipe[1]负责发送消息

    pipe = multiprocessing.Pipe(duplex=False)

    = multiprocessing.Process(target=producer, args=(pipe[1],))

    = multiprocessing.Process(target=consumer, args=(pipe[0],))

    p.start()

    c.start()

6. Queue类,主要用于多个子进程之间的数据传递

q = multiprocessing.Queue(maxsize=0) -> Queue

说明: 创建队列对象,主要用于多个进程之间的数据传递

q.full() -> boolean

说明: 判断队列是否已满

q.close() -> None

说明: 关闭队列

q.empty() -> boolean

说明: 判断队列是否已空

q.put(obj, block=True, timeout=None) -> None

说明: 插入队列,block为False会立即抛出Queue.Full异常,否则会阻塞timeout时间,直到队列有剩余的空间,如果超时会抛出Queue.Full异常,还有一个同类方法q.put_nowait(obj)非阻塞插入立即抛Queue.Full异常

q.get(block=True, timeout=None) -> None

说明: 取出队列,block为false会立即抛出Queue.Empty异常,否则会阻塞timeout时间,直到队列有新对象插入,如果超时会抛出Queue.Empty异常,还有一个同类方法q.get_nowait()非阻塞读取立抛Queue.Empty异常

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

#!/usr/bin/env python

# -*- coding: utf-8 -*-

"""

#

# Authors: limanman

# OsChina: http://xmdevops.blog.51cto.com/

# Purpose:

#

"""

# 说明: 导入公共模块

import time

import multiprocessing

from Queue import Empty, Full

# 说明: 导入其它模块

def producer(q):

    while True:

        data = {

            'thread': multiprocessing.current_process().name,

            'value': time.time()

        }

        try:

            q.put(data, block=False)

        except Full, e:

            continue

        time.sleep(1)

def consumer(q):

    while True:

        try:

            print 'producer: %(thread)s current value: %(value)s' % q.get(block=False)

        except Empty, e:

            continue

        time.sleep(1)

if __name__ == '__main__':

    = multiprocessing.Queue()

    = multiprocessing.Process(target=producer, args=(q,))

    = multiprocessing.Process(target=consumer, args=(q,))

    p.start()

    c.start()

7. Pool类,主要用于以进程池的形式自动管控进程池内子进程数目

p = multiprocessing.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None) -> Pool

说明: 创建包含规定数目子进程池对象,并向这些工作进程传递作业,直到没有更多作业为止,processes表示初始化状态下的子进程数,maxtasksperchild表示为每个进程执行N个作业数后重新启动一个工作子进程防止运行时间过长导致消耗太多系统资源.

p.close() -> None

说明: 禁止新的子进程加入,所以必须放在p.join()前面

p.join() -> None

说明: 主进程阻塞等待子进程退出,必须出现在p.close()和p.terminate() 的后面

p.terminate() -> None

说明: 结束工作进程,不再处理未处理的任务.

p.apply(self, func, args=(), kwds={}) -> obj

说明: 同内置函数apply,默认等待进程池中子进程返回结果

p.apply_async(self, func, args=(), kwds={}, callback=None) -> ApplyResult

说明: 同内置函数apply,默认不等待子进程返回结果直接返回,结果使用返回对象get()方法回调获取

p.map(self, func, iterable, chunksize=None) -> list

p.map_async(self, func, iterable, chunksize=None, callback=None) -> MapResult

说明: 同上,但是支持接受iterable序列化对象,简化进程池调用,而且速度更快,推荐使用,结果使用返回对象get()方法回调获取

 

p.imap(self, func, iterable, chunksize=None) -> IMapIterator

p.imap_unordered(self, func, iterable, chunksize=1) -> IMapUnorderedIterator

说明: 同上,但是imap返回的是序列对象,而imap_unordered返回的是未排序的结果,也就是按照原始执行顺序返回

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

#!/usr/bin/env python

# -*- coding: utf-8 -*-

"""

#

# Authors: limanman

# OsChina: http://xmdevops.blog.51cto.com/

# Purpose:

#

"""

# 说明: 导入公共模块

import os

import pprint

import multiprocessing

# 说明: 导入其它模块

def read_filelist(p):

    result = []

    if not os.path.isdir(p):

        result.append(p)

        return result

    for root, dirs, files in os.walk(p):

        for item in files:

            fpath = os.path.join(root, item)

            result.append(fpath)

    return result

def read_filesize(f):

    return os.path.getsize(f), f

if __name__ == '__main__':

    file_list = read_filelist('C:\Users\Administrator\Desktop')

    pool = multiprocessing.Pool(20)

    file_size = pool.map_async(read_filesize, file_list)

    pool.close()

    pool.join()

    pprint.pprint(file_size.get())

说明: 如上例子先获取文件列表,然后通过异步回调获取所有文件大小,相对于使用apply或是apply_async需要每次append到一个列表中,此方法更加简化了多进程池的使用.推荐使用

 

登录乐搏学院官网http://www.learnbo.com/

或关注我们的官方微博微信,还有更多惊喜哦~

 

本文出自 “满满李 - 运维开发之路” 博客,请务必保留此出处http://xmdevops.blog.51cto.com/11144840/1861632

© 著作权归作者所有

共有 人打赏支持
乐搏学院
粉丝 6
博文 526
码字总数 707467
作品 0
丰台
程序员
怎样才能成为一个电玩程序员(转)

电玩游戏广受青少年们欢迎,其中不少年轻人专注于此,甚至想到要自己编写一个游戏。另外一方面电玩游戏工程师被大多数人认为是大有“钱”途的。如果你做的游戏走红了,成为百万富翁也是分分钟...

咲晚杍
2014/07/28
0
0
haproxy实现的web反向代理,动静分离,以及基于keepalived实现的haproxy的高可用

haproxy于Nginx一样都是做反向代理,但是与其相比,haproxy更专注于web代理。HAProxy是单进程多请求,也支持多进程,HAProxy运行在当前的硬件上,完全可以支持数以万计的并发连接。 haproxy功...

chinahaike
2014/05/02
0
0
多进程与多线程的区别

进程是程序在计算机上的一次执行活动。当你运行一个程序,你就启动了一个进程。显然,程序是死的(静态的),进程是活的(动态的)。进程可以分为系统进程和用户进程。凡是用于完成操作系统的各种...

技术mix呢
2017/11/09
0
0
5天玩转C#并行和多线程编程 —— 第五天 多线程编程大总结

5天玩转C#并行和多线程编程系列文章目录 5天玩转C#并行和多线程编程 —— 第一天 认识Parallel 5天玩转C#并行和多线程编程 —— 第二天 并行集合和PLinq 5天玩转C#并行和多线程编程 —— 第三...

雲霏霏
2014/11/26
0
0
5天玩转C#并行和多线程编程 —— 第二天 并行集合和PLinq

5天玩转C#并行和多线程编程系列文章目录 5天玩转C#并行和多线程编程 —— 第一天 认识Parallel 5天玩转C#并行和多线程编程 —— 第二天 并行集合和PLinq 5天玩转C#并行和多线程编程 —— 第三...

雲霏霏
2014/09/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

jetbrains系产品IDEA:mac上面提示快捷键设置

原因 由于Mac上面的Ctrl+空格变成输入法切换的快捷键,在使用IDEA的过程中,代码提示很不方便,需要使用option+/这种传统eclipse上面的代码提示快捷键作为主要快捷键。 怎么修改? 移除【opt...

亚林瓜子
35分钟前
0
0
Exclipse 输出结果时换行

System.out.println(f1 + "\n" + d1 + "\n" + d2);

笑丶笑
35分钟前
1
0
怎样治疗标签不能触发onblur事件

I realize this was over a year ago, but it showed up for me in Google while trying to solve this same issue. It seems Chrome does not consider some elements, like body and ancho......

Weijuer
38分钟前
0
0
vue常见库安装

移动设备上的浏览器默认会在用户点击屏幕大约延迟300毫秒后才会触发点击事件,这是为了检查用户是否在做双击。为了能够立即响应用户的点击事件,才有了FastClick。 安装fastclick npm insta...

林夏夕
40分钟前
0
0
kafka 教程(三) kafka Java API 编程

下午写

MrPei
41分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部