我们在开发的时候经常会用到kafka作为消息订阅模式,里面会涉及到很多参数的配置,通过参数配置取优化业务处理的过程。其中,我们最常用的参数如下:
kafka:
consumer:
enable-auto-commit:true
group-id: groupid
auto-commit-interval:1000
auto-offset-reset: latest
bootstrap-servers:192.168.10.10:4320,192.168.10.11:4321,192.168.10.12:4322
key-deserializer:org.apache.kafka.common.serialization.StringDeserializer
value-deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer
concurrency:3
max-poll-records:50
poll-timeout:1500
batch-listener:false
producer:
servers:192.168.10.10:4320,192.168.10.11:4321,192.168.10.12:4322
key-deserializer:org.apache.kafka.common.serialization.StringDeserializer
value-deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializerswitch-enable:true
retries:3
batch:
size:16384
linger:0
buffer:
memory:33554432
topic: topic_008,topic_001
- 消费者
消费者consumer中enable-auto-commit、auto-commit-interval参数代表开启自动提交,配置自动提交offset 位置之后,我们不必关心消息消费到了什么位置,当程序重启后,消息也不会重复消费。
auto-commit-interval 的默认值是 5000,单位是毫秒。
消费者consumer中group-id,topic到group之间是发布订阅的通信方式,即一条topic会被所有的group消费,属于一对多模式;group到consumer是点对点通信方式,属于一对一模式。
在一个消费者组当中可以有一个或者多个消费者实例,它们共享一个公共的group ID,组ID是一个字符串,用来唯一标志一个消费者组,组内的所有消费者协调在一起来消费订阅主题的所有分区,但是同一个topic下的某个分区只能被消费者组中的一个消费者消费,不同消费者组中的消费者可以消费相同的分区。
消费者consumer中auto-offset-reset有三个值,earliest 、latest、none含义如下:
值描述earliest当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费latest当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据nonetopic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
消费者中bootstrap-servers用来连接Kafka集群的入口参数,这个参数对应的值通常是Kafka集群中broker的地址。
消费者中concurrency参数用来开启消费者线程数。应用在单机部署环境下,这个参数很好理解,你想要开几个相应设置几个就行,concurrency数不能大于partition数量,因为partition会尽量平均分配给消费者,多出的会再重新分配给某些消费者,即消费者消费的partition数量会不等。
消费者max-poll-records指定每次最大消费消息数量。
消费者poll-timeout指定消费的超时时间。
消费者 batch-listener是否开启批量消费,true 表示批量消费 。
kafka消费者配置
/**kafka 集群,broker-list*/@Value("${kafka.consumer.bootstrap-servers}")privateString servers;/**开启自动提交*/@Value("${kafka.consumer.enable-auto-commit}")privateboolean enableAutoCommit;/**自动提交延迟*/@Value("${kafka.consumer.auto-commit-interval}")privateString autoCommitInterval;/**消费者组*/@Value("${kafka.consumer.group-id}")privateString groupId;/**重置消费者的offset*/@Value("${kafka.consumer.auto-offset-reset}")privateString autoOffsetReset;/**最多并发数*/@Value("${kafka.consumer.concurrency}")privateint concurrency;/**是否批量拉取*/@Value("${kafka.consumer.batch-listener}")privateboolean batchListener;/**批量拉取个数*/@Value("${kafka.consumer.max-poll-records}")privateint maxPollRecords;/**拉取超时时间*/@Value("${kafka.consumer.poll-timeout}")privatelong pollTimeout;/**否启用权限认证*/@Value("${kafka.consumer.kafkaSecurityStatus}")privateint kafkaSecurityStatus;@Value("${kafka.securityConfig}")privateString password;@Bean@PrimarypublicKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,byte[]>>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,byte[]> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());/*并发数量*/
factory.setConcurrency(concurrency);/*批量获取开关*/
factory.setBatchListener(batchListener);/*设置拉取时间超时间隔*/
factory.getContainerProperties().setPollTimeout(pollTimeout);return factory;}privateConsumerFactory<String,byte[]>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());}privateMap<String,Object>consumerConfigs(){Map<String,Object> propsMap =newHashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,ByteArrayDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);/* 批量拉取数量*/
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);/*灵活配置是否启用权限认证开关*/if(kafkaSecurityStatus ==1){
propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
propsMap.put(SaslConfigs.SASL_MECHANISM,"PLAIN");}return propsMap;}@BeanpublicKafkaProperties.Listenerlistener(){returnnewKafkaProperties.Listener();}@BeanpublicvoidconfigureSaslConsumer(){//如果用-D或者其它方式设置过,这里不再设置。if(null==System.getProperty("java.security.auth.login.config")){//这个路径必须是一个文件系统可读的路径,不能被打包到JAR中。System.setProperty("java.security.auth.login.config",password);}}
2.生产者
retries:生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下, retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。
batch.size:当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送,批次的大小可以通过batch.size 参数设置.默认是102416(16KB),一个非常大的批次大小可能会浪费内存。
linger:延时发送,比如设置batch size为102416,但是有的时刻消息比较少,过了很久也没有凑够32KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,即使数据没达到32KB,也将这个批次发送出去. 比如设置5ms,就是到了5ms,大小没到32KB,也会发出去。
同时设置batch.size和 linger,优先将满足条件的消息发送出去(为互斥),Kafka需要考虑高吞吐量与延时的平衡。
生产者初始化
/**kafka 集群,broker-list*/@Value("${kafka.producer.servers}")privateString servers;/**重试次数*/@Value("${kafka.producer.retries}")privateint retries;/**批次大小*/@Value("${kafka.producer.batch.size}")privateint batchSize;/**等待时间*/@Value("${kafka.producer.linger}")privateint linger;/**RecordAccumulator 缓冲区大小*/@Value("${kafka.producer.buffer.memory}")privateint bufferMemory;/**否启用权限认证*/@Value("${kafka.producer.kafkaSecurityStatus}")privateint kafkaSecurityStatus;/**否启用权限认证*/@Value("${kafka.securityConfig}")privateString password;privateMap<String,Object>producerConfigs(){Map<String,Object> props =newHashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);/*灵活配置开关是否启用权限认证*/if(kafkaSecurityStatus ==1){
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM,"PLAIN");}return props;}privateProducerFactory<String,String>producerFactory(){returnnewDefaultKafkaProducerFactory<>(producerConfigs());}/**
* 创建KafkaTemplate模板,并注入spring中
* @return
*/@BeanpublicKafkaTemplate<String,String>kafkaTemplate(){returnnewKafkaTemplate<String,String>(producerFactory());}@BeanpublicvoidconfigureSaslProducer(){//如果用-D或者其它方式设置过,这里不再设置。if(null==System.getProperty("java.security.auth.login.config")){//这个路径必须是一个文件系统可读的路径,不能被打包到JAR中。System.setProperty("java.security.auth.login.config",password);}}
版权归原作者 蔡晓智 所有, 如有侵权,请联系我们删除。