0


本地idea连接Centos7kafka操作

搭建完kafka,一般都是使用本地来链接虚拟机的,初次链接会出现各种神奇的问题,特此记录一下。

首先,请先对生产者,消费者,topic有一个大概的认识

-----本地的工作

先在本地idea写生产者的代码

package com.example.kafkademo.demos;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerQuickStart {

    public static void main(String[] args) {
        //
        Properties prop=new Properties();
        //kafka链接地址
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.12.130:9092");
        //序列化
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //创建对象
        KafkaProducer<String,String> producer=new KafkaProducer<String, String>(prop);
        //发送

        /**
         * topic key value
         */
        ProducerRecord<String,String> kvProducerRecord =new ProducerRecord<String,String>("mytopic","key-001","hello kafka1");
        producer.send(kvProducerRecord);
        //必须关闭
        producer.close();
    }
}

看不懂复制粘贴到gpt

这里可知道,我们对

192.168.12.130:9092 虚拟机kafka的发了一个("mytopic","key-001","hello kafka1");
向mytopic这个主题发送了一个ello kafka1。
-------本地防火墙的工作

我们要打开本地9092的端口来和虚拟机进行交流

具体教程站内有,有需要请移步,关键词 win防火墙 ,端口

总之

打开9092端口(kafka默认工作在9092)

--------虚拟机防火墙工作

进入到linux中,我们打开9092端口的防火墙,(默认9092)

执行以下命令以开放端口 9092(假设你使用的是 firewalld 防火墙):

firewall-cmd --zone=public --add-port=9092/tcp --permanent

这条命令会将 9092 端口添加到防火墙规则中,并持久化保存。

重新加载防火墙配置使更改生效:
firewall-cmd --reload

现在,端口 9092 应该已经被打开了。你可以通过检查防火墙规则来确认端口是否成功打开:

firewall-cmd --list-ports
--------看看能不能联通(有自信的可以跳过)
接下来我们可以使用telnet来尝试一下本地能不能访问kafka

在本地win系统下(没有可以花费30秒装一个)安装教程本站有,【高效开发工具系列】Windows 怎么使用 telnet_windows telnet-CSDN博客

测试telnet前必须保证虚拟机的测试端口正在运行(telnet只能测试被使用的端口)

虚拟机启动kafka

本地使用telnet

telnet 192.168 .12.130 9092 (测试虚拟机9092端口 )

联通成功可以进入telnet中,如图成功链接

---------kafka操作

还记得我们之前代码里的topic,叫做"mytopic"吗

如果没有mytopic的情况下是不会自动创建topic的,我们得在kafka手动创建这个topic

我们到kafka的bin目录下

./kafka-topics.sh --list --zookeeper 192.168.12.130:2181
看当前的所有topic


./kafka-topics.sh --create --zookeeper 192.168.12.130:2181 --replication-factor 1 --partitions 1 --topic mytopic

在 Kafka 中通过 ZooKeeper 创建一个名为

mytopic

的主题。这个命令看起来是正确的,假设你的 Kafka 集群中有一个运行 ZooKeeper 的节点,并且该节点的地址是

192.168.12.130:2181

执行该命令后,Kafka 将会尝试在指定的 ZooKeeper 上创建一个名为

mytopic

的主题,该主题将只有一个分区和一个副本。如果一切顺利,Kafka 将成功创建这个主题。


创建完和idea相似的代码后服务不要关闭

-----------启动idea生产者代码

10秒内肯定能执行完毕,即或不然出错了。

去kafka执行

./kafka-console-consumer.sh --bootstrap-server 192.168.12.130:9092 --topic test-topic --from-beginning --max-messages 10   

这个命令将在命令行中启动一个简单的 Kafka 消费者客户端,并连接到指定的 Kafka 服务器(broker)来消费名为

test-topic

的主题中的消息。该命令还设置了一些选项:

  • --bootstrap-server 192.168.12.130:9092:指定要连接的 Kafka 服务器的地址和端口。
  • --topic test-topic:指定要消费消息的主题名称,这里是 test-topic
  • --from-beginning:表示从该主题的起始位置开始消费消息。
  • --max-messages 10:指定最大的消息数量,这里设置为 10 条。

一旦你执行这个命令,Kafka 将会在命令行中显示来自

test-topic

主题的前10条消息(如果有的话)。如果没有设置

--max-messages

参数,默认情况下会持续监听该主题并打印新到达的消息。也就是不会主动关闭

停止centos7当前进程的命令是ctrl+c

生产者已经注入了kafka

-----------遇到了无法访问的问题怎么办
  1. 请你回到上述联通的步骤,使用telnet工具

2,能联通,访问不了的话,我们就把kafka集群,zookeeper所有出现“本机”的字段更改成虚拟机地址(极度不推荐,麻烦,会出现后续问题)

在zookeeper.properties文件中加上

clientPortAddress=192.168.12.130                 #示例

在所有kafka文件

Service-properties文件中加上

listeners=PLAINTEXT://192.168.12.130:9092
advertised.listeners=PLAINTEXT://192.168.12.130:9092 #改成你的虚拟机的ip和端口

将原来的

#listeners=PLAINTEXT://:9092 注释掉

标签: kafka 分布式

本文转载自: https://blog.csdn.net/m0_70420478/article/details/136710178
版权归原作者 史上最优企划 所有, 如有侵权,请联系我们删除。

“本地idea连接Centos7kafka操作”的评论:

还没有评论