0


基于docker的confluent-kafka搭建及python接口使用

基于docker的confluent-kafka搭建及python接口使用

本文介绍基于docker搭建的confluent-kafka及其python接口的使用。

本文只搭建了一个单Broker的confluent-kafka测试环境,考虑到高可用、高吞吐等因素,正式生产环境一般至少要3个节点。

本文采用的系统配置如下:

  • LinuxMint 20.3 (兼容 Ununtu 20.04)
  • docker 20.10.21
  • docker-compose 2.14.2
  • python 3.9.16
  • confluent-kafka(python包) 2.1.1

1. 安装docker以及docker-compose

1.1 安装docker

docker-compose依赖于docker,因此需要先安装docker。

curl -fsSL https://test.docker.com -o test-docker.sh
sudosh test-docker.sh

1.2 安装docker-compose

Compose 是用于定义和运行多容器 Docker 应用程序的工具。通过 Compose,您可以使用 YML 文件来配置应用程序需要的所有服务。然后,使用一个命令,就可以从 YML 文件配置中创建并启动所有服务。

curl -L https://get.daocloud.io/docker/compose/releases/download/v2.14.2/docker-compose-`uname -s`-`uname -m`> /usr/local/bin/docker-compose

要安装其他版本的 Compose,请替换 v2.14.2。

2. 安装confluent-kafka

新建文件并创建docker-compose.yml文件:

version:'3'services:zookeeper:image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    environment:ZOOKEEPER_CLIENT_PORT:2181ZOOKEEPER_TICK_TIME:2000broker:image: confluentinc/cp-kafka:7.0.1
    container_name: broker
    ports:# To learn about configuring Kafka for access across networks see# https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/-"9092:9092"depends_on:- zookeeper
    environment:KAFKA_BROKER_ID:1KAFKA_ZOOKEEPER_CONNECT:'zookeeper:2181'KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:1KAFKA_TRANSACTION_STATE_LOG_MIN_ISR:1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR:1

注意 这里搭建的是本地环境,如果需要从网络中的另一位置访问kafka,需要将

KAFKA_ADVERTISED_LISTENERS

中的

localhost

换成kafka所在主机的真实地址/域名。

进入该文件夹并运行:

docker-compose -f docker-compose.yml up -d

运行后在docker中看到类似结果说明启动成功:

aa@bb:~/docker_scripts$ dockerps
CONTAINER ID   IMAGE                             COMMAND                  CREATED         STATUS         PORTS                                       NAMES
e6fbc05d61b1   confluentinc/cp-kafka:7.0.1       "/etc/confluent/dock…"7 minutes ago   Up 7 minutes   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp   broker
58b04385f2bf   confluentinc/cp-zookeeper:7.0.1   "/etc/confluent/dock…"7 minutes ago   Up 7 minutes   2181/tcp, 2888/tcp, 3888/tcp                zookeeper

这里kafka端口为9092。

关闭容器服务:

docker-compose -f docker-compose.yml down

3. python接口使用

3.1 安装依赖包

安装依赖包:

pip3 install confluent-kafka

3.2 创建、查看topic

进入kafka镜像:

dockerexec -ti broker bash

查看topic:

[aa@bb ~]$ /bin/kafka-topics --list --bootstrap-server 127.0.0.1:9092

新建名为test001的topic:

[aa@bb ~]$ /bin/kafka-topics --create --bootstrap-server 127.0.0.1:9092 --topic test001 --partitions 2
Created topic test001.

查看topic:

[aa@bb ~]$ /bin/kafka-topics --list --bootstrap-server 127.0.0.1:9092
test001

通过

Ctrl

+

P

+

Q

回到终端。

3.3 python接口-broker

创建producer代码

producer1.py

import socket

from confluent_kafka import Producer

conf ={'bootstrap.servers':"localhost:9092",'client.id': socket.gethostname()}

producer = Producer(conf)def__publish_delivery_report(err, msg)->None:if err isnotNone:print(f"send msg:{msg} fail, err is not None")else:print(f"send msg{msg} success")defsend_msg(topic:str, data):
    producer.produce(topic, data, callback=__publish_delivery_report)
    producer.flush()if __name__ =='__main__':
    msg ="hello kafka"
    topic ="test001"
    send_msg(topic, msg)

运行结果:

aa@bb:~/codes/kafka_test$ python3 producer1.py
send msg<cimpl.Message object at 0x7f8d6fe6acc0> success

3.4 python接口-consumer

创建consumer代码

consumer1.py

from confluent_kafka import Consumer
 

classKafkaConsumer:def__init__(self, brokers, group):
        config =dict()
        config['bootstrap.servers']= brokers
        config['group.id']= group
        config['auto.offset.reset']='earliest'
        self.consumer = Consumer(config)defsubscribe(self, topics):
        self.consumer.subscribe(topics=topics)defpull(self):whileTrue:
            msg = self.consumer.poll(1.0)if msg isNone:continueif msg.error():print("Consumer error: {}".format(msg.error()))continueprint('Received message: {}'.format(msg.value().decode('utf-8')))defclose(self):
        self.consumer.close()if __name__ =="__main__":
    consumer = KafkaConsumer("127.0.0.1:9092","test_group1")
    consumer.subscribe(['test001'])
    consumer.pull()
    consumer.close()

运行结果:

aa@bb:~/codes/kafka_test$ python3 consumer1.py
Received message: hello kafka

参考链接

  1. Hello Kafka(八)——Confluent Kafka简介
  2. Docker Compose | 菜鸟教程
  3. confluent_kafka生产者 - luckygxf - 博客园
  4. Hello Kafka(十二)——Python客户端_kafka python客户端_天山老妖的博客-CSDN博客
标签: kafka docker python

本文转载自: https://blog.csdn.net/willian113/article/details/130560234
版权归原作者 摇光65535 所有, 如有侵权,请联系我们删除。

“基于docker的confluent-kafka搭建及python接口使用”的评论:

还没有评论