一、安装Kafka的环境要求
首先在安装 Kafka 之前,需要满足的环境要求:
1、Java运行环境
Kafka 是使用 Java 语言编写的,因此需要在安装 Kafka 之前先安装 Java 运行环境。Kafka 支持 Java 8 及以上版本。可以通过以下命令检查 Java 运行环境的版本
java -version
2、ZooKeeper
Kafka 的运⾏环境依赖于 ZooKeeper,Kafka 使用 ZooKeeper 进行分布式协调,因此在安装 Kafka 之前,需要先安装 ZooKeeper。
二、安装ZooKeeper
1、解压安装
在 /usr/local/ 下创建 zookeeper ⽂件夹并进⼊,
将 ZooKeeper 安装包解压到 /usr/local/zookeeper 中即可,
[root@localhost zookeeper]# tar -zxvf /root/apache-zookeeper-3.6.1-bin.tar.gz -C ./
2、创建一个data目录
这⾥直接在 /usr/local/zookeeper/apache-zookeeper-3.6.1-bin ⽬录中创建⼀个 data ⽬录。
该 data ⽬录地址要配到 ZooKeeper 的配置⽂件中。
3、创建配置文件并修改
进⼊到 zookeeper 的 conf ⽬录,复制 zoo_sample.cfg 得到 zoo.cfg :
[root@localhost apache-zookeeper-3.6.1-bin]# cd conf/
[root@localhost conf]# cp zoo_sample.cfg zoo.cfg
修改配置⽂件 zoo.cfg ,将其中的 dataDir 修改为上⾯刚创建的 data ⽬录。
4、启动ZooKeeper
[root@localhost apache-zookeeper-3.6.1-bin]# ./bin/zkServer.sh start
查看启动状态
[root@localhost apache-zookeeper-3.6.1-bin]# ./bin/zkServer.sh status
5、把ZooKeeper设置为开机自启动
⾸先进⼊ /etc/rc.d/init.d ⽬录,创建⼀个名为 zookeeper 的⽂件,并赋予执⾏权限。
[root@localhost ~]# cd /etc/rc.d/init.d/
[root@localhost init.d]# touch zookeeper
[root@localhost init.d]# chmod +x zookeeper
接下来编辑 zookeeper ⽂件,并在其中加⼊如下内容:
#!/bin/bash
#chkconfig:- 20 90
#description:zookeeper
#processname:zookeeper
ZOOKEEPER_HOME=/usr/local/zookeeper/apache-zookeeper-3.6.1-bin
export JAVA_HOME=/usr/local/java/jdk1.8.0_161 # 此处根据你的实际情况更换对
应
case $1 in
start) su root $ZOOKEEPER_HOME/bin/zkServer.sh start;;
stop) su root $ZOOKEEPER_HOME/bin/zkServer.sh stop;;
status) su root $ZOOKEEPER_HOME/bin/zkServer.sh status;;
restart) su root $ZOOKEEPER_HOME/bin/zkServer.sh restart;;
*) echo "require start|stop|status|restart" ;;
esac
最后加⼊开机启动即可:
chkconfig --add zookeeper
chkconfig zookeeper on
三、安装Kafka
1、解压安装
在 /usr/local/ 下创建 kafka ⽂件夹并进⼊
cd /usr/local/
mkdir kafka
cd kafka
将Kafka安装包解压到 /usr/local/kafka 中
[root@localhost kafka]# tar -zxvf /root/kafka_2.12-2.5.0.tgz -C ./
2、创建logs目录
这⾥直接在 /usr/local/kafka/kafka_2.12-2.5.0 ⽬录中创建⼀个 logs ⽬录
该 logs ⽬录地址要配到Kafka的配置⽂件中。
3、修改配置文件
进⼊到 Kafka 的 config ⽬录,编辑配置⽂件 server.properties
[root@localhost kafka_2.12-2.5.0]# cd config/
[root@localhost config]# vim server.properties
修改配置⽂件,⼀是将其中的 log.dirs 修改为上⾯刚创建的 logs ⽬录
四、启动Kafka,并使用命令行操作生产者消费者
1、启动Kafka
启动Kafka之前,需要先启动ZooKeeper,目前ZooKeeper已经设置成开机自启动了。
使用命令行启动Kafka。
./bin/kafka-server-start.sh ./config/server.properties
2、创建主题topic
⾸先创建⼀个名为 Winter 的 topic :
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic Winter
列出⽬前已有的 topic 列表
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
3、创建一个消费者
创建⼀个消费者,⽤于在 codesheep 这个 topic 上获取消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Winter
4、创建生产者
创建⼀个⽣产者,⽤于在 codesheep 这个 topic 上⽣产消息
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic Winter
5、使用生产者发送数据,消费者就可以接受数据。
五、使用Java集成Kafka,实现生产者和消费者。
1、首先创建一个Maven项目
引入依赖pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
2、创建生产者KafkaProducerTest.java
public class KafkaProducerTest {
public static void main(String[] args) {
// TODO 创建配置对象
Map<String, Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.116.101:9092");
// TODO 对生产的数据K,V进行序列化的操作
configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// TODO 创建生产者对象
// 生产者对象需要设定泛型,数据的类型约束
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configMap);
// TODO 创建数据
// 构造数据时,需要传递三个参数
// 1. topic: 要发送数据的主题
// 2. key: 数据的key
// 3. value: 数据的值
// ProducerRecord<String, String> record = new ProducerRecord<String, String>(
// "Winter",
// "key",
// "value"
// );
//
// // TODO 通过生产者对象将数据发送给Kafka
// producer.send(record);
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"Winter",
"key" + i,
"value" + i
);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
System.out.println("发送消息失败:" + exception.getMessage());
} else {
System.out.println("发送消息成功:" + metadata.toString());
}
});
}
// TODO 关闭资源
producer.close();
}
}
3、创建消费者KafkaConsumerTest.java
public class KafkaConsumerTest {
public static void main(String[] args) {
// TODO 创建配置对象
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.116.101:9092");
// TODO 对数据进行反序列化
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
// TODO 创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(consumerConfig);
// TODO 订阅主题
consumer.subscribe(Collections.singletonList("Winter"));
// TODO 从kafka的主题中获取数据
// 消费者从Kafka中拉取数据
while (true){
final ConsumerRecords<String, String> datas = consumer.poll(100);
for (ConsumerRecord<String, String> data : datas){
System.out.println(data);
}
}
// TODO 关闭消费者对象
//consumer.close();
}
}
4、启动Java和Linux的生产者消费者进行发送接收数据
成功运行Java的生产者KafkaProducerTest.java,发送数据
Java的消费者和Linux系统的消费者 都会收到生产者发过来的数据
使用Linux的生产者发送数据也是同理。
5、运行时遇到的问题
Linux系统Kafka的生产者和消费者可以正常 发送和接受数据。
但是在运行Java程序时,Java的生产者一直运行不停止,发送不了数据,消费者也接收不到数据。仔细检查IP、端口并没有写错,代码也没错误。
解决办法是
将kafka目录中的./config/server.properties文件中advertised.listeners改为如下属性。192.168.116.101是我虚拟机的IP。改完后重启,OK了。Java端的代码终于能通信了。
版权归原作者 Winter@ 所有, 如有侵权,请联系我们删除。