0


PHP小白搭建Kafka环境以及初步使用rdkafka

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录


前言

提示:windows环境安装失败,Linux环境安装成功(以下并没有windows安装示例)

一、安装java(Kafka必须安装java,因为kafka依赖java核心)

下载地址:链接: https://www.oracle.com/java/technologies/downloads/#jdk20-linux
在这里插入图片描述
将文件放在Linux目录中后进行解压:

假设我把[jdk-20_linux-x64_bin.tar.gz]包放在了/root/src/uap/web/third 目录下
1、tar -zxvf jdk-20_linux-x64_bin.tar.gz
2、mv jdk.0.20./jdk
3、vim /etc/profile 
 JAVA_HOME=/root/src/uap/web/third/jdk
 PATH=/root/src/uap/web/third/jdk/bin:$PATH
 export JAVA_HOME
4、source /ect/profile
5、java -version (出现下图极为成功)

在这里插入图片描述

二、安装以及配置Kafka、zookeeper

1.下载Kafka(无需下载zookeeper,使用kafka自带的即可)

下载地址:https://kafka.apache.org/downloads
提示:不要下载带src的那个,具体我也不知道,因为我也是个小白
在这里插入图片描述

假设我把[kafka_2.12-3.5.1.tgz]包放在了/root/src/uap/web/third 目录下
1、tar -zxvf kafka_2.12-3.5.1.tgz
2、mv kafka.2.12./kafka
3、创建kafka日志文件
 mkdir -p ./kafka_data/log/kafka
 mkdir -p ./kafka_data/log/zookeeper
 mkdir -p ./kafka_data/zookeeper
4、cd ./kafka/config
vim server.properties
 listeners=PLAINTEXT://localhost:9092 (34行左右,添加对应的host、port)
 broker.id=0
 port=9092
 host.name=192.168.1.241
 log.dirs=/root/src/uap/web/third/kafka_data/log/kafka
 zookeeper.connect=localhost:2181
wd
vim zookeeper.properties
 dataDir=/root/src/uap/web/third/kafka_data/zookeeper
 dataLogDir=/root/src/uap/web/third/kafka_data/log/zookeeper
 clientPort=2181
 maxClientCnxns=100
 tickTimes=2000
 initLimit=10
 syncLimit=5
wd
5、cd ../ 进入kafka目录下
#启动zookeeper
./bin/zookeeper-server-start.sh ./config/zookeeper.properties &//如果其中报错,大部分应该是报JAVA_HOME 这个说明你没有配置 /etc/profile 上面有./bin/kafka-server-start.sh -daemon ./config/server.properties &

2.配置topid

代码如下(示例):

kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic myt
返回值:Created topic myt.  创建成功/否则失败

3.安装PHP的rdkafka,这个网上教程很多,基本上都是正确的

例如:阿里云开发者社区,php安装rdkafka教程
剩下逻辑就直接贴代码了

生产者:
publicfunctionproducer(){$conf=newRdKafka\Conf();$conf->set('metadata.broker.list','localhost:9092');$producer=newRdKafka\Producer($conf);$topic=$producer->newTopic("mytest");//获取数据库数据,存入kafka中$wanchk=$this->db->query("SELECT * FROM hf_alarm_wanchk");foreach($wanchkas$k=>$v){$topic->produce(RD_KAFKA_PARTITION_UA,0,array2json($v));$producer->poll(0);}$result=$producer->flush(10000);if(RD_KAFKA_RESP_ERR_NO_ERROR!==$result){thrownew\RuntimeException('Was unable to flush, messages might be lost!');}$producer->purge(RD_KAFKA_PURGE_F_QUEUE);$producer->flush(10000);}
消费者://这个代码需要使用终端运行:// /bin/php -c /etc/php.ini  -f  /入口文件目录/index.php (类)consumer (方法)consumerpublicfunctionconsumer(){$conf=new\RdKafka\Conf();$conf->set('group.id','mytest');$rk=new\RdKafka\Consumer($conf);$rk->addBrokers("127.0.0.1");$topicConf=new\RdKafka\TopicConf();$topicConf->set('auto.commit.interval.ms',100);$topicConf->set('offset.store.method','broker');$topicConf->set('auto.offset.reset','smallest');$topic=$rk->newTopic('mytest',$topicConf);$topic->consumeStart(0,RD_KAFKA_OFFSET_STORED);while(true){$message=$topic->consume(0,120*10000);switch($message->err){caseRD_KAFKA_RESP_ERR_NO_ERROR:var_dump($message);break;caseRD_KAFKA_RESP_ERR__PARTITION_EOF:echo"No more messages; will wait for more\n";break;caseRD_KAFKA_RESP_ERR__TIMED_OUT:echo"Timed out\n";break;default:thrownew\Exception($message->errstr(),$message->err);break;}}}
标签: php kafka 开发语言

本文转载自: https://blog.csdn.net/a836236241/article/details/132495663
版权归原作者 白鹭天行 所有, 如有侵权,请联系我们删除。

“PHP小白搭建Kafka环境以及初步使用rdkafka”的评论:

还没有评论