文档章节

[Java]计算单机TPS、并对任务限流

健康的程序员
 健康的程序员
发布于 2015/12/29 19:50
字数 935
阅读 78
收藏 1
点赞 0
评论 0

类结构

说明:

SpeedControlConstant 定义的一些与限流相关的常量
SpeedControlHelper 单机TPS限流类,提供当前任务TPS的计算功能
TaskDO 任务DO,定义了任务类型、任务的执行逻辑。实现了Runnable接口
TaskStateDO 任务状态DO,定义了任务的TPS、执行次数、执行时间等
TaskStateSingleton 统计所有任务状态的单例类
TpsTest 测试类


直接上代码

package com.taobao.tps;

/**
 * 限流常量类
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class SpeedControlConstant {
	// 单机tps设置50
	public static int serverTps = 50;
}
package com.taobao.tps;

/**
 * 单机TPS限流功能类
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class SpeedControlHelper {

	/**
	 * 判断是否被限流
	 * 
	 * @param taskType
	 * @return false限流
	 */
	public static boolean speedControl(String taskType) {
		int tpsThreshod = SpeedControlConstant.serverTps;// tps阀值
		int currentTps = currentTps(taskType);// 当前tps
		if (tpsThreshod <= currentTps) {
			return false;
		}
		return true;
	}

	/**
	 * 获取当前任务类型tps
	 * 
	 * @param taskType
	 * @return
	 */
	public static int currentTps(String taskType) {
		TaskStateSingleton taskStateSingleton = TaskStateSingleton.getInstance();
		TaskStateDO currentTask = taskStateSingleton.getTaskStateDO(taskType);
		if (currentTask == null) {
			// 这里有并发问题,但是统计tps不需要特别精准。添加了并发控制反而会影响性能
			currentTask = new TaskStateDO();
			taskStateSingleton.putTaskStateDO(taskType, currentTask);
		}
		return currentTask.calcuTps();
	}

	/**
	 * 设置当前任务类型执行时间
	 * 
	 * @param taskType
	 * @param time
	 */
	public static void setExecTime(String taskType, long time) {
		TaskStateSingleton taskStateSingleton = TaskStateSingleton.getInstance();
		TaskStateDO currentTask = taskStateSingleton.getTaskStateDO(taskType);
		if (currentTask == null) {
			// logger..
			return;
		}
		currentTask.state(time);
	}
}
package com.taobao.tps;

/**
 * 任务
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TaskDO implements Runnable {
	private String taskType;

	public TaskDO(String taskType) {
		this.taskType = taskType;
	}

	public String getTaskType() {
		return taskType;
	}

	public void run() {
		try {
			long startTime = System.currentTimeMillis();
			Thread.sleep(15);
			long endTime = System.currentTimeMillis();
			SpeedControlHelper.setExecTime(taskType, endTime - startTime);
		} catch (InterruptedException e) {
			// logger..
		}
	}

}
package com.taobao.tps;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 任务状态
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TaskStateDO {
	/**
	 * 任务类型
	 */
	private String taskType;

	/**
	 * 一个统计时间区间内的总执行次数
	 */
	private int execCount;

	/**
	 * 一个统计时间区间内的总执行耗时
	 */
	private long totalTime;

	/**
	 * 一个统计时间区间内的平均执行耗时
	 */
	private int averageTime;

	/**
	 * 当前1秒内执行的次数
	 */
	private AtomicInteger secondExecCount = new AtomicInteger(0);

	/**
	 * 当前秒
	 */
	private long currentSecond = 0;

	public TaskStateDO() {
		reset();
	}

	/**
	 * 获取taskType
	 * 
	 * @return taskType
	 */
	public String getTaskType() {
		return taskType;
	}

	/**
	 * 设置taskType
	 * 
	 * @param taskType 要设置的taskType
	 */
	public void setTaskType(String taskType) {
		this.taskType = taskType;
	}

	/**
	 * 获取totalTime
	 * 
	 * @return totalTime
	 */
	public long getTotalTime() {
		return totalTime;
	}

	/**
	 * 获取execCount
	 * 
	 * @return execCount
	 */
	public int getExecCount() {
		return execCount;
	}

	/**
	 * 获取averageTime
	 * 
	 * @return averageTime
	 */
	public int getAverageTime() {
		return averageTime;
	}

	/**
	 * 统计任务的秒级执行次数
	 * 
	 * @return
	 */
	public int calcuTps() {
		if (currentSecond == System.currentTimeMillis() / 1000) {
			return secondExecCount.incrementAndGet();// 1s的执行次数
		} else {
			currentSecond = System.currentTimeMillis() / 1000;
			secondExecCount.set(1);
			return 1;
		}
	}

	/**
	 * 统计任务执行时间
	 * 
	 * @param time
	 */
	public void state(long time) {
		try {
			execCount++;
			totalTime += time;
			averageTime = (int) (totalTime / execCount);
		} catch (Exception e) {
			reset();
		}
	}

	/**
	 * 重置方法,没有设置为零,防止除法抛异常
	 */
	public void reset() {
		execCount = 1;
		totalTime = 100;
	}

}
package com.taobao.tps;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 统计全量任务单例
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TaskStateSingleton {
	private Map<String, TaskStateDO> taskStateMap = new HashMap<String, TaskStateDO>();

	private ReadWriteLock lock = new ReentrantReadWriteLock();

	private Lock read = lock.readLock();

	private Lock write = lock.writeLock();

	private TaskStateSingleton() {
	}

	// 获取实例
	public static TaskStateSingleton getInstance() {
		return SingletonHolder.taskStateSingleton;
	}

	private static class SingletonHolder {
		private static final TaskStateSingleton taskStateSingleton = new TaskStateSingleton();
	}

	public Map<String, TaskStateDO> getTaskStateMap() {
		try {
			read.lock();
			return taskStateMap;
		} finally {
			read.unlock();
		}
	}

	/**
	 * 新增一个任务统计信息
	 */
	public void putTaskStateDO(String taskType, TaskStateDO taskStateDO) {
		try {
			write.lock();
			taskStateMap.put(taskType, taskStateDO);
		} finally {
			write.unlock();
		}
	}

	/**
	 * 查询一个任务统计信息
	 */
	public TaskStateDO getTaskStateDO(String taskType) {
		try {
			read.lock();
			return taskStateMap.get(taskType);
		} finally {
			read.unlock();
		}
	}

}
package com.taobao.tps;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 测试类
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TpsTest {
	public static ThreadPoolExecutor executer = new ThreadPoolExecutor(4, 4, 4, TimeUnit.SECONDS,
			new LinkedBlockingQueue<Runnable>(500));

	public static void main(String[] args) throws Exception {
		while (true) {
			TaskDO task = new TaskDO("calcu_item_value");
			if (!SpeedControlHelper.speedControl(task.getTaskType())) {
				print("被限流..");
				continue;
			}
			executer.submit(task);
			print("添加任务成功");
			Thread.sleep(10);
		}
	}

	public static void print(Object obj) {
		System.out.print(obj.toString());
	}
}


建议用服务器测试代码,工作PC测试可以比较卡。。



© 著作权归作者所有

共有 人打赏支持
健康的程序员
粉丝 6
博文 153
码字总数 35551
作品 0
杭州
程序员
JavaAgent-SandBox

1.前言 之前初步学习了javaAgent,并做了一份总结《JavaAgent学习笔记》。然后在看到《JVM-Sandbox 基于JVM的非侵入式运行期AOP解决方案》之后,接触到了集团的sandBox。并尝试使用这种有真正...

何度
05/09
0
0
RocketMQ与Kafka对比

RocketMQ与Kafka对比(18项差异) 淘宝内部的交易系统使用了淘宝自主研发的Notify消息中间件,使用Mysql作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步...

莫问viva
2015/05/08
0
0
Java:大数据技术领域的一匹黑马

诞生于1991年的Java如今已经成为世界范围内应用最为广泛的编程语言之一。在今天的文章中,我们将共同了解Java所拥有的七大关键新特性,展望其如何在未来的超级计算、大数据以及物联网等领域继...

Java大数据处理
04/22
0
0
centos7 yum安装java运行环境,初识hadoop

安装java运行环境 1.实验机相关信息: [root@node2 ~]# cat /etc/redhat-release CentOS Linux release 7.2.1511 (Core) [root@node2 ~]# uname -r 3.10.0-327.el7.x86_6 2.配置epel源,以y......

smile68
04/21
0
0
1.4.1 下载和安装java 7的jdk

JDK的全称是 java SE Development Kit,即 java标准版开发包,是Sun提供的一套用于开发java 应用程序的开发包,它提供了编译、运行 java程序所需的各种工具和资源,包括java编译器,java运行...

Gooiem
2015/08/18
0
0
Hadoop2安装——单机模式

Hadoop有三种模式 单机模式、伪分布模式和完全分布模式 这里先简单介绍单机模式 ,默认情况下,Hadoop被配置成一个非分布式模式,独立运行JAVA进程,适合开始做调试工作。 Hadoop 网址http:...

tngou
2012/12/01
0
12
记几次JAVA系统故障问题定位过程

把自己以前碰到的case汇总列下,作为对自己过去的一部分工作总结。 问题定位一般步骤 具备常见的理论知识,不一定要全记住细节。但是需要知道问题的关联性,然后根据某些关键字搜索或者查阅资...

geecoodeer
2014/01/27
0
5
Java 8时间和日期API 20例

伴随lambda表达式、streams以及一系列小优化,Java 8 推出了全新的日期时间API,在教程中我们将通过一些简单的实例来学习如何使用新API。Java处理日期、日历和时间的方式一直为社区所诟病,将...

黄梦巍
2015/06/19
0
0
Java 并发实践 — ConcurrentHashMap 与 CAS

最近在做接口限流时涉及到了一个有意思问题,牵扯出了关于concurrentHashMap的一些用法,以及CAS的一些概念。限流算法很多,我主要就以最简单的计数器法来做引。先抽象化一下需求:统计每个接...

大数据之路
2012/10/07
0
0
【Java并发性和多线程】Java并发性和多线程介绍

本文为转载学习 原文链接:http://tutorials.jenkov.com/java-concurrency/index.html 译文链接:http://ifeve.com/java-concurrency-thread/ 在过去单CPU时代,单任务在一个时间点只能执行单...

heroShane
2014/01/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

gRPC学习笔记

gRPC编程流程 1. proto文件定义 proto文件用于定义需要通过gRPC生成的接口,可以理解为接口定义文档 2. 通过构建工具生成服务基类代码-Maven或Gradle 3. 服务端开发 服务端实现类须实现通过构...

OSC_fly
16分钟前
0
0
Docker Mac (三) Dockerfile 及命令

Dockerfile 最近学习docker的时候,遇到一件怪事,关于docker镜像可能会被破坏,还不知道它会有此措施 所以需要了解构建Dockerfile的正确方法 Dockerfile是由一系列命令和参数构成的脚本,这些命...

___大侠
43分钟前
0
0
NetCat Tutorials

Hacking with Netcat part 1: The Basics Hacking with Netcat part 2: Bind and reverse shells Hacking with Netcat part 3: Advanced Techniques 10 Introduction to Netcat - pdf NetCat......

zungyiu
43分钟前
0
0
Android Studio+NDK+Cmake 移植FFmpeg-4.0.2命令行工具

一、编译 参考大神的帖子,亲测一次编译成功:https://blog.csdn.net/bobcat_kay/article/details/80889398 鉴于以前查文档的经验,这里附上编写例子的时间:2018年7月22日 我用的是ubantu,...

她叫我小渝
44分钟前
0
0
mysql创建数据库

登录MYSQL mysql -u root -p 脚本创建数据库WeChat,并制定默认的字符集是utf8mb4。 CREATE DATABASE Wechat DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_general_ci; 授权 grant all......

niithub
58分钟前
0
0
svn: Unable to connect to a repository URL 的解决方案

错误图示: 解决办法:清除本地保存的授权信息; 1:右键点击本地文件夹,选择设置; TortoiseSVN -> Settings 2:在弹出的对话框中选择 Saved Data, 右侧选择:授权地方清理所有。 然后点确...

宁哥实战课堂
今天
1
0
sleep与wait的区别

Thread.sleep(XXX)方法消耗CPU吗? 这个知识点是我之前认识一直有错误的一个知识点,在我以前的认识里面,我一直认为Thread.sleep(1000)的这一秒钟的时间内,线程的休眠是一直占用着CPU的时间...

码代码的小司机
今天
1
0
20位活跃在Github上的国内技术大牛 leij 何小鹏 亚信

本文列举了20位在Github上非常活跃的国内大牛,看看其中是不是很多熟悉的面孔? 1. lifesinger(玉伯) Github主页: https://github.com/lifesinger 微博:@ 玉伯也叫射雕 玉伯(王保平),...

海博1600
今天
1
0
Mybatis收集配置

一、Mybatis取Clob数据 1、Mapper.xml配置 <resultMap type="com.test.User" id="user"> <result column="id" property="id"/> <result column="json_data" property="jsonData" ......

星痕2018
今天
1
0
centos7设置以多用户模式启动

1、旧版本linux系统修改inittab文件,在新版本执行vi /etc/inittab 会有以下提示 # inittab is no longer used when using systemd. # # ADDING CONFIGURATION HERE WILL HAVE NO EFFECT ON......

haha360
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部