ActiveMQ发布订阅模式
博客专区 > postdep 的博客 > 博客详情
ActiveMQ发布订阅模式
postdep 发表于2年前
ActiveMQ发布订阅模式
  • 发表于 2年前
  • 阅读 118
  • 收藏 1
  • 点赞 0
  • 评论 0

腾讯云 新注册用户 域名抢购1元起>>>   

摘要: ActiveMQ的另一种模式就SUB/HUB即发布订阅模式,是SUB/hub就是一拖N的USB分线器的意思。意思就是一个来源分到N个出口。还是上节的例子,当一个订单产生后,后台N个系统需要联动,但有一个前提是都需要收到订单信息,那么我们就需要将一个生产者的消息发布到N个消费者。

ActiveMQ的另一种模式就SUB/HUB即发布订阅模式,是SUB/hub就是一拖N的USB分线器的意思。意思就是一个来源分到N个出口。还是上节的例子,当一个订单产生后,后台N个系统需要联动,但有一个前提是都需要收到订单信息,那么我们就需要将一个生产者的消息发布到N个消费者。

生产者:

 

复制代码
try { //Create the Connection Factory  IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/"); using (IConnection connection = factory.CreateConnection()) { //Create the Session  using (ISession session = connection.CreateSession()) { //Create the Producer for the topic/queue  IMessageProducer prod = session.CreateProducer( new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing")); //Send Messages  int i = 0; while (!Console.KeyAvailable) { ITextMessage msg = prod.CreateTextMessage(); msg.Text = i.ToString(); Console.WriteLine("Sending: " + i.ToString()); prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue); System.Threading.Thread.Sleep(5000); i++; } } } Console.ReadLine(); } catch (System.Exception e) { Console.WriteLine("{0}", e.Message); Console.ReadLine(); }
复制代码

假设生产者每5秒发送一次消息:

wps3E59.tmp

消费者:

复制代码
static void Main(string[] args) { try { //Create the Connection factory  IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/"); //Create the connection  using (IConnection connection = factory.CreateConnection()) { connection.ClientId = "testing listener1"; connection.Start(); //Create the Session  using (ISession session = connection.CreateSession()) { //Create the Consumer  IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener1", null, false); consumer.Listener += new MessageListener(consumer_Listener); Console.ReadLine(); } connection.Stop(); connection.Close(); } } catch (System.Exception e) { Console.WriteLine(e.Message); } } static void consumer_Listener(IMessage message) { try { ITextMessage msg = (ITextMessage)message; Console.WriteLine("Receive: " + msg.Text); } catch (System.Exception e) { Console.WriteLine(e.Message); } }
复制代码

 

启动一个消费者:

wps3E5A.tmp

我们发现他是从15开始的,而不是像上节一样从头开始,再启动另一个消费者:

wps3E5B.tmp

我们发现就是从启动时开始接受消息的,之前的消息就丢失了。

整体状态如下:

wps3E6B.tmp

我们观察管理界面:

wps3E6C.tmp

产生了一个testing的Topics,而订阅方有2个都订阅的是testing:

wps3E6D.tmp

这样只需要在需要获取消息的地方订阅即可及时获得。

源代码下载

共有 人打赏支持
postdep
粉丝 76
博文 247
码字总数 259349
×
postdep
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: