JMS基本概念
博客专区 > longload 的博客 > 博客详情
JMS基本概念
longload 发表于1年前
JMS基本概念
  • 发表于 1年前
  • 阅读 70
  • 收藏 0
  • 点赞 0
  • 评论 0
摘要: JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,即:Java消息服务。

基础信息 

  1. JMS的模型

    • P2P(Point-to-Point)点对点

      • 消息生产者(发送者)与消息消费者(接收者)在没有时间上的依赖。生产消息将发送到queue中(及算完成),消息消费者从queue中取出并且消费消息。
      • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。
      • Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费
      • 消费者在成功接收消息之后需向队列应答成功
    • Pub/Sub(Publish/Subscribe)发布订阅模型            

            PUB/SUB方式的工作流程,首先subscriber(订阅者)向JMS容器订阅(Listen to)自己感兴趣的topic(主题),多个订阅者可以同时对一个主题进行订阅,消息发布者发布一条消息,所有订阅了该主题的订阅者都能收到这个消息。默认情况下,pub/sub方式下的消息不是持久的,这意味着,消息一经发出,不管有没有人接收,都不会保存下来,而且订阅者只能接收到自已订阅之后发布者发出的消息。这种方式有点像订阅报刊杂志,一种报刊可以有多人同时订阅,但订阅者只能收到开始订阅之后的报社发行的期刊。

      • 消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息
      • 发布到topic的消息会被所有订阅者消费。
      • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,消费者必须保持激活(运行)状态,才能消费生产者的消息(激活时间内的消息)。
      • 为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。即使订阅者没有被激活(运行),它也能接收订阅之前生产者生产的消息。
  2. 持久订阅和非持久订阅

    • 非持久订阅:只有当客户端处于激活状态,也就是和JMS Provider 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。 
    • 持久订阅时:客户端向JMS 服务器注册一个自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID得到所有当自己处于离线时发送到主题的消息。

编程模式

  1. 连接工厂(Connection Factories)

     连接工厂是用来创建客户端到JMS容器之间JMS连接的工厂,连接工厂有两种:(QueueConnectionFactory和TopicConnectionFactory),分别用来创建QueueConnection 和 TopicConnection的。

    Context ctx = new InitialContext();
    QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
    TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");
    
    

     

  2. 目的地(Destinations)

           目的地是消息生产者(producer)消息发住的目的地,也是消费者(consumer)接收消息的来源地,它有点像信箱,邮递员把信件投往信箱,收件人从信箱取信件。对P2P方式来说,目的地就是Queue,对pub/sub方式来说,目的地就是Topic。我们要得到这个目的地的引用,只能通过JNDI查找(lookup)的方式得到,因为目的地是注册在JMS服务器的(后面的章节会讲到如何注册一个目的地)

    Topic myTopic = (Topic) ctx.lookup("MyTopic");
    Queue myQueue = (Queue) ctx.lookup("MyQueue");
    

     

  3. 连接(Connection)

    连接是指客户端与JMS提供者(容器)之间的连接。有两种:QueueConnection (P2P连接)和TopicConnection (Pub/Sub连接)。注意:连接用完之后必须记得关闭,否则连接资源不会被释放掉。关闭连接的同时会自动把会话、产生者、消费者都关闭掉。

    QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
    TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();
  4. 会话(Session)

    会话是用来创建消息产生者和消息消费者的单线程环境,你可以它来创建消息生产者、消费者、消息,用它来维持消息监听。

    TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    QueueSession queueSession = queueConnection.createQueueSession(true, 0);
  5. 消息生产者(Message Producers)

    消息生产者(消息的发送者),QueueSender(P2P方式)或者TopicPublisher(Pub/Sub方式)是由session创建的,用来把把消息发送到目的地的对象。

    QueueSender queueSender = queueSession.createSender(myQueue);
    TopicPublisher topicPublisher = topicSession.createPublisher(myTopic);
    
    /*发送消息*/
    queueSender.send(message);
    topicPublisher.publish(message);
  6. 消息消费者(Message Consumer)

       消息消费者(消息的接收者),QueueReceiverP2P方式TopicSubscriber(Pub/Sub方式)是由session来创建的,用来接收来自目的地消息的对象。JMS容器来负责把消息从目的地投递到注册了该目的地的消息消费者。

    QueueReceiver queueReceiver = queueSession.createReceiver(myQueue);
    TopicSubscriber topicSubscriber = topicSession.createSubscriber(myTopic);

           一旦创建好消息消费者,它就是活动的,你可以用它来接收消息,你也可以用close()方法来使它失效(Inactive)。当你调用Connection的start()方法之前,消费者是不会接收到任何消息的。两种接收者都有一个receive方法,这是一个同步的方法,也就是说程序执行到这个方法会被阻塞,直到收到消息为止。

    queueConnection.start();
    Message m = queueReceiver.receive();
    
    /*订阅模式*/
    topicConnection.start();
    Message m = topicSubscriber.receive(1000); // time out after a second
  7. 消息监听器(Message Listener)

           消息监听器是一个充当消息的异步事件处理器的对象,它实现了MessageListener接口,这个接口只有一个方法onMessage,在这个方法里,你可以定义当接收到消息之后的要做的操作。你可以用setMessageListener方法为消息消费者注册一个监听器。温馨提示:启动Connection 应setMessageListener 。

    MessageListener listener = new MessageListener( {
        /*当有消息的时候自动调用*/
    	public void onMessage(Message msg) {          
    	  //业务逻辑
    	}
    
    	});
    	topicSubscriber.setMessageListener(listener); // 注册监听
    	topicConnection.start();
  8. 消息选择器(Message Selectors)

           假如你只需要一个对滤器来过滤收到的消息,那么你可以使用消息选择器,它允许消费者指定只对特定的消息感兴趣。消息选择器只能是工作在JMS容器的,而不是我们的应用程序上。消息选择器是一个包含一个表达式的字符串,这个表达式的语法类似SQL的条件表达式,在createReceiver, createSubscriber这些方法里有一个参数让你指定一个消息选择器,由这些方法创建的消费者就只能收到与消息选择器匹配的消息了。

  9. 消息(Messages)

    JMS消息包括三个部分:消息头(Header),属性(Properties),消息体(Body)

    • 消息头里你可以指定JMSMessageID, JMSCorrelationID, JMSReplyTo, JMSType等信息。

    • 属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。

    • 消息体是消息的内容:

      • StreamMessage:Java 数据流消息,用标准流操作来顺序的填充和读取。

      • MapMessage:一个Map类型的消息;名称为 string 类型,而值为 Java 的基本类型。

      • TextMessage:普通字符串消息,包含一个String。

      • ObjectMessage:对象消息,包含一个可序列化的Java 对象

      •  BytesMessage:二进制数组消息,包含一个byte[]。

      • XMLMessage:  一个XML类型的消息。

TextMessage message = queueSession.createTextMessage();
message.setText(msg_text);

 在消费者端,接收到的总是一个通用的Message对象,你需要把它转型成特定的类型才能提取出里面的内容。

Message m = queueReceiver.receive();
if (m instanceof TextMessage) {    
    TextMessage message = (TextMessage) m;    
    System.out.println("Reading message: " + message.getText());
} else {    
    // Handle error
}

 

标签: JMS
共有 人打赏支持
粉丝 4
博文 66
码字总数 104702
×
longload
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: