0


SpringBoot Kafka生产者 多kafka配置

一、配置文件

  1. xxxxxx:
  2. kafka:
  3. bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092
  4. producer:
  5. # 设置大于0的值,则客户端会将发送失败的记录重新发送
  6. retries: 3
  7. #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。16M
  8. batch-size: 16384
  9. linger: 1
  10. # 设置生产者内存缓冲区的大小。#32M
  11. buffer-memory: 33554432
  12. # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
  13. # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
  14. # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
  15. acks: 1
  16. # 指定消息key和消息体的编解码方式 值的序列化方式
  17. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  18. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  19. consumer:
  20. poll-timeout: 3000
  21. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  22. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  23. auto-commit: false
  24. offset-reset: earliest
  25. records: 10
  26. session-timeout: 150000
  27. poll-interval: 360000
  28. request-timeout: 60000

二、KafkaConfig

  1. package com.xxxxxx.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.kafka.clients.consumer.ConsumerConfig;
  4. import org.apache.kafka.clients.producer.ProducerConfig;
  5. import org.apache.kafka.common.serialization.StringSerializer;
  6. import org.springframework.beans.factory.annotation.Value;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import org.springframework.kafka.annotation.EnableKafka;
  10. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  11. import org.springframework.kafka.config.KafkaListenerContainerFactory;
  12. import org.springframework.kafka.core.*;
  13. import java.util.HashMap;
  14. import java.util.Map;
  15. @Slf4j
  16. @Configuration
  17. @EnableKafka
  18. public class KafkaConfig {
  19. @Value("${xxxxxx.kafka.bootstrap-servers}")
  20. private String servers;
  21. @Value("${xxxxxx.kafka.producer.retries}")
  22. private int retries;
  23. @Value("${xxxxxx.kafka.producer.batch-size}")
  24. private int batchSize;
  25. @Value("${xxxxxx-afka.producer.linger}")
  26. private int linger;
  27. @Value("${xxxxxx.kafka.producer.buffer-memory}")
  28. private int bufferMemory;
  29. @Value("${xxxxxx.kafka.producer.acks}")
  30. private String acks;
  31. @Value("${xxxxxx.kafka.producer.key-serializer}")
  32. private String keyDeserializer;
  33. @Value("${xxxxxx.kafka.producer.value-serializer}")
  34. private String valueDeserializer;
  35. // 创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多
  36. public Map<String, Object> producerConfigs() {
  37. Map<String, Object> props = new HashMap<>();
  38. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  39. //设置重试次数
  40. props.put(ProducerConfig.RETRIES_CONFIG, retries);
  41. //达到batchSize大小的时候会发送消息
  42. props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  43. //延时时间,延时时间到达之后计算批量发送的大小没达到也发送消息
  44. props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
  45. //缓冲区的值
  46. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  47. //序列化手段
  48. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keyDeserializer);
  49. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueDeserializer);
  50. props.put(ProducerConfig.ACKS_CONFIG, acks);
  51. return props;
  52. }
  53. public ProducerFactory<String, String> producerFactory() {
  54. return new DefaultKafkaProducerFactory<>(producerConfigs());
  55. }
  56. @Bean(name = "xxxxxxKafkaTemplate")
  57. public KafkaTemplate<String, String> kafkaTemplate() {
  58. return new KafkaTemplate<String, String>(producerFactory());
  59. }
  60. }

三、生产者

  1. @Resource(name = "xxxxxxKafkaTemplate")
  2. private KafkaTemplate kafkaTemplate;

kafkaTemplate.send(topic, message);


本文转载自: https://blog.csdn.net/cndn20120225/article/details/134271528
版权归原作者 大风起曦云飞扬 所有, 如有侵权,请联系我们删除。

“SpringBoot Kafka生产者 多kafka配置”的评论:

还没有评论