-- coding:utf-8 --
from kafka import KafkaProducer
此处ip可以是多个[‘0.0.0.1:9092’,‘0.0.0.2:9092’,‘0.0.0.3:9092’ ]
producer = KafkaProducer(bootstrap_servers=[‘localhost:9092’], compression_type=‘gzip’)
for i in range(3):
msg = “msg%d” % i
producer.send(‘test’, msg)
producer.close()
若消息过大,可压缩消息发送,可选值为
gzip
,
snappy
,
lz4
。
生产者-json 数据
-- coding:utf-8 --
import json
from kafka import KafkaProducer
此处ip可以是多个[‘0.0.0.1:9092’,‘0.0.0.2:9092’,‘0.0.0.3:9092’ ]
producer = KafkaProducer(bootstrap_servers=[‘localhost:9092’], value_serializer=lambda m: json.dumps(m).encode(‘ascii’))
for i in range(3):
msg = “msg%d” % i
producer.send(‘test’, {msg: msg})
producer.close()
消费者:
-- coding:utf-8 --
from kafka import KafkaConsumer
consumer = KafkaConsumer(‘test’, bootstrap_servers=[‘localhost:9092’])
for message in consumer:
print (“%s:%d:%d: key=%s value=%s” % (message.topic, message.partition, message.offset, message.key, message.value))
先启动消费者,再启动生产者,可以看到
版权归原作者 港迪学编程 所有, 如有侵权,请联系我们删除。