阿里云消息队列使用问题汇总

使用多可用区架构,为云主机提供了增强的可用性和持久性
正常情况下一个业务系统尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由业务系统自由设置。只有发送消息设置了 tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。
    FCMessage msg = new FCMessage (); // 初始化消息对象
    msg.setTags("TagA"); // 设置消息TAG
   
每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过 topic,key 来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
    FCMessage msg = new FCMessage (); // 初始化消息对象
    msg.setTags("TagA"); // 设置消息TAG
    String orderId = "20034568923546"; // 订单Id
    msg.setKeys(orderId);
   
消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。
 
end消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在 sendResult 里定义。
对于消息不可丢失应用,务必要有消息重发机制。例如如果消息发送失败,存储到数据库,能有定时程序尝试重发,或者人工触发重发。
 
 
如何判断成功发送消息
 
客户端 Producer 调用 send 消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在 sendResult 里定义。
 
返回状态       状态释义
SEND_OK      消息发送成功
FLUSH_DISK_TIMEOUT 消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时MASTER服务器宕机,消息才会丢失
FLUSH_SLAVE_TIMEOUT     消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时 SLAVE 服务器宕机,消息才会丢失
SLAVE_NOT_AVAILABLE     消息发送成功,但是此时 SLAVE 不可用,消息已经进入服务器队列,只有此时 SLAVE 服务器宕机,消息才会丢失
目前PAMQ采用两主两从共 4 台 Broker 机器,针对大部分业务系统来讲,只要 PAMQ 没有抛出异常,可以默认消息已成功发送。建议业务系统针对发送消息后所有非 SEND_OK 状态的消息,打印 Warning 日志,并在运营端设置对应的监控规则,及时发邮件提醒。
 
如何判断成功消费消息
客户端 Consumer 在 FCMessageListener 中实现 pushMessage(),遍历并处理消息后会返回给 PAMQ 端消费的状态,状态只有消费成功或者消费失败两种状态。
 
消费状态       状态释义
CONSUME_OK      消费成功
CONSUME_FAIL    消费失败
 
 
消费端如何实现定时消费?
 
在某些业务场景下,消费端希望在业务低峰 (例如半夜 12 点后) 时开始从 PAMQ 拉取消息,在业务高峰期前关闭掉消费功能,以此来降低系统负载。这种类似场景涉及到如何在不停业务服务的场景下,多次的开启和关闭 PAMQ 消费服务。
 
PAMQ 的消费者本身是可以多实例初始化的,每个实例的消费者服务开启和关闭也是独立的,所以可以很良好的支持定时消费的场景。
 
如果业务系统有类似的需求,我们建议:
 
业务系统本身需要添加功能开关,支持配置化的方式来开启或关闭消费服务。其实现本身比较简单,就是调用 PAMQ 消费者的 start || shutdown 方法。必要时需要对方法添加上层逻辑封装,来实现定制化的需求。
 
调用完 consumer 对象的 shutdown 方法后,不要立即初始化下一个 consumer 对象并启用服务,建议至少延迟几秒种,等相关的资源回收完毕。
 
完善业务端开启或关闭消息服务的日志,方便后续运维处理问题。
 
 
 
生产端如何实现定时发送消息?
 
生产者对应在客户端是单例实现的,相关资源只能初始化一次,多次初始化会报错,即便是在调用了 Producer 的 shutdown() 方法之后。发送消息的定时控制,相对于 consumer 来说实现比较简单,在需要的时候直接调用 Producer 的 send() 方法发送消息即可。
 
 
 
消费端没有取到消息,如何定位问题?
 
请不正常情况下生产者发送消息到 PAMQ,消息被投递到消费端的延时应该在毫秒级。如果消费端迟迟没有收到消息,建议采用下面的步骤来排查问题:
 
获得消息的消息 ID 或者 KEY,去阿里云消息队列的消息查询模块,根据自己的消费者 ID 找到其对应的消费状态,常见见的投递状态有:
 
SUBSCRIBED_AND_CONSUMED 订阅了,而且消费了(Offset越过了)
SUBSCRIBED_BUT_FILTERD  订阅了,但是被过滤掉了
the consumer group[***] not online   订阅了,但是消费者未启动
SUBSCRIBED_AND_NOT_CONSUME_YET  订阅了,但是没有消费(Offset小)
UNKNOW_EXCEPTION 未知异常
注: ``SUBSCRIBED_AND_CONSUMED` 状态,表示消息已被正常消费掉,如果此时有异常,需要业务系统检查日志,分析看看是否因为解析消息时有异常,导致消息未被正确处理。
 
SUBSCRIBED_BUT_FILTERD状态,需要业务系统检查初始化 Consumer 对象时传入的 TagList 是否和生产者定义的 tag 匹配。
 
SUBSCRIBED_AND_NOT_CONSUME_YET 状态,可能的原因是由于消息有积压,消息还未被取走,可以稍等几十秒再去查询一下状态。
 
the consumer group[***] not online 状态,表示对应 CID 的消费者还未正确启动。业务系统需要检查消费者是否已启动,如果已启动请检查是否启动时有报错。有可能相关的配置项配置错误,导致 consumer 启动时校验失败。
 
UNKNOW_EXCEPTION 表示消息平台有异常,请联系阿里云消息队列开发。
 

如何避免接受的消息是乱码?
 
对于生产者来说,建议将消息 body 转为 byte 数组时显示指定为 UTF-8 编码。对于消费者来说,建议在接收到消息后将 byte 数组转为 String 时指定 UTF-8 编码。这样可以避免因为消息 body 中有中文或者特殊字符,消费端解析时乱码,进而造成消息解析失败。

消息标签(TAG)怎么使用?
 
在消息中间件实际的使用场景中,消费者只需要消息队列中的部分消息,其余消息希望默认不被接收,直接丢弃掉。针对类似这种场景,PAMQ 提供通过合理使用消息标签 (Tag) 的方式来实现消费端灵活过滤队列中消息的功能,实现方式如下:
 
生产者和消费者双方约定消息标签具体的设置值及其代表的含义。
 
生产者在发送消息时,组装消息对象的时候,需要给对应消息设置正确的消息标签。
 
    FCMessage msg = new FCMessage();
    //设置过滤标签---大小写敏感
    msg.setTag("SystemTag");
   
消费者在组装消费者对象时,需要正确设置消息过滤的过滤器
    FCMessageFilter  FCFilter = new FCMessageFilter ();
    List<String> list = new ArrayList<String>();
    FCFilter.setTags(list);
    // 需要注意的是同一CID下的多个应用实例需要设置为同样的tag列表来过滤消息,以保证消息不会被过滤取走但未被业务系统处理list.add("SystemTag");