0


【项目实战】Java 开发 Kafka 生产者

👉博主介绍: 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO TOP红人

Java知识图谱点击链接:体系化学习Java(Java面试专题)

💕💕 感兴趣的同学可以收藏关注下不然下次找不到哟💕💕

✊✊ 感觉对你有帮助的朋友,可以给博主一个三连,非常感谢 🙏🙏🙏

在这里插入图片描述

文章目录

1、什么是 Kafka 生产者

在这里插入图片描述

Kafka 生产者是指使用 Apache Kafka 的应用程序,用于向 Kafka 集群发送消息。生产者将消息发布到 Kafka 主题(topic),然后消费者可以从该主题订阅并接收这些消息。Kafka 生产者是实现消息发布的一方,可以是任何编程语言中的应用程序。

2、Java 如何使用 Kafka 生产者

  1. 首先,在Java项目中添加Kafka客户端依赖项。您可以在构建工具(如Maven或Gradle)中添加以下依赖项:
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.7.2</version></dependency>
  1. 创建Kafka生产者配置。您需要指定Kafka集群的地址和端口等配置信息。以下是一个示例配置:
Properties props =newProperties();
props.put("bootstrap.servers","localhost:9092");// Kafka集群地址和端口
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");// 键的序列化器
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 值的序列化器Producer<String,String> producer;try{
    producer =newKafkaProducer<>(props);String topic ="your-topic-name";String key ="your-message-key";String value ="your-message-value";ProducerRecord<String,String> record =newProducerRecord<>(topic, key, value);
    producer.send(record);}catch(Exception ex){}finally{try{
        producer.close();}catch(Exception ex){}}

但是在 SpringBoot 的项目中我们会使用 KafkaTemplate 去实现生产消息的发送。

3、SpringBoot 如何使用 Kafka 生产者

都需添加以下依赖项:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.7.2</version></dependency>

3.1、方式一:代码

@ConfigurationpublicclassKafkaProducerConfig{/**
     * kafka 地址
     */@Value("${kafka.bootstrap-servers}")privateString bootstrapServers;@BeanpublicProducerFactory<String,String>producerFactory(){Map<String,Object> configProps =newHashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);returnnewDefaultKafkaProducerFactory<>(configProps);}@BeanpublicKafkaTemplate<String,String>kafkaTemplate(){returnnewKafkaTemplate<>(producerFactory());}}

使用如下:

@ServicepublicclassKafkaProducerService{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;publicvoidsendMessage(String topic,String key,String value){
        kafkaTemplate.send(topic, key, value);}}

3.2、方式二:配置文件

可以 application.properties: 加上:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.value-serializer=org.apache.kafka.common.serialization.StringSerializer

或者 yml 里面加上

spring:kafka:bootstrap-servers: localhost:9092key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer

直接使用

@ServicepublicclassKafkaProducerService{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;publicvoidsendMessage(String topic,String key,String value){
        kafkaTemplate.send(topic, key, value);}}

以上也只是一个简单的实例,后面我们根据 【项目实战】手把手教你搭建前后端分离项目 SpringBoot + Vue + Element UI + Mysql, 在这个教程的基础上,我们写如何实战。

4、Kafka Properties 的详细讲解

以下是所有参数的详细解释:

  1. bootstrap.servers :生产者用于与Kafka集群建立初始连接的主机和端口列表。
  2. acks :生产者要求leader在认为请求完成之前接收的确认数。可能的值有:
  • 0 :生产者不等待任何确认。
  • 1 :生产者等待leader确认请求。
  • all :生产者等待所有同步副本确认请求。
  1. retries :在放弃之前,生产者将重试发送失败的消息的次数。设置大于0的值以启用重试。
  2. batch.size :生产者尝试发送到Kafka代理的批次的大小(以字节为单位)。较大的批次大小可以提高吞吐量,但会增加消息传递的延迟。
  3. linger.ms :生产者在将批次发送到Kafka代理之前等待更多消息累积的时间(以毫秒为单位)。这有助于批处理,减少发送到代理的请求数量。
  4. buffer.memory :生产者用于缓冲等待发送到Kafka代理的消息的总内存量。
  5. key.serializer :用于将键对象序列化为字节的类。常见的选项是 StringSerializerByteArraySerializer
  6. value.serializer :用于将值对象序列化为字节的类。常见的选项是 StringSerializerByteArraySerializer
  7. compression.type :用于消息的压缩类型。支持的值有 nonegzipsnappylz4 。压缩可以减少网络带宽和存储要求。
  8. max.in.flight.requests.per.connection :在阻塞之前,生产者可以有的未确认请求的最大数量。将此值设置为较高的值可以增加吞吐量,但也会增加用于缓冲的内存。
  9. request.timeout.ms :生产者在考虑请求失败之前,从Kafka代理等待响应的最长时间(以毫秒为单位)。
  10. max.block.ms :当缓冲区已满或元数据不可用时,生产者在 send() 方法中阻塞的最长时间(以毫秒为单位)。

以上这些是Kafka生产者配置中常用的一些属性,使用方法如下:

Properties props =newProperties();
props.put("bootstrap.servers","localhost:9092");// Kafka集群地址和端口
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");// 键的序列化器
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 值的序列化器

5、Spring-Kafka Yml 配置参数

spring:kafka:bootstrap-servers: <bootstrap-servers>producer:key-serializer: <key-serializer>value-serializer: <value-serializer>retries: <retries>batch-size: <batch-size>linger-ms: <linger-ms>buffer-memory: <buffer-memory>compression-type: <compression-type>consumer:group-id: <group-id>key-deserializer: <key-deserializer>value-deserializer: <value-deserializer>auto-offset-reset: <auto-offset-reset>enable-auto-commit: <enable-auto-commit>max-poll-records: <max-poll-records>

以下是每个参数的解释:

  • bootstrap-servers :Kafka broker地址的逗号分隔列表。
  • producer.key-serializer :用于将键对象序列化为字节的类。
  • producer.value-serializer :用于将值对象序列化为字节的类。
  • producer.retries :在放弃之前,生产者将重试发送失败的消息的次数。
  • producer.batch-size :生产者将尝试发送到Kafka broker的批次的大小(以字节为单位)。
  • producer.linger-ms :生产者在将批次发送到Kafka broker之前等待更多消息累积的时间(以毫秒为单位)。
  • producer.buffer-memory :生产者用于缓冲等待发送到Kafka broker的消息的总内存量。
  • producer.compression-type :消息的压缩类型。
  • consumer.group-id :消费者组ID。
  • consumer.key-deserializer :用于将键对象从字节反序列化的类。
  • consumer.value-deserializer :用于将值对象从字节反序列化的类。
  • consumer.auto-offset-reset :当Kafka中没有初始偏移量或当前偏移量不再存在时,使用的策略。
  • consumer.enable-auto-commit :消费者的偏移量是否应自动提交。
  • consumer.max-poll-records :消费者在一次轮询中最多获取的记录数。

6、Kafka 生产者异步回调方式生产消息

6.1、什么是异步回调

什么是异步回调要搞清楚,异步回调指的是我发送完成了,我就不管了,我不需要等你的返回。具体的定义如下:

异步回调是一种编程模式,用于处理异步操作的结果。在异步回调中,当一个操作被触发时,程序不会立即阻塞等待结果,而是继续执行其他任务。当操作完成后,系统会调用预先定义的回调函数来处理操作的结果。

异步回调常用于处理需要等待时间较长的操作,例如网络请求、数据库查询等。通过使用异步回调,可以提高系统的响应性能和并发处理能力,避免阻塞和等待的情况。

在异步回调中,通常将回调函数作为参数传递给异步操作的方法。当操作完成后,系统会调用回调函数,并将操作的结果作为参数传递给回调函数,以便进行后续处理。

异步回调在编写异步代码时非常有用,可以帮助开发人员处理异步操作的结果,而无需显式地等待操作完成。这种方式可以提高系统的性能和可伸缩性,同时保持代码的简洁性和可读性。

6.2、匿名内部类的方式做异步回调

publicclassKafkaProducerExample{privatestaticfinalStringTOPIC_NAME="test-topic";privatestaticfinalStringBOOTSTRAP_SERVERS="localhost:9092";publicstaticvoidmain(String[] args){Properties props =newProperties();
        props.put("bootstrap.servers",BOOTSTRAP_SERVERS);
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String,String> producer =newKafkaProducer<>(props);for(int i =0; i <10; i++){String message ="Hello, Kafka! This is message "+ i;ProducerRecord<String,String> record =newProducerRecord<>(TOPIC_NAME, message);
            producer.send(record,newCallback(){@OverridepublicvoidonCompletion(RecordMetadata recordMetadata,Exception e){// 匿名内部类的方式做异步回调}});}

        producer.close();}}

6.3、 KafkaTemplate 的异步回调

packagecom.pany.camp.kafka;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.support.SendResult;importorg.springframework.stereotype.Component;importorg.springframework.util.concurrent.ListenableFuture;importorg.springframework.util.concurrent.ListenableFutureCallback;importjavax.annotation.Resource;/**
 *
 * @description:  生产者
 * @copyright: @Copyright (c) 2022
 * @company: Aiocloud
 * @author: pany
 * @version: 1.0.0
 * @createTime: 2023-06-26 18:10
 */@ComponentpublicclassKafkaProducer{@ResourceprivateKafkaTemplate<String,String> kafkaTemplate;publicvoidsendMessage(String topic,String message){ListenableFuture<SendResult<String,String>> future = kafkaTemplate.send(topic, message);

        future.addCallback(newListenableFutureCallback<>(){@OverridepublicvoidonSuccess(Object o){}@OverridepublicvoidonFailure(Throwable ex){// Handle failure callbackSystem.err.println("Failed to send message: "+ ex.getMessage());}});}}

💕💕 本文由激流原创,原创不易,希望大家关注、点赞、收藏,给博主一点鼓励,感谢!!!
🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃🎃
在这里插入图片描述

标签: java kafka 生产者

本文转载自: https://blog.csdn.net/qq_37967783/article/details/131693969
版权归原作者 激流丶 所有, 如有侵权,请联系我们删除。

“【项目实战】Java 开发 Kafka 生产者”的评论:

还没有评论