[Java]计算单机TPS、并对任务限流
[Java]计算单机TPS、并对任务限流
健康的程序员 发表于2年前
[Java]计算单机TPS、并对任务限流
  • 发表于 2年前
  • 阅读 55
  • 收藏 1
  • 点赞 0
  • 评论 0
摘要: 简单、实用计算TPS的方法。核心思想:毫秒数(UTC 1970 年 1 月 1 日午夜开始经过的毫秒数)除...

类结构

说明:

SpeedControlConstant 定义的一些与限流相关的常量
SpeedControlHelper 单机TPS限流类,提供当前任务TPS的计算功能
TaskDO 任务DO,定义了任务类型、任务的执行逻辑。实现了Runnable接口
TaskStateDO 任务状态DO,定义了任务的TPS、执行次数、执行时间等
TaskStateSingleton 统计所有任务状态的单例类
TpsTest 测试类


直接上代码

package com.taobao.tps;

/**
 * 限流常量类
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class SpeedControlConstant {
	// 单机tps设置50
	public static int serverTps = 50;
}
package com.taobao.tps;

/**
 * 单机TPS限流功能类
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class SpeedControlHelper {

	/**
	 * 判断是否被限流
	 * 
	 * @param taskType
	 * @return false限流
	 */
	public static boolean speedControl(String taskType) {
		int tpsThreshod = SpeedControlConstant.serverTps;// tps阀值
		int currentTps = currentTps(taskType);// 当前tps
		if (tpsThreshod <= currentTps) {
			return false;
		}
		return true;
	}

	/**
	 * 获取当前任务类型tps
	 * 
	 * @param taskType
	 * @return
	 */
	public static int currentTps(String taskType) {
		TaskStateSingleton taskStateSingleton = TaskStateSingleton.getInstance();
		TaskStateDO currentTask = taskStateSingleton.getTaskStateDO(taskType);
		if (currentTask == null) {
			// 这里有并发问题,但是统计tps不需要特别精准。添加了并发控制反而会影响性能
			currentTask = new TaskStateDO();
			taskStateSingleton.putTaskStateDO(taskType, currentTask);
		}
		return currentTask.calcuTps();
	}

	/**
	 * 设置当前任务类型执行时间
	 * 
	 * @param taskType
	 * @param time
	 */
	public static void setExecTime(String taskType, long time) {
		TaskStateSingleton taskStateSingleton = TaskStateSingleton.getInstance();
		TaskStateDO currentTask = taskStateSingleton.getTaskStateDO(taskType);
		if (currentTask == null) {
			// logger..
			return;
		}
		currentTask.state(time);
	}
}
package com.taobao.tps;

/**
 * 任务
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TaskDO implements Runnable {
	private String taskType;

	public TaskDO(String taskType) {
		this.taskType = taskType;
	}

	public String getTaskType() {
		return taskType;
	}

	public void run() {
		try {
			long startTime = System.currentTimeMillis();
			Thread.sleep(15);
			long endTime = System.currentTimeMillis();
			SpeedControlHelper.setExecTime(taskType, endTime - startTime);
		} catch (InterruptedException e) {
			// logger..
		}
	}

}
package com.taobao.tps;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 任务状态
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TaskStateDO {
	/**
	 * 任务类型
	 */
	private String taskType;

	/**
	 * 一个统计时间区间内的总执行次数
	 */
	private int execCount;

	/**
	 * 一个统计时间区间内的总执行耗时
	 */
	private long totalTime;

	/**
	 * 一个统计时间区间内的平均执行耗时
	 */
	private int averageTime;

	/**
	 * 当前1秒内执行的次数
	 */
	private AtomicInteger secondExecCount = new AtomicInteger(0);

	/**
	 * 当前秒
	 */
	private long currentSecond = 0;

	public TaskStateDO() {
		reset();
	}

	/**
	 * 获取taskType
	 * 
	 * @return taskType
	 */
	public String getTaskType() {
		return taskType;
	}

	/**
	 * 设置taskType
	 * 
	 * @param taskType 要设置的taskType
	 */
	public void setTaskType(String taskType) {
		this.taskType = taskType;
	}

	/**
	 * 获取totalTime
	 * 
	 * @return totalTime
	 */
	public long getTotalTime() {
		return totalTime;
	}

	/**
	 * 获取execCount
	 * 
	 * @return execCount
	 */
	public int getExecCount() {
		return execCount;
	}

	/**
	 * 获取averageTime
	 * 
	 * @return averageTime
	 */
	public int getAverageTime() {
		return averageTime;
	}

	/**
	 * 统计任务的秒级执行次数
	 * 
	 * @return
	 */
	public int calcuTps() {
		if (currentSecond == System.currentTimeMillis() / 1000) {
			return secondExecCount.incrementAndGet();// 1s的执行次数
		} else {
			currentSecond = System.currentTimeMillis() / 1000;
			secondExecCount.set(1);
			return 1;
		}
	}

	/**
	 * 统计任务执行时间
	 * 
	 * @param time
	 */
	public void state(long time) {
		try {
			execCount++;
			totalTime += time;
			averageTime = (int) (totalTime / execCount);
		} catch (Exception e) {
			reset();
		}
	}

	/**
	 * 重置方法,没有设置为零,防止除法抛异常
	 */
	public void reset() {
		execCount = 1;
		totalTime = 100;
	}

}
package com.taobao.tps;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 统计全量任务单例
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TaskStateSingleton {
	private Map<String, TaskStateDO> taskStateMap = new HashMap<String, TaskStateDO>();

	private ReadWriteLock lock = new ReentrantReadWriteLock();

	private Lock read = lock.readLock();

	private Lock write = lock.writeLock();

	private TaskStateSingleton() {
	}

	// 获取实例
	public static TaskStateSingleton getInstance() {
		return SingletonHolder.taskStateSingleton;
	}

	private static class SingletonHolder {
		private static final TaskStateSingleton taskStateSingleton = new TaskStateSingleton();
	}

	public Map<String, TaskStateDO> getTaskStateMap() {
		try {
			read.lock();
			return taskStateMap;
		} finally {
			read.unlock();
		}
	}

	/**
	 * 新增一个任务统计信息
	 */
	public void putTaskStateDO(String taskType, TaskStateDO taskStateDO) {
		try {
			write.lock();
			taskStateMap.put(taskType, taskStateDO);
		} finally {
			write.unlock();
		}
	}

	/**
	 * 查询一个任务统计信息
	 */
	public TaskStateDO getTaskStateDO(String taskType) {
		try {
			read.lock();
			return taskStateMap.get(taskType);
		} finally {
			read.unlock();
		}
	}

}
package com.taobao.tps;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 测试类
 * 
 * @author <a href="mailto:zq_dong@sina.cn">zhangqi.dzq</a>
 * @version 1.0
 * @since 2015年12月29日
 */
public class TpsTest {
	public static ThreadPoolExecutor executer = new ThreadPoolExecutor(4, 4, 4, TimeUnit.SECONDS,
			new LinkedBlockingQueue<Runnable>(500));

	public static void main(String[] args) throws Exception {
		while (true) {
			TaskDO task = new TaskDO("calcu_item_value");
			if (!SpeedControlHelper.speedControl(task.getTaskType())) {
				print("被限流..");
				continue;
			}
			executer.submit(task);
			print("添加任务成功");
			Thread.sleep(10);
		}
	}

	public static void print(Object obj) {
		System.out.print(obj.toString());
	}
}


建议用服务器测试代码,工作PC测试可以比较卡。。



共有 人打赏支持
粉丝 7
博文 90
码字总数 29366
×
健康的程序员
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: