0


从零搭建开发脚手架 Spring Boot集成Kafka实现生产者消费者的多种方式

发布空闲消费者事件之间的时间(未收到数据)。

#idle-event-interval:

#是否在初始化期间记录容器配置(INFO 级别)。

#log-container-config:

检查无响应消费者的时间间隔。 如果未指定持续时间后缀,则将使用秒。

#monitor-interval:

乘数应用于“pollTimeout”以确定消费者是否无响应。

#no-poll-threshold:

#轮询消费者时使用的超时。

#poll-timeout:

#侦听器类型。默认single,可选batch

#type: single

创建Kafka主题


方式一(不推荐): 自动创建主题

在配置文件里指定好kafka的topic之后,调用send方法或者KafkaListener指定topic会自动帮我们创建好topic,只是创建的topic默认是1个副本和1个分区的,这一般不能满足我们的要求,所以我们还需要在kafka的server.properties里增加或修改以下参数:

auto.create.topics.enable=true

num.partitions=3

default.replication.factor=3

之后,kafka自动帮我们创建的主题都会包含3个副本和3个分区。

方式二:可以提前运行命令行工具在 Kafka 中创建主题:

$ bin/kafka-topics.sh --create \

–zookeeper localhost:2181 \

–replication-factor 1 --partitions 1 \

–topic mytopic

方式三:随着Kafka中_AdminClient_的引入,我们现在可以以编程方式创建主题。

我们需要添加KafkaAdmin Spring bean,它将自动为NewTopic类型的所有 bean 添加主题:

@Configuration

public class KafkaTopicConfig {

@Value(value = “${kafka.bootstrapAddress}”)

private String bootstrapAddress;

@Bean// 使用 Spring Boot 时,KafkaAdmin会自动注册一个bean,因此您只需要NewTopic @Bean

public KafkaAdmin kafkaAdmin() {

Map<String, Object> configs = new HashMap<>();

configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);

return new KafkaAdmin(configs);

}

@Bean

public NewTopic topic1() {

// 主题名称、分区数、副本数 或者使用TopicBuilder

return new NewTopic(“baeldung”, 1, (short) 1);

}

}

生产者


@Configuration

public class KafkaProducerConfig {

@Bean

public ProducerFactory<String, String> producerFactory() {

Map<String, Object> configProps = new HashMap<>();

configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress);

configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);

configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);

return new DefaultKafkaProducerFactory<>(configProps);

}

@Bean

public KafkaTemplate<String, String> kafkaTemplate() {

return new KafkaTemplate<>(producerFactory());

}

}

简单发布消息

我们可以使用_KafkaTemplate_类发送消息:

@Autowired

private KafkaTemplate<String, String> kafkaTemplate;

public void

标签: spring boot kafka linq

本文转载自: https://blog.csdn.net/2401_84049088/article/details/137404355
版权归原作者 2401_84049088 所有, 如有侵权,请联系我们删除。

“从零搭建开发脚手架 Spring Boot集成Kafka实现生产者消费者的多种方式”的评论:

还没有评论