说明:以下描述的源码都是基于最新版,老版本可能会有所不同。
一. 查找源码入口
**kafka-console-producer.sh**是消息生产者的脚本,我们从这里入手,可以看到源码的入口:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
从上面的代码可以得知,源码是kafka.tools.ConsoleProducer,这是一个scala的文件。
二. 利用源码启动生产者进行调试
阅读源码最好的方式就是在debug下,边看边断点跟踪,所以我们先把环境配置好,以便程序可以run起来。
由于kafka server端配置了认证模式,那么在client侧,也需要加上认证的配置,否则会导致连接server失败。如何开启认证模式,可参考我之前写的这篇文章。我们可以参考**kafka-console-producer.sh**脚本运行时传入的参数,对应填入idea的Run/Debug Configurations界面中。
脚本:
/kafka/bin/kafka-console-producer.sh --bootstrap-server=127.0.0.1:9092 --topic=notif.test --producer.config=/kafka/config/topic.properties
topic.properties的内容如下:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
这个配置也可以放在producer.properties里面,下面会看到。
idea界面:
红色框起来的部分,就是和无认证模式下的区别,没有这两个参数,连接server就会失败。client.jaas.conf里面的参数,请参考上面提到的开启认证模式的文章。producer.properties是kafka自带配置文件,我们仅需要增加如下配置即可:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
好了,一切就绪,就可以执行run了,控制台如果没有错误,那就说明启动成功了,如下:
说到这里,不得不感慨一下,平时基本上没有run过命令行输入内容的代码。然后,我停留在这个界面半个小时,一直以为没有连接成功,各种排查是哪里配置的不对之类的。突然间想起去看下server端的日志,结果发现连上了。然后试着在上面红色日志下方去输入内容(见下图),好家伙,consumer侧收到了,大写的尴尬!
kafka同学,你说你要是在我输入的上方再写点提示日志该多好啊。。。
三. 查看生产者连接服务端的过程
既然代码跑起来了,那就可以开始我们的阅读之旅了。如果不想看代码,那就先来一张时序图吧,简单粗暴:
首先,在**ConsoleProducer.scala**中找到入口函数main()方法,这是任何编程语言的启动之源:
def main(args: Array[String]): Unit = {
try {
val config = new ProducerConfig(args)
val input = System.in
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))
try loopReader(producer, newReader(config.readerClass, getReaderProps(config)), input, config.sync)
finally producer.close()
Exit.exit(0)
} catch {
case e: joptsimple.OptionException =>
System.err.println(e.getMessage)
Exit.exit(1)
case e: Exception =>
e.printStackTrace()
Exit.exit(1)
}
}
可以看出,它调用了**val producer = new KafkaProducerArray[Byte], Array[Byte]**。
** KafkaProducer**是java代码,查看其最终调用的构造函数:
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
Time time) {
try {
this.producerConfig = config;
this.time = time;
// 此处省略多行代码
this.errors = this.metrics.sensor("errors");
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
close(Duration.ofMillis(0), true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
关注**this.sender = newSender(logContext, kafkaClient, this.metadata);**这行代码,进入newSender()函数:
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
// 此处省略部分代码
KafkaClient client = kafkaClient != null ? kafkaClient : ClientUtils.createNetworkClient(producerConfig,
this.metrics,
"producer",
logContext,
apiVersions,
time,
maxInflightRequests,
metadata,
throttleTimeSensor,
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null));
short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));
return new Sender(参数省略);
}
注意这行代码:
KafkaClient client = kafkaClient != null ? kafkaClient : ClientUtils.createNetworkClient(参数省略);
前面都没有对kafkaClient进行赋值,所以这行代码可简化为:
KafkaClient client = ClientUtils.createNetworkClient(参数省略)
接下来查看**ClientUtils.**createNetworkClient()函数,最终会调用下面这个方法:
public static NetworkClient createNetworkClient(入参省略) {
ChannelBuilder channelBuilder = null;
Selector selector = null;
try {
channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
selector = new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG),
metrics,
time,
metricsGroupPrefix,
channelBuilder,
logContext);
return new NetworkClient(metadataUpdater,
metadata,
selector,
clientId,
maxInFlightRequestsPerConnection,
后续参数省略);
} catch (Throwable t) {
closeQuietly(selector, "Selector");
closeQuietly(channelBuilder, "ChannelBuilder");
throw new KafkaException("Failed to create new NetworkClient", t);
}
}
我们在第二步调试的时候,不是加了认证的配置参数吗,处理认证配置的方法就在上面的方法里面,具体是如下代码:
channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
public static ChannelBuilder createChannelBuilder(AbstractConfig config, Time time, LogContext logContext) {
SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM);
return ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, config, null,
clientSaslMechanism, time, true, logContext);
**ChannelBuilders.**createChannelBuilder()方法只是外层的判断:
public static ChannelBuilder clientChannelBuilder(入参省略) {
if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
if (contextType == null)
throw new IllegalArgumentException("`contextType` must be non-null if `securityProtocol` is `" + securityProtocol + "`");
if (clientSaslMechanism == null)
throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
}
return create(securityProtocol, ConnectionMode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism,
saslHandshakeRequestEnable, null, null, time, logContext, null);
}
详细的处理逻辑是在**ChannelBuilders.**create()方法里面:
private static ChannelBuilder create(入参省略) {
Map<String, Object> configs = channelBuilderConfigs(config, listenerName);
ChannelBuilder channelBuilder;
switch (securityProtocol) {
case SSL:
requireNonNullMode(connectionMode, securityProtocol);
channelBuilder = new SslChannelBuilder(connectionMode, listenerName, isInterBrokerListener, logContext);
break;
case SASL_SSL:
case SASL_PLAINTEXT:
// 业务代码太长,省略
break;
case PLAINTEXT:
channelBuilder = new PlaintextChannelBuilder(listenerName);
break;
default:
throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol);
}
channelBuilder.configure(configs);
return channelBuilder;
}
好了,现在又回到**ClientUtils.**createNetworkClient()方法:
public static NetworkClient createNetworkClient(入参省略) {
ChannelBuilder channelBuilder = null;
Selector selector = null;
try {
channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
selector = new Selector(参数省略);
return new NetworkClient(metadataUpdater,
metadata,
selector,
clientId,
maxInFlightRequestsPerConnection,
后续参数省略);
} catch (Throwable t) {
closeQuietly(selector, "Selector");
closeQuietly(channelBuilder, "ChannelBuilder");
throw new KafkaException("Failed to create new NetworkClient", t);
}
}
创建channelBuilder之后,紧接着是创建一个Selector对象,然后再创建一个NetworkClient对象,并返回。创建Selector和NetworkClient对象的构造函数都只是初始化各类参数,没有值得需要注意的地方,所以这里就跳过了。
上述代码执行完毕,则会回到**KafkaProducer.**newSender()方法:
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
// 此处省略部分代码
KafkaClient client = kafkaClient != null ? kafkaClient : ClientUtils.createNetworkClient(参数省略);
short acks = Short.parseShort(producerConfig.getString(ProducerConfig.ACKS_CONFIG));
return new Sender(参数省略);
}
从前面的代码可知,ClientUtils.createNetworkClient()方法返回一个NetworkClient对象,kafkaClient是NetworkClient的父类,所以kafkaClient client即NetworkClient client。kafkaClient client赋值完成之后,接着是创建一个Sender对象,并返回。 因为Sender对象也只是一些初始化操作,所以这里也跳过。
**KafkaProducer.**newSender()方法返回一个**Sender**对象,然后回到**KafkaProducer**的构造方法:
KafkaProducer(入参省略) {
try {
// 此处省略多行代码
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka producer started");
} catch (Throwable t) {
// 此处省略多行代码
}
}
赋值sender之后,接下来是创建KafkaThread对象,构造方法如下:
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
super(runnable, name);
configureThread(name, daemon);
}
由此可以看出KafkaThread只是对线程做了一些附加的工作,KafkaThread对象创建完成,下一步就是执行start()方法。在KafkaThread的构造函数中传入的Runable参数是Sender对象,所以,我们需要去看下Sender的run()方法:
/**
* The main run loop for the sender thread
*/
@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");
if (transactionManager != null)
transactionManager.setPoisonStateOnInvalidTransition(true);
// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
// okay we stopped accepting requests but there may still be
// requests in the transaction manager, accumulator or waiting for acknowledgment,
// wait until these are completed.
while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// Abort the transaction if any commit or abort didn't go through the transaction manager's queue
while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
try {
// It is possible for the transaction manager to throw errors when aborting. Catch these
// so as not to interfere with the rest of the shutdown logic.
transactionManager.beginAbort();
} catch (Exception e) {
log.error("Error in kafka producer I/O thread while aborting transaction when during closing: ", e);
// Force close in case the transactionManager is in error states.
forceClose = true;
}
}
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
if (forceClose) {
// We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
// the futures.
if (transactionManager != null) {
log.debug("Aborting incomplete transactional requests due to forced shutdown");
transactionManager.close();
}
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
由于这部分代码是重点,所以就没有对代码做简化。上面的代码可以看出,多次调用了runOnce()方法,所以我们来看下这个方法是在做什么:
/**
* Run a single iteration of sending
*
*/
void runOnce() {
if (transactionManager != null) {
try {
transactionManager.maybeResolveSequences();
RuntimeException lastError = transactionManager.lastError();
// do not continue sending if the transaction manager is in a failed state
if (transactionManager.hasFatalError()) {
if (lastError != null)
maybeAbortBatches(lastError);
client.poll(retryBackoffMs, time.milliseconds());
return;
}
if (transactionManager.hasAbortableError() && shouldHandleAuthorizationError(lastError)) {
return;
}
// Check whether we need a new producerId. If so, we will enqueue an InitProducerId
// request which will be sent below
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
if (maybeSendAndPollTransactionalRequest()) {
return;
}
} catch (AuthenticationException e) {
// This is already logged as error, but propagated here to perform any clean ups.
log.trace("Authentication exception while processing transactional request", e);
transactionManager.authenticationFailed(e);
}
}
long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs);
client.poll(pollTimeout, currentTimeMs);
}
上述代码中最重要的方法应该就是client.poll()了吧,查看poll()方法的注释信息,定义在KafkaClient中:
/**
* Do actual reads and writes from sockets.
*
* @param timeout The maximum amount of time to wait for responses in ms, must be non-negative. The implementation
* is free to use a lower value if appropriate (common reasons for this are a lower request or
* metadata update timeout)
* @param now The current time in ms
* @throws IllegalStateException If a request is sent to an unready node
*/
List<ClientResponse> poll(long timeout, long now);
上面注释表示该方法用于对报文进行读写工作。
好了,现在回到**KafkaProducer**的构造方法,当执行this.ioThread.start()代码之后,**KafkaProducer**对象的初始化基本上就算完成了。但是,你们发现没有,上面的代码执行流程,都没有发现连接kafka server的代码呢?
起初我怀疑是不是阅读源码时,把哪里的代码给遗漏了,于是又回头走了一遍,还是没发现连接server的过程。没办法了,开启debug模式吧。为了避免一步步debug,根据我的经验,在开启debug之前,我们可以回头看下,上述的各个java类中,哪一个类里面包含了连接server的方法,然后把断点加上去。
因为上述代码就只有几个类,寻找的过程还是很简单的。很快,我就锁定到**Selector**这个类里面,代码如下:
/**
* Begin connecting to the given address and add the connection to this nioSelector associated with the given id
* number.
* <p>
* Note that this call only initiates the connection, which will be completed on a future {@link #poll(long)}
* call. Check {@link #connected()} to see which (if any) connections have completed after a given poll call.
* @param id The id for the new connection
* @param address The address to connect to
* @param sendBufferSize The send buffer for the new connection
* @param receiveBufferSize The receive buffer for the new connection
* @throws IllegalStateException if there is already a connection for that id
* @throws IOException if DNS resolution fails on the hostname or if the broker is down
*/
@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
ensureNotRegistered(id);
SocketChannel socketChannel = SocketChannel.open();
SelectionKey key = null;
try {
configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
boolean connected = doConnect(socketChannel, address);
key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
if (connected) {
// OP_CONNECT won't trigger for immediately connected channels
log.debug("Immediately connected to node {}", id);
immediatelyConnectedKeys.add(key);
key.interestOps(0);
}
} catch (IOException | RuntimeException e) {
if (key != null)
immediatelyConnectedKeys.remove(key);
channels.remove(id);
socketChannel.close();
throw e;
}
}
看方法上面的注释,也很符合我的猜测,来吧,上断点。 然后查看断点处的线程栈:
没想到吧, 连接server的流程,是执行**KafkaThread.start()方法才触发的。前面提到Sender.**run()方法是重点,贴出的代码未作简化处理,原因正源于此。执行顺序:
run()->runOnce()->maybeSendAndPollTransactionalRequest()->.......
看下**Sender.**maybeSendAndPollTransactionalRequest()的源码:
/**
* Returns true if a transactional request is sent or polled, or if a FindCoordinator request is enqueued
*/
private boolean maybeSendAndPollTransactionalRequest() {
// 省略部分代码
try {
// 省略部分代码
if (targetNode != null) {
if (!awaitNodeReady(targetNode, coordinatorType)) {
log.trace("Target node {} not ready within request timeout, will retry when node is ready.", targetNode);
maybeFindCoordinatorAndRetry(nextRequestHandler);
return true;
}
} else if (coordinatorType != null) {
// 省略部分代码
} else {
// 省略部分代码
}
// 省略部分代码
}
}
进入**Sender.**awaitNodeReady()方法:
private boolean awaitNodeReady(Node node, FindCoordinatorRequest.CoordinatorType coordinatorType) throws IOException {
if (NetworkClientUtils.awaitReady(client, node, time, requestTimeoutMs)) {
if (coordinatorType == FindCoordinatorRequest.CoordinatorType.TRANSACTION) {
// Indicate to the transaction manager that the coordinator is ready, allowing it to check ApiVersions
// This allows us to bump transactional epochs even if the coordinator is temporarily unavailable at
// the time when the abortable error is handled
transactionManager.handleCoordinatorReady();
}
return true;
}
return false;
}
接着进入**NetworkClientUtils.**awaitReady() :
public static boolean awaitReady(KafkaClient client, Node node, Time time, long timeoutMs) throws IOException {
if (timeoutMs < 0) {
throw new IllegalArgumentException("Timeout needs to be greater than 0");
}
long startTime = time.milliseconds();
if (isReady(client, node, startTime) || client.ready(node, startTime))
return true;
// 省略部分代码
}
接着进入**NetworkClientUtils.**isReady():
public static boolean isReady(KafkaClient client, Node node, long currentTime) {
client.poll(0, currentTime);
return client.isReady(node, currentTime);
}
接着进入**NetworkClient.**poll():
@Override
public List<ClientResponse> poll(long timeout, long now) {
ensureActive();
// 省略部分代码
long metadataTimeout = metadataUpdater.maybeUpdate(now);
long telemetryTimeout = telemetrySender != null ? telemetrySender.maybeUpdate(now) : Integer.MAX_VALUE;
// 省略部分代码
return responses;
}
继续进入**NetworkClient.DefaultMetadataUpdater.**maybeUpdate()方法:
class DefaultMetadataUpdater implements MetadataUpdater {
// 省略部分代码
DefaultMetadataUpdater(Metadata metadata) {
this.metadata = metadata;
this.inProgress = null;
}
// 省略部分代码
public long maybeUpdate(long now) {
// 省略部分代码
return maybeUpdate(now, leastLoadedNode.node());
}
}
继续进入**NetworkClient.DefaultMetadataUpdater.**maybeUpdate()方法:
private long maybeUpdate(long now, Node node) {
// 省略部分代码
if (connectionStates.canConnect(nodeConnectionId, now)) {
// We don't have a connection to this node right now, make one
log.debug("Initialize connection to node {} for sending metadata request", node);
initiateConnect(node, now);
return reconnectBackoffMs;
}
return Long.MAX_VALUE;
}
继续进入**NetworkClient.**initiateConnect()方法:
private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString();
try {
connectionStates.connecting(nodeConnectionId, now, node.host());
InetAddress address = connectionStates.currentAddress(nodeConnectionId);
log.debug("Initiating connection to node {} using address {}", node, address);
// 这里就是连接server的终极入口了
selector.connect(nodeConnectionId,
new InetSocketAddress(address, node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);
} catch (IOException e) {
// 省略部分代码
}
}
好了,终于看到希望了,进入**Selector.**connect()方法,正是我之前打断点的代码,这里就不再占用篇幅了。
通过打断点跟踪的方式,终于找到了生产者连接server的过程。连接成功之后,就可以发送消息了。我们再回过头来看下**ConsoleProducer.**main()方法:
def main(args: Array[String]): Unit = {
try {
val config = new ProducerConfig(args)
// 接受控制台输入
val input = System.in
// 连接server
val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))
// 发送消息
try loopReader(producer, newReader(config.readerClass, getReaderProps(config)), input, config.sync)
finally producer.close()
Exit.exit(0)
} catch {
// 省略部分代码
}
}
总结一下,main()方法就做了三件事:
- 接受控制台输入
- 连接server
- 发送消息
发送消息后续有机会再研究吧,本章内容到此完结,撒花!
版权归原作者 IT布道 所有, 如有侵权,请联系我们删除。