自建平台API接入
VUE3

VUE3

本文主要介绍如何在 VUE3 项目中使用 MQTT.js 客户端库,实现 SX-IOT 设备与 MQTT 服务器的连接、订阅、收发消息等功能。

前提条件

安装依赖包

sudo apt install nodejs npm -y
npm install

连接使用

连接设置

本文将使用自定义的接入认证方式,服务器接入信息如下:

- Broker: mqtt.geek-smart.cn
- TCP Port: 1883
- WebSocket Port: 8083

导入依赖包

import * as mqtt from "mqtt/dist/mqtt.min";
import { reactive, ref } from "vue";

定义连接地址、认证信息以及消息发布主题

 // Broker address plus the MQTT.js client options used by createConnection.
 // protocol/host/port build the WebSocket URL; the remaining fields are
 // handed to mqtt.connect as options.
 const connection = reactive({
   protocol: "ws", // "ws" or "wss"
   host: "mqtt.geek-smart.cn",
   port: 8083, // 8083 for ws, 8084 for wss
   clientId: "vue3-client",
   username: "xxxxxxxxxxxxxx",
   password: "xxxxxxxxxxxxxxxxxxxxxx",
   clean: true,
   connectTimeout: 30000, // milliseconds
   reconnectPeriod: 4000, // milliseconds
 });
 // Topic that messages are published to.
 const pub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/publish";
 // Topic that the client subscribes to.
 const sub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/subscribe";

定义消息发布参数(主题、QoS 与消息内容)

 // Publish parameters: target topic, QoS level and the message payload.
 const publish = ref<{ topic: string; qos: mqtt.QoS; payload: string }>({
   topic: pub_topic,
   qos: 0,
   payload: '{ "type": "info" }',
 });

定义 doSubscribe、doUnSubscribe 函数,分别用于订阅和取消订阅主题;订阅主题收到的消息内容由连接时注册的 message 事件回调打印

 /**
  * Subscribes the client to the topic and QoS stored in `subscription`.
  *
  * `btnLoadingType` is "subscribe" while the request is pending and is
  * cleared in the callback; `subscribedSuccess` becomes true only when the
  * callback reports no error.
  * https://github.com/mqttjs/MQTT.js#mqttclientsubscribetopictopic-arraytopic-object-options-callback
  */
 const doSubscribe = () => {
   btnLoadingType.value = "subscribe";
   const target = subscription.value;
   const onSubscribed = (error: Error, granted: mqtt.ISubscriptionGrant[]) => {
     btnLoadingType.value = "";
     if (!error) {
       subscribedSuccess.value = true;
       console.log("subscribe successfully:", granted);
     } else {
       console.log("subscribe error:", error);
     }
   };
   client.value.subscribe(target.topic, { qos: target.qos }, onSubscribed);
 };
 
 /**
  * Unsubscribes the client from the topic stored in `subscription`.
  *
  * `btnLoadingType` is "unsubscribe" while the request is pending and is
  * cleared in the callback. `subscribedSuccess` is reset only when the
  * callback reports no error, so a failed request is not shown as success.
  * https://github.com/mqttjs/MQTT.js#mqttclientunsubscribetopictopic-array-options-callback
  */
 const doUnSubscribe = () => {
   btnLoadingType.value = "unsubscribe";
   // An unsubscribe request has no QoS level, so only the topic is needed.
   const { topic } = subscription.value;
   client.value.unsubscribe(topic, (error?: Error) => {
     btnLoadingType.value = "";
     if (error) {
       // The request failed: leave subscribedSuccess as it was.
       console.log("unsubscribe error:", error);
       return;
     }
     subscribedSuccess.value = false;
     console.log(`unsubscribed topic: ${topic}`);
   });
 };

连接 MQTT

 /**
  * Opens an MQTT connection to the URL built from `connection` and registers
  * the client's "connect", "reconnect", "error" and "message" handlers.
  *
  * `btnLoadingType` is "connect" while the attempt is pending; it is cleared
  * when the "connect" event fires or when `mqtt.connect` throws.
  */
 const createConnection = () => {
   try {
     btnLoadingType.value = "connect";
     // protocol/host/port only build the URL; every other field is passed
     // to mqtt.connect as a client option.
     const { protocol, host, port, ...options } = connection;
     const connectUrl = `${protocol}://${host}:${port}/mqtt`;
     client.value = mqtt.connect(connectUrl, options);
     if (client.value.on) {
       // https://github.com/mqttjs/MQTT.js#event-connect
       client.value.on("connect", () => {
         btnLoadingType.value = "";
         // A successful (re)connection resets the retry counter, so the limit
         // in handleOnReConnect applies to consecutive attempts rather than
         // to every reconnect over the lifetime of the session.
         retryTimes.value = 0;
         console.log("connection successful");
       });
       // https://github.com/mqttjs/MQTT.js#event-reconnect
       client.value.on("reconnect", handleOnReConnect);
       // https://github.com/mqttjs/MQTT.js#event-error
       client.value.on("error", (error) => {
         console.log("connection error:", error);
       });
       // https://github.com/mqttjs/MQTT.js#event-message
       client.value.on("message", (topic: string, message) => {
         // Append the payload text to the running log of received messages.
         receivedMessages.value = receivedMessages.value.concat(
           message.toString()
         );
         console.log(`received message: ${message} from topic: ${topic}`);
       });
     }
   } catch (error) {
     btnLoadingType.value = "";
     console.log("mqtt.connect error:", error);
   }
 };

完整代码

 // vue 3 + vite use MQTT.js refer to https://github.com/mqttjs/MQTT.js/issues/1269
 import * as mqtt from "mqtt/dist/mqtt.min";
 import { reactive, ref } from "vue";
 
 // Selectable QoS levels, typed as mqtt.QoS so an entry can be assigned
 // directly to the `qos` fields of `subscription` and `publish`.
 // https://github.com/mqttjs/MQTT.js#qos
 const qosList: mqtt.QoS[] = [0, 1, 2];
 
 
 // Broker address plus the MQTT.js client options used by createConnection.
 // protocol/host/port build the WebSocket URL; the remaining fields are
 // handed to mqtt.connect as options.
 const connection = reactive({
   protocol: "ws", // "ws" or "wss"
   host: "mqtt.geek-smart.cn",
   port: 8083, // 8083 for ws, 8084 for wss
   clientId: "vue3-client",
   username: "xxxxxxxxxxxxxx",
   password: "xxxxxxxxxxxxxxxxxxxxxx",
   clean: true,
   connectTimeout: 30000, // milliseconds
   reconnectPeriod: 4000, // milliseconds
 });
 // Topic that messages are published to.
 const pub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/publish";
 // Topic that the client subscribes to.
 const sub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/subscribe";
 // Subscription parameters: the topic to subscribe to and its QoS level.
 const subscription = ref<{ topic: string; qos: mqtt.QoS }>({
   topic: sub_topic,
   qos: 0,
 });
 // Publish parameters: target topic, QoS level and the message payload.
 const publish = ref<{ topic: string; qos: mqtt.QoS; payload: string }>({
   topic: pub_topic,
   qos: 0,
   payload: '{ "type": "info" }',
 });
 
 // Client handle. Starts as a disconnected placeholder object; createConnection
 // replaces it with the client returned by mqtt.connect.
 let client = ref<mqtt.MqttClient>({ connected: false } as mqtt.MqttClient);
 // Concatenated text of every message received so far.
 const receivedMessages = ref<string>("");
 // True after a subscribe call has completed without error.
 const subscribedSuccess = ref<boolean>(false);
 // Name of the action currently in progress ("connect", "subscribe", ...),
 // or an empty string when idle.
 const btnLoadingType = ref<string>("");
 // Number of "reconnect" events counted by handleOnReConnect.
 const retryTimes = ref<number>(0);
 
 // Puts the client placeholder, retry counter and UI flags back to their
 // initial values.
 const initData = () => {
   const placeholder = { connected: false } as mqtt.MqttClient;
   client.value = placeholder;
   retryTimes.value = 0;
   btnLoadingType.value = "";
   subscribedSuccess.value = false;
 };
 
 // "reconnect" event handler: counts the attempt and, once more than five
 // have been counted, ends the client and resets the local state.
 const handleOnReConnect = () => {
   retryTimes.value += 1;
   if (retryTimes.value <= 5) {
     return;
   }
   try {
     client.value.end();
     initData();
     console.log("connection maxReconnectTimes limit, stop retry");
   } catch (error) {
     console.log("handleOnReConnect catch error:", error);
   }
 };
 
 /**
  * Opens an MQTT connection to the URL built from `connection` and registers
  * the client's "connect", "reconnect", "error" and "message" handlers.
  *
  * `btnLoadingType` is "connect" while the attempt is pending; it is cleared
  * when the "connect" event fires or when `mqtt.connect` throws.
  */
 const createConnection = () => {
   try {
     btnLoadingType.value = "connect";
     // protocol/host/port only build the URL; every other field is passed
     // to mqtt.connect as a client option.
     const { protocol, host, port, ...options } = connection;
     const connectUrl = `${protocol}://${host}:${port}/mqtt`;
     client.value = mqtt.connect(connectUrl, options);
     if (client.value.on) {
       // https://github.com/mqttjs/MQTT.js#event-connect
       client.value.on("connect", () => {
         btnLoadingType.value = "";
         // A successful (re)connection resets the retry counter, so the limit
         // in handleOnReConnect applies to consecutive attempts rather than
         // to every reconnect over the lifetime of the session.
         retryTimes.value = 0;
         console.log("connection successful");
       });
       // https://github.com/mqttjs/MQTT.js#event-reconnect
       client.value.on("reconnect", handleOnReConnect);
       // https://github.com/mqttjs/MQTT.js#event-error
       client.value.on("error", (error) => {
         console.log("connection error:", error);
       });
       // https://github.com/mqttjs/MQTT.js#event-message
       client.value.on("message", (topic: string, message) => {
         // Append the payload text to the running log of received messages.
         receivedMessages.value = receivedMessages.value.concat(
           message.toString()
         );
         console.log(`received message: ${message} from topic: ${topic}`);
       });
     }
   } catch (error) {
     btnLoadingType.value = "";
     console.log("mqtt.connect error:", error);
   }
 };
 
 /**
  * Subscribes the client to the topic and QoS stored in `subscription`.
  *
  * `btnLoadingType` is "subscribe" while the request is pending and is
  * cleared in the callback; `subscribedSuccess` becomes true only when the
  * callback reports no error.
  */
 const doSubscribe = () => {
   btnLoadingType.value = "subscribe";
   const target = subscription.value;
   const onSubscribed = (error: Error, granted: mqtt.ISubscriptionGrant[]) => {
     btnLoadingType.value = "";
     if (!error) {
       subscribedSuccess.value = true;
       console.log("subscribe successfully:", granted);
     } else {
       console.log("subscribe error:", error);
     }
   };
   client.value.subscribe(target.topic, { qos: target.qos }, onSubscribed);
 };
 
 /**
  * Unsubscribes the client from the topic stored in `subscription`.
  *
  * `btnLoadingType` is "unsubscribe" while the request is pending and is
  * cleared in the callback. `subscribedSuccess` is reset only when the
  * callback reports no error, so a failed request is not shown as success.
  */
 const doUnSubscribe = () => {
   btnLoadingType.value = "unsubscribe";
   // An unsubscribe request has no QoS level, so only the topic is needed.
   const { topic } = subscription.value;
   client.value.unsubscribe(topic, (error?: Error) => {
     btnLoadingType.value = "";
     if (error) {
       // The request failed: leave subscribedSuccess as it was.
       console.log("unsubscribe error:", error);
       return;
     }
     subscribedSuccess.value = false;
     console.log(`unsubscribed topic: ${topic}`);
   });
 };
 
 
 /**
  * Publishes the payload stored in `publish` to its topic with its QoS.
  *
  * `btnLoadingType` is "publish" while the request is pending and is cleared
  * in the callback, which logs either the error or the published payload.
  */
 const doPublish = () => {
   btnLoadingType.value = "publish";
   const outgoing = publish.value;
   const { payload } = outgoing;
   client.value.publish(outgoing.topic, payload, { qos: outgoing.qos }, (error) => {
     btnLoadingType.value = "";
     if (!error) {
       console.log(`published message: ${payload}`);
     } else {
       console.log("publish error:", error);
     }
   });
 };
 
 /**
  * Ends the current connection if the client reports itself as connected;
  * otherwise does nothing.
  *
  * `btnLoadingType` is "disconnect" while the client is closing. Once `end`
  * calls back, `initData` resets the local state.
  */
 const destroyConnection = () => {
   if (!client.value.connected) {
     return;
   }
   btnLoadingType.value = "disconnect";
   const onEnded = () => {
     initData();
     console.log("disconnected successfully");
   };
   try {
     client.value.end(false, onEnded);
   } catch (error) {
     btnLoadingType.value = "";
     console.log("disconnect error:", error);
   }
 };
 
 // Keeps the port in step with the selected protocol: 8084 for "wss",
 // 8083 for anything else.
 const handleProtocolChange = (value: string) => {
   if (value === "wss") {
     connection.port = 8084;
   } else {
     connection.port = 8083;
   }
 };

测试验证

运行

npm run dev