文档章节

限流任务提交

萌萌哒李大帅
 萌萌哒李大帅
发布于 2017/03/30 15:21
字数 567
阅读 14
收藏 0

近日在学习java并发编程方面的知识,在参考了并发编程网的相关文章并发实战题(一)及方腾飞的并发编程艺术后,实现一个简单的限流线程任务提交工具。

package com.jerry.common;

import com.jerry.common.task.MyTask;
import com.jerry.common.task.RunThread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author Jerry
 * @created 2017年3月29日 下午2:08:22
 * @overview
 */
public class ExecutorQueueStore {

    private volatile static ExecutorQueueStore INSTANCE;
    final static int threadPoolLimitNum = 100;
    final static int semaphoreLimitNum = 10;
    final Semaphore semaphore = new Semaphore(semaphoreLimitNum);
    final ScheduledExecutorService scheduled = Executors
            .newSingleThreadScheduledExecutor();
    // final ExecutorService executor = Executors.newFixedThreadPool(limitNum);
    final ExecutorService executor = new ThreadPoolExecutor(threadPoolLimitNum,
            threadPoolLimitNum, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(threadPoolLimitNum), new MyExecutionHandler());

    public static ExecutorQueueStore getInstence() {
        if (null == INSTANCE) {
            synchronized (ExecutorQueueStore.class) {
                if (null == INSTANCE) {
                    INSTANCE = new ExecutorQueueStore();
                    INSTANCE.runSchedule();
                }
            }
        }
        return INSTANCE;
    }

    private void runSchedule() {
        scheduled.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                semaphore.release(semaphoreLimitNum);
            }
        }, 2000, 2000, TimeUnit.MILLISECONDS);
    }

    public String submit(Callable<String> task) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        Future<String> future = executor.submit(task);
        // TODO 此处可以做些其他的事

        String result;
        try {
            result = future.get();
        } catch (InterruptedException e) {
            System.out.println("Error:" + e.getMessage());
            return "FAIL";
        } catch (ExecutionException e) {
            System.out.println("Error:" + e.getMessage());
            return "FAIL";
        }
        return result;
    }

    public static void destroy() {
        if (null != INSTANCE) {
            INSTANCE.executor.shutdown();
            while (!INSTANCE.executor.isTerminated()) {
                try {
                    INSTANCE.executor.awaitTermination(5000,
                            TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    System.out.println("executor shutdown fail, errorMsg:"
                            + e.getMessage());
                }
                INSTANCE.executor.shutdown();
            }
            INSTANCE.scheduled.shutdown();
            while (!INSTANCE.scheduled.isTerminated()) {
                try {
                    INSTANCE.scheduled.awaitTermination(3000,
                            TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    System.out.println("scheduled shutdown fail, errorMsg:"
                            + e.getMessage());
                }
                INSTANCE.scheduled.shutdown();
            }
            INSTANCE = null;
        }

    }

    public static void main(String[] args) {
        MyTask task;
        for (int i = 0; i < 1000; i++) {
            task = new MyTask("ThreadNo" + i);
            RunThread rt = new RunThread(task);
            rt.start();
        }
//		System.out.println("线程池关闭");
//		ExecutorQueueStore.destroy();
    }
}


package com.jerry.common;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 
 * @author Jerry
 * @created 2017年3月30日 下午4:22:39
 * @overview 
 */
public class MyExecutionHandler implements RejectedExecutionHandler {

	@Override
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		System.out.println("队列已满,开始执行异常处理流程,听张震讲鬼故事");
		// log
		// db
		System.out.println("异常处理流程结束");
	}

}


package com.jerry.common.task;

import java.util.concurrent.Callable;

/**
 *
 * @author Jerry
 * @created 2017年3月24日 下午5:13:32
 * @overview
 */
public class MyTask implements Callable<String> {

	private String name;

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	@Override
	public String call() throws Exception {
		String no = name.substring(name.indexOf('o') + 1);
		int num = Integer.parseInt(no);
		if (num % 10 == 0) {
			return "FAIL," + name;
		}
		return name;
	}

	public MyTask(String name) {
		super();
		this.name = name;
	}
}



package com.jerry.common.task;

import com.jerry.common.ExecutorQueueStore;

/**
 * 
 * @author Jerry
 * @created 2017年3月30日 下午5:06:45
 * @overview 
 */
public class RunThread extends Thread{
	
	private MyTask mytask;
	
	public RunThread(MyTask mytask) {
		super();
		this.mytask = mytask;
	}

	public void run(){
		String result = ExecutorQueueStore.getInstence().submit(mytask);
		if (result.contains("FAIL")) {
			System.out.println("任务执行失败");
			System.out.println("失败原因:" + result);
		} else {
			System.out.println("执行成功");
			System.out.println("任务名称:" + result);
		}
	}
	

}

目前结束线程时有一些问题,正在思考.....

© 著作权归作者所有

萌萌哒李大帅
粉丝 0
博文 5
码字总数 1010
作品 0
昌平
私信 提问
不做需求复印机——批量操作流程设计

相信每个技术人员都不会甘心做“需求复印机”。 不做需求复印机,有两种简单的方式。一种是在代码/模块/系统的结构上下功夫,例如前面几篇设计方案(审批、分发等)。另一种则是直接对业务流...

winters1224
2017/08/27
0
0
面试宝典系列-怎么设计一个秒杀系统

方向:将请求尽量拦截在系统上游 思路:限流和削峰 1、限流:屏蔽掉无用的流量,允许少部分流量流向后端。 2、削峰:瞬时大流量峰值容易压垮系统。常用的消峰方法有异步处理、缓存和消息中间...

suyain
2018/08/01
388
0
[Java]计算单机TPS、并对任务限流

类结构 说明: SpeedControlConstant 定义的一些与限流相关的常量 SpeedControlHelper 单机TPS限流类,提供当前任务TPS的计算功能 TaskDO 任务DO,定义了任务类型、任务的执行逻辑。实现了R...

健康的程序员
2015/12/29
214
0
spark和jstorm的一些经验(坑)

jstorm jstorm项目目前貌似停止了,提issue也没人解决,一些插件,比如jstorm-kafka支持的kafka版本较低,而且没有打包好的二进制jar,需要自己下代码进行编译,虽然最新版本jstorm2.4发布很...

whaon
2018/01/10
970
6
redis限流的问题

某系统内部流程使用mq异步处理,现在内部流程有个环节需要调用外部系统对数据进行处理,考虑到这个系统的承受能力,需要对此接口进行限流,我们使用了redis+lua进行接口限流,这部分被限流的...

ewen
2018/02/10
580
1

没有更多内容

加载失败,请刷新页面

加载更多

程序设计基础(C)第06讲例程

1summing.c /* summing.c -- 根据用户键入的整数求和 */#include <stdio.h>int main(void){ long num; long sum = 0L; /* 把sum 初始化为0 */ int status; p......

树人大学数字媒体吴凡
28分钟前
6
0
聊聊nacos config的publishConfig

序 本文主要研究一下nacos config的publishConfig ConfigController nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java @Controller@R......

go4it
55分钟前
5
0
Eureka应用注册与集群数据同步源码解析

在之前的EurekaClient自动装配及启动流程解析一文中我们提到过,在构造DiscoveryClient类时,会把自身注册到服务端,本文就来分析一下这个注册流程 客户端发起注册 boolean register() t...

Java学习录
今天
11
0
Java描述设计模式(15):责任链模式

本文源码:GitHub·点这里 || GitEE·点这里 一、生活场景描述 1、请假审批流程 公司常见的请假审批流程:请假天数 当 day<=3 天,项目经理审批当 3<day<=5 天,部门经理审批当 day>5 天...

知了一笑
今天
11
0
总结:数组与链表

1、内存申请:数组在内存上是连续的空间;链表,内存地址上可以是不连续的。 2、查询速度:数组可以随机访问,链表必须顺序访问,即从首个元素开始遍历,逐个查找,所以数组查询很快。 3、写入...

浮躁的码农
今天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部