ActiveMQ接受byte消息正确姿势。

原创
2017/11/06 11:24
阅读数 4.8K

致那些年,被ActiveMQ浪费的青春。

 

网上有很多很多小朋友分享的文章,青春比较多的可以去多看看。

曾经我上传了一个病毒在一个群里,一位网友下载后电脑坏了。于是他在群里说:好东西,大家快去下载试试。真心不错。

或者国人总喜欢把一篇错误的文章转载几百次来误导下一位学习者寻求心里平衡。

网上一切文章关于ByteMessage的读取方式都是转为字符串。或者直接写文件。我就纳闷了,要是转为字符串就完事,那我闲着蛋疼发byte[]?

那么如果我传输的内容是一段byte[]呢。比如说是压缩后的json、图片等等呢?

苦寻无果,自己包装一下又怕影响效率,于是看了一下ActiveMQByteMessage的代码,贴上如下解决方案

局部代码:

	    BytesMessage bm = (BytesMessage) message;
		Object[] params = null;
		//调用initializeReading方法初始化数据
		invoke(bm, "initializeReading", params);
		//读取dataIn字段内容(即实际内容)
		DataInputStream dataInputStream = (DataInputStream)PropertUtil.getFieldValue(bm, "dataIn");
		//inputstream转为byte数组。百度一下input2byte找到内容copy过来即可,不贴代码
		byte[] data = FileUtils.input2byte(dataInputStream);
		System.out.println("接收消息 BytesMessage:\t"+data);

全部代码:

@Resource
	private JmsTemplate jt ;

	public void MQTrigger() {
		SysThreadHandle.sysThreadPool.execute(new Runnable() {
			@Override
			public void run() {
				// 接收消息
				while (true) {
					try {
						Message message = (Message) jt.receive();
						// 如果是文本消息
						if (message instanceof TextMessage) {
							TextMessage tm = (TextMessage) message;
							System.out.println("接受消息  textMessage:\t" + tm.getText());
						}

						// 如果是Map消息
						if (message instanceof MapMessage) {
							MapMessage mm = (MapMessage) message;
							System.out.println("接受消息 textMessage:\t" + mm.getString("msgId"));
						}
						// 如果是bytes消息
						if (message instanceof BytesMessage) {
							BytesMessage bm = (BytesMessage) message;
							Object[] params = null;
							invoke(bm, "initializeReading", params);
							DataInputStream dataInputStream = (DataInputStream) PropertUtil.getFieldValue(bm, "dataIn");
							byte[] data = FileUtils.input2byte(dataInputStream);
							System.out.println("接收消息 BytesMessage:\t"+data);
						}

						// 如果是Stream消息
						if (message instanceof StreamMessage) {
							StreamMessage sm = (StreamMessage) message;
							System.out.println(sm.readString());
							System.out.println(sm.readInt());
						}

						// 如果是Object消息
						if (message instanceof ObjectMessage) {
							ObjectMessage om = (ObjectMessage) message;
							Object object = om.getObject();
							System.out.println("接受消息  ObjectMessage:\t" + object);
						}

					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}
		});
	}

	public static Object invoke(Object beanOrClass, String methodName, Object... params) {
		try {
			Class<?> targeClass = beanOrClass.getClass();
			Method[] methods = targeClass.getDeclaredMethods();
			for (Method method : methods) {
				if (methodName.equals(method.getName())) {
					method.setAccessible(true);
					return method.invoke(beanOrClass, params);
				}
			}
			return null;
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}

	}

	/**
	 * 获取字段值,支持点属性
	 * 
	 * @param bean
	 * @param paraName
	 * @return
	 */
	public static Object getFieldValue(Object bean, String paraName) {
		if (StringUtil.isNullOrEmpty(bean)) {
			return null;
		}
		Field[] fs = bean.getClass().getDeclaredFields();
		for (Field f : fs) {
			try {
				if (paraName.equals(f.getName())) {
					f.setAccessible(true);
					return f.get(bean);
				}
			} catch (Exception e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		return null;
	}

	
	@Override
	public void afterPropertiesSet() throws Exception {
		MQTrigger();
	}

 

思路说明:

Message有一个getBodyLength的方法,但是经过测试发现,长度少了几个。

不知道mq为何要如此封装,让我们这么难以取出来。

于是看了一下ActiveMQByteMessage源码。发现调用initializeReading方法初始化data数据。

有一个dataIn的字段,保存着消息内容,ByteMessage里面就是DataInputStream对象。至此,转为byte[]轻而易举。

 

Java交流群:218481849

展开阅读全文
加载中
点击加入讨论🔥(1) 发布并加入讨论🔥
打赏
1 评论
0 收藏
2
分享
返回顶部
顶部