0


Kafka生产者——消息发送流程,同步、异步发送API

生产者消息发送流程

发送原理

Kafka的Producer发送消息采用的是异步发送的方式。
在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAccumulator。
①main线程中创建了一个双端队列RecordAccumulator,将消息发送给RecordAccumulator。
②Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
在这里插入图片描述

  • batch.size: 只有数据积累到batch.size之后,sender才会发送数据。默认16k
  • linger.ms: 如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。 0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader收到数据后应答。-1 ( al1) :生产者发送过来的数据,Leader和和ISR队列里面的所有节点收齐数据后应答。-1和al1等价。

生产者重要参数列表

  • bootstrap.servers: 生产者连接集群所需的broker地址清单。可以设置1个或者多个,中间用逗号隔开。生产者从给定的broker里查找到其他broker信息。
  • key.serializer、 value.serializer: 指定发送消息的key和value的序列化类型。要写全类名。(反射获取)
  • buffer.memory: RecordAccumulator缓冲区总大小,默认32m。
  • batch.size: 缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
  • linger.ms: 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。
  • acks: 0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader数据落盘后应答。 -1(all):生产者发送过来的数据,Leader和isr队列里面的所有节点数据都落盘后应答。默认值是-1
  • max.in.flight.requests.per.connection: 允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
  • Retries(重试): 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
  • retry.backoff.ms: 两次重试之间的时间间隔,默认是100ms。
  • enable.idempotence: 是否开启幂等性,默认true,开启幂等性。
  • compression.type 生产者发送的所有数据的压缩方式。默认是none,不压缩。 支持压缩类型:none、gzip、snappy、lz4和zstd。

异步发送API

普通异步发送

  1. 需求:创建Kafka生产者,采用异步的方式发送到Kafka broker
  2. 异步发送流程如下:在这里插入图片描述batch.size: 只有数据积累到batch.size之后,sender才会发送数据。默认16klinger.ms: 如果数据迟迟未达到batch.size,sender等待lingerms设置的时间到了之后就会发送数据。单位ms,默认值是Oms,表示没有延迟。
  3. 代码编写 1)创建工程kafka-demo 2)导入依赖
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency></dependencies>

3)创建包名:com.taohua.kafka.producer
4)编写代码:不带回调函数的API

packagecom.taohua.kafka.producer;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importjava.util.Properties;publicclassCustomProducer{publicstaticvoidmain(String[] args)throwsInterruptedException{// 1. 创建kafka生产者的配置对象Properties properties =newProperties();// 2. 给kafka配置对象添加配置信息
        properties.put("bootstrap.servers","hadoop102:9092");// key,value序列化
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer =newKafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for(int i =0; i <10; i++){
            kafkaProducer.send(newProducerRecord<>("first","kafka"+ i));}// 5. 关闭资源
        kafkaProducer.close();}}

5)测试:
在hadoop102上开启kafka消费者

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

在IDEA中执行上述代码,观察hadoop102消费者输出

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka0
kafka1
kafka2
kafka3
……

带回调函数的异步发送

  1. 回调函数callback()会在producer收到ack时调用,为异步调用。 该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。 ·如果Exception为null,说明消息发送成功, ·如果Exception不为null,说明消息发送失败。
  2. 带回调函数的异步调用发送流程在这里插入图片描述batch.size: 只有数据积累到batch.size之后,sender才会发送数据。默认16klinger.ns: 如果数据迟迟未达到batch.size,sender等待linger:ms设置的时间到了之后就会发送数据。单位ms,默认值是Oms,表示没有延迟。
  3. 编写代码:带回调函数的生产者
packagecom.taohua.kafka.producer;importorg.apache.kafka.clients.producer.*;importjava.util.Properties;publicclassCustomProducerCallback{publicstaticvoidmain(String[] args)throwsInterruptedException{// 1. 创建kafka生产者的配置对象Properties properties =newProperties();// 2. 给kafka配置对象添加配置信息
        properties.put("bootstrap.servers","hadoop102:9092");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value序列化(必须)
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer =newKafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for(int i =0; i <10; i++){// 添加回调
            kafkaProducer.send(newProducerRecord<>("first","kafka"+ i),newCallback(){// 该方法在Producer收到ack时调用,为异步调用@OverridepublicvoidonCompletion(RecordMetadata metadata,Exception exception){if(exception ==null)// 没有异常,输出信息到控制台System.out.println("主题"+recordMetadata.topic()+", 分区:"+recordMetadata.partition()+", 偏移量:"+recordMetadata.offset());}});}// 5. 关闭资源
        kafkaProducer.close();}}
  1. 测试 1)在hadoop102上开启kafka消费者
[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

2)在IDEA中执行代码,观察hadoop102消费者输出

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka0
kafka1
kafka2
……

3)在IDEA控制台观察回调信息

主题first, 分区:0, 偏移量:10
主题first, 分区:0, 偏移量:11
主题first, 分区:0, 偏移量:12
主题first, 分区:0, 偏移量:13
主题first, 分区:0, 偏移量:14
主题first, 分区:0, 偏移量:15
主题first, 分区:0, 偏移量:16
主题first, 分区:0, 偏移量:17
主题first, 分区:0, 偏移量:18
主题first, 分区:0, 偏移量:19
……

同步发送API

  1. 同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。 由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。
  2. 同步发送流程示意图如下:在这里插入图片描述batch.size: 只有数据积累到batch.size之后,sender才会发送数据。默认16klinger.ns: 如果数据迟迟未达到batch.size,sender等待linger:ms设置的时间到了之后就会发送数据。单位ms,默认值是Oms,表示没有延迟。
  3. 编写代码:同步发送消息的生产者
packagecom.atguigu.kafka.producer;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importjava.util.Properties;importjava.util.concurrent.ExecutionException;publicclassConsumerProducerSync{publicstaticvoidmain(String[] args)throwsInterruptedException,ExecutionException{// 1. 创建kafka生产者的配置对象Properties properties =newProperties();// 2. 给kafka配置对象添加配置信息//properties.put("bootstrap.servers","hadoop102:9092");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");// key,value序列化(必须)
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer =newKafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for(int i =0; i <10; i++){// 同步发送
            kafkaProducer.send(newProducerRecord<>("first","kafka"+ i)).get();}// 5. 关闭资源
        kafkaProducer.close();}}
  1. 测试

1)在hadoop102上开启kafka消费者

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

2)在IDEA中执行代码,观察hadoop102消费者的消费情况

[atguigu@hadoop104 kafka]$ sbin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
kafka0
kafka1
kafka2
……
标签: kafka java 分布式

本文转载自: https://blog.csdn.net/weixin_50843918/article/details/128645949
版权归原作者 桃花键神 所有, 如有侵权,请联系我们删除。

“Kafka生产者——消息发送流程,同步、异步发送API”的评论:

还没有评论