一、消息发送流程
第一步
对于一条消息,首先是在Kafka producer进行向下传递,依次经过拦截器、序列化、分区器。
这三个组件后面都会详细讲解。需要注意的是,在这一步最后经过分区器就已经确定好了该消息该发往broker的哪一个分区
第二步
RecordAccumulator记录累加器,可以理解为是生产者客户端的一个内部缓冲器,本质上是在生产者的JVM里开辟了一块内存。
经过了第一步之后,消息进入到内存缓冲池进行排队,注意这里不是Dqueue收到消息就立马进行下一步发送,而是将消息暂存起来,累积到一定大小再进行批量发送
具体做法:内部缓冲区会将多个消息批量在一起,形成一个批次(batch)。每个批次包含多个消息,这些消息属于同一个主题和分区。可以通过 batch.size 配置项设置每个批次的最大字节数。默认值为 16 KB。可以通过 linger.ms 配置项设置生产者在发送批次之前等待的时间。默认值为 0,表示不等待。设置一个非零值可以进一步优化批量发送,但可能会增加消息的延迟。
第三步
到了这一步,我们的生产者线程活就干完了,接下来由kafka的线程工作,是一个异步线程,通常称为Sender线程。具体做法:
异步发送线程定期检查内部缓冲区,获取已经准备好发送的消息批次。消息批次是由多个消息组成的集合,这些消息属于同一个主题和分区。
异步发送线程将消息批次封装成网络请求。每个请求包含一个或多个消息批次,这些批次属于同一个 Broker。
根据消息的分区信息,异步发送线程选择目标 Broker。每个分区都有一个领导者 Broker,负责处理该分区的所有写操作。
如果与目标 Broker 的连接不存在或已断开,异步发送线程会建立新的连接。Kafka 使用 TCP 连接进行通信。
异步发送线程通过 TCP 连接将网络请求发送到目标 Broker。请求中包含消息批次的数据。
目标 Broker 处理请求并将响应返回给异步发送线程。响应中包含每个消息批次的元数据,例如主题、分区、偏移量等。
响应结果包含一个ACK,ACK成功,发送成功;ACK失败,发送失败,进行重新发送。
二、kafka消息拦截器
Kafka 消息拦截器允许你在消息发送前后对消息进行处理。拦截器可以用于日志记录、消息转换、消息验证等多种场景。通过拦截器,你可以在不修改业务逻辑的情况下,对消息进行预处理或后处理
- 生产者拦截器:允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑。
- 消费者拦截器:支持在消费消息前以及提交位移后编写特定逻辑。
拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑
生产者消息拦截
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
interceptor-classes: com.example.kafka.interceptor.TimestampInterceptor
package com.example.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class TimestampInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 在消息发送前添加时间戳
long timestamp = System.currentTimeMillis();
String valueWithTimestamp = record.value() + " [timestamp=" + timestamp + "]";
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), valueWithTimestamp, record.headers());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
// 记录发送成功的日志
System.out.println("Message sent successfully to topic: " + metadata.topic() + ", partition: " + metadata.partition() + ", offset: " + metadata.offset());
} else {
// 记录发送失败的日志
System.err.println("Failed to send message: " + exception.getMessage());
}
}
@Override
public void close() {
// 清理资源
System.out.println("Closing the interceptor");
}
@Override
public void configure(Map<String, ?> configs) {
// 配置拦截器
System.out.println("Configuring the interceptor with: " + configs);
}
}
之后,生产者只要调用send方法就会进入拦截器
消费者消息拦截
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
interceptor-classes: com.example.kafka.interceptor.LoggingInterceptor
package com.example.kafka.interceptor;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.HashMap;
import java.util.Map;
public class LoggingInterceptor implements ConsumerInterceptor<String, String> {
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
// 在消息消费前记录日志
System.out.println("Consuming messages:");
for (ConsumerRecord<String, String> record : records) {
System.out.println("Before consuming: " + record);
}
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
// 在提交偏移量时记录日志
System.out.println("Committing offsets:");
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
System.out.println("Committed offset: " + entry.getKey() + " -> " + entry.getValue().offset());
}
}
@Override
public void close() {
// 清理资源
System.out.println("Closing the consumer interceptor");
}
@Override
public void configure(Map<String, ?> configs) {
// 配置拦截器
System.out.println("Configuring the consumer interceptor with: " + configs);
}
}
三、序列化机制
Producer发送消息要通过序列化器(Serializer)将消息对象转换成字节数组,才能通过网络传输到服务端,消费端则需要通过反序列化器(Deserializer)从服务端拉取字节数组转成消息对象。
配置方式
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
或者
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
目前Kafka提供了十几张序列化器,常见的序列化器有:
ByteArraySerializer // 序列化Byte数组,本质上什么都不用做。
ByteBufferSerializer // 序列化ByteBuffer。
BytesSerializer // 序列化Kafka自定义的Bytes类。
StringSerializer // 序列化String类型。
LongSerializer // 序列化Long类型。
IntegerSerializer // 序列化Integer类型。
ShortSerializer // 序列化Short类型。
DoubleSerializer // 序列化Double类型。
FloatSerializer // 序列化Float类型。
自定义序列化
自定义的序列化器类需要实现Kafka提供的Serializer接口
public class UserSerializer implements Serializer<UserBO> {
private static final Logger logger = LoggerFactory.getLogger(UserSerializer.class);
@Override
public byte[] serialize(String topic, UserBO userBO) {
logger.info("进行序列化转换");
return JSON.toJSONBytes(userBO);
}
}
自定义的反序列化器类需要实现Kafka提供的Deserializer接口
public class UserDesSerializer implements Deserializer<UserBO> {
private static final Logger logger = LoggerFactory.getLogger(UserDesSerializer.class);
@Override
public UserBO deserialize(String s, byte[] bytes) {
logger.info("进行反序列化转换");
return JSON.parseObject(bytes, UserBO.class);
}
}
//自定义序列化器
setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());
//自定义反序列化器
setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDesSerializer.class.getName());
四、消息分区
所谓分区策略是决定生产者将消息发送到哪个分区的算法
Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略,常见的分区策略包含以下几种:轮询策略、随机策略 、按消息键保序策略(哈希策略)
- 指明
partition
的情况下,直接将指明的值直接作为partiton
值; - 没有指明
partition
值但有key
的情况下,将key
的hash
值与topic
的partition
数进行取余得到partition
值; - 既没有
partition
值又没有key
值的情况下,kafka
采用Sticky Partition
(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,kafka再随机一个分区进行使用,(以前是一条条的轮询,现在是一批次的轮询)
自定义分区
我们有以下需求,根据key进行分区,如果为空或者不以数字结尾则分配到0分区,否则,拿结尾的数字与分区数求余
/**
* 自定义分区:根据key进行分区,如果为空或者不以数字结尾则分配到0分区,否则,拿结尾的数字与分区数求余
*/
public class CustomPartitioner implements Partitioner {
private static final Logger logger = LoggerFactory.getLogger(CustomPartitioner.class);
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic,
Object key, byte[] keyBytes,
Object value, byte[] valueBytes,
Cluster cluster) {
if (null == key) {
return 0;
}
char[] keyCharArray = key.toString().toCharArray();
int n = keyCharArray[keyCharArray.length - 1] - '0';
n = n < 0 ? 0 : n % cluster.partitionCountForTopic(topic);
return n;
}
@Override
public void close() {
}
}
prop.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.heima.kafka.product.partitioner.CustomPartitioner");
五、消息发送方式
在消息发送流程的第三步中,异步发送线程发起TCP数据包之后,根据配置的 acks 参数处理响应:
- **
acks=0
**:生产者不会等待任何确认,消息可能丢失。 - **
acks=1
**:生产者等待领导者确认,但不等待所有副本确认。 - **
acks=all
**:生产者等待所有副本确认,确保消息不会丢失。
对应也是三种不同的消息发送方式,接下来介绍每一种方式的特点
发送并忘记
本质上也是一种异步发送的方式,消息先存储在缓冲区中,达到设定条件后批量发送,当然这是kafka吞吐量最高的一种方式,并配合参数acks=0,这样生产者不需要等待服务器的响应,以网络能支持的最大速度发送消息,但是也是消息最不可靠的一种方式,因为对于发送失败的消息没有做任何处理。
同步发送
同步发送,send()方法会返回Futrue对象,通过调用Futrue对象的get()方法,等待直到结果返回,根据返回的结果可以判断是否发送成功
如果业务要求消息必须是按顺序发送的,那么可以使用同步的方式,并且只能在一个partation上,结合参数设置retries的值让发送失败时重试,设置max_in_flight_requests_per_connection=1,可以控制生产者在收到服务器晌应之前只能发送1个消息,在消息发送成功后立刻flush,从而控制消息顺序发送。
RecordMetadata metadata = producer.send(record).get();
异步发送
异步发送,在调用send()方法的时候指定一个callback函数,当broker接收到返回的时候,该callback函数会被触发执行
如果业务需要知道消息发送是否成功,并且对消息的顺序不关心,那么可以用异步+回调的方式来发送消息,配合参数
retries=0
,并将发送失败的消息记录到日志文件中;要使用callback函数,先要实现
org.apache.kafka.clients.producer.Callback
接口,该接口只有一个
onCompletion
方法,如果发送异常,
onCompletion
的参数
Exceptione
会为非空。
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", "key", "value");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("发送出现异常,message:[{}]", e.getMessage(), e);
} else {
logger.info("发送消息的主题:【{}】,发送分区:【{}】,发送偏移量:【{}】", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
}
}
});
六、消息缓冲池
消息缓冲池也就是我们的内部缓冲池,当我们应用程序调用kafka客户端 producer发送消息的时候,在kafka客户端内部,会把属于同一个topic分区的消息先汇总起来,形成一个batch,真正发往kafka服务器的消息都是以batch为单位的
生产端
ProducerRecord
经过序列化器、分区器处理后,并不是直接发往broker端,而是发送到客户端的消息缓冲池(Accumulator) 中,最后交由Sender线程发往broker端。
缓冲池最大大小由参数
buffer.memory
控制,默认是
32M
,当生产消息的速度过快导致
buffer
满了的时候,将阻塞
max.block.ms
时间,超时抛异常,所以
buffer
的大小可以根据实际的业务情况进行适当调整。
缓冲池有什么好处?
- 通过批量发送消息,可以显著减少网络请求的次数,提高发送效率。
- 即使生产者发送消息的速度很快,内部缓冲区也可以平滑地处理这些消息,避免瞬间流量高峰对 Kafka 集群造成压力。
- 每一次发送数据都有一些其他开销,如建立连接的开销,协议首部的开销等,所以什么时候发送数据,一次发送多大的数据都需要我们认真考虑
批量发送
发送到缓冲池中消息将会被分为一个一个的
batch
,分批次的发送到
broker
端
一次发多大数据?
批次大小由参数
batch.size
控制,默认
16KB
,这就意味着正常情况下消息会攒够
16KB
时才会批量发送到
broker
端,所以一般减小
batch
大小有利于降低消息延时,增加
batch
的大小有利于提升吞吐量。
什么时候发数据?
但是消息并不是必须要达到一个
batch
尺寸才会批量发送到服务端呢,Producer端提供了另一个重要参数
linger.ms
,用来控制
batch
最大的空闲时间,超过该时间的
batch
也会被发送到
broker
端。
内存池复用
前面说过 Producer 发送消息是批量的, 因此消息都会先写入 Producer 的内存中进行缓冲, 直到多条消息 组成了一个 Batch,才会 通过网络 把 Batch 发给 Broker
当这个 Batch 发送完毕后,显然这部分数据还会在 Producer 端的 JVM 内存中,由于不存在引用了,它是可以被 JVM 回收掉的。
但是当
JVM GC
时一定会存在
Stop The World
的过程,即使采用最先进的垃圾回收器,也势必会导致工作线程的短暂停顿,这对于 Kafka 这种高并发场景肯定会带来性能上的影响。
有了这个背景,便引出了 Kafka 非常优秀的内存池机制,它和连接池、线程池的本质一样,都是为了提高复用,减少频繁的创建和释放。
Producer 一上来就会占用一个固定大小的内存块,比如 32MB,然后将 32 MB 划分成 M 个小内存块(比如一个小内存块大小是 16KB)当需要创建一个新的 Batch 时,直接从内存池中取出一个 16 KB 的内存块即可,然后往里面不断写入消息,但最大写入量就是 16 KB,接着将 Batch 发送给 Broker ,此时该内存块就可以还回到缓冲池中继续复用了,根本不涉及垃圾回收。
几个重要的配置参数
batch.size
配置设置用于每批处理使用的内存字节数,默认为
16384=16KB
,当使用的内存满的时候,生产者会发送当前批次的所有消息,但是,这并不意味着生产者会一直等待使用的内存变满,根据下面
linger.ms
配置的时间也会触发消息发送,设置较小的值会增加发送的频率,从而可能会减少吞吐量,设置较大的值会使用较多的内存,设置为0会关闭批处理的功能。
linger.ms
KafkaProducer会在当前批次使用的内存已满或等待时间到达
linger.ms
配置时间的时候发送消息,当
linger.ms>0
时,延时性会增加,但会提高吞吐量,因为会减少消息发送频率。
max.block.ms
当发送缓冲区已满或者元数据不可用时,生产者调用
send()
和
partitionsFor()
方法会被阻塞,默认阻塞时间为60000ms=1分钟,由于使用用户自定义的序列化器和分区器造成的阻塞将不会计入此时间。
buffer.memory
如果
buffer.memory
设置的太小,可能导致的问题是:消息快速的写入
内存缓冲
里,但
Sender线程
来不及把
Request
发送到
Kafka服务器
,会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往
Kafka
写消息了。
所以
buffer.memory
参数需要结合实际业务情况压测,需要测算在生产环境中用户线程会以每秒多少消息的频率来写入内存缓冲,经过压测,调试出来一个合理值。
上述常用配置示例代码
public class CustomProducerParameters {
static {
// batch.size:批次大小,默认16K
KafkaConstant.PRODUCT_PROPERTIES.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
// linger.ms:等待时间,默认0,修改为20ms
KafkaConstant.PRODUCT_PROPERTIES.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
// RecordAccumulator:缓冲区大小,默认32M,buffer.memory,修改为64m
KafkaConstant.PRODUCT_PROPERTIES.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "67108864");
// compression.type:压缩,默认none,可配置值gzip、snappy、lz4和zstd,修改为snappy
KafkaConstant.PRODUCT_PROPERTIES.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
}
private static KafkaProducer<String, String> producer = new KafkaProducer<String, String>(KafkaConstant.PRODUCT_PROPERTIES);
public static void main(String[] args) {
// 4.调用send方法,发送消息
for (int i = 0; i < 5; i++) {
producer.send(new ProducerRecord<>("test", "testMessage" + i));
}
// 5.关闭资源
producer.close();
}
}
版权归原作者 Java 第一深情 所有, 如有侵权,请联系我们删除。