- 生产者通过
producerRecord
对象封装消息主题、消息的value(内容)、timestamp(时间戳)等 - 生产者通过
send()
方法发送消息,send()方法会经过如下几步 1. 首先将消息交给拦截器(Interceptor)
处理, 拦截器对生产者而言,对所有消息都是生效的,拦截器也支持链式编程(责任器链)的效果,拦截器一般将一些通用的功能加进来,通常在消息发送前,producer回调逻辑前对消息做一些定制化需求,消息头部添加消息的属性等 2. 接下来交给序列化器(Serializer)
,Key的序列化器和value的序列化器,对消息的key和value进行序列化,序列化为字节数组, 3. 然后将序列化的结果交给分区器(Partitioner)
,分区器有3种策略
来计算消息应该属于哪个分区,- 在producerRecord中直接指定分区
,分区器会直接将消息放到指定分区- 如果没有指定分区器,但是消息有key
,分区器会根据消息的key计算hash值,根据主题分区数量取模,来决定将消息放到哪个分区- 如果没有指定分区、也没有指定key,分区器会以轮询(Round Robin)
的方式给消息分配分区在这里插入图片描述 - 消息经过以上拦截器->序列化器->分区器 进行加工后,会将消息放到
RecordAccumulator缓冲区
,对每个分区都会有一个单独的缓冲区,经过分区器计算出分区号之后,不同的消息就会分配给不同的缓冲区,缓冲区里面消息也是有序
的,我们可以指定对缓冲区里的消息进行分批次
,也可以指定缓冲区大小
- 当缓冲区中消息达到条件会按批次发送到broker对应分区上
- broker将接收到的消息进行刷盘持久化
- 一个消息发出去之后,服务器(broker)会返回给producer响应,producer再来判断消息是否发送成功,
- broker返回元数据信息 - > 落盘成功 ->生产者继续发送后面消息
- broker返回元数据信息 - >落盘失败 - 生产者设置了重试次数 -> producer 会将消息重新放入缓冲区进行排队,等待再次发送,当一个消息发送失败重试需要重发,消息是放到缓冲区队尾,
- 生产者去缓冲区重试发送
生产者在重试消息时,消息的顺序就错了,那怎么保证消息的有序性呢?
针对这种情况,可以做一个配置,
参数:
max.in.flight.requests.per.connection
表示producer 在收到broker响应之前可以发送多少批消息,默认5,
设置此值是1,表示broker在响应之前producer不能再向同一个broker发送请求,就是我确认一批你再发下一批,这样可以保证消息有序性,对消息顺序要求不高情况可以不考虑
补充:
- Producer 创建时,会创建一个Sender线程(IO线程)设置为守护线程
- Producer 创建时,会创建缓冲区
- Producer 生产消息,内部是一个
异步
流程,Sender
线程不断轮询
RecordAccumulator,满足条件后进行真正的网络IO发送消息 - RecordAccumulator(缓冲区) 对每一个分区都有一个缓冲区- 每个分区的缓冲区中消息也是有序的- 可以指定缓冲区中的消息按
批次
发送 - 缓冲区大小达到batch.size
,默认16KB- 在缓冲区等待时间lingger.ms
达到上限- 以上两个条件满足一个即发送一批- 可以指定整个缓冲区的大小
批次的概念很好理解,缓冲区就像一辆公交车,有两种发车方式,一是人满了就发车,一是等5分钟就发车,不管是人满了还是到5分钟了,发车,go~
分批发送可以减少网络IO,节省带宽使用,减少网络传输的压力,提升吞吐量
- 一个批次消息发送后,通过网络,发往Kafka指定分区,然后刷盘到broker
- 如果Producer设置了
retries
参数值>0,那么允许消息发送失败进行重试,重试机制由客户端Producer内部实现 - Broker端消息落盘成功,会返回元数据给生产者 - 通过阻塞直接返回 (同步发送)- 通过回调函数返回(异步发送)
版权归原作者 FightingITPanda 所有, 如有侵权,请联系我们删除。