0


Python Kafka客户端性能测试比较

前言

  1. 由于工作原因使用到了 Kafka,而现有的代码并不能满足性能需求,所以需要开发高效读写 Kafka 的工具,本文是一个 Python Kafka Client 的性能测试记录,通过本次测试,可以知道选用什么第三方库的性能最高,选用什么编程模型开发出来的工具效率最高。

第三方库性能测试

1.第三方库

  1. 此次测试的是三个主要的 Python Kafka Clientpykafkakafka-python confluent-kafka,具体介绍见官网:
  • pykafka:pykafka · PyPI
  • kafka-python:kafka-python · PyPI
  • confluent_kafka:confluent-kafka · PyPI

测试环境

  1. 此次测试使用的 Python 版本是2.7,第三方库的版本为:
  • pykafka:2.8.0

  • kafka-python:2.0.2

  • confluent-kafka:1.5.0

    使用的数据总量有50万,每条数据大小为2KB,总共为966MB。

测试过程

(1)Kafka Producer 测试

  分别使用 pykafka、kafka-python 和 confluent-kafka 实例化一个 Kafka 的 Producer 对象,然后调用相应的 produce 方法将数据推送给 Kafka,数据总条数为50万,比较三个库所耗费的时间,并计算每秒钟可以推送的数据条数和大小,比较得出性能最优的。

  代码示例(以 pykafka 为例):

  1. import sys
  2. from datetime import datetime
  3. from pykafka import KafkaClient
  4. class KafkaProducerTool():
  5. def __init__(self, broker, topic):
  6. client = KafkaClient(hosts=broker)
  7. self.topic = client.topics[topic]
  8. self.producer = self.topic.get_producer()
  9. def send_msg(self, msg):
  10. self.producer.produce(msg)
  11. if __name__ == '__main__':
  12. producer = KafkaProducerTool(broker, topic)
  13. print(datetime.now())
  14. for line in sys.stdin:
  15. producer.send_msg(line.strip())
  16. producer.producer.stop()
  17. print(datetime.now())

(2)Kafka Consumer 测试

  分别使用 pykafka、kafka-python 和 confluent-kafka 实例化一个 Kafka 的 Consumer 对象,然后调用相应的 consume 方法从 Kafka 中消费数据,要消费下来的数据总条数为50万,比较三个库所耗费的时间,并计算每秒钟可以消费的数据条数和大小,比较得出性能最优的。

  代码示例(以 pykafka 为例):

  1. from datetime import datetime
  2. from pykafka import KafkaClient
  3. class KafkaConsumerTool():
  4. def __init__(self, broker, topic):
  5. client = KafkaClient(hosts=broker)
  6. self.topic = client.topics[topic]
  7. self.consumer = self.topic.get_simple_consumer()
  8. def receive_msg(self):
  9. count = 0
  10. print(datetime.now())
  11. while True:
  12. msg = self.consumer.consume()
  13. if msg:
  14. count += 1
  15. if count == 500000:
  16. print(datetime.now())
  17. return
  18. if __name__ == '__main__':
  19. consumer = KafkaConsumerTool(broker, topic)
  20. consumer.receive_msg()
  21. consumer.consumer.stop()

测试结果

  • Kafka Producer 测试结果:
    总耗时/秒每秒数据量/MB每秒数据条数confluent_kafka3527.9014285.71pykafka5019.5310000kafka-python5321.83939.85

  • Kafka Consumer 测试结果:
    总耗时/秒每秒数据量/MB每秒数据条数confluent_kafka3925.0412820.51kafka-python5218.789615.38pykafka3352.921492.54

    测试结论

  经过测试,在此次测试的三个库中,生产消息的效率排名是:confluent-kafka > pykafka > kafka-python,消费消息的效率排名是:confluent-kafka > kafka-python > pykafka,由此可见 confluent-kafka 的性能是其中最优的,因而选用这个库进行后续开发。

多线程模型性能测试

编程模型

  经过前面的测试已经知道 confluent-kafka 这个库的性能是很优秀的了,但如果还需要更高的效率,应该怎么办呢?当单线程(或者单进程)不能满足需求时,我们很容易想到使用多线程(或者多进程)来增加并发提高效率,考虑到线程的资源消耗比进程少,所以打算选用多线程来进行开发。那么多线程消费 Kafka 有什么实现方式呢?我想到的有两种:

  1. 一个线程实现一个 Kafka Consumer,最多可以有 n 个线程同时消费 Topic(其中 n 是该 Topic 下的分区数量);

  1. 多个线程共用一个 Kafka Consumer,此时也可以实例化多个 Consumer 同时消费。

对比这两种多线程模型:

  • 模型1实现方便,可以保证每个分区有序消费,但 Partition 数量会限制消费能力;
  • 模型2并发度高,可扩展能力强,消费能力不受 Partition 限制。

测试过程

(1)多线程模型1

  测试代码:

  1. import time
  2. from threading import Thread
  3. from datetime import datetime
  4. from confluent_kafka import Consumer
  5. class ChildThread(Thread):
  6. def __init__(self, name, broker, topic):
  7. Thread.__init__(self, name=name)
  8. self.con = KafkaConsumerTool(broker, topic)
  9. def run(self):
  10. self.con.receive_msg()
  11. class KafkaConsumerTool:
  12. def __init__(self, broker, topic):
  13. config = {
  14. 'bootstrap.servers': broker,
  15. 'session.timeout.ms': 30000,
  16. 'auto.offset.reset': 'earliest',
  17. 'api.version.request': False,
  18. 'broker.version.fallback': '2.6.0',
  19. 'group.id': 'test'
  20. }
  21. self.consumer = Consumer(config)
  22. self.topic = topic
  23. def receive_msg(self):
  24. self.consumer.subscribe([self.topic])
  25. print(datetime.now())
  26. while True:
  27. msg = self.consumer.poll(timeout=30.0)
  28. print(msg)
  29. if __name__ == '__main__':
  30. thread_num = 10
  31. threads = [ChildThread("thread_" + str(i + 1), broker, topic) for i in ge(thread_num)]
  32. for i in range(thread_num):
  33. threads[i].setDaemon(True)
  34. for i in range(thread_num):
  35. threads[i].start()

  因为我使用的 Topic 共有8个分区,所以我分别测试了线程数在5个、8个和10个时消费50万数据所需要的时间,并计算每秒可消费的数据条数。

(2)多线程模型2

  测试代码:

  1. import time
  2. from datetime import datetime
  3. from confluent_kafka import Consumer
  4. from threadpool import ThreadPool, makeRequests
  5. class KafkaConsumerTool:
  6. def __init__(self, broker, topic):
  7. config = {
  8. 'bootstrap.servers': broker,
  9. 'session.timeout.ms': 30000,
  10. 'auto.offset.reset': 'earliest',
  11. 'api.version.request': False,
  12. 'broker.version.fallback': '2.6.0',
  13. 'group.id': 'mini-spider'
  14. }
  15. self.consumer = Consumer(config)
  16. self.topic = topic
  17. def receive_msg(self, x):
  18. self.consumer.subscribe([self.topic])
  19. print(datetime.now())
  20. while True:
  21. msg = self.consumer.poll(timeout=30.0)
  22. print(msg)
  23. if __name__ == '__main__':
  24. thread_num = 10
  25. consumer = KafkaConsumerTool(broker, topic)
  26. pool = ThreadPool(thread_num)
  27. for r in makeRequests(consumer.receive_msg, [i for i in range(thread_num)]):
  28. pool.putRequest(r)
  29. pool.wait()

  主要使用 threadpool 这个第三方库来实现线程池,此处当然也可以使用其他库来实现,这里我分别测试了线程数量在5个和10个时消费50万数据所需要的时间,并计算每秒可消费的数据条数。

测试结果

  • 多线程模型1
    总数据量/万线程数量总耗时/秒每秒数据条数5052718518.515082420833.3350102619230.76

  • 多线程模型2
    总数据量/万线程数量总耗时/秒每秒数据条数5051729411.7650101338461.53

    测试结论

  使用多线程可以有效提高 Kafka 的 Consumer 消费数据的效率,而选用线程池共用一个 KafkaConsumer 的消费方式的消费效率更高。

标签: kafka java 分布式

本文转载自: https://blog.csdn.net/u012206617/article/details/128727595
版权归原作者 墨痕诉清风 所有, 如有侵权,请联系我们删除。

“Python Kafka客户端性能测试比较”的评论:

还没有评论