自建平台API接入
Java

java

本文主要介绍如何在 Java 项目中，使用 Eclipse Paho Java Client 库实现一个 MQTT 客户端与 SX-IOT 物联网设备的连接、订阅、收发消息等功能。

前提条件

安装依赖包

# -y confirms the installation prompt automatically. The previous -v is apt's
# version flag and does not belong on an install command.
sudo apt install -y openjdk-11-jdk maven
cd mqtt-client-Java-paho
mvn install

连接使用

连接设置

本文将使用自定义的接入认证方式，服务器接入信息如下：

- Broker: mqtt.geek-smart.cn
- TCP Port: 1883
- WebSocket Port: 8083

导入依赖包

 package io.geekopen.mqtt;
 
 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

定义连接地址、认证信息以及消息发布主题

// Broker endpoint and the credentials used to authenticate against it
// (the credential values are masked in this document).
final String broker = "tcp://mqtt.geek-smart.cn:1883";
final String username = "xxxxxxxxxxxxx";
final String password = "xxxxxxxxxxxxx";
// Client identifier produced by Paho's built-in generator.
final String clientId = MqttClient.generateClientId();

// Topic the client subscribes to, and topic it publishes on.
final String sub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/subscribe";
final String pub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/publish";

// JSON payload to publish and the QoS level used for publish and subscribe.
final String content = "{\"type\":\"info\"}";
final int qos = 0;

消息发布

 // Encode the payload explicitly as UTF-8 so the bytes sent do not depend on
 // the JVM's default charset.
 MqttMessage message = new MqttMessage(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
 message.setQos(qos);
 client.publish(pub_topic, message);

Subscribe 回调函数,用于打印订阅主题接收的消息内容

 MqttClient client = new MqttClient(broker, clientId, persistence);
 
 // Callback that prints connection loss, every received message and every
 // completed delivery to standard output.
 client.setCallback(new MqttCallback() {
     @Override
     public void connectionLost(Throwable cause) {
         System.out.println("connectionLost: " + cause.getMessage());
     }
 
     // The parameter is named "topic" so it does not shadow the sub_topic
     // variable that holds the subscription topic.
     @Override
     public void messageArrived(String topic, MqttMessage message) {
         // Decode explicitly as UTF-8 instead of the JVM's default charset.
         String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
         System.out.println("topic: " + topic);
         System.out.println("Qos: " + message.getQos());
         System.out.println("message content: " + payload);
     }
 
     @Override
     public void deliveryComplete(IMqttDeliveryToken token) {
         System.out.println("deliveryComplete---------" + token.isComplete());
     }
 });

初始化 MQTT 客户端并订阅主题

 // clientId is not declared again here: it is already defined together with
 // the broker address and credentials, and a second declaration in the same
 // scope would not compile.
 MemoryPersistence persistence = new MemoryPersistence();
 
 // Connection options: clean session, credentials, and a 60-second connection
 // timeout and keep-alive interval.
 MqttConnectOptions connOpts = new MqttConnectOptions();
 connOpts.setCleanSession(true);
 connOpts.setUserName(username);
 connOpts.setPassword(password.toCharArray());
 connOpts.setConnectionTimeout(60);
 connOpts.setKeepAliveInterval(60);
 
 // Connect with the options above, then subscribe to the topic.
 client.connect(connOpts);
 client.subscribe(sub_topic, qos);

完整代码

 package io.geekopen.mqtt;
 
 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
 
 public class MqttSample {
     public static void main(String[] args) {
         String sub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/subscribe";
         String pub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/publish";
         String content = "{\"type\":\"info\"}";
         int qos = 0;
         String broker = "tcp://mqtt.geek-smart.cn:1883";
         String username = "xxxxxxxxxxxxx";
         String password = "xxxxxxxxxxxxx";
         String clientId = MqttClient.generateClientId();
         MemoryPersistence persistence = new MemoryPersistence();
         MqttConnectOptions connOpts = new MqttConnectOptions();
         connOpts.setCleanSession(true);
         connOpts.setUserName(username);
         connOpts.setPassword(password.toCharArray());
         connOpts.setConnectionTimeout(60);
         connOpts.setKeepAliveInterval(60);
 
         try {
             MqttClient client = new MqttClient(broker, clientId, persistence);
 
             client.setCallback(new MqttCallback() {
 
                 public void connectionLost(Throwable cause) {
                     System.out.println("connectionLost: " + cause.getMessage());
                 }
 
                 public void messageArrived(String sub_topic, MqttMessage message) {
                     System.out.println("topic: " + sub_topic);
                     System.out.println("Qos: " + message.getQos());
                     System.out.println("message content: " + new String(message.getPayload()));
 
                 }
 
                 public void deliveryComplete(IMqttDeliveryToken token) {
                     System.out.println("deliveryComplete---------" + token.isComplete());
                 }
 
             });
 
             System.out.println("Connecting to broker: " + broker);
             client.connect(connOpts);
             System.out.println("Connected to broker: " + broker);
 
             MqttMessage message = new MqttMessage(content.getBytes());
             message.setQos(qos);
             client.publish(pub_topic, message);
             System.out.println("Message published");
             client.subscribe(sub_topic, qos);
             System.out.println("Subscribed to topic: " + sub_topic);
             System.out.println("Subscribed to message: " + message);
             // client.disconnect();
             // System.out.println("Disconnected");
             // client.close();
             // System.exit(0);
         } catch (MqttException me) {
             System.out.println("reason " + me.getReasonCode());
             System.out.println("msg " + me.getMessage());
             System.out.println("loc " + me.getLocalizedMessage());
             System.out.println("cause " + me.getCause());
             System.out.println("excep " + me);
             me.printStackTrace();
         }
     }
 
 }

测试验证

运行

 mvn compile
 # The main class shown in this document is MqttSample.
 mvn exec:java -Dexec.mainClass="io.geekopen.mqtt.MqttSample"
 # Only Publish
 mvn exec:java -Dexec.mainClass="io.geekopen.mqtt.PublishSample"
 # Only Subscribe
 mvn exec:java -Dexec.mainClass="io.geekopen.mqtt.SubscribeSample"