MQTT的java客户端

原创
2023/09/14 22:07
阅读数 114

最近需要做一个mqtt的数据订阅持久化及处理工作,简单做了个规划,利用生产者消费者模型对数据进行解耦,mqtt订阅到的数据放入待处理队列,应用处理这些数据后,再把回传的数据放入发布队列,另一个mqtt客户端从发布队列中取出数据,然后发布。整个过程中,将接收到的原始数据和待发布的原始数据保存到数据库。

下面给出源代码,该程序采用java来实现,数据订阅者,数据处理器,数据发布者分别是不同的线程。

MqttSubscriber.java

package com.hyzm.netty;

import java.util.concurrent.BlockingQueue;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.ApplicationContext;

import com.hyzm.netty.controller.OriginalDataController;

/**
 * Data producer: connects to an MQTT broker, subscribes to {@code m_topic}
 * and lets {@link MqttMsgCallback} push every received message into the
 * shared "received" queue ({@code Common.receivedMessage}).
 *
 * <p>The connection is established when {@link #run()} is executed, i.e. on
 * the thread this Runnable is handed to.
 */
public class MqttSubscriber implements Runnable {
	private BlockingQueue<Message> m_receivedMsg;
	private MqttClient m_client;
	private String m_topic = "application/#";
	private String m_hostUrl = "tcp://xx.xx.xx.xx:1883";  // broker address (placeholder)
	private String m_user = null;
	private String m_passwd = null;
	private int m_qos = Common.QoS;

	private OriginalDataController m_origDataCtrl;

	/**
	 * Creates a subscriber for the default broker address. The persistence
	 * controller is looked up from the Spring context published by Common.
	 */
	public MqttSubscriber() {
		m_receivedMsg = Common.receivedMessage;
		ApplicationContext context = Common.getApplicationContext();
		m_origDataCtrl = context.getBean(OriginalDataController.class);
	}

	/**
	 * Creates a subscriber for the given broker address.
	 *
	 * @param host_url broker URL, e.g. {@code tcp://host:1883}
	 */
	public MqttSubscriber(String host_url) {
		this();
		m_hostUrl = host_url;
	}

	/**
	 * Creates the client, installs the callback, connects and subscribes.
	 * Any MqttException is printed; the method then returns without a
	 * subscription.
	 */
	private void connect() {
		try {
			String clientID = "subscriber" + MqttClient.generateClientId();
			m_client = new MqttClient(m_hostUrl, clientID, new MemoryPersistence());

			// MQTT connection options
			MqttConnectOptions connOpts = new MqttConnectOptions();
			if (m_user != null) {
				connOpts.setUserName(m_user);
				// A user name without a password is legal; avoid the NPE on toCharArray().
				if (m_passwd != null) {
					connOpts.setPassword(m_passwd.toCharArray());
				}
			}

			// Keep the session on the broker.
			// NOTE(review): the client id is regenerated on every start, so a
			// retained session is presumably never resumed - confirm intent.
			connOpts.setCleanSession(false);
			connOpts.setAutomaticReconnect(true);
			connOpts.setConnectionTimeout(0);

			// Install the callback before connecting so no message is missed.
			MqttMsgCallback callback = new MqttMsgCallback(m_receivedMsg, m_client, m_origDataCtrl);
			m_client.setCallback(callback);

			// Connect; a failed attempt throws and is handled by the catch below.
			while (!m_client.isConnected()) {
				m_client.connect(connOpts);
			}

			// Start receiving.
			m_client.subscribe(m_topic, m_qos);
		} catch (MqttException me) {
			me.printStackTrace();
		}
	}

	@Override
	public void run() {
		connect();
	}
}

 

公共变量定义Common.java

package com.hyzm.netty;



import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * Shared state and helpers for the subscriber / processor / publisher threads.
 *
 * <p>Holds the bounded hand-over queues, the default QoS and a static handle
 * to the Spring ApplicationContext so that objects created with {@code new}
 * can still look up beans. All time values are expressed in seconds.
 */
@Component
public class Common implements ApplicationContextAware {
	// Bounded queues (capacity 5): producers block in put() when a queue is full.
	public static BlockingQueue<Message> receivedMessage = new ArrayBlockingQueue<Message>(5);
	public static BlockingQueue<Message> sentHttpMsg = new ArrayBlockingQueue<Message>(5);
	public static BlockingQueue<Message> sentMqttMsg = new ArrayBlockingQueue<Message>(5);

	public static int QoS = 2;
	public static int TIME_GAP = 30;   // interval of 30 seconds

	private static ApplicationContext applicationContext = null;

	/**
	 * Converts a Base64 string into its lowercase hexadecimal representation.
	 *
	 * @param data Base64 text; may be null
	 * @return two hex digits per decoded byte, or an empty string when
	 *         {@code data} is null, empty or not valid Base64
	 */
	public static String Base64toHex(String data) {
		if (data == null || data.isEmpty()) {
			return "";
		}

		byte[] raw;
		try {
			raw = java.util.Base64.getDecoder().decode(data);
		} catch (IllegalArgumentException invalidBase64) {
			// Mirror Message.encode(): undecodable input yields an empty result.
			return "";
		}

		StringBuilder hex = new StringBuilder(raw.length * 2);
		for (byte b : raw) {
			hex.append(Character.forDigit((b >> 4) & 0xF, 16));
			hex.append(Character.forDigit(b & 0xF, 16));
		}
		return hex.toString();
	}

	/** @return the context stored by Spring, or null if it has not been set yet */
	public static ApplicationContext getApplicationContext() {
		return applicationContext;
	}

	/** Stores the context once; later calls do not overwrite the first value. */
	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		if (Common.applicationContext == null) {
			Common.applicationContext = applicationContext;
		}
	}

	/** Looks up a bean by type. */
	public static <T> T getBean(Class<T> clazz) {
		return getApplicationContext().getBean(clazz);
	}

	/** Looks up a bean by name and type. */
	public static <T> T getBean(String name, Class<T> clazz) {
		return getApplicationContext().getBean(name, clazz);
	}
}

 

消息定义Message.java

package com.hyzm.netty;

import java.time.LocalDateTime;
import java.util.Base64;

import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializeFilter;
import com.alibaba.fastjson.serializer.SimplePropertyPreFilter;
import com.hyzm.netty.entity.OriginalData;

/**
 * Carrier for data received from or about to be published to MQTT/HTTP.
 *
 * <p>Wraps an {@link OriginalData} entity together with its JSON text and
 * performs the Base64 / hex conversion of the payload field.
 */
public class Message extends MqttMessage {
	private OriginalData m_data;
	private String m_json;

	/**
	 * Builds a message from a received MQTT message.
	 *
	 * @param topic topic the message arrived on
	 * @param dt    reception time
	 * @param msg   raw MQTT message whose payload is JSON text
	 */
	public Message(String topic, LocalDateTime dt, MqttMessage msg) {
		super();
		// Decode explicitly as UTF-8 instead of relying on the platform charset.
		m_json = new String(msg.getPayload(), java.nio.charset.StandardCharsets.UTF_8);

		deserialization(m_json);
		m_data.setTopic(topic);
		m_data.setReceived_date(dt);
		String decodedData = decode(m_data.getData());
		m_data.setDecode_data(decodedData);
	}

	/**
	 * Builds a message to be published from an entity. The entity's
	 * {@code data} field is overwritten with the Base64 form of
	 * {@code decode_data}.
	 *
	 * @param data entity to publish
	 */
	public Message(OriginalData data) {
		super();
		m_data = data;

		String encodedData = encode(data.getDecode_data());
		m_data.setData(encodedData);
		m_json = JSON.toJSONString(m_data);
	}

	// Parses the JSON text into m_data; never leaves m_data null.
	private void deserialization(String json) {
		m_data = JSON.parseObject(json, OriginalData.class);
		if (m_data == null) {
			// The parser can return null (e.g. for a JSON null literal);
			// fall back to an empty entity so the setters that follow are safe.
			m_data = new OriginalData();
		}
	}

	// Base64 -> hex; a missing payload decodes to an empty string.
	private String decode(String orig) {
		if (orig != null) {
			return Common.Base64toHex(orig);
		}

		return "";
	}

	// Hex -> Base64; null or malformed hex yields an empty string.
	private String encode(String hex) {
		if (hex == null) {
			return "";
		}
		try {
			byte[] raw = Hex.decodeHex(hex.toCharArray());
			return Base64.getEncoder().encodeToString(raw);
		} catch (DecoderException de) {
			de.printStackTrace();
		}

		return "";
	}

	public OriginalData getOriginalData() {
		return m_data;
	}

	/** Serializes the entity with the given filter (intended for printing). */
	public String toJSON(SerializeFilter filter) {
		return JSON.toJSONString(m_data, filter);
	}

	/** @return the JSON text captured when this message was constructed */
	public String toJSON() {
		return m_json;
	}

	/**
	 * Builds the MQTT message to publish: a JSON object restricted to the
	 * {@code confirmed}, {@code fPort} and {@code data} properties.
	 *
	 * @return a new MqttMessage carrying this message's QoS
	 */
	public MqttMessage toMqttMessage() {
		SimplePropertyPreFilter filter = new SimplePropertyPreFilter();
		filter.getIncludes().add("confirmed");
		filter.getIncludes().add("fPort");
		filter.getIncludes().add("data");
		String json = JSON.toJSONString(m_data, filter);
		MqttMessage message = new MqttMessage(json.getBytes(java.nio.charset.StandardCharsets.UTF_8));
		// Carry over the QoS set on this wrapper; otherwise it would be lost.
		message.setQos(getQos());
		return message;
	}
}

 

MqttMsgCallback.java

package com.hyzm.netty;

import java.time.LocalDateTime;
import java.util.concurrent.BlockingQueue;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.hyzm.netty.controller.OriginalDataController;

public class MqttMsgCallback implements MqttCallback {
	private BlockingQueue<Message> m_receivedMsg;
	private MqttClient m_client;

	private OriginalDataController m_origDataCtrl;
	
	// 用于publisher
	public MqttMsgCallback(BlockingQueue<Message> messageList, MqttClient client) {
		super();
		m_receivedMsg = messageList;
		m_client = client;
	}	
	
	// 用于subscriber
	public MqttMsgCallback(BlockingQueue<Message> messageList, 
			MqttClient client, OriginalDataController ctrl) {
		m_receivedMsg = messageList;
		m_client = client;
		m_origDataCtrl = ctrl;
	}

	public void connectionLost(Throwable cause) {
		cause.printStackTrace();
		System.out.println("MqttMsgCallback: 连接丢失...");
		try {
			m_client.reconnect();
			System.out.println("MqttMsgCallback: 重连成功!");
		} catch (MqttException me) {
			me.printStackTrace();
			System.out.println("MqttMsgCallback: 重连失败!");
		}
	}
	
	public void messageArrived(String topic, MqttMessage message) {
		LocalDateTime dt = LocalDateTime.now();   // 获取当前时间,标记为获取数据的时间
		Message msg = new Message(topic, dt, message);
		try {
			System.out.println("Subscriber转发数据: " + topic + "\n" + msg.toJSON());
			
			m_receivedMsg.put(msg);   // 放入处理队列
			m_origDataCtrl.save(msg.getOriginalData());   // 保存到数据库
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public void deliveryComplete(IMqttDeliveryToken token) {
//		System.out.println("Publisher数据发送完成!");
	}
}	

 

数据处理器DataManager.java

package com.hyzm.netty;

import java.util.concurrent.BlockingQueue;

/**
 * Consumer of received messages and producer of reply messages.
 *
 * <p>Blocks on the "received" queue, processes each message as soon as it
 * arrives and puts any reply on both the HTTP and the MQTT outbound queues.
 * The loop ends when the executing thread is interrupted.
 */
public class DataManager implements Runnable {
	private BlockingQueue<Message> m_receivedMsg;
	private BlockingQueue<Message> m_sentHttpMsg;
	private BlockingQueue<Message> m_sentMqttMsg;

	public DataManager() {
		m_receivedMsg = Common.receivedMessage;
		m_sentHttpMsg = Common.sentHttpMsg;
		m_sentMqttMsg = Common.sentMqttMsg;
	}

	@Override
	public void run() {
		while (!Thread.currentThread().isInterrupted()) {
			try {
				Message received = m_receivedMsg.take();   // blocks until data is available
				Message reply = process(received);

				String returnMsg = reply == null ? "null" : reply.toJSON();
				System.out.println("DM收到数据: " + received.toJSON()
						+ "\n 然后回传数据: " + returnMsg);

				if (reply != null) {
					// The same instance goes to both publishers; each put() blocks
					// while its queue is full.
					m_sentHttpMsg.put(reply);
					m_sentMqttMsg.put(reply);
				}
			} catch (InterruptedException e) {
				// Interruption is the stop signal: restore the flag and leave the
				// loop instead of swallowing it and spinning forever.
				Thread.currentThread().interrupt();
				return;
			}
		}
	}

	// Processing hook, not implemented yet: returns the input unchanged.
	private Message process(Message msg) {
		return msg;
	}
}

 

接下来定义数据结构和controller,方便保存数据。

OriginalDataController.java

package com.hyzm.netty.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RestController;

import com.hyzm.netty.entity.OriginalData;
import com.hyzm.netty.repository.OriginalDataRepository;

/**
 * Thin persistence facade used by the MQTT/HTTP worker threads to store
 * {@link OriginalData} rows. It declares no request mappings.
 */
@RestController
public class OriginalDataController {
	@Autowired
	private OriginalDataRepository repository;

	/**
	 * Writes the entity through the repository and flushes immediately.
	 *
	 * @param data entity to store
	 */
	public void save(OriginalData data) {
		repository.saveAndFlush(data);
	}
}

 

OriginalData.java

package com.hyzm.netty.entity;

import java.time.LocalDateTime;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;

import org.hibernate.annotations.GenericGenerator;

import lombok.Data;

/**
 * JPA entity (entity name "original_data") holding one received or published
 * message. Accessors are generated by Lombok's @Data, so the field names
 * below also determine the getter/setter names used elsewhere in the project.
 */
@Data
@Entity(name = "original_data")
public class OriginalData {
	// Primary key, generated through the Hibernate "uuid" generator strategy.
	@Column(name = "primary_key")
	@Id
	@GeneratedValue(generator = "system_uuid")
	@GenericGenerator(name = "system_uuid", strategy = "uuid")
	String primary_key;
	
	// MQTT topic; set by Message when a message is received and used by
	// MqttPublisher as the publish topic.
	@Column(name = "topic")
	String topic;
	
	// Reception timestamp, set by Message for received messages.
	@Column(name = "received_date")
	LocalDateTime received_date;
	
	// HttpPublisher stores 1 after a successful POST and -1 after a failed one.
	// NOTE(review): the meaning of the value in incoming JSON is not visible here.
	@Column(name = "confirmed")
	int confirmed;
	
	// Included in the published JSON under the name "fPort".
	// NOTE(review): presumably a device/application port number - confirm.
	@Column(name = "fport")
	int fPort;
	
	// Payload as Base64 text (Message encodes decode_data into this field).
	@Column(name = "data")
	String data;
	
	// Payload as a hexadecimal string (Message derives it from data).
	@Column(name = "decode_data")
	String decode_data;
}

 

OriginalDataRepository.java

package com.hyzm.netty.repository;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import com.hyzm.netty.entity.OriginalData;

/**
 * Spring Data JPA repository for {@link OriginalData} with a String primary
 * key. It declares no custom queries; only the inherited JpaRepository
 * operations are available.
 */
@Repository
public interface OriginalDataRepository extends JpaRepository<OriginalData, String> {
	
}

 

对于回传数据,采用了继承的方式:定义基类PublisherBase(下面的代码中它是一个带空实现的普通类,并未声明为abstract),然后MqttPublisher和HttpPublisher分别继承PublisherBase,并各自重写connect()和run(),从而同时支持mqtt和http两种回传方式。

PublisherBase.java

package com.hyzm.netty;

import org.springframework.context.ApplicationContext;
import com.hyzm.netty.controller.OriginalDataController;

/**
 * Base class for reply publishers (consumers of the outbound queues).
 * Subclasses override {@link #connect()} and {@link #run()}; the versions
 * here do nothing.
 */
public class PublisherBase implements Runnable {
	// Persistence facade shared with subclasses.
	protected OriginalDataController m_origDataCtrl;

	/** Resolves the persistence controller from the Spring context held by Common. */
	public PublisherBase() {
		m_origDataCtrl = Common.getBean(OriginalDataController.class);
	}

	/** Establishes the transport connection; no-op in the base class. */
	protected void connect() {
		// intentionally empty
	}

	/** Publishing loop; no-op in the base class. */
	@Override
	public void run() {
		// intentionally empty
	}
}

 

 MqttPublisher.java

package com.hyzm.netty;

import java.util.concurrent.BlockingQueue;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.ApplicationContext;

import com.hyzm.netty.controller.OriginalDataController;

public class MqttPublisher extends PublisherBase {
/*
 * 回传数据的消费者,将回传的数据发布到mqtt服务器
 */
	private BlockingQueue<Message> m_sendMsg;
	private MqttClient m_client;
	private String m_hostUrl = "tcp://xx.xx.xx.xx:1883";   //   仅用于测试
	private String m_user = null;
	private String m_passwd = null;
	private int m_qos = Common.QoS;
		
	public MqttPublisher() {
		super();
		m_sendMsg = Common.sentMqttMsg;
		connect();
	}
	
	public MqttPublisher(String host_url) {
		super();
		m_hostUrl = host_url;
		m_sendMsg = Common.sentMqttMsg;
		connect();
	}
	
	@Override
	protected void connect() {
		try {
			String clientID = "publisher" + MqttClient.generateClientId();
			m_client = new MqttClient(m_hostUrl, clientID, new MemoryPersistence());
			
			// MQTT 连接选项
			MqttConnectOptions connOpts = new MqttConnectOptions();
			if (m_user != null) {
	            connOpts.setUserName(m_user);
	            connOpts.setPassword(m_passwd.toCharArray());				
			}

            // 保留会话
            connOpts.setCleanSession(false);
            connOpts.setAutomaticReconnect(true);
            connOpts.setConnectionTimeout(0);
 
            // 设置回调
            MqttMsgCallback callback = new MqttMsgCallback(m_sendMsg, m_client);
            m_client.setCallback(callback);
 
            // 建立连接
//            System.out.println("Connecting to broker: " + broker);
            while (!m_client.isConnected()) {
                m_client.connect(connOpts);            	
            }
		} catch (MqttException me) {
			me.printStackTrace();
		}
	}
	
	@Override
	public void run() {
		while (true) {
			try {
				Message msg = m_sendMsg.take();
				msg.setQos(m_qos);
				// 将得到的数据发布出去
				System.out.println("Mqtt Publisher收到回传的数据: " + msg.toJSON());
				m_client.publish(msg.getOriginalData().getTopic(), msg.toMqttMessage());
				m_origDataCtrl.save(msg.getOriginalData());
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (MqttException me) {
				me.printStackTrace();
			}
		}
	}
}

 

HttpPublisher.java

package com.hyzm.netty;

import java.io.IOException;
import java.net.Authenticator;
import java.net.InetSocketAddress;
import java.net.PasswordAuthentication;
import java.net.ProxySelector;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;

import com.hyzm.netty.entity.OriginalData;

/**
 * Consumer of the HTTP outbound queue: POSTs every reply message as JSON to
 * {@code m_hostUrl}, records the outcome in the entity and persists it.
 * The loop in {@link #run()} ends when the thread is interrupted.
 */
public class HttpPublisher extends PublisherBase {
	private BlockingQueue<Message> m_sendMsg;
	private HttpClient m_client;
	private String m_hostUrl = "https://postman-echo.com/post";   // test endpoint only
	private String m_user = null;
	private String m_password = null;

	/** Creates a publisher for the default endpoint and builds the HTTP client. */
	public HttpPublisher() {
		super();
		m_sendMsg = Common.sentHttpMsg;
		connect();
	}

	/**
	 * Creates a publisher for the given endpoint and builds the HTTP client.
	 *
	 * @param host_url target URL of the POST requests
	 */
	public HttpPublisher(String host_url) {
		super();
		m_hostUrl = host_url;
		m_sendMsg = Common.sentHttpMsg;
		connect();
	}

	/**
	 * Builds the HTTP/2 client with a 20 second connect timeout.
	 * An authenticator (using m_user / m_password) or a proxy selector could
	 * be added to the builder here; neither is configured.
	 */
	@Override
	protected void connect() {
		m_client = HttpClient.newBuilder()
				.version(HttpClient.Version.HTTP_2)
				.connectTimeout(Duration.ofSeconds(20))
				.build();
	}

	/**
	 * Takes messages from the outbound queue, POSTs each one and stores the
	 * result: confirmed = 1 for HTTP 200, confirmed = -1 otherwise.
	 */
	@Override
	public void run() {
		while (!Thread.currentThread().isInterrupted()) {
			try {
				Message msg = m_sendMsg.take();
				System.out.println("Http Publisher收到回传的数据: " + msg.toJSON());
				OriginalData data = msg.getOriginalData();
				HttpResponse<String> res = requestPost(m_hostUrl, msg.toJSON());
				if (res != null && res.statusCode() == 200) {
					data.setConfirmed(1);   // provisional marker for "sent"
				} else {
					data.setConfirmed(-1);  // provisional marker for "failed"
				}
				m_origDataCtrl.save(data);
			} catch (InterruptedException e) {
				// Interruption is the stop signal: restore the flag and exit.
				Thread.currentThread().interrupt();
				return;
			} catch (Exception e) {
				// Keep the publisher alive if a single message fails.
				e.printStackTrace();
			}
		}
	}

	/**
	 * Sends a GET request.
	 *
	 * @param url target URL
	 * @return the response, or null if the URL is malformed, an I/O error
	 *         occurred or the thread was interrupted
	 */
	public HttpResponse<String> requestGet(String url) {
		try {
			HttpRequest request = HttpRequest.newBuilder()
					.uri(new URI(url))
					.timeout(Duration.ofSeconds(10))
					.header("Content-Type", "text/plain;charset=UTF-8")
					.GET()
					.build();
			return send(request);
		} catch (URISyntaxException use) {
			use.printStackTrace();
			return null;
		}
	}

	/**
	 * Sends a POST request with a JSON body.
	 *
	 * @param url   target URL
	 * @param param request body (JSON text)
	 * @return the response, or null if the URL is malformed, an I/O error
	 *         occurred or the thread was interrupted
	 */
	public HttpResponse<String> requestPost(String url, String param) {
		try {
			HttpRequest request = HttpRequest.newBuilder()
					.uri(new URI(url))
					.timeout(Duration.ofSeconds(10))
					.header("Content-Type", "application/json;charset=UTF-8")
					.POST(HttpRequest.BodyPublishers.ofString(param))
					.build();
			return send(request);
		} catch (URISyntaxException use) {
			use.printStackTrace();
			return null;
		}
	}

	// Executes the request synchronously; returns null on I/O failure or interruption.
	private HttpResponse<String> send(HttpRequest request) {
		try {
			return m_client.send(request, HttpResponse.BodyHandlers.ofString());
		} catch (IOException ioe) {
			ioe.printStackTrace();
		} catch (InterruptedException ie) {
			// Restore the flag so the publishing loop can observe the stop request.
			Thread.currentThread().interrupt();
			ie.printStackTrace();
		}
		return null;
	}
}

 

如此,则可在主程序中调用并运行,实现从mqtt订阅到数据,经过处理之后,再分别通过mqtt和http发布的整个流程。

MainThread.java

package com.hyzm.netty;

import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * Startup hook (order 2) that wires the pipeline together: creates the data
 * manager, the MQTT subscriber and the two publishers, then starts one
 * thread for each of them.
 */
@Slf4j
@Component
@Order(2)
public class MainThread implements CommandLineRunner {

	@Override
	public void run(String... args) {
		// Workers are constructed in the same order as before; the publisher
		// constructors call connect() themselves.
		Thread managerThread = new Thread(new DataManager(), "DataManager");
		Thread listenerThread = new Thread(new MqttSubscriber(), "MqttListener");
		Thread mqttOutThread = new Thread(new MqttPublisher(), "MqttPublisher");
		Thread httpOutThread = new Thread(new HttpPublisher(), "HttpPublisher");

		// Start order: subscriber, processor, MQTT publisher, HTTP publisher.
		Thread[] startOrder = { listenerThread, managerThread, mqttOutThread, httpOutThread };
		for (Thread worker : startOrder) {
			worker.start();
		}
	}

}

AppStarter.java

package com.hyzm.netty;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


/**
 * Spring Boot entry point of the application.
 */
@SpringBootApplication
public class AppStarter {

    /**
     * Boots the Spring application with this class as the primary source.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(AppStarter.class);
        application.run(args);
    }
}

 

以上完整的maven工程可在如下地址下载:

https://download.csdn.net/download/u014559935/88340538

 

注意上面的程序需要设置mysql数据库才能正常运行,用容器的话可采用如下命令运行mysql容器。

# 将mysql5.7容器映射到主机3306端口,做目录映射,设置环境变量,令root密码为123456,且可从任意地址登录,并能自动重启
docker run --name mysql -p 3306:3306 -v /usr/local/mysql/conf:/etc/mysql/conf.d -v /usr/local/mysql/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 -e MYSQL_ROOT_HOST='%' --restart=always -d mysql:5.7

 

 

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部