0


Idea 中kafka 生产者无法正常生产消息,SpringBoot编写简单的kafka实例

在idea中,kafka无法消费生产者的信息,无法正常退出。

首先观察kafka9092端口和zookeeper的2181端口的是否对外开放

是否开放

打开防火墙:

systemctl start firewalld

开放端口2181:

firewall-cmd --zone=public --add-port=2181/tcp --permanent

开放端口9092:

firewall-cmd --zone=public --add-port=9092/tcp --permanent

重启防火墙

firewall-cmd --reload

查看开放的端口:

firewall-cmd --list-ports

端口出现 2181/tcp 9092/tcp

证明端口开放

还未解决?

如果还是解决不了问题,可能是配置文件错误:

打开server.properties:

192.168.200.88 是本虚拟机的IP

listeners=PLAINTEXT://192.168.200.88:9092

(listeners=PLAINTEXT://0.0.0.0:9092)

advertised.listeners=PLAINTEXT://192.168.200.88:9092

集群记得修改其他的虚拟机,包括端口的开放和配置文件

在这个错误还没发现前,尝试使用springboot 启动kafka (因为springboot的日志要更加全面一些,我是小白嘻嘻🤭)

结果发现Broker disconnect 还有 不知道这样的主机(centos01) 还有centos02、centos03都不知道

那时心里想:你都不知道,我哪知道?

就考虑到端口的是否开放和连通性问题:

附上Springboot代码:

pom.xml文件:

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.kafka</groupId>
  7. <artifactId>spring-kafka</artifactId>
  8. <version>3.0.2</version>
  9. </dependency>
  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.kafka</groupId>
  7. <artifactId>spring-kafka</artifactId>
  8. <version>3.0.2</version>
  9. </dependency>

yaml文件:

  1. spring:
  2. kafka:
  3. producer:
  4. bootstrap-servers: 192.168.200.88:9092
  5. consumer:
  6. bootstrap-servers: 192.168.200.88:9092
  7. group-id: my-consumer-1
  1. spring:
  2. kafka:
  3. producer:
  4. bootstrap-servers: 192.168.200.88:9092
  5. consumer:
  6. bootstrap-servers: 192.168.200.88:9092
  7. group-id: my-consumer-1

KafkaConsumer:

  1. package com.example.springbootkakfa.kafka;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class KafkaConsumer {
  6. @KafkaListener(topics = "topictest",groupId = "my-consumer-1")
  7. public void processMessage(String content) {
  8. System.err.println("收到kafka消息: " + content);
  9. }
  10. }
  1. package com.example.springbootkakfa.kafka;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class KafkaConsumer {
  6. @KafkaListener(topics = "topictest",groupId = "my-consumer-1")
  7. public void processMessage(String content) {
  8. System.err.println("收到kafka消息: " + content);
  9. }
  10. }

KafkaController:

  1. package com.example.springbootkakfa.kafka;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.web.bind.annotation.*;
  5. @RestController
  6. public class KafkaController {
  7. @Autowired
  8. private KafkaTemplate<String, String> kafkaTemplate;
  9. @GetMapping("/sendMessage/{message}")
  10. public void sendMessage(@PathVariable("message") String message) {
  11. kafkaTemplate.send("topictest", message);
  12. }
  13. }
  1. package com.example.springbootkakfa.kafka;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.web.bind.annotation.*;
  5. @RestController
  6. public class KafkaController {
  7. @Autowired
  8. private KafkaTemplate<String, String> kafkaTemplate;
  9. @GetMapping("/sendMessage/{message}")
  10. public void sendMessage(@PathVariable("message") String message) {
  11. kafkaTemplate.send("topictest", message);
  12. }
  13. }

启动Springboot,

Get请求:localhost:8080/sendMessage/nihao

可以看到控制台输出

注意:


这个是springboot3.0.2和spring-kafka3.0.2记得要考虑kafka和springboot的兼容性

Springboot中记得要加上@Service和@RestController

让Springboot发现它

结尾:

(第一次发,不知道该说些什么哈哈😄)

希望对您有帮助,点赞👍如有错误,请纠正。谢谢


本文转载自: https://blog.csdn.net/m0_74930902/article/details/143228826
版权归原作者 m0_74930902 所有, 如有侵权,请联系我们删除。

“Idea 中kafka 生产者无法正常生产消息,SpringBoot编写简单的kafka实例”的评论:

还没有评论