0


JAVA中Kafka配置

1.XXXApplication启动类中添加默认注解

2.application.properties配置文件

  1. ### kafka configure
  2. spring.kafka.bootstrap-servers = ${kafka.ip}:9092
  3. #spring.kafka.consumer.group-id = milestone-subscription
  4. spring.kafka.consumer.group-id = label-common-service
  5. spring.kafka.consumer.enable-auto-commit = false
  6. spring.kafka.consumer.auto-offset-reset = earliest
  7. spring.kafka.consumer.max-poll-records = 50
  8. spring.kafka.producer.retries = 3
  9. spring.kafka.producer.batch-size = 16384
  10. spring.kafka.producer.buffer-memory = 33554432
  11. spring.kafka.consumer.topic = label_generator_apg
  12. spring.kafka.update.result.topic = label.provider.response

3.kafka配置文件

  1. @EnableKafka
  2. @Configuration
  3. public class KafkaConfig {
  4. @Value("${spring.kafka.bootstrap-servers}")
  5. private String bootstrapServers;
  6. @Value("${spring.kafka.consumer.group-id}")
  7. private String groupId;
  8. @Value("${spring.kafka.consumer.enable-auto-commit}")
  9. private Boolean autoCommit;
  10. @Value("${spring.kafka.consumer.auto-offset-reset}")
  11. private String autoOffsetReset;
  12. @Value("${spring.kafka.consumer.max-poll-records}")
  13. private Integer maxPollRecords;
  14. @Value("${spring.kafka.producer.retries}")
  15. private Integer retries;
  16. @Value("${spring.kafka.producer.batch-size}")
  17. private Integer batchSize;
  18. @Value("${spring.kafka.producer.buffer-memory}")
  19. private Integer bufferMemory;
  20. @Bean
  21. public Map<String, Object> producerConfigs() {
  22. Map<String, Object> props = Maps.newHashMap();
  23. props.put(ProducerConfig.ACKS_CONFIG, "0");
  24. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  25. props.put(ProducerConfig.RETRIES_CONFIG, retries);
  26. props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  27. props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  28. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  29. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  30. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  31. return props;
  32. }
  33. @Bean
  34. public ProducerFactory<String, String> producerFactory() {
  35. return new DefaultKafkaProducerFactory<>(producerConfigs());
  36. }
  37. @Bean
  38. public KafkaTemplate<String, String> kafkaTemplate() {
  39. return new KafkaTemplate<>(producerFactory());
  40. }
  41. @Bean
  42. public Map<String, Object> consumerConfigs() {
  43. Map<String, Object> props = Maps.newHashMap();
  44. props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  45. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
  46. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
  47. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  48. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
  49. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
  50. props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
  51. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  52. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  53. return props;
  54. }
  55. @Bean
  56. public KafkaListenerContainerFactory<?> batchFactory() {
  57. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  58. factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
  59. //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
  60. factory.setBatchListener(true);
  61. // set the retry template
  62. // factory.setRetryTemplate(retryTemplate());
  63. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
  64. return factory;
  65. }
  66. }

4.消费topic,手动提交offset

  1. @Component
  2. @Slf4j
  3. public class OrderDataConsumer {
  4. @KafkaListener(topics = "${spring.kafka.consumer.topic}",containerFactory = "batchFactory")
  5. public void listen(List<ConsumerRecord<String,String>> records, Acknowledgment ack){
  6. try {
  7. log.info("====> Start of APGOrderDataConsumer");
  8. for (ConsumerRecord<String, String> record : records) {
  9. log.info("==> Records.offset() value is {}",record.offset());
  10. if (JSON.parseArray(record.value()).size() > 0) {
  11. record.value() // 获取的数据是jsonArry 后续根据业务逻辑进行处理
  12. }else {
  13. log.info("==>the list size of ConsumerRecord record is 0, no need to handle.");
  14. }
  15. }
  16. //手动提交offset
  17. ack.acknowledge();
  18. log.info("==> End of APGOrderDataConsumer");
  19. } catch (Exception e) {
  20. log.error("==> Handel APGOrderDataConsumer fail, the exception is {} ", e);
  21. }
  22. }
  23. }
标签: kafka java 分布式

本文转载自: https://blog.csdn.net/weixin_56331248/article/details/129955153
版权归原作者 一叶灬之秋 所有, 如有侵权,请联系我们删除。

“JAVA中Kafka配置”的评论:

还没有评论