丢失消息有 3 种不同的情况,针对每一种情况有不同的解决方案。
- 生产者丢失消息的情况
- 消费者丢失消息的情况
- Kafka 弄丢了消息
生产者丢失消息的情况
生产者(
Producer
) 调用
send
方法发送消息之后,消息可能因为网络问题并没有发送过去。所以,我们不能默认在调用
send()
方法发送消息之后消息消息发送成功了。
为了确定消息是发送成功,我们要判断消息发送的结果。
但是,要注意的是
Producer
使用
send()
方法发送消息实际上是异步的操作,我们可以通过
get()
方法获取调用结果,但是这样也让它变为了同步操作,示例代码如下:
SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
logger.info("生产者成功发送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe
sult.getProducerRecord().value().toString());
}
但是一般不推荐这么做!可以采用为其添加回调函数的形式,示例代码如下:
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));
如果消息发送失败的话,我们检查失败的原因之后重新发送即可!
另外,这里推荐为
Producer
的
retries
(重试次数)设置一个比较合理的值,一般是
3
,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次你 3 次一下子就重试完了
消费者丢失消息的情况
我们知道消息在被追加到
Partition
(分区)的时候都会分配一个特定的偏移量(
offset
)。
offset
表示
Consumer
当前消费到的
Partition
(分区)的所在的位置。Kafka 通过偏移量(
offset
)可以保证消息在分区内的顺序性。
当消费者拉取到了分区的某个消息之后,消费者会自动提交了
offset
。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是
offset
却被自动提交了。
这种情况的解决办法也比较粗暴,我们手动关闭自动提交
offset
,每次在真正消费完消息之后之后再自己手动提交
offset
。但是,细心的朋友一定会发现,这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交
offset
,结果自己挂掉了,那么这个消息理论上就会被消费两次。
Kafka 弄丢了消息
我们知道
Kafka
为
Partition
引入了多副本(
Replica
)机制。
Partition
中的多个副本之间会有一个叫做
Leader
的家伙,其他副本称为
Follower
。我们发送的消息会被发送到
Leader
副本,然后
Follower
副本才能从
Leader
副本中拉取消息进行同步。生产者和消费者只与
Leader
副本交互。你可以理解为其他副本只是
Leader
副本的拷贝,它们的存在只是为了保证消息存储的安全性。
试想一种情况:假如
Leader
副本所在的
Broker
突然挂掉,那么就要从
Fllower
副本重新选出一个
Leader
,但是
Leader
的数据还有一些没有被
Follower
副本的同步的话,就会造成消息丢失。
设置 acks = all
解决办法就是我们设置 acks = all。acks 是 Kafka 生产者(Producer) 很重要的一个参数。
acks 的默认值即为1,代表我们的消息被leader副本接收之后就算被成功发送。当我们配置 acks = all 表示只有所有 ISR 列表的副本全部收到消息时,生产者才会接收到来自服务器的响应. 这种模式是最高级别的,也是最安全的,可以确保不止一个 Broker 接收到了消息. 该模式的延迟会很高.
设置 replication.factor >= 3
为了保证
Leader
副本能有
Follower
副本能同步消息,我们一般会为
Topic
设置
**replication.factor >= 3**
。这样就可以保证每个
Partition
至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。
设置 min.insync.replicas > 1
一般情况下我们还需要设置
**min.insync.replicas> 1**
,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。
**min.insync.replicas**
的默认值为 1 ,在实际生产中应尽量避免默认值 1。
但是,为了保证整个 Kafka 服务的高可用性,你需要确保
**replication.factor > min.insync.replicas**
。为什么呢?设想一下假如两者相等的话,只要是有一个副本挂掉,整个分区就无法正常工作了。这明显违反高可用性!一般推荐设置成
**replication.factor = min.insync.replicas + 1**
。
设置 unclean.leader.election.enable = false
Kafka 0.11.0.0 版本开始 unclean.leader.election.enable 参数的默认值由原来的 true 改为 false
我们最开始也说了我们发送的消息会被发送到
Leader
副本,然后
Follower
副本才能从
Leader
副本中拉取消息进行同步。多个
Follower
副本之间的消息同步情况不一样,当我们配置了
**unclean.leader.election.enable = false**
的话,当
Leader
副本发生故障时就不会从
Follower
副本中和
Leader
同步程度达不到要求的副本中选择出
Leader
,这样降低了消息丢失的可能性。
版权归原作者 豆奶快攻 所有, 如有侵权,请联系我们删除。