文档章节

代码实现Redis异步任务

Mr_Qi
 Mr_Qi
发布于 2017/09/14 23:00
字数 900
阅读 1629
收藏 62

上文说到Redis实现优先级队列Redis实现优先级队列

那么代码的实现呢也比较简单

接口概述

定义几个消息类型

package com.air.tqb.model;
 
import java.io.Serializable;
 
public class Message implements Serializable{
    public Message(Distributable messageBean, Long timeStamp, String docType, Integer times, String from, String to) {
        this.messageBean = messageBean;
        this.timeStamp = timeStamp;
        this.docType = docType;
        this.times = times;
        this.from = from;
        this.to = to;
    }
 
    public Message() {
    }
 
    public Message(String docType) {
        this.docType = docType;
    }
 
    public Message(Distributable messageBean, String docType) {
        this.messageBean = messageBean;
        this.docType = docType;
    }
 
    private static final long serialVersionUID = -3005565413488441986L;
    private Distributable messageBean;
    private Long timeStamp;
    private String docType;
    private Integer times;
    private String from;
    private String to;
 
    public Distributable getMessageBean() {
        return messageBean;
    }
 
    public void setMessageBean(Distributable messageBean) {
        this.messageBean = messageBean;
    }
 
    public Long getTimeStamp() {
        return timeStamp;
    }
 
    public void setTimeStamp(Long timeStamp) {
        this.timeStamp = timeStamp;
    }
 
    public String getDocType() {
        return docType;
    }
 
    public void setDocType(String docType) {
        this.docType = docType;
    }
 
    public Integer getTimes() {
        return times;
    }
 
    public void setTimes(Integer times) {
        this.times = times;
    }
 
    public String getFrom() {
        return from;
    }
 
    public void setFrom(String from) {
        this.from = from;
    }
 
    public String getTo() {
        return to;
    }
 
    public void setTo(String to) {
        this.to = to;
    }
 
    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("Message{");
        sb.append("messageBean=").append(messageBean);
        sb.append(", timeStamp=").append(timeStamp);
        sb.append(", docType='").append(docType).append('\'');
        sb.append(", times=").append(times);
        sb.append(", from='").append(from).append('\'');
        sb.append(", to='").append(to).append('\'');
        sb.append('}');
        return sb.toString();
    }
}
  1. messageBean为业务bean,对应必须要求为Distributable类型
  2. docType为业务类型,对应必须要求不可为空

定义如下接口

package com.air.tqb.message;
 
public interface MessageDispatcher {
    void consume();
}

该接口主要完成任务的转发,会一直监听对应的队列

package com.air.tqb.message;
 
import com.air.tqb.model.Message;
 
public interface MessageProcessor {
 
    boolean process(Message message);
 
    boolean isInterestedIn(String docType);
}

该接口表示对应processor处理业务逻辑以及对应需要关注对象

package com.air.tqb.message;
 
import com.air.tqb.model.Message;
 
public interface MessageSender {
    void sendMessage(Message message);
 
    void sendMessage(Message message, Priority priority);
 
    void sendFailMessage(Message message);
 
    void sendDeadMessage(Message message);
}

该接口表示发送消息,分别对应发送接口 发送失败信息接口(处理失败)发送死信队列接口(无processsor处理)

package com.air.tqb.message;
 
public enum Priority {
    high, middle, low;
}

优先级枚举

实现

package com.air.tqb.message.redis;
 
import com.air.tqb.common.AppConstant;
import com.air.tqb.helper.MessageHelper;
import com.air.tqb.message.AbstartactMessageDispatcher;
import com.air.tqb.message.Priority;
import com.air.tqb.model.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import java.util.List;
 
@Component
public class RedisMessageDispatcher extends AbstartactMessageDispatcher {
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;
    @Autowired
    private MessageHelper messageHelper;
 
    private byte[][] messageQueueNames = new byte[Priority.values().length][];
    private RedisSerializer redisSerializer = new JdkSerializationRedisSerializer();
 
    @PostConstruct
    public void init() {
        int i = 0;
        for (Priority priority : Priority.values()) {
            messageQueueNames[i++] = redisSerializer.serialize(AppConstant.MESSAGE_QUEUE_PREFIX + priority.name());
        }
 
    }
 
 
    @Override
    public void consume() {
        RedisConnection redisConnection = redisConnectionFactory.getConnection();
        while (true) {
            List<byte[]> rlts = redisConnection.bLPop(0, messageQueueNames);
            if (rlts == null || rlts.isEmpty()) {
                //impossible
                logger.warn("blpop list null or empty");
            } else {
                try {
                    final Message message = (Message) redisSerializer.deserialize(rlts.get(1));
                    if (messageHelper.canBeImported(message)) {
                        submitNewTask((new Runnable() {
                            @Override
                            public void run() {
                                boolean rlt = messageHelper.doProcess(message);
                                if (!rlt) {
                                    messageHelper.sendFailMessage(message);
                                }
                            }
                        }));
 
                    } else {
                        messageHelper.sendDeadMessage(message);
                    }
                } catch (Exception ex) {
                    logger.warn(ex.getMessage(), ex);
                }
            }
        }
 
 
    }
 
 
}

利用Redis的blpop特性(阻塞)完成监听 

当处理失败后直接丢入failLetter,无人处理则丢入deadLetter

package com.air.tqb.helper;
 
import com.air.tqb.message.MessageProcessor;
import com.air.tqb.message.MessageSender;
import com.air.tqb.message.NoOpMessageProcessor;
import com.air.tqb.message.Priority;
import com.air.tqb.model.Message;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.TimeUnit;
 
@Component
public class MessageHelper {
    @Autowired
    private List<MessageProcessor> processorList = Lists.newArrayList();
    @Autowired
    private MessageSender messageSender;
    private static final MessageProcessor NO_OP_MESSAGE_PROCESSOR = new NoOpMessageProcessor();
    private LoadingCache<String, MessageProcessor> docTypeCache = null;
 
 
    @PostConstruct
    public void init() {
        docTypeCache = CacheBuilder.newBuilder()
                .maximumSize(1000)
                .expireAfterAccess(100, TimeUnit.MINUTES)
                .build(
                        new CacheLoader<String, MessageProcessor>() {
                            @Override
                            public MessageProcessor load(String key) throws Exception {
                                if (Strings.isNullOrEmpty(key)) return NO_OP_MESSAGE_PROCESSOR;
                                for (MessageProcessor processor : processorList) {
                                    if (processor.isInterestedIn(key)) {
                                        return processor;
                                    }
                                }
                                return NO_OP_MESSAGE_PROCESSOR;
                            }
                        });
    }
 
 
    public boolean canBeImported(Message message) {
        Preconditions.checkArgument(message != null && message.getDocType() != null);
        MessageProcessor unchecked = docTypeCache.getUnchecked(message.getDocType());
        return !unchecked.equals(NO_OP_MESSAGE_PROCESSOR);
    }
 
    public boolean doProcess(Message message) {
        MessageProcessor unchecked = docTypeCache.getUnchecked(message.getDocType());
        return unchecked.process(message);
    }
 
    public void sendMessage(Message message, Priority priority) {
        messageSender.sendMessage(message, priority);
    }
 
    public void sendMessage(Message message) {
        this.sendMessage(message, Priority.middle);
    }
 
 
    public void sendFailMessage(Message message) {
        messageSender.sendFailMessage(message);
    }
 
    public void sendDeadMessage(Message message) {
        messageSender.sendDeadMessage(message);
    }
}

对应消息发送&处理均在此

package com.air.tqb.message.redis;
 
import com.air.tqb.common.AppConstant;
import com.air.tqb.message.MessageSender;
import com.air.tqb.message.Priority;
import com.air.tqb.model.Message;
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
 
@Component
public class RedisMessageSender implements MessageSender {
    @Autowired
    @Qualifier(value = "redisTemplate")
    private RedisTemplate template;
 
    public void sendMessage(Message message, Priority priority) {
        template.opsForList().rightPush(AppConstant.MESSAGE_QUEUE_PREFIX + priority.name(), message);
    }
 
    public void sendMessage(Message message) {
        this.sendMessage(message, Priority.middle);
    }
 
 
    public void sendFailMessage(Message message) {
        message.setTimes(message.getTimes() == null ? 1 : message.getTimes() + 1);
        template.opsForList().rightPush(AppConstant.MESSAGE_FAILLETTER,message);
    }
 
    public void sendDeadMessage(Message message) {
        template.opsForList().rightPush(AppConstant.MESSAGE_DEADLETTER, message);
    }
}

将对应信息发送到Redis的队列中

一个正常的VIN码关注Processor如下

package com.air.tqb.message;
 
import com.air.tqb.model.Message;
import org.springframework.stereotype.Component;
 
@Component
public class VinMessageProcessor implements MessageProcessor {
    @Override
    public boolean process(Message message) {
        System.out.println(message);
        return true;
    }
 
    @Override
    public boolean isInterestedIn(String docType) {
        return "VIN".equals(docType);
    }
}

© 著作权归作者所有

共有 人打赏支持
Mr_Qi

Mr_Qi

粉丝 280
博文 359
码字总数 369228
作品 0
南京
程序员
私信 提问
加载中

评论(3)

他好像条狗啊
他好像条狗啊
Mr_Qi
Mr_Qi
主要是小的系统引入mq中间件太重 而且运维比较麻烦。。。
雪吖头
雪吖头
长见识了。
异步任务神器 Celery 简明笔记

Celery 在程序的运行过程中,我们经常会碰到一些耗时耗资源的操作,为了避免它们阻塞主程序的运行,我们经常会采用多线程或异步任务。比如,在 Web 开发中,对新用户的注册,我们通常会给他发...

funhacks
2017/11/29
0
0
Redis异步化组件模型

摘要 Redis对客户端的IO事件处理是由主线程串行执行的,除了IO事件之外,这个线程还负责过期键的处理、复制协调、集群协调等等,这些除了IO事件之外的逻辑会被封装成周期性的任务由主线程周期...

Float_Luuu
2016/06/18
1K
0
Redis 异步化组件模型

本文作者:伯乐在线 -Float_Lu 。未经作者许可,禁止转载! 欢迎加入伯乐在线专栏作者。 Redis 线程体系 Redis可以说是基于单线程模型的,因为对于客户端的所有读写请求的处理,都由一个主线...

伯乐在线
2016/06/28
0
0
异步任务神器 Celery 简明笔记

在程序的运行过程中,我们经常会碰到一些耗时耗资源的操作,为了避免它们阻塞主程序的运行,我们经常会采用多线程或异步任务。比如,在 Web 开发中,对新用户的注册,我们通常会给他发一封激...

_Change_
2017/10/24
0
0
如履薄冰 —— Redis懒惰删除的巨大牺牲

之前我们介绍了Redis懒惰删除的特性,它是使用异步线程对已经删除的节点进行延后内存回收。但是还不够深入,所以本节我们要对异步线程逻辑处理的细节进行分析,看看Antirez是如何实现异步线程...

Java填坑之路
08/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

[LintCode] Serialize and Deserialize Binary Tree(二叉树的序列化和反序列化)

描述 设计一个算法,并编写代码来序列化和反序列化二叉树。将树写入一个文件被称为“序列化”,读取文件后重建同样的二叉树被称为“反序列化”。 如何反序列化或序列化二叉树是没有限制的,你...

honeymose
今天
5
0
java框架学习日志-7(静态代理和JDK代理)

静态代理 我们平时去餐厅吃饭,不是直接告诉厨师做什么菜的,而是先告诉服务员点什么菜,然后由服务员传到给厨师,相当于服务员是厨师的代理,我们通过代理让厨师炒菜,这就是代理模式。代理...

白话
今天
23
0
Flink Window

1.Flink窗口 Window Assigner分配器。 窗口可以是时间驱动的(Time Window,例如:每30秒钟),也可以是数据驱动的(Count Window,例如:每一百个元素)。 一种经典的窗口分类可以分成: 翻...

满小茂
今天
18
0
my.ini

1

architect刘源源
今天
16
0
docker dns

There is a opensource application that solves this issue, it's called DNS Proxy Server It's a DNS server that solves containers hostnames, if could not found a hostname that mat......

kut
今天
17
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部