VUE3
本文主要介绍如何在 Vue 3 项目中使用 MQTT.js 客户端库,实现 SX-IOT 设备与 MQTT 服务器的连接、订阅、收发消息等功能。
前提条件
安装依赖包
sudo apt install nodejs npm -y
npm install
连接使用
连接设置
本文将使用自定义的接入认证方式,服务器接入信息如下:
- Broker: mqtt.geek-smart.cn
- TCP Port: 1883
- WebSocket Port: 8083

浏览器环境只能通过 WebSocket 连接 MQTT 服务器,因此下文的示例使用 WebSocket 端口 8083。
导入依赖包
import * as mqtt from "mqtt/dist/mqtt.min";
import { reactive, ref } from "vue";
定义连接地址、认证信息以及消息发布与订阅主题
// Broker endpoint, credentials and session options. `protocol`, `host` and
// `port` are only used to assemble the WebSocket URL in `createConnection`;
// every remaining key is handed to `mqtt.connect` as a client option.
const connection = reactive({
  protocol: "ws", // "ws" or "wss"
  host: "mqtt.geek-smart.cn",
  port: 8083, // ws -> 8083; wss -> 8084
  // NOTE(review): the client id is fixed. Per the MQTT specification a broker
  // disconnects the earlier client when another one connects with the same id,
  // so two open tabs would evict each other. Confirm whether the platform
  // requires this exact id before making it unique.
  clientId: "vue3-client",
  username: "xxxxxxxxxxxxxx", // placeholder value
  password: "xxxxxxxxxxxxxxxxxxxxxx", // placeholder value
  clean: true,
  connectTimeout: 30 * 1000, // ms
  reconnectPeriod: 4000, // ms
});

// Topic used as the default publish target and topic used as the default
// subscription, respectively.
const pub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/publish";
const sub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/subscribe";
定义待发布消息的主题、QoS 与消息内容(payload)
// Message to publish: the target topic, the QoS level and the payload string
// (a JSON document kept as text).
const publish = ref({
topic: pub_topic,
qos: 0 as mqtt.QoS,
payload: '{ "type": "info" }',
});
定义 doSubscribe、doUnSubscribe 函数,分别用于订阅和取消订阅主题;订阅成功后,收到的消息由创建连接时注册的 message 事件回调负责打印
/**
 * Subscribes the current client to the topic and QoS held in `subscription`.
 *
 * `btnLoadingType` is set to "subscribe" while the request is pending and is
 * cleared when the callback runs. `subscribedSuccess` is set to true only
 * when the callback reports no error.
 *
 * @see https://github.com/mqttjs/MQTT.js#mqttclientsubscribetopictopic-arraytopic-object-options-callback
 */
const doSubscribe = () => {
  btnLoadingType.value = "subscribe";

  const onSubscribed = (error: Error, granted: mqtt.ISubscriptionGrant[]) => {
    btnLoadingType.value = "";
    if (!error) {
      subscribedSuccess.value = true;
      console.log("subscribe successfully:", granted);
    } else {
      console.log("subscribe error:", error);
    }
  };

  const target = subscription.value;
  client.value.subscribe(target.topic, { qos: target.qos }, onSubscribed);
};
/**
 * Unsubscribes the current client from the topic held in `subscription`.
 *
 * `btnLoadingType` is set to "unsubscribe" while the request is pending and
 * is cleared when the callback runs. `subscribedSuccess` is cleared only when
 * the callback reports no error, mirroring how `doSubscribe` sets it only on
 * success.
 *
 * @see https://github.com/mqttjs/MQTT.js#mqttclientunsubscribetopictopic-array-options-callback
 */
const doUnSubscribe = () => {
  btnLoadingType.value = "unsubscribe";
  const { topic, qos } = subscription.value;
  client.value.unsubscribe(topic, { qos }, (error) => {
    btnLoadingType.value = "";
    if (error) {
      // The unsubscribe did not go through, so keep reporting the
      // subscription as active instead of claiming it was removed.
      console.log("unsubscribe error:", error);
      return;
    }
    subscribedSuccess.value = false;
    console.log(`unsubscribed topic: ${topic}`);
  });
};
连接 MQTT
/**
 * Opens the MQTT connection described by `connection` and registers the
 * client's event handlers.
 *
 * `protocol`, `host` and `port` are combined into the URL
 * `<protocol>://<host>:<port>/mqtt`; the remaining `connection` keys are
 * passed to `mqtt.connect` as options. `btnLoadingType` is "connect" until the
 * "connect" event fires, or until `mqtt.connect` throws synchronously.
 */
const createConnection = () => {
  try {
    btnLoadingType.value = "connect";
    const { protocol, host, port, ...options } = connection;
    const connectUrl = `${protocol}://${host}:${port}/mqtt`;
    client.value = mqtt.connect(connectUrl, options);
    if (client.value.on) {
      // https://github.com/mqttjs/MQTT.js#event-connect
      client.value.on("connect", () => {
        btnLoadingType.value = "";
        // A successful (re)connection ends the current run of failed
        // attempts. Without this reset the retry limit in handleOnReConnect
        // would count reconnects cumulatively over the whole session and
        // eventually shut down a client that keeps recovering normally.
        retryTimes.value = 0;
        console.log("connection successful");
      });
      // https://github.com/mqttjs/MQTT.js#event-reconnect
      client.value.on("reconnect", handleOnReConnect);
      // https://github.com/mqttjs/MQTT.js#event-error
      client.value.on("error", (error) => {
        console.log("connection error:", error);
      });
      // https://github.com/mqttjs/MQTT.js#event-message
      client.value.on("message", (topic: string, message) => {
        // Payloads are appended back to back; no separator is inserted.
        receivedMessages.value = receivedMessages.value.concat(
          message.toString()
        );
        console.log(`received message: ${message} from topic: ${topic}`);
      });
    }
  } catch (error) {
    btnLoadingType.value = "";
    console.log("mqtt.connect error:", error);
  }
};
完整代码
// vue 3 + vite use MQTT.js refer to https://github.com/mqttjs/MQTT.js/issues/1269
import * as mqtt from "mqtt/dist/mqtt.min";
import { reactive, ref } from "vue";
// The three MQTT QoS levels (see https://github.com/mqttjs/MQTT.js#qos).
// NOTE(review): not referenced by the script code shown here; presumably
// consumed by the component template — confirm.
const qosList = [0, 1, 2];
// Broker endpoint, credentials and session options. `protocol`, `host` and
// `port` are only used to assemble the WebSocket URL in `createConnection`;
// every remaining key is handed to `mqtt.connect` as a client option.
const connection = reactive({
  protocol: "ws", // "ws" or "wss"
  host: "mqtt.geek-smart.cn",
  port: 8083, // ws -> 8083; wss -> 8084
  // NOTE(review): the client id is fixed. Per the MQTT specification a broker
  // disconnects the earlier client when another one connects with the same id,
  // so two open tabs would evict each other. Confirm whether the platform
  // requires this exact id before making it unique.
  clientId: "vue3-client",
  username: "xxxxxxxxxxxxxx", // placeholder value
  password: "xxxxxxxxxxxxxxxxxxxxxx", // placeholder value
  clean: true,
  connectTimeout: 30 * 1000, // ms
  reconnectPeriod: 4000, // ms
});

// Topic used as the default publish target and topic used as the default
// subscription, respectively.
const pub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/publish";
const sub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/subscribe";
// Subscription settings read by doSubscribe / doUnSubscribe: the topic to
// subscribe to and the QoS level requested for it.
const subscription = ref({
topic: sub_topic,
qos: 0 as mqtt.QoS,
});
// Message read by doPublish: the target topic, the QoS level and the payload
// string (a JSON document kept as text).
const publish = ref({
topic: pub_topic,
qos: 0 as mqtt.QoS,
payload: '{ "type": "info" }',
});
// Holder for the active MQTT client. It starts as a placeholder object that
// only carries `connected: false`, so `client.value.connected` can be read
// before a real client exists; `createConnection` later stores the real client
// in `.value`. The binding itself is never reassigned, hence `const`.
const client = ref({
  connected: false,
} as mqtt.MqttClient);
// Concatenation of every received message payload (no separator is inserted).
const receivedMessages = ref("");
// Set to true by a successful subscribe; cleared again by the unsubscribe
// callback and by initData.
const subscribedSuccess = ref(false);
// Name of the operation currently in flight ("connect", "subscribe",
// "unsubscribe", "publish", "disconnect"); empty string when idle.
const btnLoadingType = ref("");
// Number of "reconnect" events counted by handleOnReConnect since the last reset.
const retryTimes = ref(0);
/**
 * Puts the connection-related state back to its initial values: a placeholder
 * client reporting `connected: false`, no subscription, no pending operation
 * and a retry counter of zero.
 */
const initData = () => {
  const placeholder = { connected: false } as mqtt.MqttClient;
  client.value = placeholder;
  subscribedSuccess.value = false;
  btnLoadingType.value = "";
  retryTimes.value = 0;
};
/**
 * Handler for the client's "reconnect" event. Counts the event and, once more
 * than five have been counted, ends the client and resets the local state so
 * that no further retries happen.
 */
const handleOnReConnect = () => {
  retryTimes.value += 1;

  // Still within the retry budget: let the client keep trying.
  if (retryTimes.value <= 5) {
    return;
  }

  try {
    client.value.end();
    initData();
    console.log("connection maxReconnectTimes limit, stop retry");
  } catch (error) {
    console.log("handleOnReConnect catch error:", error);
  }
};
/**
 * Opens the MQTT connection described by `connection` and registers the
 * client's event handlers.
 *
 * `protocol`, `host` and `port` are combined into the URL
 * `<protocol>://<host>:<port>/mqtt`; the remaining `connection` keys are
 * passed to `mqtt.connect` as options. `btnLoadingType` is "connect" until the
 * "connect" event fires, or until `mqtt.connect` throws synchronously.
 */
const createConnection = () => {
  try {
    btnLoadingType.value = "connect";
    const { protocol, host, port, ...options } = connection;
    const connectUrl = `${protocol}://${host}:${port}/mqtt`;
    client.value = mqtt.connect(connectUrl, options);
    if (client.value.on) {
      // https://github.com/mqttjs/MQTT.js#event-connect
      client.value.on("connect", () => {
        btnLoadingType.value = "";
        // A successful (re)connection ends the current run of failed
        // attempts. Without this reset the retry limit in handleOnReConnect
        // would count reconnects cumulatively over the whole session and
        // eventually shut down a client that keeps recovering normally.
        retryTimes.value = 0;
        console.log("connection successful");
      });
      // https://github.com/mqttjs/MQTT.js#event-reconnect
      client.value.on("reconnect", handleOnReConnect);
      // https://github.com/mqttjs/MQTT.js#event-error
      client.value.on("error", (error) => {
        console.log("connection error:", error);
      });
      // https://github.com/mqttjs/MQTT.js#event-message
      client.value.on("message", (topic: string, message) => {
        // Payloads are appended back to back; no separator is inserted.
        receivedMessages.value = receivedMessages.value.concat(
          message.toString()
        );
        console.log(`received message: ${message} from topic: ${topic}`);
      });
    }
  } catch (error) {
    btnLoadingType.value = "";
    console.log("mqtt.connect error:", error);
  }
};
/**
 * Subscribes the current client to the topic and QoS held in `subscription`.
 *
 * `btnLoadingType` is set to "subscribe" while the request is pending and is
 * cleared when the callback runs. `subscribedSuccess` is set to true only
 * when the callback reports no error.
 */
const doSubscribe = () => {
  btnLoadingType.value = "subscribe";

  const onSubscribed = (error: Error, granted: mqtt.ISubscriptionGrant[]) => {
    btnLoadingType.value = "";
    if (!error) {
      subscribedSuccess.value = true;
      console.log("subscribe successfully:", granted);
    } else {
      console.log("subscribe error:", error);
    }
  };

  const target = subscription.value;
  client.value.subscribe(target.topic, { qos: target.qos }, onSubscribed);
};
/**
 * Unsubscribes the current client from the topic held in `subscription`.
 *
 * `btnLoadingType` is set to "unsubscribe" while the request is pending and
 * is cleared when the callback runs. `subscribedSuccess` is cleared only when
 * the callback reports no error, mirroring how `doSubscribe` sets it only on
 * success.
 */
const doUnSubscribe = () => {
  btnLoadingType.value = "unsubscribe";
  const { topic, qos } = subscription.value;
  client.value.unsubscribe(topic, { qos }, (error) => {
    btnLoadingType.value = "";
    if (error) {
      // The unsubscribe did not go through, so keep reporting the
      // subscription as active instead of claiming it was removed.
      console.log("unsubscribe error:", error);
      return;
    }
    subscribedSuccess.value = false;
    console.log(`unsubscribed topic: ${topic}`);
  });
};
/**
 * Publishes the message held in `publish` (topic, QoS and payload) through
 * the current client.
 *
 * `btnLoadingType` is set to "publish" while the request is pending and is
 * cleared when the callback runs; the outcome is only logged.
 */
const doPublish = () => {
  btnLoadingType.value = "publish";

  // Snapshot the values now so the log below reports what was actually sent.
  const { topic: targetTopic, qos: level, payload: body } = publish.value;

  client.value.publish(targetTopic, body, { qos: level }, (error) => {
    btnLoadingType.value = "";
    if (!error) {
      console.log(`published message: ${body}`);
    } else {
      console.log("publish error:", error);
    }
  });
};
/**
 * Closes the current client if it reports itself as connected; does nothing
 * otherwise.
 *
 * `btnLoadingType` is "disconnect" while closing. Once the client has ended,
 * `initData` resets the local state; if `end` throws, only the loading flag
 * is cleared.
 */
const destroyConnection = () => {
  if (!client.value.connected) {
    return;
  }

  btnLoadingType.value = "disconnect";

  const onClosed = () => {
    initData();
    console.log("disconnected successfully");
  };

  try {
    client.value.end(false, onClosed);
  } catch (error) {
    btnLoadingType.value = "";
    console.log("disconnect error:", error);
  }
};
/**
 * Keeps `connection.port` in step with the selected protocol.
 *
 * @param value Selected protocol; "wss" selects port 8084, any other value
 *   selects port 8083.
 */
const handleProtocolChange = (value: string) => {
  if (value === "wss") {
    connection.port = 8084;
  } else {
    connection.port = 8083;
  }
};
测试验证
运行
npm run dev