1、kafka是什么,应用场景
Kafka 是一个分布式流式处理平台
三个关键功能:
消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。
容错的持久方式存储记录消息流:Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险。
流式处理平台: 在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。
两大应用场景
消息队列:建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。
数据处理: 构建实时的流数据处理程序来转换或处理数据流。
2、与其他消息队列的优势
极致的性能:基于 Scala 和 Java 语言开发,设计中大量使用了批量处理和异步的思想,最高可以每秒处理千万级别的消息。
生态系统兼容性无可匹敌:Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域。
3、队列模型和消息模型
队列模型就是数据结构中的线性队列
kafka的消息模型是发布-订阅者模式,一条消息可以被多个消费者消费
4、核心概念
producer、consumer、topic、partition分区、broker代理,相当于集群中的一个节点
5、多副本的好处
一个partition可以有多个副本,且每个副本可以分布在不同的broker上,多副本中必须有一个leader,其他的都叫follower,消息会发送到leader,其他follower会同步leader的数据,生产者和消费者只会连接leader,follower只是为了提高存储安全性
好处:
分布在不同的broker上,能提供比较好的并发能力,负载均衡
极大地提高了消息存储的安全性, 提高了容灾能力
注意:kafka的多副本机制与其他的分布式框架不一样,虽然都是主从模式,但kafka的从节点是不对外提供服务的,只是备份数据,然后在主节点挂了之后,可以从从节点中重新选出一个主节点。
6、zookeeper和kafka
zk在kafka中的作用
(1)注册broker,在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点,记录每个broker的ip端口等
(2)注册topic,同一个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护
(3)负载均衡,partition分布在不同的broker上,提升负载均衡能力
zk还是很笨重的,在2.8版本之后引入了raft和kraft协议替代zk
7、如何保证消息的顺序消费、消息丢失和重复消费
(1)保证消息有序则可以顺序消费,kafka只能保证partition内部消息是有序的
1 个 Topic 只对应一个 Partition即可以解决问题,但是违背了kafka的设计初衷,所以可以通过指定消息的partition来实现
(2)避免消息丢失
生产者确保消息发过去了
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));
还可以设置重试次数
消费者消费分两步:拉取分区消息,提交offset,提交后消费者挂了,那其实消息没有被真正消费,简单粗暴就是消费完后手动提交offset。
但也会有新问题:消费者在消费完后未提交offset就挂了,那消息下次还会被消费
kafka本身消息丢失:
分区的leader副本挂了,重新选举follower之前,消息都丢失了
解决办法:
设置 acks = all,这是生产者的参数,默认为 1,代表消息被 leader 副本接收之后就算被成功发送。当配置 acks = all 表示只有所有 ISR 列表的副本全部收到消息时,生产者才会接收到来自服务器的响应
设置 replication.factor >= 3,保证有三个以上follower副本存在
设置 min.insync.replicas > 1,保证消息被写入两个以上副本才算成功
设置 unclean.leader.election.enable = false,保证leader挂了,重新选leader的时候,不会从那些同步程度达不到要求的follower的副本中选举
(3)保证消息不重复消费
原因:
服务端侧已经消费的数据没有成功提交 offset(根本原因)。
Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。
解决方案:
消费消息服务做幂等校验
将 enable.auto.commit 参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。
8、Kafka 重试机制
(1)消费失败
固定时间间隔后重试,达到最大重试次数后,跳过该消息
默认10次,时间间隔为0
自定义重试次数以及时间间隔,只需要在 DefaultErrorHandler 初始化的时候传入自定义的 FixedBackOff 即可。重新实现一个 KafkaListenerContainerFactory ,调用 setCommonErrorHandler 设置新的自定义的错误处理器就可以实现。
@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
// 自定义重试时间间隔以及次数
FixedBackOff fixedBackOff = new FixedBackOff(1000, 5);
factory.setCommonErrorHandler(new DefaultErrorHandler(fixedBackOff));
factory.setConsumerFactory(consumerFactory);
return factory;
}
(2)重试失败后告警:
自定义DelErrorHandler
public class DelErrorHandler extends DefaultErrorHandler {
public DelErrorHandler(FixedBackOff backOff) {
super(null,backOff);
}
@Override
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
super.handleRemaining(thrownException, records, consumer, container);
log.info("重试多次失败");
// 自定义操作
}
}
(3)重试失败后的数据如何再次处理
失败后,消息放入死信队列,然后既可以用 @DltHandler 处理,也可以使用 @KafkaListener 重新消费。
// 重试 5 次,重试间隔 100 毫秒,最大间隔 1 秒
@RetryableTopic(
attempts = "5",
backoff = @Backoff(delay = 100, maxDelay = 1000)
)
@KafkaListener(topics = {KafkaConst.TEST_TOPIC}, groupId = "apple")
private void customer(String message) {
log.info("kafka customer:{}", message);
Integer n = Integer.parseInt(message);
if (n % 5 == 0) {
throw new RuntimeException();
}
System.out.println(n);
}
9、使用
生产者:
Producer<String, String> producer = kafkaCommonClientService.getProducerString();
producer.send(new ProducerRecord<String, String>(topicName, key, jsonValue))
消费者就用@KafkaListener注解标记一个方法就行了
10、kafka为什么有高吞吐量
(1)分布式架构:Kafka 是一个分布式的消息传递系统,消息被分布在多个 Broker 节点上的多个分区中。每个分区可以并行地进行读写操作,实现了消息的并行处理,从而提高了系统的吞吐量。
(2)零拷贝机制:Kafka 使用了零拷贝机制来提高数据的传输效率。当消息从生产者传输到 Kafka 时,避免了数据的多次拷贝操作,减少了不必要的数据复制开销,提高了数据传输的效率。
(3)批量处理:Kafka 支持批量处理消息。生产者可以将多条消息打包成一个批次进行发送,消费者可以一次性获取多条消息进行处理。通过批量处理,可以减少网络传输的次数和消费者的处理开销,提高了吞吐量。
(4)高效的磁盘存储:Kafka 使用了顺序写和零拷贝技术来优化磁盘的写入操作。消息在写入磁盘时,会以顺序的方式进行写入,减少了磁盘的随机写操作,提高了写入性能。同时,采用零拷贝机制避免了不必要的数据复制操作,进一步提高了写入效率。
(5)基于文件的存储结构:Kafka 的消息存储采用了基于文件的存储结构,每个分区的消息都以文件的形式进行存储。这种存储结构使得消息的读取和写入操作都可以通过文件的定位和偏移量来进行,提高了数据的读写效率。
(6)数据压缩:Kafka 支持对消息进行压缩,减小消息在网络传输和磁盘存储时的数据量。压缩后的消息可以减少网络带宽的使用和磁盘存储的空间占用,提高了系统的吞吐量。
11、生产者和消费者的策略
(1)生产策略
ProducerRecord是发送给broker的键值对,封装了基础数据信息,简称PR,数据结构大致为:
Topic(名字)
PartitionID(可选)
Key(可选)
Value
如果指定了Partition ID的话,那么PR就会被发送到指定的Partition里。
如果没有指定Partition ID,但是指定了Key,那么PR就会按照hash(key)发送到相对应的Partition里
如果没有指定Partition ID,也没有指定Key,PR就会使用默认的round-robin轮训发送到每一个Partition里(消费者消费partition分区默认是range模式)
如果同时指定了Partition ID与Key的话,PR只会发送到指定的Partition(这时候的Key不起作用,代码逻辑决定)
(2)消费策略
消费者是采用Pull拉取方式从broker的partition获取数据,那为什么是pull模式而不是push呢?pull模式可以根据消费者的消费能力来进行自己调整,不同的消费者性能不一样。如果broker没有数据的话,消费者可以配置timeout的时间,进行阻塞等待一段时间后再返回。但如果是broker主动Push,push的优点是可以快速的处理消息,但是容易对消费者处理不过来,造成消息的堆积和延迟。
多个消费者组成一个消费者组,一个topic可以对应多个消费者,但到底消费者会消费哪个partition的消息呢?
a.轮询
给消费者按字典顺序排序,一个一个轮询的分配到每个消费者
10个partition对应3个消费者,那么这3个消费者的消费策略就是
Consumer1:1 4 7 10
Consumer2:2 5 8
Consumer3:3 6 9
由于最后多了一个partition,所以按顺序应该分配给Consumer1
b.range范围
跟轮询类似,只不过是提前计算出每个Consumer平均可以分配到多少个partition,然后一个区间内都给一个Consumer
如上述例子,每个Consumer应该平均分配到三个,但会有一个多余,于是给前余数个Consumer每个再多分一个,那1-4给第一个Consumer,5-7给第二个Consumer,8-10给第三个Consumer
轮询和range都是以平均分配为原则的,但同一个消费者组中的多个消费者并不是都订阅一样的topic,每个topic也不会有同样的partition数,所以在某些场景下,其实是分配不均匀的。
例如:
假设消费组内有3个消费者(C0、C1 和 C2),它们共订阅了3个主题(t0、t1、t2),这3个主题分别有1、2、3个分区,即整个消费组订阅了 t0p0、t1p0、t1p1、t2p0、t2p1、t2p2 这6个分区。具体而言,消费者 C0 订阅的是主题 t0,消费者 C1 订阅的是主题 t0 和 t1,消费者 C2 订阅的是主题 t0、t1 和 t2,那么按照轮询策略最终的分配结果为
C0:t0p0
C1:t1p0
C2:t1p1、t2p0、t2p1、t2p2
这是不均匀的。于是有了第三种消费策略:sticky(黏性)
c.sticky
两个目的:
目的1:分区的分配要尽可能均匀。
目的2:分区的分配尽可能与上次分配的保持相同。
目的1优先于目的2
上面的例子使用sticky策略后的分配结果是:
C0:t0p0
C1:t1p0、t1p1
C2:t2p0、t2p1、t2p2
首先要保证消费者订阅了对应的topic才能分配,然后在同一个消费组中尽量平均。
一旦某个消费者挂了,需要把partition分配到其他的Consumer上,那也不会重新分配,因为有目的2做支撑,只会分配需要重分的partition给剩下的Consumer,并且尽量保证平均
d.自定义策略
比如因为网络等其他原因,指定某个partition就要被分配给某个Consumer,那就自己实现分配策略
实现org.apache.kafka.clients.consumer.internals.PartitionAssignor接口即可
版权归原作者 weston666 所有, 如有侵权,请联系我们删除。