0


【Kafka】Kafka架构设计之组件详解

Meta(元信息)

broker :server

topic:queue组,partition:queue,默认1:1,可以1:多,每个partition对应一个磁盘中的文件

为什么要设计topic和partition,1:多的关系?

  • kafka的设计上broker会存在消息积压,最终msg会落地到磁盘文件中持久化,大数据量的情况下要考虑分布式存储,所以要把topic拆成几个partition,存在放在不同的物理机上实现分布式存储
  • 指定不同的consumer消费不同的partition,提升消费并行度,不需要考虑单个queue并发情况下的同步(当然一个consumer也可以消费多个partition)

创建一个名字为“test”的Topic,这个topic只有一个partition,并且备份因子也设置为1:

bin/kafka‐topics.sh ‐‐create ‐‐zookeeper 192.168.65.60:2181 ‐‐replication‐factor 1 ‐‐partitions 1 ‐‐topic test

这个topic的元信息放在/broker/topic里面,真正queue里的信息放在kafka的broker里面

msg存在kafka的磁盘文件中,默认保存一周

创建主题:

1 bin/kafka‐topics.sh ‐‐create ‐‐zookeeper 192.168.65.60:2181 ‐‐replication‐factor 1 ‐‐partitions 1 ‐‐topic test

元信息放zookeeper, 真正消息队列中的数据放broker

kafka默认只消费消费端启动后所生产的消息,也可以通过修改启动参数【–from-beginning】来使消费端消费存量的消息(对没有指定group的consumer而言)

Topic(Rule):逻辑概念(可以理解成一种消息转发规则),Partition:物理概念

–replication-factor设置了每个partition的总副本数(包含leader),随机选一个Replicas作为这个Partition的leader

Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
Topic: my-replicated-topic    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2

副本数量(Replicas)不能大于节点数量(broker)

[2022-02-11 11:10:58,505] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 5 larger than available brokers: 3.
 (kafka.admin.TopicCommand$)

例如Partition=2,Replicas=3:每个broker节点上的有2个存放某个topic消息的文件,例如topic=my-replicated-topic,文件名分别是my-replicated-topic-0,my-replicated-topic-1

所以我们可以总结:Partition决定了每个broker节点里的某个topic消息文件的数量,比如Partition=10,topic=my-replicated-topic,文件名就是my-replicated-topic-0 => my-replicated-topic-10。Replicas决定了哪些broker节点上可以有这些Partition分片。

所以只要我们把Replicas的num配成和broker的num一致,就可以实现每个broker上分布所有partition分片

  • Kafka只有被标记为Leader的broker节点中的partition可以写入数据,其他follow节点不行
  • Isr:已数据同步的副本
  • 细节:不同Partition的Leader副本会尽可能选择不一样的broker节点,比如分区0和分区1,保证了分区1服务挂了以后,相同topic的数据还可以往分区0中写数据,也起了容灾作用
  • 细节:kafka的Leader是到Partiton这个级别的,没有到broker这种节点的级别,要和其他的分布式组件(如zk、nacos等)区别开。也就是kafka的broker没有Leader、Follow这些关系

Producer

什么时候用同步发送:发送已是业务逻辑的最后一步

什么时候用同步发送:发送已是业务逻辑的最后一步

什么时候用异步发送:发送完后还有其他业务逻辑

生产者主动向服务端推送batch块

Consumer

消费者通过poll长轮询主动从服务端轮询msg(cur_offset_idx => end_offset_idx) (activemq,rabbitmq是服务端主动push到consumer)

问题1:如果consumer配置的auto_commit = true,interval = 1000ms,但是consumer具体的业务消费过程比较慢,5000ms才能消费完,如何保证消息的不丢失?

问题2:如果consumer配置的auto_commit = true,interval = 1000ms,但是consumer具体的业务消费过程比较快,100ms就消费完,如何保证消息的不重复消费?

解决问题1和问题2:一般业务中用手动提交(auto_commit = false)

// 手动同步提交offset,当前线程会阻塞直到offset提交成功
// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
//consumer.commitSync();
当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费
latest(默认) :只消费自己启动之后发送到主题的消息(根据时间戳判断,从自己能感知到的offset的下一条开始)
earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)

问题3:如果consumer设置auto_commit = false,比如一次poll了5条消息,处理完后再去commit。如果处理到第2条,服务挂了,下次服务启动又会重复消费这2条消息,如何避免?

解决问题3:通过在poll和业务处理逻辑之间加一层幂等性处理机制,例如redis,来保证poll到的已经消费过的数据不再进入业务处理逻辑

解决问题3:也可以处理一条commit一条来解决,但这种方式性能低,而且要保证业务处理的原子性

问题4:

ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,
会将其踢出消费组,将分区分配给别的consumer消费
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

解决问题4:

1.扩大ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG(不推荐)

2.检查消费逻辑的性能。

3.减少一次poll的msg数量。

一个新的Group会从Broker-Consumer-Offset的log中的最后一个offset的下一位,开始消费,可以通过修改AUTO_OFFSET_RESET_CONFIG = "earliest"来重定位offset

业务场景用:AUTO_OFFSET_RESET_CONFIG = “earliest”

大数据场景用:AUTO_OFFSET_RESET_CONFIG = “latest”

问题5:如何控制消息的幂等性?

解决问题5:通过幂等性来控制重复消费,而非通过手动提交/自动提交来控制。

  • 1、如果consumer端数据落地时有唯一标识,例如订单(order)有一个唯一索引订单号:order_code,那么就可以由数据库来控制。
  • 2、用redis:setnx类似于分布式锁的实现方式来实现幂等性,也就是consumer消费到msg先丢redis,再从redis中取。

Broker

消费的重分配:(consumer的rebalance机制)

服务端broker多久感知不到一个consumer心跳就认为他故障了,会将其踢出消费组,
对应的Partition也会被重新分配给其他consumer,默认是10秒
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

consumer给broker发送心跳的间隔时间,broker接收到心跳。
如果此时有rebalance发生会通过心跳响应将rebalance方案下发给相关的consumer,这个时间可以稍微短一点
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);

创建总控制器Controller的过程:

1.所有Broker向zk发起create /controller的命令,先创建成功的就是总控制器Controller

2.所有Broker会watch监听这个节点,如果发生变化(EPHEMERAL_NODE)消失,也就是Broker崩溃,则重新发起选举


本文转载自: https://blog.csdn.net/haohaoxuexiyai/article/details/122919638
版权归原作者 Cry丶 所有, 如有侵权,请联系我们删除。

“【Kafka】Kafka架构设计之组件详解”的评论:

还没有评论