背景
当kafka单机安装的时候或者集群安装的时候,kafka会先通过
bootstrap.servers
获取集群节点。
有时候网络复杂的时候
- 如内网外部署
- 地址映射
- 代理转发等
bootstrap.servers
配置地址可能为一个公网地址
181.39.77.53:9092
,然而返回的节点为内网地址
172.16.31.33:9092
,此时由于未开通
172.16.31.33:9092
网络,导致访问失败。
此时通常有两种解决方案
- 开通kafka返回节点的网络
- 修改kafka连接方式,忽略返回的
node
节点信息,直接访问bootstrap.servers
配置的地址 这里主要介绍下方案二,直接访问bootstrap.servers
配置的地址
实现原理
- 利用类加载机制,重复的类只会加载一次。
- 同名类先加载的类先生效,后加载的类被忽略
- 优先加载运行jar包内类。 可以通过在项目内新建同名类(包名也要相同),修改源码覆盖的方式来实现。
实现代码
- 在项目内新建
org.apache.kafka.clients.NetworkClient
类 - 通过修改
initiateConnect(Node node, long now)
实现
/**
* Initiate a connection to the given node
* @param node the node to connect to
* @param now current time in epoch milliseconds
*/
private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString();
try {
connectionStates.connecting(nodeConnectionId, now, node.host());
InetAddress address = connectionStates.currentAddress(nodeConnectionId);
// 开启单机版kafka连接
// 如果单机kafka 直接连接`bootstrap.servers`中配置的节点
if (HookConnectProperties.hooked) {
KafkaChannel channel = ((Selector) selector).channel("-1");
// 如果channel为空说明访问的`bootstrap.servers`
// 不为空则开始访问kafka返回的集群节点
if(channel != null) {
InetSocketAddress remoteAddress = null;
try {
// 尝试通过反射方式获取`remoteAddress`
Field field = KafkaChannel.class.getDeclaredField("remoteAddress");
field.setAccessible(true);
remoteAddress = (InetSocketAddress)field.get(channel);
log.debug("Initiating connection to node {} using address {}", node, address);
selector.connect(nodeConnectionId,
remoteAddress,
this.socketSendBuffer,
this.socketReceiveBuffer);
} catch (NoSuchFieldException | IllegalAccessException e) {
// 获取不到则获取远程地址
address = channel.socketAddress();
log.debug("Initiating connection to node {} using address {}", node, address);
selector.connect(nodeConnectionId,
new InetSocketAddress(address, node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);
}
return;
}
}
log.debug("Initiating connection to node {} using address {}", node, address);
selector.connect(nodeConnectionId,
new InetSocketAddress(address, node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);
} catch (IOException e) {
log.warn("Error connecting to node {}", node, e);
// Attempt failed, we'll try again after the backoff
connectionStates.disconnected(nodeConnectionId, now);
// Notify metadata updater of the connection failure
metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
}
}
本文转载自: https://blog.csdn.net/akaiyijian001/article/details/130717270
版权归原作者 AK_GCC 所有, 如有侵权,请联系我们删除。
版权归原作者 AK_GCC 所有, 如有侵权,请联系我们删除。