阿里云消息队列MQTT SDK代码示例

在控制台完成资源创建后,您可参考如下示例代码发送MQTT消息:
 
package com.chinamobile.tuxedo.mqtt.demo;
 
import com.chinamobile.tuxedo.mqtt.util.ConnectionOptionWrapper;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
/**
 * 本代码提供签名鉴权模式下 Mqtt 客户端发送消息到 Mqtt 客户端的示例,其中初始化参数请根据实际情况修改 签名模式即使用阿里Tuxedo账号系统提供的 AccessKey 和 SecretKey
 * 对每个客户端计算出一个独立的签名供客户端识别使用。 对于实际业务场景使用过程中,考虑到私钥 SecretKey 的隐私性,可以将签名过程放在受信任的环境完成。
 */
public class MqttClientDemo {
    public static void main(String[] args) throws Exception {
        /**
         * 接入点地址,购买 Mqtt 实例,且配置完成后即可获取,此处可填写域名,IPV4地址,IPV6地址。写法示例分别如下:
         * 域名:emqtt-ingress-changsha-1.cmecloud.cn
         * IPV4地址:112.33.255.205
         * IPV6地址:[2409:8050:3000:5000:1400:0000:0000:04C9]
         */
        String endPoint = "XXXXX";
        /**
         * 账号 instanceId,从账号系统控制台获取
         */
        String instanceId = "XXXXX";
        /**
         * 账号 groupId,从账号系统控制台获取
         */
        String groupId = "XXXXX";
        /**
         * 账号 accesskey,从账号系统控制台获取
         */
        String accessKey = "XXXXX";
        /**
         * 账号 secretKey,从账号系统控制台获取,仅在Signature鉴权模式下需要设置
         */
        String secretKey = "XXXXX";
        /**
         * Mqtt clientId,由业务系统分配,需要保证每个 tcp 连接都不一样,保证全局唯一,如果不同的客户端对象(tcp 连接)使用了相同的 clientId 会导致连接异常断开。
         * clientId 由三部分组成,格式为 InstanceId@@@GroupID@@@DeviceId,其中InstanceId、GroupID 在 MQTT 控制台申请,GroupID不得超过32个字符,以GID_或GID-开头,可以包含字母A-Za-z数字0-9,
         * 中划线-和下划线_;DeviceId 由业务方自己设置,可以包含字母A-Za-z数字0-9,中划线-和下划线_, clientId 总长度不得超过100个字符。
         */
        String clientId = instanceId + "@@@" + groupId + "@@@" + "XXXXX";
        /**
         * Mqtt 消息的一级 topic,需要在控制台申请才能使用。
         * 如果使用了没有申请或者没有被授权的 topic 会导致鉴权失败,服务端会断开客户端连接。
         */
        final String parentTopic = "XXXXX";
        /**
         * Mqtt 支持子级 topic,用来做自定义的过滤,此处为示意,可以填写任何字符串
         * 需要注意的是,完整的topic长度不得超过256个字符。
         */
        final String mqttTopic = parentTopic + "/" + "testMqtt";
        /**
         * QoS参数代表传输质量,可选0,1,2根据实际需求合理设置
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 客户端使用的协议和端口必须匹配, MqttClient是同步client, MqttAsyncClient是异步client,
         * final MqttAsyncClient mqttAsyncClient = new MqttAsyncClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
         * 如果是 SSL 加密,则设置 ssl://endpoiont:8883
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
 
        /**
         * 客户端设置好发送超时时间,防止无限阻塞
         */
        mqttClient.setTimeToWait(5000);
        mqttClient.setCallback(new MqttCallbackExtended() {
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 客户端连接成功后就需要尽快订阅需要的 topic
                 */
                System.out.println("connect success");
 
                try {
                    final String topicFilter[] = {mqttTopic};
                    final int[] qos = {qosLevel};
                    mqttClient.subscribe(topicFilter, qos);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
 
            }
 
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }
 
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。
                 * 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。
                 */
                System.out.println(
                    "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }
 
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
 
        /**
         *  采用 SSL 加密传输时,若 client 端无法联网验证 server 证书, 须配置根证书,选择加密协议版本
         */
        // 仅验证 server 证书
        // connectionOptionWrapper.serverOnlyAuthentication("XXXX", "TLSv1");
        // client 和 server 证书都验证
        //connectionOptionWrapper.serverAndClientAuthentication("XXXX","XXXX","XXXXX","XXX","XXX");
 
        // connectionOptionWrapper.getMqttConnectOptions().setHttpsHostnameVerificationEnabled(false);
 
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            MqttMessage message = new MqttMessage("hello mqtt pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             *  发送普通消息时,topic 必须和接收方订阅的 topic 一致,或者符合通配符匹配规则
             */
            mqttClient.publish(mqttTopic, message);
        }
        /**
         *  客户端取消订阅的topic
         */
        mqttClient.unsubscribe(mqttTopic);
 
        Thread.sleep(Long.MAX_VALUE);
    }
}  
 
 
在使用MQTT SDK时,请注意以下参数的填写:
accessKey      您账号的 accessKeyId,可在阿里云管理控制台上获取。
secretKey       您账号的secretKey,可在阿里云管理控制台获取,仅在Signature鉴权模式下需要设置。
endPoint MQTT服务的接入点地址,可在控制台概览页查看。
clientID   clientId由三部分组成,格式为 InstanceId@@@GroupID@@@DeviceId,其中 InstanceId、GroupId 在 MQTT 控制台申请,groupId长度范围未7-32个字符,以GID_或GID-开头,可以包含字母A-Za-z数字0-9,中划线-和下划线_;DeviceId 由业务方自己设置,可以包含字母A-Za-z数字0-9,中划线-和下划线_, clientId 总长度不得超过100个字符。
parentTopic   MQTT的一级Topic,需要在控制台申请才可使用。
qosLevel QoS参数代表传输质量,可选0,1和2。