Integrating Kafka with Spring Boot: Simple Configuration for Producing and Consuming

This article integrates Kafka with Spring Boot, implementing message production and consumption with simple configuration, and covers the producer/consumer configuration options and the ways to set consumer offsets. More features and details are described in the

Spring Kafka documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html

Prerequisites

  • A running Kafka environment; see "Kafka Cluster Setup and Usage" for how to build one
  • Java environment: JDK 1.8
  • Maven version: apache-maven-3.6.3
  • IDE: IntelliJ IDEA

Project Setup

  1. Create a Spring Boot project.
  2. Add the spring-kafka dependency to pom.xml.
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
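
No explicit version is needed when the project inherits from spring-boot-starter-parent, which manages the spring-kafka version. A minimal sketch of that parent declaration (the version shown is illustrative; use whatever your project targets):

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.0</version>
</parent>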

Create the Topic

Create a topic named testtopic with 2 partitions.

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic testtopic --partitions 2
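
To confirm that the topic was created with the expected partition count, the same tool can describe it:

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic testtopic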

Configuration

The application.yml configuration:

spring:
  application:
    name: kafka_springboot
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # Acknowledgment setting; valid values are 0, 1, -1 (default 1)
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        # Custom partitioning strategy
        partitioner:
          class: org.bg.kafka.PartitionPolicy

    consumer:
      # Whether to auto-commit offsets (default true)
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Where to start consuming when a consumer group is new or its offset is lost.
      # earliest: consume from the earliest messages. latest (default): consume the most recent available messages.
      # none: throw an exception when no offset is found for the consumer group.
      auto-offset-reset: latest
      # Number of records per poll when consuming in batches
      #max-poll-records: 5
    listener:
      # RECORD: commit after each record is processed by the listener
      # BATCH: commit after each batch of records is processed by the listener
      # TIME: commit after a batch is processed and more than TIME has elapsed since the last commit
      # COUNT: commit after a batch is processed and at least COUNT records have been processed
      # COUNT_TIME: commit when either the TIME or the COUNT condition is met
      # MANUAL: commit after the listener calls Acknowledgment.acknowledge() for a batch
      # MANUAL_IMMEDIATE: commit immediately when Acknowledgment.acknowledge() is called
      ack-mode: manual
      # Batch consumption
      type: batch
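
Keys under producer.properties are passed straight through to the Kafka client, so the partitioner setting above could equivalently be written in application.properties form (a sketch assuming the same class):

spring.kafka.producer.properties.partitioner.class=org.bg.kafka.PartitionPolicy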

See the KafkaProperties class for the full set of configuration options.

Producing Messages

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class Producer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String msg) {
        // Send to topic "testtopic" with a fixed key; the partition is chosen by the partitioner
        kafkaTemplate.send(new ProducerRecord<String, String>("testtopic", "key111", msg));
    }
}
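
A minimal sketch of triggering the producer, using a hypothetical CommandLineRunner (not part of the original project):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

// Hypothetical helper for illustration: sends one message on application startup
@Component
public class SendRunner implements CommandLineRunner {

    @Autowired
    private Producer producer;

    @Override
    public void run(String... args) {
        producer.send("hello kafka");
    }
}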

Custom Partitioning Strategy for the Producer

package org.bg.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Mirrors the classic DefaultPartitioner: round-robin for null keys, murmur2 hash for keyed records
public class PartitionPolicy implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // No key: round-robin over the available partitions
            int nextValue = this.nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // Key present: hash the key bytes with murmur2
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    // Per-topic counter, initialized randomly, used for the round-robin branch
    private int nextValue(String topic) {
        AtomicInteger counter = this.topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = this.topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
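
Because keyed records go through murmur2 hashing, you can check in isolation which partition a given key maps to. A standalone sketch using Kafka's Utils class:

import org.apache.kafka.common.utils.Utils;

// Standalone sketch: reproduces the keyed branch of PartitionPolicy for "key111" and 2 partitions
public class PartitionCheck {
    public static void main(String[] args) {
        byte[] keyBytes = "key111".getBytes();
        int numPartitions = 2;
        // Same formula as the partitioner's keyed branch
        System.out.println(Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions);
    }
}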

Producing to a Specific Partition

ProducerRecord has a constructor that accepts a partition number:

public ProducerRecord(String topic, Integer partition, K key, V value) 

kafkaTemplate.send(new ProducerRecord<String, String>("testtopic", 1, "key111", msg));
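
KafkaTemplate also exposes an overload that takes the partition directly, so the same send can be written without constructing a ProducerRecord:

kafkaTemplate.send("testtopic", 1, "key111", msg);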

Consuming Messages

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * For custom seek behavior, see
 * https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek
 */
@Component
public class Consumer implements ConsumerSeekAware {

    @KafkaListener(topics = {"testtopic"}, groupId = "test_group", clientIdPrefix = "bg", id = "testconsumer")
    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        System.out.println(records.size());
        System.out.println(records.toString());
        // Manual commit of this batch (matches ack-mode: manual)
        ack.acknowledge();
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // Seek to the offsets for a given timestamp
        callback.seekToTimestamp(assignments.keySet(), 1670233826705L);
        // Seek to the end of each assigned partition
        callback.seekToEnd(assignments.keySet());
        // Seek to the beginning of each assigned partition
        callback.seekToBeginning(assignments.keySet());
        // Seek to a specific offset
        for (TopicPartition topicPartition : assignments.keySet()) {
            callback.seek(topicPartition.topic(), topicPartition.partition(), 0L);
        }
    }
}
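
Note that the four seek calls in onPartitionsAssigned are shown together only to demonstrate the options; each later call repositions the same partitions again, so as written the final loop (seeking every assigned partition to offset 0) is what takes effect. In practice, keep exactly one of them.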

Ways to Set the Offset

As the code above shows, implementing the ConsumerSeekAware interface gives you several ways to set the offset:

  • Seek to a specific offset; you maintain the offset yourself, which makes retries convenient.
  • Seek to the beginning and consume from the start.
  • Seek to the latest available offset (the default behavior).
  • Look up the offset for a timestamp and seek to it.
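
Besides ConsumerSeekAware, Spring Kafka also supports pinning an initial offset declaratively on the listener itself. A minimal sketch using @TopicPartition and @PartitionOffset (a hypothetical listener, with the batch signature required by the configuration above; imports from org.springframework.kafka.annotation assumed):

@KafkaListener(
        id = "seekexample",
        groupId = "test_group",
        topicPartitions = @TopicPartition(
                topic = "testtopic",
                partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")))
public void listen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
    System.out.println(records.size());
    // Manual commit, as configured by ack-mode: manual
    ack.acknowledge();
}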

Code Repository

https://gitee.com/codeWBG/learn_kafka

Tags: kafka, spring boot, java

Reposted from: https://blog.csdn.net/qq_28314431/article/details/128190795
Copyright belongs to the original author 叫我二蛋. If there is any infringement, please contact us for removal.
