文档章节

ActiveMQ的使用

保罗的寓言
 保罗的寓言
发布于 2013/01/06 16:53
字数 988
阅读 318
收藏 2

整理一下ActiveMq的使用:

创建一个Queue 的sender:


package com.ule.myq;

import java.util.HashMap;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // MessageProducer:消息发送者
        MessageProducer producer;
        // TextMessage message;
        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
               // "tcp://172.24.147.47:61618"
//               "tcp://172.24.207.30:61618"
                //"tcp://172.24.207.30:61618"
                "tcp://localhost:61616"
                );
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
          //  destination = session.createQueue("ULE_PROMOENG_GIFT_MONINTOR");
            destination = session.createQueue("DylanTestQueue");
            // 得到消息生成者【发送者】
            
            producer = session.createProducer(destination);
            // 设置不持久化,此处学习,实际根据项目决定
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 构造消息,此处写死,项目就是参数,或者方法获取
           ObjectMessage om= session.createObjectMessage();
          /* HashMap<String,String> sendMap=new HashMap<String,String>();
           sendMap.put("eventId", "708");
           sendMap.put("taskType", "1");*/
            HashMap<String,HashMap> sendMap=new HashMap<String,HashMap>();
           HashMap<String,Object> orderMap=new HashMap<String,Object>();
           orderMap.put("escOrderid", "2012090600119687");
           orderMap.put("buyerOnlyid", "9637");
           orderMap.put("transUsrPhone", "18702141138");
           orderMap.put("productAmount", 100.00);
           HashMap<String,Object> payMap=new HashMap<String,Object>();
           payMap.put("remark","6212855510000000017");
           payMap.put("paymentSubtype","11");
           payMap.put("serialNum","100037531");
           payMap.put("paymentType","9");
           HashMap<String,Object> promotionMap=new HashMap<String,Object>();
           promotionMap.put("promotionTicket", "2012090700030254");
           promotionMap.put("promotionId", "652");
           sendMap.put("order", orderMap);
           sendMap.put("payment", payMap);
           sendMap.put("promotion", promotionMap);
           om.setObject(sendMap);
           System.out.println("开发发送Queue。。。。。。。。。。。。。。。。。。。。。。。。。。。");
           producer.send(om);
           System.out.println("成功发送Queue。。。。。。。。。。。。。。。。。。。。。。。。。。。");
          // sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
//        for (int i = 1; i <= SEND_NUMBER; i++) {
//            TextMessage message = session
//                    .createTextMessage("ActiveMq 发送的消息" + i);
//            // 发送消息到目的地方
//            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
//            producer.send(message);
//        }
    	TextMessage message = session.createTextMessage("搞搞更健康~~");
        producer.send(message);
    }
}
2.一个queue的消费者进行消费message的demo



package com.ule.myq;


import java.util.HashMap;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // 消费者,消息接收者
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
//                  "tcp://172.24.147.47:61618"
                //"tcp://172.24.207.30:61618"
                "tcp://localhost:61616"
                );
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
            destination = session.createQueue("DylanTestQueue");
            consumer = session.createConsumer(destination);
            /*while (true) {
                //设置接收者接收消息的时间,为了便于测试,这里谁定为100s
                TextMessage message = (TextMessage) consumer.receive(100000);
                if (null != message) {
                    System.out.println("收到消息" + message.getText());
                } else {
                    break;
                }
            }*/
            
            while (true) {
                //设置接收者接收消息的时间,为了便于测试,这里谁定为100s
            	ObjectMessage message = (ObjectMessage) consumer.receive(100);
                System.out.println(message.getObject());
                HashMap msgMap=(HashMap)message.getObject();
                Map orderMap=(Map) msgMap.get("order");
    			Map paymentMap=(Map) msgMap.get("payment");
    			Map promotionMap=(Map) msgMap.get("promotion");
                
                if (null != message) {
                    System.out.println("小丁00 ::::收到1条消息消息,开始进行收听。。。。。。" + message.toString());
                    
                    System.out.println(orderMap.get("transUsrPhone"));
                    System.out.println(orderMap.get("escOrderid"));
                    System.out.println(orderMap.get("buyerOnlyid"));
                    System.out.println(paymentMap.get("remark"));
                    System.out.println(paymentMap.get("paymentSubtype"));
                    System.out.println(paymentMap.get("serialNum"));
                    System.out.println(paymentMap.get("paymentType"));
                    System.out.println(promotionMap.get("promotionTicket"));
                    System.out.println(promotionMap.get("promtionEventCode"));
                    System.out.println("小丁00 ::::收到1条消息消息,收听完毕。。。。。。");
                    
                } else {
                    break;
                }
            }
            
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}


3. 创建一个topic 的发布者:


package com.ule.demo;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
public class TopicPublisher {
	public static void main(String[] args) throws JMSException {
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection connection = factory.createConnection();
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Topic topic = session.createTopic("myTopic.messages");

		MessageProducer producer = session.createProducer(topic);
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

		//while(true) {
			TextMessage message = session.createTextMessage();
			message.setText("message_" + System.currentTimeMillis());
			producer.send(message);
			System.out.println("Sent message: " + message.getText());

			/*try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}*/
		//}

		session.close();
		connection.stop();
		connection.close();
	}
}


4.创建订阅者:

package com.ule.demo;


import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicSubscriber {
	public static void main(String[] args) throws JMSException {
		ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection connection = factory.createConnection();
		connection.start();
		
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Topic topic = session.createTopic("myTopic.messages");

		MessageConsumer consumer1 = session.createConsumer(topic);
		consumer1.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				System.out.println(":xiaoding1 收到一条消息: 开始收听消息");
				TextMessage tm = (TextMessage) message;
				try {
					System.out.println("Received message: " + tm.getText());
				} catch (JMSException e) {
					e.printStackTrace();
				}
				System.out.println(":xiaoding1 收听了一条消息: 处理完毕!。。。。。");
			}
		});
		session.close();
		connection.stop();
		connection.close();
	}
}

© 著作权归作者所有

上一篇: JDBC的连接方式
下一篇: sql统计
保罗的寓言
粉丝 25
博文 70
码字总数 40506
作品 0
杨浦
高级程序员
私信 提问
Win7环境下安装ActiveMQ

参考ActiveMQ官方文档:http://activemq.apache.org/getting-started.html 安装ActiveMQ 近来要学习JMS,在网上查了些资料,发现ActiveMQ是比较流行的JMS开源框架,决定使用ActiveMQ来学习J...

纠结名字
2015/08/09
1K
0
ActiveMQ安装配置和使用简例

本文作者:Zhang Phil 原文链接:http://blog.csdn.net/zhangphil/article/details/48173665 ActiveMQ安装配置和使用简例 ActiveMQ是一套JMS(Java Message Service)开源消息服务实现的组件...

开开心心过
2015/09/02
0
0
JMS配置说明-----activeMQ-5.6

1 简介 activeMQ是一个完全支持JMS1.1 和J2EE规范的JMS Provider实现; 尽管规范出台已经是很久的事情了,但JMS在当今的J2EE应用中仍然扮演着特殊的地位; 特性列表 多种语言和协议编写客户端...

次渠龙哥
2018/06/26
0
0
[ActiveMQ]初识ActiveMQ

初识ActiveMQ ActiveMQ介绍 官方网站:http://activemq.apache.org/ 最新版本:ActiveMQ 5.14.1(2016-10-28) 最新版本下载链接:http://activemq.apache.org/download.html 历史版本下载链接...

candy-yun
2016/10/29
0
0
ActiveMQ 持久化(数据库),查询队列剩余消息数、出队数的实现

《ActiveMQ 持久化(文件),查询队列剩余消息数、出队数的实现》分析了消息队列持久化保存,假如activemq服务器突然停止,服务器启动后,还可以继续查找队列中的消息。现在分析队列中的消息...

cookqq
2013/03/04
4.1K
0

没有更多内容

加载失败,请刷新页面

加载更多

1、Docker学习,第一天

Docker学习,第一天 一、Docker简介 环境配置如此之麻烦,换台机器,重来一次,费事费力。安装的时候,把原始环境一模一样的复制过来。开发人员利用Docker可以消除写作编码时,”在我的机器上...

有一个小阿飞
10分钟前
2
0
10.23

一、编写一个程序,把用分钟表示的时间转换成用小时和分钟表示的时间。使用#define或const创建一个表示60的符号常量或const变量。通过while循环让用户重复输入值,直到用户输入小于或等于0...

197王彧涛
36分钟前
2
0
手机视频如何制作GIF动图

很多小伙伴都喜欢用GIF动图在各大社交软件上与好友斗图,那你知道这些好玩有趣的GIF动图是如何制作的吗?下面教你一个将手机视频制作成GIF动图的方法,让你都可以随时随地制作有趣的表情包,...

白米稀饭2019
41分钟前
5
0
Spring Security 实战干货:实现自定义退出登录

1. 前言 上一篇对 Spring Security 所有内置的 Filter 进行了介绍。今天我们来实战如何安全退出应用程序。 2. 我们使用 Spring Security 登录后都做了什么 这个问题我们必须搞清楚!一般登录...

码农小胖哥
今天
10
0
JVM核心知识-类加载机制

JVM中类的生命周期包括7个阶段,加载、准备、验证、解析、初始化、使用、卸载。其中准备、验证、解析被归为连接阶段。 加载 jvm在这个阶段完成的工作 通过类名获取类的二进制字节流 将这个字...

moon888
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部