引入消息队列可以方便地实现系统解耦、削峰填谷等作用。但是消息队列使用不当,可能会引起消息丢失,在一些消息敏感的业务场景下,这是不允许的。今天我们使用一些Java示例来聊一聊 RocketMQ 怎么做能确保消息不丢失。
RocketMQ 是阿里巴巴开源的分布式消息中间件,整体架构如下图:
RocketMQ 主要包括 Producer、Consumer 和 Broker,同时 Name Server 进行集群注册管理和保存元数据。
要想保证消息不丢失,需要从以下几个方面考虑:
代码如下:
- public void send() throws Exception {
- String message = "test producer";
- Message sendMessage = new Message("topic1", "tag1", message.getBytes());
- sendMessage.putUserProperty("name1","value1");
- SendResult sendResult = null;
-
-
- DefaultMQProducer producer = new DefaultMQProducer("testGroup");
- producer.setNamesrvAddr("localhost:9876");
- producer.setRetryTimesWhenSendFailed(3);
- try {
- sendResult = producer.send(sendMessage);
- } catch (Exception e) {
- e.printStackTrace();
- }
- if (sendResult != null) {
- System.out.println(sendResult.getSendStatus());
- }
- }
同步发送会返回 4 个状态码:
根据返回的状态码,可以做消息重试,这里设置的重试次数是 3。
消息重试时,消费端一定要做好幂等处理。
代码如下:
- public void sendAsync() throws Exception {
- String message = "test producer";
- Message sendMessage = new Message("topic1", "tag1", message.getBytes());
- sendMessage.putUserProperty("name1","value1");
-
-
- DefaultMQProducer producer = new DefaultMQProducer("testGroup");
- producer.setNamesrvAddr("localhost:9876");
- producer.setRetryTimesWhenSendFailed(3);
- producer.send(sendMessage, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
-
- }
-
-
- @Override
- public void onException(Throwable e) {
- // TODO 可以在这里加入重试逻辑
- }
- });
- }
异步发送,可以重写回调函数,回调函数捕获到 Exception 时表示发送失败,这时可以进行重试,这里设置的重试次数是 3。
1)异步刷盘
默认。消息写入 CommitLog 时,并不会直接写入磁盘,而是先写入 PageCache 缓存后返回成功,然后用后台线程异步把消息刷入磁盘。异步刷盘提高了消息吞吐量,但是可能会有消息丢失的情况,比如断点导致机器停机,PageCache 中没来得及刷盘的消息就会丢失。
2)同步刷盘
消息写入内存后,立刻请求刷盘线程进行刷盘,如果消息未在约定的时间内(默认 5 s)刷盘成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到这个响应后,可以进行重试。同步刷盘策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。要开启同步刷盘,需要增加下面配置:
flushDiskType=SYNC_FLUSH
Broker 为了保证高可用,采用一主多从的方式部署。如下图:
消息发送到 master 节点后,slave 节点会从 master 拉取消息保持跟 master 的一致。这个过程默认是异步的,即 master 收到消息后,不等 slave 节点复制消息就直接给 Producer 返回成功。
这样会有一个问题,如果 slave 节点还没有完成消息复制,这时 master 宕机了,进行主备切换后就会有消息丢失。为了避免这个问题,可以采用 slave 节点同步复制消息,即等 slave 节点复制消息成功后再给 Producer 返回发送成功。只需要增加下面的配置:
brokerRole=SYNC_MASTER
改为同步复制后,消息复制流程如下:
Consumer 消费消息的代码如下:
- public void consume() throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
- consumer.setNamesrvAddr("localhost:9876");
- consumer.setMessageModel(MessageModel.CLUSTERING);
- consumer.subscribe("topic1", "tag1");
- consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
- try{
- System.out.printf("Receive New Messages: %s", msgs);
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }catch (Exception e){
- e.printStackTrace();
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- });
- consumer.start();
- }
如果 Consumer 消费成功,返回 CONSUME_SUCCESS,提交 offset 并从 Broker 拉取下一批消息。
Consumer 消费失败,这里有 3 种情况:
Broker 收到这个响应后,会把这条消息放入重试队列,重新发送给 Consumer。
注意:
其实重试 3 次都失败就可以说明代码有问题,这时 Consumer 可以把消息存入本地,给 Broker 返回CONSUME_SUCCESS 来结束重试。代码如下:
- int count = ((MessageExt) msgs).getReconsumeTimes();
-
- if (count > 2) {
-
- //TODO 把消息写入本地存储
-
- System.out.println("重试次数超过3次");
-
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-
- }
RocketMQ支持事务消息,整体流程如下图:
代码如下:
- public class ProducerTransactionListenerImpl implements TransactionListener {
-
-
- @Override
- public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
- /**
- * 这里执行本地事务,执行成功返回LocalTransactionState.COMMIT_MESSAGE,执行失败返回
- * LocalTransactionState.ROLLBACK_MESSAGE,如果返回LocalTransactionState.UNKNOW,
- * Broker会回来查询,所以需要记录事务执行状态
- */
- return LocalTransactionState.COMMIT_MESSAGE;
- }
-
-
- @Override
- public LocalTransactionState checkLocalTransaction(MessageExt msg) {
- /**
- * 这里查询事务执行状态,根据事务状态返回LocalTransactionState.COMMIT_MESSAGE或
- * LocalTransactionState.ROLLBACK_MESSAGE,如果没有查询到返回LocalTransactionState.UNKNOW,
- * Broker会再次查询,可以记录查询次数,超过次数后返回ROLLBACK_MESSAGE
- */
- return LocalTransactionState.UNKNOW;
- }
- }
我们知道,RocketMQ 核心的数据文件有 3 个:CommitLog、ConsumeQueue 和 Index。其中Index 文件就是一个索引文件,结构如下图:
查找消息时,首先根据消息 key 的 hashcode 计算出 Hash 槽的位置,然后读取 Hash 槽的值计算 Index 条目的位置,从Index 条目位置读取到消息在 CommitLog 文件中的 offset,从而查找到消息。
在 Producer 发送消息时,可以指定一个 key,代码如下:
- Message sendMessage = new Message("topic1", "tag1", message.getBytes());
-
- sendMessage.setKeys("weiyiid");
这样可以通过 RocketMQ 提供的命令或者管理控制台来查询消息是否发送成功。
如果对消息丢失零容忍,我们必须要考虑极端情况,比如整个 RocketMQ 集群挂了,这时 Producer 端发送消息一定会失败,可以考虑在 Producer 端做降级,把要发送的消息保存到本地数据库或磁盘,等 RocketMQ 恢复以后再把本地消息推送出去。
在一些特殊的业务场景,比如支付、银行核算等,需要确保消息不丢失,但是同时也要看到,消息不丢失的方案会大大降低 RocketMQ 的吞吐量,需要综合考虑。