学习笔记
一、环境
使用Kafka3.0.0
masterslave1slave2ip193.168.3.34193.168.3.35193.168.3.36
二、maven引入
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
三、application配置
spring:kafka:bootstrap-servers: 192.168.3.34:9092,192.168.3.35:9092,192.168.3.36:9092# 指定 kafka 的地址producer:#生产者retries:0#重复次数 ,失败不重发batch-size:16384#每次批量发送消息的数量buffer-memory:33554432#缓存大小达到buffer.memory就发送数据acks:1# 0=生产者将不会等待来自服务器的任何确认 1=leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应 -1 =leader将等待完整的同步副本集以确认记录 key-serializer: org.apache.kafka.common.serialization.StringSerializer #指定 key 的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer #指定 value 的序列化器consumer:group-id: nacl #指定消费者组的 group_idauto-offset-reset: earliest #latest 最新的位置 , earliest最早的位置auto-commit-interval:100#自动提交offset频率 100毫秒 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #指定 key 的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #指定 value 的反序列化器listener:concurrency:3#3个并行监听
四、SpringBoot-生产者
importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importjavax.annotation.Resource;@CrossOrigin@RestControllerpublicclassProducerController{// Kafka 模板用来向 kafka 发送数据@ResourceprivateKafkaTemplate<String,Object> kafkaTemplate;@RequestMapping("/kf")publicStringdata(){
kafkaTemplate.send("first","hello");return"ok";}}
五、SpringBoot-消费者
import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.KafkaListener;
@Configuration
public class KafkaConsumer {
// 指定要监听的 topic
@KafkaListener(topics ="first")
public void consumeTopic(String msg){ // 参数: 收到的 value
System.out.println("收到的信息: " + msg);}}
六、SpringBoot-主题分区
importorg.apache.kafka.clients.admin.NewTopic;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassKafkaTopic{@BeanpublicNewTopicbatchTopic(){//项目启动时,自动创建topic,指定分区和副本数量returnnewNewTopic("first",3,(short)1);}}
本文转载自: https://blog.csdn.net/lushixuan12345/article/details/128393911
版权归原作者 擅长开发Bug的Mr.NaCl 所有, 如有侵权,请联系我们删除。
版权归原作者 擅长开发Bug的Mr.NaCl 所有, 如有侵权,请联系我们删除。