搭建完kafka,一般都是使用本地来链接虚拟机的,初次链接会出现各种神奇的问题,特此记录一下。
首先,请先对生产者,消费者,topic有一个大概的认识
-----本地的工作
先在本地idea写生产者的代码
package com.example.kafkademo.demos;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerQuickStart {
public static void main(String[] args) {
//
Properties prop=new Properties();
//kafka链接地址
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.12.130:9092");
//序列化
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//创建对象
KafkaProducer<String,String> producer=new KafkaProducer<String, String>(prop);
//发送
/**
* topic key value
*/
ProducerRecord<String,String> kvProducerRecord =new ProducerRecord<String,String>("mytopic","key-001","hello kafka1");
producer.send(kvProducerRecord);
//必须关闭
producer.close();
}
}
看不懂复制粘贴到gpt
这里可知道,我们对
192.168.12.130:9092 虚拟机kafka的发了一个("mytopic","key-001","hello kafka1");
向mytopic这个主题发送了一个ello kafka1。
-------本地防火墙的工作
我们要打开本地9092的端口来和虚拟机进行交流
具体教程站内有,有需要请移步,关键词 win防火墙 ,端口
总之
打开9092端口(kafka默认工作在9092)
--------虚拟机防火墙工作
进入到linux中,我们打开9092端口的防火墙,(默认9092)
执行以下命令以开放端口 9092(假设你使用的是 firewalld 防火墙):
firewall-cmd --zone=public --add-port=9092/tcp --permanent
这条命令会将 9092 端口添加到防火墙规则中,并持久化保存。
重新加载防火墙配置使更改生效:
firewall-cmd --reload
现在,端口 9092 应该已经被打开了。你可以通过检查防火墙规则来确认端口是否成功打开:
firewall-cmd --list-ports
--------看看能不能联通(有自信的可以跳过)
接下来我们可以使用telnet来尝试一下本地能不能访问kafka
在本地win系统下(没有可以花费30秒装一个)安装教程本站有,【高效开发工具系列】Windows 怎么使用 telnet_windows telnet-CSDN博客
测试telnet前必须保证虚拟机的测试端口正在运行(telnet只能测试被使用的端口)
虚拟机启动kafka
本地使用telnet
telnet 192.168 .12.130 9092 (测试虚拟机9092端口 )
联通成功可以进入telnet中,如图成功链接
---------kafka操作
还记得我们之前代码里的topic,叫做"mytopic"吗
如果没有mytopic的情况下是不会自动创建topic的,我们得在kafka手动创建这个topic
我们到kafka的bin目录下
./kafka-topics.sh --list --zookeeper 192.168.12.130:2181
看当前的所有topic
./kafka-topics.sh --create --zookeeper 192.168.12.130:2181 --replication-factor 1 --partitions 1 --topic mytopic
在 Kafka 中通过 ZooKeeper 创建一个名为
mytopic
的主题。这个命令看起来是正确的,假设你的 Kafka 集群中有一个运行 ZooKeeper 的节点,并且该节点的地址是
192.168.12.130:2181
。
执行该命令后,Kafka 将会尝试在指定的 ZooKeeper 上创建一个名为
mytopic
的主题,该主题将只有一个分区和一个副本。如果一切顺利,Kafka 将成功创建这个主题。
创建完和idea相似的代码后服务不要关闭
-----------启动idea生产者代码
10秒内肯定能执行完毕,即或不然出错了。
去kafka执行
./kafka-console-consumer.sh --bootstrap-server 192.168.12.130:9092 --topic test-topic --from-beginning --max-messages 10
这个命令将在命令行中启动一个简单的 Kafka 消费者客户端,并连接到指定的 Kafka 服务器(broker)来消费名为
test-topic
的主题中的消息。该命令还设置了一些选项:
--bootstrap-server 192.168.12.130:9092
:指定要连接的 Kafka 服务器的地址和端口。--topic test-topic
:指定要消费消息的主题名称,这里是test-topic
。--from-beginning
:表示从该主题的起始位置开始消费消息。--max-messages 10
:指定最大的消息数量,这里设置为 10 条。
一旦你执行这个命令,Kafka 将会在命令行中显示来自
test-topic
主题的前10条消息(如果有的话)。如果没有设置
--max-messages
参数,默认情况下会持续监听该主题并打印新到达的消息。也就是不会主动关闭
停止centos7当前进程的命令是ctrl+c
生产者已经注入了kafka
-----------遇到了无法访问的问题怎么办
- 请你回到上述联通的步骤,使用telnet工具
2,能联通,访问不了的话,我们就把kafka集群,zookeeper所有出现“本机”的字段更改成虚拟机地址(极度不推荐,麻烦,会出现后续问题)
在zookeeper.properties文件中加上
clientPortAddress=192.168.12.130 #示例
在所有kafka文件
Service-properties文件中加上
listeners=PLAINTEXT://192.168.12.130:9092
advertised.listeners=PLAINTEXT://192.168.12.130:9092 #改成你的虚拟机的ip和端口
将原来的
#listeners=PLAINTEXT://:9092 注释掉
版权归原作者 史上最优企划 所有, 如有侵权,请联系我们删除。