文档章节

Python 中 Ctrl+C 不能终止 Multiprocessing Pool 的解决方案

mickelfeng
 mickelfeng
发布于 2017/04/07 16:41
字数 817
阅读 116
收藏 1

本文理论上对 multiprocessing.dummy 的Pool同样有效。

python2.x中multiprocessing提供的基于函数进程池,按下 ctrl+c 不能停止所有的进程并退出。即必须 ctrl+z 后找到残留的子进程,把它们干掉。先看一段 ctrl+c 无效的代码:

#!/usr/bin/env python
import multiprocessing
import os
import time


def do_work(x):
    print 'Work Started: %s' % os.getpid()
    time.sleep(10)
    return x * x


def main():
    pool = multiprocessing.Pool(4)
    try:
        result = pool.map_async(do_work, range(8))
        pool.close()
        pool.join()
        print result
    except KeyboardInterrupt:
        print 'parent received control-c'
        pool.terminate()
        pool.join()
 

if __name__ == "__main__":
    main()

这段代码运行后,按 ^c 一个进程也杀不掉,最后会残留包括主进程在内共5个进程(1+4),kill掉主进程能让其全部退出。很明显,使用进程池时 KeyboardInterrupt 不能被进程捕捉。解决方法有两种。

方案一

下面这段是python源码里multiprocessing下的 pool.py 中的一段,ApplyResult就是Pool用来保存函数运行结果的类

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

而下面这段代码也是 ^c 无效的代码

if __name__ == '__main__':
    import threading

    cond = threading.Condition(threading.Lock())
    cond.acquire()
    cond.wait()
    print "done"

很明显, threading.Condition(threading.Lock()) 对象无法接收 KeyboardInterrupt ,但稍微修改一下,给 cond.wait() 一个timeout参数即可,这个timeout可以在 map_async 后用get传递,把

result = pool.map_async(do_work, range(4))

改为

result = pool.map_async(do_work, range(4)).get(1)

就能成功接收 ^c 了, get 里面填1填99999还是0xffff都行

方案二

另一种方法当然就是自己写进程池了,需要使用队列,贴一段代码感受下

#!/usr/bin/env python
import multiprocessing, os, signal, time, Queue

def do_work():
    print 'Work Started: %d' % os.getpid()
    time.sleep(2)
    return 'Success'

def manual_function(job_queue, result_queue):
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    while not job_queue.empty():
        try:
            job = job_queue.get(block=False)
            result_queue.put(do_work())
        except Queue.Empty:
            pass
        #except KeyboardInterrupt: pass

def main():
    job_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    for i in range(6):
        job_queue.put(None)

    workers = []
    for i in range(3):
        tmp = multiprocessing.Process(target=manual_function,
                                      args=(job_queue, result_queue))
        tmp.start()
        workers.append(tmp)

    try:
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        print 'parent received ctrl-c'
        for worker in workers:
            worker.terminate()
            worker.join()

    while not result_queue.empty():
        print result_queue.get(block=False)

if __name__ == "__main__":
    main()

常见的错误方案

这个必须要提一下,我发现segmentfault上都有人被误导了

理论上,在Pool初始化时传递一个 initializer 函数,让子进程忽略 SIGINT 信号,也就是^c ,然后Pool进行 terminate 处理。代码

#!/usr/bin/env python
import multiprocessing
import os
import signal
import time


def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def run_worker(x):
    print "child: %s" % os.getpid()
    time.sleep(20)
    return x * x


def main():
    pool = multiprocessing.Pool(4, init_worker)
    try:
        results = []
        print "Starting jobs"
        for x in range(8):
            results.append(pool.apply_async(run_worker, args=(x,)))

        time.sleep(5)
        pool.close()
        pool.join()
        print [x.get() for x in results]
    except KeyboardInterrupt:
        print "Caught KeyboardInterrupt, terminating workers"
        pool.terminate()
        pool.join()


if __name__ == "__main__":
    main()

然而这段代码只有在运行在 time.sleep(5) 处的时候才能用 ctrl+c 中断,即前5s你按 ^c 有效,一旦 pool.join() 后则完全无效!

建议

先确认是否真的需要用到多进程,如果是IO多的程序建议用多线程或协程,计算特别多则用多进程。如果非要用多进程,可以利用Python3的concurrent.futures包(python2.x也能装),编写更加简单易用的多线程/多进程代码,其使用和Java的concurrent框架有些相似

参考

  1. http://bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/#georges

  2. http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool#comment12678760_6191991

本文转载自:http://www.tuicool.com/articles/YFvMBnm

共有 人打赏支持
mickelfeng

mickelfeng

粉丝 227
博文 2584
码字总数 562007
作品 0
成都
高级程序员
python自动化运维之多进程

python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,P...

炫维
06/26
0
0
python--多进程的用法详解实例

想让python实现多进程(multiprocessing),我们要先区分不同的操作系统的不同之处。 Linux操作系统下提供了一个fork()系统调用,普通函数调用一次返回一次,fork()调用一次返回两次,因为操作...

山有木兮有木兮
05/14
0
0
Python 2.6 亮点:multiprocessing模块

本来以为Python 2.6只是Python 3.0的过渡版本,不会有太多的新功能。但看到这个2.6的重大改动列表,才发现自己挺落后的。在2.6中新增的multiprocessing模块也绝对是Python 2.6的杀手级应用(...

索隆
2012/05/02
0
0
python多进程并发之multiprocessing

multiprocessing包是Python中的多进程管理包。它与 threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程。该进程可以允许放在Python程序内部编写的函数中。该Process...

_Change_
2017/11/03
0
0
python --- mulitprocessing(多进程)模块使用

1. 什么是进程?   进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进...

码农47
2017/11/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Nginx防盗链、访问控制、Nginx解析PHP相关配置、Nginx代理

Nginx防盗链 在配置文件里写入以下内容: 用curl测试 访问控制 Nginx限制某些IP不能访问或者只允许某些IP访问。 配置文件写入如下内容: allow 表示允许访问的IP,deny限制访问的IP。 匹配正...

黄昏残影
10分钟前
0
0
自己动手实现RPC服务调用框架

转载 TCP的RPC 引言 本文利用java自带的socket编程实现了一个简单的rpc调用框架,由两个工程组成分别名为battercake-provider(服务提供者)、battercake-consumer(服务调用者)。 设计思路...

雨中漫步的鱼
12分钟前
0
0
Centos6.x安装之后的9件事

Centos6.x安装之后的9件事 这些不是必须都做的,只不过是我个人的习惯,在此记录一下。 1.修改yum源到国内 CentOS系统更换软件安装源 备份你的原镜像文件,以免出错后可以恢复。 mv /etc/yu...

叶云轩
18分钟前
5
0
springboot2 使用jsp NoHandlerFoundException

开发图片上传功能,为验证测试功能是否正常,使用JSP编写表单提交进行测试 开发完成后,请求API提示如下异常: No mapping found for HTTP request with URI [/WEB-INF/jsp/avatar_upload.j...

showlike
24分钟前
0
0
springboot踩坑记--springboot正常启动但访问404

一 spring boot的启动类不能直接放在main(src.java.main)这个包下面,把它放在有包的里面就可以了。 二 正常启动了,但是我写了一个controller ,用的@RestController 注解去配置的controlle...

onedotdot
25分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部