文档章节

消息中间件(1)-JMS规范

haoran_10
 haoran_10
发布于 2016/07/15 16:37
字数 2209
阅读 141
收藏 2

一、什么是JMS,为什么需要它

(1)、消息中间件的定义

       指利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。消息中间件可以即支持同步方式,又支持异步方式。异步中间件比同步中间件具有更强的容错性,在系统故障时可以保证消息的正常传输。异步中间件技术又分为两类:广播方式和发布/订阅方式。由于发布/订阅方式可以指定哪种类型的用户可以接受哪种类型的消息,更加有针对性,事实上已成为异步中间件的非正式标准。

(2)、JMS定义

        从上个世纪90年代初,随着不同厂商消息中间件大量上市,消息中间件技术得到了长足的发展。目前,IBM和BEA的中间件产品在银行、证券、电信等高端行业,以及IT等行业中得到广泛应用。由于没有统一的规范和标准,基于消息中间件的应用不可移植,不同的消息中间件也不能互操作,这大大阻碍了消息中间件的发展。                   Java Message Service(JMS, Java消息服务)是SUN及其伙伴公司提出的旨在统一各种消息中间件系统接口的规范。它定义了一套通用的接口和相关语义,提供了诸如持久、验证和事务的消息服务,它最主要的目的是允许Java应用程序访问现有的消息中间件。JMS规范没有指定在消息节点间所使用的通讯底层协议,来保证应用开发人员不用与其细节打交道,一个特定的JMS实现可能提供基于TCP/IP、HTTP、UDP或者其它的协议。目前许多厂商采用并实现了JMS API,现在,JMS产品能够为企业提供一套完整的消息传递功能,下面是一些比较流行的JMS商业软件和开源产品。

(3)、解决了什么问题:

        采用异步通信模式:发送消息者可以在发送消息后进行其它的工作,不用等待接收者的回应,而接收者也不必在接到消息后立即对发送者的请求进行处理;

        客户和服务对象生命周期的松耦合关系:客户进程和服务对象进程不要求都正常运行,如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户不会接收到异常,消息中间件能保证消息不会丢失。

 

二、体系结构

整体结构如下:



(1)、ConnectionFactory

  连接工厂,JMS 用它创建连接Connection,一般设为单例模式,一旦创建,就一直运行在应用容器内

 

package javax.jms;
public interface ConnectionFactory {
    Connection createConnection() throws JMSException;//创建一个连接
    Connection createConnection(String userName, String password)//创建一个有密码的连接
        throws JMSException;
}

 

 

(2)、Connection

 

package javax.jms;
public interface Connection {
    Session createSession(boolean transacted, int acknowledgeMode)
        throws JMSException;
    String getClientID() throws JMSException; //唯一客户端ID
    void setClientID(String clientID) throws JMSException;
    ConnectionMetaData getMetaData() throws JMSException;
    ExceptionListener getExceptionListener() throws JMSException;
    void setExceptionListener(ExceptionListener listener) throws JMSException;
    void start() throws JMSException; //开启连接
    void stop() throws JMSException; //停止连接
    void close() throws JMSException; //关闭连接
    ConnectionConsumer createConnectionConsumer(
        Destination destination,
        String messageSelector,
        ServerSessionPool sessionPool,
        int maxMessages)
        throws JMSException;
    ConnectionConsumer createDurableConnectionConsumer(
        Topic topic,
        String subscriptionName,
        String messageSelector,
        ServerSessionPool sessionPool,
        int maxMessages)
        throws JMSException;
}

 JMS 客户端到JMS Provider 的连接,可以理解为数据库里的Connection

 

 

(3)、Session:一个发送或接收消息的线程

 

package javax.jms;
import java.io.Serializable;
public interface Session extends Runnable {
    static final int AUTO_ACKNOWLEDGE = 1; //四种模式
    static final int CLIENT_ACKNOWLEDGE = 2;
    static final int DUPS_OK_ACKNOWLEDGE = 3;
    static final int SESSION_TRANSACTED = 0;
    BytesMessage createBytesMessage() throws JMSException; //创建消息
    MapMessage createMapMessage() throws JMSException;
    Message createMessage() throws JMSException;
    ObjectMessage createObjectMessage() throws JMSException;
    ObjectMessage createObjectMessage(Serializable object) throws JMSException;
    StreamMessage createStreamMessage() throws JMSException;
    TextMessage createTextMessage() throws JMSException;
    TextMessage createTextMessage(String text) throws JMSException;
    boolean getTransacted() throws JMSException;
    int getAcknowledgeMode() throws JMSException;
    void commit() throws JMSException;
    void rollback() throws JMSException;
    void close() throws JMSException;
    void recover() throws JMSException;
    MessageListener getMessageListener() throws JMSException;
    void setMessageListener(MessageListener listener) throws JMSException;
    public void run();
    MessageProducer createProducer(Destination destination) //创建消息生产者
        throws JMSException;
    MessageConsumer createConsumer(Destination destination)
        throws JMSException;
    MessageConsumer createConsumer(
        Destination destination,
        java.lang.String messageSelector)
        throws JMSException;
    MessageConsumer createConsumer(
        Destination destination,
        java.lang.String messageSelector,
        boolean NoLocal)
        throws JMSException;
    Queue createQueue(String queueName) throws JMSException; //创建队列
    Topic createTopic(String topicName) throws JMSException; //创建主题
    TopicSubscriber createDurableSubscriber(Topic topic, String name)//创建主题订阅者
        throws JMSException;
    TopicSubscriber createDurableSubscriber(
        Topic topic,
        String name,
        String messageSelector,
        boolean noLocal)
        throws JMSException;
    QueueBrowser createBrowser(Queue queue) throws JMSException;
    QueueBrowser createBrowser(Queue queue, String messageSelector)
        throws JMSException;
    TemporaryQueue createTemporaryQueue() throws JMSException;
    TemporaryTopic createTemporaryTopic() throws JMSException;
    void unsubscribe(String name) throws JMSException;
}

 

所以,session具有创建消息,主题,队列,生产者,消费者功能。

(4)、MessageProducer Session 对象创建的用来发送消息的对象

(5)、Destination:消息的目的地,包括队列(PTP),主题(Pub/Sub

(6)、MessageConsumer Session 对象创建的用来接收消息的对象

(7)、Message:

JMS 消息由以下几部分组成:消息头,属性,消息体。

   消息头(header):JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。

   属性(property):由消息发送者产生,用来添加删除消息头以外的附加信息。

  消息体(body):由消息发送者产生,JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。

 

三、两种消息传递模型

JMS Parent

PTP Domain

Pub/Sub Domain

ConnectionFactory

QueueConnectionFactory

TopicConnectionFactory

Connection

QueueConnection

TopicConnection

Destination

Queue

Topic

Session

QueueSession

TopicSession

MessageProducer

QueueSender

TopicPublisher

MessageConsumer

QueueReceiver

TopicSubscriber

(1)、点对点模型(PTP) 

        点对点模型用于消息生产者和消息消费者之间点到点的通信。消息生产者将消息发动到由某个名字标识的特定消费者。这个名字实际上对应于消息服务中的一个队列(Queue),在消息传动给消费者之前它被存储在这个队列中。队列可以是持久的,以保证在消息服务出现故障时仍然能够传递消息。

(2)、发布-订阅模型(Pub/Sub)

        发布-订阅模型用称为主题(topic)的内容分层结构代替了PTP模型中的惟一目的地,发送应用程序发布自己的消息,指出消息描述的是有关分层结构中的一个主题的信息。希望接收这些消息的应用程序订阅了这个主题。订阅包含子主题的分层结构中的主题的订阅者可以接收该主题和其子主题发表的所有消息

三、原生态JMS编程实践

 

广义上说,一个JMS应用是几个JMS 客户端交换消息,开发JMS客户端应用由以下几步构成

(1)、用JNDI 得到ConnectionFactory对象;

(2)、用JNDI 得到目标队列或主题对象,即Destination对象;

(3)、用ConnectionFactory创建Connection 对象;

(4)、用Connection对象创建一个或多个JMS Session;

(5)、用Session 和Destination 创建MessageProducer和MessageConsumer;

(6)、通知Connection 开始传递消息。

消费生产者

import java.io.*;
import javax.jms.*;
import javax.naming.*;
 
public class Sender {
 
    public static void main(String[] args) {
        new Sender().send();
    }
 
    public void send() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            //Prompt for JNDI names
            System.out.println("Enter ConnectionFactory name:");
            String factoryName = reader.readLine();
            System.out.println("Enter Destination name:");
            String destinationName = reader.readLine();
 
            //Look up administered objects
            InitialContext initContext = new InitialContext();
            ConnectionFactory factory =
                (ConnectionFactory) initContext.lookup(factoryName);
            Destination destination = (Destination) initContext.lookup(destinationName);
            initContext.close();
 
            //Create JMS objects
            Connection connection = factory.createConnection();
            Session session =
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer sender = session.createProducer(queue);
 
            //Send messages
            String messageText = null;
            while (true) {
                System.out.println("Enter message to send or 'quit':");
                messageText = reader.readLine();
                if ("quit".equals(messageText))
                    break;
                TextMessage message = session.createTextMessage(messageText);
                sender.send(message);
            }
 
            //Exit
            System.out.println("Exiting...");
            reader.close();
            connection.close();
            System.out.println("Goodbye!");
 
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

消息消费者

import java.io.*;
import javax.jms.*;
import javax.naming.*;
 
public class Receiver implements MessageListener {
    private boolean stop = false;
    public static void main(String[] args) {
        new Receiver().receive();
    }
 
    public void receive() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            //Prompt for JNDI names
            System.out.println("Enter ConnectionFactory name:");
            String factoryName = reader.readLine();
            System.out.println("Enter Destination name:");
            String destinationName = reader.readLine();
            reader.close();
 
            //Look up administered objects
            InitialContext initContext = new InitialContext();
            ConnectionFactory factory =
                (ConnectionFactory) initContext.lookup(factoryName);
            Destination destination = (Destination) initContext.lookup(destinationName);
            initContext.close();
 
            //Create JMS objects
            Connection connection = factory.createConnection();
            Session session =
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer receiver = session.createConsumer(queue);
            receiver.setMessageListener(this);
            connection.start();
 
            //Wait for stop
            while (!stop) {
                Thread.sleep(1000);
            }
 
            //Exit
            System.out.println("Exiting...");
            connection.close();
            System.out.println("Goodbye!");
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
 
    public void onMessage(Message message) {
        try {
            String msgText = ((TextMessage) message).getText();
            System.out.println(msgText);
            if ("stop".equals(msgText))
                stop = true;
        } catch (JMSException e) {
            e.printStackTrace();
            stop = true;
        }
    }
}

 

四、总结

目前许多厂商采用并实现了JMS API,比如Active MQ

Active MQ是一个基于Apcache 2.0 licenced发布,开放源码的JMS产品。其特点为:

(1)、提供点到点消息模式和发布/订阅消息模式;

(2)、支持JBoss、Geronimo等开源应用服务器,支持Spring框架的消息驱动;

(3)、新增了一个P2P传输层,可以用于创建可靠的P2P JMS网络连接;

(4)、拥有消息持久化、事务、集群支持等JMS基础设施服务。

下篇学习Active MQ

 

© 著作权归作者所有

共有 人打赏支持
haoran_10
粉丝 25
博文 88
码字总数 80846
作品 0
杭州
程序员
搭建JEESZ分布式架构--消息中间件简介

消息中间件在JEESZ分布式架构中的作用 1) 消息中间件在分布式系统中完成消息的发送和接收。 2) 消息中间件可利用高效可靠的消息传递机制进行平台无关的数据交流, 并基于数据通信来进行分布式...

明理萝
08/14
0
0
2.ActiveMQ消息队列安装使用

全程是MOM (Message Oriented Middleware) 消息中间件 消息中间件有很多,比如: 1.ActiveMQ java语言编写的和java系统结合紧密 2.RabbitMQ Erlong语言开发的,天生支持高并发,性能优于A...

小杰java
2017/10/26
0
0
Java消息中间件的概述与JMS规范

为什么需要使用消息中间件 在介绍消息中间件之前,我们先来看一个故事: 老王的睡前故事: 在很久很久以前,小明隔壁有个姓王的邻居,姑且就叫隔壁老王吧。隔壁老王有个大女儿,名叫王兰花秀...

ZeroOne01
05/25
0
0
activeMQ5官方文档翻译-初始化配置

首先你需要把jar包加到classpath 所需的jar包 为了使ActiveMQ更容易使用,默认的activemq-all.jar包包含了所有需要用到的库文件。如果你喜欢以明确的控制jar包的方式来使用ActiveMQ,那下面是...

z_jordon
2015/05/29
0
0
spring整合jms系列之----点对点(一)

JMS作为一个支持点对点(PTP)和订阅式(pub/sub)式的消息中间件,为很多项目开发者所使用。Spring对JMS提供了很好的支持,可以通过JmsTemplate来方便地实现消息服务,由于JMS对Spring的支持...

码上中国博客
2015/11/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

利用碎片化时间Get Linux系统

起初,我做着一份与IT毫无关系的工作,每月领着可怜的工资,一直想改变现状,但无从下手,也就是大家熟知的迷茫。我相信,每一个人都会或多或少的经历过迷茫,迷茫每一个选择,迷茫工作或者生...

Linux就该这么学
17分钟前
0
0
图像显示深入学习一:Activity启动过程

一个月左右写了图像显示深入学习之文章开篇文章表明了自己近期的计划,前半年重新学习了opengl es,c++以及Linux的一些知识,觉得是时候开始看图像这一块的源码了,边看边补缺补漏吧。 作为该...

JerryLin123
40分钟前
1
0
给MySQL授权远程访问

putty登录服务器; 登录MySQL: mysql -u root -p 新建远程用户: CREATE USER 'myusername' IDENTIFIED BY 'mypassword'; 授权: grant all on *.* to john@'101.102.103.104' identified by......

sweethome
今天
1
0
在t-io老巢造谣,不过有造谣的就会有反造谣的!

只发当事人的截图,不发表评论,以免有引导嫌疑 PS: 截图是由不同的人发过来的 本人已经不在此微信群 图3:有造谣的,就有反造谣的 图4是2018-09-23的t-io官方群的一个发言小统计,有助于让...

talent-tan
今天
99
0
heartbeat 资源

drbd+apache+heartbeat : http://blog.51cto.com/11838039/1827901 heartbeat双机热备的架设 : http://blog.51cto.com/11838039/1827560 对heaetbeat的深一步认识 : http://blog.51cto.co......

寰宇01
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部