0


Kafka - 异步/同步发送API

文章目录

在这里插入图片描述

异步发送

普通异步发送

需求:创建Kafka生产者,采用异步的方式发送到Kafka broker

异步发送流程

在这里插入图片描述

Code

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency>
packagecom.artisan.pc;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.clients.producer.RecordMetadata;importorg.apache.kafka.common.serialization.StringSerializer;importjava.util.Properties;importjava.util.concurrent.ExecutionException;/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */publicclassCustomProducer{publicstaticvoidmain(String[] args)throwsExecutionException,InterruptedException{// 1. 创建kafka生产者的配置对象Properties properties =newProperties();// 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.126.170:9092");// key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer =newKafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for(int i =0; i <10; i++){RecordMetadata art = kafkaProducer.send(newProducerRecord<>("art","kafka-msg-"+ i)).get();System.out.println(art.offset());System.out.println("over - "+ i);}// 5. 关闭资源
        kafkaProducer.close();}}

输出

31
over - 032
over - 133
over - 234
over - 335
over - 436
over - 537
over - 638
over - 739
over - 840
over - 9

忽略我这个offset … 我都发了好多次了…

看控制台的吧

在这里插入图片描述


带回调函数的异步发送

回调函数callback()会在producer收到ack时调用,为异步调用。

该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。

  • 如果Exception为null,说明消息发送成功,
  • 如果Exception不为null,说明消息发送失败

带回调函数的异步发送流程

在这里插入图片描述

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

Code

package com.artisan.pc;import org.apache.kafka.clients.producer.*;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;import java.util.concurrent.ExecutionException;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducerWithCallBack {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");

        // key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for(int i =0; i <10; i++){
            // 添加回调
            // 该方法在Producer收到ack时调用,为异步调用
            kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-callback-" + i), (recordMetadata, e) ->{
                // 没有异常,输出信息到控制台
                System.out.println("主题" + recordMetadata.topic() + ", 分区:" + recordMetadata.partition() + ", 偏移量:" + recordMetadata.offset());});}

        // 5. 关闭资源
        kafkaProducer.close();}}

在这里插入图片描述

控制台

在这里插入图片描述


同步发送API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可

在这里插入图片描述

packagecom.artisan.pc;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.common.serialization.StringSerializer;importjava.util.Properties;importjava.util.concurrent.ExecutionException;/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */publicclassCustomProducerSync{publicstaticvoidmain(String[] args)throwsExecutionException,InterruptedException{// 1. 创建kafka生产者的配置对象Properties properties =newProperties();// 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.126.170:9092");// key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer =newKafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for(int i =0; i <10; i++){// 通过Future接口的get实现同步阻塞
            kafkaProducer.send(newProducerRecord<>("art","kafka-msg-get-"+ i)).get();}// 5. 关闭资源
        kafkaProducer.close();}}

在这里插入图片描述

标签: kafka 分布式

本文转载自: https://blog.csdn.net/yangshangwei/article/details/134045040
版权归原作者 小小工匠 所有, 如有侵权,请联系我们删除。

“Kafka - 异步/同步发送API”的评论:

还没有评论