

Implementing a Delay Queue with Kafka

Preface

First, a few words about delay queues. There are many ways to build one, and Kafka is not the best choice: Redis's zset can do it, RocketMQ supports it natively, and RabbitMQ can do it as well. But if switching to one of those would be too costly, Kafka works. The Kafka implementation here borrows RocketMQ's delay-queue idea: RocketMQ only offers a fixed set of delay levels rather than arbitrary delays, while with Kafka you can define your own delay levels, just not too many of them, because each level is backed by its own topic. Below I implement a simple Kafka delay queue in Go.
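
Because each delay level is backed by its own topic, the usual approach is to predeclare a small, fixed set of levels. Here is a minimal sketch of such a mapping; the level names and durations are my own assumptions, not something from the original setup:

    // Each supported delay level gets its own delay topic; a delay service per
    // level forwards matured messages into the shared real topic.
    var delayLevels = map[time.Duration]string{
        1 * time.Minute:  "delayTopic_1m",
        5 * time.Minute:  "delayTopic_5m",
        30 * time.Minute: "delayTopic_30m",
    }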

Approach

1. Create two topics: a delayTopic and a realTopic.

2. The producer first sends messages to delayTopic.

3. A delay service then moves messages from delayTopic to realTopic once they have waited longer than the configured delay.

4. Consumers simply consume the data in realTopic.

Implementation

1. The producer sends a message to the delay queue
    msg := &sarama.ProducerMessage{
        Topic:     kafka.DelayTopic,
        Timestamp: time.Now(),
        Key:       sarama.StringEncoder("rta_key"),
        Value:     sarama.StringEncoder(riStr),
    }
    partition, offset, err := kafka.KafkaDelayQueue.SendMessage(msg)
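
To make the snippet above a bit more concrete, here is a hedged sketch of a wrapper around it. The RtaRequest payload type and the sendToDelayQueue helper are hypothetical (the original post does not show how riStr is built); the part that matters is the Timestamp, because the delay service measures the delay from msg.Timestamp.

    // RtaRequest is a hypothetical payload; the original post does not show how riStr is built.
    type RtaRequest struct {
        ID  string `json:"id"`
        Bid int64  `json:"bid"`
    }

    // sendToDelayQueue serializes the payload, stamps the message with time.Now()
    // (the delay service measures the delay from this timestamp), and checks the result.
    func sendToDelayQueue(req RtaRequest) error {
        payload, err := json.Marshal(req)
        if err != nil {
            return err
        }
        msg := &sarama.ProducerMessage{
            Topic:     kafka.DelayTopic,
            Timestamp: time.Now(),
            Key:       sarama.StringEncoder("rta_key"),
            Value:     sarama.ByteEncoder(payload), // equivalent to StringEncoder(riStr) above
        }
        _, _, err = kafka.KafkaDelayQueue.SendMessage(msg)
        return err
    }
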
2. The delay service's consumer (moves data from the delay topic into the real topic)
    const (
        DelayTime  = time.Minute * 5
        DelayTopic = "delayTopic"
        RealTopic  = "realTopic"
    )

    // KafkaDelayQueueProducer is the delay-queue producer; creating it also starts the delay service.
    type KafkaDelayQueueProducer struct {
        producer   sarama.SyncProducer // producer
        delayTopic string              // delay topic
    }

    // NewKafkaDelayQueueProducer creates the delay-queue producer and starts the delay
    // service that moves matured messages from delayTopic to realTopic.
    func NewKafkaDelayQueueProducer(producer sarama.SyncProducer, delayServiceConsumerGroup sarama.ConsumerGroup,
        delayTime time.Duration, delayTopic, realTopic string, log *log.DomobLog) *KafkaDelayQueueProducer {
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, os.Interrupt)

        // start the delay service
        consumer := NewDelayServiceConsumer(producer, delayTime, realTopic, log)
        log.Info("[NewKafkaDelayQueueProducer] delay queue consumer start")
        go func() {
            for {
                if err := delayServiceConsumerGroup.Consume(context.Background(), []string{delayTopic}, consumer); err != nil {
                    log.Error("[NewKafkaDelayQueueProducer] delay queue consumer failed, err: ", zap.Error(err))
                    break
                }
                time.Sleep(2 * time.Second)
                log.Info("[NewKafkaDelayQueueProducer] consume loop still running")
                // exit the loop if an interrupt signal was received
                select {
                case sin := <-signals:
                    consumer.Logger.Info("[NewKafkaDelayQueueProducer] got signal", zap.Any("signal", sin))
                    return
                default:
                }
            }
            log.Info("[NewKafkaDelayQueueProducer] consumer func exit")
        }()
        log.Info("[NewKafkaDelayQueueProducer] return KafkaDelayQueueProducer")
        return &KafkaDelayQueueProducer{
            producer:   producer,
            delayTopic: delayTopic,
        }
    }

    // SendMessage publishes a message to the delay topic.
    func (q *KafkaDelayQueueProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
        msg.Topic = q.delayTopic
        return q.producer.SendMessage(msg)
    }

    // DelayServiceConsumer consumes the delay topic and forwards matured messages to the real topic.
    type DelayServiceConsumer struct {
        producer  sarama.SyncProducer
        delay     time.Duration
        realTopic string
        Logger    *log.DomobLog
    }

    func NewDelayServiceConsumer(producer sarama.SyncProducer, delay time.Duration,
        realTopic string, log *log.DomobLog) *DelayServiceConsumer {
        return &DelayServiceConsumer{
            producer:  producer,
            delay:     delay,
            realTopic: realTopic,
            Logger:    log,
        }
    }

    func (c *DelayServiceConsumer) ConsumeClaim(session sarama.ConsumerGroupSession,
        claim sarama.ConsumerGroupClaim) error {
        c.Logger.Info("[delay ConsumeClaim] start")
        for message := range claim.Messages() {
            now := time.Now()
            c.Logger.Info("[delay ConsumeClaim] message",
                zap.Any("send real topic res", now.Sub(message.Timestamp) >= c.delay),
                zap.Any("message.Timestamp", message.Timestamp),
                zap.Any("c.delay", c.delay),
                zap.Any("claim.Messages len", len(claim.Messages())),
                zap.Any("sub", now.Sub(message.Timestamp)),
                zap.Any("mesKey", message.Key),
                zap.Any("message", string(message.Value)))
            // if the message has waited long enough, forward it to the real topic
            if now.Sub(message.Timestamp) >= c.delay {
                c.Logger.Info("[delay ConsumeClaim] message matured", zap.Any("mes", string(message.Value)))
                _, _, err := c.producer.SendMessage(&sarama.ProducerMessage{
                    Topic:     c.realTopic,
                    Timestamp: message.Timestamp,
                    Key:       sarama.ByteEncoder(message.Key),
                    Value:     sarama.ByteEncoder(message.Value),
                })
                if err != nil {
                    c.Logger.Info("[delay ConsumeClaim] send to real topic failed", zap.Error(err))
                    return nil
                }
                session.MarkMessage(message, "")
                c.Logger.Info("[delay ConsumeClaim] send to real topic success")
                continue
            }
            // otherwise sleep for a second and return without marking the message,
            // so the consumer group redelivers it on the next Consume call
            time.Sleep(time.Second)
            return nil
        }
        c.Logger.Info("[delay ConsumeClaim] claim drained",
            zap.Any("partition", claim.Partition()),
            zap.Any("HighWaterMarkOffset", claim.HighWaterMarkOffset()))
        c.Logger.Info("[delay ConsumeClaim] delay consumer end")
        return nil
    }

    func (c *DelayServiceConsumer) Setup(sarama.ConsumerGroupSession) error   { return nil }
    func (c *DelayServiceConsumer) Cleanup(sarama.ConsumerGroupSession) error { return nil }

The overall logic is to keep consuming messages from the delay topic and check whether the time elapsed since each message's timestamp has reached the configured delay. If it has, the message is due and is forwarded to the real topic, which is being consumed continuously; for example, with DelayTime set to 5 minutes, a message produced at 10:00:00 is forwarded to realTopic at roughly 10:05:00. If a message is not yet due, it is not marked, so it will be delivered and checked again later; once it is forwarded successfully, it is marked as consumed.

Note: in my tests the delay service pulled messages about once per second, so the timing is not exact, but the final result is close enough. If you want to see exactly how consumption proceeds, step through it with a debugger.
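
If tighter timing matters, one option (my own suggestion, not something the original code does) is to sleep for the time remaining until the current message matures instead of a fixed second, since messages in a partition arrive roughly in timestamp order. A sketch of the fragment that would replace the fixed sleep inside ConsumeClaim:

    // wait only as long as the head message still needs before it matures
    remaining := c.delay - time.Since(message.Timestamp)
    if remaining > 0 {
        time.Sleep(remaining)
    }
    return nil // the unmarked message is redelivered on the next Consume call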

3. Consumption logic for the real topic
    type ConsumerRta struct {
        Logger *log.DomobLog
    }

    func ConsumerToRequestRta(consumerGroup sarama.ConsumerGroup, lg *log.DomobLog) {
        var (
            signals = make(chan os.Signal, 1)
            wg      = &sync.WaitGroup{}
        )
        signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, os.Interrupt)
        wg.Add(1)
        // start the consumer goroutine
        go func() {
            defer wg.Done()
            consumer := NewConsumerRta(lg)
            consumer.Logger.Info("[ConsumerToRequestRta] consumer group start")
            // run the consumer group
            for {
                if err := consumerGroup.Consume(context.Background(), []string{kafka.RealTopic}, consumer); err != nil {
                    consumer.Logger.Error("[ConsumerToRequestRta] error from consumer group:", zap.Error(err))
                    break
                }
                time.Sleep(2 * time.Second) // wait a while before retrying
                select { // exit the loop if an interrupt signal was received
                case sin := <-signals:
                    consumer.Logger.Info("get signal,", zap.Any("signal", sin))
                    return
                default:
                }
            }
        }()
        wg.Wait()
        lg.Info("[ConsumerToRequestRta] consumer end & exit")
    }

    func NewConsumerRta(lg *log.DomobLog) *ConsumerRta { return &ConsumerRta{Logger: lg} }

    func (c *ConsumerRta) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        for message := range claim.Messages() {
            // business consumption logic goes here; mark the message once it is handled
            session.MarkMessage(message, "")
        }
        return nil
    }

    func (c *ConsumerRta) Setup(sarama.ConsumerGroupSession) error   { return nil }
    func (c *ConsumerRta) Cleanup(sarama.ConsumerGroupSession) error { return nil }
4. Kafka configuration
    type KafkaConfig struct {
        BrokerList []string
        Topic      []string
        GroupId    []string
        Cfg        *sarama.Config
        PemPath    string
        KeyPath    string
        CaPemPath  string
    }

    var (
        Producer           sarama.SyncProducer
        ConsumerGroupReal  sarama.ConsumerGroup
        ConsumerGroupDelay sarama.ConsumerGroup
        KafkaDelayQueue    *KafkaDelayQueueProducer
    )

    func NewKafkaConfig(cfg KafkaConfig) (err error) {
        Producer, err = sarama.NewSyncProducer(cfg.BrokerList, cfg.Cfg)
        if err != nil {
            return err
        }
        ConsumerGroupReal, err = sarama.NewConsumerGroup(cfg.BrokerList, cfg.GroupId[0], cfg.Cfg)
        if err != nil {
            return err
        }
        ConsumerGroupDelay, err = sarama.NewConsumerGroup(cfg.BrokerList, cfg.GroupId[1], cfg.Cfg)
        if err != nil {
            return err
        }
        return nil
    }

    func GetKafkaDelayQueue(log *log.DomobLog) {
        KafkaDelayQueue = NewKafkaDelayQueueProducer(Producer, ConsumerGroupDelay, DelayTime, DelayTopic, RealTopic, log)
    }

I did not wrap much here; you can add your own abstractions. The client used is IBM's sarama.
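
For reference, here is a minimal sketch of how the pieces above might be wired together. The sarama.Config values, broker address, and group IDs are assumptions on my part (the one hard requirement is Producer.Return.Successes = true, which sarama's SyncProducer demands), and `logger` stands in for whatever *log.DomobLog you pass around:

    cfg := sarama.NewConfig()
    cfg.Producer.Return.Successes = true               // required by sarama's SyncProducer
    cfg.Producer.RequiredAcks = sarama.WaitForAll
    cfg.Consumer.Offsets.Initial = sarama.OffsetOldest // re-read unmarked delay messages after restarts

    if err := kafka.NewKafkaConfig(kafka.KafkaConfig{
        BrokerList: []string{"127.0.0.1:9092"},
        GroupId:    []string{"real-group", "delay-group"}, // [0] real topic, [1] delay topic
        Cfg:        cfg,
    }); err != nil {
        panic(err)
    }

    kafka.GetKafkaDelayQueue(logger)                         // starts the delay service
    go ConsumerToRequestRta(kafka.ConsumerGroupReal, logger) // consume the real topic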

Summary

That is essentially the whole implementation in the steps above. The log calls can be replaced with your own logger, and consumption uses consumer groups, so just plug in your own topics and group IDs.

Note: the delay achieved this way is not perfectly precise; in my usage there was a small error, though not a large one. For timing-critical business, use an MQ with native delay-queue support or some other dedicated solution.

Tags: kafka, distributed, golang

This article is reposted from: https://blog.csdn.net/weixin_51991615/article/details/135959182
Copyright belongs to the original author 走,我们去吹风. If there is any infringement, please contact us for removal.
