文档章节

Java实现Redis发布/订阅

夜辰
 夜辰
发布于 2016/03/28 22:43
字数 1431
阅读 3095
收藏 39

    今天经理让我实现一个Redis发布/订阅功能,用来记录审计信息。我查了一天,才弄出来。总结如下,大神勿喷。第二天我百度“JedisPubSub ”发现好多代码,后来测试了一下发现,百度关键词“Jedis订阅发布”确实没多少可以参考的,但是百度关键词“Jedis的Publish/Subscribe”代码就有很多,我也是醉了。

    redis的发布/订阅模式是消息机制之一,另外一个叫生成者消费者模式。Redis发布订阅模式讲解可以参考菜鸟教程的这篇文章http://www.runoob.com/redis/redis-pub-sub.html

    1、Redis发布订阅模式客户端实现。在打开Redis服务器后,再打开两个客户端,客户端1用来接收消息,客户端2用来发布消息。

客户端1订阅bar频道。格式:SUBSCRIBE name1 name2。 
成功订阅回复,分别对应订阅类型、订阅频道、订阅数量。

127.0.0.1:6379> SUBSCRIBE bar
Reading messages... (press Ctrl-C to quit)1) "subscribe"2) "bar"3) (integer) 1

客户端2,发送消息。格式:publish channelName Message。

127.0.0.1:6379> publish bar val
(integer) 1

客户端1订阅回复,分别对应消息类型,频道,消息。

1) "message"2) "bar"3) "val"

效果图如下:

Redis支持模式匹配订阅,*为模糊匹配符。

订阅所有频道的消息

PSUBSCRIBE *  

订阅以news.开头的所有频道。

PSUBSCRIBE news.*

    其他操作这里不在赘述。

    

    2、Java实现Redis的发布订阅。

    Java实现Redis的功能大部分是使用jedis的jar包来进行操作。个人感觉,jedis封装了操作redis的常用命令,写多了就会发现知道redis命令怎么写的,就可以猜出来jedis中怎么写的。

    首先在我们封装的JedisUtils中加入发布和订阅操作的方法。大家没有JedisUtils的可以参考JeeSite中的JedisUtils怎么写的。

 

 /**
  * 发布一个消息
  * @param channel
  * @param message
  */
 public static void publishMsg(String channel,String message){
  Jedis jedis = null;
  try {
   jedis = getResource();
   
jedis.publish(channel, message);
   logger.debug("publishMsg {} = {}", channel, message);
  } catch (Exception e) {
   logger.warn("publishMsg {} = {}", channel, message, e);
  } finally {
   returnResource(jedis);
  }
 }
 
 /**
  * 发布一个消息
  * @param channel
  * @param message
  */
 public static void publishMsg(byte[] channel,byte[] message){
  Jedis jedis = null;
  try {
   jedis = getResource();
  
 jedis.publish(channel, message);
   logger.debug("publishMsg {} = {}", channel, message);
  } catch (Exception e) {
   logger.warn("publishMsg {} = {}", channel, message, e);
  } finally {
   returnResource(jedis);
  }
 }

 

     上面的两个方法的核心处理就是jedis.publish(channel, message);参数channel是消息的频道,message是消息的内容。在Junit测试或者其他的地方,使用工具类的此方法即可发布一个消息。

    接收消息代码多一些。首先定义一个类继承JedisPubSub,然后实现其中的未实现的方法,最后在工具类JedisUtils中定义一个操作的方法即可。代码如下:

 /**
  * 接收消息。在main方法调用后,会一直执行下去。当有发布对应消息时,就会在jedisPubSub中接收到!
  * @param jedisPubSub
  * @param channels
  */
 public static void subscribeMsg(JedisPubSub jedisPubSub,String channels){
  Jedis jedis = null;
  try {
   jedis = getResource();
   jedis.subscribe(jedisPubSub, channels);
   logger.debug("subscribeMsg {} = {}", jedisPubSub, channels);
  } catch (Exception e) {
   logger.warn("subscribeMsg {} = {}", jedisPubSub, channels, e);
  } finally {
   returnResource(jedis);
  }
 }

JedisPubSub类的定义如下:

package com.aq.web.shiro.redis.msg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPubSub;
import com.aq.dao.info.RedisMsgAuditInfo;
import com.aq.web.shiro.redis.SelfObjectUtils;
/**
 * Redis监听订阅事件 
 * @author 
 *
 */
public class RedisMsgPubSubListener extends JedisPubSub{
 private static Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class);
 
 
 /**
  * 取得订阅的消息后的处理
  */
 @Override
 public void onMessage(String channel, String message) {
  logger.debug("onMessage: channel["+channel+"], message["+message+"]");
  
  //如果消息类型与定义的审计专用类型一致
  if(RedisMsgJedisUtils.getRedisMsgChannelString().equalsIgnoreCase(channel)){
   //处理审计的消息
   this.AuditMsgHandler(message);
  }
 }
 /**
  * 取得按表达式的方式订阅的消息后的处理
  */
 @Override
 public void onPMessage(String pattern, String channel, String message) {
  logger.debug("onPMessage: channel["+channel+"], message["+message+"]");
  
 }
 /**
  * 初始化订阅时候的处理   
  */
 @Override
 public void onSubscribe(String channel, int subscribedChannels) {
   logger.debug("onSubscribe: channel["+channel+"],"+
     "subscribedChannels["+subscribedChannels+"]");
  
 }
 /**
  * 取消订阅时候的处理 
  */
 @Override
 public void onUnsubscribe(String channel, int subscribedChannels) {
  logger.debug("onUnsubscribe: channel["+channel+"], "+
                                               "subscribedChannels["+subscribedChannels+"]");
  
 }
 /**
  * 取消按表达式的方式订阅时候的处理
  */
 @Override
 public void onPUnsubscribe(String pattern, int subscribedChannels) {
   logger.debug("onPUnsubscribe: pattern["+pattern+"],"+
                                               "subscribedChannels["+subscribedChannels+"]");
  
 }
 
 /**
  * 初始化按表达式的方式订阅时候的处理 
  */
 @Override
 public void onPSubscribe(String pattern, int subscribedChannels) {
  logger.debug("onPSubscribe: pattern["+pattern+"], "+
                                               "subscribedChannels["+subscribedChannels+"]");
  
 }
 
 /**
  * 私有方法:用于处理审计的消息
  */
 private void AuditMsgHandler(String message){
  //审计日志反序列化 String -> byte[] -> RedisMsgAuditInfo
  RedisMsgAuditInfo msg3 = (RedisMsgAuditInfo) SelfObjectUtils.unserialize(message.getBytes());
  logger.debug(msg3.toString());
  //TODO 后续怎么存先不写...
 }
 
}

 审计日志反序列化部分代码可以省略掉。

 

 最后Junit中测试如下:

 

package ap.shiro.redis.msg;
import org.junit.Test;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
import com.aq.web.shiro.redis.JedisUtils;
import com.aq.web.shiro.redis.msg.RedisMsgJedisUtils;
import com.aq.web.shiro.redis.msg.RedisMsgPubSubListener;
@ContextConfiguration(locations = "/mysql-test.xml")
public class RedisMsgPubSubTester extends AbstractJUnit4SpringContextTests{
 
 @Test
 public void testMsg_pub(){
  
  RedisMsgJedisUtils.publishMsg("news.share", "2016年3月28日 15:34:37");
  
 }
 
 @Test
 public void testMsg_sub(){
  RedisMsgPubSubListener pubsub = new RedisMsgPubSubListener();
  RedisMsgJedisUtils.subscribeMsg(pubsub, "news.share");
  
 }
}

    建议测试时,分开方法进行测试。其中testMsg_pub()方法是用来测试发布消息的,testMsg_sub()是用来测试接收订阅消息的。这里只是使用了普通订阅,大家还可以使用模式订阅。执行testMsg_sub()方法后,客户端会一直开启着,不会关闭。另外,在其他的redis客户端中发布一条消息,控制台就会立刻输出该消息。

    这样Redis的订阅发布的Java实现就完成了。总体不太难,我今天搜资料的时候发现资料很多也很乱,当然没在开源中国搜。

    另外,上面例子中使用的JedisPubSub类的子类接收消息。第二天测试时发现,JedisPubSub类的子类接收字符串类的消息没问题,但是接收对象转byte[]的消息后不能正确地转换回对象。这里的对象是自定义的,通过ByteArray流和Object流完成Object与byte[]之间的转换。

    经过查资料后发现,使用BinaryJedisPubSub类的子类接收消息可以正确地转换对象,不会出现上述问题。大家自己试下。继承BinaryJedisPubSub类的监听器跟JedisPubSub类的监听器类似。

    

 

 

 

 

 

 

 

 

© 著作权归作者所有

共有 人打赏支持
夜辰
粉丝 11
博文 26
码字总数 7946
作品 0
昌平
程序员
私信 提问
加载中

评论(2)

夜辰
夜辰

引用来自“徐伟思”的评论

多个客户端的情况呢?
这是一年前被外包公司的架构逼着搞的。当时我也没有试过多客户端。😁
徐伟思
徐伟思
多个客户端的情况呢?
Spring Data Redis实现一个订阅/发布系统

Redis是一个key-value的存储系统,提供的key-value类似与Memcached而数据结构又多于memcached,而且性能优异.广泛用于缓存,临时存储等.而我今天这个例子是使用Redis实现一个订阅/发布系统,而不...

震秦
2012/10/26
0
11
Redisson 1.2.0 发布,Redis 客户端

Redisson 1.2.0 发布,此版本更新内容如下: 新特性:支持 cluster 模式 bug 修复: RList iterator race conditions RDeque.addFirst RDeque.addLast 方法 OSGi 支持 Redisson 是基于Redis...

君枫
2015/01/27
3.2K
5
Redisson 2.2.2 发布,Redis 客户端

Resisson 2.2.2 发布,此版本更新内容: Feature - and methods were added to RedissonClient and RedissonReactiveClient Feature - new object added Fixed - RLock expiration renewal ......

淡漠悠然
2015/12/15
2.2K
3
Redisson 2.2.6 发布,Redis 客户端

Redisson 2.2.6发布,更新内容暂未发布,详情请看:CHANGELOG.md 下载地址:redisson-2.2.6 Redisson 是基于Redis服务之上构建的分布式、可伸缩的Java数据结构,高级的Redis客户端。【redis...

淡漠悠然
2016/01/29
984
0
Redisson 2.2.1 发布,Redis 客户端

Redisson 2.2.1 发布,此版本更新内容如下: Feature - new object added with reentrant read/write locking Feature - new object added map-based cache with TTL support for each entr......

淡漠悠然
2015/12/11
838
0

没有更多内容

加载失败,请刷新页面

加载更多

php __call,__callStatic

// demo1.php<?phpclass test{public function run(){static::who();test::who();self::who(); $this->who();}public static function __callS...

小张525
3分钟前
0
0
Java发展历史

1995年5月23日,Java语言诞生 1996年1月,第一个JDK-JDK1.0诞生 1996年4月,10个最主要的操作系统供应商申明将在其产品中嵌入JAVA技术 1996年9月,约8.3万个网页应用了JAVA技术来制作 1997年...

二九结狐六体
4分钟前
0
0
蚂蚁金服核心技术:百亿特征实时推荐算法揭秘

摘要: 文章提出一整套创新算法与架构,通过对TensorFlow底层的弹性改造,解决了在线学习的弹性特征伸缩和稳定性问题,并以GroupLasso和特征在线频次过滤等自研算法优化了模型稀疏性。在支付...

阿里云官方博客
9分钟前
0
0
Dubbo底层采用Socket进行通信详解

Dubbo底层采用Socket进行通信详解 由于Dubbo底层采用Socket进行通信,自己对通信理理论也不是很清楚,所以顺便把通信的知识也学习一下。 n 通信理论 计算机与外界的信息交换称为通信。基本的...

DemonsI
18分钟前
0
0
Sublime Text3快捷键大全

Sublime Text3快捷键大全 选择类 Ctrl+D 选中光标所占的文本,继续操作则会选中下一个相同的文本。 Alt+F3 选中文本按下快捷键,即可一次性选择全部的相同文本进行同时编辑。举个栗子:快速选...

linjin200
23分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部