MQ消息队列

原创
2018/11/26 17:23
阅读数 50

何为MQ消息队列

    MQ 是message queue ,消息队列,也叫消息中间件,遵守JMS(java message service)规范的一种软件。(同时还有另一个叫AMQP的应用层协议,语言无关性不受产品 语言等限制,rabbitMQ支持这个 )(借用一下

    MQ消息对象分为Queues和Topic,其中Queues为点对点的消息发送,Topic为广播式的消息推送。

    MQ一般有生产者/发布者和消费者/订阅者    生产者/发布者是用来生产消息,将消息发送至MQ服务器,由MQ服务器存储到消息队列中,消费者/订阅者则从消息队列中进行获取消息,从而消费消息,即MQ消息队列的基础原理。

    广播推送:前提必须先订阅,否则不会收到任何消息。比如:现在有100条消息推送,甲在推送前就关注了,则甲可以收到这100条消息,乙在推送后关注,乙只能收到之后发送的消息,而收不到和甲一样的100条消息。并且如果推送100条消息,则每个关注过的用户都会收到100条消息。

    点对点消息推送:如果有100条消息发送,则所有的消费信息总和为100条

下面是一些例子:

package com.jms.ActionMQ;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 点对点消费生产者
 * @author l
 *
 */

public class JMSProducer {

	public static void main(String[] args) {
		ConnectionFactory connectionFactory;//创建连接工厂
		Connection connection = null;
		Session session;
		Destination destination;
		MessageProducer messageProducer;
		
		connectionFactory = new ActiveMQConnectionFactory("tcp://10.17.3.78:61616");//创建连接工厂
		try {
			connection = connectionFactory.createConnection();//创建连接
			connection.start();//打开连接
			session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);//设置session连接方式,TRUE开启事务,FALSE关闭事务
			destination = session.createQueue("eweb");//创建消息队列
			messageProducer = session.createProducer(destination);//创建消息生产者
			sendMessage(session,messageProducer);//生产消息
			session.commit();//注意:不要忘了提交,不然不会提交事务
		} catch (JMSException e) {
			e.printStackTrace();
		}finally{
			try {
				if(connection!=null)
					connection.close();//关闭连接
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
		
	}
	public static void sendMessage(Session session,MessageProducer messageProducer){
		try {
			for (int i = 0; i < 10; i++) {
				TextMessage message = session.createTextMessage("Action"+i);
				System.out.println("Action"+i);
				messageProducer.send(message);//发送消息
			}
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}
package com.jms.ActionMQ;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.console.command.CreateCommand;

/**
 * 点对点消费者
 * @author l
 *
 */
public class JMDConsumer {

	public static void main(String[] args) {
		ConnectionFactory connectionFactory;
		Connection connection = null;
		Session session;
		Destination destination;
		MessageConsumer messageConsumer;
		
		connectionFactory = new ActiveMQConnectionFactory("tcp://10.17.3.78:61616");
		try {
			connection = connectionFactory.createConnection();
			connection.start();
			session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("eweb");
			messageConsumer = session.createConsumer(destination);//创建消费者
			//使用true循环一直请求消息,这样效率过低不建议使用
			/*while(true){
				TextMessage message = (TextMessage) messageConsumer.receive();
				System.out.println(message.getText());
			}*/
			//创建Listener添加监听,可以减少资源消耗
			messageConsumer.setMessageListener(new MessageListener() {
				
				@Override
				public void onMessage(Message paramMessage) {
					try {
						System.out.println("接收到的消息:"+((TextMessage)paramMessage).getText());
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
			});
		} catch (JMSException e) {
			e.printStackTrace();
		}
		//注意:在消费者中不能关闭连接,否则无法获取到消息
		/*finally{
			if(connection!=null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}*/
		
		
	}
}
package com.jms.producer2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * topic生产者
 * @author l
 *
 */

public class JMSProducer {
private static final int SENDNUM = 10;
	
	public static void main(String[] args) {
		ConnectionFactory connectionFactory;
		Connection connection = null;
		Session session;//会话 接收或者发送消息的线程
		Destination destination;//消息的目的地
		MessageProducer messageProducer;//消息生产者
		
		
//		connectionFactory = new ActiveMQConnectionFactory("admin", "123456", "tcp://10.17.3.78:61616");
		connectionFactory = new ActiveMQConnectionFactory("tcp://10.17.3.78:61616");
		try {
			connection = connectionFactory.createConnection();
			connection.start();
			session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			destination = session.createTopic("FirstTopic1");//创建topic
			messageProducer = session.createProducer(destination);
			sendMessage(session, messageProducer);
			session.commit();
		} catch (JMSException e) {
			e.printStackTrace();
		}finally {
			if (connection!=null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
		
	}
	public static void sendMessage(Session session,MessageProducer messageProducer ) throws JMSException{
		for (int i = 0; i < JMSProducer.SENDNUM; i++) {
			TextMessage message = session.createTextMessage("ActiveMQ 发送的消息"+i);
			System.out.println("发送的消息:"+"ActiveMQ 发送的消息"+i);
			messageProducer.send(message);
		}
	}
}
package com.jms.producer2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * topic消费者1
 * @author l
 *
 */
public class JSMConsumer1 {
	public static void main(String[] args) {
		ConnectionFactory connectionFactory;
		Connection connection = null;
		Session session;//会话 接收或者发送消息的线程
		Destination destination;//消息的目的地
		MessageConsumer messageconsumer;
		
		connectionFactory = new ActiveMQConnectionFactory("admin", "123456", "tcp://10.17.3.78:61616");
//		connectionFactory = new ActiveMQConnectionFactory("tcp://10.17.3.78:61616");
		
		try {
			connection = connectionFactory.createConnection();
			connection.start();
			session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
			destination = session.createTopic("FirstTopic1");
			messageconsumer = session.createConsumer(destination);
			messageconsumer.setMessageListener(new MessageListener() {
				
				@Override
				public void onMessage(Message paramMessage) {
					try {
						System.out.println("接收到的消息:"+((TextMessage)paramMessage).getText());
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
			}); // 注册消息监听
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}
package com.jms.producer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JSMConsumer2 {
	public static void main(String[] args) {
		ConnectionFactory connectionFactory;
		Connection connection = null;
		Session session;//会话 接收或者发送消息的线程
		Destination destination;//消息的目的地
		MessageConsumer messageconsumer;
		
//		connectionFactory = new ActiveMQConnectionFactory("admin", "123456", "tcp://10.17.3.78:61616");
		connectionFactory = new ActiveMQConnectionFactory("tcp://10.17.3.78:61616");
		
		try {
			connection = connectionFactory.createConnection();
			connection.start();
			session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("FirstQueue");
			messageconsumer = session.createConsumer(destination);
			messageconsumer.setMessageListener(new MessageListener() {
				
				@Override
				public void onMessage(Message paramMessage) {
					try {
						System.out.println("接收到的消息:"+((TextMessage)paramMessage).getText());
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
			}); // 注册消息监听
		} catch (JMSException e) {
			e.printStackTrace();
		}finally {
			if (connection!=null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

注意:在实际应用中消费者最好将请求回来的数据放在缓存中,可以提高效率,比如使减少登录时长

展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部