自建平台API接入
Python3

Python3

本文主要介绍如何在 Python 项目中使用 paho-mqtt 客户端库,实现 SX-IOT 设备与 MQTT 服务器的连接、订阅、收发消息等功能。paho-mqtt 是目前 Python 中使用较多的 MQTT 客户端库,它在 Python 2.7.9+ 或 3.6+ 上为客户端类提供了对 MQTT v5.0、v3.1 和 v3.1.1 的支持。它还提供了一些帮助程序功能,使将消息发布到 MQTT 服务器变得非常简单。

前提条件

安装依赖包

sudo apt install python3 python3-pip -y
python3 -m pip install paho-mqtt

连接使用

连接设置

本文将使用自定义的接入认证方式,服务器接入信息如下:

  • Broker: mqtt.geek-smart.cn
  • TCP Port: 1883
  • WebSocket Port: 8083

导入依赖包

from paho.mqtt import client as mqtt_client

定义连接地址、认证信息以及消息发布与订阅主题

# Broker endpoint; 1883 is the TCP port listed in the connection settings.
BROKER = 'mqtt.geek-smart.cn'
PORT = 1883
# Topic this client publishes to and topic it subscribes to.
PUB_TOPIC = "/HYUGHV/lVtAcHuor***/4cebd60bf***/publish"
SUB_TOPIC = "/HYUGHV/lVtAcHuor***/4cebd60bf***/subscribe"
# Plain string literal: there is nothing to interpolate, so no f-prefix.
CLIENT_ID = 'python-mqtt-tcp-client'
# Credentials passed to the broker (masked in this document).
USERNAME = '************'
PASSWORD = '************'

定义消息发布函数

 def publish(client):
     """Publish a fixed JSON message to PUB_TOPIC once per second.

     Loops until the module-level FLAG_EXIT becomes true. While the client
     reports that it is not connected, an error is logged and publishing is
     skipped for that second.

     Args:
         client: MQTT client exposing ``is_connected()`` and ``publish()``.
     """
     # The payload never changes, so serialize it once outside the loop.
     msg = json.dumps({'type': 'info'})
     while not FLAG_EXIT:
         if not client.is_connected():
             logging.error("publish: MQTT client is not connected!")
             time.sleep(1)
             continue
         result = client.publish(PUB_TOPIC, msg)
         # The first element of the publish result is the status code;
         # 0 is treated as success.
         status = result[0]
         if status == 0:
             print(f'Send `{msg}` to topic `{PUB_TOPIC}`')
         else:
             print(f'Failed to send message to topic {PUB_TOPIC}')
         time.sleep(1)

定义 on_message 回调函数,用于打印订阅主题接收的消息内容

 def on_message(client, userdata, msg):
     """Print the decoded payload and topic of a received message.

     Args:
         client: Client that received the message (unused).
         userdata: User data attached to the client (unused).
         msg: Received message; ``payload`` is decoded with the default
             codec and ``topic`` is printed as-is.
     """
     payload_text = msg.payload.decode()
     print(f'Received `{payload_text}` from `{msg.topic}` topic')

初始化 MQTT 客户端并订阅主题

 def on_connect(client, userdata, flags, rc):
     """Subscribe to SUB_TOPIC after a successful connection.

     Args:
         client: Client whose connection attempt completed.
         userdata: User data attached to the client (unused).
         flags: Connection flags from the broker (unused).
         rc: Connection result code; 0 is treated as success.
     """
     # rc is tested first so the client is only queried on a zero code.
     connected = rc == 0 and client.is_connected()
     if not connected:
         print(f'Failed to connect, return code {rc}')
         return
     print("Connected to MQTT Broker!")
     client.subscribe(SUB_TOPIC)
 
 def connect_mqtt():
     """Create, configure and connect the MQTT client.

     Returns:
         The client after ``connect()`` has been called. The network loop
         is not started here.
     """
     # paho-mqtt 2.x expects the callback API version as the first
     # constructor argument; 1.x has no such enum and takes the client id
     # first. VERSION1 keeps the callback signatures used in this file.
     api_version = getattr(mqtt_client, 'CallbackAPIVersion', None)
     if api_version is None:
         client = mqtt_client.Client(CLIENT_ID)
     else:
         client = mqtt_client.Client(api_version.VERSION1, client_id=CLIENT_ID)
     client.username_pw_set(USERNAME, PASSWORD)
     # Register every callback before opening the connection.
     client.on_connect = on_connect
     client.on_message = on_message
     client.on_disconnect = on_disconnect
     client.connect(BROKER, PORT, keepalive=120)
     return client

完整代码

 import json
 import logging
 import time
 
 from paho.mqtt import client as mqtt_client
 
 # Broker endpoint used by connect_mqtt().
 BROKER = 'mqtt.geek-smart.cn'
 PORT = 1883
 # Topic this client publishes to and topic it subscribes to.
 PUB_TOPIC = "/HYUGHV/lVtAcHuor***/4cebd60bf***/publish"
 SUB_TOPIC = "/HYUGHV/lVtAcHuor***/4cebd60bf***/subscribe"
 # Plain string literal: there is nothing to interpolate, so no f-prefix.
 CLIENT_ID = 'python-mqtt-tcp-client'
 # Credentials passed to the broker (masked in this document).
 USERNAME = '************'
 PASSWORD = '************'


 # Reconnect back-off used by on_disconnect(): the delay (in seconds) starts
 # at FIRST_RECONNECT_DELAY, is multiplied by RECONNECT_RATE after each failed
 # attempt, is capped at MAX_RECONNECT_DELAY, and at most MAX_RECONNECT_COUNT
 # attempts are made.
 FIRST_RECONNECT_DELAY = 1
 RECONNECT_RATE = 2
 MAX_RECONNECT_COUNT = 12
 MAX_RECONNECT_DELAY = 60

 # Set to True by on_disconnect() once reconnecting has been given up;
 # publish() stops looping when it sees this flag.
 FLAG_EXIT = False
 
 
 def on_connect(client, userdata, flags, rc):
     """Subscribe to SUB_TOPIC after a successful connection.

     Args:
         client: Client whose connection attempt completed.
         userdata: User data attached to the client (unused).
         flags: Connection flags from the broker (unused).
         rc: Connection result code; 0 is treated as success.
     """
     # rc is tested first so the client is only queried on a zero code.
     connected = rc == 0 and client.is_connected()
     if not connected:
         print(f'Failed to connect, return code {rc}')
         return
     print("Connected to MQTT Broker!")
     client.subscribe(SUB_TOPIC)
 
 
 def on_disconnect(client, userdata, rc):
     """Try to reconnect with a growing delay; flag exit when giving up.

     Up to MAX_RECONNECT_COUNT attempts are made. The wait before each
     attempt starts at FIRST_RECONNECT_DELAY seconds, is multiplied by
     RECONNECT_RATE after every failure and never exceeds
     MAX_RECONNECT_DELAY. If all attempts fail, FLAG_EXIT is set to True.

     Args:
         client: Client that was disconnected; ``reconnect()`` is called
             on it.
         userdata: User data attached to the client (unused).
         rc: Disconnect result code, only logged.
     """
     global FLAG_EXIT

     logging.info("Disconnected with result code: %s", rc)
     attempts = 0
     delay = FIRST_RECONNECT_DELAY
     while attempts < MAX_RECONNECT_COUNT:
         logging.info("Reconnecting in %d seconds...", delay)
         time.sleep(delay)

         try:
             client.reconnect()
         except Exception as err:
             logging.error("%s. Reconnect failed. Retrying...", err)
         else:
             logging.info("Reconnected successfully!")
             return

         # Grow the delay for the next attempt, bounded by the maximum.
         delay = min(delay * RECONNECT_RATE, MAX_RECONNECT_DELAY)
         attempts += 1

     logging.info("Reconnect failed after %s attempts. Exiting...", attempts)
     FLAG_EXIT = True
 
 
 def on_message(client, userdata, msg):
     """Print the decoded payload and topic of a received message.

     Args:
         client: Client that received the message (unused).
         userdata: User data attached to the client (unused).
         msg: Received message; ``payload`` is decoded with the default
             codec and ``topic`` is printed as-is.
     """
     payload_text = msg.payload.decode()
     print(f'Received `{payload_text}` from `{msg.topic}` topic')
 
 
 def connect_mqtt():
     """Create, configure and connect the MQTT client.

     Returns:
         The client after ``connect()`` has been called. The network loop
         is not started here.
     """
     # paho-mqtt 2.x expects the callback API version as the first
     # constructor argument; 1.x has no such enum and takes the client id
     # first. VERSION1 keeps the callback signatures used in this file.
     api_version = getattr(mqtt_client, 'CallbackAPIVersion', None)
     if api_version is None:
         client = mqtt_client.Client(CLIENT_ID)
     else:
         client = mqtt_client.Client(api_version.VERSION1, client_id=CLIENT_ID)
     client.username_pw_set(USERNAME, PASSWORD)
     # Register every callback before opening the connection.
     client.on_connect = on_connect
     client.on_message = on_message
     client.on_disconnect = on_disconnect
     client.connect(BROKER, PORT, keepalive=120)
     return client
 
 
 def publish(client):
     """Publish a fixed JSON message to PUB_TOPIC once per second.

     Loops until the module-level FLAG_EXIT becomes true. While the client
     reports that it is not connected, an error is logged and publishing is
     skipped for that second.

     Args:
         client: MQTT client exposing ``is_connected()`` and ``publish()``.
     """
     # The payload never changes, so serialize it once outside the loop.
     msg = json.dumps({'type': 'info'})
     while not FLAG_EXIT:
         if not client.is_connected():
             logging.error("publish: MQTT client is not connected!")
             time.sleep(1)
             continue
         result = client.publish(PUB_TOPIC, msg)
         # The first element of the publish result is the status code;
         # 0 is treated as success.
         status = result[0]
         if status == 0:
             print(f'Send `{msg}` to topic `{PUB_TOPIC}`')
         else:
             print(f'Failed to send message to topic {PUB_TOPIC}')
         time.sleep(1)
 
 
 def run():
     """Configure logging, connect, and publish while connected.

     Starts the client's background network loop, waits one second, and
     then either enters the publish loop or stops the network loop when
     the client is still not connected.
     """
     logging.basicConfig(
         format='%(asctime)s - %(levelname)s: %(message)s',
         level=logging.DEBUG,
     )
     mqtt = connect_mqtt()
     mqtt.loop_start()
     # Give the background loop a moment to complete the connection.
     time.sleep(1)
     if not mqtt.is_connected():
         mqtt.loop_stop()
         return
     publish(mqtt)
 
 # Script entry point: start the client only when executed directly.
 if __name__ == "__main__":
     run()

测试验证

运行

python3 pub_sub_tcp.py