0


SpringBoot3 整合Kafka

官网:https://kafka.apache.org/documentation/

消息队列-场景

1. 异步

img

2. 解耦

img

3. 削峰

img

4. 缓冲

img

消息队列-Kafka

1. 消息模式

img

消息发布订阅模式,MessageQueue中的消息不删除,会记录消费者的偏移量

2. Kafka工作原理

img

同一个消费者组里的消费者是队列竞争模式:Consumer1消费Broker-0的news消息,Consumer2消费Broker-1的news消息,Consumer3消费Broker-2的news消息。如果有Consumer4,那他哪个分区都不能消费,就是消费的饥饿问题。

不同消费组中的消费者是发布/订阅模式:Consumer1和Consumer4都能消费0分区(Broker0)。

3. SpringBoot整合

参照:https://docs.spring.io/spring-kafka/docs/current/reference/html/#preface

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

配置

spring.kafka.bootstrap-servers=172.20.128.1:9092

修改

C:\Windows\System32\drivers\etc\hosts

文件,配置

8.130.32.70 kafka

4. 消息发送

kafkaTemplate发送消息的内容是对象时,需要用json序列化,在配置文件中加上:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
@SpringBootTestclassBoot07KafkaApplicationTests{@AutowiredKafkaTemplate kafkaTemplate;@TestvoidcontextLoads()throwsExecutionException,InterruptedException{StopWatch watch =newStopWatch();
        watch.start();CompletableFuture[] futures =newCompletableFuture[10000];for(int i =0; i <10000; i++){CompletableFuture send = kafkaTemplate.send("order","order.create."+i,"订单创建了:"+i);
            futures[i]=send;}CompletableFuture.allOf(futures).join();
        watch.stop();System.out.println("总耗时:"+watch.getTotalTimeMillis());}}
importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.stereotype.Component;@ComponentpublicclassMyBean{privatefinalKafkaTemplate<String,String> kafkaTemplate;publicMyBean(KafkaTemplate<String,String> kafkaTemplate){this.kafkaTemplate = kafkaTemplate;}publicvoidsomeMethod(){this.kafkaTemplate.send("someTopic","Hello");}}

5. 消息监听

@ComponentpublicclassOrderMsgListener{@KafkaListener(topics ="order",groupId ="order-service")publicvoidlisten(ConsumerRecord record){System.out.println("收到消息:"+record);//可以监听到发给kafka的新消息,以前的拿不到}@KafkaListener(groupId ="order-service-2",topicPartitions ={@TopicPartition(topic ="order",partitionOffsets ={@PartitionOffset(partition ="0",initialOffset ="0")})})publicvoidlistenAll(ConsumerRecord record){System.out.println("收到partion-0消息:"+record);}}

6. 参数配置

消费者

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another

生产者

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false

7. 自动配置原理

kafka 自动配置在

KafkaAutoConfiguration
  1. 容器中放了 KafkaTemplate 可以进行消息收发
  2. 容器中放了KafkaAdmin 可以进行 Kafka 的管理,比如创建 topic 等
  3. kafka 的配置在KafkaProperties
  4. @EnableKafka可以开启基于注解的模式

toConfiguration`

  1. 容器中放了 KafkaTemplate 可以进行消息收发
  2. 容器中放了KafkaAdmin 可以进行 Kafka 的管理,比如创建 topic 等
  3. kafka 的配置在KafkaProperties
  4. @EnableKafka可以开启基于注解的模式

image.png

标签: kafka spring boot java

本文转载自: https://blog.csdn.net/qq_36942720/article/details/135223914
版权归原作者 Please Sit Down 所有, 如有侵权,请联系我们删除。

“SpringBoot3 整合Kafka”的评论:

还没有评论