阿里云消息队列RocketMQ发送/订阅普通消息SDK代码

阿里云优惠券 阿里云知识 2年前 (2021-05-21) 203次浏览 0个评论 扫描二维码

发送普通消息
消息队列RocketMQ产品发送普通消息有两种实现方式:可靠同步发送、可靠异步发送:

发送方式

发送 TPS

发送结果反馈

可靠性

同步发送

不丢失

异步发送

不丢失

同步发送示例代码
 
package com.chinamobile.tuxedo.sdk.example;
 
import com.chinamobile.tuxedo.sdk.api.Message;
import com.chinamobile.tuxedo.sdk.api.Producer;
import com.chinamobile.tuxedo.sdk.api.PropertyKeyConst;
import com.chinamobile.tuxedo.sdk.api.SendResult;
import com.chinamobile.tuxedo.sdk.api.TuxeFactory;
import java.util.Date;
import java.util.Properties;
 
public class ProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //  控制台创建的Group
        properties.put(PropertyKeyConst.GROUP_ID, "GID_xxx");
        //  控制台创建的实例
        properties.put(PropertyKeyConst.INSTANCE_ID, "MQ_INST_xxx");
        // 启用鉴权认证
        properties.setProperty(PropertyKeyConst.AuthenticationRequired, "true");
        // 开启保存消息轨迹
        properties.put(PropertyKeyConst.MsgTraceSwitch, true);
        // AccessKey 身份验证,在移动云管理控制台创建
        properties.put(PropertyKeyConst.AccessKey, "xxx");
        // SecretKey 身份验证,在移动云管理控制台创建
        properties.put(PropertyKeyConst.SecretKey, "xxx");
        //设置发送超时时间,单位毫秒
        properties.put(PropertyKeyConst.SendMsgTimeoutMillis, 30000);
        // 设置公网接入点
        properties.put(PropertyKeyConst.TuxeAddr, "xxx");
        Producer producer = TuxeFactory.createProducer(properties);
        // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
        producer.start();
        //循环发送消息
        for (int i = 0; i < 10; i++) {
            Message msg = new Message( //
                // Message 所属的 Topic, 控制台创建
                "xxx",
                // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列RocketMQ的服务器过滤
                "TagA",
                // Message Body 可以是任何二进制形式的数据, 消息队列RocketMQ不做任何干预,
                // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
                "Hello MQ".getBytes());
            // 设置代表消息的业务关键属性,请尽可能全局唯一。
            // 以方便您在无法正常收到消息情况下,可通过移动云消息队列RocketMQ管理控制台查询消息并补发
            // 注意:不设置也不会影响消息正常收发
            msg.setKey("ORDERID_" + i);
            try {
                SendResult sendResult = producer.send(msg);
                // 同步发送消息,只要不抛异常就是成功
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            } catch (Exception e) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }
        // 在应用退出前,销毁 Producer 对象
        // 注意:如果不销毁也没有问题
        producer.shutdown();
    }
}
 
 
 
异步发送示例代码
 
package com.chinamobile.tuxedo.sdk.example;
 
import com.chinamobile.tuxedo.sdk.api.Message;
import com.chinamobile.tuxedo.sdk.api.OnExceptionContext;
import com.chinamobile.tuxedo.sdk.api.Producer;
import com.chinamobile.tuxedo.sdk.api.PropertyKeyConst;
import com.chinamobile.tuxedo.sdk.api.SendCallback;
import com.chinamobile.tuxedo.sdk.api.SendResult;
import com.chinamobile.tuxedo.sdk.api.TuxeFactory;
import java.util.Properties;
 
public class ProducerAsyncSendTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //  控制台创建的Group
        properties.put(PropertyKeyConst.GROUP_ID, "xxx");
        //  控制台创建的实例
        properties.put(PropertyKeyConst.INSTANCE_ID, "MQ_INST_xxx");
        // 启用鉴权认证
        properties.setProperty(PropertyKeyConst.AuthenticationRequired, "true");
        // 开启保存消息轨迹
        properties.put(PropertyKeyConst.MsgTraceSwitch, true);
        // AccessKey 身份验证,在移动云管理控制台创建
        properties.put(PropertyKeyConst.AccessKey, "xxx");
        // SecretKey 身份验证,在移动云管理控制台创建
        properties.put(PropertyKeyConst.SecretKey, "xxx");
        // 设置发送超时时间,单位毫秒
        properties.put(PropertyKeyConst.SendMsgTimeoutMillis, 30000);
        // 设置公网接入点
        properties.put(PropertyKeyConst.TuxeAddr, "xxx");
        Producer producer = TuxeFactory.createProducer(properties);
        // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
        producer.start();
        Message msg = new Message(
            // Message 所属的 Topic, 控制台创建
            "xxx",
            // Message Tag,可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列的服务器过滤
            "TagA",
            // Message Body,任何二进制形式的数据,消息队列RocketMQ不做任何干预,需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
            "Hello MQ".getBytes());
        // 设置代表消息的业务关键属性,请尽可能全局唯一。 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
        // 注意:不设置也不会影响消息正常收发
        msg.setKey("KeyA");
        // 异步发送消息, 发送结果通过 callback 返回给客户端。
        producer.sendAsync(msg, new SendCallback() {
            @Override
            public void onSuccess(final SendResult sendResult) {
                // 消费发送成功
                System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
            }
 
            @Override
            public void onException(OnExceptionContext context) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
                System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
            }
        });
    }
}
订阅普通消息
 
 
消息队列RocketMQ支持以下两种订阅方式:
 
集群订阅:同一个Group ID所标识的所有Consumer平均分摊消费消息。 例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例会分摊消费,最后总共消费的消息总数为9条消息。
 
// 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
广播订阅:同一个Group ID所标识的所有Consumer都会各自消费某条消息一次。 例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。
 
// 广播订阅方式设置
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
 
 
示例代码
 
package com.chinamobile.tuxedo.sdk.example;
 
import com.chinamobile.tuxedo.sdk.api.Action;
import com.chinamobile.tuxedo.sdk.api.ConsumeContext;
import com.chinamobile.tuxedo.sdk.api.Consumer;
import com.chinamobile.tuxedo.sdk.api.Message;
import com.chinamobile.tuxedo.sdk.api.MessageListener;
import com.chinamobile.tuxedo.sdk.api.PropertyKeyConst;
import com.chinamobile.tuxedo.sdk.api.TuxeFactory;
import java.util.Properties;
 
public class ConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //  控制台创建的Group
        properties.put(PropertyKeyConst.GROUP_ID, "xxx");
        //  控制台创建的实例
        properties.put(PropertyKeyConst.INSTANCE_ID, "MQ_INST_xxx");
        // 启用鉴权认证
        properties.setProperty(PropertyKeyConst.AuthenticationRequired, "true");
        // 开启保存消息轨迹
        properties.put(PropertyKeyConst.MsgTraceSwitch, true);
        // AccessKey 身份验证,在移动云管理控制台创建
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // SecretKey 身份验证,在移动云管理控制台创建
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        // 设置消费超时时间,单位毫秒
        properties.put(PropertyKeyConst.ConsumeTimeout, 30000);
        // 设置公网接入点
        properties.put(PropertyKeyConst.TuxeAddr, "xxx");
        //  集群订阅方式 (默认)
        //  properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        //  广播订阅方式
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
        Consumer consumer = TuxeFactory.createConsumer(properties);
        /*consumer.subscribe("xxx", "TagA||TagB", new MessageListener() { //订阅多个 Tag
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });*/
        //订阅另外一个 Topic
        consumer.subscribe("xxx", // 控制台创建的topic
            "*", // 订阅全部 Tag
            new MessageListener() {
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });
        consumer.start();
        System.out.println("Consumer Started");
    }
}

喜欢 (0)
阿里云最新优惠活动,点击查看
腾讯云最新优惠活动,点击查看
腾讯云香港及海外免备案服务器优惠活动,点击查看
华为云服务器本周优惠活动,点击查看

文章评论已关闭!