问题背景
最近的某个业务中,遇到一个问题,一个用户动作,会产生A和B两个行为,分别通过对应的esb消息总线发出。
我们的业务线对AB两条esb消息队列进行监听(两个进程),做数据的同步更新操作。
在正常过程中,A行为比B行为先产生,并且A行为优先级高于B行为,数据最终会根据A行为做更新。
但是在实际应用中,出现了并发问题,数据最终根据B行为做了更新,覆盖了A行为。
最开始通过redis缓存进行上锁,在收到A消息时,在redis中添加一个key,处理完毕后删除key 。处理过程中收到B消息,直接返回。
但测试的时候发现并不可用,可能先收到B消息,后收到A消息, 但是先更新A数据,再更新B数据,还是进行了覆盖。
还有一种方法是修改底层代码,通过自定义sql的方法,先比较再update 。
问题分析
除此之外,还在考虑是否还有别的办法,问题的产生原因就是A和B的消息队列基本都在同一时间点拿到数据,对程序来说造成了并发操作。
如果我们可以把B的消息队列的都延迟一个时间点,保证两个消息队列不在同一时间点获得数据,基本上就可以解决这个问题。
于是就上网开始搜索,查到了延迟队列DelayQueue。
虽然我们不能让公司的消息队列延迟发送,但是我们可以延迟处理。当收到消息时先不处理,放入延迟消息队列中,另外一个线程再从延迟队列中获得数据进行处理。
类介绍
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>
DelayQueue 是 Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。
放入DelayQueue的对象需要实现Delayed接口。
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
测试demo
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author lujianing01@58.com
* @Description:
* @date 2016/6/21
*/
public class DelayQueueTest {
public static void main(String[] args) {
DelayQueue<DelayedElement> delayQueue = new DelayQueue<DelayedElement>();
//生产者
producer(delayQueue);
//消费者
consumer(delayQueue);
while (true){
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 每100毫秒创建一个对象,放入延迟队列,延迟时间1毫秒
* @param delayQueue
*/
private static void producer(final DelayQueue<DelayedElement> delayQueue){
new Thread(new Runnable() {
@Override
public void run() {
while (true){
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
DelayedElement element = new DelayedElement(1000,"test");
delayQueue.offer(element);
}
}
}).start();
/**
* 每秒打印延迟队列中的对象个数
*/
new Thread(new Runnable() {
@Override
public void run() {
while (true){
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("delayQueue size:"+delayQueue.size());
}
}
}).start();
}
/**
* 消费者,从延迟队列中获得数据,进行处理
* @param delayQueue
*/
private static void consumer(final DelayQueue<DelayedElement> delayQueue){
new Thread(new Runnable() {
@Override
public void run() {
while (true){
DelayedElement element = null;
try {
element = delayQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis()+"---"+element);
}
}
}).start();
}
}
class DelayedElement implements Delayed {
private final long delay; //延迟时间
private final long expire; //到期时间
private final String msg; //数据
private final long now; //创建时间
public DelayedElement(long delay, String msg) {
this.delay = delay;
this.msg = msg;
expire = System.currentTimeMillis() + delay; //到期时间 = 当前时间+延迟时间
now = System.currentTimeMillis();
}
/**
* 需要实现的接口,获得延迟时间 用过期时间-当前时间
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
}
/**
* 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("DelayedElement{");
sb.append("delay=").append(delay);
sb.append(", expire=").append(expire);
sb.append(", msg='").append(msg).append('\'');
sb.append(", now=").append(now);
sb.append('}');
return sb.toString();
}
}
补充说明
1.参考网上一些的例子,有些 compareTo 方法就是错的, 要么造成队列中数据积压,要么不能起到延迟的效果。所以一定要经过自己的用例测试确保没有问题。
2.楼主的使用场景,需要考虑,如果进程关闭时,要先等本地延迟队列中的数据被处理完后,再结束进程。
引用来自“张皓不是张浩”的评论
上面的Demo有个bug。实现Delayed接口的两个方法中你选取的时间单位都是毫秒,这是不准确的,DelayQueue的take方法实现里使用的await方法是以毫秒为时间单位阻塞的,使用毫秒计算会频繁的await操作造成性能损失。引用来自“蛙牛”的评论
看了下DelayQueue的take方法 long delay = first.getDelay(TimeUnit.NANOSECONDS);也就是最终它是要把你的getDalay计算出的时间转成NANOSECONDS
所以用的MILLISECONDS 实现getDelay 和 compareTo 应该没有问题 你所指的性能损失具体指的那部分
希望探讨一下~
引用来自“张皓不是张浩”的评论
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
上面是JDK8里DelayQueue.take()方法的实现,里面有一句available.awaitNanos(delay);这个方法是根据纳秒为单位等待的。总的等待时间是一定的,因为单位造成每次等待时间变小,那么就会导致循环次数变多。类似的问题你可以参考:http://www.blogjava.net/killme2008/archive/2010/10/22/335897.html
文章里面提出的bug是 getDelay方法 没有用 传入的unit做时间转换
public long getDelay(TimeUnit unit) {
return nextReconnectTimestamp - System.currentTimeMillis();
}
提出的解决方案 就是我写的getDelay的方法啊
return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
available.awaitNanos(delay); 的等待逻辑和 实现的毫秒也没关系啊~
引用来自“张皓不是张浩”的评论
上面的Demo有个bug。实现Delayed接口的两个方法中你选取的时间单位都是毫秒,这是不准确的,DelayQueue的take方法实现里使用的await方法是以毫秒为时间单位阻塞的,使用毫秒计算会频繁的await操作造成性能损失。引用来自“蛙牛”的评论
看了下DelayQueue的take方法 long delay = first.getDelay(TimeUnit.NANOSECONDS);也就是最终它是要把你的getDalay计算出的时间转成NANOSECONDS
所以用的MILLISECONDS 实现getDelay 和 compareTo 应该没有问题 你所指的性能损失具体指的那部分
希望探讨一下~
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
上面是JDK8里DelayQueue.take()方法的实现,里面有一句available.awaitNanos(delay);这个方法是根据纳秒为单位等待的。总的等待时间是一定的,因为单位造成每次等待时间变小,那么就会导致循环次数变多。类似的问题你可以参考:http://www.blogjava.net/killme2008/archive/2010/10/22/335897.html
引用来自“张皓不是张浩”的评论
上面的Demo有个bug。实现Delayed接口的两个方法中你选取的时间单位都是毫秒,这是不准确的,DelayQueue的take方法实现里使用的await方法是以毫秒为时间单位阻塞的,使用毫秒计算会频繁的await操作造成性能损失。也就是最终它是要把你的getDalay计算出的时间转成NANOSECONDS
所以用的MILLISECONDS 实现getDelay 和 compareTo 应该没有问题 你所指的性能损失具体指的那部分
希望探讨一下~