一、概述
物联网大多基于MQTT协议进行消息传输,其中EMQX是比较流行的开源实现,EMQX简单易用,社区资源丰富,可参加官网https://www.emqx.com/,本文是简单初探,通过安装EMQX、客户端测试,代码测试等三块进行一个体验。
二、安装
EMQX有很多种部署方式,官网写的很详细,参加https://www.emqx.io/docs/zh/v5.0/deploy/install-docker.html,本文采用centos7+docker安装方式,执行:
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.20
然后访问:http://host:18083/可看到如下页面(默认账号admin:public):
三、客户端测试
EMQX适配了很系统的桌面客户端,笔者使用的MacOs,下载的客户端界面如下:
链接到刚才部署的EMQX上,可以简单做个测试:
四、代码测试
代码语言为JAVA,官网提供的测试项目地址:https://github.com/emqx/MQTT-Client-Examples/tree/master/mqtt-client-Java
第一步,引入依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
第二步,编写异步回调(业务处理类)
package io.emqx.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import java.io.IOException;
public class MqttExample implements MqttCallback {
public static void main(String[] args) {
String broker = "tcp://broker.emqx.io:1883";
int qos = 0;
String action = "publish";
String topic = "test/topic";
String message = "Hello MQTT";
String clientId = MqttClient.generateClientId();
boolean cleanSession = true;
String userName = "emqx";
String password = "public";
for (int i = 0; i < args.length; i++) {
if (args[i].length() == 2 && args[i].startsWith("-")) {
char arg = args[i].charAt(1);
if (arg == 'h') {
help();
return;
}
if (i == args.length - 1 || args[i + 1].charAt(0) == '-') {
System.out.println("Missing value for argument: " + args[i]);
help();
return;
}
switch (arg) {
case 'b':
broker = args[++i];
break;
case 'a':
action = args[++i];
break;
case 't':
topic = args[++i];
break;
case 'q':
qos = Integer.parseInt(args[++i]);
break;
case 'c':
cleanSession = Boolean.parseBoolean(args[++i]);
break;
case 'u':
userName = args[++i];
break;
case 'z':
password = args[++i];
break;
default:
System.out.println("Unknown argument: " + args[i]);
help();
return;
}
} else {
System.out.println("Unknown argument: " + args[i]);
help();
return;
}
}
if (!action.equals("publish") && !action.equals("subscribe")) {
System.out.println("Invalid action: " + action);
help();
return;
}
if (qos < 0 || qos > 2) {
System.out.println("Invalid QoS: " + qos);
help();
return;
}
MqttExample sample = new MqttExample(broker, clientId, cleanSession, userName, password);
try {
if (action.equals("publish")) {
sample.publish(topic, qos, message.getBytes());
} else {
sample.subscribe(topic, qos);
}
} catch (MqttException e) {
e.printStackTrace();
}
}
private MqttClient client;
private String brokerUrl;
private MqttConnectOptions options;
private boolean clean;
private String password;
private String userName;
public MqttExample(String brokerUrl, String clientId, boolean cleanSession, String userName, String password) {
this.brokerUrl = brokerUrl;
this.clean = cleanSession;
this.password = password;
this.userName = userName;
options = new MqttConnectOptions();
options.setCleanSession(clean);
if (userName != null) {
options.setUserName(this.userName);
}
if (password != null) {
options.setPassword(this.password.toCharArray());
}
try {
client = new MqttClient(this.brokerUrl, clientId);
client.setCallback(this);
} catch (MqttException e) {
e.printStackTrace();
log(e.toString());
System.exit(1);
}
}
public void subscribe(String topicName, int qos) throws MqttException {
client.connect(options);
log("Connected to " + brokerUrl + " with client ID " + client.getClientId());
client.subscribe(topicName, qos);
log("Subscribed to topic: " + topicName + " qos " + qos);
try {
System.in.read();
} catch (IOException e) {
}
client.disconnect();
log("Disconnected");
}
public void publish(String topicName, int qos, byte[] payload) throws MqttException {
client.connect(options);
log("Connected to " + brokerUrl + " with client ID " + client.getClientId());
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
client.publish(topicName, message);
log("Published to topic \"" + topicName + "\" qos " + qos);
client.disconnect();
log("Disconnected");
}
private void log(String message) {
System.out.println(message);
}
public void connectionLost(Throwable throwable) {
log("Connection lost: " + throwable);
System.exit(1);
}
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
log("Received message:\n" +
"Topic: " + s + "\t" +
"Message: " + mqttMessage.toString()
);
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
private static void help() {
System.out.println(
"Args:\n" +
"-h Help information\n" +
"-b MQTT broker url [default: tcp://broker.emqx.io:1883]\n" +
"-a publish/subscribe action [default: publish]\n" +
"-u Username [default: emqx]\n" +
"-z Password [default: public]\n" +
"-c Clean session [default: true]\n" +
"-t Publish/Subscribe topic [default: test/topic]\n" +
"-q QoS [default: 0]"
);
}
}
测试类
package io.emqx.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttSample {
public static void main(String[] args) {
String topic = "test/topic";
String content = "Hello World";
int qos = 2;
String broker = "tcp://broker.emqx.io:1883";
String clientId = MqttClient.generateClientId();
MemoryPersistence persistence = new MemoryPersistence();
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("emqx_user");
connOpts.setPassword("emqx_password".toCharArray());
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
client.setCallback(new SampleCallback());
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected to broker: " + broker);
client.subscribe(topic, qos);
System.out.println("Subscribed to topic: " + topic);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(topic, message);
System.out.println("Message published");
client.disconnect();
System.out.println("Disconnected");
client.close();
System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
有兴趣的可以关注下微信公共号。