初入JMS之activeMQ

原创
2012/11/24 17:48
阅读数 2.5K

最近在做毕业设计,其实有一个地方需要用到消息中间件,老实讲,在来公司实习之前,一直没闹明白啥是JMS,啥又是消息中间件。后来把百度百科里面的解释看了又看,貌似明白了一点,再把关于MQ的解释看了下,基本明白了吧。举个例子吧(如有不当之处,麻烦指出):

    例如,现在在淘宝上,客户A购买了一件商品,然后他付款了,假设付款的处理是由应用APP1处理,但大家知道,付款是需要到银行的系统中去请求,但银行啥时候处理完成,我们并
不知道。这个时候,负责通知发货的子应用APP2并不确定啥时候扣款成功,因此他需要等,这个时候,我们可以用到消息中间件。那么,当APP1收到银行的通行时,他会向消息队伍中发送一条扣款成功的通知,APP2则会一直监听消息服务器,此时他一监听到能和,就可以马上通知发货了。

    不知道我的表述是否清楚啊,我的理解大概就这样吧,其实他的应用远不止这些,还有一些应用场景,例如,在两个系统之间,一个系统负责处理业务,另一个子系统负责同步多个数据库之间的数据,但这种同步的实时性要求并不高,而且数据量比较大,那么我们就可以选择每天将数据存储到消息中间件中,到某一时刻再启动程序进行同步。这样的话服务器压力小很多。好了,扯了这么多啊,下面就来个简单的例子啊,百度上的例子基本就出于几个人之手,然后到处copy,而且还都是与spring整合的了,看的我是蛋疼啊,本人比较反对一上来就啥都与spring整合,因为spring帮我做了很多事,结果细节我们都不了解,可能某一天程序出问题了,我们不知道为啥,用一句不好听的话就是“死都不知道咋死的”!

    对了,忘了介绍几个重要的概念,在JMS中有几个比较重要的概念:ConnectionFactory、Connection、Session、Destination、MessageProducer、MessageConsumer、Message,下面大概介绍下这个几概念:

    ConnectionFactory:用来创建连接的连接工厂,就像我们的数据库连接工厂
    Connection:封装了JSM与消息生产者之间的一个虚拟连接,类似于我们数据库的连接
    Session:Session 是生产和消费消息的一个单线程上下文,也就是说,消息生产者、消息消费者、消息三者都与他打交道,从字面理解就是一次会话。
    MessageProducer:消息生产者,这个从字面理解就差不多了
    MessageConsumer:消息消费者
    Message:消息

    我使用的是Apache的activeMQ5.6,这个是apache的一个开源的项目。

    我们需要用到的jar有至少有:

    activemq-core-5.6.0.jar
    commons-logging-1.1.1.jar
    geronimo-j2ee-management_1.1_spec-1.0.1.jar
    geronimo-jms_1.1_spec-1.1.1.jar(存储的是JMS的接口包,其实就是javax.jmx.*
    kahadb-5.6.0.jar(存储消息默认的处理的JAR包
    slf4j-api-1.6.4.jar
    log4j-1.2.16.jar(可选,建议选,因为那样我们可以输出它的输出消息,知道发生了啥事,不然错了根本不知道为啥
    slf4j-log4j12-1.6.4.jar(可选


    首先我们从官方下载的例子中有DEMO,但我现在不看他的例子,首先,要发送消息,就必须要发送到一个消息的处理中间人吧(Broker):

    Servier.java

package server;
import org.apache.activemq.broker.BrokerService;
public class Server{
    public static void startServer() {
		BrokerService service = new BrokerService();
		try {
			service.addConnector("tcp://localhost:10000");
			service.start();
		} catch (Exception e) {
			e.printStackTrace();
		}
    }
}

    上面的就是创建一个Broker,负责处理消息;

     咱再来一个消息生产者吧:

    MyMessageProducer.java   

    

package producter;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MyMessageProducer{
	public void product(String url,String queue) throws JMSException {
		try {
			ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(null,null, url);
			Connection connection = connectionFactory.createConnection();
			connection.start();
			Session session = connection.createSession(true,
					Session.AUTO_ACKNOWLEDGE);
			Destination destination = session.createQueue(queue);
			MessageProducer producer = session.createProducer(destination);
			sendMsg(session, producer);
			session.commit();
			connection.close();
		} finally {
			System.out.println("-----发送完毕!");
		}
	}

	private void sendMsg(Session session, MessageProducer producer)
			throws JMSException {
		for (int i = 1; i <= 5; i++) {
			TextMessage message = session.createTextMessage("学习ActiveMq 发送的消息"
					+ i);
			producer.send(message);
		}
	}
}

然后来个消息消费者吧:

    MyMessageConsumer.java

    

package consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MyMessageConsumer{
	private String name;

	public ConsumerDemo1(String name) {
		this.name = name;
	}

	public void revice(String url, String queue) throws JMSException {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(null,null, url);
		Connection connection = connectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(true,
				Session.AUTO_ACKNOWLEDGE);

		Destination destination = session.createQueue(queue);
		MessageConsumer consumer = session.createConsumer(destination);
		while (true) {
			System.out.println(name+"------等待消息------");
			Message msg = consumer.receive(1000);
			TextMessage message = (TextMessage) msg;
			if (null != message) {
				System.out
						.println("收到消息:" + message.getText());
			} 
		}
	}

}


    这里需要说明下的是,接收消息并不止这一种方式,我们还可以使用java.jms.MessageListener来注册消息监听器。现在时间不够,下次再分享吧,其实也比较简单。

    好了,上面差不多把我们需要的角色都给建起来了,现在需要测试了,来个Main.java吧


package main;

import javax.jms.JMSException;

import server.Server;
import producter.SenderDemo1;
import consumer.ConsumerDemo1;

public class Main {

	/**
	 * @param args
	 * @throws InterruptedException
	 */
	public static void main(String[] args) throws InterruptedException {
		SenderDemo1 sender = new SenderDemo1();
		ConsumerDemo1 consumer = new ConsumerDemo1("zhc");
		String url = "tcp://localhost:10000",queue="zhc.queue";
		try {
			Server.startServer();
			sender.product(url,queue);
			consumer.revice(url,queue);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}
    我们运行这个类就可以运行了。相信大家可以看到控制台会正常输出的,不过由于上面的代码是从我测试的代码中抽出来的,有些地方我是在这个编辑器里面修改的,大家可能发现有些方法参数不对啥的,可以自己试着修改试下,不行可以问我。然后上面需要说明的是,默认消息会选择存储到本地文件中,在后面的介绍中我会讲到如何存储到数据库不,仍然不使用spring,那样的话理解更深刻一些吧。



展开阅读全文
打赏
1
16 收藏
分享
加载中
我测试完了,感谢楼主,适合入门级教程
2015/08/11 11:08
回复
举报
tianpeng91博主

引用来自“zkf17983”的评论

以下两个类不存在
SenderDemo1 sender = new SenderDemo1();
ConsumerDemo1 consumer = new ConsumerDemo1("zhc");

...我的测试类
2013/08/30 18:03
回复
举报
以下两个类不存在
SenderDemo1 sender = new SenderDemo1();
ConsumerDemo1 consumer = new ConsumerDemo1("zhc");
2013/08/30 15:41
回复
举报
tianpeng91博主

引用来自“dongfangke”的评论

你确定你的例子能跑得起来么?

确定啊,我跑过啊
2013/05/28 17:03
回复
举报
你确定你的例子能跑得起来么?
2013/05/28 09:38
回复
举报
更多评论
打赏
5 评论
16 收藏
1
分享
返回顶部
顶部