0


kafka client for go

关于 go 的 kafka client 有很多开源项目,例如

  1. sarama: 具有完整协议支持的纯 Go 实现。包括消费者和生产者实施,支持 GZIP 和 Snappy 压缩。
  2. confluent-kafka-go: Confluent 的 Golang Kafka 客户端包装了 librdkafka C 库,提供完整的 Kafka 协议支持,具有出色的性能和可靠性。提供了高级生产者和消费者,支持 Apache Kafka 0.9 及更高版本的平衡消费者组。
  3. franz-go: 纯 Go 实现提供完整的协议支持、出色的性能以及对所有面向客户端的 KI P 的支持。

本文使用 sarama 作为 kafka client,并尽可能去贴合 java 的书写习惯来实现 kafka 的 producer 和 consumer,例如:producer 的异步回调和 consumer 异步提交。当然 sarama 也支持 producer 的事务。首先引入 sarama 依赖

  1. go get github.com/Shopify/sarama

one: producer

sarama 提供两种 producer 的实现:

  1. AsyncProducer

  1. SyncProducer

,即同步和异步的生产者,本文只介绍

  1. AsyncProducer

的使用,通过

  1. sarama.NewAsyncProducer

获取生产者实例,其中要求传入两个参数

  1. funcNewAsyncProducer(addrs []string, conf *Config)(AsyncProducer,error){
  2. client, err :=NewClient(addrs, conf)if err !=nil{returnnil, err
  3. }returnnewAsyncProducer(client)}
  • addrs: 即为 kafka broker 地址,对于 kafka 集群可以全部都填,也可以只填一个,保证有一个地址可用即可
  • conf: 为 sarama 封装的生产者、消费者、客户端、网络与一体的配置,通过sarama.NewConfig快速获取默认值的配置文件

对于生产者我们通常修改的配置就是 ack 了

  1. // 生产者默认配置
  2. config := sarama.NewConfig()// 配置 ack
  3. config.Producer.RequiredAcks = sarama.WaitForAll
  1. config.Producer.RequiredAcks

的类型本质上是 int16,所以习惯

  1. 0,1,-1

这种配置也可以直接赋值,但为了消除魔数的影响,建议使用 sarama 封装的常数

  1. const(
  2. NoResponse RequiredAcks =0
  3. WaitForLocal RequiredAcks =1
  4. WaitForAll RequiredAcks =-1)

如果需要开启开启异步回调,则需要开启

  1. config.Producer.Return.Successes

  1. config.Producer.Return.Errors

,例如

  1. // 开启异步回调
  2. config.Producer.Return.Successes =true
  3. config.Producer.Return.Errors =true

生产者内部对 Successes 和 Errors 分别维护两个 channel,下面是

  1. AsyncProducer

需要实现的方法

  1. type AsyncProducer interface{AsyncClose()Close()errorInput()chan<-*ProducerMessage
  2. Successes()<-chan*ProducerMessage
  3. Errors()<-chan*ProducerError
  4. IsTransactional()boolTxnStatus() ProducerTxnStatusFlag
  5. BeginTxn()errorCommitTxn()errorAbortTxn()errorAddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string)errorAddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string)error}

当生产者发送消息的正常回调信息会投递到

  1. Successes

的 channel 中,错误回调信息会投递到

  1. Errors

的 channel 中。需要注意的是如果开启了异步回调则一定要去消费这两个 channel 的数据否则会阻塞 producer,同时下面是

  1. AsyncProducer

实现者的初始化

  1. p :=&asyncProducer{
  2. client: client,
  3. conf: client.Config(),
  4. errors:make(chan*ProducerError),
  5. input:make(chan*ProducerMessage),
  6. successes:make(chan*ProducerMessage),
  7. retries:make(chan*ProducerMessage),
  8. brokers:make(map[*Broker]*brokerProducer),
  9. brokerRefs:make(map[*brokerProducer]int),
  10. txnmgr: txnmgr,
  11. metricsRegistry:newCleanupRegistry(client.Config().MetricRegistry),}

可以看到

  1. Successes

  1. Errors

是一个同步队列,因此对这两个 channel 的回调一定不要有过重的逻辑,例如:

  1. // 搞一个上下文用于退出异步回调
  2. ctx, cancelFunc := context.WithCancel(context.Background())defercancelFunc()// 异步回调gofunc(){
  3. EXITED:for{select{case message :=<-producer.Successes():
  4. logger.Println("success: ", message.Value, message.Partition, message.Offset)case err :=<-producer.Errors():
  5. logger.Println("errors: ", err.Err.Error())case<-ctx.Done():
  6. logger.Println("exited:", ctx.Err())break EXITED
  7. }}}()

Tips: 如何想要查看 sarama 内部日志,可以通过下面方式将其输入到控制台,默认是

  1. io.Discard

相当于/dev/null

  1. sarama.Logger = log.New(os.Stdout,"[Sarama] ", log.LstdFlags)

而发送消息则是向

  1. Input

的 channel 发送数据即可,我们只需要构建特定的消息结构体即可

  1. type ProducerMessage struct{
  2. Topic string
  3. Key Encoder
  4. Value Encoder
  5. Headers []RecordHeader
  6. Metadata interface{}// Below this point are filled in by the producer as the message is processed
  7. Offset int64
  8. Partition int32
  9. Timestamp time.Time
  10. retries int
  11. flags flagSet
  12. expectation chan*ProducerError
  13. sequenceNumber int32
  14. producerEpoch int16
  15. hasSequence bool}

用户只需要构建该结构体导出字段中的

  1. Topic

  1. Key

  1. Value

  1. Headers

以及

  1. Metadata

,对于

  1. Metadata

是 sarama 内部维护的一个字段,在发送数据时会被忽略用作在回调时进行填充,也就是说它不是 kafka 所需要的字段只在客户端内部扭转。而

  1. Offset

  1. Partition

  1. Timestamp

则是在处理消息时 sarama 自动进行填充。

因此发送消息伪代码

  1. // 发送数据for i :=0; i < maxRecordNum; i++{
  2. producer.Input()<-&sarama.ProducerMessage{
  3. Topic: topic,
  4. Value: sarama.StringEncoder(fmt.Sprintf("srama-%v", i)),}}

最终完整代码如下:

  1. package example
  2. import("context""fmt""github.com/Shopify/sarama""log""os""time")var maxRecordNum =1000var topic ="sarama"// SimpleProducer 简单生产生funcSimpleProducer(){
  3. brokers :=[]string{"127.0.0.1:9092"}// 生产者默认配置
  4. config := sarama.NewConfig()
  5. config.ClientID ="demo-producer"// 配置 ack
  6. config.Producer.RequiredAcks = sarama.WaitForAll
  7. // 开启异步回调
  8. config.Producer.Return.Successes =true
  9. config.Producer.Return.Errors =true// 开启日志
  10. logger := log.New(os.Stdout,"[Sarama] ", log.LstdFlags)
  11. sarama.Logger = logger
  12. producer, err := sarama.NewAsyncProducer(brokers, config)if err !=nil{panic(err)}deferfunc(){_= producer.Close()}()// 搞一个上下文用于退出异步回调
  13. ctx, cancelFunc := context.WithCancel(context.Background())defercancelFunc()// 异步回调gofunc(){
  14. EXITED:for{select{case message :=<-producer.Successes():
  15. logger.Println("success: ", message.Value, message.Partition, message.Offset)case err :=<-producer.Errors():
  16. logger.Println("errors: ", err.Err.Error())case<-ctx.Done():
  17. logger.Println("exited:", ctx.Err())break EXITED
  18. }}}()// 发送数据for i :=0; i < maxRecordNum; i++{
  19. producer.Input()<-&sarama.ProducerMessage{
  20. Topic: topic,
  21. Value: sarama.StringEncoder(fmt.Sprintf("srama-%v", i)),}}// 等 5 s
  22. time.Sleep(5* time.Second)}

这里的等待是为了将回调信息全部打印完毕,经过研究发现当调用

  1. producer.Close()

时会将内部维护的 channel 都关闭

  1. func(p *asyncProducer)shutdown(){
  2. Logger.Println("Producer shutting down.")
  3. p.inFlight.Add(1)
  4. p.input <-&ProducerMessage{flags: shutdown}
  5. p.inFlight.Wait()
  6. err := p.client.Close()if err !=nil{
  7. Logger.Println("producer/shutdown failed to close the embedded client:", err)}close(p.input)close(p.retries)close(p.errors)close(p.successes)
  8. p.metricsRegistry.UnregisterAll()}

因此可以使用

  1. sync.WaitGroup

进行改造

  1. package example
  2. import("fmt""github.com/Shopify/sarama""log""os""sync")var maxRecordNum =1000var topic ="sarama"// SimpleProducer 简单生产生funcSimpleProducer(){
  3. brokers :=[]string{"127.0.0.1:9092"}// 生产者默认配置
  4. config := sarama.NewConfig()
  5. config.ClientID ="demo-producer"// 配置 ack
  6. config.Producer.RequiredAcks = sarama.WaitForAll
  7. // 开启异步回调
  8. config.Producer.Return.Successes =true
  9. config.Producer.Return.Errors =true// 开启日志
  10. logger := log.New(os.Stdout,"[Sarama] ", log.LstdFlags)
  11. sarama.Logger = logger
  12. producer, err := sarama.NewAsyncProducer(brokers, config)if err !=nil{panic(err)}var wg sync.WaitGroup
  13. // 异步回调gofunc(){
  14. wg.Add(1)for message :=range producer.Successes(){
  15. logger.Println("success: ", message.Value, message.Partition, message.Offset)}
  16. wg.Done()}()gofunc(){
  17. wg.Add(1)for err :=range producer.Errors(){
  18. logger.Println("errors: ", err.Err.Error())}
  19. wg.Done()}()// 发送数据for i :=0; i < maxRecordNum; i++{
  20. producer.Input()<-&sarama.ProducerMessage{
  21. Topic: topic,
  22. Value: sarama.StringEncoder(fmt.Sprintf("srama-%v", i)),}}// 关闭生产者_= producer.Close()// 等待处理完所有的回调信息
  23. wg.Wait()}

改造后的代码架构变得清晰了,但依然需要等待,因为如何发送完立刻关闭 producer,此时还有很多消息并没有来得及回调写入 channel 中。因此还是需要结合业务场景来考虑如何去接收所有的回调,有更好思路的同学我们也可以沟通一下。

two: consumer

sarama 的 consumer 将会变得复杂一点,需要我们去实现它的消费 handler 接口,同时提供

  1. Consumer

  1. ConsumerGroup

两种,不在赘述消费者和消费者组的概念,这里使用

  1. ConsumerGroup

来演示。创建方式和 producer 基本一致

  1. var groupID ="sarama-consumer"var brokers =[]string{"127.0.0.1:9092"}var config = sarama.NewConfig()
  2. sarama.NewConsumerGroup(brokers, groupID, config)

对于 consumer 来说通常的配置就是关闭自动提交和偏移量初始化策略(oldest或latest),配置如下

  1. // 关闭自动提交
  2. config.Consumer.Offsets.AutoCommit.Enable =false
  3. config.Consumer.Offsets.Initial = sarama.OffsetOldest
  1. config.Consumer.Offsets.Initial

和 ack 一致,本质上是 int64,这里同样是为了消除魔数

下面开始消费数据,其方法如下

  1. Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler)error
  • ctx: 上下文对象,因为消费逻辑在 goroutine 中进行,需要使用 context 来控制
  • topics: 需要消费的 topic
  • handler: 封装消费逻辑

对于 ctx 为了让 consumer 优雅退出

  1. // 搞一个上下文用于终止消费者
  2. ctx, cancelFunc := context.WithCancel(context.Background())// 监听终止信号gofunc(){
  3. quit :=make(chan os.Signal,1)
  4. signal.Notify(quit, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)<-quit
  5. cancelFunc()}()

当监听到系统终止等信号时执行

  1. cancelFunc

停止

  1. handler

的消费逻辑,我们需要实现

  1. ConsumerGroupHandler

的接口

  1. type ConsumerGroupHandler interface{// Setup is run at the beginning of a new session, before ConsumeClaim.Setup(ConsumerGroupSession)error// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited// but before the offsets are committed for the very last time.Cleanup(ConsumerGroupSession)error// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().// Once the Messages() channel is closed, the Handler must finish its processing// loop and exit.ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim)error}

其执行顺序是 Setup -> Loop(ConsumeClaim) -> Cleanup

创建一个空结构体去实现这个接口,暂时先不关注

  1. Setup

  1. Cleanup
  1. type Consumer struct{}func(c *Consumer)ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim)error{
  2. EXIT:for{select{case message :=<-claim.Messages():
  3. log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s",string(message.Value), message.Timestamp, message.Topic)// 标记消息,并不是提交偏移量
  4. session.MarkMessage(message,"")// 同步提交
  5. session.Commit()case<-session.Context().Done():
  6. log.Println("cancel consumer")break EXIT
  7. }}returnnil}

Tips:

  1. session.MarkMessage(message, "")

是很重要的操作,标记当前消息已经处理完成并在内部记录当前分区的偏移量,但并不是提交偏移量,Mark 的作用相当于 flink 的状态,当调用

  1. session.Commit()

时才会将内部保存的 offset 提交到 kafka 中。为了确保消息不丢失一定是先处理数据再标记消息

为了提高效率需要将

  1. session.Commit()

进行异步处理,因此在

  1. Setup

中启动一个 goroutine 专门用于异步提交,当消息处理完告知这个 goroutine 进行一次 Commit,因此需要维护一个 channel

  1. var asyncOffset chanstruct{}var wg sync.WaitGroup // 后面再解释const defaultOffsetChannelSize = math.MaxInt
  2. func(c *Consumer)Setup(session sarama.ConsumerGroupSession)error{// 初始化异步提交的channel
  3. asyncOffset =make(chanstruct{}, defaultOffsetChannelSize)
  4. wg.Add(1)// 异步提交偏移量gofunc(){forrange asyncOffset {
  5. session.Commit()}
  6. wg.Done()}()returnnil}
  1. Cleanup

用于关闭通道

  1. func(c *Consumer)Cleanup(_ sarama.ConsumerGroupSession)error{// 关闭通道close(asyncOffset)returnnil}
  1. ConsumeClaim

则触发提交偏移量

  1. func(c *Consumer)ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim)error{
  2. EXIT:for{select{case message :=<-claim.Messages():
  3. log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s",string(message.Value), message.Timestamp, message.Topic)// 标记消息,并不是提交偏移量
  4. session.MarkMessage(message,"")// 异步提交
  5. asyncOffset <-struct{}{}case<-session.Context().Done():
  6. log.Println("cancel consumer")break EXIT
  7. }}returnnil}

Tips: 空 struct{} 在 go 中是不占用内存空间的

为了当客户端停止消费时所有的偏移量都能提交则使用

  1. sync.WaitGroup

,最终完整代码如下

  1. package example
  2. import("context""github.com/Shopify/sarama""log""math""os""os/signal""sync""syscall")var groupID ="sarama-consumer"var asyncOffset chanstruct{}var wg sync.WaitGroup
  3. const defaultOffsetChannelSize = math.MaxInt
  4. funcSimpleConsumer(){
  5. brokers :=[]string{"127.0.0.1:9092"}// 消费者配置
  6. config := sarama.NewConfig()// 关闭自动提交
  7. config.Consumer.Offsets.AutoCommit.Enable =false
  8. config.Consumer.Offsets.Initial = sarama.OffsetOldest
  9. // 开启日志
  10. logger := log.New(os.Stdout,"[Sarama] ", log.LstdFlags)
  11. sarama.Logger = logger
  12. consumer, err := sarama.NewConsumerGroup(brokers, groupID, config)if err !=nil{panic(err)}deferfunc(){_= consumer.Close()}()// 搞一个上下文用于终止消费者
  13. ctx, cancelFunc := context.WithCancel(context.Background())// 监听终止信号gofunc(){
  14. logger.Println("monitor signal")
  15. quit :=make(chan os.Signal,1)
  16. signal.Notify(quit, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)<-quit
  17. logger.Println("stop consumer")cancelFunc()}()// 消费数据
  18. err = consumer.Consume(ctx,[]string{topic},&Consumer{})if err !=nil{panic(err)}// 等待所有偏移量都提交完毕再退出
  19. logger.Println("当前存在未提交的偏移量")
  20. wg.Wait()}type Consumer struct{}func(c *Consumer)Setup(session sarama.ConsumerGroupSession)error{// 初始化异步提交的channel
  21. asyncOffset =make(chanstruct{}, defaultOffsetChannelSize)
  22. wg.Add(1)// 异步提交偏移量gofunc(){forrange asyncOffset {
  23. session.Commit()}
  24. wg.Done()}()returnnil}func(c *Consumer)Cleanup(_ sarama.ConsumerGroupSession)error{// 关闭通道close(asyncOffset)returnnil}func(c *Consumer)ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim)error{
  25. EXIT:for{select{case message :=<-claim.Messages():
  26. log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s",string(message.Value), message.Timestamp, message.Topic)// 标记消息,并不是提交偏移量
  27. session.MarkMessage(message,"")// 异步提交
  28. asyncOffset <-struct{}{}case<-session.Context().Done():
  29. log.Println("cancel consumer")break EXIT
  30. }}returnnil}

Tips: 代码以上传至 github 中: https://github.com/kpretty/kafka-client-go

标签: kafka golang 分布式

本文转载自: https://blog.csdn.net/qq_41858402/article/details/130525368
版权归原作者 小王是个弟弟 所有, 如有侵权,请联系我们删除。

“kafka client for go”的评论:

还没有评论