从本篇开始 打算用三篇文章 分别介绍下Producer生产消费,Consumer消费消息 以及Spring是如何集成Kafka 三部分,致于对于Broker的源码解析,因为是scala语言写的,暂时不打算进行学习分享。
总体介绍
- clients : 保存的是Kafka客户端代码,主要就是生产者和消费者代码
- config:保存Kafka的配置文件,其中比较重要的配置文件是server.properties。
- connect目录:保存Connect组件的源代码。我在开篇词里提到过,Kafka Connect组件是用来实现Kafka与外部系统之间的实时数据传输的。
- core目录:保存Broker端代码。Kafka服务器端代码全部保存在该目录下。
而一条消息的整体流转过程其实就是经过三部分,也就是Producer\Broker\Consumer。
因为是对主要核心流程的分析,所以只会截核心代码。具体后面细节,在说。
producer整体流程
对于Producer来说,其实就是几部分。
- 初始化、发送流程、缓冲区
初始化流程
设置分区器
// 设置分区器this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG,Partitioner.class,Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
设置重试时间,默认100ms,如果配置Kafka可以重试,retries制定重试次数,retryBackoffMs指定重试的间隔
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
获取Key和Value的序列化器
// 序列化器if(keySerializer ==null){this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,Serializer.class);this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)),true);}else{
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);this.keySerializer = keySerializer;}if(valueSerializer ==null){this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,Serializer.class);this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)),false);}else{
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);this.valueSerializer = valueSerializer;}
拦截器
// 设置拦截器List<ProducerInterceptor<K,V>> interceptorList =(List) config.getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptor.class,Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));if(interceptors !=null)this.interceptors = interceptors;elsethis.interceptors =newProducerInterceptors<>(interceptorList);
其他参数
// 设置最大消息为多大,默认是1M 默认是16384this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);// 设置缓存大小 默认是32M 默认是33554432 RecordAccumulator=32MBthis.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);// 设置压缩类型 可以提升性能this.compressionType =CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));this.accumulator =newRecordAccumulator(logContext,
// 因为是通过缓冲区发送消息的,所以需要消息累计器RecordAccumulator.PartitionerConfig partitionerConfig =newRecordAccumulator.PartitionerConfig(
enableAdaptivePartitioning,
config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG));
初始化元数据
// 初始化集群元数据if(metadata !=null){this.metadata = metadata;}else{this.metadata =newProducerMetadata(retryBackoffMs,
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,Time.SYSTEM);this.metadata.bootstrap(addresses);}
创建Sender线程,其中包含一个重要的网络组件NetWorkClient
// 创建sender线程this.sender =newSender(logContext, kafkaClient,this.metadata);// 线程nameString ioThreadName = NETWORK_THREAD_PREFIX +" | "+ clientId;// 封装起来 设置为守护线程 并启动this.ioThread =newKafkaThread(ioThreadName,this.sender,true);// 线程启动this.ioThread.start();
发送消息流程
发送消息的过程
publicFuture<RecordMetadata>send(ProducerRecord<K,V>record,Callback callback){// 执行拦截器逻辑ProducerRecord<K,V> interceptedRecord =this.interceptors.onSend(record);returndoSend(interceptedRecord, callback);}
先执行拦截器,可以发现就是遍历拦截器,然后执行对应的onSend()方法。当我们想增加一个拦截器,直接实现对应的接口,重写onSend()方法,然后Kafka就会调用我们的onSend方法。通过提供一个拓展点进行使用。
publicProducerRecord<K,V>onSend(ProducerRecord<K,V>record){ProducerRecord<K,V> interceptRecord =record;for(ProducerInterceptor<K,V> interceptor :this.interceptors){try{
interceptRecord = interceptor.onSend(interceptRecord);}catch(Exception e){}}return interceptRecord;}
从Kafka Broker集群获取元数据metadata
// 从broker获取元数据
clusterAndWaitTime =waitOnMetadata(record.topic(),record.partition(), nowMs, maxBlockTimeMs);
对key和value进行序列化,调用对应的serialize的方法。
byte[] serializedKey;try{// 选择对应的序列化进行操作
serializedKey = keySerializer.serialize(record.topic(),record.headers(),record.key());}catch(ClassCastException cce){}byte[] serializedValue;try{
serializedValue = valueSerializer.serialize(record.topic(),record.headers(),record.value());}catch(ClassCastException cce){}
// 选择具体的分区int partition =partition(record, serializedKey, serializedValue, cluster);// 消息缓存到RecoredAccumulator
result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs,false, nowMs, cluster);// 消息发送的条件// 缓冲区数据大小达到batch.size 或者linnger.ms达到上限后 唤醒sneder线程。if(result.batchIsFull || result.newBatchCreated){
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch",record.topic(), appendCallbacks.getPartition());this.sender.wakeup();}
Sender线程
runOnce();long pollTimeout =sendProducerData(currentTimeMs);
缓冲区、
这篇讲解很详细 https://www.cnblogs.com/rwxwsblog/p/14754810.html
生产者核心参数配置
bootstrap.servers:连接Broker配置,一般就是xxxx:9092
key.serializer 和 value.serializer:对key和value进行序列化器,可以自定义,一般就是String方式
buffer.memory:RecordAccumulator 缓冲区总大小,默认32m。
batch.size: 消息会以batch的方式进行发送,这是一批数据的大小 默认是16K
linger.ms:发送消息的时机,如果没有达到batch.size or linger.ms的时间就会发送 默认是0ms 立即发送
acks: 0: 不落盘 1:只有leader落盘 -1(all) : leader和所有从节点持久化成功 默认是-1
max.in.flight.requests.per.connection:允许最多没有返回 ack 的次数,默认为 5
retries: 消息发送失败时,系统重发消息 默认值 2147483647
retry.backoff.ms:两次重试间隔 默认是100ms
enable.idempotence: 开启幂等性 默认true
compression.type: 压缩格式 默认是none
版权归原作者 qxlxi 所有, 如有侵权,请联系我们删除。