

[Spring Boot Kafka Configuration] Configuring multiple Kafka clusters in Spring Boot, with usernames and passwords

Note

This example only configures a Consumer, not a Producer. For a Producer, refer to the commented-out sections in Configuration class 1.

1. Add the dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

No version is needed here when the project inherits from the Spring Boot parent POM, which manages the spring-kafka version.

2. YAML configuration

spring:
  kafka:
    one:
      # test environment
      bootstrap-servers: 127.0.0.1:9092
      topic: default_topic
      properties:
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: SCRAM-SHA-512
          jaas:
            config: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";
      consumer:
        # Default consumer group id. In Kafka, consumers within the same group never read the same message; group.id sets the group name.
        group-id: defaultName
        # disable auto-commit
        enable-auto-commit: false
        # Offset reset policy: earliest re-reads the partition from the beginning, latest reads from the end of the log.
        # (The old consumer called these smallest/largest.)
        auto-offset-reset: latest
        # key/value deserializers
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 5
    two:
      # test environment
      bootstrap-servers: 127.0.0.1:9092
      topic: default_topic_two
      consumer:
        # Default consumer group id. In Kafka, consumers within the same group never read the same message; group.id sets the group name.
        group-id: defaultName_two
        # disable auto-commit
        enable-auto-commit: false
        # Offset reset policy: earliest re-reads the partition from the beginning, latest reads from the end of the log.
        auto-offset-reset: latest
        # key/value deserializers
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 5
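
Note that one and two are custom keys: Spring Boot's auto-configuration only binds the spring.kafka.* properties it knows about, so these values are read manually via @Value in the configuration classes below. Also, only cluster one carries SASL settings in this example; if the second cluster also requires authentication, duplicate the properties block (security.protocol, sasl.mechanism, sasl.jaas.config) under two and wire the extra @Value fields into K2kafkaConfiguration the same way as in K1kafkaConfiguration.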

3. Create the configuration classes

3.1 Configuration class 1

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class K1kafkaConfiguration {

    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.one.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.one.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.one.consumer.max-poll-records}")
    private String maxPollRecords;
    @Value("${spring.kafka.one.properties.security.protocol}")
    private String securityProtocol;
    @Value("${spring.kafka.one.properties.sasl.mechanism}")
    private String mechanism;
    @Value("${spring.kafka.one.properties.sasl.jaas.config}")
    private String jaasConfig;

    // Producer settings (uncomment to configure a producer):
    //@Value("${spring.kafka.one.producer.linger-ms}")
    //private Integer lingerMs;
    //@Value("${spring.kafka.one.producer.max-request-size}")
    //private Integer maxRequestSize;
    //@Value("${spring.kafka.one.producer.batch-size}")
    //private Integer batchSize;
    //@Value("${spring.kafka.one.producer.buffer-memory}")
    //private Integer bufferMemory;

    //@Bean
    //public KafkaTemplate<String, String> kafkaOneTemplate() {
    //    return new KafkaTemplate<>(producerFactory());
    //}

    @Bean
    @Primary // this container factory is the default when a listener does not name one
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // number of concurrent consumer threads
        factory.setConcurrency(1);
        // enable batch listening
        //factory.setBatchListener(true);
        // discard filtered messages
        //factory.setAckDiscarded(true);
        factory.getContainerProperties().setPollTimeout(3000);
        // manual acknowledgment: the offset is committed immediately when acknowledge() is called
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setMissingTopicsFatal(false);
        // record filter strategy
        //factory.setRecordFilterStrategy(new RecordFilterStrategy() {
        //    @Override
        //    public boolean filter(ConsumerRecord consumerRecord) {
        //        String msg = consumerRecord.value().toString();
        //        if (Integer.parseInt(msg.substring(msg.length() - 1)) % 2 == 0) {
        //            return false;
        //        }
        //        // returning true discards the message
        //        return true;
        //    }
        //});
        return factory;
    }

    //private ProducerFactory<String, String> producerFactory() {
    //    return new DefaultKafkaProducerFactory<>(producerConfigs());
    //}

    // consumer factory for the first cluster
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    //private Map<String, Object> producerConfigs() {
    //    Map<String, Object> props = new HashMap<>();
    //    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    //    props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
    //    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
    //    props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
    //    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
    //    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    //    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    //    return props;
    //}

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", mechanism);
        props.put("sasl.jaas.config", jaasConfig);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
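
One caveat if you enable the producer: the commented-out producerConfigs() above does not pass the SASL settings, so a producer built from it would fail to authenticate against a SASL-secured broker. Below is a minimal sketch of an alternative, meant to sit inside K1kafkaConfiguration; it reuses the existing @Value fields, omits the optional tuning properties from the commented code, and additionally needs imports for ProducerConfig, StringSerializer, KafkaTemplate, ProducerFactory, and DefaultKafkaProducerFactory.

@Bean
public KafkaTemplate<String, String> kafkaOneTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

private ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // the same SASL settings used by the consumer
    props.put("security.protocol", securityProtocol);
    props.put("sasl.mechanism", mechanism);
    props.put("sasl.jaas.config", jaasConfig);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}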

3.2 Configuration class 2

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableKafka
public class K2kafkaConfiguration {

    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.two.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.two.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.two.consumer.max-poll-records}")
    private String maxPollRecords;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // number of concurrent consumer threads
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(3000);
        // manual acknowledgment, same as the first factory
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setMissingTopicsFatal(false);
        return factory;
    }

    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
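
Unlike the first class, consumerFactory() and consumerConfigs() here are plain methods rather than @Bean methods, presumably to avoid a bean-name clash with the beans of the same names in K1kafkaConfiguration. Only the container factory itself, kafkaTwoContainerFactory, is exposed as a bean so that listeners can reference it by name.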

4. Set up the consumers

4.1 Consumer 1

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j(topic = "KAFKALOG")
public class Consumer {

    @KafkaListener(topics = "#{'${spring.kafka.one.topic}'}", groupId = "defaultName",
            containerFactory = "kafkaListenerContainerFactory")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("[Consumer] received Kafka message: {}", record.value());
        System.out.println(record);
        System.out.println(record.value());
        // Manually commit the offset. Note: with AckMode.MANUAL_IMMEDIATE the offset is only
        // committed when acknowledge() is called; leaving it commented out means records are
        // re-delivered after a rebalance or restart.
        //ack.acknowledge();
    }
}

4.2 Consumer 2

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j(topic = "KAFKALOG")
public class Consumer2 {

    @KafkaListener(topics = "#{'${spring.kafka.two.topic}'}", groupId = "defaultName_two",
            containerFactory = "kafkaTwoContainerFactory")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("[Consumer2] received Kafka message: {}", record.value());
        System.out.println(record);
        System.out.println(record.value());
        // manually commit the offset (see the note in Consumer)
        //ack.acknowledge();
    }
}
Tags: spring boot kafka

This article is reprinted from: https://blog.csdn.net/ai_haibin/article/details/131436216
Copyright belongs to the original author, 听者listener. If there is any infringement, please contact us for removal.
