From e0cb7655405edbccb6332512bf34db8e2fa2d7be Mon Sep 17 00:00:00 2001 From: James-hp Date: Tue, 1 Sep 2020 22:47:44 +0800 Subject: [PATCH] mqtt --- .../ruoyi/common/mqtt/domain/MqttBody.java | 9 ++ .../ruoyi/common/mqtt/domain/MqttHead.java | 10 ++ .../ruoyi/common/mqtt/service/Callback.java | 38 ++++++ .../common/mqtt/service/MqttService.java | 123 ++++++++++++++++++ .../com/ruoyi/common/mqtt/service/ss.java | 12 ++ .../common/redis/service/MqttService.java | 7 - 6 files changed, 192 insertions(+), 7 deletions(-) create mode 100644 ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/domain/MqttBody.java create mode 100644 ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/domain/MqttHead.java create mode 100644 ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/service/Callback.java create mode 100644 ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/service/MqttService.java create mode 100644 ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/service/ss.java delete mode 100644 ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/redis/service/MqttService.java diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/domain/MqttBody.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/domain/MqttBody.java new file mode 100644 index 00000000..8dbbc7f7 --- /dev/null +++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/domain/MqttBody.java @@ -0,0 +1,9 @@ +package com.ruoyi.common.mqtt.domain; + +import java.io.Serializable; + +public class MqttBody implements Serializable { + private int code; + private String msg; + private Object body; +} diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/domain/MqttHead.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/domain/MqttHead.java new file mode 100644 index 00000000..bd81fdb0 --- /dev/null +++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/domain/MqttHead.java @@ -0,0 +1,10 @@ +package com.ruoyi.common.mqtt.domain; + +import java.io.Serializable; + +public class MqttHead implements Serializable { + + private int type; + private String topic; + private MqttBody body; +} diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/service/Callback.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/service/Callback.java new file mode 100644 index 00000000..d267a147 --- /dev/null +++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/service/Callback.java @@ -0,0 +1,38 @@ +package com.ruoyi.common.mqtt.service; + +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class Callback implements MqttCallback { + private static final Logger log = LoggerFactory.getLogger(Callback.class); + /** + * MQTT 断开连接会执行此方法 + */ + @Override + public void connectionLost(Throwable throwable) { + log.info("断开了MQTT连接 :{}", throwable.getMessage()); + log.error(throwable.getMessage(), throwable); + } + + /** + * publish发布成功后会执行到这里 + */ + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + log.info("发布消息成功"); + } + + /** + * subscribe订阅后得到的消息会执行到这里 + */ + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + // TODO 此处可以将订阅得到的消息进行业务处理、数据存储 + log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload())); + } + +} diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/service/MqttService.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/service/MqttService.java new file mode 100644 index 00000000..29328b72 --- /dev/null +++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/service/MqttService.java @@ -0,0 +1,123 @@ +package com.ruoyi.common.mqtt.service; + +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.stereotype.Component; +import org.eclipse.paho.client.mqttv3.MqttCallback; + +@Component +public class MqttService { + + private String HOST = "tcp://127.0.0.1:1883"; //mqtt服务器的地址和端口号 + private final String clientId = "DC" + (int) (Math.random() * 100000000); + private MqttClient mqttClient; + + /** + * 客户端connect连接mqtt服务器 + * + * @param userName 用户名 + * @param passWord 密码 + * @param mqttCallback 回调函数 + **/ + public void setMqttClient(String userName, String passWord, MqttCallback mqttCallback) throws MqttException { + MqttConnectOptions options = mqttConnectOptions(userName, passWord); + if (mqttCallback == null) { + mqttClient.setCallback(new Callback()); + } else { + mqttClient.setCallback(mqttCallback); + } + mqttClient.connect(options); + } + + /** + * MQTT连接参数设置 + */ + private MqttConnectOptions mqttConnectOptions(String userName, String passWord) throws MqttException { + mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence()); + MqttConnectOptions options = new MqttConnectOptions(); + options.setUserName(userName); + options.setPassword(passWord.toCharArray()); + options.setConnectionTimeout(10);///默认:30 + options.setAutomaticReconnect(true);//默认:false + options.setCleanSession(false);//默认:true + //options.setKeepAliveInterval(20);//默认:60 + return options; + } + + /** + * 关闭MQTT连接 + */ + public void close() throws MqttException { + mqttClient.close(); + mqttClient.disconnect(); + } + + /** + * 向某个主题发布消息 默认qos:1 + * + * @param topic:发布的主题 + * @param msg:发布的消息 + */ + public void pub(String topic, String msg) throws MqttException { + MqttMessage mqttMessage = new MqttMessage(); + //mqttMessage.setQos(2); + mqttMessage.setPayload(msg.getBytes()); + MqttTopic mqttTopic = mqttClient.getTopic(topic); + MqttDeliveryToken token = mqttTopic.publish(mqttMessage); + token.waitForCompletion(); + } + + /** + * 向某个主题发布消息 + * + * @param topic: 发布的主题 + * @param msg: 发布的消息 + * @param qos: 消息质量 Qos:0、1、2 + */ + public void pub(String topic, String msg, int qos) throws MqttException { + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setQos(qos); + mqttMessage.setPayload(msg.getBytes()); + MqttTopic mqttTopic = mqttClient.getTopic(topic); + MqttDeliveryToken token = mqttTopic.publish(mqttMessage); + token.waitForCompletion(); + } + + /** + * 订阅某一个主题 ,此方法默认的的Qos等级为:1 + * + * @param topic 主题 + */ + public void sub(String topic) throws MqttException { + mqttClient.subscribe(topic); + } + + /** + * 订阅某一个主题,可携带Qos + * + * @param topic 所要订阅的主题 + * @param qos 消息质量:0、1、2 + */ + public void sub(String topic, int qos) throws MqttException { + mqttClient.subscribe(topic, qos); + } + + /** + * main函数自己测试用 + */ + public static void main(String[] args) throws MqttException { + MqttService mqttConnect = new MqttService(); + mqttConnect.setMqttClient("admin", "public", new Callback()); + mqttConnect.pub("com/iot/init", "Mr.Qu" + (int) (Math.random() * 100000000)); + //mqttConnect.sub("com/iot/init"); + } + /** + * main函数自己测试用 + */ + public static void main1(String[] args) throws MqttException { + MqttService mqttConnect = new MqttService(); + mqttConnect.setMqttClient("admin", "public", new Callback()); + mqttConnect.pub("com/iot/init", "Mr.Qu" + (int) (Math.random() * 100000000)); + //mqttConnect.sub("com/iot/init"); + } +} diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/service/ss.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/service/ss.java new file mode 100644 index 00000000..9c0d01cb --- /dev/null +++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/mqtt/service/ss.java @@ -0,0 +1,12 @@ +package com.ruoyi.common.mqtt.service; + +import org.eclipse.paho.client.mqttv3.MqttException; + +public class ss { + public static void main(String[] args) throws MqttException { + MqttService mqttConnect = new MqttService(); + mqttConnect.setMqttClient("admin", "public", new Callback()); + mqttConnect.pub("com/iot/init", "Mr.Qu" + (int) (Math.random() * 100000000)); + //mqttConnect.sub("com/iot/init"); + } +} diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/redis/service/MqttService.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/redis/service/MqttService.java deleted file mode 100644 index 9b782baf..00000000 --- a/ruoyi-common/ruoyi-common-mqtt/src/main/java/com/ruoyi/common/redis/service/MqttService.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.ruoyi.common.redis.service; - -import org.springframework.stereotype.Component; - -@Component -public class MqttService { -}