0


springboot 连接 kafka集群(kafka版本 2.13-3.4.0)

springboot 连接 kafka集群

一、环境搭建

1.1 springboot 环境

JDK 11+
Maven 3.8.x+
springboot 2.5.4 +

1.2 kafka 依赖

springboot的pom文件导入

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency>

二、 kafka 配置类

2.1 发布者

2.1.1 配置

发布者我们使用 KafkaTemplate 来进行消息发布,所以需要先对其进行一些必要的配置。

@Configuration@EnableKafkapublicclassKafkaConfig{/***** 发布者 *****///生产者工厂@BeanpublicProducerFactory<Integer,String>producerFactory(){returnnewDefaultKafkaProducerFactory<>(producerConfigs());}//生产者配置@BeanpublicMap<String,Object>producerConfigs(){Map<String,Object> props =newHashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);return props;}//生产者模板@BeanpublicKafkaTemplate<Integer,String>kafkaTemplate(){returnnewKafkaTemplate<>(producerFactory());}}

2.1.2 构建发布者类

配置完发布者,下来就是发布消息,我们需要继承 ProducerListener<K, V> 接口,该接口完整信息如下:

publicinterfaceProducerListener<K,V>{voidonSuccess(ProducerRecord<K,V> producerRecord,RecordMetadata recordMetadata);voidonError(ProducerRecord<K,V> producerRecord,RecordMetadata recordMetadata,Exception exception);}

实现该接口的方法,我们可以获取包含发送结果(成功或失败)的异步回调,也就是可以在这个接口的实现中获取发送结果。

我们简单的实现构建一个发布者类,接收主题和发布消息参数,并打印发布结果。

@ComponentpublicclassKafkaProducerimplementsProducerListener<Object,Object>{privatestaticfinalLogger producerlog =LoggerFactory.getLogger(KafkaProducer.class);privatefinalKafkaTemplate<Integer,String> kafkaTemplate;publicKafkaProducer(KafkaTemplate<Integer,String> kafkaTemplate){this.kafkaTemplate = kafkaTemplate;}publicvoid producer (String msg,String topic){ListenableFuture<SendResult<Integer,String>> future = kafkaTemplate.send(topic,0, msg);
        future.addCallback(newKafkaSendCallback<Integer,String>(){@OverridepublicvoidonSuccess(SendResult<Integer,String> result){
                producerlog.info("发送成功 {}", result);}@OverridepublicvoidonFailure(KafkaProducerException ex){ProducerRecord<Integer,String> failed = ex.getFailedProducerRecord();
                producerlog.info("发送失败 {}",failed);}});}}

2.1.3 发布消息

写一个controller类来测试我们构建的发布者类,这个类中打印接收到的消息,来确保信息接收不出问题。

@RestControllerpublicclassKafkaTestController{privatestaticfinalLogger kafkaTestLog =LoggerFactory.getLogger(KafkaTestController.class);@ResourceprivateKafkaProducer kafkaProducer;@GetMapping("/kafkaTest")publicvoidkafkaTest(String msg,String topic){
        kafkaProducer.producer(msg,topic);
        kafkaTestLog.info("接收到消息 {} {}",msg,topic);}}

一切准备就绪,我们启动程序利用postman来进行简单的测试。

进行消息发布:
在这里插入图片描述

发布结果:
在这里插入图片描述
可以看到消息发送成功。

我们再看看kafka消费者有没有接收到消息:

在这里插入图片描述

看以看到,kakfa的消费者也接收到了消息。

2.2 消费者

2.2.1 配置

消息的接受有多种方式,我们这里选择的是使用 @KafkaListener 注解来进行消息接收。它的使用像下面这样:

publicclassListener{@KafkaListener(id ="foo", topics ="myTopic", clientIdPrefix ="myClientId")publicvoidlisten(String data){...}}

看起来不是太难吧,但使用这个注解,我们需要配置底层 ConcurrentMessageListenerContainer.kafkaListenerContainerFactor。

我们在原来的kafka配置类 KafkaConfig 中,继续配置消费者,大概就像下面这样

@Configuration@EnableKafkapublicclassKafkaConfig{/***** 发布者 *****///生产者工厂@BeanpublicProducerFactory<Integer,String>producerFactory(){returnnewDefaultKafkaProducerFactory<>(producerConfigs());}//生产者配置@BeanpublicMap<String,Object>producerConfigs(){Map<String,Object> props =newHashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);return props;}//生产者模板@BeanpublicKafkaTemplate<Integer,String>kafkaTemplate(){returnnewKafkaTemplate<>(producerFactory());}/***** 消费者 *****///容器监听工厂@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,String>>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<Integer,String> factory =newConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);return factory;}//消费者工厂@BeanpublicConsumerFactory<Integer,String>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}//消费者配置@BeanpublicMap<String,Object>consumerConfigs(){Map<String,Object> props =newHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.2.83:9092,192.168.2.84:9092,192.168.2.86:9092");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS,JsonDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,JsonDeserializer.class.getName());
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);return props;}}

注意,要设置容器属性必须使用getContainerProperties()工厂方法。它用作注入容器的实际属性的模板

2.2.2 构建消费者类

配置好后,我们就可以使用这个注解了。这个注解的使用有多种方式:

1、用它来覆盖容器工厂的concurrency和属性

@KafkaListener(id ="myListener", topics ="myTopic",
        autoStartup ="${listen.auto.start:true}", concurrency ="${listen.concurrency:3}")publicvoidlisten(String data){...}

2、可以使用显式主题和分区(以及可选的初始偏移量)

@KafkaListener(id ="thing2", topicPartitions ={@TopicPartition(topic ="topic1", partitions ={"0","1"}),@TopicPartition(topic ="topic2", partitions ="0",
             partitionOffsets =@PartitionOffset(partition ="1", initialOffset ="100"))})publicvoidlisten(ConsumerRecord<?,?>record){...}

3、将初始偏移应用于所有已分配的分区

@KafkaListener(id ="thing3", topicPartitions ={@TopicPartition(topic ="topic1", partitions ={"0","1"},
             partitionOffsets =@PartitionOffset(partition ="*", initialOffset ="0"))})publicvoidlisten(ConsumerRecord<?,?>record){...}

4、指定以逗号分隔的分区列表或分区范围

@KafkaListener(id ="pp", autoStartup ="false",
        topicPartitions =@TopicPartition(topic ="topic1",
                partitions ="0-5, 7, 10-15"))publicvoidprocess(String in){...}

5、可以向侦听器提供Acknowledgment

@KafkaListener(id ="cat", topics ="myTopic",
          containerFactory ="kafkaManualAckListenerContainerFactory")publicvoidlisten(String data,Acknowledgment ack){...
    ack.acknowledge();}

6、添加标头

@KafkaListener(id ="list", topics ="myTopic", containerFactory ="batchFactory")publicvoidlisten(List<String> list,@Header(KafkaHeaders.RECEIVED_KEY)List<Integer> keys,@Header(KafkaHeaders.RECEIVED_PARTITION)List<Integer> partitions,@Header(KafkaHeaders.RECEIVED_TOPIC)List<String> topics,@Header(KafkaHeaders.OFFSET)List<Long> offsets){...}

我们这里写一个简单的,只用它来接受指定主题的数据:

@ComponentpublicclassKafkaConsumer{privatestaticfinalLogger consumerlog =LoggerFactory.getLogger(KafkaConsumer.class);@KafkaListener(topicPartitions  =@TopicPartition(topic ="kafka-topic-test",
            partitions ="0"))publicvoid consumer (String data){
        consumerlog.info("消费者接收数据 {}",data);}}

这里解释一下,因为我们进行了手动分配主题/分区,所以 注解中group.id 可以为空。若要指定group.id请在消费者配置中加上props.put(ConsumerConfig.GROUP_ID_CONFIG, “bzt001”); 或在 @TopicPartition 注解后加上 groupId = “组id”

2.2.3 进行消息消费

继续使用postman调用我们写好的发布者发布消息,观察控制台的消费者类是否有相关日志出现。
在这里插入图片描述

标签: kafka spring boot java

本文转载自: https://blog.csdn.net/qq_35241329/article/details/131119920
版权归原作者 timi先生 所有, 如有侵权,请联系我们删除。

“springboot 连接 kafka集群(kafka版本 2.13-3.4.0)”的评论:

还没有评论