最近需要做一个mqtt的数据订阅持久化及处理工作,简单做了个规划,利用生产者消费者模型对数据进行解耦,mqtt订阅到的数据放入待处理队列,应用处理这些数据后,再把回传的数据放入发布队列,另一个mqtt客户端从发布队列中取出数据,然后发布。整个过程中,将接收到的原始数据和待发布的原始数据保存到数据库。
下面给出源代码,该程序采用java来实现,数据订阅者,数据处理器,数据发布者分别是不同的线程。
MqttSubscriber.java
package com.hyzm.netty;
import java.util.concurrent.BlockingQueue;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.ApplicationContext;
import com.hyzm.netty.controller.OriginalDataController;
/**
 * Producer side of the pipeline: connects to the MQTT broker, subscribes to
 * {@code m_topic} and installs a {@link MqttMsgCallback} that receives the
 * shared {@code Common.receivedMessage} queue and the persistence controller.
 */
public class MqttSubscriber implements Runnable {
    /** Pause between two connection attempts while the broker cannot be reached. */
    private static final long RETRY_DELAY_MS = 5000L;

    private BlockingQueue<Message> m_receivedMsg;
    private MqttClient m_client;
    private String m_topic = "application/#";
    private String m_hostUrl = "tcp://xx.xx.xx.xx:1883"; // broker address (placeholder)
    private String m_user = null;
    private String m_passwd = null;
    private int m_qos = Common.QoS;
    private OriginalDataController m_origDataCtrl;

    /** Creates a subscriber for the default broker address. */
    public MqttSubscriber() {
        m_receivedMsg = Common.receivedMessage;
        ApplicationContext context = Common.getApplicationContext();
        m_origDataCtrl = context.getBean(OriginalDataController.class);
    }

    /**
     * Creates a subscriber for a specific broker.
     *
     * @param host_url broker URL, e.g. {@code tcp://host:1883}
     */
    public MqttSubscriber(String host_url) {
        this();
        m_hostUrl = host_url;
    }

    /**
     * Creates the client, then connects and subscribes, retrying every
     * {@code RETRY_DELAY_MS} until both succeed or the thread is interrupted.
     *
     * <p>The original code looped on {@code isConnected()} but left the loop on
     * the first {@link MqttException}, so a broker that was down at start-up
     * ended the subscriber for good.
     */
    private void connect() {
        MqttConnectOptions connOpts = new MqttConnectOptions();
        try {
            String clientID = "subscriber" + MqttClient.generateClientId();
            m_client = new MqttClient(m_hostUrl, clientID, new MemoryPersistence());
        } catch (MqttException me) {
            // The client could not even be created (e.g. bad URL); retrying cannot help.
            me.printStackTrace();
            return;
        }

        if (m_user != null) {
            connOpts.setUserName(m_user);
            if (m_passwd != null) { // a user without a password must not cause an NPE
                connOpts.setPassword(m_passwd.toCharArray());
            }
        }
        // Keep the session on the broker and let the client reconnect after a drop.
        // NOTE(review): the client id is regenerated on every start, so a persistent
        // session presumably cannot be resumed across restarts - confirm intent.
        connOpts.setCleanSession(false);
        connOpts.setAutomaticReconnect(true);
        connOpts.setConnectionTimeout(0);

        m_client.setCallback(new MqttMsgCallback(m_receivedMsg, m_client, m_origDataCtrl));

        while (!Thread.currentThread().isInterrupted()) {
            try {
                if (!m_client.isConnected()) {
                    m_client.connect(connOpts);
                }
                m_client.subscribe(m_topic, m_qos);
                return;
            } catch (MqttException me) {
                me.printStackTrace();
            }
            try {
                Thread.sleep(RETRY_DELAY_MS);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // preserve the stop request
                return;
            }
        }
    }

    /** Connects and subscribes; message delivery then happens on the client's callback. */
    @Override
    public void run() {
        connect();
    }
}
公共变量定义Common.java
package com.hyzm.netty;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
 * Shared state of the pipeline: the queues connecting subscriber, processor and
 * publishers, a few constants, and static access to the Spring application
 * context. All time values are in seconds.
 */
@Component
public class Common implements ApplicationContextAware {
    /** Messages received by the subscriber, waiting to be processed. */
    public static BlockingQueue<Message> receivedMessage = new ArrayBlockingQueue<Message>(5);
    /** Messages waiting to be sent by the HTTP publisher. */
    public static BlockingQueue<Message> sentHttpMsg = new ArrayBlockingQueue<Message>(5);
    /** Messages waiting to be sent by the MQTT publisher. */
    public static BlockingQueue<Message> sentMqttMsg = new ArrayBlockingQueue<Message>(5);
    public static int QoS = 2;
    public static int TIME_GAP = 30; // interval of 30 seconds
    private static ApplicationContext applicationContext = null;

    /**
     * Converts a Base64 string into the lowercase hexadecimal form of its bytes.
     *
     * <p>The previous version was a stub that always returned an empty string.
     *
     * @param data Base64 text; may be {@code null}
     * @return hex string, or {@code ""} when {@code data} is {@code null}, empty
     *         or not valid Base64 (the value the stub used to return)
     */
    public static String Base64toHex(String data) {
        if (data == null || data.isEmpty()) {
            return "";
        }
        byte[] raw;
        try {
            raw = java.util.Base64.getDecoder().decode(data);
        } catch (IllegalArgumentException invalidBase64) {
            // Undecodable payloads keep the old "no decoded data" result instead
            // of failing the caller.
            return "";
        }
        StringBuilder hex = new StringBuilder(raw.length * 2);
        for (byte b : raw) {
            hex.append(Character.forDigit((b >> 4) & 0xF, 16));
            hex.append(Character.forDigit(b & 0xF, 16));
        }
        return hex.toString();
    }

    /** @return the stored application context, or {@code null} if none was set yet */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /** Stores the first application context handed in; later calls are ignored. */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (Common.applicationContext == null) {
            Common.applicationContext = applicationContext;
        }
    }

    /** Looks up a bean by type in the stored application context. */
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    /** Looks up a bean by name and type in the stored application context. */
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }
}
消息定义Message.java
package com.hyzm.netty;
import java.time.LocalDateTime;
import java.util.Base64;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializeFilter;
import com.alibaba.fastjson.serializer.SimplePropertyPreFilter;
import com.hyzm.netty.entity.OriginalData;
/**
 * Carries one received or to-be-published record through the queues, holding
 * both the {@link OriginalData} entity and its JSON text. Decoding of the
 * payload happens here.
 */
public class Message extends MqttMessage {
    private OriginalData m_data;
    private String m_json;

    /**
     * Builds a message from a received MQTT message.
     *
     * @param topic topic the message arrived on
     * @param dt    time of reception
     * @param msg   raw MQTT message whose payload is JSON text
     */
    public Message(String topic, LocalDateTime dt, MqttMessage msg) {
        super();
        // Decode explicitly as UTF-8 instead of relying on the platform charset.
        m_json = new String(msg.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        deserialization(m_json);
        m_data.setTopic(topic);
        m_data.setReceived_date(dt);
        String decodedData = decode(m_data.getData());
        m_data.setDecode_data(decodedData);
    }

    /**
     * Builds a message to publish; the hex {@code decode_data} of {@code data} is
     * re-encoded as Base64 into its {@code data} field.
     *
     * @param data record to publish
     */
    public Message(OriginalData data) {
        super();
        m_data = data;
        String encodedData = encode(data.getDecode_data());
        m_data.setData(encodedData);
        m_json = JSON.toJSONString(m_data);
    }

    /**
     * Parses {@code json} into {@code m_data}. When the parser yields no object
     * (presumably for an empty payload or a JSON {@code null} - confirm against
     * the fastjson version in use), an empty record is used so that the
     * constructor does not fail with a NullPointerException.
     */
    private void deserialization(String json) {
        OriginalData parsed = JSON.parseObject(json, OriginalData.class);
        m_data = parsed != null ? parsed : new OriginalData();
    }

    /** Base64 to hex; a missing payload decodes to an empty string. */
    private String decode(String orig) {
        if (orig != null) {
            return Common.Base64toHex(orig);
        }
        return "";
    }

    /**
     * Hex to Base64.
     *
     * @return the Base64 text, or {@code ""} when {@code hex} is {@code null},
     *         empty or not valid hexadecimal
     */
    private String encode(String hex) {
        if (hex == null || hex.isEmpty()) {
            return "";
        }
        try {
            byte[] raw = Hex.decodeHex(hex.toCharArray());
            return Base64.getEncoder().encodeToString(raw);
        } catch (DecoderException de) {
            de.printStackTrace();
            return "";
        }
    }

    public OriginalData getOriginalData() {
        return m_data;
    }

    /** Serializes the record with the given filter, e.g. for printing. */
    public String toJSON(SerializeFilter filter) {
        return JSON.toJSONString(m_data, filter);
    }

    /** @return the JSON text captured when this message was constructed */
    public String toJSON() {
        return m_json;
    }

    /**
     * Builds the MQTT message to publish. Only {@code confirmed}, {@code fPort}
     * and {@code data} are serialized; the QoS and retained flag set on this
     * object are carried over, which the previous version dropped.
     */
    public MqttMessage toMqttMessage() {
        SimplePropertyPreFilter filter = new SimplePropertyPreFilter();
        filter.getIncludes().add("confirmed");
        filter.getIncludes().add("fPort");
        filter.getIncludes().add("data");
        String json = JSON.toJSONString(m_data, filter);
        MqttMessage message = new MqttMessage(json.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        message.setQos(getQos());
        message.setRetained(isRetained());
        return message;
    }
}
MqttMsgCallback.java
package com.hyzm.netty;
import java.time.LocalDateTime;
import java.util.concurrent.BlockingQueue;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.hyzm.netty.controller.OriginalDataController;
/**
 * Paho callback shared by subscriber and publisher. For the subscriber it turns
 * every arriving MQTT message into a {@link Message}, stores the raw record and
 * puts the message on the processing queue.
 */
public class MqttMsgCallback implements MqttCallback {
    private BlockingQueue<Message> m_receivedMsg;
    private MqttClient m_client;
    private OriginalDataController m_origDataCtrl; // null when created for a publisher

    /** Constructor used by the publisher; no persistence controller is attached. */
    public MqttMsgCallback(BlockingQueue<Message> messageList, MqttClient client) {
        super();
        m_receivedMsg = messageList;
        m_client = client;
    }

    /** Constructor used by the subscriber. */
    public MqttMsgCallback(BlockingQueue<Message> messageList,
            MqttClient client, OriginalDataController ctrl) {
        m_receivedMsg = messageList;
        m_client = client;
        m_origDataCtrl = ctrl;
    }

    /**
     * Logs the loss and asks the client to reconnect.
     *
     * <p>NOTE(review): both clients in this project also enable automatic
     * reconnect, and {@code reconnect()} presumably only starts the attempt, so
     * the success message may be printed before the connection is back - confirm.
     */
    public void connectionLost(Throwable cause) {
        if (cause != null) {
            cause.printStackTrace();
        }
        System.out.println("MqttMsgCallback: 连接丢失...");
        try {
            m_client.reconnect();
            System.out.println("MqttMsgCallback: 重连成功!");
        } catch (MqttException me) {
            me.printStackTrace();
            System.out.println("MqttMsgCallback: 重连失败!");
        }
    }

    /**
     * Handles one received message: parse, store, enqueue.
     *
     * <p>No exception may leave this method, because Paho shuts the client down
     * when {@code messageArrived} throws; a malformed payload or a database
     * error is therefore logged and the message is skipped or still forwarded.
     */
    public void messageArrived(String topic, MqttMessage message) {
        LocalDateTime dt = LocalDateTime.now(); // reception time of this message
        try {
            Message msg = new Message(topic, dt, message);
            System.out.println("Subscriber转发数据: " + topic + "\n" + msg.toJSON());
            // Store first: once the message is on the queue another thread owns it,
            // and saving concurrently would touch the same entity from two threads.
            persist(msg);
            m_receivedMsg.put(msg); // hand over to the processing queue
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the stop request visible
        } catch (RuntimeException e) {
            e.printStackTrace();
            System.out.println("MqttMsgCallback: failed to handle message on topic " + topic);
        }
    }

    /** Saves the raw record; a storage failure is logged and does not block forwarding. */
    private void persist(Message msg) {
        if (m_origDataCtrl == null) {
            return;
        }
        try {
            m_origDataCtrl.save(msg.getOriginalData());
        } catch (RuntimeException e) {
            e.printStackTrace();
        }
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        // nothing to do once a publish has been delivered
    }
}
数据处理器DataManager.java
package com.hyzm.netty;
import java.util.concurrent.BlockingQueue;
/**
 * Middle stage of the pipeline: consumes received messages and, when a reply is
 * produced, acts as producer for both return queues. It blocks while no data is
 * available, so each message is handled as soon as it arrives.
 */
public class DataManager implements Runnable {
    private BlockingQueue<Message> m_receivedMsg;
    private BlockingQueue<Message> m_sentHttpMsg;
    private BlockingQueue<Message> m_sentMqttMsg;

    public DataManager() {
        m_receivedMsg = Common.receivedMessage;
        m_sentHttpMsg = Common.sentHttpMsg;
        m_sentMqttMsg = Common.sentMqttMsg;
    }

    /**
     * Processes messages until the thread is interrupted. The previous version
     * swallowed the interrupt inside an endless loop, so the worker could never
     * be stopped.
     */
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Message received = m_receivedMsg.take(); // blocks until data arrives
                Message reply = process(received);
                String returnMsg = reply == null ? "null" : reply.toJSON();
                System.out.println("DM收到数据: " + received.toJSON()
                        + "\n 然后回传数据: " + returnMsg);
                if (reply != null) {
                    // The same instance goes to both publishers.
                    m_sentHttpMsg.put(reply);
                    m_sentMqttMsg.put(reply);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop
                return;
            }
        }
    }

    /** Processing hook, not implemented yet: returns the message unchanged. */
    private Message process(Message msg) {
        return msg;
    }
}
接下来定义数据结构和controller,方便保存数据。
OriginalDataController.java
package com.hyzm.netty.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RestController;
import com.hyzm.netty.entity.OriginalData;
import com.hyzm.netty.repository.OriginalDataRepository;
/**
 * Spring bean through which the pipeline stores {@link OriginalData} records.
 * It declares no request mappings; it only forwards to the repository.
 */
@RestController
public class OriginalDataController {

    @Autowired
    private OriginalDataRepository m_origDataRepository;

    /**
     * Saves the record and flushes immediately.
     *
     * @param data record to store
     */
    public void save(final OriginalData data) {
        this.m_origDataRepository.saveAndFlush(data);
    }
}
OriginalData.java
package com.hyzm.netty.entity;
import java.time.LocalDateTime;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import org.hibernate.annotations.GenericGenerator;
import lombok.Data;
/**
 * JPA entity {@code original_data}: one raw record as received from, or to be
 * published to, the broker. Accessors are generated by Lombok.
 */
@Entity(name = "original_data")
@Data
public class OriginalData {

    /** Primary key, generated with the {@code uuid} strategy. */
    @Id
    @GenericGenerator(name = "system_uuid", strategy = "uuid")
    @GeneratedValue(generator = "system_uuid")
    @Column(name = "primary_key")
    String primary_key;

    /** MQTT topic of the record. */
    @Column(name = "topic")
    String topic;

    /** Time at which the record was received. */
    @Column(name = "received_date")
    LocalDateTime received_date;

    /** Confirmation flag; the HTTP publisher stores 1 on success and -1 on failure. */
    @Column(name = "confirmed")
    int confirmed;

    /** Port number carried in the payload JSON. */
    @Column(name = "fport")
    int fPort;

    /** Payload as Base64 text. */
    @Column(name = "data")
    String data;

    /** Payload as hexadecimal text. */
    @Column(name = "decode_data")
    String decode_data;
}
OriginalDataRepository.java
package com.hyzm.netty.repository;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import com.hyzm.netty.entity.OriginalData;
/**
 * Spring Data JPA repository for {@link OriginalData} records keyed by their
 * String primary key. No custom queries are declared.
 */
@Repository
public interface OriginalDataRepository extends JpaRepository<OriginalData, String> {
}
对于回传数据,采用了继承的方式,同时支持mqtt和http回传。定义抽象基类PublisherBase,然后MqttPublisher和HttpPublisher分别继承PublisherBase。从而实现了同时支持两种方式回传的功能。
PublisherBase.java
package com.hyzm.netty;
import org.springframework.context.ApplicationContext;
import com.hyzm.netty.controller.OriginalDataController;
/**
 * Base class of the return-data consumers. It resolves the persistence
 * controller once; {@code connect()} and {@code run()} are empty here and are
 * overridden by the concrete publishers.
 */
public class PublisherBase implements Runnable {

    /** Controller used by subclasses to store published records. */
    protected OriginalDataController m_origDataCtrl;

    /** Looks up the {@link OriginalDataController} bean from the application context. */
    public PublisherBase() {
        m_origDataCtrl = Common.getBean(OriginalDataController.class);
    }

    /** Hook for establishing the outgoing connection; does nothing by default. */
    protected void connect() {
        // intentionally empty
    }

    /** Does nothing by default; subclasses implement the publishing loop. */
    @Override
    public void run() {
        // intentionally empty
    }
}
MqttPublisher.java
package com.hyzm.netty;
import java.util.concurrent.BlockingQueue;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.ApplicationContext;
import com.hyzm.netty.controller.OriginalDataController;
/**
 * Consumer of the MQTT return queue: takes messages from
 * {@code Common.sentMqttMsg}, publishes them to the broker and stores the
 * published record.
 */
public class MqttPublisher extends PublisherBase {
    private BlockingQueue<Message> m_sendMsg;
    private MqttClient m_client;
    private String m_hostUrl = "tcp://xx.xx.xx.xx:1883"; // for testing only
    private String m_user = null;
    private String m_passwd = null;
    private int m_qos = Common.QoS;

    /** Creates a publisher for the default broker and connects immediately. */
    public MqttPublisher() {
        super();
        m_sendMsg = Common.sentMqttMsg;
        connect();
    }

    /**
     * Creates a publisher for a specific broker and connects immediately.
     *
     * @param host_url broker URL, e.g. {@code tcp://host:1883}
     */
    public MqttPublisher(String host_url) {
        super();
        m_hostUrl = host_url;
        m_sendMsg = Common.sentMqttMsg;
        connect();
    }

    /** Creates the client and connects; called from the constructors. */
    @Override
    protected void connect() {
        try {
            String clientID = "publisher" + MqttClient.generateClientId();
            m_client = new MqttClient(m_hostUrl, clientID, new MemoryPersistence());

            MqttConnectOptions connOpts = new MqttConnectOptions();
            if (m_user != null) {
                connOpts.setUserName(m_user);
                if (m_passwd != null) { // a user without a password must not cause an NPE
                    connOpts.setPassword(m_passwd.toCharArray());
                }
            }
            // Keep the session and let the client reconnect after a drop.
            connOpts.setCleanSession(false);
            connOpts.setAutomaticReconnect(true);
            connOpts.setConnectionTimeout(0);

            MqttMsgCallback callback = new MqttMsgCallback(m_sendMsg, m_client);
            m_client.setCallback(callback);

            while (!m_client.isConnected()) {
                m_client.connect(connOpts);
            }
        } catch (MqttException me) {
            me.printStackTrace();
        }
    }

    /**
     * Publishes queued messages until the thread is interrupted.
     *
     * <p>The configured QoS is now passed to {@code publish} itself. Before, it
     * was set on the queue {@link Message} while a freshly built MQTT message
     * was sent, so the setting never reached the broker.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Message msg = m_sendMsg.take();
                System.out.println("Mqtt Publisher收到回传的数据: " + msg.toJSON());
                byte[] payload = msg.toMqttMessage().getPayload();
                m_client.publish(msg.getOriginalData().getTopic(), payload, m_qos, false);
                m_origDataCtrl.save(msg.getOriginalData());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop
                return;
            } catch (MqttException me) {
                me.printStackTrace();
            } catch (RuntimeException re) {
                // e.g. a storage failure: log and keep the publisher alive,
                // as the HTTP publisher does
                re.printStackTrace();
            }
        }
    }
}
HttpPublisher.java
package com.hyzm.netty;
import java.io.IOException;
import java.net.Authenticator;
import java.net.InetSocketAddress;
import java.net.PasswordAuthentication;
import java.net.ProxySelector;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import com.hyzm.netty.entity.OriginalData;
/**
 * Consumer of the HTTP return queue: takes messages from
 * {@code Common.sentHttpMsg}, POSTs their JSON to {@code m_hostUrl} and stores
 * the record together with the outcome.
 */
public class HttpPublisher extends PublisherBase {
    private BlockingQueue<Message> m_sendMsg;
    private HttpClient m_client;
    private String m_hostUrl = "https://postman-echo.com/post"; // for testing only
    private String m_user = null;
    private String m_password = null;

    /** Creates a publisher for the default endpoint and builds the HTTP client. */
    public HttpPublisher() {
        super();
        m_sendMsg = Common.sentHttpMsg;
        connect();
    }

    /**
     * Creates a publisher for a specific endpoint and builds the HTTP client.
     *
     * @param host_url URL that receives the POST requests
     */
    public HttpPublisher(String host_url) {
        super();
        m_hostUrl = host_url;
        m_sendMsg = Common.sentHttpMsg;
        connect();
    }

    /** Builds the HTTP/2 client with a 20 second connect timeout. */
    @Override
    protected void connect() {
        m_client = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_2)
                .connectTimeout(Duration.ofSeconds(20))
                // Optional authentication and proxy, currently disabled:
                // .authenticator(new Authenticator() {
                //     @Override
                //     protected PasswordAuthentication getPasswordAuthentication() {
                //         return new PasswordAuthentication(m_user, m_password.toCharArray());
                //     }
                // })
                // .proxy(ProxySelector.of(new InetSocketAddress("proxy.example.com", 80)))
                .build();
    }

    /**
     * Sends queued messages until the thread is interrupted. A response with
     * status 200 marks the record as confirmed (1); anything else, including a
     * failed request, marks it as failed (-1). Both values are provisional.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Message msg = m_sendMsg.take();
                System.out.println("Http Publisher收到回传的数据: " + msg.toJSON());
                OriginalData data = msg.getOriginalData();
                HttpResponse<String> res = requestPost(m_hostUrl, msg.toJSON());
                if (res != null && res.statusCode() == 200) {
                    data.setConfirmed(1);
                } else {
                    data.setConfirmed(-1);
                }
                m_origDataCtrl.save(data);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop
                return;
            } catch (Exception e) {
                // boundary of the worker loop: log and continue with the next message
                e.printStackTrace();
            }
        }
    }

    /**
     * Performs a GET request.
     *
     * @param url target URL
     * @return the response, or {@code null} if the URL is malformed or the
     *         request failed or was interrupted
     */
    public HttpResponse<String> requestGet(String url) {
        try {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(new URI(url))
                    .timeout(Duration.ofSeconds(10))
                    .headers("Content-Type", "text/plain;charset=UTF-8")
                    .GET()
                    .build();
            return send(request);
        } catch (URISyntaxException use) {
            use.printStackTrace();
            return null;
        }
    }

    /**
     * Performs a POST request with a JSON body.
     *
     * @param url   target URL
     * @param param request body
     * @return the response, or {@code null} if the URL is malformed or the
     *         request failed or was interrupted
     */
    public HttpResponse<String> requestPost(String url, String param) {
        try {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(new URI(url))
                    .timeout(Duration.ofSeconds(10))
                    .header("Content-Type", "application/json;charset=UTF-8")
                    .POST(HttpRequest.BodyPublishers.ofString(param))
                    .build();
            return send(request);
        } catch (URISyntaxException use) {
            use.printStackTrace();
            return null;
        }
    }

    /** Sends the request and returns the response, or {@code null} on I/O failure or interrupt. */
    private HttpResponse<String> send(HttpRequest request) {
        try {
            return m_client.send(request, HttpResponse.BodyHandlers.ofString());
        } catch (IOException ioe) {
            ioe.printStackTrace();
        } catch (InterruptedException ie) {
            // Re-assert the interrupt so the calling loop can see it and stop.
            Thread.currentThread().interrupt();
        }
        return null;
    }
}
如此,则可在主程序中调用并运行,实现从mqtt订阅到数据,经过处理之后,用http发布的整个流程。
MainThread.java
package com.hyzm.netty;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
/**
 * Start-up hook: builds the four pipeline stages and starts one named thread
 * for each of them.
 */
@Slf4j
@Component
@Order(2)
public class MainThread implements CommandLineRunner {

    /**
     * Creates the processor, the subscriber and both publishers (in that
     * order), then starts the subscriber first, followed by the processor and
     * the publishers.
     */
    @Override
    public void run(String... args) {
        Thread processorThread = new Thread(new DataManager(), "DataManager");
        Thread subscriberThread = new Thread(new MqttSubscriber(), "MqttListener");
        Thread mqttPublisherThread = new Thread(new MqttPublisher(), "MqttPublisher");
        Thread httpPublisherThread = new Thread(new HttpPublisher(), "HttpPublisher");

        Thread[] startOrder = {
            subscriberThread, processorThread, mqttPublisherThread, httpPublisherThread
        };
        for (Thread worker : startOrder) {
            worker.start();
        }
    }
}
AppStarter.java
package com.hyzm.netty;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/** Spring Boot entry point of the application. */
@SpringBootApplication
public class AppStarter {
/**
 * Starts the Spring application with this class as the primary source.
 *
 * @param args command-line arguments, passed on to Spring
 */
public static void main(String[] args) {
SpringApplication.run(AppStarter.class, args);
}
}
以上完整的maven工程可在如下地址下载:
https://download.csdn.net/download/u014559935/88340538
注意上面的程序需要设置mysql数据库才能正常运行,用容器的话可采用如下命令运行mysql容器。
# 将mysql5.7容器映射到主机3306端口,做目录映射,设置环境变量,令root密码为123456,且可从任意地址登录,并能自动重启
docker run --name mysql -p 3306:3306 -v /usr/local/mysql/conf:/etc/mysql/conf.d -v /usr/local/mysql/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 -e MYSQL_ROOT_HOST='%' --restart=always -d mysql:5.7