文档章节

延迟队列实现精准的订单超时取消--自我记录备份

checkboxMan
 checkboxMan
发布于 10/23 19:31
字数 2148
阅读 26
收藏 22

订单的超时取消很多系统采用定时任务,实际上达不到要求。我用的是延迟队列,但缺点是只实现了基于jvm的,分布式采用的是修改之前去查询订单状态,以及分布式锁获取的方式来控制,这样获得锁的,先去查订单是否已经取消掉,如果没有,就改成去掉。但这种方式虽然效果不错,我对这个半吊子的思路还是不满意的,后面有时间再优化一些,做成分布式的。

整个功能实际就是利用延迟队列的特性。延迟队列有个时间属性,一旦到达这个时间节点,会立即从队列中弹出任务,这时候,轮询守护线程就可以执行这个任务了。如果没有到达这个时间节点,无论轮询守护线程怎么轮询,都查不到对应的任务弹出。从jdk1.5开始,java就提供了Delayed接口,用于延迟队列。该接口是继承于Comparable<T>的。至于延迟队列里的任务是什么样子,则由自己定义,我为了便于多线程跑,使用了继承了Runnable的任务,所以定义成这个样子。

执行流程:

1.系统启动,初始化延迟队列。随即会启动守护线程,用于轮询延迟队列里的任务到期弹出

2.将现有数据库中处于待支付状态的订单插入延迟队列

3.一旦有新的订单加入,则放入延迟队列中

4.一旦有任务到期,则守护线程会接到弹出的任务,并执行该任务

5.该任务会从更新数据库中的订单状态

6.一旦有用户取消订单/支付订单,则从延迟队列中查询到该订单的延迟任务,手动删除该任务

这个是使用的spring boot的线程池的,所以需要配置一个线程池。

先上延迟队列代码:

package com.xxxx.xxxx.config.taskThreadPool;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @Author: xiehao
 * @Date: 2018-08-11
 * @Description 延迟队列中放入的task,里面包含到时间之后要处理的任务,该任务类实现了Runnable,用于交给线程池管理
 */
public class DelayCancelOrderTask<T extends Runnable> implements Delayed {

   /**
    * 到期时间
    */
   private final long            time;

   /**
    * 任务对象
    */
   private final T processor;
   /**
    * 原子类
    */
   private static final AtomicLong atomic = new AtomicLong(0);

   private final long sequence;

   public DelayCancelOrderTask(long timeout, T processor){
      this.time = System.nanoTime() + timeout;
      this.processor = processor;
      this.sequence = atomic.getAndIncrement();
   }
   @Override
   public long getDelay(TimeUnit unit) {
      return unit.convert(this.time - System.nanoTime(),TimeUnit.NANOSECONDS);
   }

   @Override
   public int compareTo(Delayed o) {
      if (o == this){
         return 0;
      }
      if (o instanceof DelayCancelOrderTask){
         DelayCancelOrderTask<?> other = (DelayCancelOrderTask<?>)o;
         long diff = this.time - other.time;
         if (diff > 0) {
            return 1;
         } else if (diff < 0) {
            return -1;
         }else if(this.sequence < other.sequence){
            return -1;
         } else {
            return 0;
         }
      }
      long diffrent = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
      return (diffrent == 0)?0:((diffrent < 0) ? -1 : 1);
   }
   public T getProcessor(){
      return  this.processor;
   }
   @Override
   public int hashCode(){
      return processor.hashCode();
   }
   @Override
   public boolean equals(Object object)
   {
      if (object != null)
      {
         return object.hashCode() == hashCode() ? true : false;
      }
      return false;
   }

}

延迟队列中放置的任务:

package com.xxxx.xxxx.config.taskThreadPool;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

import com.xxxx.xxxx.domain.vo.MonitorOrderVO;
import com.xxxx.xxxx.enums.EnumWorkOrderSource;
import com.xxxx.xxxx.utils.MessageProducer;
import org.springframework.scheduling.annotation.Async;

import com.xxxx.xxxx.domain.pojo.MonitorOrder;
import com.xxxx.xxxx.enums.EnumB2BOrderStatus;
import com.xxxx.xxxx.service.MonitorOrderService;
import com.xxxx.xxxx.utils.BeanFactoryGetter;

/**
 * @Author: xiehao
 * @Date: 2018-08-11
 * @Description 超时取消的时间到了,这里要执行对订单的具体取消操作
 */
public class PayTimeoutCancelOrderProcessor implements Runnable {
   private String b2bTaskNo;
   public PayTimeoutCancelOrderProcessor(String b2bTaskNo){
      this.b2bTaskNo = b2bTaskNo;
   }
   @Override
   @Async
   public void run(){
      //因service无法注入,只能从bean工厂中拉取
      MonitorOrderService monitorOrderService = (MonitorOrderService)BeanFactoryGetter.getBean("monitorOrderService");
      MonitorOrder monitorOrder = new MonitorOrder();
      monitorOrder.setB2bTaskNo(b2bTaskNo);
      monitorOrder.setCurrentStatus(EnumB2BOrderStatus.OVERTIME_PAYMENT.getKey());
      monitorOrder.setCurrentStatusTime(LocalDateTime.now());
      monitorOrderService.uptCurrentStatusByTaskNoForOverTimeCancel(monitorOrder);
      //推送直接用户订单消息。现在还没有推送系统正式上线,所以先暂时注释掉
   }
}

延迟队列的管理类,有轮询守护线程:

package com.xxxx.xxxx.config.taskThreadPool;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * @Author: xiehao
 * @Date: 2018-08-11
 * @Description 超时取消的管理类,管理延迟队列,放出一个线程轮询监控延迟队列;有一些方法用来往延迟队列中压入task,有方法用来从队列中remove task。还有初始化方法,开始执行轮询监控线程
 */
@Component
public class DelayCancelOrderTaskManager {
   private static final Logger logger = LoggerFactory.getLogger(DelayCancelOrderTaskManager.class);
   //延迟队列,一切都是围绕它来运转的
   private DelayQueue<DelayCancelOrderTask<?>> delayQueue;
   //超时取消的管理类是个单例,因为系统启动要有方法往队列中插入延迟task,所以搞成饿汉模式
   private static DelayCancelOrderTaskManager instance = new DelayCancelOrderTaskManager();
   // 守护线程,用于轮询延时队列
   private Thread               daemonThread;
   //该方法初始管理类时调用,初始化延迟队列,同时初始化轮询线程
   private DelayCancelOrderTaskManager(){
      delayQueue = new DelayQueue<DelayCancelOrderTask<?>>();
      this.init();
   }
   public static DelayCancelOrderTaskManager getInstance(){
      return instance;
   }
   //初始化轮询监控守护线程
   public void init(){
      //lambda表达式,->的意思其实是静态方法,初始化随即执行的。
      daemonThread = new Thread(()-> {
         try{
            System.out.println("daemonThread start");
            execute();
         }catch (Exception e){
            logger.error("轮询线程出错",e);
         }
      });
      daemonThread.setName("DelayQueueMonitorThread");
      daemonThread.start();
   }
   //初始化的时候开始执行
   private void execute(){
      //不断轮询
      while(true){
         //此处仅为打印日志方便
         Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
         logger.info("线程数--------------" + map.size());
         logger.info(System.currentTimeMillis()+" 队列中的个数:"+delayQueue.size());
         try{
            //从队列中取出可以取出的task。延时队列有个特点,不到时间取不出来,所以能取出来的,都是到时间即将执行的。
            DelayCancelOrderTask<?> delayCancelOrderTask = delayQueue.take();
            //task不为空,则开始执行
            if(delayCancelOrderTask != null){
               //获取task里面的处理线程,该线程会丢到线程池中处理
               Runnable payTimeoutCancelOrderProcessor = delayCancelOrderTask.getProcessor();
               if(payTimeoutCancelOrderProcessor == null){
                  continue;
               }
               //线程执行
               payTimeoutCancelOrderProcessor.run();
               //执行完毕,从队列中删除task
               this.removeTask(delayCancelOrderTask);
            }
         }catch (Exception e){
            logger.error("线程执行错误:",e);
         }
      }
   }

   /**
    * @Author xiehao
    * @Date 2018/8/11 18:15
    * @Param
    * @Description 传入的超时时间为以秒为单位
    */
   public void putTaskInSeconds(Runnable task,long timeoutPeriod){
      long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.SECONDS);
      DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
      delayQueue.put(delayCancelOrderTask);
   }

   /**
    * @Author xiehao
    * @Date 2018/8/11 18:15
    * @Param
    * @Description 传入的超时时间以为分钟为单位
    */
   public void putTaskInMinites(Runnable task,long timeoutPeriod){
      long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.MINUTES);
      DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
      delayQueue.put(delayCancelOrderTask);
   }
   /**
    * @Author xiehao
    * @Date 2018/8/11 18:15
    * @Param
    * @Description 传入的超时时间以小时为单位
    */
   public void putTaskInHours(Runnable task,long timeoutPeriod){
      long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,TimeUnit.HOURS);
      DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
      delayQueue.put(delayCancelOrderTask);
   }
   /**
    * @Author xiehao
    * @Date 2018/8/11 18:15
    * @Param
    * @Description 传入的超时时间为自定义的单位
    */
   public void putTaskInOwnDefine(Runnable task,long timeoutPeriod,TimeUnit unit){
      long timeout = TimeUnit.NANOSECONDS.convert(timeoutPeriod,unit);
      DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
      delayQueue.put(delayCancelOrderTask);
   }

   /**
    * @Author xiehao
    * @Date 2018/8/11 18:15
    * @Param
    * @Description 传入的时间为超时时间点
    */
   public void putTaskInTimeoutTime(Runnable task,LocalDateTime timeoutTime){
      Duration duration = Duration.between(LocalDateTime.now(),timeoutTime);
      long timeout = TimeUnit.NANOSECONDS.convert(duration.toNanos(),TimeUnit.NANOSECONDS);
      DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout,task);
      delayQueue.put(delayCancelOrderTask);
   }

   /**
    * @Author xiehao
    * @Date 2018/8/13 15:43
    * @Param
    * @Description 从队列中删除某个task。一般在用户自己取消订单的时候执行
    */
   public boolean removeTask(DelayCancelOrderTask<? extends Runnable> task){
      return delayQueue.remove(task);
   }

   /**
    * @Author xiehao
    * @Date 2018/8/13 15:44
    * @Param
    * @Description 判断队列中是否含有某个task
    */
   public boolean contains(DelayCancelOrderTask<? extends Runnable> task){
      return delayQueue.contains(task);
   }
   //获取队列个数。这个方法专门给打印日志核对数据用的。一般用不着它
   public Integer getDelayQueueSize(){
      System.out.println("队列中的个数:"+delayQueue.size());
      return delayQueue.size();
   }
}

在系统初始化的时候,需要将数据库中已经处于等待超时状态的订单打入延迟队列:

package com.xxxx.xxxx.config.runner;

import com.xxxx.xxxx.config.taskThreadPool.DelayCancelOrderTaskManager;
import com.xxxx.xxxx.config.taskThreadPool.PayTimeoutCancelOrderProcessor;
import com.xxxx.xxxx.domain.pojo.MonitorOrder;
import com.xxxx.xxxx.service.MonitorOrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @Author: xiehao
 * @Date: 2018-08-11
 * @Description 系统初始化时将处于待支付状态的单据丢到延时队列中,等待执行超时任务
 */

@Component
public class OrderPayTimeoutQueueRunner  implements CommandLineRunner {
   @Autowired
   private MonitorOrderService monitorOrderService;
   @Override
   public void run(String... strings) throws Exception {
      DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance();
      List<MonitorOrder> monitorOrderList = monitorOrderService.getPayNotTimeoutOrderList();
      for(MonitorOrder monitorOrder : monitorOrderList){
         PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(monitorOrder.getB2bTaskNo());
         delayCancelOrderTaskManager.putTaskInTimeoutTime(processor,monitorOrder.getOrderPayTimeout());
      }
   }
}

如果用户取消订单/中间执行支付操作后,则从延迟队列中去掉任务:

//取消预约单,则删除延时队列中的超时等待信号 xiehao start
MonitorOrder targetOrder = monitorOrderMapper.getMonitorOrderByTaskNo(taskNum);
DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance();
PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(targetOrder.getB2bTaskNo());
Duration duration = Duration.between(LocalDateTime.now(), targetOrder.getOrderPayTimeout());
long timeout = TimeUnit.NANOSECONDS.convert(duration.toNanos(), TimeUnit.NANOSECONDS);
DelayCancelOrderTask<?> delayCancelOrderTask = new DelayCancelOrderTask<>(timeout, processor);
delayCancelOrderTaskManager.removeTask(delayCancelOrderTask);
//取消预约单,则删除延时队列中的超时等待信号 xiehao end

如果用户中间下单,则加入延迟队列:

//加入延时队列,等待超时取消 xiehao
            DelayCancelOrderTaskManager delayCancelOrderTaskManager = DelayCancelOrderTaskManager.getInstance();
            PayTimeoutCancelOrderProcessor processor = new PayTimeoutCancelOrderProcessor(order.getB2bTaskNo());
            delayCancelOrderTaskManager.putTaskInTimeoutTime(processor, order.getOrderPayTimeout());

工具类:

package com.xxxx.xxxx.utils;

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;

/**
 * @Author: xiehao
 * @Date: 2018-08-04
 */

@Component
public class BeanFactoryGetter implements BeanFactoryAware {

   //Bean工厂必须是static类型,否则系统启动的时候将无法写入factory
   private static BeanFactory factory;

   @Override
   public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
      factory = beanFactory;
   }

   public static Object getBean(String beanName){
      if(StringUtils.isEmpty(beanName)){
         return null;
      }
      Object t= factory.getBean(beanName);
      return t;
   }
}

© 著作权归作者所有

共有 人打赏支持
checkboxMan
粉丝 5
博文 7
码字总数 7966
作品 0
杭州
私信 提问
java中定时器的设置

在java后台开发中,我们常常会碰到一种需求:定时任务,比如超时取消未支付订单、定时推送通知(发送短信)、定时清理日志等等。这些需求在我们的开发中是随处可见的,但是往往对于一些特殊的...

Imstillaboy
2017/12/08
0
0
rabbitmq延迟队列之php实现

延迟任务应用场景 场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。 场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。 实...

china_lx1
08/22
0
0
基于rabbitMQ 消息延时队列方案 模拟电商超时未支付订单处理场景

前言 传统处理超时订单 采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好...

芷恋灬
10/18
0
0
zephyr笔记 2.1.5 工作队列线程

我正在学习 Zephyr,一个很可能会用到很多物联网设备上的操作系统,如果你也感兴趣,可点此查看帖子zephyr学习笔记汇总。 1 前言 工作队列是一个内核对象,它使用专用线程以先进先出的方式处...

iotisan
04/18
0
0
执行后台任务工具--Hangfire

Hangfire是一个开源且商业免费使用的工具函数库。可以让你非常容易地在ASP.NET应用(也可以不在ASP.NET应用)中执行多种类型的后台任务,而无需自行定制开发和管理基于Windows Service后台任...

匿名
2016/10/18
910
0

没有更多内容

加载失败,请刷新页面

加载更多

ConcurrentHashMap 高并发性的实现机制

ConcurrentHashMap 的结构分析 为了更好的理解 ConcurrentHashMap 高并发的具体实现,让我们先探索它的结构模型。 ConcurrentHashMap 类中包含两个静态内部类 HashEntry 和 Segment。HashEnt...

TonyStarkSir
今天
3
0
大数据教程(7.4)HDFS的java客户端API(流处理方式)

博主上一篇博客分享了namenode和datanode的工作原理,本章节将继前面的HDFS的java客户端简单API后深度讲述HDFS流处理API。 场景:博主前面的文章介绍过HDFS上存的大文件会成不同的块存储在不...

em_aaron
昨天
2
0
聊聊storm的window trigger

序 本文主要研究一下storm的window trigger WindowTridentProcessor.prepare storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java public v......

go4it
昨天
6
0
CentOS 生产环境配置

初始配置 对于一般配置来说,不需要安装 epel-release 仓库,本文主要在于希望跟随 RHEL 的配置流程,紧跟红帽公司对于服务器的配置说明。 # yum update 安装 centos-release-scl # yum ins...

clin003
昨天
9
0
GPON网络故障处理手册

导读 为了方便广大网络工作者工作需要,特搜集以下GPON网络处理流程供大家学习参考。开始—初步定为故障—检查光纤状况—检查ONU状态--检查设备运行状态—检查设备数据配置—检查上层设备状态...

问题终结者
昨天
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部