0


kafka client for go

关于 go 的 kafka client 有很多开源项目,例如

  1. sarama: 具有完整协议支持的纯 Go 实现。包括消费者和生产者实施,支持 GZIP 和 Snappy 压缩。
  2. confluent-kafka-go: Confluent 的 Golang Kafka 客户端包装了 librdkafka C 库,提供完整的 Kafka 协议支持,具有出色的性能和可靠性。提供了高级生产者和消费者,支持 Apache Kafka 0.9 及更高版本的平衡消费者组。
  3. franz-go: 纯 Go 实现提供完整的协议支持、出色的性能以及对所有面向客户端的 KI P 的支持。

本文使用 sarama 作为 kafka client,并尽可能去贴合 java 的书写习惯来实现 kafka 的 producer 和 consumer,例如:producer 的异步回调和 consumer 异步提交。当然 sarama 也支持 producer 的事务。首先引入 sarama 依赖

go get github.com/Shopify/sarama

one: producer

sarama 提供两种 producer 的实现:

AsyncProducer

SyncProducer

,即同步和异步的生产者,本文只介绍

AsyncProducer

的使用,通过

sarama.NewAsyncProducer

获取生产者实例,其中要求传入两个参数

funcNewAsyncProducer(addrs []string, conf *Config)(AsyncProducer,error){
    client, err :=NewClient(addrs, conf)if err !=nil{returnnil, err
    }returnnewAsyncProducer(client)}
  • addrs: 即为 kafka broker 地址,对于 kafka 集群可以全部都填,也可以只填一个,保证有一个地址可用即可
  • conf: 为 sarama 封装的生产者、消费者、客户端、网络与一体的配置,通过sarama.NewConfig快速获取默认值的配置文件

对于生产者我们通常修改的配置就是 ack 了

// 生产者默认配置
config := sarama.NewConfig()// 配置 ack
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.RequiredAcks

的类型本质上是 int16,所以习惯

0,1,-1

这种配置也可以直接赋值,但为了消除魔数的影响,建议使用 sarama 封装的常数

const(
    NoResponse RequiredAcks =0
  
    WaitForLocal RequiredAcks =1
  
    WaitForAll RequiredAcks =-1)

如果需要开启开启异步回调,则需要开启

config.Producer.Return.Successes

config.Producer.Return.Errors

,例如

// 开启异步回调
config.Producer.Return.Successes =true
config.Producer.Return.Errors =true

生产者内部对 Successes 和 Errors 分别维护两个 channel,下面是

AsyncProducer

需要实现的方法

type AsyncProducer interface{AsyncClose()Close()errorInput()chan<-*ProducerMessage

    Successes()<-chan*ProducerMessage

    Errors()<-chan*ProducerError

    IsTransactional()boolTxnStatus() ProducerTxnStatusFlag

    BeginTxn()errorCommitTxn()errorAbortTxn()errorAddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string)errorAddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string)error}

当生产者发送消息的正常回调信息会投递到

Successes

的 channel 中,错误回调信息会投递到

Errors

的 channel 中。需要注意的是如果开启了异步回调则一定要去消费这两个 channel 的数据否则会阻塞 producer,同时下面是

AsyncProducer

实现者的初始化

p :=&asyncProducer{
  client:          client,
  conf:            client.Config(),
  errors:make(chan*ProducerError),
  input:make(chan*ProducerMessage),
  successes:make(chan*ProducerMessage),
  retries:make(chan*ProducerMessage),
  brokers:make(map[*Broker]*brokerProducer),
  brokerRefs:make(map[*brokerProducer]int),
  txnmgr:          txnmgr,
  metricsRegistry:newCleanupRegistry(client.Config().MetricRegistry),}

可以看到

Successes

Errors

是一个同步队列,因此对这两个 channel 的回调一定不要有过重的逻辑,例如:

// 搞一个上下文用于退出异步回调
ctx, cancelFunc := context.WithCancel(context.Background())defercancelFunc()// 异步回调gofunc(){
EXITED:for{select{case message :=<-producer.Successes():
      logger.Println("success: ", message.Value, message.Partition, message.Offset)case err :=<-producer.Errors():
      logger.Println("errors: ", err.Err.Error())case<-ctx.Done():
      logger.Println("exited:", ctx.Err())break EXITED
    }}}()

Tips: 如何想要查看 sarama 内部日志,可以通过下面方式将其输入到控制台,默认是

io.Discard

相当于/dev/null

sarama.Logger = log.New(os.Stdout,"[Sarama] ", log.LstdFlags)

而发送消息则是向

Input

的 channel 发送数据即可,我们只需要构建特定的消息结构体即可

type ProducerMessage struct{
    Topic string

    Key Encoder

    Value Encoder

    Headers []RecordHeader

    Metadata interface{}// Below this point are filled in by the producer as the message is processed
    Offset int64

    Partition int32
  
    Timestamp time.Time

    retries        int
    flags          flagSet
    expectation    chan*ProducerError
    sequenceNumber int32
    producerEpoch  int16
    hasSequence    bool}

用户只需要构建该结构体导出字段中的

Topic

Key

Value

Headers

以及

Metadata

,对于

Metadata

是 sarama 内部维护的一个字段,在发送数据时会被忽略用作在回调时进行填充,也就是说它不是 kafka 所需要的字段只在客户端内部扭转。而

Offset

Partition

Timestamp

则是在处理消息时 sarama 自动进行填充。

因此发送消息伪代码

// 发送数据for i :=0; i < maxRecordNum; i++{
  producer.Input()<-&sarama.ProducerMessage{
    Topic: topic,
    Value: sarama.StringEncoder(fmt.Sprintf("srama-%v", i)),}}

最终完整代码如下:

package example

import("context""fmt""github.com/Shopify/sarama""log""os""time")var maxRecordNum =1000var topic ="sarama"// SimpleProducer 简单生产生funcSimpleProducer(){
    brokers :=[]string{"127.0.0.1:9092"}// 生产者默认配置
    config := sarama.NewConfig()
    config.ClientID ="demo-producer"// 配置 ack
    config.Producer.RequiredAcks = sarama.WaitForAll
    // 开启异步回调
    config.Producer.Return.Successes =true
    config.Producer.Return.Errors =true// 开启日志
    logger := log.New(os.Stdout,"[Sarama] ", log.LstdFlags)
    sarama.Logger = logger
    producer, err := sarama.NewAsyncProducer(brokers, config)if err !=nil{panic(err)}deferfunc(){_= producer.Close()}()// 搞一个上下文用于退出异步回调
    ctx, cancelFunc := context.WithCancel(context.Background())defercancelFunc()// 异步回调gofunc(){
    EXITED:for{select{case message :=<-producer.Successes():
                logger.Println("success: ", message.Value, message.Partition, message.Offset)case err :=<-producer.Errors():
                logger.Println("errors: ", err.Err.Error())case<-ctx.Done():
                logger.Println("exited:", ctx.Err())break EXITED
            }}}()// 发送数据for i :=0; i < maxRecordNum; i++{
        producer.Input()<-&sarama.ProducerMessage{
            Topic: topic,
            Value: sarama.StringEncoder(fmt.Sprintf("srama-%v", i)),}}// 等 5 s
    time.Sleep(5* time.Second)}

这里的等待是为了将回调信息全部打印完毕,经过研究发现当调用

producer.Close()

时会将内部维护的 channel 都关闭

func(p *asyncProducer)shutdown(){
    Logger.Println("Producer shutting down.")
    p.inFlight.Add(1)
    p.input <-&ProducerMessage{flags: shutdown}

    p.inFlight.Wait()

    err := p.client.Close()if err !=nil{
        Logger.Println("producer/shutdown failed to close the embedded client:", err)}close(p.input)close(p.retries)close(p.errors)close(p.successes)

    p.metricsRegistry.UnregisterAll()}

因此可以使用

sync.WaitGroup

进行改造

package example

import("fmt""github.com/Shopify/sarama""log""os""sync")var maxRecordNum =1000var topic ="sarama"// SimpleProducer 简单生产生funcSimpleProducer(){
    brokers :=[]string{"127.0.0.1:9092"}// 生产者默认配置
    config := sarama.NewConfig()
    config.ClientID ="demo-producer"// 配置 ack
    config.Producer.RequiredAcks = sarama.WaitForAll
    // 开启异步回调
    config.Producer.Return.Successes =true
    config.Producer.Return.Errors =true// 开启日志
    logger := log.New(os.Stdout,"[Sarama] ", log.LstdFlags)
    sarama.Logger = logger
    producer, err := sarama.NewAsyncProducer(brokers, config)if err !=nil{panic(err)}var wg sync.WaitGroup
    // 异步回调gofunc(){
        wg.Add(1)for message :=range producer.Successes(){
            logger.Println("success: ", message.Value, message.Partition, message.Offset)}
        wg.Done()}()gofunc(){
        wg.Add(1)for err :=range producer.Errors(){
            logger.Println("errors: ", err.Err.Error())}
        wg.Done()}()// 发送数据for i :=0; i < maxRecordNum; i++{
        producer.Input()<-&sarama.ProducerMessage{
            Topic: topic,
            Value: sarama.StringEncoder(fmt.Sprintf("srama-%v", i)),}}// 关闭生产者_= producer.Close()// 等待处理完所有的回调信息
    wg.Wait()}

改造后的代码架构变得清晰了,但依然需要等待,因为如何发送完立刻关闭 producer,此时还有很多消息并没有来得及回调写入 channel 中。因此还是需要结合业务场景来考虑如何去接收所有的回调,有更好思路的同学我们也可以沟通一下。

two: consumer

sarama 的 consumer 将会变得复杂一点,需要我们去实现它的消费 handler 接口,同时提供

Consumer

ConsumerGroup

两种,不在赘述消费者和消费者组的概念,这里使用

ConsumerGroup

来演示。创建方式和 producer 基本一致

var groupID ="sarama-consumer"var brokers =[]string{"127.0.0.1:9092"}var config = sarama.NewConfig()
sarama.NewConsumerGroup(brokers, groupID, config)

对于 consumer 来说通常的配置就是关闭自动提交和偏移量初始化策略(oldest或latest),配置如下

// 关闭自动提交
config.Consumer.Offsets.AutoCommit.Enable =false
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.Initial

和 ack 一致,本质上是 int64,这里同样是为了消除魔数

下面开始消费数据,其方法如下

Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler)error
  • ctx: 上下文对象,因为消费逻辑在 goroutine 中进行,需要使用 context 来控制
  • topics: 需要消费的 topic
  • handler: 封装消费逻辑

对于 ctx 为了让 consumer 优雅退出

// 搞一个上下文用于终止消费者
ctx, cancelFunc := context.WithCancel(context.Background())// 监听终止信号gofunc(){
  quit :=make(chan os.Signal,1)
  signal.Notify(quit, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)<-quit
  cancelFunc()}()

当监听到系统终止等信号时执行

cancelFunc

停止

handler

的消费逻辑,我们需要实现

ConsumerGroupHandler

的接口

type ConsumerGroupHandler interface{// Setup is run at the beginning of a new session, before ConsumeClaim.Setup(ConsumerGroupSession)error// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited// but before the offsets are committed for the very last time.Cleanup(ConsumerGroupSession)error// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().// Once the Messages() channel is closed, the Handler must finish its processing// loop and exit.ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim)error}

其执行顺序是 Setup -> Loop(ConsumeClaim) -> Cleanup

创建一个空结构体去实现这个接口,暂时先不关注

Setup

Cleanup
type Consumer struct{}func(c *Consumer)ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim)error{
EXIT:for{select{case message :=<-claim.Messages():
            log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s",string(message.Value), message.Timestamp, message.Topic)// 标记消息,并不是提交偏移量
            session.MarkMessage(message,"")// 同步提交
      session.Commit()case<-session.Context().Done():
            log.Println("cancel consumer")break EXIT
        }}returnnil}

Tips:

session.MarkMessage(message, "")

是很重要的操作,标记当前消息已经处理完成并在内部记录当前分区的偏移量,但并不是提交偏移量,Mark 的作用相当于 flink 的状态,当调用

session.Commit()

时才会将内部保存的 offset 提交到 kafka 中。为了确保消息不丢失一定是先处理数据再标记消息

为了提高效率需要将

session.Commit()

进行异步处理,因此在

Setup

中启动一个 goroutine 专门用于异步提交,当消息处理完告知这个 goroutine 进行一次 Commit,因此需要维护一个 channel

var asyncOffset chanstruct{}var wg sync.WaitGroup // 后面再解释const defaultOffsetChannelSize = math.MaxInt

func(c *Consumer)Setup(session sarama.ConsumerGroupSession)error{// 初始化异步提交的channel
    asyncOffset =make(chanstruct{}, defaultOffsetChannelSize)
    wg.Add(1)// 异步提交偏移量gofunc(){forrange asyncOffset {
            session.Commit()}
        wg.Done()}()returnnil}
Cleanup

用于关闭通道

func(c *Consumer)Cleanup(_ sarama.ConsumerGroupSession)error{// 关闭通道close(asyncOffset)returnnil}
ConsumeClaim

则触发提交偏移量

func(c *Consumer)ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim)error{
EXIT:for{select{case message :=<-claim.Messages():
            log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s",string(message.Value), message.Timestamp, message.Topic)// 标记消息,并不是提交偏移量
            session.MarkMessage(message,"")// 异步提交
            asyncOffset <-struct{}{}case<-session.Context().Done():
            log.Println("cancel consumer")break EXIT
        }}returnnil}

Tips: 空 struct{} 在 go 中是不占用内存空间的

为了当客户端停止消费时所有的偏移量都能提交则使用

sync.WaitGroup

,最终完整代码如下

package example

import("context""github.com/Shopify/sarama""log""math""os""os/signal""sync""syscall")var groupID ="sarama-consumer"var asyncOffset chanstruct{}var wg sync.WaitGroup

const defaultOffsetChannelSize = math.MaxInt

funcSimpleConsumer(){
    brokers :=[]string{"127.0.0.1:9092"}// 消费者配置
    config := sarama.NewConfig()// 关闭自动提交
    config.Consumer.Offsets.AutoCommit.Enable =false
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    // 开启日志
    logger := log.New(os.Stdout,"[Sarama] ", log.LstdFlags)
    sarama.Logger = logger
    consumer, err := sarama.NewConsumerGroup(brokers, groupID, config)if err !=nil{panic(err)}deferfunc(){_= consumer.Close()}()// 搞一个上下文用于终止消费者
    ctx, cancelFunc := context.WithCancel(context.Background())// 监听终止信号gofunc(){
        logger.Println("monitor signal")
        quit :=make(chan os.Signal,1)
        signal.Notify(quit, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)<-quit
        logger.Println("stop consumer")cancelFunc()}()// 消费数据
    err = consumer.Consume(ctx,[]string{topic},&Consumer{})if err !=nil{panic(err)}// 等待所有偏移量都提交完毕再退出
    logger.Println("当前存在未提交的偏移量")
    wg.Wait()}type Consumer struct{}func(c *Consumer)Setup(session sarama.ConsumerGroupSession)error{// 初始化异步提交的channel
    asyncOffset =make(chanstruct{}, defaultOffsetChannelSize)
    wg.Add(1)// 异步提交偏移量gofunc(){forrange asyncOffset {
            session.Commit()}
        wg.Done()}()returnnil}func(c *Consumer)Cleanup(_ sarama.ConsumerGroupSession)error{// 关闭通道close(asyncOffset)returnnil}func(c *Consumer)ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim)error{
EXIT:for{select{case message :=<-claim.Messages():
            log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s",string(message.Value), message.Timestamp, message.Topic)// 标记消息,并不是提交偏移量
            session.MarkMessage(message,"")// 异步提交
            asyncOffset <-struct{}{}case<-session.Context().Done():
            log.Println("cancel consumer")break EXIT
        }}returnnil}

Tips: 代码以上传至 github 中: https://github.com/kpretty/kafka-client-go

标签: kafka golang 分布式

本文转载自: https://blog.csdn.net/qq_41858402/article/details/130525368
版权归原作者 小王是个弟弟 所有, 如有侵权,请联系我们删除。

“kafka client for go”的评论:

还没有评论