文档章节

Python 中 Ctrl+C 不能终止 Multiprocessing Pool 的解决方案

mickelfeng
 mickelfeng
发布于 2017/04/07 16:41
字数 817
阅读 119
收藏 1

本文理论上对 multiprocessing.dummy 的Pool同样有效。

python2.x中multiprocessing提供的基于函数进程池,按下 ctrl+c 不能停止所有的进程并退出。即必须 ctrl+z 后找到残留的子进程,把它们干掉。先看一段 ctrl+c 无效的代码:

#!/usr/bin/env python
import multiprocessing
import os
import time


def do_work(x):
    print 'Work Started: %s' % os.getpid()
    time.sleep(10)
    return x * x


def main():
    pool = multiprocessing.Pool(4)
    try:
        result = pool.map_async(do_work, range(8))
        pool.close()
        pool.join()
        print result
    except KeyboardInterrupt:
        print 'parent received control-c'
        pool.terminate()
        pool.join()
 

if __name__ == "__main__":
    main()

这段代码运行后,按 ^c 一个进程也杀不掉,最后会残留包括主进程在内共5个进程(1+4),kill掉主进程能让其全部退出。很明显,使用进程池时 KeyboardInterrupt 不能被进程捕捉。解决方法有两种。

方案一

下面这段是python源码里multiprocessing下的 pool.py 中的一段,ApplyResult就是Pool用来保存函数运行结果的类

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

而下面这段代码也是 ^c 无效的代码

if __name__ == '__main__':
    import threading

    cond = threading.Condition(threading.Lock())
    cond.acquire()
    cond.wait()
    print "done"

很明显, threading.Condition(threading.Lock()) 对象无法接收 KeyboardInterrupt ,但稍微修改一下,给 cond.wait() 一个timeout参数即可,这个timeout可以在 map_async 后用get传递,把

result = pool.map_async(do_work, range(4))

改为

result = pool.map_async(do_work, range(4)).get(1)

就能成功接收 ^c 了, get 里面填1填99999还是0xffff都行

方案二

另一种方法当然就是自己写进程池了,需要使用队列,贴一段代码感受下

#!/usr/bin/env python
import multiprocessing, os, signal, time, Queue

def do_work():
    print 'Work Started: %d' % os.getpid()
    time.sleep(2)
    return 'Success'

def manual_function(job_queue, result_queue):
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    while not job_queue.empty():
        try:
            job = job_queue.get(block=False)
            result_queue.put(do_work())
        except Queue.Empty:
            pass
        #except KeyboardInterrupt: pass

def main():
    job_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    for i in range(6):
        job_queue.put(None)

    workers = []
    for i in range(3):
        tmp = multiprocessing.Process(target=manual_function,
                                      args=(job_queue, result_queue))
        tmp.start()
        workers.append(tmp)

    try:
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        print 'parent received ctrl-c'
        for worker in workers:
            worker.terminate()
            worker.join()

    while not result_queue.empty():
        print result_queue.get(block=False)

if __name__ == "__main__":
    main()

常见的错误方案

这个必须要提一下,我发现segmentfault上都有人被误导了

理论上,在Pool初始化时传递一个 initializer 函数,让子进程忽略 SIGINT 信号,也就是^c ,然后Pool进行 terminate 处理。代码

#!/usr/bin/env python
import multiprocessing
import os
import signal
import time


def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def run_worker(x):
    print "child: %s" % os.getpid()
    time.sleep(20)
    return x * x


def main():
    pool = multiprocessing.Pool(4, init_worker)
    try:
        results = []
        print "Starting jobs"
        for x in range(8):
            results.append(pool.apply_async(run_worker, args=(x,)))

        time.sleep(5)
        pool.close()
        pool.join()
        print [x.get() for x in results]
    except KeyboardInterrupt:
        print "Caught KeyboardInterrupt, terminating workers"
        pool.terminate()
        pool.join()


if __name__ == "__main__":
    main()

然而这段代码只有在运行在 time.sleep(5) 处的时候才能用 ctrl+c 中断,即前5s你按 ^c 有效,一旦 pool.join() 后则完全无效!

建议

先确认是否真的需要用到多进程,如果是IO多的程序建议用多线程或协程,计算特别多则用多进程。如果非要用多进程,可以利用Python3的concurrent.futures包(python2.x也能装),编写更加简单易用的多线程/多进程代码,其使用和Java的concurrent框架有些相似

参考

  1. http://bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/#georges

  2. http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool#comment12678760_6191991

本文转载自:http://www.tuicool.com/articles/YFvMBnm

共有 人打赏支持
mickelfeng

mickelfeng

粉丝 227
博文 2634
码字总数 568692
作品 0
成都
高级程序员
python多进程并发之multiprocessing

Python并发之多进程 -- multiprocessing multiprocessing包是Python中的多进程管理包。 它与 threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程。该进程可以允许放在...

_Change_
2017/11/03
0
0
python自动化运维之多进程

python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,P...

炫维
06/26
0
0
python--多进程的用法详解实例

想让python实现多进程(multiprocessing),我们要先区分不同的操作系统的不同之处。 Linux操作系统下提供了一个fork()系统调用,普通函数调用一次返回一次,fork()调用一次返回两次,因为操作...

山有木兮有木兮
05/14
0
0
性能测试工具开发基础:python库介绍-multiprocessing:多进程

简介 进程是运行的程序,每个进程有自己的系统状态,包含了内存、打开文件列表、程序计数器(跟踪执行的指令)、存储函数本地调用变量的堆栈。 使用os或subprocess可以创建新进程,比如:os....

人工智能python自动化测试
08/21
0
0
[雪峰磁针石博客]python库介绍-multiprocessing:多进程

简介 进程是运行的程序,每个进程有自己的系统状态,包含了内存、打开文件列表、程序计数器(跟踪执行的指令)、存储函数本地调用变量的堆栈。 使用os或subprocess可以创建新进程,比如:os....

磁针石
08/21
0
0

没有更多内容

加载失败,请刷新页面

加载更多

WinDbg

参考来自:http://www.cnit.net.cn/?id=225 SRV*C:\Symbols*http://msdl.microsoft.com/download/symbols ctrl + d to open dump_file Microsoft (R) Windows Debugger Version 6.12.0002.633......

xueyuse0012
今天
2
0
OSChina 周五乱弹 —— 想不想把92年的萝莉退货

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @罗马的王:分享松澤由美的单曲《地球ぎ》 很久没看圣斗士星矢了 《地球ぎ》- 松澤由美 手机党少年们想听歌,请使劲儿戳(这里) @开源中国首...

小小编辑
今天
14
2
springBoot条件配置

本篇介绍下,如何通过springboot的条件配置,控制Bean的创建 介绍下开发环境 JDK版本1.8 springboot版本是1.5.2 开发工具为 intellij idea(2018.2) 开发环境为 15款MacBook Pro 前言 很多时候,...

贺小五
今天
1
0
javascript source map 的使用

之前发现VS.NET会为压缩的js文添加一个与文件名同名的.map文件,一直没有搞懂他是用来做什么的,直接删除掉运行时浏览器又会报错,后来google了一直才真正搞懂了这个小小的map文件背后的巨大...

粒子数反转
昨天
1
0
谈谈如何学Linux和它在如今社会的影响

昨天,还在农耕脑力社会,今天已经人工智能技术、大数据、信息技术的科技社会了,高速开展并迅速浸透到当今科技社会的各个方面,Linux日益成为人们信息时代的到来,更加考验我们对信息的处理程...

linux-tao
昨天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部