文档章节

Executors+CyclicBarrier实现的并发测试小例子

孟飞阳
 孟飞阳
发布于 2015/10/13 18:49
字数 870
阅读 334
收藏 2
package org.phoenix.cases.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import com.meterware.httpunit.GetMethodWebRequest;
import com.meterware.httpunit.WebConversation;
import com.meterware.httpunit.WebResponse;

/**
 * Executors+CyclicBarrier实现的并发测试小例子<br>
 * 例子实现了并发测试中使用的集合点,集合点超时时间及思考时间等技术
 * @author mengfeiyang
 *
 */
public class FlushGenerator {
	private static volatile boolean RUNFLAG = true;
	private CyclicBarrier rendzvous;
	private static int threads;
	private static AtomicInteger totalCount = new AtomicInteger();
	private static AtomicInteger startedCount = new AtomicInteger();
	private static AtomicInteger finishCount = new AtomicInteger();
	private static AtomicInteger runCount = new AtomicInteger();
	private static AtomicInteger successCount = new AtomicInteger();
	private static AtomicInteger failCount = new AtomicInteger();
	private String url;
	private long rendzvousWaitTime = 0;
	private long thinkTime = 0;
	private int iteration = 0;
	
	/**
	 * 初始值设置
	 * @param url 被测url
	 * @param threads 总线程数
	 * @param iteration 每个线程迭代次数
	 * @param rendzvousWaitTime 集合点超时时间,如果不启用超时时间,请将此值设置为0.<br>
	 * 							如果不启用集合点,请将此值设置为-1<br>
	 * 							如果不启用超时时间,则等待所有的线程全部到达后,才会继续往下执行<br>
	 * @param thinkTime 思考时间,如果启用思考时间,请将此值设置为0
	 */
	public FlushGenerator(String url,int threads,int iteration,long rendzvousWaitTime,long thinkTime){
		totalCount.getAndSet(threads);
		FlushGenerator.threads = threads;
		this.url = url;
		this.iteration = iteration;
		this.rendzvousWaitTime = rendzvousWaitTime;
		this.thinkTime = thinkTime;
	}

	public static ThreadCount getThreadCount(){
		return new ThreadCount(threads,runCount.get(),startedCount.get(),finishCount.get(),successCount.get(),failCount.get());
	}
	
	public static boolean isRun(){
		return finishCount.get() != threads;
	}
	
	public synchronized static void stop(){
		RUNFLAG = false;
	}
	
	public void runTask(){
		List<Future<String>> resultList = new ArrayList<Future<String>>();
		ExecutorService exeService = Executors.newFixedThreadPool(threads);
		rendzvous = new CyclicBarrier(threads);//默认加载全部线程
		for(int i=0;i<threads;i++){
			resultList.add(exeService.submit(new TaskThread(i,url,iteration,rendzvousWaitTime,thinkTime)));
		}
		exeService.shutdown();
		for(int j=0;j<resultList.size();j++){
			try{
				System.out.println(resultList.get(j).get());
			}catch(Exception e){
				e.printStackTrace();
			}
		}
		stop();
	}
	
	static class ThreadCount {
        public final int runThreads;
        public final int startedThreads;
        public final int finishedThreads;
        public final int totalThreads;
        public final int successCount;
        public final int failCount;
        
        
        public ThreadCount(int totalThreads,int runThreads, int startedThreads, int finishedThreads,int successCount,int failCount) {
        	this.totalThreads = totalThreads;
        	this.runThreads = runThreads;
        	this.startedThreads = startedThreads;
        	this.finishedThreads = finishedThreads;
            this.successCount = successCount;
            this.failCount = failCount;
        }
	}
	
	private class TaskThread implements Callable<String> {
		private String url;
		private long rendzvousWaitTime = 0;
		private long thinkTime = 0;
		private int iteration = 0;
		private int iterCount = 0;
		private int taskId;
		
		/**
		 * 任务执行者属性设置
		 * @param taskId 任务id号
		 * @param url 被测url
		 * @param iteration 迭代次数,如果一直执行则需将此值设置为0
		 * @param rendzvousWaitTime 集合点超时时间,如果不需要设置时间,则将此值设置为0。如果不需要设置集合点,则将此值设置为-1
		 * @param thinkTime 思考时间,如果不需要设置思考时间,则将此值设置为0
		 */
		public TaskThread(int taskId,String url,int iteration, long rendzvousWaitTime,long thinkTime){
			this.taskId = taskId;
			this.url = url;
			this.rendzvousWaitTime = rendzvousWaitTime;
			this.thinkTime = thinkTime;
			this.iteration = iteration;
		}
		@Override
		public String call() throws Exception{
			startedCount.getAndIncrement();
			runCount.getAndIncrement();
			while(RUNFLAG && iterCount<iteration){
				if(iteration != 0)iterCount++;
				try{
						if(rendzvousWaitTime > 0){
							try{
								System.out.println("任务:task-"+taskId+" 已到达集合点...等待其他线程,集合点等待超时时间为:"+rendzvousWaitTime);
								rendzvous.await(rendzvousWaitTime,TimeUnit.MICROSECONDS);
							} catch (InterruptedException e) {
							} catch (BrokenBarrierException e) {
								System.out.println("task-"+taskId+" 等待时间已超过集合点超时时间:"+rendzvousWaitTime+" ms,将开始执行任务....");
							} catch (TimeoutException e) {
							}
						} else if(rendzvousWaitTime == 0){
							try{
								System.out.println("任务:task-"+taskId+" 已到达集合点...等待其他线程");
								rendzvous.await();
							} catch (InterruptedException e) {
							} catch (BrokenBarrierException e) {
							}
						}
					WebResponse wr = new WebConversation().getResponse(new GetMethodWebRequest(url));
					System.out.println("线程:task-"+taskId+" 获取到的资源大小:"+wr.getText().length()+",状态码:"+wr.getResponseCode());
					successCount.getAndIncrement();
					if(thinkTime !=0){
						System.out.println("task-"+taskId+" 距下次启动时间:"+thinkTime);
						Thread.sleep(thinkTime);
					}
				}catch(Exception e){
					failCount.getAndIncrement();
				}
			}
			finishCount.getAndIncrement();
			runCount.decrementAndGet();
			return Thread.currentThread().getName()+" 执行完成!";
		}
	}
	
	public static void main(String[] args) {
		new Thread(){
			public void run(){
				new FlushGenerator("http://10.108.79.8:8080/hh.php",5,3,0,200).runTask();
			}
		}.start();
		
		new Thread(){
			public void run(){
				while(true){
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println("isRun:"+FlushGenerator.isRun());
					System.out.println("totalThreads:"+FlushGenerator.getThreadCount().totalThreads);
					System.out.println("startedThreads:"+FlushGenerator.getThreadCount().startedThreads);
					System.out.println("runThreads:"+FlushGenerator.getThreadCount().runThreads);
					System.out.println("finishedThread:"+FlushGenerator.getThreadCount().finishedThreads);
					System.out.println("successCount:"+FlushGenerator.getThreadCount().successCount);
					System.out.println("failCount:"+FlushGenerator.getThreadCount().failCount);
					System.out.println();
				}
			}
		}.start();
	}
}

 

© 著作权归作者所有

共有 人打赏支持
孟飞阳
粉丝 207
博文 973
码字总数 544644
作品 5
朝阳
个人站长
私信 提问
C#高性能大容量SOCKET并发(一):IOCP完成端口例子介绍

原文:C#高性能大容量SOCKET并发(一):IOCP完成端口例子介绍 例子主要包括SocketAsyncEventArgs通讯封装、服务端实现日志查看、SCOKET列表、上传、下载、远程文件流、吞吐量协议,用于测试S...

杰克.陈
2017/12/06
0
0
并发酷刑: 在 Java 内存模型中测试代码

不知道你是否这么觉得,而对于我来说最烦恼的bug就是处理并发问题。除非你已经掌握了利用其他工具摒弃你现有的工具. 但是,这里还有另外一个例子,因为我们已经在这里讲述了很多关于Java的知...

oschina
2014/03/12
1K
0
PostgreSQL 高并发任务分配系统 实践

标签 PostgreSQL , 高并发消费 , pgtryadvisoryxactlock , 秒杀 , 任务分配 背景 给任务分配线程ID,或让线程去抢占任务执行,是任务分配系统中的基本需求。 目的是能够快速的消耗掉所有的任...

德哥
2017/12/22
0
0
【JBehave】JBehave介绍

JBehave介绍 JBehave是一个用Java编写的BDD(Behavior-Driven-Design)框架, java界的Cucumber。(注: 1、BDD主要的目的是能够从业务领域专家的视角来编写测试用例,以解决技术人员和业务领域...

一路向北的兔斯基
2016/04/20
87
0
JAVA和Go语言的多线程并发测试二

以前做过一次Go和Java的多线程并发对比测试(Java、Scala和Go语言多线程并发对比测试)。当时,测试所采用的例子是CPU运算密集型的,会占用大量的CPU资源。测试的结果Go并不占优势,可能的原...

qinhui99
2012/06/05
0
4

没有更多内容

加载失败,请刷新页面

加载更多

127.0.0.1 和 0.0.0.0 地址的区别

1. IP地址分类 1.1 IP地址表示 IP地址由两个部分组成,net-id和host-id,即网络号和主机号。 net-id:表示ip地址所在的网络号。 host-id:表示ip地址所在网络中的某个主机号码。 即: IP-a...

华山猛男
19分钟前
6
0
解决Unknown host 'd29vzk4ow07wi7.cloudfront.net'. You may need to adjust the proxy settings in Gradle.

把 总项目 下的 build.gradle 中的 两个 jcenter() 用 maven{ url ‘http://maven.aliyun.com/nexus/content/groups/public/’} 代替。...

lanyu96
25分钟前
2
0
基于redis的分布式锁

redisson提供了基于redis的分布式锁实现方式,本文就尝试了下锁的使用方式。Redisson同时还为分布式锁提供了异步执行的相关方法,第二节执行介绍。 一、可重入锁验证 同一个jvm里面同一线程的...

noob_chr
33分钟前
8
0
CPU性能过剩提升乏力影响未来行业发展吗?

虽然CPU仍然在不断发展,但是它的性能已经不再仅仅受限于单个处理器类型或制造工艺上了。和过去相比,CPU性能提升的步伐明显放缓了,接下来怎么办,成为横亘在整个行业面前的大问题。 自201...

linux-tao
36分钟前
3
0
设计模式“6”大原则!

面向对象设计原则 概述 对于面向对象软件系统的设计而言,在支持可维护性的同时,提高系统的可复用性是一个至关重要的问题,如何同时提高一个软件系统的可维护性和可复用性是面向对象设计需要...

Java干货分享
52分钟前
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部