0


springboot的kafka动态工具类(动态创建topic、监听和监听方法)

springboot的kafka动态工具类(动态创建topic、监听和监听方法)

一、使用场景

需要动态创建topic,然后动态创建该topic的监听容器,同时可以指定该监听容器的处理方法,避免增删监听topic时需要重启操作等情况。

很多情况下,使用kafka一般都会主动创建好队列(Topic)和消费者监听(@KafkaListener),特别是监听者,一般都是动态创建好后,然后使用@KafkaListener指定Topic后创建。

上述情况的优点在于:可以明确topic和消费者,启动时程序主动就创建好对应topic的消费容器和消费方法,直接消费即可。

缺点:如果需要监听新的topic,则需要添加@KafkaListener的配置并且重新启动项目,对于灵活性要求高或者线上的程序是比较麻烦的。

二、工具类概述

所以基于上述情况,为了更加灵活的创建和使用Kafka的topic和listener,专门写了一个kafka相关的工具类:

topic相关的包含了:topic创建、删除、列表、是否存在等方法。

Listener相关包含了:容器的创建、启动、停止、暂停、恢复等方法。

这里有个概念需要先了解下,监听容器里有两个状态,可以简单理解为:一个是容器的运行状态running,一个是容器的监听状态pauseRequest,再容器运行状态开启的基础上,监听状态开启,才能够正常消费消息。

三、代码展示

  1. 那么老规矩,万事先依赖:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId
</dependency>
  1. 然后是配置文件类KafkaConfig,设置kafka相关配置
importorg.apache.kafka.clients.admin.AdminClient;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.common.serialization.StringDeserializer;importorg.apache.kafka.common.serialization.StringSerializer;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.kafka.annotation.EnableKafka;importorg.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;importorg.springframework.kafka.core.*;importjava.util.HashMap;importjava.util.Map;importjava.util.Properties;/**
 *@ClassName KafkaConfig
 *@Description: TODO kafka的配置类
 **/@Configuration@EnableKafkapublicclassKafkaConfig{privatestaticfinalString kafkaServer ="kafka-ip:9092";//kafka地址/**
     * @Title producerFactory
     * @Description TODO 生产者工厂类,设置生产者相关配置
     * @return org.springframework.kafka.core.ProducerFactory<java.lang.String,java.lang.Object>
     */@BeanpublicProducerFactory<String,Object>producerFactory(){Map<String,Object> props =newHashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);//kafka 地址
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);//序列化
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);//序列化
        props.put(ProducerConfig.ACKS_CONFIG,"all");//确认机制,all是所有副本确认,1是一个副本确认,0是不需要副本确认
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,"10");//批量发送大小
        props.put(ProducerConfig.LINGER_MS_CONFIG,"1");//批量发送等待时间  和上面的batch-size谁先到先发送returnnewDefaultKafkaProducerFactory<>(props);}/**
     * @Title kafkaTemplate
     * @Description TODO kafka生产者工具类
     * @return org.springframework.kafka.core.KafkaTemplate<java.lang.String,java.lang.Object>
     */@BeanpublicKafkaTemplate<String,Object>kafkaTemplate(){returnnewKafkaTemplate<>(producerFactory());}/**
     * @Title consumerFactory
     * @Description TODO 消费者工厂类,配置消费者的一些配置
     * @return org.springframework.kafka.core.ConsumerFactory<java.lang.String,java.lang.Object>
     */@BeanpublicConsumerFactory<String,Object>consumerFactory(){Map<String,Object> props =newHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,5*1024*1024);//每次抓取消息的大小
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//是否自动提交
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,50*1000*1000);//请求超时时间
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,100);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,60000000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);returnnewDefaultKafkaConsumerFactory<>(props);}/**
     * @Title kafkaListenerContainerFactory
     * @Description TODO 监听容器的工厂类,创建监听容器时使用
     * @return org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<java.lang.String,java.lang.Object>
     */@BeanpublicConcurrentKafkaListenerContainerFactory<String,Object>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,Object> factory =newConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());return factory;}/**
     * @Title adminClient
     * @Description TODO kafka客户端
     * @return org.apache.kafka.clients.admin.AdminClient
     */@BeanpublicAdminClientadminClient(){Properties props =newProperties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);AdminClient adminClient =AdminClient.create(props);return adminClient;}}
  1. kafka的相关配置设置好了,就可以使用工具类KafkaUtil了
importlombok.extern.slf4j.Slf4j;importorg.apache.kafka.clients.admin.*;importorg.apache.kafka.common.config.ConfigResource;importorg.apache.kafka.common.config.TopicConfig;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.config.KafkaListenerContainerFactory;importorg.springframework.kafka.config.KafkaListenerEndpointRegistry;importorg.springframework.kafka.config.MethodKafkaListenerEndpoint;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.kafka.listener.MessageListenerContainer;importorg.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;importorg.springframework.stereotype.Component;importjava.lang.reflect.Method;importjava.util.Collections;importjava.util.Set;/**
 * @ClassName: KafkaUtil
 * @Description: TODO 用于创建kafka Topic队列和listener监听容器的工具类
 **/@Component@Slf4jpublicclassKafkaUtil{privatestaticAdminClient adminClient;privatestaticKafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;privatestaticKafkaTemplate kafkaTemplate;/**
     * @Title KafkaUtil
     * @Description 构造函数注入
     * @param adminClient kafka客户端对象
     * @param kafkaListenerEndpointRegistry kafka监听容器注册对象
     * @param kafkaListenerEndpointRegistry kafka生产者工具类
     * @return
     */@AutowiredpublicKafkaUtil(AdminClient adminClient,KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry,KafkaTemplate kafkaTemplate){KafkaUtil.adminClient = adminClient;KafkaUtil.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;KafkaUtil.kafkaTemplate = kafkaTemplate;}//region topic相关方法/**
     * @Title createTopic
     * @Description 创建kafka topic
     * @param topicName topic名
     * @param partitions 分区数
     * @param replicas 副本数(short)
     * @return void
     */publicstaticvoidcreateTopic(String topicName,int partitions,short replicas)throwsException{NewTopic newTopic =newNewTopic(topicName, partitions, replicas);CreateTopicsResult topics = adminClient.createTopics(Collections.singleton(newTopic));
        topics.all().get();
        log.info("【{}】topic创建成功", topicName);}/**
     * @Title deleteTopic
     * @Description 删除topic
     * @param topicName  topic名称
     * @return void
     */publicstaticvoiddeleteTopic(String topicName)throwsException{DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singleton(topicName));
        deleteTopicsResult.all().get();
        log.info("【{}】topic删除成功", topicName);}/**
     * @Title updateTopicRetention
     * @Description 修改topic的过期时间
     * @param topicName  topic名称
     * @param ms  过期时间(毫秒值)
     * @return void
     */publicstaticvoidupdateTopicRetention(String topicName,String ms)throwsException{ConfigResource resource =newConfigResource(ConfigResource.Type.TOPIC, topicName);ConfigEntry configEntry =newConfigEntry(TopicConfig.RETENTION_MS_CONFIG, ms);Config config =newConfig(Collections.singleton(configEntry));// 创建AlterConfigsOptionsAlterConfigsOptions alterConfigsOptions =newAlterConfigsOptions().timeoutMs(10000);// 执行修改操作
        adminClient.alterConfigs(Collections.singletonMap(resource, config), alterConfigsOptions).all().get();
        log.info("【{}】topic过期时间设置完成,过期时间为:{}毫秒", topicName, ms);}/**
     * @Title listTopic
     * @Description 获取topic列表
     * @return java.util.Set<java.lang.String>
     */publicstaticSet<String>listTopic()throwsException{ListTopicsResult listTopicsResult = adminClient.listTopics();Set<String> strings = listTopicsResult.names().get();return strings;}/**
     * @Title existTopic
     * @Description topic是否存在
     * @param topicName topic名称
     * @return boolean
     */publicstaticbooleanexistTopic(String topicName)throwsException{Set<String> strings =listTopic();if(strings ==null|| strings.isEmpty()){returnfalse;}return strings.contains(topicName);}//endregion//region 生产者发送消息示例/**
     * @Title sendMsg
     * @Description 通过注册信息找到对应的容器并启动
     * @param topic 队列名称
     * @param msg 消息
     * @return void
     */publicstaticvoidsendMsg(String topic,Object msg)throwsException{
        kafkaTemplate.send(topic, msg);//kafkaTemplate.send(topic,2,"key",msg);//带有分区和key值的}//endregion//region 消费者监听容器相关方法/**
     * @Title existListenerContainer
     * @Description TODO 根据ID查询容器是否存在
     * @param id 监听容器id
     * @return boolean
     */publicstaticbooleanexistListenerContainer(String id)throwsException{Set<String> listenerIds = kafkaListenerEndpointRegistry.getListenerContainerIds();return listenerIds.contains(id);}/**
     * @Title registerListener
     * @Description TODO  创建kafka监听容器并注册到注册信息中,一次可以注册多个topic的监听容器
     * @param id 容器id,自定义
     * @param consumerGroupId 消费者组id自定义
     * @param processBean 处理消息的类
     * @param processMethod 处理消息的方法
     * @param topics 需要监听的topic数组
     * @return void
     */publicstaticvoidregisterListenerContainer(String id,String consumerGroupId,Object processBean,Method processMethod,String... topics)throwsException{//判断id是否存在if(existListenerContainer(id)){//如果当前id的容器已存在,不添加
            log.info("当前id为{}的容器已存在,不进行添加操作!", id);return;}//判断所有队列是否存在for(String topic : topics){if(!existTopic(topic)){//如果存在topic不存在,不添加
                log.info("【{}】topic不存在,不进行添加操作!", topic);return;}}MethodKafkaListenerEndpoint<String,String> endpoint =newMethodKafkaListenerEndpoint<>();//设置监听器端点相关信息//设置Id
        endpoint.setId(id);//设置消费者组
        endpoint.setGroupId(consumerGroupId);//设置要监听的topic数组,可以是多个
        endpoint.setTopics(topics);//设置每个监听器线程数
        endpoint.setConcurrency(3);//设置批量监听
        endpoint.setBatchListener(true);//设置消息处理工厂类,这里用的是默认工厂
        endpoint.setMessageHandlerMethodFactory(newDefaultMessageHandlerMethodFactory());//设置实际处理的Bean对象,即实际的对象,比如new Class();
        endpoint.setBean(processBean);//设置实际处理的方法(包含方法名和参数)
        endpoint.setMethod(processMethod);//注册Container并启动,startImmediately表示立马启动
        kafkaListenerEndpointRegistry.registerListenerContainer(endpoint,SpringUtil.getBean(KafkaListenerContainerFactory.class),true);
        log.info("Kafka监听容器操作:ID为{}的容器已【注册】,监听的topics:{}", id, topics);//        for (String topicName : topics) {//            if (!KafkaConfig.notExistTopicCreateContainerFlag && !nameTopics.contains(topicName)) {//                log.info("【{}】topic不存在,不创建容器!", topicName);//                continue;//            }//            //创建一个kafka监听器端点对象//            MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();//            //设置监听器端点相关信息//            //设置Id//            endpoint.setId(topicName);//            //设置消费者组//            endpoint.setGroupId(topicName + "_consumer_group");//            //设置主题//            endpoint.setTopics(topicName);//            //设置每个监听器线程数//            endpoint.setConcurrency(3);//            //设置批量监听//            endpoint.setBatchListener(true);//            //设置默认处理工厂//            endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());//            //设置实际处理的Bean对象//            endpoint.setBean(new ConsumerController());//            //设置实际处理的方法名和参数类型//            endpoint.setMethod(ConsumerController.class.getMethod("consumeMessage", String.class));//            //注册Container并启动//            kafkaListenerEndpointRegistry.registerListenerContainer(endpoint, SpringUtil.getBean(KafkaListenerContainerFactory.class), true);//            log.info("Kafka监听容器操作:ID为{}的容器已【注册】", topicName);//        }}/**
     * @Title startListenerContainer
     * @Description 根据id开启监听容器的运行状态
     * @param id 监听容器的id
     * @return void
     */publicstaticvoidstartListenerContainer(String id)throwsException{MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);if(listenerContainer ==null){
            log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id);return;}
        listenerContainer.start();
        log.info("Kafka监听容器操作:ID为{}的容器已【开启】", id);}/**
     * @Title stopListenerContainer
     * @Description TODO 根据id停止监听容器的运行状态
     * @param id 监听容器的id
     * @return void
     */publicstaticvoidstopListenerContainer(String id)throwsException{MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);if(listenerContainer ==null){
            log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id);return;}
        listenerContainer.stop();
        log.info("Kafka监听容器操作:ID为{}的容器已【停止】", id);}/**
     * @Title pauseListenerContainer
     * @Description TODO 根据id暂停监听容器的监听状态
     * @param id 监听容器的id
     * @return void
     */publicstaticvoidpauseListenerContainer(String id)throwsException{MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);if(listenerContainer ==null){
            log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id);return;}
        listenerContainer.pause();
        log.info("Kafka监听容器操作:ID为{}的容器已【暂停】", id);}/**
     * @Title resumeListenerContainer
     * @Description TODO  根据id恢复监听容器的监听状态
     * @param id 监听容器的id
     * @return void
     */publicstaticvoidresumeListenerContainer(String id)throwsException{MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);if(listenerContainer ==null){
            log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id);return;}
        listenerContainer.resume();
        log.info("Kafka监听容器操作:ID为{}的容器已【恢复】", id);}/**
     * @Title isNormalStateListenerContainer
     * @Description 是否是正常状态的容器
     * (kafka监听容器的运行状态标志是running,监听状态标志是pauseRequested,停止是关闭了资源,暂停是停止消费)
     *  只有running是true,并且pauseRequested是false,监听容器才能正常消费消息
     * @param id 监听容器的id
     * @return boolean
     */publicstaticbooleanisNormalStateListenerContainer(String id)throwsException{MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);//如果不存在此id容器,则返回falseif(listenerContainer ==null){returnfalse;}//存在则返回容器的运行状态和非暂停状态return listenerContainer.isRunning()&&!listenerContainer.isPauseRequested();}/**
     * @Title getPauseStateListenerContainer
     * @Description 获取监听容器的暂停状态(监听的状态)
     * @param id 监听容器id
     * @return boolean
     */publicstaticbooleangetPauseStateListenerContainer(String id)throwsException{MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);if(listenerContainer ==null){returntrue;}return listenerContainer.isPauseRequested();}/**
     * @Title getRunningStateListenerContainer
     * @Description 获取监听容器的运行状态(容器的状态)
     * @param id 监听容器id
     * @return boolean
     */publicstaticbooleangetRunningStateListenerContainer(String id)throwsException{MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);if(listenerContainer ==null){returnfalse;}return listenerContainer.isRunning();}/**
     * @Title setStateNormalListenerContainer
     * @Description 使容器的运行状态和监听状态都是正常
     * @param id 监听容器的id
     * @return boolean 正常返回true,非正常返回false
     */publicstaticbooleansetStateNormalListenerContainer(String id)throwsException{if(!existListenerContainer(id)){
            log.info("Kafka监听容器操作:ID为{}的容器不存在,不操作!", id);returnfalse;}//先判断容器运行状态是否正常,如果不正常,则开启if(!getRunningStateListenerContainer(id)){startListenerContainer(id);}//再判断容器监听状态是否正常,如果不正常,则恢复if(getPauseStateListenerContainer(id)){resumeListenerContainer(id);}//设置完后,再查询状态并返回。returnisNormalStateListenerContainer(id);}//endregion}
  1. 然后编写KakfaTest测试类开始测试
/**
 *@ClassName KakfaTest
 *@Description: TODO
 *@Date 2024/5/16 15:21
 **/@RestControllerpublicclassKakfaTest{@RequestMapping("kafkatest")publicvoidtest(){try{String topicName ="kafka-test-1";KafkaUtil.createTopic(topicName,1,(short)1);//创建topicKafkaUtil.updateTopicRetention(topicName,String.valueOf(1000000));//更新topic的过期时间Set<String> strings =KafkaUtil.listTopic();//查出所有topicSystem.out.println("所有topic:"+ strings);boolean b =KafkaUtil.existTopic(topicName);//查询topic是否存在System.out.println("topic-是否存在:"+ b);String listenerID ="kafka-test-listener-1";//创建监听容器KafkaUtil.registerListenerContainer(listenerID,"test-consumer-group",newKakfaTest(),KakfaTest.class.getDeclaredMethod("consumerMessage",List.class), topicName);boolean b1 =KafkaUtil.existListenerContainer(listenerID);//查询监听容器是否存在System.out.println("容器-是否存在:"+ b1);boolean normalStateListenerContainer =KafkaUtil.isNormalStateListenerContainer(listenerID);//查询监听容器是否为正常状态System.out.println("容器-状态:"+ normalStateListenerContainer);KafkaUtil.pauseListenerContainer(listenerID);//暂停监听容器的监听状态boolean pauseStateListenerContainer =KafkaUtil.getPauseStateListenerContainer(listenerID);//查询监听容器的监听状态System.out.println("容器-监听状态:"+!pauseStateListenerContainer);KafkaUtil.stopListenerContainer(listenerID);//暂停监听容器的监听状态boolean runningStateListenerContainer =KafkaUtil.getRunningStateListenerContainer(listenerID);//查询监听容器的监听状态System.out.println("容器-运行状态:"+ runningStateListenerContainer);boolean normalStateListenerContainer2 =KafkaUtil.isNormalStateListenerContainer(listenerID);//查询监听容器是否为正常状态System.out.println("容器-状态:"+ normalStateListenerContainer2);boolean b2 =KafkaUtil.setStateNormalListenerContainer(listenerID);//设置监听容器为正常状态boolean normalStateListenerContainer3 =KafkaUtil.isNormalStateListenerContainer(listenerID);//查询监听容器是否为正常状态System.out.println("容器-状态:"+ normalStateListenerContainer3);}catch(Exception e){thrownewRuntimeException(e);}}@RequestMapping("del")publicvoiddeleteTopic()throwsException{String topicName ="kafka-test-1";KafkaUtil.deleteTopic(topicName);//删除topic}@RequestMapping("send")publicvoidsendMsg()throwsException{String topicName ="kafka-test-1";KafkaUtil.sendMsg(topicName,"haha");boolean b =KafkaUtil.existTopic(topicName);//查询topic是否存在System.out.println("topic-是否存在:"+ b);}/**
     * @Title consumerMessage
     * @Description TODO 消费监听处理消息的方法
     * @param message 接受来自kakfa的参数
     * @param ack 消息确认参数
     * @return void
     */publicvoidconsumerMessage(List<ConsumerRecord<String,Object>> message,Acknowledgment ack){System.out.println("收到消息:"+ message);//消息确认
        ack.acknowledge();}
  1. 测试日志查看:

① 调用测试(/kafkatest)接口:
在这里插入图片描述

② 调用发送消息(/send)接口,消费端日志:

在这里插入图片描述

③ 调用删除(/del)接口:
在这里插入图片描述

四、总结:

kafka动态的topic创建和设置主要是通过客户端adminClient来操作,而监听容器则是通过监听注册的kafkaListenerEndpointRegistry来进行创建和管理。
通过这个工具类我们可以快速且动态的创建topic、监听容器和定义监听消息的处理方法,非常nice。

🙉在小小的电脑上面敲呀敲呀敲,写短短的代码,埋小小的坑🙈

🙉在大大的电脑上面敲呀敲呀敲,写大大的代码,埋大大的坑🙈

🙉在特别大的电脑上面敲呀敲呀敲,写特别大的代码,埋特别大的坑🙈

🙉优秀的你肯定是一个不爱写Bug并且爱点赞关注的靓仔吧!🙈

标签: spring boot kafka java

本文转载自: https://blog.csdn.net/python_small_pan/article/details/138969580
版权归原作者 黄焖大排骨 所有, 如有侵权,请联系我们删除。

“springboot的kafka动态工具类(动态创建topic、监听和监听方法)”的评论:

还没有评论