文档章节

[Java]计算单机TPS、并对任务限流

健康的程序员
 健康的程序员
发布于 2015/12/29 19:50
字数 935
阅读 102
收藏 1

类结构

说明:

SpeedControlConstant 定义的一些与限流相关的常量
SpeedControlHelper 单机TPS限流类,提供当前任务TPS的计算功能
TaskDO 任务DO,定义了任务类型、任务的执行逻辑。实现了Runnable接口
TaskStateDO 任务状态DO,定义了任务的TPS、执行次数、执行时间等
TaskStateSingleton 统计所有任务状态的单例类
TpsTest 测试类


直接上代码

package com.taobao.tps;

/**
 * 限流常量类
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class SpeedControlConstant {
	// 单机tps设置50
	public static int serverTps = 50;
}
package com.taobao.tps;

/**
 * 单机TPS限流功能类
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class SpeedControlHelper {

	/**
	 * 判断是否被限流
	 * 
	 * @param taskType
	 * @return false限流
	 */
	public static boolean speedControl(String taskType) {
		int tpsThreshod = SpeedControlConstant.serverTps;// tps阀值
		int currentTps = currentTps(taskType);// 当前tps
		if (tpsThreshod <= currentTps) {
			return false;
		}
		return true;
	}

	/**
	 * 获取当前任务类型tps
	 * 
	 * @param taskType
	 * @return
	 */
	public static int currentTps(String taskType) {
		TaskStateSingleton taskStateSingleton = TaskStateSingleton.getInstance();
		TaskStateDO currentTask = taskStateSingleton.getTaskStateDO(taskType);
		if (currentTask == null) {
			// 这里有并发问题,但是统计tps不需要特别精准。添加了并发控制反而会影响性能
			currentTask = new TaskStateDO();
			taskStateSingleton.putTaskStateDO(taskType, currentTask);
		}
		return currentTask.calcuTps();
	}

	/**
	 * 设置当前任务类型执行时间
	 * 
	 * @param taskType
	 * @param time
	 */
	public static void setExecTime(String taskType, long time) {
		TaskStateSingleton taskStateSingleton = TaskStateSingleton.getInstance();
		TaskStateDO currentTask = taskStateSingleton.getTaskStateDO(taskType);
		if (currentTask == null) {
			// logger..
			return;
		}
		currentTask.state(time);
	}
}
package com.taobao.tps;

/**
 * 任务
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TaskDO implements Runnable {
	private String taskType;

	public TaskDO(String taskType) {
		this.taskType = taskType;
	}

	public String getTaskType() {
		return taskType;
	}

	public void run() {
		try {
			long startTime = System.currentTimeMillis();
			Thread.sleep(15);
			long endTime = System.currentTimeMillis();
			SpeedControlHelper.setExecTime(taskType, endTime - startTime);
		} catch (InterruptedException e) {
			// logger..
		}
	}

}
package com.taobao.tps;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 任务状态
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TaskStateDO {
	/**
	 * 任务类型
	 */
	private String taskType;

	/**
	 * 一个统计时间区间内的总执行次数
	 */
	private int execCount;

	/**
	 * 一个统计时间区间内的总执行耗时
	 */
	private long totalTime;

	/**
	 * 一个统计时间区间内的平均执行耗时
	 */
	private int averageTime;

	/**
	 * 当前1秒内执行的次数
	 */
	private AtomicInteger secondExecCount = new AtomicInteger(0);

	/**
	 * 当前秒
	 */
	private long currentSecond = 0;

	public TaskStateDO() {
		reset();
	}

	/**
	 * 获取taskType
	 * 
	 * @return taskType
	 */
	public String getTaskType() {
		return taskType;
	}

	/**
	 * 设置taskType
	 * 
	 * @param taskType 要设置的taskType
	 */
	public void setTaskType(String taskType) {
		this.taskType = taskType;
	}

	/**
	 * 获取totalTime
	 * 
	 * @return totalTime
	 */
	public long getTotalTime() {
		return totalTime;
	}

	/**
	 * 获取execCount
	 * 
	 * @return execCount
	 */
	public int getExecCount() {
		return execCount;
	}

	/**
	 * 获取averageTime
	 * 
	 * @return averageTime
	 */
	public int getAverageTime() {
		return averageTime;
	}

	/**
	 * 统计任务的秒级执行次数
	 * 
	 * @return
	 */
	public int calcuTps() {
		if (currentSecond == System.currentTimeMillis() / 1000) {
			return secondExecCount.incrementAndGet();// 1s的执行次数
		} else {
			currentSecond = System.currentTimeMillis() / 1000;
			secondExecCount.set(1);
			return 1;
		}
	}

	/**
	 * 统计任务执行时间
	 * 
	 * @param time
	 */
	public void state(long time) {
		try {
			execCount++;
			totalTime += time;
			averageTime = (int) (totalTime / execCount);
		} catch (Exception e) {
			reset();
		}
	}

	/**
	 * 重置方法,没有设置为零,防止除法抛异常
	 */
	public void reset() {
		execCount = 1;
		totalTime = 100;
	}

}
package com.taobao.tps;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 统计全量任务单例
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TaskStateSingleton {
	private Map<String, TaskStateDO> taskStateMap = new HashMap<String, TaskStateDO>();

	private ReadWriteLock lock = new ReentrantReadWriteLock();

	private Lock read = lock.readLock();

	private Lock write = lock.writeLock();

	private TaskStateSingleton() {
	}

	// 获取实例
	public static TaskStateSingleton getInstance() {
		return SingletonHolder.taskStateSingleton;
	}

	private static class SingletonHolder {
		private static final TaskStateSingleton taskStateSingleton = new TaskStateSingleton();
	}

	public Map<String, TaskStateDO> getTaskStateMap() {
		try {
			read.lock();
			return taskStateMap;
		} finally {
			read.unlock();
		}
	}

	/**
	 * 新增一个任务统计信息
	 */
	public void putTaskStateDO(String taskType, TaskStateDO taskStateDO) {
		try {
			write.lock();
			taskStateMap.put(taskType, taskStateDO);
		} finally {
			write.unlock();
		}
	}

	/**
	 * 查询一个任务统计信息
	 */
	public TaskStateDO getTaskStateDO(String taskType) {
		try {
			read.lock();
			return taskStateMap.get(taskType);
		} finally {
			read.unlock();
		}
	}

}
package com.taobao.tps;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 测试类
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TpsTest {
	public static ThreadPoolExecutor executer = new ThreadPoolExecutor(4, 4, 4, TimeUnit.SECONDS,
			new LinkedBlockingQueue<Runnable>(500));

	public static void main(String[] args) throws Exception {
		while (true) {
			TaskDO task = new TaskDO("calcu_item_value");
			if (!SpeedControlHelper.speedControl(task.getTaskType())) {
				print("被限流..");
				continue;
			}
			executer.submit(task);
			print("添加任务成功");
			Thread.sleep(10);
		}
	}

	public static void print(Object obj) {
		System.out.print(obj.toString());
	}
}


建议用服务器测试代码,工作PC测试可以比较卡。。



© 著作权归作者所有

共有 人打赏支持
上一篇: Btrace使用
健康的程序员
粉丝 8
博文 170
码字总数 47593
作品 0
杭州
程序员
私信 提问
Java并发:分布式应用限流 Redis + Lua 实践

任何限流都不是漫无目的的,也不是一个开关就可以解决的问题,常用的限流算法有:令牌桶,漏桶。在之前的文章中,也讲到过,但是那是基于单机场景来写。 之前文章:接口限流算法:漏桶算法&...

关注公众号_搜云库_每天更新
2018/08/17
0
0
轻量级数据库中间件利器Sharding-JDBC深度解析

本文根据DBAplus社群第114期线上分享整理而成。 主题简介: 1、关系型数据库中间件核心功能介绍 2、Sharding-JDBC架构及内核解析 3、Sharding-JDBC未来展望 关系型数据库凭借灵活查询的SQL和...

张亮
2017/07/28
0
0
老问题新测试:java和C单机性能比较

Java和C最大的不同是在于Java的可伸缩性Scalable,能够平滑发展到分布式云计算平台,通过云计算能够处理不断增长的业务访问量,这个代价过程是非常小容易的。 那么在单机环境下,Java过去一直...

JavaGG
2010/03/24
276
0
老问题新测试:java和C单机性能比较

Java和C最大的不同是在于Java的可伸缩性Scalable,能够平滑发展到分布式云计算平台,通过云计算能够处理不断增长的业务访问量,这个代价过程是非常小容易的。 那么在单机环境下,Java过去一直...

JavaGG
2009/09/14
2.4K
7
RocketMQ与Kafka对比

RocketMQ与Kafka对比(18项差异) 淘宝内部的交易系统使用了淘宝自主研发的Notify消息中间件,使用Mysql作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步...

莫问viva
2015/05/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Nextjs+React非页面组件SSR渲染

@随风溜达的向日葵 Nextjs Nextjs是React生态中非常受欢迎的SSR(server side render——服务端渲染)框架,只需要几个步骤就可以搭建一个支持SSR的工程(_Nextjs_的快速搭建见Next.js入门)...

随风溜达的向日葵
54分钟前
0
0
如何在 Linux 系统查询机器最近重启时间

在你的 Linux 或类 UNIX 系统中,你是如何查询系统上次重新启动的日期和时间?怎样显示系统关机的日期和时间? last 命令不仅可以按照时间从近到远的顺序列出该会话的特定用户、终端和主机名...

来来来来来
今天
3
0
Redis协议是什么样的

前言 我们用过很多redis的客户端,有没有相过自己撸一个redis客户端? 其实很简单,基于socket,监听6379端口,解析数据就可以了。 redis协议 解析数据的过程主要依赖于redis的协议了。 我们...

春哥大魔王的博客
今天
6
0
乱入Linux界的我是如何学习的

欢迎来到建哥学Linux,咳!咳!咳!开个玩笑哈,我是一个IT男,IT界的入门选手,正在学习Linux。 在之前,一直想进军IT界,学习IT技术,但是苦于没有人指导,也不知道学什么,最开始我自己在...

linuxCool
今天
4
0
携程Apollo统一配置中心的搭建和使用(java)

一.Apollo配置中心介绍 1、What is Apollo 1.1 Apollo简介 Apollo(阿波罗)是携程框架部门研发的开源配置管理中心,能够集中化管理应用不同环境、不同集群的配置,配置修改后能够实时推送到...

morpheusWB
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部