- kafka的使用场景1. 大数据领域:网站行为分析,日志聚合,应用监控,流式数据处理,在线和离线数据分析等领域2. 数据集成:将消息导入Maxcompute,OSS,RDS,Hadoop,HBase等离线数据仓库3. 流计算集成:与StreamCompute,E-MapReduce,Spark,Storm等流计算引擎集成
- topic1. 发送到不存在的topic时,topic可以自动创建,允许永久删除topic2. num.partitions = 1 auto.create.topics.enable=true delete.topic.enable = true
- 分区副本1. 副本数比机器数多的话,会直接报错2. 不同的副本必须在不同的机器,防止一台机器挂了,消息丢失3. 只有leader可以为客户端提供读写的功能,follower节点只能做备份
- 消费者组1. 多个消费者组成消费者组,消费者组中的消费者消费一个topic中的消息,但是同组的不同消费者一定消费不同partition中的消息2. 消费者比分区多了,会有消费者消费不到消息3. 消费者比分区少了,一个消费者可以消费多个分区
- offset1. 记录消费的偏移量,保证消费消息不会重复,保证消费消息的连续性2. 偏移量文件,默认创建50个,groupId先进性hash,然后取模50,得到哪个就放在哪个文件中3. _consumer_offsets-44文件名
- springboot中的kafka1. 2. 3.
- 消息的幂等性
1:PID(Producer ID)2:sequence number3:不是全局有序的,只能保证一个分区的有序,一个会话的有序4:一个是生成的唯一id,一个是消息的序列,可以判断消费的消息得序列是否连续,如果小于上一个,就是重复消费, 如果差2个及以上,则为异常5:开启消息幂等性 prop.put("enable.idempotence",true);
- kafka的事务
1:什么时候需要事务 1:发送多条消息 2:发送消息到多个topic或者多个partition 3:消费以后发出消息consume-process-produce 从上游接收到一个消息之后,再把他发送到下游2:发送多个消息时,要么全部成功,要么全部失败3:producer.sendOffsetsTransaction();
- kafka生产者代码
- kafka消费端代码
- 事务的实现原理
1:2PC2:Transaction Coordinator3:事务日志:topic_transaction_state4:生产者事务ID:transaction.id5:先预先提交,等到所有的都可以提交了之后在一起提交6:服务端的一个协调者Coordinator角色,协调大家一起提交7:事务日志:记录所有的事务的状态,如果协调者挂了,再连上要继续处理事务的,会在事务日志中查看 事务的状态,提交或者回滚8:如果生产者挂了,要想重启之后继续处理事务,或者把事务丢到另一个kafka的broker上处理,所以生产者 要有一个唯一事务id来识别是哪个生产者的事务
- 事务相关的API
1:initTransactions()2:beginTransaction()3:commitTransaction()4:abortTransaction()5:sendOffsetsToTransaction()
- 事务的操作流程
1:生产者通过initTransactions API 向Coordinator注册事务ID2:Coordinator记录事务日志3:生产者把消息写入目标分区4:分区和Coordinator的交互,当事务完成以后,消息的状态应该是已提交,这样消费者才可以消费到
- kafka的特性
1:高吞吐,低延迟2:高伸缩性3:持久性,可靠性4:容错性5:高并发
- producer发送消息
- 发送消息时的拦截器
- 自己创建一个拦截器
- 自定义拦截器代码
- 生产者消息发往那个分区
- 发往指定分区
- 自定义分区器
- 默认分区器
- 当默认分区器没有key时
- 累加器
- 副本分配规则
AdminUtils.sacla--assignReplicasToBrokers1:副本因子不能大于broker的个数2:第一个分区(编号为0)的第一个副本放置位置是随机从brokerList选择的3:其他分区的第一个副本放置位置相对于第0个分区依次往后移(nextReplicaShift)
- 稀疏索引1. 2. 3.
- 时间索引文件
.index偏移量(offset)索引文件.timeindex时间戳(timestamp)索引文件log.message.timestamp.type=CreateTime/LogAppendTime可以设置是创建消息的时候就在时间戳索引文件中记录,还是在log文件中写入时在记录(落盘时)
- 索引检索过程
1:根据offset判断在哪个segment中2:在segment的indexfile中,根据offset找到消息的position3:根据position从log文件中比较,最终找到消息
- 副本选举
1:谁来主持选举:三个broker会在zookeeper的controller目录下写入,谁先写入谁就是broker controller2:谁可以参加选举:AR:所有的副本,ISR:是和leader保持同步的副本,OSR:是有异常的副本,被排除在外的副本 AP=ISR+OSE,只有ISR才能进行选举,如果ISR列表中没有,就只能在OSR中选举,此时成为不干净的选举, 因为OSR列表中的副本没有与leader节点保持同步,此时选举为leader会使消息丢失3:unclean.leader.election.enable 指定副本是否能够不在ISR中被选举为leader,4:如何选举:PacificA算法,会让ISR列表中的第一个副本变为leader,优先算法
- 主从如何保持同步
1:follower节点会向leader发送一个fetch请求,leader向follower发送数据后,需要更新follower的LEO2:follower接收到数据响应后,一次写入消息并且更新LEO3:Leader更新HW(ISR最小的LEO)
- _consumer_offsets存储结构
- rebalance分区重分配
1:从broker中找到一个coordinator。保持公平公正,是监督作用,不做具体方案2:让消费者全部加入到一个组内3:选出一个leader,做出来分区重新分配的方案4:把方案给coordinator,coordinator同意方案之后,会通知所有额消费者5:如果消费者数量变化了,或者分区数量变化了,会触发rebalance分区重新分配
- kafka为什么这么快
1:顺序读写:log文件不删除,顺序IO读写2:索引3:批量读写和文件压缩4:零拷贝
- 传统IO
- 零拷贝
本文转载自: https://blog.csdn.net/weixin_43924419/article/details/136722080
版权归原作者 莫得等待 所有, 如有侵权,请联系我们删除。
版权归原作者 莫得等待 所有, 如有侵权,请联系我们删除。