文档章节

一次 HashSet 所引起的并发问题

crossoverJie
 crossoverJie
发布于 11/08 07:56
字数 2113
阅读 1661
收藏 65

背景

上午刚到公司,准备开始一天的摸鱼之旅时突然收到了一封监控中心的邮件。

心中暗道不好,因为监控系统从来不会告诉我应用完美无 bug,其实系统挺猥琐。

打开邮件一看,果然告知我有一个应用的线程池队列达到阈值触发了报警。

由于这个应用出问题非常影响用户体验;于是立马让运维保留现场 dump 线程和内存同时重启应用,还好重启之后恢复正常。于是开始着手排查问题。

分析

首先了解下这个应用大概是做什么的。

简单来说就是从 MQ 中取出数据然后丢到后面的业务线程池中做具体的业务处理。

而报警的队列正好就是这个线程池的队列。

跟踪代码发现构建线程池的方式如下:

ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize,
              0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>());;
             put(poolName,executor);

采用的是默认的 LinkedBlockingQueue 并没有指定大小(这也是个坑),于是这个队列的默认大小为 Integer.MAX_VALUE

由于应用已经重启,只能从仅存的线程快照和内存快照进行分析。

内存分析

先利用 MAT 分析了内存,的到了如下报告。

其中有两个比较大的对象,一个就是之前线程池存放任务的 LinkedBlockingQueue,还有一个则是 HashSet

当然其中队列占用了大量的内存,所以优先查看,HashSet 一会儿再看。

由于队列的大小给的够大,所以结合目前的情况来看应当是线程池里的任务处理较慢,导致队列的任务越堆越多,至少这是目前可以得出的结论。

线程分析

再来看看线程的分析,这里利用 fastthread.io 这个网站进行线程分析。

因为从表现来看线程池里的任务迟迟没有执行完毕,所以主要看看它们在干嘛。

正好他们都处于 RUNNABLE 状态,同时堆栈如下:

发现正好就是在处理上文提到的 HashSet,看这个堆栈是在查询 key 是否存在。通过查看 312 行的业务代码确实也是如此。

这里的线程名字也是个坑,让我找了好久。

定位

分析了内存和线程的堆栈之后其实已经大概猜出一些问题了。

这里其实有一个前提忘记讲到:

这个告警是凌晨三点发出的邮件,但并没有电话提醒之类的,所以大家都不知道。

到了早上上班时才发现并立即 dump 了上面的证据。

所有有一个很重要的事实:这几个业务线程在查询 HashSet 的时候运行了 6 7 个小时都没有返回

通过之前的监控曲线图也可以看出:

操作系统在之前一直处于高负载中,直到我们早上看到报警重启之后才降低。

同时发现这个应用生产上运行的是 JDK1.7 ,所以我初步认为应该是在查询 key 的时候进入了 HashMap 的环形链表导致 CPU 高负载同时也进入了死循环。

为了验证这个问题再次 review 了代码。

整理之后的伪代码如下:

//线程池
private ExecutorService executor;

private Set<String> set = new hashSet();

private void execute(){
	
	while(true){
		//从 MQ 中获取数据
		String key = subMQ();
		executor.excute(new Worker(key)) ;
	}
}

public class Worker extends Thread{
	private String key ;

	public Worker(String key){
		this.key = key;
	}

	@Override
	private void run(){
		if(!set.contains(key)){

			//数据库查询
			if(queryDB(key)){
				set.add(key);
				return;
			}
		}

		//达到某种条件时清空 set
		if(flag){
			set = null ;
		}
	}	
}

大致的流程如下:

  • 源源不断的从 MQ 中获取数据。
  • 将数据丢到业务线程池中。
  • 判断数据是否已经写入了 Set
  • 没有则查询数据库。
  • 之后写入到 Set 中。

这里有一个很明显的问题,那就是作为共享资源的 Set 并没有做任何的同步处理

这里会有多个线程并发的操作,由于 HashSet 其实本质上就是 HashMap,所以它肯定是线程不安全的,所以会出现两个问题:

  • Set 中的数据在并发写入时被覆盖导致数据不准确。
  • 会在扩容的时候形成环形链表

第一个问题相对于第二个还能接受。

通过上文的内存分析我们已经知道这个 set 中的数据已经不少了。同时由于初始化时并没有指定大小,仅仅只是默认值,所以在大量的并发写入时候会导致频繁的扩容,而在 1.7 的条件下又可能会形成环形链表

不巧的是代码中也有查询操作(contains()),观察上文的堆栈情况:

发现是运行在 HashMap 的 465 行,来看看 1.7 中那里具体在做什么:

已经很明显了。这里在遍历链表,同时由于形成了环形链表导致这个 e.next 永远不为空,所以这个循环也不会退出了。

到这里其实已经找到问题了,但还有一个疑问是为什么线程池里的任务队列会越堆越多。我第一直觉是任务执行太慢导致的。

仔细查看了代码发现只有一个地方可能会慢:也就是有一个数据库的查询

把这个 SQL 拿到生产环境执行发现确实不快,查看索引发现都有命中。

但我一看表中的数据发现已经快有 7000W 的数据了。同时经过运维得知 MySQL 那台服务器的 IO 压力也比较大。

所以这个原因也比较明显了:

由于每消费一条数据都要去查询一次数据库,MySQL 本身压力就比较大,加上数据量也很高所以导致这个 IO 响应较慢,导致整个任务处理的就比较慢了。

但还有一个原因也不能忽视;由于所有的业务线程在某个时间点都进入了死循环,根本没有执行完任务的机会,而后面的数据还在源源不断的进入,所以这个队列只会越堆越多!

这其实是一个老应用了,可能会有人问为什么之前没出现问题。

这是因为之前数据量都比较少,即使是并发写入也没有出现并发扩容形成环形链表的情况。这段时间业务量的暴增正好把这个隐藏的雷给揪出来了。所以还是得信墨菲他老人家的话。

总结

至此整个排查结束,而我们后续的调整措施大概如下:

  • HashSet 不是线程安全的,换为 ConcurrentHashMap同时把 value 写死一样可以达到 set 的效果。
  • 根据我们后面的监控,初始化 ConcurrentHashMap 的大小尽量大一些,避免频繁的扩容。
  • MySQL 中很多数据都已经不用了,进行冷热处理。尽量降低单表数据量。同时后期考虑分表。
  • 查数据那里调整为查缓存,提高查询效率。
  • 线程池的名称一定得取的有意义,不然是自己给自己增加难度。
  • 根据监控将线程池的队列大小调整为一个具体值,并且要有拒绝策略。
  • 升级到 JDK1.8
  • 再一个是报警邮件酌情考虑为电话通知。

HashMap 的死循环问题在网上层出不穷,没想到还真被我遇到了。现在要满足这个条件还是挺少见的,比如 1.8 以下的 JDK 这一条可能大多数人就碰不到,正好又证实了一次墨菲定律。

同时我会将文章更到这里,方便大家阅读和查询。 https://crossoverjie.top/JCSprout/

你的点赞与分享是对我最大的支持

© 著作权归作者所有

共有 人打赏支持
crossoverJie
粉丝 535
博文 71
码字总数 132407
作品 0
江北
后端工程师
私信 提问
加载中

评论(18)

crossoverJie
crossoverJie

引用来自“就像风”的评论

如果是我的话会在 ConsumerThread 中做添加 set 操作,而不是在 WorkerThread 线程中做。ConsumerThread 是单线程,不存在并发问题。其次这里添加 set 的操作无非是避免重复无意义的消费相同的数据。
在我们的业务场景下,不在消费线程中处理就没意义了。
就像风
就像风
如果是我的话会在 ConsumerThread 中做添加 set 操作,而不是在 WorkerThread 线程中做。ConsumerThread 是单线程,不存在并发问题。其次这里添加 set 的操作无非是避免重复无意义的消费相同的数据。
crossoverJie
crossoverJie

引用来自“onlyfish”的评论

优化建议再加一个逻辑,首先线程池队列肯定要有界,这个你也说到了,可以再从MQ消费消息前先判断队列是否满了,如果满了就等待。这样可以把数据缓存在MQ中,也不会用到拒绝的策略,其实拒绝策略你也不能很好的处理。


//线程池
private static int MAX_QUEUE_SIZE = 10000;

private static ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.MINUTES,
    new LinkedBlockingQueue<Runnable>(MAX_QUEUE_SIZE), new BasicThreadFactory.Builder()
.namingPattern("consumer-pool").build(), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
}
}
});

private void execute(){
  
  while(true){
if (executor.getQueue().size() > MAX_QUEUE_SIZE - 100) {
  try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        LOG.warn(Exceptions.getStackTrace(e));
      }
continue;
}

    //从 MQ 中获取数据
    String key = subMQ();
    executor.excute(new Worker(key)) ;
  }
}
队列满了阻塞不是队列自己的职责嘛?

还有一个是:我这里只是写的伪代码,真实场景是多个消费实例共用的一个线程池,这样每次取出来判断线程也不安全。
onlyfish
onlyfish
优化建议再加一个逻辑,首先线程池队列肯定要有界,这个你也说到了,可以再从MQ消费消息前先判断队列是否满了,如果满了就等待。这样可以把数据缓存在MQ中,也不会用到拒绝的策略,其实拒绝策略你也不能很好的处理。


//线程池
private static int MAX_QUEUE_SIZE = 10000;

private static ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.MINUTES,
    new LinkedBlockingQueue<Runnable>(MAX_QUEUE_SIZE), new BasicThreadFactory.Builder()
.namingPattern("consumer-pool").build(), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
}
}
});

private void execute(){
  
  while(true){
if (executor.getQueue().size() > MAX_QUEUE_SIZE - 100) {
  try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        LOG.warn(Exceptions.getStackTrace(e));
      }
continue;
}

    //从 MQ 中获取数据
    String key = subMQ();
    executor.excute(new Worker(key)) ;
  }
}
crossoverJie
crossoverJie

引用来自“吕兵阳”的评论

我觉得你们这样设计好像有问题,本身我们应该将更多的数据缓冲到kafka而不应该缓冲在jvm的任务队列
没有问题啊,这是标准的从 kafka 取数据然后内部线程池消费的场景。

任何你用了线程池都会有这种情况,除非你线程池的队列只有一个长度。

那都会存在重启之后内存数据丢失,所以如果要求高的话需要写上 hook 函数,把数据持久化起来,重启之后还能继续读取之前丢失的数据。 但 kill -9 谁也没招。
吕兵阳
吕兵阳
我觉得你们这样设计好像有问题,本身我们应该将更多的数据缓冲到kafka而不应该缓冲在jvm的任务队列
crossoverJie
crossoverJie

引用来自“吕兵阳”的评论

这么随意就重启,你jvm队列里面没消跑完的任务不是丢了吗?
线程都跑进死循环了 不重启也没招了 再说数据是从 Kakfa 里取出来的,大不了重新消费一次。
窗外有个蓝蓝天
窗外有个蓝蓝天
。。。。。。多线程资源共享还敢不用线程安全的类,平时用hashmap做内部变量缓存,我都瑟瑟发抖,把写这行代码的人拖出去直接打死就对了:innocent:
吕兵阳
吕兵阳
这么随意就重启,你jvm队列里面没消跑完的任务不是丢了吗?
crossoverJie
crossoverJie

引用来自“LinkerLin”的评论

直接用Redis啊
Redis 当然可以,但也要分场景。我这里就内存 HashSet 就满足了,而且存内存操作还要快些。
并发编程(二):非线程安全集合类

前言 线程不安全的集合类 ArrayList: 结果一: 结果二: 抛出异常:ArrayIndexOutofBoundsException异常; 现象:出现null值; 出现输出不全的现象; 抛出异常; 原因: ArrayList中的add方...

mengdonghui123456
2017/08/14
0
0
多线程性能问题

如何优化性能: 如果重复计算量大的话,使用缓存来保存旧的结果,以便下次计算时使用; 减少阻塞,运行和阻塞会增加上下文切换。 因为锁是串行的这会引起大量的阻塞:所以我们在使用锁的时候要...

恰同学少年
2016/06/18
109
0
验证数独 Valid Sudoku

问题:Determine if a Sudoku is valid, according to: Here are three rules: Each row must have the numbers 1-9 occuring just once. Each column must have the numbers 1-9 occuring ......

叶枫啦啦
2017/09/04
0
0
Java实战equals()与hashCode()

一.equals()方法详解 equals()方法在object类中定义如下:   代码 public boolean equals(Object obj) { return (this == obj); }   很明显是对两个对象的地址值进行的比较(即比较引用是...

凯哥学堂
2016/12/01
12
0
concurrency - Avoid excessive synchronization

avoid excessive synchronization 避免过度使用synchronization Here is an example,which implements an observable set wrapper.It allow clients tosubscribe to notifications when elem......

why_Dk37
2016/12/11
2
0

没有更多内容

加载失败,请刷新页面

加载更多

springboot中filter的用法

一、在spring的应用中我们存在两种过滤的用法,一种是拦截器、另外一种当然是过滤器。我们这里介绍过滤器在springboot的用法,在springmvc中的用法基本上一样,只是配置上面有点区别。 二、f...

xiaomin0322
2分钟前
0
0
java项目修改了更换了jdk版本报错进行修改

java项目原来用的是1.8版本的,改成1.7版本后,项目会报错,要进行的修改是 然后是clean一下项目,然后是选中项目的buildpath,然后是configurebuildpath,然后是看jdk是否进行修改...

myAll_myAll
14分钟前
0
0
Gartner 2018 数据库系列报告发布 巨杉数据库连续两年入选

近期,Gartner陆续发布了2018年的数据库系列报告,包括《数据库魔力象限》《数据库核心能力》以及《数据库推荐报告》。其中,SequoiaDB巨杉数据库作为业界领先的金融级分布式交易型数据库产品...

巨杉数据库
16分钟前
0
0
Navicat闲置一段时间卡死问题的解决

先关闭连接,再右键点击所需要设置的链接,进入编辑连接,进入高级项,勾选保持连续间隔(秒):时间设置短一些,比如30秒,完成!!

joyStalker
17分钟前
0
0
理解Java中的弱引用(Weak Reference)

1. What——什么是弱引用? Java中的弱引用具体指的是java.lang.ref.WeakReference<T>类,我们首先来看一下官方文档对它做的说明: 弱引用对象的存在不会阻止它所指向的对象变被垃圾回收器回...

绝地逢生
17分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部