关于 go 的 kafka client 有很多开源项目,例如
- sarama: 具有完整协议支持的纯 Go 实现。包括消费者和生产者实施,支持 GZIP 和 Snappy 压缩。
- confluent-kafka-go: Confluent 的 Golang Kafka 客户端包装了 librdkafka C 库,提供完整的 Kafka 协议支持,具有出色的性能和可靠性。提供了高级生产者和消费者,支持 Apache Kafka 0.9 及更高版本的平衡消费者组。
- franz-go: 纯 Go 实现提供完整的协议支持、出色的性能以及对所有面向客户端的 KI P 的支持。
本文使用 sarama 作为 kafka client,并尽可能去贴合 java 的书写习惯来实现 kafka 的 producer 和 consumer,例如:producer 的异步回调和 consumer 异步提交。当然 sarama 也支持 producer 的事务。首先引入 sarama 依赖
go get github.com/Shopify/sarama
one: producer
sarama 提供两种 producer 的实现:
AsyncProducer
和
SyncProducer
,即同步和异步的生产者,本文只介绍
AsyncProducer
的使用,通过
sarama.NewAsyncProducer
获取生产者实例,其中要求传入两个参数
funcNewAsyncProducer(addrs []string, conf *Config)(AsyncProducer,error){
client, err :=NewClient(addrs, conf)if err !=nil{returnnil, err
}returnnewAsyncProducer(client)}
- addrs: 即为 kafka broker 地址,对于 kafka 集群可以全部都填,也可以只填一个,保证有一个地址可用即可
- conf: 为 sarama 封装的生产者、消费者、客户端、网络与一体的配置,通过
sarama.NewConfig
快速获取默认值的配置文件
对于生产者我们通常修改的配置就是 ack 了
// 生产者默认配置
config := sarama.NewConfig()// 配置 ack
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.RequiredAcks
的类型本质上是 int16,所以习惯
0,1,-1
这种配置也可以直接赋值,但为了消除魔数的影响,建议使用 sarama 封装的常数
const(
NoResponse RequiredAcks =0
WaitForLocal RequiredAcks =1
WaitForAll RequiredAcks =-1)
如果需要开启开启异步回调,则需要开启
config.Producer.Return.Successes
和
config.Producer.Return.Errors
,例如
// 开启异步回调
config.Producer.Return.Successes =true
config.Producer.Return.Errors =true
生产者内部对 Successes 和 Errors 分别维护两个 channel,下面是
AsyncProducer
需要实现的方法
type AsyncProducer interface{AsyncClose()Close()errorInput()chan<-*ProducerMessage
Successes()<-chan*ProducerMessage
Errors()<-chan*ProducerError
IsTransactional()boolTxnStatus() ProducerTxnStatusFlag
BeginTxn()errorCommitTxn()errorAbortTxn()errorAddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string)errorAddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string)error}
当生产者发送消息的正常回调信息会投递到
Successes
的 channel 中,错误回调信息会投递到
Errors
的 channel 中。需要注意的是如果开启了异步回调则一定要去消费这两个 channel 的数据否则会阻塞 producer,同时下面是
AsyncProducer
实现者的初始化
p :=&asyncProducer{
client: client,
conf: client.Config(),
errors:make(chan*ProducerError),
input:make(chan*ProducerMessage),
successes:make(chan*ProducerMessage),
retries:make(chan*ProducerMessage),
brokers:make(map[*Broker]*brokerProducer),
brokerRefs:make(map[*brokerProducer]int),
txnmgr: txnmgr,
metricsRegistry:newCleanupRegistry(client.Config().MetricRegistry),}
可以看到
Successes
和
Errors
是一个同步队列,因此对这两个 channel 的回调一定不要有过重的逻辑,例如:
// 搞一个上下文用于退出异步回调
ctx, cancelFunc := context.WithCancel(context.Background())defercancelFunc()// 异步回调gofunc(){
EXITED:for{select{case message :=<-producer.Successes():
logger.Println("success: ", message.Value, message.Partition, message.Offset)case err :=<-producer.Errors():
logger.Println("errors: ", err.Err.Error())case<-ctx.Done():
logger.Println("exited:", ctx.Err())break EXITED
}}}()
Tips: 如何想要查看 sarama 内部日志,可以通过下面方式将其输入到控制台,默认是
io.Discard
相当于/dev/null
sarama.Logger = log.New(os.Stdout,"[Sarama] ", log.LstdFlags)
而发送消息则是向
Input
的 channel 发送数据即可,我们只需要构建特定的消息结构体即可
type ProducerMessage struct{
Topic string
Key Encoder
Value Encoder
Headers []RecordHeader
Metadata interface{}// Below this point are filled in by the producer as the message is processed
Offset int64
Partition int32
Timestamp time.Time
retries int
flags flagSet
expectation chan*ProducerError
sequenceNumber int32
producerEpoch int16
hasSequence bool}
用户只需要构建该结构体导出字段中的
Topic
、
Key
、
Value
、
Headers
以及
Metadata
,对于
Metadata
是 sarama 内部维护的一个字段,在发送数据时会被忽略用作在回调时进行填充,也就是说它不是 kafka 所需要的字段只在客户端内部扭转。而
Offset
、
Partition
和
Timestamp
则是在处理消息时 sarama 自动进行填充。
因此发送消息伪代码
// 发送数据for i :=0; i < maxRecordNum; i++{
producer.Input()<-&sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(fmt.Sprintf("srama-%v", i)),}}
最终完整代码如下:
package example
import("context""fmt""github.com/Shopify/sarama""log""os""time")var maxRecordNum =1000var topic ="sarama"// SimpleProducer 简单生产生funcSimpleProducer(){
brokers :=[]string{"127.0.0.1:9092"}// 生产者默认配置
config := sarama.NewConfig()
config.ClientID ="demo-producer"// 配置 ack
config.Producer.RequiredAcks = sarama.WaitForAll
// 开启异步回调
config.Producer.Return.Successes =true
config.Producer.Return.Errors =true// 开启日志
logger := log.New(os.Stdout,"[Sarama] ", log.LstdFlags)
sarama.Logger = logger
producer, err := sarama.NewAsyncProducer(brokers, config)if err !=nil{panic(err)}deferfunc(){_= producer.Close()}()// 搞一个上下文用于退出异步回调
ctx, cancelFunc := context.WithCancel(context.Background())defercancelFunc()// 异步回调gofunc(){
EXITED:for{select{case message :=<-producer.Successes():
logger.Println("success: ", message.Value, message.Partition, message.Offset)case err :=<-producer.Errors():
logger.Println("errors: ", err.Err.Error())case<-ctx.Done():
logger.Println("exited:", ctx.Err())break EXITED
}}}()// 发送数据for i :=0; i < maxRecordNum; i++{
producer.Input()<-&sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(fmt.Sprintf("srama-%v", i)),}}// 等 5 s
time.Sleep(5* time.Second)}
这里的等待是为了将回调信息全部打印完毕,经过研究发现当调用
producer.Close()
时会将内部维护的 channel 都关闭
func(p *asyncProducer)shutdown(){
Logger.Println("Producer shutting down.")
p.inFlight.Add(1)
p.input <-&ProducerMessage{flags: shutdown}
p.inFlight.Wait()
err := p.client.Close()if err !=nil{
Logger.Println("producer/shutdown failed to close the embedded client:", err)}close(p.input)close(p.retries)close(p.errors)close(p.successes)
p.metricsRegistry.UnregisterAll()}
因此可以使用
sync.WaitGroup
进行改造
package example
import("fmt""github.com/Shopify/sarama""log""os""sync")var maxRecordNum =1000var topic ="sarama"// SimpleProducer 简单生产生funcSimpleProducer(){
brokers :=[]string{"127.0.0.1:9092"}// 生产者默认配置
config := sarama.NewConfig()
config.ClientID ="demo-producer"// 配置 ack
config.Producer.RequiredAcks = sarama.WaitForAll
// 开启异步回调
config.Producer.Return.Successes =true
config.Producer.Return.Errors =true// 开启日志
logger := log.New(os.Stdout,"[Sarama] ", log.LstdFlags)
sarama.Logger = logger
producer, err := sarama.NewAsyncProducer(brokers, config)if err !=nil{panic(err)}var wg sync.WaitGroup
// 异步回调gofunc(){
wg.Add(1)for message :=range producer.Successes(){
logger.Println("success: ", message.Value, message.Partition, message.Offset)}
wg.Done()}()gofunc(){
wg.Add(1)for err :=range producer.Errors(){
logger.Println("errors: ", err.Err.Error())}
wg.Done()}()// 发送数据for i :=0; i < maxRecordNum; i++{
producer.Input()<-&sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(fmt.Sprintf("srama-%v", i)),}}// 关闭生产者_= producer.Close()// 等待处理完所有的回调信息
wg.Wait()}
改造后的代码架构变得清晰了,但依然需要等待,因为如何发送完立刻关闭 producer,此时还有很多消息并没有来得及回调写入 channel 中。因此还是需要结合业务场景来考虑如何去接收所有的回调,有更好思路的同学我们也可以沟通一下。
two: consumer
sarama 的 consumer 将会变得复杂一点,需要我们去实现它的消费 handler 接口,同时提供
Consumer
和
ConsumerGroup
两种,不在赘述消费者和消费者组的概念,这里使用
ConsumerGroup
来演示。创建方式和 producer 基本一致
var groupID ="sarama-consumer"var brokers =[]string{"127.0.0.1:9092"}var config = sarama.NewConfig()
sarama.NewConsumerGroup(brokers, groupID, config)
对于 consumer 来说通常的配置就是关闭自动提交和偏移量初始化策略(oldest或latest),配置如下
// 关闭自动提交
config.Consumer.Offsets.AutoCommit.Enable =false
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.Initial
和 ack 一致,本质上是 int64,这里同样是为了消除魔数
下面开始消费数据,其方法如下
Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler)error
- ctx: 上下文对象,因为消费逻辑在 goroutine 中进行,需要使用 context 来控制
- topics: 需要消费的 topic
- handler: 封装消费逻辑
对于 ctx 为了让 consumer 优雅退出
// 搞一个上下文用于终止消费者
ctx, cancelFunc := context.WithCancel(context.Background())// 监听终止信号gofunc(){
quit :=make(chan os.Signal,1)
signal.Notify(quit, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)<-quit
cancelFunc()}()
当监听到系统终止等信号时执行
cancelFunc
停止
handler
的消费逻辑,我们需要实现
ConsumerGroupHandler
的接口
type ConsumerGroupHandler interface{// Setup is run at the beginning of a new session, before ConsumeClaim.Setup(ConsumerGroupSession)error// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited// but before the offsets are committed for the very last time.Cleanup(ConsumerGroupSession)error// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().// Once the Messages() channel is closed, the Handler must finish its processing// loop and exit.ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim)error}
其执行顺序是 Setup -> Loop(ConsumeClaim) -> Cleanup
创建一个空结构体去实现这个接口,暂时先不关注
Setup
和
Cleanup
type Consumer struct{}func(c *Consumer)ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim)error{
EXIT:for{select{case message :=<-claim.Messages():
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s",string(message.Value), message.Timestamp, message.Topic)// 标记消息,并不是提交偏移量
session.MarkMessage(message,"")// 同步提交
session.Commit()case<-session.Context().Done():
log.Println("cancel consumer")break EXIT
}}returnnil}
Tips:
session.MarkMessage(message, "")
是很重要的操作,标记当前消息已经处理完成并在内部记录当前分区的偏移量,但并不是提交偏移量,Mark 的作用相当于 flink 的状态,当调用
session.Commit()
时才会将内部保存的 offset 提交到 kafka 中。为了确保消息不丢失一定是先处理数据再标记消息
为了提高效率需要将
session.Commit()
进行异步处理,因此在
Setup
中启动一个 goroutine 专门用于异步提交,当消息处理完告知这个 goroutine 进行一次 Commit,因此需要维护一个 channel
var asyncOffset chanstruct{}var wg sync.WaitGroup // 后面再解释const defaultOffsetChannelSize = math.MaxInt
func(c *Consumer)Setup(session sarama.ConsumerGroupSession)error{// 初始化异步提交的channel
asyncOffset =make(chanstruct{}, defaultOffsetChannelSize)
wg.Add(1)// 异步提交偏移量gofunc(){forrange asyncOffset {
session.Commit()}
wg.Done()}()returnnil}
Cleanup
用于关闭通道
func(c *Consumer)Cleanup(_ sarama.ConsumerGroupSession)error{// 关闭通道close(asyncOffset)returnnil}
ConsumeClaim
则触发提交偏移量
func(c *Consumer)ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim)error{
EXIT:for{select{case message :=<-claim.Messages():
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s",string(message.Value), message.Timestamp, message.Topic)// 标记消息,并不是提交偏移量
session.MarkMessage(message,"")// 异步提交
asyncOffset <-struct{}{}case<-session.Context().Done():
log.Println("cancel consumer")break EXIT
}}returnnil}
Tips: 空 struct{} 在 go 中是不占用内存空间的
为了当客户端停止消费时所有的偏移量都能提交则使用
sync.WaitGroup
,最终完整代码如下
package example
import("context""github.com/Shopify/sarama""log""math""os""os/signal""sync""syscall")var groupID ="sarama-consumer"var asyncOffset chanstruct{}var wg sync.WaitGroup
const defaultOffsetChannelSize = math.MaxInt
funcSimpleConsumer(){
brokers :=[]string{"127.0.0.1:9092"}// 消费者配置
config := sarama.NewConfig()// 关闭自动提交
config.Consumer.Offsets.AutoCommit.Enable =false
config.Consumer.Offsets.Initial = sarama.OffsetOldest
// 开启日志
logger := log.New(os.Stdout,"[Sarama] ", log.LstdFlags)
sarama.Logger = logger
consumer, err := sarama.NewConsumerGroup(brokers, groupID, config)if err !=nil{panic(err)}deferfunc(){_= consumer.Close()}()// 搞一个上下文用于终止消费者
ctx, cancelFunc := context.WithCancel(context.Background())// 监听终止信号gofunc(){
logger.Println("monitor signal")
quit :=make(chan os.Signal,1)
signal.Notify(quit, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)<-quit
logger.Println("stop consumer")cancelFunc()}()// 消费数据
err = consumer.Consume(ctx,[]string{topic},&Consumer{})if err !=nil{panic(err)}// 等待所有偏移量都提交完毕再退出
logger.Println("当前存在未提交的偏移量")
wg.Wait()}type Consumer struct{}func(c *Consumer)Setup(session sarama.ConsumerGroupSession)error{// 初始化异步提交的channel
asyncOffset =make(chanstruct{}, defaultOffsetChannelSize)
wg.Add(1)// 异步提交偏移量gofunc(){forrange asyncOffset {
session.Commit()}
wg.Done()}()returnnil}func(c *Consumer)Cleanup(_ sarama.ConsumerGroupSession)error{// 关闭通道close(asyncOffset)returnnil}func(c *Consumer)ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim)error{
EXIT:for{select{case message :=<-claim.Messages():
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s",string(message.Value), message.Timestamp, message.Topic)// 标记消息,并不是提交偏移量
session.MarkMessage(message,"")// 异步提交
asyncOffset <-struct{}{}case<-session.Context().Done():
log.Println("cancel consumer")break EXIT
}}returnnil}
Tips: 代码以上传至 github 中: https://github.com/kpretty/kafka-client-go
版权归原作者 小王是个弟弟 所有, 如有侵权,请联系我们删除。