mq: rabbitmq, rocketmq, kafka
文章目录
1.RocketMQ
RocketMQ是如何最大限度的保证消息不丢失
- 生产阶段:消息在 Producer 发送端创建出来,经过网络传输发送到 Broker 存储端。
- 存储阶段:消息在 Broker 端存储,如果是主备或者多副本,消息会在这个阶段被复制到其他的节点或者副本上。
- 消费阶段:Consumer 消费端从 Broker存储端拉取消息,经过网络传输发送到 Consumer 消费端上,并通过重试来最大限度的保证消息的消费。
Producer:
- 默认情况下,可以通过同步的方式阻塞式的发送,check SendStatus,状态是OK,表示消息一定成功的投递到了Broker,状态超时或者失败,则会触发默认的2次重试。此方法的发送结果,可能Broker存储成功了,也可能没成功
- 采取事务消息的投递方式,并不能保证消息100%投递成功到了Broker,但是如果消息发送Ack失败的话,此消息会存储在CommitLog当中,但是对ConsumerQueue是不可见的。可以在日志中查看到这条异常的消息,严格意义上来讲,也并没有完全丢失
- RocketMQ支持 日志的索引,如果一条消息发送之后超时,也可以通过查询日志的API,来check是否在Broker存储成功
Broker
- 消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的
- Broker自身支持同步刷盘、异步刷盘的策略,可以保证接收到的消息一定存储在本地的内存中
- Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失
cunmser
- Consumer自身维护一个持久化的offset(对应MessageQueue里面的min offset),标记已经成功消费或者已经成功发回到broker的消息下标
- 如果Consumer消费失败,那么它会把这个消息发回给Broker,发回成功后,再更新自己的offset
- 如果Consumer消费失败,发回给broker时,broker挂掉了,那么Consumer会定时重试这个操作
- 如果Consumer和broker一起挂了,消息也不会丢失,因为consumer 里面的offset是定时持久化的,重启之后,继续拉取offset之前的
2.Kafka
- Producer:生产者,负责创建消息,然后投递到 Kafka 集群中,投递时需要指定消息所属的 Topic,同时确定好发往哪个 Partition。
- Consumer:消费者,会根据它所订阅的 Topic 以及所属的消费组,决定从哪些 Partition 中拉取消息。
- Broker:消息服务器,可水平扩展,负责分区管理、消息的持久化、故障自动转移等。
- Zookeeper:负责集群的元数据管理等功能,比如集群中有哪些 broker 节点以及 Topic,每个 Topic 又有哪些 Partition 等。
2.1 消息传递语义剖析
- at least once: Producer 向 Broker 发送数据后,会进行 commit,如果 commit 成功,由于 Replica 副本机制的存在,则意味着消息不会丢失,但是 Producer 发送数据给 Broker 后,遇到网络问题而造成通信中断,那么 Producer 就无法准确判断该消息是否已经被提交(commit),这就可能造成 at least once 语义。
- at most once: 从 Consumer 角度来剖析, 我们知道 Offset 是由 Consumer 自己来维护的, 如果 Consumer 收到消息后更新 Offset, 这时 Consumer 异常 crash 掉, 那么新的 Consumer 接管后再次重启消费,就会造成 at most once 语义(消息会丢,但不重复)。
- 如果 Consumer 消费消息完成后, 再更新 Offset, 如果这时 Consumer crash 掉,那么新的 Consumer 接管后重新用这个 Offset 拉取消息, 这时就会造成 at least once 语义(消息不丢,但被多次重复处理)。
- exactly once: 0.11.0.0 版本之后, Producer 支持幂等传递选项,保证重新发送不会导致消息在日志出现重复。Broker 为 Producer 分配了一个ID,并通过每条消息的序列号进行去重。
默认 Kafka 提供 「at least once」语义的消息传递,允许用户通过在处理消息之前保存 Offset 的方式提供 「at most once」 语义。如果我们可以自己实现消费幂等,理想情况下这个系统的消息传递就是严格的「exactly once」, 也就是保证不丢失、且只会被精确的处理一次,但是这样是很难做到的。
2.2 Producer 端丢失场景剖析
导致 Producer 端消息没有发送成功有以下原因:
- 网络原因:由于网络抖动导致数据根本就没发送到 Broker 端。
- 数据原因:消息体太大超出 Broker 承受范围而导致 Broker 拒收消息。
解决问题:
1.request.required.acks:
- acks = 0:由于发送后就自认为发送成功,这时如果发生网络抖动, Producer 端并不会校验 ACK 自然也就丢了,且无法重试。
- acks = 1:消息发送 Leader Parition 接收成功就表示发送成功,这时只要 Leader Partition 不 Crash 掉,就可以保证 Leader Partition 不丢数据,但是如果 Leader Partition 异常 Crash 掉了, Follower Partition 还未同步完数据且没有 ACK,这时就会丢数据。
- acks = -1 或者 all: 消息发送需要等待 ISR 中 Leader Partition 和 所有的 Follower Partition 都确认收到消息才算发送成功,
可靠性最高
, 但也不能保证不丢数据,比如当 ISR 中只剩下 Leader Partition 了, 这样就变成 acks = 1 的情况了。生成者消息确认机制
2.弃用调用发后即焚的方式,使用带回调通知函数的方法进行发送消息: Producer.send(msg, callback)
生产者消息同步投递
3.重试次数 retries: Producer 端发送消息的重试次数, 设置为大于0的数
4.重试时间 retry.backoff.ms: 推荐设置为300ms。
2.3 Broker 端丢失场景剖析
Kafka Broker 集群接收到数据后会将数据进行持久化存储到磁盘: 同步刷盘和异步刷盘
kafka 通过「多 Partition (分区)多 Replica(副本)机制」已经可以
最大限度
的保证数据不丢失,如果数据已经写入 PageCache 中但是还没来得及刷写到磁盘,此时如果所在 Broker 突然宕机挂掉或者停电,极端情况还是会造成数据丢失。
多分区多副本:设置参数:
unclean.leader.election.enable:false
replication.factor >=3
min.insync.replicas > 1
replication.factor = min.insync.replicas +1
2.4 Consumer 端丢失场景剖析
拉取数据、业务逻辑处理、提交消费 Offset 位移信息。
enable.auto.commit = false, 采用手动提交位移的方式。设置为手动提交不断去尝试
对于消费消息重复的情况,业务自己保证幂等性, 保证只成功消费一次即可。
3.如何保证RabbitMQ全链路数据100%不丢失
3.1 生产端可靠性投递
- 事务消息机制:事务消息机制由于会严重降低性能,使用confirm消息确认机制
- confirm消息确认机制: 生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道我已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。
channel.confirmSelect();// 开启发送方确认模式
然后异步监听确认和未确认的消息:
channel.addConfirmListener(newConfirmListener(){//消息正确到达broker@OverridepublicvoidhandleAck(long deliveryTag,boolean multiple)throwsIOException{System.out.println("已收到消息");//做一些其他处理}//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息@OverridepublicvoidhandleNack(long deliveryTag,boolean multiple)throwsIOException{System.out.println("未确认消息,标识:"+ deliveryTag);//做一些其他处理,比如消息重发等}});
这样就可以让生产端感知到消息是否投递到RabbitMQ中了
- 消息持久化
message消息到达RabbitMQ后先是到exchange交换机中,然后路由给queue队列,最后发送给消费端。
所有需要给exchange、queue和message都进行持久化:
//第三个参数true表示这个exchange持久化
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);//第二个参数true表示这个queue持久化
channel.queueDeclare(QUEUE_NAME,true,false,false,null);//第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
如果RabbitMQ收到消息后挂了,重启后会自行恢复消息。
- 消息入库
消息保存到数据库中
status=0: 表示生产端将消息发送给了RabbitMQ但还没收到确认。
status=1: 表示RabbitMQ已收到消息。
生产端这边开一个定时器,定时检索消息表,将status=0并且超过固定时间后还没收到确认的消息取出重发(第二种情况下这里会造成消息重复,消费者端要做幂等性)。
3.2 消费端消息不丢失
- 在RabbitMQ将消息发出后,消费端还没接收到消息之前,发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失;
- 在RabbitMQ将消息发出后,消费端还没接收到消息之前,消费端挂了,此时消息会丢失;
- 消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。
自动ack机制改为手动ack机制。
DeliverCallback deliverCallback =(consumerTag, delivery)->{try{//接收到消息,做处理//手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}catch(Exception e){//出错处理,这里可以让消息重回队列重新发送或直接丢弃消息}};//第二个参数autoAck设为false表示关闭自动确认机制,需手动确认
channel.basicConsume(QUEUE_NAME,false, deliverCallback, consumerTag ->{});
autoAck参数置为false, 那么RabbitMQ服务端的队列分为两部分: 1.等待投递给消费端的消息 2.已经投递给消费端。如果RabbitMQ一直没有收到消费端的确认信号, RabbitMQ会安排该消息重新进入队列(放在队列头部)等待投递给下一个消费者。
版权归原作者 959y 所有, 如有侵权,请联系我们删除。