发布空闲消费者事件之间的时间(未收到数据)。
#idle-event-interval:
#是否在初始化期间记录容器配置(INFO 级别)。
#log-container-config:
检查无响应消费者的时间间隔。 如果未指定持续时间后缀,则将使用秒。
#monitor-interval:
乘数应用于“pollTimeout”以确定消费者是否无响应。
#no-poll-threshold:
#轮询消费者时使用的超时。
#poll-timeout:
#侦听器类型。默认single,可选batch
#type: single
创建Kafka主题
方式一(不推荐): 自动创建主题
在配置文件里指定好kafka的topic之后,调用send方法或者KafkaListener指定topic会自动帮我们创建好topic,只是创建的topic默认是1个副本和1个分区的,这一般不能满足我们的要求,所以我们还需要在kafka的server.properties里增加或修改以下参数:
auto.create.topics.enable=true
num.partitions=3
default.replication.factor=3
之后,kafka自动帮我们创建的主题都会包含3个副本和3个分区。
方式二:可以提前运行命令行工具在 Kafka 中创建主题:
$ bin/kafka-topics.sh --create \
–zookeeper localhost:2181 \
–replication-factor 1 --partitions 1 \
–topic mytopic
方式三:随着Kafka中_AdminClient_的引入,我们现在可以以编程方式创建主题。
我们需要添加KafkaAdmin Spring bean,它将自动为NewTopic类型的所有 bean 添加主题:
@Configuration
public class KafkaTopicConfig {
@Value(value = “${kafka.bootstrapAddress}”)
private String bootstrapAddress;
@Bean// 使用 Spring Boot 时,KafkaAdmin会自动注册一个bean,因此您只需要NewTopic @Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
// 主题名称、分区数、副本数 或者使用TopicBuilder
return new NewTopic(“baeldung”, 1, (short) 1);
}
}
生产者
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
简单发布消息
我们可以使用_KafkaTemplate_类发送消息:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void
版权归原作者 2401_84049088 所有, 如有侵权,请联系我们删除。