0


大数据-59 Kafka 高级特性 消息发送03-自定义拦截器、整体原理剖析

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:

  • Kafka 序列化器
  • Kafka 自定义序列化器
  • Kafka 分区器
  • Kafka 自定义分区器

在这里插入图片描述

拦截器

在这里插入图片描述

Producer拦截器

(Interceptor)和

Consumer拦截器

Kafka0.10

版本中

引入

的,主要是Client端的定制化控制逻辑。
对于Producer而言,

Interceptor

使得用户在消息

发送前

以及Producer

回调逻辑前

有机会对消息做一些

定制化

的需要,比如修改消息、修改时间等。
同时Producer允许指定

多个Interceptor

顺序

作用在同一条消息上,形成一个

拦截链

(Interceptor chain)。

  • onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在主线程中,Producer确保在消息序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何的操作,但最好不要修改消息所属的分区、topic等等,否则会影响分区的计算。
  • onAckonwledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息失败时调用,并且通常都是在Producer回调逻辑触发之前,onAcknowledgement运行在Producer的IO线程中,因此不要再该方法中放入很重要的逻辑,并要用于执行一些资源清理工作。
  • close:关闭Interceptor,主要执行一些资源清理工作。

如果上所述,

Interceptor

可能被运行在

多个线程

中,因为在具体实现时用户需要自行确保

线程的安全


倘若指定了多个Inteceptor,则Producer会按照顺序进行调用他们,并且其中

可能抛出的异常

记录到日志

中而

不是向上传递

!!!

自定义拦截器

根据对拦截器的观察学习,我们知道了,要实现自定义的拦截器,我们需要:

  • 实现ProducerInterceptor接口
  • KafkaProducer的设置中定义自定义的拦截器

自定义类

(上一节 大数据 Kafka 58 点击跳转)
借用我们刚才实现的 User 类,这里就不再写了。

自定义拦截器

自定义拦截器01

publicclassInterceptor01<K,V>implementsProducerInterceptor<K,V>{@OverridepublicProducerRecord<K,V>onSend(ProducerRecord<K,V> record){System.out.println("=== 拦截器01 onSend ===");// 做一些操作return record;}@OverridepublicvoidonAcknowledgement(RecordMetadata metadata,Exception exception){System.out.println("=== 拦截器01 onAcknowledgement ===");if(null!= exception){// 此处应该记录日志等操作
            exception.printStackTrace();}}@Overridepublicvoidclose(){}@Overridepublicvoidconfigure(Map<String,?> configs){}}

自定义拦截器02

publicclassInterceptor02<K,V>implementsProducerInterceptor<K,V>{@OverridepublicProducerRecord<K,V>onSend(ProducerRecord<K,V> record){System.out.println("=== 拦截器02 onSend ===");// 做一些操作return record;}@OverridepublicvoidonAcknowledgement(RecordMetadata metadata,Exception exception){System.out.println("=== 拦截器02 onAcknowledgement ===");if(null!= exception){// 此处应该记录日志等操作
            exception.printStackTrace();}}@Overridepublicvoidclose(){}@Overridepublicvoidconfigure(Map<String,?> configs){}}

使用拦截器

configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"icu.wzk.model.Interceptor01,icu.wzk.model.Interceptor02");

原理剖析

整体原理图

在这里插入图片描述

主线程

负责消息创建,拦截器,序列化器,分区器操作,并将消息追加到收集器。

RecordAccumulator:

  • 消息收集器RecorderAccumulator每个分区维护一个Deque类型的双端队列
  • ProducerBatch可以理解为ProducerRecord集合,批量发送有利于提升吞吐量,降低网络影响
  • 由于生产者客户端使用 ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用。该缓存池只针对特定大小的 ByteBuffer 进行管理,如果消息过大,不能做到重复利用。
  • 每次追加一条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch,判断当前消息的大小是否可以写入到该批次中。若可以写入则写入,若不可以写入则新建一个ProducerBatch。
  • 该线程从消息收集器获取缓存的消息,将其处理 <Node, List> 形式,Node表示集群的Broker的节点。
  • 进一步将 <Node, List> 转换为 <Node, Request> 形式,此时才可以向服务端发送数据。
  • 在发送之前,Sender线程将消息以 Map<NodeId, Deque<Deque> 形式保存到 InFlightRequests中进行缓存。可以通过获取 leastLoadedNode,即当前Node中负载压力最小的一个,以实现消息的尽快发出。

本文转载自: https://blog.csdn.net/w776341482/article/details/140837771
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。

“大数据-59 Kafka 高级特性 消息发送03-自定义拦截器、整体原理剖析”的评论:

还没有评论