偶遇 神秘莫动
偶遇 神秘莫动
奔向蔚蓝的深坑 发表于5个月前
偶遇 神秘莫动
  • 发表于 5个月前
  • 阅读 28
  • 收藏 0
  • 点赞 0
  • 评论 0

标题:腾讯云 新注册用户域名抢购1元起>>>   

  Review数据调度平台代码的时候,发现了一个“神秘莫动”的小坑。在此就作为博客素材分享吧。请看图:


  说实话看到这里,心中不由得有些“烦躁”,以这种心态“玩”,还能愉快的玩耍不?但是在随后的几分钟内变峰回路转了。随着更加深入的去看其实现,发现此coding作者:注释全面、逻辑清晰、集群协作、横向扩展 编码风格很好。抽出简要时序图如下:

部分代码片段:

public class TranFullService {
	private static final Logger logger = LoggerFactory.getLogger(TranFullService.class);
	private static final int batchPageSize = 100000;
	private ExecutorService threadPool = Executors.newFixedThreadPool(10);
	@Autowired
	private TranDao trandao;
	@Autowired
	private RabbitMQMessageProducer rcsRePhProducer;
	/**
	 * 开始执行数据同步
	 * @param dateBatch 日期任务
	 * ********************************************
	 * 使用应当尽量规避数据集增量密集时间段;               *
	 * 如在数据增量密集时间段使用,会出现非预期结果。*
	 * ********************************************
	 */
	public void beginImbue(List<Map<String, Date>> dateBatch){
		if(threadPool.isShutdown()){
			logger.info("线程池已关闭;创建fixedpool10。");
			threadPool = Executors.newFixedThreadPool(10);
		}
		Lock lock = new ReentrantLock(true);
		//load counter
		CounterForTran counter = CounterForTran.COUNTER;
		for (Map<String, Date> map : dateBatch) {
			Date startDate=map.get("startDate"),endDate=map.get("endDate");
			int rows = trandao.getTranCount(startDate,endDate);
			if(counter.load(rows)){
				//nextPage
				for (int page = counter.nextPage(lock); page != -1; page = counter.nextPage(lock)){
					final int threadPage = page;
					threadPool.execute(() -> {
						System.out.println(Thread.currentThread().getId()+ " 骑士已开启征程。");
						List<TRcsTran> trans = trandao.getTran(startDate,endDate, threadPage, batchPageSize);
						trans.forEach((TRcsTran tran) -> {
							if (tranDoFilter(tran)){
								counter.mqCount.getAndIncrement();
								rcsRePhProducer.sendMessage(JsonUtils.toJson(tran),RcsMQDestination.TRAN_RCS_BH11);
							}
						});
					});
				}
			}else{
				try {
					counter.latch.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
		threadPool.shutdown();
		printReport();
	}
	/**
	 * 反馈同步报告
	 */
	public void printReport(){
		
	}
	
	/**
	 * 根据TranCd滤掉不需要数据
	 * why?为什么不在sql中过滤?
	 * 因为为了优化查询效率,数据表数据量巨大减少不必要的查询条件。
	 * @param tranCd
	 * @return
	 */
	private boolean tranDoFilter(TRcsTran tran){
		
	}
	/**
	 * 大批量数据,分批计数
	 */
	enum CounterForTran{
		COUNTER();
		private AtomicInteger pages;//任务批次总数
		private AtomicInteger currentPage;//当前批次
		private AtomicInteger rubbishCount = new AtomicInteger();//排除数量
		private AtomicInteger mqCount = new AtomicInteger();//发送数量
		CountDownLatch latch = new CountDownLatch(0);
		public final void printReport(){
			logger.info("-----------------------数-据-同-步-报告---------------------------");
			logger.info("--------送达数据:{}",mqCount);
			logger.info("--------过滤无效数据:{}",rubbishCount);
			logger.info("-----------------------数-据-同-步-完毕---------------------------");
			rubbishCount = new AtomicInteger();
			mqCount = new AtomicInteger();
		}
		/**
		 * pageCount==-1:没有后续的页码;pageCount>0具体查询页码
		 * @return 返回要查询的批次
		 */
		public int nextPage(Lock lock){
			lock.lock();
			if(currentPage.get()<pages.get()){
				latch.countDown();
				try{
					currentPage.getAndIncrement();
					return currentPage.get();
				}finally{
					lock.unlock();
				}
			}else{
				try{
					return -1;
				}finally{
					lock.unlock();
				}
			}
		}
		/**
		 * 初始化当前“任务计数器”实例
		 * @param rows 同步数据总行数
		 * @return true初始化成功; false当前“任务计数器”有任务未完成,不能重新load;
		 */
		public boolean load(int rows) {
			if(rows<=0){
				return false;
			}
			//上一次load还存在没执行完的任务
			if(latch!=null&&latch.getCount()>0){
				return false;
			}
			if(rows<=batchPageSize){
				pages = new AtomicInteger(1);
			}else{
				pages = new AtomicInteger((rows / batchPageSize));
				if (rows > batchPageSize * pages.get()) {
					pages.getAndIncrement();
				}
			}
			this.currentPage = new AtomicInteger(-1);
			latch=new CountDownLatch(pages.get());
			logger.info("----当前日期rows总量:{},批次量:{}", rows, pages);
			return true;
		}
	}
	
}


  可见其实现过程中,在实现逻辑,性能扩展方面,考虑十分充分。结果也正是如此10kw/h/n;至此我的好奇心已经升至顶点。一个经过用心设计的系统,究竟被什么遮住了双眼。决心找出那片“叶子”(一叶障目);反复推敲两遍发现了不合理的地方。

  实测执行sql返回10W行结果集的时间是 35-55s;这里选择了查询时效最大化,来提升系统性能。
也许我发现了那片叶子。 ACID 中的一员 C一致性: 事务开始和结束之间的中间状态不会被其他看到 ;

一致读:从Select执行那一刹那开始,到45s后执行结束。期间数据发生任何变化,对于此次查询数据结果都是无感知的。在关系型数据库保住了“一致性”节操的情况下,我们的预期同步结果就发生了差异。

结论:如细微的差距可以忽略不计的情况下,这种以质量换效率的做法也未尝不可。在要求数据相对精确,且不得关闭其他增量服务的情况下,我们可以缩小batchPageSize来提高查询效率,并减小查询消耗时间。从而提高数据精度。要求一定精确的话,就得换个实现思路了。添加弥补变动的措施进去。来个阻塞队列,将同步服务启动后的变更数据,发送到队列中。待同步结束后,消费掉变更数据。

标签: ACID、一致性
共有 人打赏支持
粉丝 0
博文 2
码字总数 2232
×
奔向蔚蓝的深坑
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: