在idea中,kafka无法消费生产者的信息,无法正常退出。
首先观察kafka9092端口和zookeeper的2181端口的是否对外开放
是否开放
打开防火墙:
systemctl start firewalld
开放端口2181:
firewall-cmd --zone=public --add-port=2181/tcp --permanent
开放端口9092:
firewall-cmd --zone=public --add-port=9092/tcp --permanent
重启防火墙
firewall-cmd --reload
查看开放的端口:
firewall-cmd --list-ports
端口出现 2181/tcp 9092/tcp
证明端口开放
还未解决?
如果还是解决不了问题,可能是配置文件错误:
打开server.properties:
192.168.200.88 是本虚拟机的IP
listeners=PLAINTEXT://192.168.200.88:9092
(listeners=PLAINTEXT://0.0.0.0:9092)
advertised.listeners=PLAINTEXT://192.168.200.88:9092
集群记得修改其他的虚拟机,包括端口的开放和配置文件
在这个错误还没发现前,尝试使用springboot 启动kafka (因为springboot的日志要更加全面一些,我是小白嘻嘻🤭)
结果发现Broker disconnect 还有 不知道这样的主机(centos01) 还有centos02、centos03都不知道
那时心里想:你都不知道,我哪知道?
就考虑到端口的是否开放和连通性问题:
附上Springboot代码:
pom.xml文件:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.2</version>
</dependency>
yaml文件:
spring:
kafka:
producer:
bootstrap-servers: 192.168.200.88:9092
consumer:
bootstrap-servers: 192.168.200.88:9092
group-id: my-consumer-1
spring:
kafka:
producer:
bootstrap-servers: 192.168.200.88:9092
consumer:
bootstrap-servers: 192.168.200.88:9092
group-id: my-consumer-1
KafkaConsumer:
package com.example.springbootkakfa.kafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "topictest",groupId = "my-consumer-1")
public void processMessage(String content) {
System.err.println("收到kafka消息: " + content);
}
}
package com.example.springbootkakfa.kafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "topictest",groupId = "my-consumer-1")
public void processMessage(String content) {
System.err.println("收到kafka消息: " + content);
}
}
KafkaController:
package com.example.springbootkakfa.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
@RestController
public class KafkaController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable("message") String message) {
kafkaTemplate.send("topictest", message);
}
}
package com.example.springbootkakfa.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;
@RestController
public class KafkaController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable("message") String message) {
kafkaTemplate.send("topictest", message);
}
}
启动Springboot,
Get请求:localhost:8080/sendMessage/nihao
可以看到控制台输出
注意:
这个是springboot3.0.2和spring-kafka3.0.2记得要考虑kafka和springboot的兼容性
Springboot中记得要加上@Service和@RestController
让Springboot发现它
结尾:
(第一次发,不知道该说些什么哈哈😄)
希望对您有帮助,点赞👍如有错误,请纠正。谢谢
版权归原作者 m0_74930902 所有, 如有侵权,请联系我们删除。