Go
本文主要介绍如何在 Go 项目中,使用 paho.mqtt.golang 库实现一个 MQTT 客户端与 SX-IOT 物联网设备的连接、订阅、收发消息等功能。 Go 是 Google 开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。而 paho.mqtt.golang 是一个 MQTT 库,它提供了一个简单的 API,用于在 Go 项目中连接到 MQTT 服务器,并发送和接收消息。
前提条件
安装依赖包
# Install the Go toolchain via apt.
sudo apt install golang-go -y
# Point the Go module proxy at goproxy.cn.
go env -w GOPROXY=https://goproxy.cn
# Create the project directory if it does not exist yet, then enter it.
mkdir -p mqtt-client-Go-paho
cd mqtt-client-Go-paho
# Initialise the module and fetch the MQTT client library.
go mod init mqtt-client-Go-paho
go get github.com/eclipse/paho.mqtt.golang
连接使用
连接设置
本文将使用自定义的接入认证方式,服务器接入信息如下:
- Broker: mqtt.geek-smart.cn
- TCP Port: 1883
- WebSocket Port: 8083
导入依赖包
import (
"fmt"
"log"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
定义连接地址、认证信息以及消息订阅、发布主题
// Connection settings, topics and credentials used by the MQTT client.
const (
	// protocol, broker and port are joined into the broker URL.
	protocol = "tcp"
	broker   = "mqtt.geek-smart.cn"
	port     = 1883

	// sub_topic is the topic the client subscribes to; pub_topic is the
	// topic it publishes to.
	sub_topic = "/HYUGHV/lVtAcHuor***/4cebd60bf***/subscribe"
	pub_topic = "/HYUGHV/lVtAcHuor***/4cebd60bf***/publish"

	// username and password are masked here; replace them with real
	// credentials before running.
	username = "************"
	password = "************"
)
定义消息发布函数
// publish sends a fixed JSON payload to pub_topic once per second, forever,
// printing the outcome of each attempt. It never returns.
func publish(client mqtt.Client) {
	const (
		qos     byte = 0
		payload      = `{"type":"info"}`
	)
	for {
		token := client.Publish(pub_topic, qos, false, payload)
		// Wait blocks until the publish attempt has completed, after which
		// Error reports whether it failed.
		token.Wait()
		if err := token.Error(); err != nil {
			// Include the error so a failure can actually be diagnosed.
			fmt.Printf("publish failed, topic: %s, payload: %s, error: %v\n", pub_topic, payload, err)
		} else {
			fmt.Printf("publish success, topic: %s, payload: %s\n", pub_topic, payload)
		}
		time.Sleep(time.Second)
	}
}
定义订阅函数 subscribe,并在其中注册消息回调函数,用于打印订阅主题接收到的消息内容
// subscribe subscribes the client to sub_topic at QoS 0 and registers a
// handler that prints the payload and topic of every received message.
//
// It blocks until the subscribe request has completed and prints an error if
// the subscription failed, instead of silently discarding the result.
func subscribe(client mqtt.Client) {
	const qos byte = 0
	token := client.Subscribe(sub_topic, qos, func(_ mqtt.Client, msg mqtt.Message) {
		fmt.Printf("Received `%s` from `%s` topic\n", msg.Payload(), msg.Topic())
	})
	token.Wait()
	if err := token.Error(); err != nil {
		fmt.Printf("subscribe failed, topic: %s, error: %v\n", sub_topic, err)
	}
}
初始化 MQTT 客户端并连接到服务器
// createMqttClient builds an MQTT client from the package-level connection
// constants, connects it to the broker and returns the client.
//
// The process exits via log.Fatal if the connect attempt does not complete
// within three seconds or if it completes with an error.
func createMqttClient() mqtt.Client {
	connectAddress := fmt.Sprintf("%s://%s:%d", protocol, broker, port)
	clientID := "go-client"
	fmt.Println("connect address: ", connectAddress)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(connectAddress)
	opts.SetUsername(username)
	opts.SetPassword(password)
	opts.SetClientID(clientID)
	opts.SetKeepAlive(time.Second * 60)

	client := mqtt.NewClient(opts)
	token := client.Connect()
	// WaitTimeout returns false when the deadline expires first. That case
	// must be treated as a failure too; otherwise an unconnected client
	// would be handed back to the caller.
	if !token.WaitTimeout(3 * time.Second) {
		log.Fatalf("connect to %s timed out", connectAddress)
	}
	if err := token.Error(); err != nil {
		log.Fatal(err)
	}
	return client
}
完整代码
package main
import (
"fmt"
"log"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// Connection settings, topics and credentials used by the MQTT client.
const (
	// protocol, broker and port are joined into the broker URL.
	protocol = "tcp"
	broker   = "mqtt.geek-smart.cn"
	port     = 1883

	// sub_topic is the topic the client subscribes to; pub_topic is the
	// topic it publishes to.
	sub_topic = "/HYUGHV/lVtAcHuor***/4cebd60bf***/subscribe"
	pub_topic = "/HYUGHV/lVtAcHuor***/4cebd60bf***/publish"

	// username and password are masked here; replace them with real
	// credentials before running.
	username = "************"
	password = "************"
)
// main connects to the broker, starts the subscription in a separate
// goroutine, and then publishes messages from the main goroutine.
func main() {
	mqttClient := createMqttClient()

	// Subscribe from a separate goroutine.
	go subscribe(mqttClient)
	// Pause for one second to give the subscription time to complete.
	time.Sleep(time.Second)

	publish(mqttClient)
}
// createMqttClient builds an MQTT client from the package-level connection
// constants, connects it to the broker and returns the client.
//
// The process exits via log.Fatal if the connect attempt does not complete
// within three seconds or if it completes with an error.
func createMqttClient() mqtt.Client {
	connectAddress := fmt.Sprintf("%s://%s:%d", protocol, broker, port)
	clientID := "go-client"
	fmt.Println("connect address: ", connectAddress)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(connectAddress)
	opts.SetUsername(username)
	opts.SetPassword(password)
	opts.SetClientID(clientID)
	opts.SetKeepAlive(time.Second * 60)

	client := mqtt.NewClient(opts)
	token := client.Connect()
	// WaitTimeout returns false when the deadline expires first. That case
	// must be treated as a failure too; otherwise an unconnected client
	// would be handed back to the caller.
	if !token.WaitTimeout(3 * time.Second) {
		log.Fatalf("connect to %s timed out", connectAddress)
	}
	if err := token.Error(); err != nil {
		log.Fatal(err)
	}
	return client
}
// publish sends a fixed JSON payload to pub_topic once per second, forever,
// printing the outcome of each attempt. It never returns.
func publish(client mqtt.Client) {
	const (
		qos     byte = 0
		payload      = `{"type":"info"}`
	)
	for {
		token := client.Publish(pub_topic, qos, false, payload)
		// Wait blocks until the publish attempt has completed, after which
		// Error reports whether it failed.
		token.Wait()
		if err := token.Error(); err != nil {
			// Include the error so a failure can actually be diagnosed.
			fmt.Printf("publish failed, topic: %s, payload: %s, error: %v\n", pub_topic, payload, err)
		} else {
			fmt.Printf("publish success, topic: %s, payload: %s\n", pub_topic, payload)
		}
		time.Sleep(time.Second)
	}
}
// subscribe subscribes the client to sub_topic at QoS 0 and registers a
// handler that prints the payload and topic of every received message.
//
// It blocks until the subscribe request has completed and prints an error if
// the subscription failed, instead of silently discarding the result.
func subscribe(client mqtt.Client) {
	const qos byte = 0
	token := client.Subscribe(sub_topic, qos, func(_ mqtt.Client, msg mqtt.Message) {
		fmt.Printf("Received `%s` from `%s` topic\n", msg.Payload(), msg.Topic())
	})
	token.Wait()
	if err := token.Error(); err != nil {
		fmt.Printf("subscribe failed, topic: %s, error: %v\n", sub_topic, err)
	}
}
测试验证
运行
go run main.go