文档章节

消息中间件(1)-JMS规范

haoran_10
 haoran_10
发布于 2016/07/15 16:37
字数 2209
阅读 138
收藏 2
点赞 0
评论 0

一、什么是JMS,为什么需要它

(1)、消息中间件的定义

       指利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。消息中间件可以即支持同步方式,又支持异步方式。异步中间件比同步中间件具有更强的容错性,在系统故障时可以保证消息的正常传输。异步中间件技术又分为两类:广播方式和发布/订阅方式。由于发布/订阅方式可以指定哪种类型的用户可以接受哪种类型的消息,更加有针对性,事实上已成为异步中间件的非正式标准。

(2)、JMS定义

        从上个世纪90年代初,随着不同厂商消息中间件大量上市,消息中间件技术得到了长足的发展。目前,IBM和BEA的中间件产品在银行、证券、电信等高端行业,以及IT等行业中得到广泛应用。由于没有统一的规范和标准,基于消息中间件的应用不可移植,不同的消息中间件也不能互操作,这大大阻碍了消息中间件的发展。                   Java Message Service(JMS, Java消息服务)是SUN及其伙伴公司提出的旨在统一各种消息中间件系统接口的规范。它定义了一套通用的接口和相关语义,提供了诸如持久、验证和事务的消息服务,它最主要的目的是允许Java应用程序访问现有的消息中间件。JMS规范没有指定在消息节点间所使用的通讯底层协议,来保证应用开发人员不用与其细节打交道,一个特定的JMS实现可能提供基于TCP/IP、HTTP、UDP或者其它的协议。目前许多厂商采用并实现了JMS API,现在,JMS产品能够为企业提供一套完整的消息传递功能,下面是一些比较流行的JMS商业软件和开源产品。

(3)、解决了什么问题:

        采用异步通信模式:发送消息者可以在发送消息后进行其它的工作,不用等待接收者的回应,而接收者也不必在接到消息后立即对发送者的请求进行处理;

        客户和服务对象生命周期的松耦合关系:客户进程和服务对象进程不要求都正常运行,如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户不会接收到异常,消息中间件能保证消息不会丢失。

 

二、体系结构

整体结构如下:



(1)、ConnectionFactory

  连接工厂,JMS 用它创建连接Connection,一般设为单例模式,一旦创建,就一直运行在应用容器内

 

package javax.jms;
public interface ConnectionFactory {
    Connection createConnection() throws JMSException;//创建一个连接
    Connection createConnection(String userName, String password)//创建一个有密码的连接
        throws JMSException;
}

 

 

(2)、Connection

 

package javax.jms;
public interface Connection {
    Session createSession(boolean transacted, int acknowledgeMode)
        throws JMSException;
    String getClientID() throws JMSException; //唯一客户端ID
    void setClientID(String clientID) throws JMSException;
    ConnectionMetaData getMetaData() throws JMSException;
    ExceptionListener getExceptionListener() throws JMSException;
    void setExceptionListener(ExceptionListener listener) throws JMSException;
    void start() throws JMSException; //开启连接
    void stop() throws JMSException; //停止连接
    void close() throws JMSException; //关闭连接
    ConnectionConsumer createConnectionConsumer(
        Destination destination,
        String messageSelector,
        ServerSessionPool sessionPool,
        int maxMessages)
        throws JMSException;
    ConnectionConsumer createDurableConnectionConsumer(
        Topic topic,
        String subscriptionName,
        String messageSelector,
        ServerSessionPool sessionPool,
        int maxMessages)
        throws JMSException;
}

 JMS 客户端到JMS Provider 的连接,可以理解为数据库里的Connection

 

 

(3)、Session:一个发送或接收消息的线程

 

package javax.jms;
import java.io.Serializable;
public interface Session extends Runnable {
    static final int AUTO_ACKNOWLEDGE = 1; //四种模式
    static final int CLIENT_ACKNOWLEDGE = 2;
    static final int DUPS_OK_ACKNOWLEDGE = 3;
    static final int SESSION_TRANSACTED = 0;
    BytesMessage createBytesMessage() throws JMSException; //创建消息
    MapMessage createMapMessage() throws JMSException;
    Message createMessage() throws JMSException;
    ObjectMessage createObjectMessage() throws JMSException;
    ObjectMessage createObjectMessage(Serializable object) throws JMSException;
    StreamMessage createStreamMessage() throws JMSException;
    TextMessage createTextMessage() throws JMSException;
    TextMessage createTextMessage(String text) throws JMSException;
    boolean getTransacted() throws JMSException;
    int getAcknowledgeMode() throws JMSException;
    void commit() throws JMSException;
    void rollback() throws JMSException;
    void close() throws JMSException;
    void recover() throws JMSException;
    MessageListener getMessageListener() throws JMSException;
    void setMessageListener(MessageListener listener) throws JMSException;
    public void run();
    MessageProducer createProducer(Destination destination) //创建消息生产者
        throws JMSException;
    MessageConsumer createConsumer(Destination destination)
        throws JMSException;
    MessageConsumer createConsumer(
        Destination destination,
        java.lang.String messageSelector)
        throws JMSException;
    MessageConsumer createConsumer(
        Destination destination,
        java.lang.String messageSelector,
        boolean NoLocal)
        throws JMSException;
    Queue createQueue(String queueName) throws JMSException; //创建队列
    Topic createTopic(String topicName) throws JMSException; //创建主题
    TopicSubscriber createDurableSubscriber(Topic topic, String name)//创建主题订阅者
        throws JMSException;
    TopicSubscriber createDurableSubscriber(
        Topic topic,
        String name,
        String messageSelector,
        boolean noLocal)
        throws JMSException;
    QueueBrowser createBrowser(Queue queue) throws JMSException;
    QueueBrowser createBrowser(Queue queue, String messageSelector)
        throws JMSException;
    TemporaryQueue createTemporaryQueue() throws JMSException;
    TemporaryTopic createTemporaryTopic() throws JMSException;
    void unsubscribe(String name) throws JMSException;
}

 

所以,session具有创建消息,主题,队列,生产者,消费者功能。

(4)、MessageProducer Session 对象创建的用来发送消息的对象

(5)、Destination:消息的目的地,包括队列(PTP),主题(Pub/Sub

(6)、MessageConsumer Session 对象创建的用来接收消息的对象

(7)、Message:

JMS 消息由以下几部分组成:消息头,属性,消息体。

   消息头(header):JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。

   属性(property):由消息发送者产生,用来添加删除消息头以外的附加信息。

  消息体(body):由消息发送者产生,JMS中定义了5种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage。

 

三、两种消息传递模型

JMS Parent

PTP Domain

Pub/Sub Domain

ConnectionFactory

QueueConnectionFactory

TopicConnectionFactory

Connection

QueueConnection

TopicConnection

Destination

Queue

Topic

Session

QueueSession

TopicSession

MessageProducer

QueueSender

TopicPublisher

MessageConsumer

QueueReceiver

TopicSubscriber

(1)、点对点模型(PTP) 

        点对点模型用于消息生产者和消息消费者之间点到点的通信。消息生产者将消息发动到由某个名字标识的特定消费者。这个名字实际上对应于消息服务中的一个队列(Queue),在消息传动给消费者之前它被存储在这个队列中。队列可以是持久的,以保证在消息服务出现故障时仍然能够传递消息。

(2)、发布-订阅模型(Pub/Sub)

        发布-订阅模型用称为主题(topic)的内容分层结构代替了PTP模型中的惟一目的地,发送应用程序发布自己的消息,指出消息描述的是有关分层结构中的一个主题的信息。希望接收这些消息的应用程序订阅了这个主题。订阅包含子主题的分层结构中的主题的订阅者可以接收该主题和其子主题发表的所有消息

三、原生态JMS编程实践

 

广义上说,一个JMS应用是几个JMS 客户端交换消息,开发JMS客户端应用由以下几步构成

(1)、用JNDI 得到ConnectionFactory对象;

(2)、用JNDI 得到目标队列或主题对象,即Destination对象;

(3)、用ConnectionFactory创建Connection 对象;

(4)、用Connection对象创建一个或多个JMS Session;

(5)、用Session 和Destination 创建MessageProducer和MessageConsumer;

(6)、通知Connection 开始传递消息。

消费生产者

import java.io.*;
import javax.jms.*;
import javax.naming.*;
 
public class Sender {
 
    public static void main(String[] args) {
        new Sender().send();
    }
 
    public void send() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            //Prompt for JNDI names
            System.out.println("Enter ConnectionFactory name:");
            String factoryName = reader.readLine();
            System.out.println("Enter Destination name:");
            String destinationName = reader.readLine();
 
            //Look up administered objects
            InitialContext initContext = new InitialContext();
            ConnectionFactory factory =
                (ConnectionFactory) initContext.lookup(factoryName);
            Destination destination = (Destination) initContext.lookup(destinationName);
            initContext.close();
 
            //Create JMS objects
            Connection connection = factory.createConnection();
            Session session =
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer sender = session.createProducer(queue);
 
            //Send messages
            String messageText = null;
            while (true) {
                System.out.println("Enter message to send or 'quit':");
                messageText = reader.readLine();
                if ("quit".equals(messageText))
                    break;
                TextMessage message = session.createTextMessage(messageText);
                sender.send(message);
            }
 
            //Exit
            System.out.println("Exiting...");
            reader.close();
            connection.close();
            System.out.println("Goodbye!");
 
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

消息消费者

import java.io.*;
import javax.jms.*;
import javax.naming.*;
 
public class Receiver implements MessageListener {
    private boolean stop = false;
    public static void main(String[] args) {
        new Receiver().receive();
    }
 
    public void receive() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            //Prompt for JNDI names
            System.out.println("Enter ConnectionFactory name:");
            String factoryName = reader.readLine();
            System.out.println("Enter Destination name:");
            String destinationName = reader.readLine();
            reader.close();
 
            //Look up administered objects
            InitialContext initContext = new InitialContext();
            ConnectionFactory factory =
                (ConnectionFactory) initContext.lookup(factoryName);
            Destination destination = (Destination) initContext.lookup(destinationName);
            initContext.close();
 
            //Create JMS objects
            Connection connection = factory.createConnection();
            Session session =
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer receiver = session.createConsumer(queue);
            receiver.setMessageListener(this);
            connection.start();
 
            //Wait for stop
            while (!stop) {
                Thread.sleep(1000);
            }
 
            //Exit
            System.out.println("Exiting...");
            connection.close();
            System.out.println("Goodbye!");
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
 
    public void onMessage(Message message) {
        try {
            String msgText = ((TextMessage) message).getText();
            System.out.println(msgText);
            if ("stop".equals(msgText))
                stop = true;
        } catch (JMSException e) {
            e.printStackTrace();
            stop = true;
        }
    }
}

 

四、总结

目前许多厂商采用并实现了JMS API,比如Active MQ

Active MQ是一个基于Apcache 2.0 licenced发布,开放源码的JMS产品。其特点为:

(1)、提供点到点消息模式和发布/订阅消息模式;

(2)、支持JBoss、Geronimo等开源应用服务器,支持Spring框架的消息驱动;

(3)、新增了一个P2P传输层,可以用于创建可靠的P2P JMS网络连接;

(4)、拥有消息持久化、事务、集群支持等JMS基础设施服务。

下篇学习Active MQ

 

© 著作权归作者所有

共有 人打赏支持
haoran_10
粉丝 25
博文 88
码字总数 80846
作品 0
杭州
程序员
Java消息中间件的概述与JMS规范

为什么需要使用消息中间件 在介绍消息中间件之前,我们先来看一个故事: 老王的睡前故事: 在很久很久以前,小明隔壁有个姓王的邻居,姑且就叫隔壁老王吧。隔壁老王有个大女儿,名叫王兰花秀...

ZeroOne01
05/25
0
0
2.ActiveMQ消息队列安装使用

全程是MOM (Message Oriented Middleware) 消息中间件 消息中间件有很多,比如: 1.ActiveMQ java语言编写的和java系统结合紧密 2.RabbitMQ Erlong语言开发的,天生支持高并发,性能优于A...

小杰java
2017/10/26
0
0
activeMQ5官方文档翻译-初始化配置

首先你需要把jar包加到classpath 所需的jar包 为了使ActiveMQ更容易使用,默认的activemq-all.jar包包含了所有需要用到的库文件。如果你喜欢以明确的控制jar包的方式来使用ActiveMQ,那下面是...

z_jordon
2015/05/29
0
0
Java消息中间件之ActiveMQ

一、消息中间件 1.消息中间件概述 中间件:非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。 消息中间件:关注与数据的发送和接...

aibinxiao
2017/11/01
0
0
9.java web的发展 javaweb是什么 J2EE发展历史 规范 J2EE是什么 发展背景 组件标准 J2EE好处作用 Servlet 含义 本质 发展 java在web中的发展 servlet工作流程 java 开发web项目发展 javaBean jsp

javaweb 本质上来说就是使用java 语言来解决企业web应用中一系列技术体系与规范; jdk1.2 playground 此版本中把java技术体系拆分为三个方向 J2SE J2EE J2ME 这个大家都知道 其中之一的J2EE,...

noteless
06/29
0
0
spring整合jms系列之----点对点(一)

JMS作为一个支持点对点(PTP)和订阅式(pub/sub)式的消息中间件,为很多项目开发者所使用。Spring对JMS提供了很好的支持,可以通过JmsTemplate来方便地实现消息服务,由于JMS对Spring的支持...

码上中国博客
2015/11/12
0
0
JCA——一个名不见经传却重要的JavaEE规范

JCA(Java EE Connector Architecture)规范可以说是JavaEE规范集合里最“默默无闻”的,在JavaEE1.3规范发布时就加入了,比现在重要成员JPA, CDI等都早了很多。从应用开发角度来看,开发一个...

z_jordon
2015/08/04
0
0
一起学ActiveMQ 01(JMS简介)

介绍JMS JMS API 说明书是Java程序创建,发送,接收异步消息的标准接口。许多企业J或组织实现了JMS规范说明书,就有了JMS产品,有时候叫消息中间件。现在流行的有如下几个 TIBCO EMS (TIBCO...

maxingji
2017/11/05
0
0
关于 Jms Topic 持久订阅

消息中间件的 Topic 机制,一般情况下没有保存消息。一没连接,再次连接时不会收到失去连接期间的消息。这种机制在对消息可丢失的场景应用好。当然消息中间件都有保存消息的功 能。Jms 规范里...

小编辑
2010/02/27
0
1
在WAS v6使用JMS激活规范代替侦听端口

最近碰上使用WebSphere Application Server V6中(简称WASV6)使用JMS的侦听端口问题,由于工作需要,需要进一步了解JMS规范与websphere MQ产品,特作此文。 我们有两个选择去解决这个问题:...

晨曦之光
2012/03/09
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Laravel5.5 MySQL配置、读写分离及操作

Laravel 让连接不同数据库以及对数据库进行增删改查操作: 参考:http://laravelacademy.org/post/854.html 配置读写分离 应用的数据库配置位于 config/database.php(但是数据库用户及密码等...

MichaelShu
5分钟前
0
0
TraitsUI与Mayavi实例

一:创建一个简单的TraitsUI与Mayavi实例 # -*- coding: utf-8 -*-from numpy import sqrt,sin,mgridfrom traits.api import HasTraits,Instancefrom traitsui.api import View,Item......

wangxuwei
10分钟前
0
0
Linux 查看用户

存储帐号的文件:/etc/passwd 存储密码的文件:/etc/shadow 查看当前系统所有用户 grep bash /etc/passwd root修改普通用户的密码 sudo passwd user_name 然后连续两次输入新的用户密码即可...

yeahlife
11分钟前
0
0
Webpack使用nodemon实时打包编译

业务场景: 1.编写一个npm组件包并且link到了项目文件中 2.需要不断的修改并run build编译npm包并且在项目run dev 查看效果 3.问题: 每次改完npm包都要手动run build编译十分的麻烦且低效,可不...

JamesView
22分钟前
0
0
电脑炸了,浪费我好几天时间,还是简要记下来吧

我的小本本一直在兢兢业业的干活,然而前几天说炸就炸了...... 爆炸现场: 软件: windows10 pro + EIS11+ 360卫士 BIOS:N1DET98W 2.24 硬件: Xeon E3 1505-V5 nv-M3000M thinkpadP70:20E...

Oh_really
27分钟前
0
0
Git之branch和checkout

1.branch是查看、创建、删除分支 #>git branch --helpNAME git-branch - List, create, or delete branchesSYNOPSIS git branch [--color[=<when>] | --no-color] [......

汉斯-冯-拉特
28分钟前
0
0
Mybatis拦截器之数据权限过滤与分页集成

需求场景 最近项目有个数据权限的业务需求,要求大致为每个单位只能查看本级单位及下属单位的数据,例如:一个集团军下属十二个旅,那么军级用户可以看到所有数据,而每个旅则只能看到本旅部...

佛系程序猿灬
38分钟前
9
0
SpringCloud 微服务 (十六) 服务追踪 Zipkin

问题 在服务中,有一个接口,该A接口中又调用了其他服务的B、C、D接口,出现一个请求耗时大的问题,这时候并不知道该B、C、D接口中哪个接口造成的耗时量,然后比如确定C服务接口出现的耗时量大,但...

___大侠
今天
0
0
Java面试基础篇——第八篇:抽象类与接口的区别

1.抽象类 抽象类:如果一个类中包含有抽象方法,或这个类使用abstract关键字修饰,则称这个类是抽象类。 抽象方法是什么呢?抽象方法就是指用abstract关键字修饰的方法。 需要注意的是:抽象...

developlee的潇洒人生
今天
2
0
jsoup 相关资料

1.jsoup 2.Jsoup概述 3.jsoup入门 4.jsoup Java HTML Parser 1.11.3 API

IT追寻者
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部