文档章节

阻塞式ThreadPoolExecutor

轻扬
 轻扬
发布于 2015/04/22 10:19
字数 1133
阅读 181
收藏 0

最近在做一个大文件批量数据导入数据库的时候遇到个问题,在使用ThreadPoolExecutor提交任务的时候,发现在线程池满的时候,不能达到阻塞线程的作用。导致的后果就是文件被不断读取到内存,然后丢给ThreadPoolExecutor执行,由于消费速度跟不上生产速度,导致内存不断增长,最后OOM。

于是开始研究ThreadPoolExecutor如何实现在任务满的情况下阻塞线程。

ThreadPoolExecutor类提供了多个参数用于定制化自己的线程池,常用的有corePoolSize,maximumPoolSize,workQueue等几个,如下面构造函数:

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

为了实现阻塞式线程池,workQueue参数需要传一个有界的BlockQueue,默认Executors.newFixedThreadPool()传入的无参数LinkedBlockingQueue边界为Integer.MAX_VALUE,不能起到Block的效果。

传了有界BlockQueue之后,ThreadPoolExecutor在线程队列Blcok的时候不会阻塞线程提交,而是调用RejectedExecutionHandler,抛出RejectedExecutionException异常。

JDK默认提供了4种失败策略: 
    AbortPolicy(中止)、CallersRunPolicy(调用者运行)、DiscardPolicy(丢弃)、DiscardOldestPolicy(丢弃最旧的)

JDK默认使用了AbortPolicy(中止)策略,这个可以通过handler参数来设置。


这里收集了几种阻塞线程池提交的方法:

一、通过CallersRunPolicy调用策略实现

其中CallersRunPolicy(调用者运行)方法,在线程池队列满了后会调用主线程来执行任务,同样可以达到阻塞线程提交的目的。这样做有两个缺点:

    1、执行任务的线程会是size+1个(主线程),这在有些资源敏感的场景是不被允许的

    2、由于主线程被用于执行任务,如果这个任务比较大,会长时间阻塞主线程的执行,导致其他线程空闲时候也不能接受新的任务,形成资源浪费

实例代码如下:

new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy())


二、通过自定义RejectedExecutionHandler实现

通过自定义RejectedExecutionHandler,显示调用queue.put()阻塞方法来实现线程池阻塞。这种方法能够避免CallersRunPolicy方法的两个缺点。

示例代码如下:

new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        try {
                            executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                });

三、通过其他多线程工具辅助控制

比如常用的可以通过信号量来控制,在提交任务的时候acquire,任务执行完后release。

这种方法的缺点是会侵入任务的执行过程

示例代码如下:

public static void main(String[] args) throws InterruptedException, ExecutionException {
//        ExecutorService executorService = Executors.newFixedThreadPool(5);
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(10));
        final Semaphore semaphore = new Semaphore(5);
        final AtomicInteger counter = new AtomicInteger(0);
        int i = 0;
        while (true) {
            semaphore.acquire();
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        int count = counter.addAndGet(1);
                        System.out.println(Thread.currentThread() + "start, counter: " + count);
                        try {
                            Thread.sleep(1000 * 2);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread() + "end, counter: " + count);
                    } finally {
                        semaphore.release();
                    }
                }
            });
            if (++i > 20) {
                break;
            }
            System.out.println("now it is " + i);
        }

        System.out.println("shotdown...");
        ExecutorUtils.shutdownAndWait(executorService, executorService.toString());
        System.out.println("Test ends.");
    }

这里注意线程池的shutdown过程,没有使用简单的shutdown,因为这样会导致部分task没有执行完成

ExecutorUtils.shutdownAndWait方法代码如下:

    public static void shutdownAndWait(ExecutorService executor, String name) {
        log.info("Shutting down " + name);
        executor.shutdown();
        awaitTermination(executor, name);
    }
    private static void awaitTermination(ExecutorService executor, String name) {
        try {
            while (!executor.awaitTermination(EXECUTOR_TERMINATION_TIMEOUT, TimeUnit.SECONDS)) {
                log.info("Waiting for all tasks complete execution in " + name);
            }
            log.info(name + " is shut down.");
        } catch (InterruptedException e) {
            log.error("Shutting down " + name + " failed.", e);
            Thread.currentThread().interrupt();
        }
    }


© 著作权归作者所有

轻扬
粉丝 0
博文 1
码字总数 1133
作品 0
长宁
高级程序员
私信 提问
Java里阻塞线程的三种实现方法

在日常开发中,我们有时会遇到遇到多线程处理任务的情况,JDK里提供了便利的ThreadPoolExecutor以及其包装的工具类Executors。但是我们知道ExecutorService.excute(Runnable r)是异步的,超过...

黄亿华
2013/03/18
15.3K
3
ThreadPoolExecutor源码分析及阻塞提交任务方法

ThreadPoolExecutor源码 ThreadPoolExecutor 基本使用参考:ThreadPoolExecutor执行过程分析 线程池状态标志 ctl 保存了线程池的运行状态(runState)和线程池内有效线程数量(workerCount)...

4rnold
2018/06/28
56
0
亲,ThreadPoolExecutor了解一下: BlockingQueue (阻塞队列)

描述 超出核心线程数(corePoolSize)的task,会优先放到队列中 线程执行完task后,会从队列中取task去执行. 几种实现方案: LinkedBlockingQueue : 默认实现队列,可选边界的,基于链表实现的. Arr...

ol_O_O_lo
2018/11/06
13
0
ThreadPoolExecutor源码详解

我之前一篇文章谈到了ThreadPoolExecutor的作用(http://my.oschina.net/xionghui/blog/494004),这篇文章介绍下它的原理,并根据原理分析下它的实现源码。 我们先来查看一下ThreadPoolExe...

xionghuiCoder
2015/08/19
3.2K
3
线程池 线程池的四种实现方式

ExecutorService是线程池接口。它定义了4中线程池: 1. newCachedThreadPool: 底层:返回ThreadPoolExecutor实例,corePoolSize为0;maximumPoolSize为Integer.MAX_VALUE;keepAliveTime为6...

edison_kwok
05/04
60
0

没有更多内容

加载失败,请刷新页面

加载更多

Spring Boot + Mybatis-Plus 集成与使用(二)

前言: 本章节介绍MyBatis-Puls的CRUD使用。在开始之前,先简单讲解下上章节关于Spring Boot是如何自动配置MyBatis-Plus。 一、自动配置 当Spring Boot应用从主方法main()启动后,首先加载S...

伴学编程
昨天
7
0
用最通俗的方法讲spring [一] ──── AOP

@[TOC](用最通俗的方法讲spring [一] ──── AOP) 写这个系列的目的(可以跳过不看) 自己写这个系列的目的,是因为自己是个比较笨的人,我曾一度怀疑自己的智商不适合干编程这个行业.因为在我...

小贼贼子
昨天
7
0
Flutter系列之在 macOS 上安装和配置 Flutter 开发环境

本文为Flutter开发环境在macOS下安装全过程: 一、系统配置要求 想要安装并运行 Flutter,你的开发环境需要最低满足以下要求: 操作系统:macOS(64位) 磁盘空间:700 MB(不包含 IDE 或其余...

過愙
昨天
6
0
OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
昨天
2.7K
16
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
昨天
42
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部