0


Idea 中kafka 生产者无法正常生产消息,SpringBoot编写简单的kafka实例

在idea中,kafka无法消费生产者的信息,无法正常退出。

首先观察kafka9092端口和zookeeper的2181端口的是否对外开放

是否开放

打开防火墙:

systemctl start firewalld

开放端口2181:

firewall-cmd --zone=public --add-port=2181/tcp --permanent

开放端口9092:

firewall-cmd --zone=public --add-port=9092/tcp --permanent

重启防火墙

firewall-cmd --reload

查看开放的端口:

firewall-cmd --list-ports

端口出现 2181/tcp 9092/tcp

证明端口开放

还未解决?

如果还是解决不了问题,可能是配置文件错误:

打开server.properties:

192.168.200.88 是本虚拟机的IP

listeners=PLAINTEXT://192.168.200.88:9092

(listeners=PLAINTEXT://0.0.0.0:9092)

advertised.listeners=PLAINTEXT://192.168.200.88:9092

集群记得修改其他的虚拟机,包括端口的开放和配置文件

在这个错误还没发现前,尝试使用springboot 启动kafka (因为springboot的日志要更加全面一些,我是小白嘻嘻🤭)

结果发现Broker disconnect 还有 不知道这样的主机(centos01) 还有centos02、centos03都不知道

那时心里想:你都不知道,我哪知道?

就考虑到端口的是否开放和连通性问题:

附上Springboot代码:

pom.xml文件:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.0.2</version>
</dependency>
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>3.0.2</version>
        </dependency>

yaml文件:

spring:
  kafka:
    producer:
      bootstrap-servers: 192.168.200.88:9092
    consumer:
      bootstrap-servers: 192.168.200.88:9092
      group-id: my-consumer-1
spring:
  kafka:
    producer:
      bootstrap-servers: 192.168.200.88:9092
    consumer:
      bootstrap-servers: 192.168.200.88:9092
      group-id: my-consumer-1

KafkaConsumer:

package com.example.springbootkakfa.kafka;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {
    @KafkaListener(topics = "topictest",groupId = "my-consumer-1")
    public void processMessage(String content) {
        System.err.println("收到kafka消息: " + content);
    }
}
package com.example.springbootkakfa.kafka;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {
    @KafkaListener(topics = "topictest",groupId = "my-consumer-1")
    public void processMessage(String content) {
        System.err.println("收到kafka消息: " + content);
    }
}

KafkaController:

package com.example.springbootkakfa.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

@RestController
public class KafkaController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable("message") String message) {
        kafkaTemplate.send("topictest", message);
    }

}
package com.example.springbootkakfa.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

@RestController
public class KafkaController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable("message") String message) {
        kafkaTemplate.send("topictest", message);
    }

}

启动Springboot,

Get请求:localhost:8080/sendMessage/nihao

可以看到控制台输出

注意:


这个是springboot3.0.2和spring-kafka3.0.2记得要考虑kafka和springboot的兼容性

Springboot中记得要加上@Service和@RestController

让Springboot发现它

结尾:

(第一次发,不知道该说些什么哈哈😄)

希望对您有帮助,点赞👍如有错误,请纠正。谢谢


本文转载自: https://blog.csdn.net/m0_74930902/article/details/143228826
版权归原作者 m0_74930902 所有, 如有侵权,请联系我们删除。

“Idea 中kafka 生产者无法正常生产消息,SpringBoot编写简单的kafka实例”的评论:

还没有评论