0


Python 操作 Kafka,生产者和消费者代码 Demo


-- coding:utf-8 --

from kafka import KafkaProducer

此处ip可以是多个[‘0.0.0.1:9092’,‘0.0.0.2:9092’,‘0.0.0.3:9092’ ]

producer = KafkaProducer(bootstrap_servers=[‘localhost:9092’], compression_type=‘gzip’)

for i in range(3):

msg = “msg%d” % i

producer.send(‘test’, msg)

producer.close()

若消息过大,可压缩消息发送,可选值为

gzip

,

snappy

,

lz4

生产者-json 数据


-- coding:utf-8 --

import json

from kafka import KafkaProducer

此处ip可以是多个[‘0.0.0.1:9092’,‘0.0.0.2:9092’,‘0.0.0.3:9092’ ]

producer = KafkaProducer(bootstrap_servers=[‘localhost:9092’], value_serializer=lambda m: json.dumps(m).encode(‘ascii’))

for i in range(3):

msg = “msg%d” % i

producer.send(‘test’, {msg: msg})

producer.close()

消费者:


-- coding:utf-8 --

from kafka import KafkaConsumer

consumer = KafkaConsumer(‘test’, bootstrap_servers=[‘localhost:9092’])

for message in consumer:

print (“%s:%d:%d: key=%s value=%s” % (message.topic, message.partition, message.offset, message.key, message.value))

先启动消费者,再启动生产者,可以看到


本文转载自: https://blog.csdn.net/m0_61369360/article/details/137655619
版权归原作者 港迪学编程 所有, 如有侵权,请联系我们删除。

“Python 操作 Kafka,生产者和消费者代码 Demo”的评论:

还没有评论