一、配置文件
xxxxxx:
kafka:
bootstrap-servers: xx.xx.xx.xx:9092,xx.xx.xx.xx:9092
producer:
# 设置大于0的值,则客户端会将发送失败的记录重新发送
retries: 3
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。16M
batch-size: 16384
linger: 1
# 设置生产者内存缓冲区的大小。#32M
buffer-memory: 33554432
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
# 指定消息key和消息体的编解码方式 值的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
poll-timeout: 3000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-commit: false
offset-reset: earliest
records: 10
session-timeout: 150000
poll-interval: 360000
request-timeout: 60000
二、KafkaConfig
package com.xxxxxx.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${xxxxxx.kafka.bootstrap-servers}")
private String servers;
@Value("${xxxxxx.kafka.producer.retries}")
private int retries;
@Value("${xxxxxx.kafka.producer.batch-size}")
private int batchSize;
@Value("${xxxxxx-afka.producer.linger}")
private int linger;
@Value("${xxxxxx.kafka.producer.buffer-memory}")
private int bufferMemory;
@Value("${xxxxxx.kafka.producer.acks}")
private String acks;
@Value("${xxxxxx.kafka.producer.key-serializer}")
private String keyDeserializer;
@Value("${xxxxxx.kafka.producer.value-serializer}")
private String valueDeserializer;
// 创建生产者配置map,ProducerConfig中的可配置属性比spring boot自动配置要多
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
//设置重试次数
props.put(ProducerConfig.RETRIES_CONFIG, retries);
//达到batchSize大小的时候会发送消息
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
//延时时间,延时时间到达之后计算批量发送的大小没达到也发送消息
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
//缓冲区的值
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
//序列化手段
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keyDeserializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueDeserializer);
props.put(ProducerConfig.ACKS_CONFIG, acks);
return props;
}
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean(name = "xxxxxxKafkaTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
}
三、生产者
@Resource(name = "xxxxxxKafkaTemplate")
private KafkaTemplate kafkaTemplate;
kafkaTemplate.send(topic, message);
本文转载自: https://blog.csdn.net/cndn20120225/article/details/134271528
版权归原作者 大风起曦云飞扬 所有, 如有侵权,请联系我们删除。
版权归原作者 大风起曦云飞扬 所有, 如有侵权,请联系我们删除。