本文共 1904 字,大约阅读时间需要 6 分钟。
一、RabbitMQ如何保证消息不丢失?
这是面试时最喜欢问的问题,其实这个问题对所有MQ产品都有共性,解决方案也基本一致,但针对不同的产品会有不同的细节差异。基于这点,我们可以从RabbitMQ的角度来分析。
哪些环节会有丢消息的可能?
我们从通用的MQ场景来分析。下面是可能存在丢失消息的环节:
网络传输环节:消息在跨网络传输过程中可能丢失。
消息在磁盘存储过程中:消息首先写入操作系统的缓存页cache,再由操作系统异步写入硬盘,这中间可能存在丢失的风险。
生产者发送消息环节:如果生产者没有确认消息成功发送,消息可能丢失。
RabbitMQ的消息零丢失方案
有两种主要方式:
生产者的事务机制:通过channel.txSelect()开启事务,channel.txCommit()提交事务,channel.txRollback()回滚事务。这种方式会对channel产生阻塞,吞吐量会下降。
生产者确认机制:这种机制类似于RocketMQ的事务消息机制。RabbitMQ本身就支持生产者确认机制,具体可以参考前面的课程内容。
- RabbitMQ消息存盘不丢消息
- RabbitMQ主从消息同步时不丢消息
- RabbitMQ消费者不丢失消息
自动应答模式:消费者在完成业务处理后会自动发送应答。如果消费者的业务逻辑抛出异常,RabbitMQ会自动重试,这样不会丢失消息,但可能会导致消息重复消费。
手动应答模式:可以提高消息消费的可靠性。在代码中可以通过channel.basicAck(deliveryTag, false)手动确认消息接收。在SpringBoot集成案例中,可以通过配置文件中的spring.rabbitmq.listener.simple.acknowledge-mode属性来指定应答模式。NONE模式是不启动应答机制,这样效率更高,但也会有丢失消息的可能。
- RabbitMQ的自动重试功能
- 制定重试策略
- 业务上处理幂等性
在RabbitMQ中,解决这个问题相对简单。只需要将队列声明为持久化队列,或者使用新增的Quorum类型的队列即可。
这涉及到RabbitMQ的集群架构。普通集群模式下,消息是分散存储的,不会主动进行消息同步,这样会有丢失消息的可能。镜像集群模式下,数据会主动在集群各个节点之间同步,这样丢失消息的概率会降低。更进一步,使用Quorum类型的队列并结合Raft协议,可以更好地保证消息的主从同步。
RabbitMQ在消费消息时,可以指定是自动应答模式还是手动应答模式。
二、如何保证消息幂等?
当消费者处理业务逻辑时,如果抛出异常,默认情况下RabbitMQ会无限重试。这可以起到一定程度的幂等保证。
在SpringBoot集成RabbitMQ时,可以通过配置文件中的spring.rabbitmq.listener.simple.retry开头的一系列属性来制定重试策略。
为了确保幂等,可以为每个消息指定一个全局唯一的MessageID。在消费者端对MessageID进行幂等性判断。关键代码如下:
// 发送者指定ID字段Message message = MessageBuilder.withBody(message.getBytes()).setMessageId(UUID.randomUUID().toString()).build();rabbitTemplate.send(message);// 消费者获取MessageID,做幂等性判断@RabbitListener(queues = "fanout_email_queue")public void process(Message message) throws Exception { String messageId = message.getMessageProperties().getMessageId(); // 具体业务逻辑判断} 当然,实际工作中,最好将MessageID替换为有业务意义的字段,这样能更好地防止重复消费对业务数据的影响。
三、如何保证消息的顺序?
关于消息顺序问题,RocketMQ有更完善的解决方案。RabbitMQ本身并没有提供顺序消息的功能,因此我们需要模拟RocketMQ的顺序消息机制来设计消息消费方式。
在发送者端,可以保证一组有序的消息依次发送到同一个队列。消费者消费时,可以通过单独的消费者实例来消费这个队列上的消息。这种方式虽然能保证消息顺序,但会严重影响MQ的性能和吞吐量,因此需要谨慎使用。
发表评论
最新留言
关于作者