文章目录
前提
- 我的配置是
M1 芯片 Macbook pro
- 你的 kafka 处于启动状态,如果尚未启动,则通过以下命令依次运行
zookeeper
和kafka
,如果有安装问题可以参考上一篇文章brew services start zookeeperbrew services start kafka
python 环境配置
- 首先,确保安装了
confluent-kafka``````pip install confluent-kafka
- 使用以下Python脚本创建一个新的Kafka主题:
from confluent_kafka.admin import AdminClient, NewTopic# Kafka服务器配置admin_client = AdminClient({"bootstrap.servers":"localhost:9092"})# 创建新主题的配置topic_list =[NewTopic("my_new_topic", num_partitions=3, replication_factor=1)]# 注意: replication_factor 和 num_partitions 可能需要根据你的Kafka集群配置进行调整; # 创建主题fs = admin_client.create_topics(topic_list)# 处理结果for topic, f in fs.items():try: f.result()# The result itself is Noneprint(f"Topic {topic} created")except Exception as e:print(f"Failed to create topic {topic}: {e}")
> > - 注意replication_factor
不能超过broker
的数量> - 具体原因可以参考视频> - 通过python
只能创建 broker 的主题,而不能控制创建多个broker
,增加或管理brokers
的过程需要在集群的配置和部署阶段进行,而不能通过像confluent_kafka
这样的客户端库来实现。
Kafka 生产消费者模型
生产者 producer
"""
@file: producer.py
@Time : 2024/3/29
@Author : Peinuan qin
"""from confluent_kafka import Producer
import json
# Kafka配置
config ={'bootstrap.servers':'localhost:9092'}# 创建生产者
producer = Producer(**config)# 模拟的用户活动数据
data ={'user_id':1234,'activity':'page_view','page':'homepage'}# 发送数据
producer.produce('user_activities', key=str(data['user_id']), value=json.dumps(data))
producer.flush()print("Data sent to Kafka")
Data sent to Kafka
检查当前存在的所有 topic / 是否自动创建 topic
- 可以用如下命令来检查已经存在的
topics``````kafka-topics --list--bootstrap-server localhost:9092
- 对于上述
producer
中,我的server.property
中的auto.create.topics.enable
设置为 True,这意味着如果当前topics
不存在会自动创建。 - 检查
auto.create.topics.enable
的方式:grep "auto.create.topics.enable"/path/to/your/kafka/config/server.properties
- 一般
/path/to/your/kafka/config/server.properties
在我的 上篇文章 中提到了, m1 芯片的 mac 的地址是在/opt/homebrew/etc/kafka/server.properties
,所以对应的查看命令就是:grep "auto.create.topics.enable"/opt/homebrew/etc/kafka/server.properties
- 如果这一行在你的
server.properties
中并不存在,则默认为true
,如果想更改,需要在server.properties
中加入auto.create.topics.enable=false
然后保存更改,重新启动kafka
- 当你设置成
auto.create.topics.enable=false
,再次运行上面的代码,但是topic
换成一个新的
producer.produce('user_activities1', key=str(data['user_id']), value=json.dumps(data))
- 你会发现执行结果还是下面内容,并且没有报错:> Data sent to Kafka
- 但是当你列出所有的 topic,却发现其实
user_activities1
并没有创建成功。
为什么 producer 要通过 key, value 来发布数据
键(Key)
- 分区选择: 键主要用于决定消息被发送到主题的哪个分区。如果为消息指定了键,Kafka会对键进行哈希处理,根据哈希值将消息均匀分配到不同的分区。这种方式确保了相同键的所有消息都会被发送到同一个分区中,保证了消息的顺序性。如果没有指定键,消息会以轮询的方式分配到所有分区,这可能不会保证相同键的消息顺序。
- 日志压缩: 键还用于日志压缩(log compaction)功能。在这个模式下,Kafka保证每个键在分区日志中只保留最后一次更新的值。这对于维护长期运行的聚合状态非常有用。
值(Value)
- 消息内容: 值部分承载了消息的实际内容。这是生产者想要发送给消费者的数据。值可以是任何格式的数据,比如字符串、JSON对象、序列化后的字节码等。
- 使用场景示例- 订单系统:在一个订单系统中,订单ID可以作为键,而订单的详细信息(如客户信息、订单项、价格等)作为值。使用订单ID作为键确保了相同订单的更新会被顺序地发送到同一个分区,并且通过日志压缩,Kafka可以只保留订单的最新状态。- 用户行为跟踪:在用户行为跟踪应用中,用户ID 可以作为键,用户的行为(如点击、浏览等)作为值。这样,相同用户的所有行为都会被顺序地记录在同一个分区中,便于后续进行用户行为分析。
消费者 consumer
"""
@file: consumer.py
@Time : 2024/3/29
@Author : Peinuan qin
"""from confluent_kafka import Consumer, KafkaError
# Kafka配置
config ={'bootstrap.servers':'localhost:9092','group.id':'user-activity-group','auto.offset.reset':'earliest'}# 创建消费者
consumer = Consumer(**config)
consumer.subscribe(['user_activities'])# 读取数据try:whileTrue:
msg = consumer.poll(timeout=1.0)# 1秒超时if msg isNone:continueif msg.error():if msg.error().code()== KafkaError._PARTITION_EOF:# End of partition eventcontinueelse:print(msg.error())break# 成功接收消息print('Received message: {}'.format(msg.value().decode('utf-8')))except KeyboardInterrupt:passfinally:# 清理操作
consumer.close()
- 可以看到我们订阅了
'user_activities'
这个topic
,从其中源源不断地取数据来进行消费(处理) - 但是同时在
consumer
的代码中定义了一个group.id
,而这个是producer
中没有的,这样做的原因是:- 负载均衡:在同一个消费者组中,每个消费者可以负责消费特定的分区中的消息,这样可以在消费者之间分摊负载。如果一个消费者组中有多个消费者实例,Kafka会尽量平衡地将分区分配给每个消费者,确保每个分区只被组内的一个消费者消费。这意味着增加消费者可以提高消费的并行度,加快处理速度。- 容错和高可用性: 如果某个消费者失败,它负责的分区会被重新分配给同一消费者组内的其他消费者,这样可以确保消息的持续消费,提高了系统的容错能力。- 消息广播: 通过使用不同的消费者组,可以实现消息的广播模式,即相同的消息可以被多个消费者组独立消费。 - 如果不是认为还不是很清楚,可以 参考视频
consumer 得到的 message 有哪些方法?
print("msg dict:",dir(msg))
msg dict:['__class__','__delattr__','__dir__','__doc__','__eq__','__format__','__ge__','__getattribute__','__gt__','__hash__','__init__','__init_subclass__','__le__','__len__','__lt__','__ne__','__new__','__reduce__','__reduce_ex__','__repr__','__setattr__','__sizeof__','__str__','__subclasshook__','error','headers','key','latency','leader_epoch','offset','partition','set_headers','set_key','set_value','timestamp','topic','value']
- Key: 消息的键(如果有)。键用于消息的分区内排序和日志压缩。
- Value: 消息的实际内容或负载。
- Topic: 消息所属的主题。
- Partition: 消息所在的分区号。Kafka中的每个主题可以被分割成多个分区,分区号从0开始。
- Offset: 消息在其分区中的偏移量。偏移量是一个递增的序列号,用于唯一标识分区中的每条消息。
- Timestamp: 消息的时间戳。它可以是消息创建时的时间戳(生产者发送消息的时间)或者是消息被追加到日志的时间戳。时间戳的具体含义取决于Kafka生产者的配置。
- Headers: 消息头部,是键值对的集合,可以用来存储与消息相关的附加信息。生产者可以添加任意多的键值对作为消息的一部分,消费者可以读取这些信息进行相应的处理。
- Serialized Key Size: 键的序列化后的大小(以字节为单位)。如果消息没有键,通常这个值是-1。
- Serialized Value Size: 值的序列化后的大小(以字节为单位)。
- Leader Epoch (Kafka 0.11.0及以上版本): 分区领导者的纪元号。这是一个内部使用的字段,用于Kafka的复制机制,以确保数据一致性。
为什么 consumer 拿到的内容需要 decode
- 消费者在接收到Kafka中的消息后需要进行解码(decoding),原因在于消息的生产者在发送消息到Kafka之前通常会对消息的键(key)和值(value)进行编码(encoding)。编码和解码是为了在网络上传输数据时确保数据的一致性和完整性,同时也支持消息的有效存储。
- 一般使用
utf-8
进行编解码 - 上述的
producer
中没有显式调用encoder
是因为json.dumps
本身就是序列化的过程,也就是编码的过程。 但好习惯应该是对key
和value
都进行encode
:producer.produce('user_activities1', key=str(data['user_id']).encode("utf-8"), value=json.dumps(data).encode("utf-8"))
本文转载自: https://blog.csdn.net/qq_42902997/article/details/137159213
版权归原作者 暖仔会飞 所有, 如有侵权,请联系我们删除。
版权归原作者 暖仔会飞 所有, 如有侵权,请联系我们删除。