文档章节

基于memcache的java分布式队列实现。

朱轩
 朱轩
发布于 2016/03/31 17:18
字数 839
阅读 156
收藏 2

主要有两个类,一个队列类和一个job的抽象类。

保证队列类中的key的唯一性,就可以用spring配置多个实例。水平有限,欢迎吐槽。

上代码:

1、队列类

import net.spy.memcached.MemcachedClient;
import net.spy.memcached.internal.OperationFuture;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import com.izx.services.common.Constant;

/**
 * 
* @ClassName: MemCacheQueue 
* @Description: 基于memcache的消息队列的实现
* @author hai.zhu
* @date 2016-3-31 下午3:29:00 
*
 */
public class MemCacheQueue implements InitializingBean, DisposableBean,ApplicationContextAware {
    private static final Log log = LogFactory.getLog(MemCacheQueue.class);
    
    /**
     * 队列名
     */
    private String key;
    
    /**
     * 队列锁失效分钟
     */
    private Integer lockExpireMinite = 3;
    
    private MemcachedClient memcachedClient;
    
    private ApplicationContext applicationContext;
    
    ListenerThread listenerThread = new ListenerThread();
    
    public void setKey(String key) {
        this.key = key;
    }
    
    public void setMemcachedClient(MemcachedClient memcachedClient) {
        this.memcachedClient = memcachedClient;
    }
    
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public void destroy() throws Exception {
        try {
            this.sign = false;
            listenerThread.interrupt();
        } catch (Exception e) {
            log.error(e);
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        //初始化队列,用add防止重启覆盖
        memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY + key, 0, "0");
        memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key, 0, "0");
        //设置任务线程
        listenerThread.setDaemon(true);
        listenerThread.start();
    }
    
    /**
     * 
    * @Title: push 
    * @Description: 唯一对外方法,放入要执行的任务
    * @param @param value
    * @param @throws Exception    设定文件 
    * @return void    返回类型 
    * @throws
     */
    public synchronized void push(MemCacheQueueJobAdaptor value) throws Exception {
        //分布加锁
        queuelock();
        //放入队列
        memcachedClient.incr(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key, 1);
        Object keyorder = memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key);
        memcachedClient.set(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE + key + "_" + keyorder, 0, value);
        //分布解锁
        queueUnLock();
    }
    
    /**
     * 
    * @Title: pop 
    * @Description: 取出要执行的任务
    * @param @return
    * @param @throws Exception    设定文件 
    * @return MemCacheQueueJobAdaptor    返回类型 
    * @throws
     */
    private synchronized MemCacheQueueJobAdaptor pop() throws Exception {
        Object keyorderstart = memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY + key);
        Object keyorderend = memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key);
        if(keyorderstart.equals(keyorderend)){
            return null;
        }
        MemCacheQueueJobAdaptor adaptor = (MemCacheQueueJobAdaptor)memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE + key + "_" + keyorderstart);
        memcachedClient.incr(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY + key, 1);
        memcachedClient.delete(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE + key + "_" + keyorderstart);
        return adaptor;
    }
    
    /**
     * 
    * @Title: queuelock 
    * @Description: 加锁
    * @param @throws InterruptedException    设定文件 
    * @return void    返回类型 
    * @throws
     */
    private void queuelock() throws Exception {
        do {
            OperationFuture<Boolean> sign = memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_LOCK + key, lockExpireMinite * 60, key);
            if(sign.get()){
                return;
            } else {
                log.debug("key: " + key + " locked by another business");
            }
            Thread.sleep(300);
        } while (true);
    }
    
    /**
     * 
    * @Title: queueUnLock 
    * @Description: 解锁
    * @param     设定文件 
    * @return void    返回类型 
    * @throws
     */
    private void queueUnLock() {
        memcachedClient.delete(Constant.MEMCACHE_GLOBAL_QUEUE_LOCK + key);
    }
    
    private boolean sign = true;
    private long THREAD_SLEEP = 10;
    class ListenerThread extends Thread {
        @Override
        public void run(){
            log.error("队列["+key+"]开始执行");
            while(sign){
                try {
                    Thread.sleep(THREAD_SLEEP);
                    dojob();
                } catch (Exception e) {
                    log.error(e);
                }
            }
        }
        
        private void dojob(){
            try{
                queuelock();
                MemCacheQueueJobAdaptor adaptor = pop();
                //逐个执行
                if(adaptor != null){
                    THREAD_SLEEP = 10;
                    try {
                        adaptor.setApplicationContext(applicationContext);
                        adaptor.onMessage();
                    } catch (Exception e) {
                        log.error(e);
                    }
                }else{
                    THREAD_SLEEP = 5000;
                }
            }catch(Exception e){
                log.error(e);
            }finally{
                queueUnLock();
            }
        }
    }
}

2、job抽象类

import org.springframework.context.ApplicationContext;
import java.io.Serializable;

/**
 * 
 * @ClassName: MemCacheQueueJobAdaptor 
 * @Description: 基于memcache队列的任务适配器
 * @author hai.zhu
 * @date 2015-12-11 上午11:48:26 
 * @param <T>
 */
public abstract class MemCacheQueueJobAdaptor implements Serializable{
    private static final long serialVersionUID = -5071415952097756327L;
    
    private ApplicationContext applicationContext;
    
    public ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }
    
    /**
     * 
     * @Title: onMessage 
     * @Description: 异步执行任务接口
     * @author hai.zhu
     * @param @param value 设定文件 
     * @return void 返回类型 
     * @throws
     */
    public abstract void onMessage();
}

3、部分放在constant的常量

/**
     * 基于memcache的队列存放前缀
     */
    public static String MEMCACHE_GLOBAL_QUEUE_VARIABLE = "MEMCACHE_GLOBAL_QUEUE_VARIABLE_";
    
    /**
     * 基于memcache的队列锁的前缀
     */
    public static String MEMCACHE_GLOBAL_QUEUE_LOCK = "MEMCACHE_GLOBAL_QUEUE_LOCK_";
    
    /**
     * 基于memcache的队列锁的开始元素
     */
    public static String MEMCACHE_GLOBAL_QUEUE_STARTKEY = "MEMCACHE_GLOBAL_QUEUE_STARTKEY_";
    
    /**
     * 基于memcache的队列锁的结束元素
     */
    public static String MEMCACHE_GLOBAL_QUEUE_ENDKEY = "MEMCACHE_GLOBAL_QUEUE_ENDKEY_";

4、spring配置,保证队列名的唯一性就可以配置多个队列

<!-- 枚举类型需要转换 -->
    <bean id="KETAMA_HASH" class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">    
        <property name="staticField" value="net.spy.memcached.DefaultHashAlgorithm.KETAMA_HASH" />    
    </bean>
    
    <!-- memcache客户端 -->
    <bean id="memcachedClient" class="net.spy.memcached.spring.MemcachedClientFactoryBean">
        <property name="servers" value="192.168.75.154:11277,192.168.75.154:11277,192.168.75.154:11277"/>
        <property name="protocol" value="BINARY"/>
        <property name="transcoder">
            <bean class="net.spy.memcached.transcoders.SerializingTranscoder">
                <property name="compressionThreshold" value="1024"/>
            </bean>
        </property>
        <property name="opTimeout" value="1000"/>
        <property name="timeoutExceptionThreshold" value="1998"/>
        <property name="hashAlg" ref="KETAMA_HASH"/>
        <property name="locatorType" value="CONSISTENT"/> 
        <property name="failureMode" value="Redistribute"/>
        <property name="useNagleAlgorithm" value="false"/>
    </bean>
    
    <!-- 队列配置 -->
    <bean id="onequeue" class="com.izx.services.queque.MemCacheQueue">
        <property name="memcachedClient" ref="memcachedClient"/>
        <property name="key" value="onequeue"/>
    </bean>


© 著作权归作者所有

朱轩
粉丝 0
博文 7
码字总数 1615
作品 0
深圳
后端工程师
私信 提问
加载中

评论(0)

Ehcache和MemCached比较分析

项目 Memcache Ehcache 分布式 不完全,集群默认不实现 支持 集群 可通过客户端实现 支持(默认是异步同步) 持久化 可通过第三方应用实现,如sina研发的memcachedb,将cache的数据保存到[ur...

浮躁的码农
2015/07/24
2.3K
0
Hazelcast: Java分布式内存网格框架(平台)

网址: http://www.hazelcast.com/。 下边是它的宣传内容: hazelcast是一个开放源码集群和高度可扩展的数据分发平台,这是为Java: 1. 快如闪电;数以千计的运算/秒。 2. 故障安全;崩溃后没有...

晨曦之光
2012/04/12
916
0
面试必看!2018年4月份阿里最新的java程序员面试题目

目录 技术一面(23问) 技术二面(3大块) 性能优化(21点) 项目实战(34块) JAVA方向技术考察点(15点) JAVA开发技术面试中可能问到的问题(17问) 阿里技术面试1 1.Java IO流的层次结构...

美的让人心动
2018/04/16
276
5
BAT最新Java面试题汇总:并发编程+JVM+Spring+分布式+缓存等!

前言 作为一个开发人员,你是否面上了自己理想的公司,薪资达到心中理想的高度? 面试:如果不准备充分的面试,完全是浪费时间,更是对自己的不负责。 今天给大家分享下我整理的Java架构面试...

别打我会飞
2019/06/03
328
0
Spring Boot 集成 Memcached

Memcached 介绍 Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载。它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态、数据库驱动网站的...

中关村的老男孩
2019/06/21
50
0

没有更多内容

加载失败,请刷新页面

加载更多

广州哪里有开餐饮费发票

广州开餐饮费发票发票电薇13564998196陈晨100 % 真。从主业来看,2019年众诚保险围绕车险业务采取增设分支机构、加强合作、优化用户体验等动作,但综合成本率仍有所上行,业内指出,车险的价...

枅票微fp2090
6分钟前
11
0
深圳哪里有开餐饮费发票

深圳开餐饮费发票发票电薇13564998196陈晨100 % 真。从主业来看,2019年众诚保险围绕车险业务采取增设分支机构、加强合作、优化用户体验等动作,但综合成本率仍有所上行,业内指出,车险的价...

枅票嶶fp2090
9分钟前
5
0
略谈分布式系统中的容器设计模式

本文作者:zytan_cocoa 略谈分布式系统中的容器设计模式 谭中意 2020/3/5 前言:云原生(Cloud Native)不仅仅是趋势,更是现在进行时,它是构建现代的,可弹性伸缩的,快速迭代的计算网络服...

百度开发者中心
03/11
21
0
OSChina 周三乱弹 —— 小姐姐的领带有点带歪了,请帮忙正一下

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @薛定谔的兄弟 :分享洛神有语创建的歌单「我喜欢的音乐」: 《アイタクテ -voice & piano-》- 和紗 手机党少年们想听歌,请使劲儿戳(这里) ...

小小编辑
今天
25
0
对象名称前的单下划线和双下划线是什么意思?

问题: Can someone please explain the exact meaning of having leading underscores before an object's name in Python? 有人可以解释一下在Python中对象名称前加下划线的确切含义吗? ......

技术盛宴
今天
29
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部