第一步:application.yml的配置
server:
port: 8080
spring:
application:
name: demo
kafka:
one:
bootstrap-servers: xxx.xxx.xxx.xxx
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
group-id: xxxx
enable-auto-commit: true
two:
bootstrap-servers: xxx.xxx.xxx.xxx
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: xxxx
enable-auto-commit: true
第二步:配置config
@EnableKafka
@Configuration
public class xxxxConfig {
@Value("${spring.kafka.one.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.one.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.one.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Bean
public KafkaTemplate<String, String> xxxxTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> xxxxxxContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(20);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 不能写成 1
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
return props;
}
}
@EnableKafka
@Configuration
public class xxxxConfig {
@Value("${spring.kafka.two.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.two.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.two.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Bean
public KafkaTemplate<String, String> xxxxxxxTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> xxxxxxContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(6);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 不能写成 1
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
// value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
return props;
}
}
注意!注意!注意!!!代码中的一些字段名自己改一下。xxxx之类的换成自己的就行
第三步:
@Resource
private KafkaTemplate<String, String> xxxOneTemplate;
@Resource
private KafkaTemplate<String, String> xxxxTwoTemplate;
直接在你要用到的类中直接引用就行。
跟着以上三步走就可以简单的配置两个Kafka了,还有跟高级的spring切面切点的方法作者还没有研究成功就不献丑了。
版权归原作者 xiao_166711 所有, 如有侵权,请联系我们删除。