文档章节

Java实现Redis发布/订阅

问题达人
 问题达人
发布于 2016/03/29 10:29
字数 1370
阅读 275
收藏 5

今天经理让我实现一个Redis发布/订阅功能,用来记录审计信息。我查了一天,才弄出来。总结如下,大神勿喷。

redis的发布/订阅模式是消息机制之一,另外一个叫生成者消费者模式。Redis发布订阅模式讲解可以参考菜鸟教程的这篇文章http://www.runoob.com/redis/redis-pub-sub.html。

1、Redis发布订阅模式客户端实现。在打开Redis服务器后,再打开两个客户端,客户端1用来接收消息,客户端2用来发布消息。

客户端1订阅bar频道。格式:SUBSCRIBE name1 name2。 成功订阅回复,分别对应订阅类型、订阅频道、订阅数量。 127.0.0.1:6379> SUBSCRIBE bar Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "bar"3) (integer) 1 客户端2,发送消息。格式:publish channelName Message。 127.0.0.1:6379> publish bar val (integer) 1 客户端1订阅回复,分别对应消息类型,频道,消息。

  1. "message"2) "bar"3) "val" 效果图如下:

Redis支持模式匹配订阅,*为模糊匹配符。

订阅所有频道的消息

PSUBSCRIBE *
订阅以news.开头的所有频道。

PSUBSCRIBE news.* 其他操作这里不在赘述。

2、Java实现Redis的发布订阅。

Java实现Redis的功能大部分是使用jedis的jar包来进行操作。个人感觉,jedis封装了操作redis的常用命令,写多了就会发现知道redis命令怎么写的,就可以猜出来jedis中怎么写的。

首先在我们封装的JedisUtils中加入发布和订阅操作的方法。大家没有JedisUtils的可以参考JeeSite中的JedisUtils怎么写的。

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 /**

  • 发布一个消息
  • @param channel
  • @param message */ public static void publishMsg(String channel,String message){ Jedis jedis = null; try { jedis = getResource();

jedis.publish(channel, message); logger.debug("publishMsg {} = {}", channel, message); } catch (Exception e) { logger.warn("publishMsg {} = {}", channel, message, e); } finally { returnResource(jedis); } }

/**

  • 发布一个消息
  • @param channel
  • @param message */ public static void publishMsg(byte[] channel,byte[] message){ Jedis jedis = null; try { jedis = getResource();

jedis.publish(channel, message); logger.debug("publishMsg {} = {}", channel, message); } catch (Exception e) { logger.warn("publishMsg {} = {}", channel, message, e); } finally { returnResource(jedis); } }

 上面的两个方法的核心处理就是jedis.publish(channel, message);参数channel是消息的频道,message是消息的内容。在Junit测试或者其他的地方,使用工具类的此方法即可发布一个消息。

接收消息代码多一些。首先定义一个类继承JedisPubSub,然后实现其中的未实现的方法,最后在工具类JedisUtils中定义一个操作的方法即可。代码如下:

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 /**

  • 接收消息。在main方法调用后,会一直执行下去。当有发布对应消息时,就会在jedisPubSub中接收到!
  • @param jedisPubSub
  • @param channels */ public static void subscribeMsg(JedisPubSub jedisPubSub,String channels){ Jedis jedis = null; try { jedis = getResource(); jedis.subscribe(jedisPubSub, channels); logger.debug("subscribeMsg {} = {}", jedisPubSub, channels); } catch (Exception e) { logger.warn("subscribeMsg {} = {}", jedisPubSub, channels, e); } finally { returnResource(jedis); } } JedisPubSub类的定义如下:

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 package com.aq.web.shiro.redis.msg; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisPubSub; import com.aq.dao.info.RedisMsgAuditInfo; import com.aq.web.shiro.redis.SelfObjectUtils; /**

  • Redis监听订阅事件
  • @author

*/ public class RedisMsgPubSubListener extends JedisPubSub{ private static Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class);

/**

  • 取得订阅的消息后的处理 */ @Override public void onMessage(String channel, String message) { logger.debug("onMessage: channel["+channel+"], message["+message+"]");

//如果消息类型与定义的审计专用类型一致 if(RedisMsgJedisUtils.getRedisMsgChannelString().equalsIgnoreCase(channel)){ //处理审计的消息 this.AuditMsgHandler(message); } } /**

  • 取得按表达式的方式订阅的消息后的处理 */ @Override public void onPMessage(String pattern, String channel, String message) { logger.debug("onPMessage: channel["+channel+"], message["+message+"]");

} /**

  • 初始化订阅时候的处理
    */ @Override public void onSubscribe(String channel, int subscribedChannels) { logger.debug("onSubscribe: channel["+channel+"],"+ "subscribedChannels["+subscribedChannels+"]");

} /**

  • 取消订阅时候的处理 */ @Override public void onUnsubscribe(String channel, int subscribedChannels) { logger.debug("onUnsubscribe: channel["+channel+"], "+ "subscribedChannels["+subscribedChannels+"]");

} /**

  • 取消按表达式的方式订阅时候的处理 */ @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { logger.debug("onPUnsubscribe: pattern["+pattern+"],"+ "subscribedChannels["+subscribedChannels+"]");

}

/**

  • 初始化按表达式的方式订阅时候的处理 */ @Override public void onPSubscribe(String pattern, int subscribedChannels) { logger.debug("onPSubscribe: pattern["+pattern+"], "+ "subscribedChannels["+subscribedChannels+"]");

}

/**

  • 私有方法:用于处理审计的消息 */ private void AuditMsgHandler(String message){ //审计日志反序列化 String -> byte[] -> RedisMsgAuditInfo RedisMsgAuditInfo msg3 = (RedisMsgAuditInfo) SelfObjectUtils.unserialize(message.getBytes()); logger.debug(msg3.toString()); //TODO 后续怎么存先不写... }

} 审计日志反序列化部分代码可以省略掉。

最后Junit中测试如下:

? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package ap.shiro.redis.msg; import org.junit.Test; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests; import com.aq.web.shiro.redis.JedisUtils; import com.aq.web.shiro.redis.msg.RedisMsgJedisUtils; import com.aq.web.shiro.redis.msg.RedisMsgPubSubListener; @ContextConfiguration(locations = "/mysql-test.xml") public class RedisMsgPubSubTester extends AbstractJUnit4SpringContextTests{

@Test public void testMsg_pub(){

RedisMsgJedisUtils.publishMsg("news.share", "2016年3月28日 15:34:37");

}

@Test public void testMsg_sub(){ RedisMsgPubSubListener pubsub = new RedisMsgPubSubListener(); RedisMsgJedisUtils.subscribeMsg(pubsub, "news.share");

} } 建议测试时,分开方法进行测试。其中testMsg_pub()方法是用来测试发布消息的,testMsg_sub()是用来测试接收订阅消息的。这里只是使用了普通订阅,大家还可以使用模式订阅。执行testMsg_sub()方法后,客户端会一直开启着,不会关闭。另外,在其他的redis客户端中发布一条消息,控制台就会立刻输出该消息。

这样Redis的订阅发布的Java实现就完成了。总体不太难,我今天搜资料的时候发现资料很多也很乱,当然没在开源中国搜。

© 著作权归作者所有

问题达人
粉丝 14
博文 94
码字总数 87450
作品 0
昌平
程序员
私信 提问
Spring Data Redis实现一个订阅/发布系统

Redis是一个key-value的存储系统,提供的key-value类似与Memcached而数据结构又多于memcached,而且性能优异.广泛用于缓存,临时存储等.而我今天这个例子是使用Redis实现一个订阅/发布系统,而不...

震秦
2012/10/26
15.8K
11
Redisson 1.2.0 发布,Redis 客户端

Redisson 1.2.0 发布,此版本更新内容如下: 新特性:支持 cluster 模式 bug 修复: RList iterator race conditions RDeque.addFirst RDeque.addLast 方法 OSGi 支持 Redisson 是基于Redis...

君枫
2015/01/27
3.2K
5
Redisson 2.2.2 发布,Redis 客户端

Resisson 2.2.2 发布,此版本更新内容: Feature - and methods were added to RedissonClient and RedissonReactiveClient Feature - new object added Fixed - RLock expiration renewal ......

淡漠悠然
2015/12/15
2.2K
3
Redisson 2.2.6 发布,Redis 客户端

Redisson 2.2.6发布,更新内容暂未发布,详情请看:CHANGELOG.md 下载地址:redisson-2.2.6 Redisson 是基于Redis服务之上构建的分布式、可伸缩的Java数据结构,高级的Redis客户端。【redis...

淡漠悠然
2016/01/29
1K
0
Redisson 2.2.1 发布,Redis 客户端

Redisson 2.2.1 发布,此版本更新内容如下: Feature - new object added with reentrant read/write locking Feature - new object added map-based cache with TTL support for each entr......

淡漠悠然
2015/12/11
841
0

没有更多内容

加载失败,请刷新页面

加载更多

好程序员Java教程分享Zookeeper基本原理与运用场景

好程序员Java教程分享Zookeeper基本原理与运用场景一、什么是Zookeeper? zookeeper是一个分布式的一致性协调服务。 换句话说,也可以把zookeeper看成一个小型的分布式文件系统。但是和FastD...

好程序员官网
5分钟前
2
0
mysql表情符

1 修改表字段为utf8md4 ALTER table property_info MODIFY `address` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL 2 MySQL数据库服务器配置文件mysqld.cn......

干死it
25分钟前
2
0
正则表达式的基本语法

本文摘自LTP.NET知识库。 正则表达式的形式一般如下: /love/ 其中位于“/”定界符之间的部分就是将要在目标对象中进行匹配的模式。 用户只要把希望查找匹配对象的模式内容放入“/”定界符之...

木庄
27分钟前
3
0
java 框架有哪些?

十大常用框架: 一、SpringMVC 二、Spring 三、Mybatis 四、Dubbo 五、Maven 六、RabbitMQ 七、Log4j 八、Ehcache 九、Redis 十、Shiro 延展阅读: 一、SpringMVC Spring Web MVC是一种基于J...

java框架开发者
27分钟前
6
0
细谈Mysql事务

文章原创于公众号:程序猿周先森。本平台不定时更新,喜欢我的文章,欢迎关注我的微信公众号。 上一篇着重谈到了MySQL锁的概念,里面谈到了事务的概念,其实大部分开发者对于事务肯定不陌生,...

程序猿周先森
36分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部