最近在做毕业设计,其实有一个地方需要用到消息中间件,老实讲,在来公司实习之前,一直没闹明白啥是JMS,啥又是消息中间件。后来把百度百科里面的解释看了又看,貌似明白了一点,再把关于MQ的解释看了下,基本明白了吧。举个例子吧(如有不当之处,麻烦指出):
例如,现在在淘宝上,客户A购买了一件商品,然后他付款了,假设付款的处理是由应用APP1处理,但大家知道,付款是需要到银行的系统中去请求,但银行啥时候处理完成,我们并
不知道。这个时候,负责通知发货的子应用APP2并不确定啥时候扣款成功,因此他需要等,这个时候,我们可以用到消息中间件。那么,当APP1收到银行的通行时,他会向消息队伍中发送一条扣款成功的通知,APP2则会一直监听消息服务器,此时他一监听到能和,就可以马上通知发货了。
不知道我的表述是否清楚啊,我的理解大概就这样吧,其实他的应用远不止这些,还有一些应用场景,例如,在两个系统之间,一个系统负责处理业务,另一个子系统负责同步多个数据库之间的数据,但这种同步的实时性要求并不高,而且数据量比较大,那么我们就可以选择每天将数据存储到消息中间件中,到某一时刻再启动程序进行同步。这样的话服务器压力小很多。好了,扯了这么多啊,下面就来个简单的例子啊,百度上的例子基本就出于几个人之手,然后到处copy,而且还都是与spring整合的了,看的我是蛋疼啊,本人比较反对一上来就啥都与spring整合,因为spring帮我做了很多事,结果细节我们都不了解,可能某一天程序出问题了,我们不知道为啥,用一句不好听的话就是“死都不知道咋死的”!
对了,忘了介绍几个重要的概念,在JMS中有几个比较重要的概念:ConnectionFactory、Connection、Session、Destination、MessageProducer、MessageConsumer、Message,下面大概介绍下这个几概念:
ConnectionFactory:用来创建连接的连接工厂,就像我们的数据库连接工厂
Connection:封装了JSM与消息生产者之间的一个虚拟连接,类似于我们数据库的连接
Session:Session 是生产和消费消息的一个单线程上下文,也就是说,消息生产者、消息消费者、消息三者都与他打交道,从字面理解就是一次会话。
MessageProducer:消息生产者,这个从字面理解就差不多了
MessageConsumer:消息消费者
Message:消息
我使用的是Apache的activeMQ5.6,这个是apache的一个开源的项目。
我们需要用到的jar有至少有:
activemq-core-5.6.0.jar
commons-logging-1.1.1.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar
geronimo-jms_1.1_spec-1.1.1.jar(存储的是JMS的接口包,其实就是javax.jmx.*)
kahadb-5.6.0.jar(存储消息默认的处理的JAR包)
slf4j-api-1.6.4.jar
log4j-1.2.16.jar(可选,建议选,因为那样我们可以输出它的输出消息,知道发生了啥事,不然错了根本不知道为啥)
slf4j-log4j12-1.6.4.jar(可选)
首先我们从官方下载的例子中有DEMO,但我现在不看他的例子,首先,要发送消息,就必须要发送到一个消息的处理中间人吧(Broker):
Servier.java
package server;
import org.apache.activemq.broker.BrokerService;
public class Server{
public static void startServer() {
BrokerService service = new BrokerService();
try {
service.addConnector("tcp://localhost:10000");
service.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
上面的就是创建一个Broker,负责处理消息;
咱再来一个消息生产者吧:
MyMessageProducer.java
package producter;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MyMessageProducer{
public void product(String url,String queue) throws JMSException {
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(null,null, url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queue);
MessageProducer producer = session.createProducer(destination);
sendMsg(session, producer);
session.commit();
connection.close();
} finally {
System.out.println("-----发送完毕!");
}
}
private void sendMsg(Session session, MessageProducer producer)
throws JMSException {
for (int i = 1; i <= 5; i++) {
TextMessage message = session.createTextMessage("学习ActiveMq 发送的消息"
+ i);
producer.send(message);
}
}
}
然后来个消息消费者吧:
MyMessageConsumer.java
package consumer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MyMessageConsumer{
private String name;
public ConsumerDemo1(String name) {
this.name = name;
}
public void revice(String url, String queue) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(null,null, url);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queue);
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
System.out.println(name+"------等待消息------");
Message msg = consumer.receive(1000);
TextMessage message = (TextMessage) msg;
if (null != message) {
System.out
.println("收到消息:" + message.getText());
}
}
}
}
这里需要说明下的是,接收消息并不止这一种方式,我们还可以使用java.jms.MessageListener来注册消息监听器。现在时间不够,下次再分享吧,其实也比较简单。
好了,上面差不多把我们需要的角色都给建起来了,现在需要测试了,来个Main.java吧
package main;
import javax.jms.JMSException;
import server.Server;
import producter.SenderDemo1;
import consumer.ConsumerDemo1;
public class Main {
/**
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
SenderDemo1 sender = new SenderDemo1();
ConsumerDemo1 consumer = new ConsumerDemo1("zhc");
String url = "tcp://localhost:10000",queue="zhc.queue";
try {
Server.startServer();
sender.product(url,queue);
consumer.revice(url,queue);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
我们运行这个类就可以运行了。相信大家可以看到控制台会正常输出的,不过由于上面的代码是从我测试的代码中抽出来的,有些地方我是在这个编辑器里面修改的,大家可能发现有些方法参数不对啥的,可以自己试着修改试下,不行可以问我。然后上面需要说明的是,默认消息会选择存储到本地文件中,在后面的介绍中我会讲到如何存储到数据库不,仍然不使用spring,那样的话理解更深刻一些吧。