0


PHP Kafka 使用详解

一 安装PHP扩展
1 rdkafka 安装需要依赖 librdkafka , 所以先安装 librdkafka 自定义目录 下载扩展并且安装

git clone https://github.com/edenhill/librdkafka.git

cd librdkafka ## 找到自己的安装路径./configure

make && make install

2 安装 php-rdkafka 扩展

git clone https://github.com/arnaud-lb/php-rdkafka.git

cd php-rdkafka ## 找到自己的安装路径/www/server/php/72/bin/phpize ## 这里根据自己安装PHP的路径执行./configure --with-php-config=/www/server/php/72/bin/php-config  ## 这里根据自己安装的PHP版本情况填写路径

make && make install

3 在PHP配置文件 php-ini 加上

extension =/www/server/php/72/lib/php/extensions/no-debug-non-zts-20170718/rdkafka.so ##找到自己使用的PHP路径 如果安装多个版本的PHP注意要找到项目使用的PHP版本

重启 php-fpm,查看phpinfo() 或者再命令行执行 php -m 查看扩展是否安装成功

在这里插入图片描述
在这里插入图片描述如果php-m找不到扩展 则需在安装的PHP下找到 /www/server/php/72/etc php-cli.ini (此路径为我的PHP安装路径 需要找到你自己的PHP安装路径) 在此配置文件最下端添加

extension = /www/server/php/72/lib/php/extensions/no-debug-non-zts-20170718/rdkafka.so

三 安装 Kafka 服务 开放端口 kafka默认9092端口

1 直接到 kafka官网 , 下载最新的

下载后放在服务器项目的自定目录下 解压  可放在/www/server/ 目录下(Linux服务器)

2 使用命令下载

    命令 如没有wget命令则需yum安装
    wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz
    解压,进入目录
    tar -zxvf kafka_2.13-2.5.0.tgz
    cd kafka_2.13-2.5.0

四 启动Kafka服务(在放置kafka文件的根目录下执行命令)

1 使用安装包中的脚本启动单节点 Zookeeper 实例   (新版本的kafka已无需执行此命令)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
2 使用 kafka-server-start.sh 启动 kafka 服务
 bin/kafka-server-start.sh config/server.properties //普通启动
 
 bin/kafka-server-start.sh -daemon ./config/server.properties //项目上线后需要以守护进程的方式启动

3 创建topic:

bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092--replication-factor 1--partitions 1--topic test

发消息: bin/kafka-console-producer.sh --broker-list127.0.0.1:9092--topic test
>hello
>world

监听消息:bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092--topic test --from-beginning

五 PHP 使用Kafka

1创建一个生产者类

<?phpclassKafkaProducer{publicstatic$brokerList='127.0.0.1:9092';publicstaticfunctionsend($message,$topic){self::producer($message,$topic);}publicstaticfunctionproducer($message,$topic='test'){$conf=new\RdKafka\Conf();$conf->set('metadata.broker.list',self::$brokerList);$producer=new\RdKafka\Producer($conf);$topic=$producer->newTopic($topic);$topic->produce(RD_KAFKA_PARTITION_UA,0,json_encode($message));$producer->poll(0);$result=$producer->flush(10000);if(RD_KAFKA_RESP_ERR_NO_ERROR!==$result){thrownew\RuntimeException('Was unable to flush, messages might be lost!');}}}

2 创建一个消费类

<?phpclassKafkaConsumer{publicstatic$brokerList='127.0.0.1:9092';publicstaticfunctionconsumer(){$conf=new\RdKafka\Conf();$conf->set('group.id','test');$rk=new\RdKafka\Consumer($conf);$rk->addBrokers("127.0.0.1");$topicConf=new\RdKafka\TopicConf();$topicConf->set('auto.commit.interval.ms',100);$topicConf->set('offset.store.method','broker');$topicConf->set('auto.offset.reset','smallest');$topic=$rk->newTopic('test',$topicConf);$topic->consumeStart(0,RD_KAFKA_OFFSET_STORED);while(true){$message=$topic->consume(0,120*10000);switch($message->err){caseRD_KAFKA_RESP_ERR_NO_ERROR:var_dump($message);break;caseRD_KAFKA_RESP_ERR__PARTITION_EOF:echo"No more messages; will wait for more\n";break;caseRD_KAFKA_RESP_ERR__TIMED_OUT:echo"Timed out\n";break;default:thrownew\Exception($message->errstr(),$message->err);break;}}}}

问题汇总

1、 No Java runtime present, requesting install

因为 kafka 需要 java 环境支持,所以安装 java 环境。可以到 javase-jdk14-downloads 选择自己的版本进行下载安装

2、创建 topic 出现:Replication factor: 1 larger than available brokers: 0

意思是至少有一个 brokers. 也就是说并没有有效的 brokers 可以用。你要确保你的 kafka 已经启动了

3、如果安装多个PHP版本 配置信息需要配置到项目使用的PHP版本

标签: kafka php linux

本文转载自: https://blog.csdn.net/weixin_39104010/article/details/125313440
版权归原作者 元方你也玩 所有, 如有侵权,请联系我们删除。

“PHP Kafka 使用详解”的评论:

还没有评论