0


kafka忽略集群Node信息,直接向`bootstrap.servers`地址发送消息

背景

当kafka单机安装的时候或者集群安装的时候,kafka会先通过

bootstrap.servers

获取集群节点。
有时候网络复杂的时候

  • 如内网外部署
  • 地址映射
  • 代理转发等
bootstrap.servers

配置地址可能为一个公网地址

181.39.77.53:9092

,然而返回的节点为内网地址

172.16.31.33:9092

,此时由于未开通

172.16.31.33:9092

网络,导致访问失败。

此时通常有两种解决方案

  • 开通kafka返回节点的网络
  • 修改kafka连接方式,忽略返回的node节点信息,直接访问bootstrap.servers配置的地址 这里主要介绍下方案二,直接访问bootstrap.servers配置的地址

实现原理

  • 利用类加载机制,重复的类只会加载一次。
  • 同名类先加载的类先生效,后加载的类被忽略
  • 优先加载运行jar包内类。 可以通过在项目内新建同名类(包名也要相同),修改源码覆盖的方式来实现。

实现代码

  • 在项目内新建org.apache.kafka.clients.NetworkClient
  • 通过修改initiateConnect(Node node, long now)实现
  /**
     * Initiate a connection to the given node
     * @param node the node to connect to
     * @param now current time in epoch milliseconds
     */
    private void initiateConnect(Node node, long now) {

        String nodeConnectionId = node.idString();

        try {
            connectionStates.connecting(nodeConnectionId, now, node.host());
            InetAddress address = connectionStates.currentAddress(nodeConnectionId);

            // 开启单机版kafka连接
            // 如果单机kafka 直接连接`bootstrap.servers`中配置的节点
            if (HookConnectProperties.hooked) {
                KafkaChannel channel = ((Selector) selector).channel("-1");

                // 如果channel为空说明访问的`bootstrap.servers`
                // 不为空则开始访问kafka返回的集群节点
                if(channel != null) {
                    InetSocketAddress remoteAddress = null;
                    try {
                        // 尝试通过反射方式获取`remoteAddress`
                        Field field = KafkaChannel.class.getDeclaredField("remoteAddress");
                        field.setAccessible(true);
                        remoteAddress = (InetSocketAddress)field.get(channel);
                        log.debug("Initiating connection to node {} using address {}", node, address);
                        selector.connect(nodeConnectionId,
                                remoteAddress,
                                this.socketSendBuffer,
                                this.socketReceiveBuffer);

                    } catch (NoSuchFieldException | IllegalAccessException e) {
                        // 获取不到则获取远程地址
                        address = channel.socketAddress();
                        log.debug("Initiating connection to node {} using address {}", node, address);
                        selector.connect(nodeConnectionId,
                                new InetSocketAddress(address, node.port()),
                                this.socketSendBuffer,
                                this.socketReceiveBuffer);
                    }
                    return;
                }
            }
            log.debug("Initiating connection to node {} using address {}", node, address);
            selector.connect(nodeConnectionId,
                    new InetSocketAddress(address, node.port()),
                    this.socketSendBuffer,
                    this.socketReceiveBuffer);
        } catch (IOException e) {
            log.warn("Error connecting to node {}", node, e);
            // Attempt failed, we'll try again after the backoff
            connectionStates.disconnected(nodeConnectionId, now);
            // Notify metadata updater of the connection failure
            metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
        }
    }
标签: kafka java MQ

本文转载自: https://blog.csdn.net/akaiyijian001/article/details/130717270
版权归原作者 AK_GCC 所有, 如有侵权,请联系我们删除。

“kafka忽略集群Node信息,直接向`bootstrap.servers`地址发送消息”的评论:

还没有评论