0


springBoot集成Kafka

1. 环境搭建

(1)搭建工程 kafka-spring-boot-demo,并在 pom.xml 中添加依赖,最终的依赖信息如下:

    <!-- Inherit dependency management from the Spring Boot parent project -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.8.RELEASE</version>
    </parent>
    <properties>
        <!-- fastjson releases before 1.2.83 are affected by known
             deserialization vulnerabilities; do not pin an older version -->
        <fastjson.version>1.2.83</fastjson.version>
    </properties>
    <dependencies>
        <!-- Web starter: provides the REST endpoint used to trigger a send -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Kafka: version is managed by the Spring Boot parent -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- JSON library; not managed by the parent, so the version is set explicitly -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
    </dependencies>

(2)在resources下创建文件application.yml

# Port the embedded web server listens on.
server:
  port: 8081
spring:
  application:
    name: kafka-demo
  kafka:
    # Kafka broker address (host:port); adjust to your own environment.
    bootstrap-servers: 192.168.137.136:9092
    consumer:
      # Consumer group id used by this application's consumers.
      group-id: kafka-demo-kafka-group
      # Record keys and values are both deserialized as plain strings.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

(3)引导类

    package com.kafka;

    import org.springframework.boot.SpringApplication;
     import org.springframework.boot.autoconfigure.SpringBootApplication;

     /**
      * Bootstrap class of the demo application.
      *
      * <p>Starts the Spring context with this class as the primary source.
      */
     @SpringBootApplication
     public class WebApp {

         /**
          * Application entry point.
          *
          * @param args command-line arguments forwarded to Spring Boot
          */
         public static void main(String... args) {
             // Instance form of the static SpringApplication.run(WebApp.class, args) helper.
             SpringApplication application = new SpringApplication(WebApp.class);
             application.run(args);
         }
     }

2. 消息生产者

    新建controller
        package com.kafka.controller;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.kafka.core.KafkaTemplate;
        import org.springframework.web.bind.annotation.GetMapping;
        import org.springframework.web.bind.annotation.RequestMapping;
        import org.springframework.web.bind.annotation.RestController;
​
        @RestController
        @RequestMapping("/test")
        public class TestController {
​
            @Autowired
            private KafkaTemplate<String,String> kafkaTemplate;
​
            @GetMapping("/send")
            public String sendMessage() {
                // 发送消息到kafka
                // 需要使用KafkaTemplate
                String topic = "spring_test_169";
                kafkaTemplate.send(topic,"hello spring boot kafka!");
                return "发送成功.";
            }
        }

3. 消息消费者

    新建监听类:
        package com.heima.kafka.listener;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.springframework.kafka.annotation.KafkaListener;
        import org.springframework.stereotype.Service;
​
        @Service
        public class HelloListener {
​
            /**
             * 消费者端:指定监听话题
             *
             * @param consumerRecord 监听到数据
             */
            @KafkaListener(topics = {"spring_test_169"})
            public void handlerMsg(ConsumerRecord<String, String> consumerRecord) {
                System.out.println("接收到消息:消息值:" + consumerRecord.value() + ",         消息偏移量:" + consumerRecord.offset());
            }
​

本文转载自: https://blog.csdn.net/weixin_66545010/article/details/125633876
版权归原作者 浩泽叶 所有, 如有侵权,请联系我们删除。

“springBoot集成Kafka”的评论:

还没有评论