概述
本文主要是分享Kafka初始化生产者的大体过程
初始化过程中会新建很多对象,本文暂先分享部分对象
1.分区器---Partitioner partitioner
2.重试时间---long retryBackoffMs
3.序列化器---Serializer<K> keySerializer,Serializer<V> valueSerializer
4.拦截器---List<ProducerInterceptor<K, V>> interceptorList
5.累加器---RecordAccumulator accumulator
6.元数据---ProducerMetadata metadata
7.创建sender线程---Sender sender
生产者初始化代码示例:
后续分享这些配置会被用到什么地方,生成上述哪些对象
// 这是构建kafka生产者的[示例代码]
// 设置属性
Properties properties = new Properties();
// 指定连接的kafka服务器的地址,配置多台的服务 用,分割, 其中一个宕机,生产者 依然可以连上(集群)
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "[kafka server ip]:[kafka server port]");
// 1.分区器---Partitioner partitioner
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
// 2.重试时间---long retryBackoffMs
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 10L);
// 3.key和value的序列化器---Serializer<K> keySerializer,Serializer<V> valueSerializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 4.拦截器---List<ProducerInterceptor<K, V>> interceptorList
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyInterceptor.class);
// 构建kafka生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
一、分区器
对应初始化时设置的参数:
ProducerConfig.PARTITIONER_CLASS_CONFIG
示意:
分区器是在发送消息时用来计算消息将要发送到哪个分区的,支持自定义分区器
// 这是kafka client 初始化生产者的[源码]
// 如果没有设置自定义分区器,则partitioner为null,会影响到后续初始化逻辑以及发送消息时的逻辑
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
自定义分区器示例代码
如需使用自定义分区器,需要考虑好分区负载问题,切勿为了解决需求盲目使用自定义分区;
分区不合理可能影响broker性能,也是对低负载分区资源的浪费,严重情况下某一分区的消费者负载过大,或某一分区broker负载过大,可能导致雪崩;
// 这是自定义分区器的[示例代码]
public class MyPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int num = partitionInfos.size();
// 与[org.apache.kafka.clients.producer.internals.DefaultPartitioner]
// 计算分区时一样的算法
int parId = Utils.toPositive(Utils.murmur2(valueBytes)) % num;
return parId;
}
public void close() {//do nothing}
public void configure(Map<String, ?> configs) {//do nothing}
}
分区器使用
1.用户可以通过实现该接口自定义分区器,在生产者调用send方法发送消息时,会使用用户自定义的分区器计算消息要发送到哪个分区,自定义分区器简单实现见上面代码块
org.apache.kafka.clients.producer.KafkaProducer#partition
2.在生产者调用send方法发送消息时,如果使用用户自定义的分区器,允许在第一次将消息放入本地缓存失败时,进行一次尝试:重现分配分区和本地缓存
// 这是kafka生产者调用send方法发送消息时的部分[源码]
// check if we have an in-progress batch
Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
synchronized (dq) {
// After taking the lock, validate that the partition hasn't changed and retry.
if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
continue;
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
if (appendResult != null) {
// 这是第一次就将消息添加到缓存后返回
// If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
boolean enableSwitch = allBatchesFull(dq);
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
return appendResult;
}
}
// 这是第一次尝试将消息放入本地缓存失败后,判断如果使用户自定义的分区器,则返回一个对象
// 该对象将在append方法的调用处进行重新计算分区并重试一次
// we don't have an in-progress record batch try to allocate a new batch
if (abortOnNewBatch) {
// Return a result that will cause another call to append.
return new RecordAppendResult(null, false, false, true, 0);
}
该对象还有别的使用场景暂不介绍
二、重试时间
对应初始化时设置的参数
ProducerConfig.RETRY_BACKOFF_MS_CONFIG
示意:
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.
尝试重试对给定主题分区的失败请求之前等待的时间。这避免了在某些失败场景下以紧密循环的方式重复发送请求。
重试时间的使用
1.初始化累加器时作为入参,保存到累加器字段[long retryBackoffMs]中,用于sender线程发送消息时检测重试超时
2.初始化元信息对象[org.apache.kafka.clients.Metadata]时,放入字段[long refreshBackoffMs]中,用于更新元信息前判断等待时间
总结该字段主要用于向服务器循环发送请求时停顿等待
三、序列化器
对应初始化时设置的参数
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
示意:
Serializer class for key that implements the org.apache.kafka.common.serialization.Serializerinterface.
实现org.apache.kafka.common.serialization.Serializer接口的key序列化程序类。Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface.
实现org.apache.kafka.commun.serialization.Serializer接口的值的序列化程序类。
自定义序列化器
// 序列化对象
public class UserSerializer implements Serializer<User> {
public void configure(Map<String, ?> configs, boolean isKey) {
//do nothing
}
public byte[] serialize(String topic, User data) {
try {
byte[] name;
int nameSize;
if (data == null) {
return null;
}
if (data.getName() != null) {
name = data.getName().getBytes("UTF-8");
//字符串的长度
nameSize = data.getName().length();
} else {
name = new byte[0];
nameSize = 0;
}
/*id的长度4个字节,字符串的长度描述4个字节,
字符串本身的长度nameSize个字节*/
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + nameSize);
buffer.putInt(data.getId());//4
buffer.putInt(nameSize);//4
buffer.put(name);//nameSize
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error serialize User:" + e);
}
}
public void close() {
//do nothing
}
}
// 反序列化
public class UserDeserializer implements Deserializer<User> {
public void configure(Map<String, ?> configs, boolean isKey) {
//do nothing
}
public User deserialize(String topic, byte[] data) {
try {
if (data == null) {
return null;
}
if (data.length < 8) {
throw new SerializationException("Error data size.");
}
ByteBuffer buffer = ByteBuffer.wrap(data);
int id;
String name;
int nameSize;
id = buffer.getInt();
nameSize = buffer.getInt();
byte[] nameByte = new byte[nameSize];
buffer.get(nameByte);
name = new String(nameByte, "UTF-8");
return new User(id, name);
} catch (Exception e) {
throw new SerializationException("Error Deserializer DemoUser." + e);
}
}
public void close() {
//do nothing
}
}
// 消息中的实体类
public class User {
private int id;
private String name;
public User(int id) {
this.id = id;
}
public User(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
使用自定义序列化器时需要在消费者消费消息时使用反序列化器将消息反序列化,一般常用的就是字符串序列化器
org.apache.kafka.common.serialization.StringSerializer
四、拦截器
对应初始化时设置的参数
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG
示意:
Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors
通过实现org.apache.kafka.clients.producer.ProducerInterceptor接口,您可以在生产者接收到的记录发布到kafka集群之前拦截这些记录。默认情况下,没有拦截器
自定义序拦截器
// 这是自定义序列化器[示例代码]
public class MyInterceptor implements ProducerInterceptor<String, String> {
private long successCount = 0L;
private long errorCount = 0L;
//该方法:Producer在将消息序列化和分配分区之前会调用拦截器的这个方法来对消息进行相应的操作
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
//要把发送的value都带上时间戳
return new ProducerRecord<String, String>(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + record.value(), record.headers());
}
//发送消息情况统计
//该方法:会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
successCount++;
} else {
errorCount++;
}
}
//该方法:可以关闭拦截器,主要用于执行一些资源清理工作
@Override
public void close() {
//producer发送数据结束并close后,会自动调用拦截器的close方法来输出统计的成功和失败次数
System.out.println("成功次数=" + successCount);
System.out.println("失败次数=" + errorCount);
}
@Override
public void configure(Map<String, ?> configs) {
}
}
五、累加器
这是Kafka中非常重要的内部组件,主要用于缓存消息以便批量发送,从而减少网络传输的资源消耗并提升性能。
简单概述:
对于生产者的作用是使用累加器,可以让生产者不必每次发送消息就即刻推送到broker,可以将一个topic的同一分区消息写入同一份缓存,等待sender线程批量获取这批消息一次性发送到broker。减少生产者发起网络调用的次数。
对broker而言的作用是,broker也可以一次性将接收到的这一批多个消息以顺序IO的方式追加到文件中,提高了储存效率。
示意:
This class acts as a queue that accumulates records into MemoryRecords instances to be sent to the server.
The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
this behavior is explicitly disabled.此类充当一个队列,将记录累积到要发送到服务器的MemoryRecords实例中。
累加器使用一定量的内存,当内存耗尽时,追加调用将被阻塞,除非此行为被明确禁用。
累加器核心作用
1.消息缓存:RecordAccumulator实际上是在客户端开辟出的一块内存区域,主要用来缓存消息。这种缓存机制允许Sender线程后续批量发送消息,而不是每调用一次send方法就直接将消息发送给broker。
2.批量发送:当触发发送条件如MemoryRecords缓存或者Deque队列满了或者一个队列等待时间达到配置时间,sender线程将一次性将这批消息发送给broker。这种方式可以减少网络请求的数量,提高系统的吞吐量。这些触发条件可参考下面参数配置的buffer.memory,batch.size,linger.ms,max.block.ms
3.资源管理:如果生产者发送消息的速度超过发送到服务器的速度,那么累加器中空间不足的话,就会导致生产者无法继续发送消息。在这种情况下,生产者可以通过设置max.block.ms参数来控制是否阻塞或者抛出异常。如果max.block.ms参数的默认值为60000(即60秒),那么超过这个时间限制后,如果累加器仍然没有足够的空间,生产者将无法继续发送消息。
关于累加器的实现,涉及到内部缓存管理,broker服务器元数据统计,sender线程交互等逻辑,后续再进行分享吧
如下这些参数都是有关累加器的重要配置,直接影响kafka生产者发送消息的性能
配置解释默认值buffer.memoryProducer 用来缓冲等待被发送到服务器的记录的总字节数。如果记录发送的速度比发送到服务器的速度快, Producer 就会阻塞,如果阻塞的时间超过
max.block.ms
配置的时长,则会抛出一个异常。
这个配置与 Producer 的可用总内存有一定的对应关系,但并不是完全等价的关系,因为 Producer 的可用内存并不是全部都用来缓存。一些额外的内存可能会用于压缩(如果启用了压缩),以及维护正在运行的请求。
33554432batch.size当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。
当记录的大小超过了配置的字节数, Producer 将不再尝试往批次增加记录。
发送到 broker 的请求会包含多个批次的数据,每个批次对应一个 partition 的可用数据
小的 batch.size 将减少批处理,并且可能会降低吞吐量(如果 batch.size = 0的话将完全禁用批处理)。 很大的 batch.size 可能造成内存浪费,因为我们一般会在 batch.size 的基础上分配一部分缓存以应付额外的记录。
16384linger.msproducer 会将两个请求发送时间间隔内到达的记录合并到一个单独的批处理请求中。通常只有当记录到达的速度超过了发送的速度时才会出现这种情况。然而,在某些场景下,即使处于可接受的负载下,客户端也希望能减少请求的数量。这个设置是通过添加少量的人为延迟来实现的;即,与其立即发送记录, producer 将等待给定的延迟时间,以便将在等待过程中到达的其他记录能合并到本批次的处理中。这可以认为是与 TCP 中的 Nagle 算法类似。这个设置为批处理的延迟提供了上限:一旦我们接受到记录超过了分区的 batch.size ,Producer 会忽略这个参数,立刻发送数据。但是如果累积的字节数少于 batch.size ,那么我们将在指定的时间内“逗留”(linger),以等待更多的记录出现。这个设置默认为0(即没有延迟)。例如:如果设置
linger.ms=5
,则发送的请求会减少并降低部分负载,但同时会增加5毫秒的延迟。0max.block.ms该配置控制
KafkaProducer.send()
和
KafkaProducer.partitionsFor()
允许被阻塞的时长。这些方法可能因为缓冲区满了或者元数据不可用而被阻塞。用户提供的序列化程序或分区程序的阻塞将不会被计算到这个超时。
六、初始化元数据
对应初始化时设置的参数
ProducerConfig.METADATA_MAX_AGE_CONFIG
示意:
The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
以毫秒为单位的时间段,在此之后,即使我们没有看到任何主分区的变化,我们也会强制刷新元数据,以主动发现任何新的代理或分区
---元数据刷新时间
ProducerConfig.METADATA_MAX_IDLE_CONFIG
示意:
Controls how long the producer will cache metadata for a topic that's idle. If the elapsed time since a topic was last produced to exceeds the metadata idle duration, then the topic's metadata is forgotten and the next access to it will force a metadata fetch request.
控制生产者为空闲主题缓存元数据的时间。如果自上次生成主题以来经过的时间超过了元数据空闲持续时间,则该主题的元数据将被遗忘,下一次对其的访问将强制执行元数据获取请求。
---生产者客户端为[某一主题]缓存元数据的时间,超过该时间后获取该主题元数据将强制从broker获取
初始化元数据信息分为两部分,第一部分初始化ProducerMetadata对象,设置元数据,topic信息缓存空闲时间如下源代码:
this.metadata = new ProducerMetadata(retryBackoffMs,
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,
Time.SYSTEM);
第二部分为加载broker节点信息Node,源代码如下:
Node:org.apache.kafka.common.Node
this.metadata.bootstrap(addresses);
...
public synchronized void bootstrap(List<InetSocketAddress> addresses) {
this.needFullUpdate = true;
this.updateVersion += 1;
this.cache = MetadataCache.bootstrap(addresses);
}
...
static MetadataCache bootstrap(List<InetSocketAddress> addresses) {
Map<Integer, Node> nodes = new HashMap<>();
int nodeId = -1;
for (InetSocketAddress address : addresses) {
nodes.put(nodeId, new Node(nodeId, address.getHostString(), address.getPort()));
nodeId--;
}
return new MetadataCache(null, nodes, Collections.emptyList(),
Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
null, Collections.emptyMap(), Cluster.bootstrap(addresses));
}
上面只是初始化元数据最外层的代码,bootstrap是kafka生产者客户端初始化broker信息缓存的入口,执行这个方法后这个客户端将缓存kakfa broker的节点信息,每个节点的topic信息,分片信息等缓存。在客户端发送消息时,将通过这些缓存信息给broker发起请求,所以这块缓存是生产者客户端非常重要的部分,详情请看:三、Kafka生产者4---核心组件[元数据]-Metadata-CSDN博客
七、创建sender线程
是一个无限循环运行在后台的线程,会一直等待累加器中缓存的消息达到发送条件,把消息发送给Broker
发送的核心流程是:
1.从累加器中批量获取消息并创建 ClientRequest对象
2.将ClientRequest对象交给NetworkClient客户端发送
3.NetworkClient客户端将请求放入KafkaChannel的缓存
4.NetworkClient执行网络 I/O,完成请求的发送
5.NetworkClient收到响应,调用 ClientRequest 的回调函数,触发每个消息上注册的回调函数
总结:
本文大致介绍初始化kafka生产者的核心逻辑,创建的各类后续用于发送消息的对象,线程,配置信息等;
这些配置将直接影响生产者发送消息的性能和可靠性,如果需要在复杂应用场景下追求两者的平衡,需要对这些参数有深刻认识并调试验证后上线!
kafka生产者配置参数中文网站:3. 配置 - 【布客】kafka 中文翻译
kafka生产者配置参数英文网站:https://kafka.apache.org/documentation/#producerconfigs
版权归原作者 i_191 所有, 如有侵权,请联系我们删除。