0


Kafka:消费者参数配置

maven配置

// 消费者
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.129:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

springboot配置类

@Configuration
public class KafkaConfig {
 
    // 配置全局admin
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers","192.168.25.129:9092");
        KafkaAdmin admin = new KafkaAdmin(configs);
        return admin;
    }
}

配置文件

# 用于建立初始连接的broker地址
spring.kafka.bootstrap-servers=192.168.25.129:9092
# producer用到的key和value的序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 默认的批处理记录数 16k
spring.kafka.producer.batch-size=16384
# 32MB的总发送缓存
spring.kafka.producer.buffer-memory=33554432
# consumer用到的key和value的反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# consumer的消费组id
spring.kafka.consumer.group-id=spring-kafka-consumer-group
# 是否自动提交消费者偏移量
spring.kafka.consumer.enable-auto-commit=true
# 每隔100ms向broker提交一次偏移量
spring.kafka.consumer.auto-commit-interval=100
# 如果该消费者的偏移量不存在,则自动设置为最早的偏移量
spring.kafka.consumer.auto-offset-reset=earliest

参数配置列表
属性说明bootstrap.servers向Kafka集群建立初始连接用到的host/port列表。 客户端会使用这里列出的所有服务器进行集群其他服务器的发现,而不管 是否指定了哪个服务器用作引导。 这个列表仅影响用来发现集群所有服务器的初始主机。 字符串形式:host1:port1,host2:port2,... 由于这组服务器仅用于建立初始链接,然后发现集群中的所有服务器,因 此没有必要将集群中的所有地址写在这里。 一般最好两台,以防其中一台宕掉。key.deserializerkey的反序列化类,该类需要实现 org.apache.kafka.common.serialization.Deserializer 接口。value.deserializer实现了 org.apache.kafka.common .serialization.Deserializer 接口的反序列化器, 用于对消息的value进行反序列化。client.id当从服务器消费消息的时候向服务器发送的id字符串。在ip/port基础上 提供应用的逻辑名称,记录在服务端的请求日志中,用于追踪请求的源。group.id用于唯一标志当前消费者所属的消费组的字符串。 如果消费者使用组管理功能如subscribe(topic)或使用基于Kafka的偏移量 管理策略,该项必须设置。auto.offset.reset当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被 删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量 latest:自动重置偏移量为最新的偏移量 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异 常 anything:向消费者抛异常auto.commit.interval.ms如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率。enable.auto.commit如果设置为true,消费者会自动周期性地向服务器提交偏移量。fetch.min.bytes服务器对每个拉取消息的请求返回的数据量最小值。 如果数据量达不到这个值,请求等待,以让更多的数据累积, 达到这个值之后响应请求。 默认设置是1个字节,表示只要有一个字节的数据, 就立即响应请求,或者在没有数据的时候请求超时。 将该值设置为大一点儿的数字,会让服务器等待稍微 长一点儿的时间以累积数据。 如此则可以提高服务器的吞吐量,代价是额外的延迟时间。fetch.max.wait.ms如果服务器端的数据量达不到 fetch.min.bytes 的话, 服务器端不能立即响应请求。 该时间用于配置服务器端阻塞请求的最大时长。fetch.max.bytes服务器给单个拉取请求返回的最大数据量。 消费者批量拉取消息,如果第一个非空消息批次的值比该值大, 消息批也会返回,以让消费者可以接着进行。 即该配置并不是绝对的最大值。 broker可以接收的消息批最大值通过 message.max.bytes (broker配置) 或 max.message.bytes (主题配置)来指定。 需要注意的是,消费者一般会并发拉取请求。connections.max.idle.ms在这个时间之后关闭空闲的连接。check.crcs自动计算被消费的消息的CRC32校验值。 可以确保在传输过程中或磁盘存储过程中消息没有被破坏。 它会增加额外的负载,在追求极致性能的场合禁用。exclude.internal.topics是否内部主题应该暴露给消费者。如果该条目设置为true, 则只能先订阅再拉取。isolation.level控制如何读取事务消息。 如果设置了 read_committed ,消费者的poll()方法只会 返回已经提交的事务消息。 如果设置了 read_uncommitted (默认值), 消费者的poll方法返回所有的消息,即使是已经取消的事务消息。 非事务消息以上两种情况都返回。 消息总是以偏移量的顺序返回。 read_committed 只能返回到达LSO的消息。 在LSO之后出现的消息只能等待相关的事务提交之后才能看到。 结果, read_committed 模式,如果有为提交的事务, 消费者不能读取到直到HW的消息。 read_committed 的seekToEnd方法返回LSO。heartbeat.interval.ms当使用消费组的时候,该条目指定消费者向消费者协调器 发送心跳的时间间隔。 心跳是为了确保消费者会话的活跃状态, 同时在消费者加入或离开消费组的时候方便进行再平衡。 该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。 可以将其调整得更小,以控制正常重新平衡的预期时间。session.timeout.ms当使用Kafka的消费组的时候,消费者周期性地向broker发送心跳数 表明自己的存在。 如果经过该超时时间还没有收到消费者的心跳, 则broker将消费者从消费组移除,并启动再平衡。 该值必须在broker配置 group.min.session.timeout.ms 和 group.max.session.timeout.ms 之间。max.poll.records一次调用poll()方法返回的记录最大数量。max.poll.interval.ms使用消费组的时候调用poll()方法的时间间隔。 该条目指定了消费者调用poll()方法的最大时间间隔。 如果在此时间内消费者没有调用poll()方法, 则broker认为消费者失败,触发再平衡, 将分区分配给消费组中其他消费者。max.partition.fetch.bytes对每个分区,服务器返回的最大数量。消费者按批次拉取数据。 如果非空分区的第一个记录大于这个值,批处理依然可以返回, 以保证消费者可以进行下去。 broker接收批的大小由 message.max.bytes (broker参数)或 max.message.bytes (主题参数)指定。 fetch.max.bytes 用于限制消费者单次请求的数据量。send.buffer.bytes用于TCP发送数据时使用的缓冲大小(SO_SNDBUF), -1表示使用OS默认的缓冲区大小。retry.backoff.ms在发生失败的时候如果需要重试,则该配置表示客户端 等待多长时间再发起重试。 该时间的存在避免了密集循环。request.timeout.ms客户端等待服务端响应的最大时间。如果该时间超时, 则客户端要么重新发起请求,要么如果重试耗尽,请求失败。reconnect.backoff.ms重新连接主机的等待时间。避免了重连的密集循环。 该等待时间应用于该客户端到broker的所有连接。reconnect.backoff.max.ms重新连接到反复连接失败的broker时要等待的最长时间 (以毫秒为单位)。 如果提供此选项,则对于每个连续的连接失败, 每台主机的退避将成倍增加,直至达到此最大值。 在计算退避增量之后,添加20%的随机抖动以避免连接风暴。receive.buffer.bytesTCP连接接收数据的缓存(SO_RCVBUF)。 -1表示使用操作系统的默认值。partition.assignment.strategy当使用消费组的时候,分区分配策略的类名。metrics.sample.window.ms计算指标样本的时间窗口。metrics.recording.level指标的最高记录级别。metrics.num.samples用于计算指标而维护的样本数量interceptor.classes拦截器类的列表。默认没有拦截器 拦截器是消费者的拦截器,该拦截器需要实现 org.apache.kafka.clients.consumer .ConsumerInterceptor 接口。 拦截器可用于对消费者接收到的消息进行拦截处理。

标签: kafka java spring boot

本文转载自: https://blog.csdn.net/weixin_45427648/article/details/129849952
版权归原作者 程序员无羡 所有, 如有侵权,请联系我们删除。

“Kafka:消费者参数配置”的评论:

还没有评论