0


Kafka常用配置及解析

我们在开发的时候经常会用到kafka作为消息订阅模式,里面会涉及到很多参数的配置,通过参数配置取优化业务处理的过程。其中,我们最常用的参数如下:

kafka:
  consumer:
    enable-auto-commit:true
    group-id: groupid
    auto-commit-interval:1000
    auto-offset-reset: latest
    bootstrap-servers:192.168.10.10:4320,192.168.10.11:4321,192.168.10.12:4322
    key-deserializer:org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer
    concurrency:3
    max-poll-records:50
    poll-timeout:1500
    batch-listener:false
  producer:
    servers:192.168.10.10:4320,192.168.10.11:4321,192.168.10.12:4322
    key-deserializer:org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializerswitch-enable:true
    retries:3
    batch:
      size:16384
    linger:0
    buffer:
      memory:33554432
  topic: topic_008,topic_001
  1. 消费者

消费者consumer中enable-auto-commit、auto-commit-interval参数代表开启自动提交,配置自动提交offset 位置之后,我们不必关心消息消费到了什么位置,当程序重启后,消息也不会重复消费。

auto-commit-interval 的默认值是 5000,单位是毫秒。

消费者consumer中group-id,topic到group之间是发布订阅的通信方式,即一条topic会被所有的group消费,属于一对多模式;group到consumer是点对点通信方式,属于一对一模式。
在一个消费者组当中可以有一个或者多个消费者实例,它们共享一个公共的group ID,组ID是一个字符串,用来唯一标志一个消费者组,组内的所有消费者协调在一起来消费订阅主题的所有分区,但是同一个topic下的某个分区只能被消费者组中的一个消费者消费,不同消费者组中的消费者可以消费相同的分区。

消费者consumer中auto-offset-reset有三个值,earliest 、latest、none含义如下:
值描述earliest当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费latest当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据nonetopic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

消费者中bootstrap-servers用来连接Kafka集群的入口参数,这个参数对应的值通常是Kafka集群中broker的地址。

消费者中concurrency参数用来开启消费者线程数。应用在单机部署环境下,这个参数很好理解,你想要开几个相应设置几个就行,concurrency数不能大于partition数量,因为partition会尽量平均分配给消费者,多出的会再重新分配给某些消费者,即消费者消费的partition数量会不等。

消费者max-poll-records指定每次最大消费消息数量。

消费者poll-timeout指定消费的超时时间。

消费者 batch-listener是否开启批量消费,true 表示批量消费 。

kafka消费者配置

/**kafka 集群,broker-list*/@Value("${kafka.consumer.bootstrap-servers}")privateString servers;/**开启自动提交*/@Value("${kafka.consumer.enable-auto-commit}")privateboolean enableAutoCommit;/**自动提交延迟*/@Value("${kafka.consumer.auto-commit-interval}")privateString autoCommitInterval;/**消费者组*/@Value("${kafka.consumer.group-id}")privateString groupId;/**重置消费者的offset*/@Value("${kafka.consumer.auto-offset-reset}")privateString autoOffsetReset;/**最多并发数*/@Value("${kafka.consumer.concurrency}")privateint concurrency;/**是否批量拉取*/@Value("${kafka.consumer.batch-listener}")privateboolean batchListener;/**批量拉取个数*/@Value("${kafka.consumer.max-poll-records}")privateint maxPollRecords;/**拉取超时时间*/@Value("${kafka.consumer.poll-timeout}")privatelong pollTimeout;/**否启用权限认证*/@Value("${kafka.consumer.kafkaSecurityStatus}")privateint kafkaSecurityStatus;@Value("${kafka.securityConfig}")privateString password;@Bean@PrimarypublicKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,byte[]>>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,byte[]> factory =newConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());/*并发数量*/
        factory.setConcurrency(concurrency);/*批量获取开关*/
        factory.setBatchListener(batchListener);/*设置拉取时间超时间隔*/
        factory.getContainerProperties().setPollTimeout(pollTimeout);return factory;}privateConsumerFactory<String,byte[]>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}privateMap<String,Object>consumerConfigs(){Map<String,Object> propsMap =newHashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,ByteArrayDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);/* 批量拉取数量*/
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);/*灵活配置是否启用权限认证开关*/if(kafkaSecurityStatus ==1){
            propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
            propsMap.put(SaslConfigs.SASL_MECHANISM,"PLAIN");}return propsMap;}@BeanpublicKafkaProperties.Listenerlistener(){returnnewKafkaProperties.Listener();}@BeanpublicvoidconfigureSaslConsumer(){//如果用-D或者其它方式设置过,这里不再设置。if(null==System.getProperty("java.security.auth.login.config")){//这个路径必须是一个文件系统可读的路径,不能被打包到JAR中。System.setProperty("java.security.auth.login.config",password);}}

2.生产者

retries:生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下, retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。

batch.size:当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送,批次的大小可以通过batch.size 参数设置.默认是102416(16KB),一个非常大的批次大小可能会浪费内存。
linger:延时发送,比如设置batch size为1024
16,但是有的时刻消息比较少,过了很久也没有凑够32KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,即使数据没达到32KB,也将这个批次发送出去. 比如设置5ms,就是到了5ms,大小没到32KB,也会发出去。

同时设置batch.size和 linger,优先将满足条件的消息发送出去(为互斥),Kafka需要考虑高吞吐量与延时的平衡。
生产者初始化

/**kafka 集群,broker-list*/@Value("${kafka.producer.servers}")privateString servers;/**重试次数*/@Value("${kafka.producer.retries}")privateint retries;/**批次大小*/@Value("${kafka.producer.batch.size}")privateint batchSize;/**等待时间*/@Value("${kafka.producer.linger}")privateint linger;/**RecordAccumulator 缓冲区大小*/@Value("${kafka.producer.buffer.memory}")privateint bufferMemory;/**否启用权限认证*/@Value("${kafka.producer.kafkaSecurityStatus}")privateint kafkaSecurityStatus;/**否启用权限认证*/@Value("${kafka.securityConfig}")privateString password;privateMap<String,Object>producerConfigs(){Map<String,Object> props =newHashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);/*灵活配置开关是否启用权限认证*/if(kafkaSecurityStatus ==1){
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
            props.put(SaslConfigs.SASL_MECHANISM,"PLAIN");}return props;}privateProducerFactory<String,String>producerFactory(){returnnewDefaultKafkaProducerFactory<>(producerConfigs());}/**
     * 创建KafkaTemplate模板,并注入spring中
     * @return
     */@BeanpublicKafkaTemplate<String,String>kafkaTemplate(){returnnewKafkaTemplate<String,String>(producerFactory());}@BeanpublicvoidconfigureSaslProducer(){//如果用-D或者其它方式设置过,这里不再设置。if(null==System.getProperty("java.security.auth.login.config")){//这个路径必须是一个文件系统可读的路径,不能被打包到JAR中。System.setProperty("java.security.auth.login.config",password);}}
标签: kafka java 大数据

本文转载自: https://blog.csdn.net/zhizhi120/article/details/127727810
版权归原作者 蔡晓智 所有, 如有侵权,请联系我们删除。

“Kafka常用配置及解析”的评论:

还没有评论