0


【SpringBoot】整合Kafka集群

学习笔记

一、环境

使用Kafka3.0.0
masterslave1slave2ip193.168.3.34193.168.3.35193.168.3.36

二、maven引入

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

三、application配置

spring:kafka:bootstrap-servers: 192.168.3.34:9092,192.168.3.35:9092,192.168.3.36:9092# 指定 kafka 的地址producer:#生产者retries:0#重复次数 ,失败不重发batch-size:16384#每次批量发送消息的数量buffer-memory:33554432#缓存大小达到buffer.memory就发送数据acks:1# 0=生产者将不会等待来自服务器的任何确认  1=leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应  -1 =leader将等待完整的同步副本集以确认记录      key-serializer: org.apache.kafka.common.serialization.StringSerializer #指定 key 的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer #指定 value 的序列化器consumer:group-id: nacl #指定消费者组的 group_idauto-offset-reset: earliest   #latest 最新的位置 , earliest最早的位置auto-commit-interval:100#自动提交offset频率 100毫秒      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #指定 key 的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #指定 value 的反序列化器listener:concurrency:3#3个并行监听

四、SpringBoot-生产者

importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importjavax.annotation.Resource;@CrossOrigin@RestControllerpublicclassProducerController{// Kafka 模板用来向 kafka 发送数据@ResourceprivateKafkaTemplate<String,Object> kafkaTemplate;@RequestMapping("/kf")publicStringdata(){
        kafkaTemplate.send("first","hello");return"ok";}}

五、SpringBoot-消费者

import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class KafkaConsumer {
    // 指定要监听的 topic
    @KafkaListener(topics ="first")
    public void consumeTopic(String msg){ // 参数: 收到的 value
        System.out.println("收到的信息: " + msg);}}

六、SpringBoot-主题分区

importorg.apache.kafka.clients.admin.NewTopic;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassKafkaTopic{@BeanpublicNewTopicbatchTopic(){//项目启动时,自动创建topic,指定分区和副本数量returnnewNewTopic("first",3,(short)1);}}
标签: kafka spring boot java

本文转载自: https://blog.csdn.net/lushixuan12345/article/details/128393911
版权归原作者 擅长开发Bug的Mr.NaCl 所有, 如有侵权,请联系我们删除。

“【SpringBoot】整合Kafka集群”的评论:

还没有评论