0


springboot kafka配置与使用

springboot kafka配置与使用

引入spring-kafka依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

application配置

可以根据情况只配置生产着或消费者

spring:kafka:# 以逗号分隔的地址列表,用于建立与 Kafka 集群的初始连接 (kafka 默认的端口号为 9092)bootstrap-servers: ip:port,ip:port,ip:port
    # 生产者配置producer:# 消息重发的次数retries:0# 一个批次可以使用的内存大小batch-size:16384# 设置生产者内存缓冲区的大小buffer-memory:33444432# 键的序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      acks: all

    # 消费者配置consumer:max:poll:records:1broker-id:0# 自动提交的时间间隔 在 spring boot 2.X 版本中这里采用的是值的类型为 Duration 需要符合特定的格式,如 1S,1M,2H,5Dauto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是 true,为了避免出现重复数据和数据丢失,可以把它设置为 false,然后手动提交偏移量enable-auto-commit:false# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 更换消费组就可以从头消费数据,同一个消费组用户不会重复消费数据# groupid可以消费的时候再指定group-id: capb_group
    listener:# 在侦听器容器中运行的线程数。concurrency:3# 手工ack,调用ack后立刻提交offsetack-mode: manual_immediate

向topic发送数据

privatestaticfinalStringTOPIC_NAME="my_topic_name";@TestpublicvoidtestSendKafka(){
    kafkaTemplate.send(TOPIC_NAME,"{\"json\":\"ok\"}").addCallback(success ->{String topic = success.getRecordMetadata().topic();int partition = success.getRecordMetadata().partition();long offset = success.getRecordMetadata().offset();System.out.println("发送成功:topic="+ topic +", partition="+ partition +", offset="+ offset);}, failue ->{System.out.println("发送消息失败:"+ failue.getMessage());});}

KafkaListener消费topic数据

每个groupId都可以完整消费指定topic的所有数据,要想重新消费所有数据可以更换groupid组

// 可以在配置文件中灵活配置// @KafkaListener(topics = {"my_topic_name"}, groupId = "my_groupId")@KafkaListener(topics ={"my_topic_name"})publicvoidonMessage(ConsumerRecord<?,?> record,Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic){
    log.info("消费消息:"+ record.topic()+"----"+ record.partition()+"----"+ record.value());System.out.println(JSON.parseObject(record.value().toString(),KafkaEvent.class));// 消费完后手动通知已消费,在上面有配置
    ack.acknowledge();}
标签: kafka spring boot java

本文转载自: https://blog.csdn.net/a2272062968/article/details/128040881
版权归原作者 WalkingWithTheWind~ 所有, 如有侵权,请联系我们删除。

“springboot kafka配置与使用”的评论:

还没有评论