

Using Kafka Producers and Consumers Easily in Java


I. Environment Setup

  1. The project needs the following Maven dependencies (choose versions as appropriate):
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.2</version>
</dependency>

2. application.yml configuration:

spring:
  kafka:
    bootstrap-servers: ip:port
    jaas:
      enabled: false
    listener:
      type: single
      concurrency: 3
    consumer:
#      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      group-id: your-group-id
      auto-offset-reset: latest
      enable-auto-commit: false
      max-poll-records: 100

  # kafka topic used by the producer and listeners below
  task:
    execute:
      topic: your-topic-name

II. Producer

1. Writing a simple producer:

@Component
public class SendKafkaUtil {

    @Autowired
    KafkaTemplate<Object, Object> kafkaTemplate;

    @Value("${spring.task.execute.topic}")
    private String topic;

    public void sendMessageKafka() {
        // send() is asynchronous; the payload here is a JSON string
        kafkaTemplate.send(topic, "{json data}");
    }

}
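kafkaTemplate.send() returns a future, so the producer can also observe whether the broker acknowledged the message. Below is a minimal sketch of sending with a callback; the class name SendKafkaWithCallbackUtil and the method name sendWithCallback are illustrative, and the ListenableFuture-based API shown applies to Spring Kafka 2.x (3.x returns a CompletableFuture instead).

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Component
public class SendKafkaWithCallbackUtil {

    private static final Logger log = LoggerFactory.getLogger(SendKafkaWithCallbackUtil.class);

    @Autowired
    KafkaTemplate<Object, Object> kafkaTemplate;

    @Value("${spring.task.execute.topic}")
    private String topic;

    public void sendWithCallback(String json) {
        // the future completes when the broker acknowledges the record or the send fails
        ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(topic, json);
        future.addCallback(
                result -> log.info("sent to {}-{} at offset {}",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset()),
                ex -> log.error("send failed", ex));
    }
}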

III. Consumer

Reliability on the consumer side is easy to guarantee, because the data is persisted in Kafka, so there is no need to worry about data loss. However, a consumer may fail while consuming (a power outage, a crash, and so on), and after it recovers it must resume from where it left off. For that reason the consumer has to keep track of the offset it has consumed up to, so that it can continue from that position after recovery.

1. Writing a simple consumer:

@KafkaListener(topics = {"${spring.task.execute.topic}"}, groupId = "${spring.kafka.consumer.group-id}")
public void processMessage(ConsumerRecord<String, String> record) {
    logger.info("kafka message received: " + record.value());
}
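Since the yml above sets enable-auto-commit: false, offsets are not committed automatically, which ties back to the offset tracking described at the start of this section. One common approach is manual acknowledgment in the listener. The sketch below is only an illustration: the names ManualAckConsumerConfig, manualAckFactory and processWithManualAck are made up here, and in Spring Kafka versions before 2.3 the AckMode enum lives on AbstractMessageListenerContainer rather than ContainerProperties.

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;

@Configuration
public class ManualAckConsumerConfig {

    private static final Logger logger = LoggerFactory.getLogger(ManualAckConsumerConfig.class);

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> manualAckFactory(KafkaProperties properties) {
        Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProperties));
        // the listener commits offsets itself through the Acknowledgment argument
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @KafkaListener(topics = {"${spring.task.execute.topic}"}, containerFactory = "manualAckFactory")
    public void processWithManualAck(ConsumerRecord<String, String> record, Acknowledgment ack) {
        logger.info("received offset {}: {}", record.offset(), record.value());
        // commit the offset only after the record has been processed successfully
        ack.acknowledge();
    }
}

MANUAL_IMMEDIATE commits as soon as acknowledge() is called, while MANUAL queues the acknowledgments and commits once all records from the current poll have been processed.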

2. Writing a batch consumer (batch size controlled by max-poll-records: 100):

  @Bean
  public KafkaListenerContainerFactory<?> batchFactory(KafkaProperties properties) {
    Map<String, Object> consumerProperties = properties.buildConsumerProperties();
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProperties));
    factory.setBatchListener(true); // enable batch listening
    return factory;
  }
  /**
   * Consumer 1
   * Generic batch-processing handler
   *
   * @param records the batch of records polled from partition 0
   */
  @KafkaListener(containerFactory = "batchFactory", topicPartitions = {
      @TopicPartition(partitions = {"0"}, topic = "${spring.task.execute.topic}")})
  public void consumer1(List<ConsumerRecord<String, Object>> records) throws IOException {
    log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
    log.info("Id1 records size " + records.size());
    // TODO: process the records
  }
  /**
   * Consumer 2
   * Generic batch-processing handler
   *
   * @param records the batch of records polled from partition 1
   */
  @KafkaListener(containerFactory = "batchFactory", topicPartitions = {
      @TopicPartition(partitions = {"1"}, topic = "${spring.task.execute.topic}")})
  public void consumer2(List<ConsumerRecord<String, Object>> records) throws IOException {
    log.info("Id2 Listener, Thread ID: " + Thread.currentThread().getId());
    log.info("Id2 records size " + records.size());
    // TODO: process the records
  }

Note: when running multiple consumers, match them to the partitions configured in Kafka; use at most one consumer per partition, otherwise the extra consumers sit idle and waste resources.
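Alternatively, instead of pinning each listener method to a fixed partition as above, a single listener can run several consumer threads and let Kafka distribute the partitions among them (matching the listener.concurrency: 3 setting in the yml). A minimal sketch follows; note that the concurrency attribute on @KafkaListener requires Spring Kafka 2.2 or later, and on older versions the equivalent is calling factory.setConcurrency(3) on the container factory.

  /**
   * One listener for all partitions of the topic; the container starts three consumer
   * threads and the broker assigns the partitions among them. Keep the thread count
   * no higher than the partition count, otherwise the extra threads stay idle.
   */
  @KafkaListener(topics = {"${spring.task.execute.topic}"}, containerFactory = "batchFactory", concurrency = "3")
  public void consumerAllPartitions(List<ConsumerRecord<String, Object>> records) {
    log.info("Listener thread " + Thread.currentThread().getId() + " received " + records.size() + " records");
    // TODO: process the records
  }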

Tags: kafka, java, distributed systems

Reposted from: https://blog.csdn.net/qq_38946379/article/details/127441511
Copyright belongs to the original author 束曙. In case of infringement, please contact us for removal.
