1. Configuration file
xxxxxx:
  kafka:
    bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092
    consumer:
      poll-timeout: 3000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-commit: false
      offset-reset: earliest
      records: 10
      session-timeout: 150000
      poll-interval: 360000
      request-timeout: 60000
2. KafkaConfig
package com.xxxxxx.xxxxxx.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${xxxxxx.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${xxxxxx.kafka.consumer.poll-timeout}")
private Integer pollTimeout;
@Value("${xxxxxx.kafka.consumer.key-deserializer}")
private String keyDeserializer;
@Value("${xxxxxx.kafka.consumer.value-deserializer}")
private String valueDeserializer;
@Value("${xxxxxx.kafka.consumer.auto-commit}")
private String autoCommit;
@Value("${xxxxxx.kafka.consumer.offset-reset}")
private String offsetReset;
@Value("${xxxxxx.kafka.consumer.records}")
private Integer records;
@Value("${xxxxxx.kafka.consumer.session-timeout}")
private Integer sessionTimeout;
@Value("${xxxxxx.kafka.consumer.poll-interval}")
private Integer pollInterval;
@Value("${xxxxxx.kafka.consumer.request-timeout}")
private Integer requestTimeout;
@Bean(name = "ixxxxxxKafkaListenerContainerFactory")
public KafkaListenerContainerFactory integratedEnergyKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//并发数量
factory.setConcurrency(3);
//设置在消费者中等待记录的最大阻塞时间。
factory.getContainerProperties().setPollTimeout(pollTimeout);
//ack模式
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Kafka cluster addresses
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Consumer group: consumers with the same group.id belong to the same group
        // (here the group id is supplied per listener via @KafkaListener(groupId = ...))
        //props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Whether to auto-commit offsets; the default is true, disabled here for manual acks
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        // Key deserializer
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        // Value deserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        // Maximum number of records returned by a single poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);
        // earliest: with no committed offset, start from the beginning, then resume from the committed offset;
        // latest: only consume messages produced after the consumer starts
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        // Session timeout for the consumer group membership
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        // Maximum interval between polls; if exceeded, the group rebalances and the partitions are reassigned to another member
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);
        // Maximum time the client waits for a response to a request; if it times out, the request is retried if necessary
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
        return props;
    }
}
3. Consumer
@KafkaListener(
        // must match the bean name declared in KafkaConfig above
        containerFactory = "ixxxxxxKafkaListenerContainerFactory",
        id = "itsId",
        idIsGroup = false,
        groupId = "itsGroupId",
        topics = "itsTopic"
)
public void consumerUser(
        @Payload String data,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        Acknowledgment ack,
        Consumer<?, ?> consumer
) {
    try {
        // business logic for the received message goes here
    } catch (Exception e) {
        // log and handle processing failures here
    }
    // manually commit the offset (AckMode.MANUAL_IMMEDIATE)
    ack.acknowledge();
}
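As a follow-up, here is a minimal sketch (not from the original post) of a complete listener class that acknowledges only after the message has been processed successfully; the class name UserConsumer, listener id itsAckOnSuccessId, package name, and the handleUser(...) helper are hypothetical placeholders, while the containerFactory refers to the bean defined in section 2.

package com.xxxxxx.xxxxxx.consumer;   // hypothetical package

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class UserConsumer {

    @KafkaListener(
            containerFactory = "ixxxxxxKafkaListenerContainerFactory",
            id = "itsAckOnSuccessId",   // hypothetical listener id
            idIsGroup = false,
            groupId = "itsGroupId",
            topics = "itsTopic"
    )
    public void consumeWithAckOnSuccess(@Payload String data, Acknowledgment ack) {
        try {
            handleUser(data);           // hypothetical business logic
            // commit the offset right away on success (AckMode.MANUAL_IMMEDIATE)
            ack.acknowledge();
        } catch (Exception e) {
            // offset is NOT committed here, so the record is re-consumed
            // after a restart or rebalance instead of being silently skipped
            log.error("failed to process message: {}", data, e);
        }
    }

    private void handleUser(String data) {
        // hypothetical placeholder for real processing
    }
}

Because enable.auto.commit is false and the ack mode is MANUAL_IMMEDIATE, an unacknowledged record's offset is never committed, which is what allows this ack-on-success pattern to avoid losing failed messages.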
Reposted from: https://blog.csdn.net/cndn20120225/article/details/134271762
Copyright belongs to the original author 大风起曦云飞扬. If there is any infringement, please contact us for removal.