0


kafka三节点集群2.8.0平滑升级到3.4.0过程指导

一、前言

在这里插入图片描述

Apache Kafka作为常用的开源分布式流媒体平台,可以实时发布、订阅、存储和处理数据流,多用于作为消息队列获取实时数据,构建对数据流的变化进行实时反应的应用程序,已被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序。而其中Apache Kafka Connect 作为 Kafka 中用于和其他数据系统流式传输数据的服务,其独立运行版本可以在 Kafka 发布包中通过

bin/connect-standalone.sh

启动,默认会在 8083 端口开启 HTTP REST API 服务,攻击者可以利用基于SASLJAAS 配置和SASL 协议的任意Kafka客户端,对可对连接器(Connector)的配置进行操作,将连接器中的 Kafka 客户端 sasl.jaas.config 属性值设置为 com.sun.security.auth.module.JndiLoginModule(通过

producer.override.sasl.jaas.config, consumer.override.sasl.jaas.config

admin.override.sasl.jaas.config

属性进行配置)时,如果此时连接器连接到攻击者可控的 LDAP 服务器时容易受到反序列化攻击,也称JNDI 注入来实现远程任意代码执行。云平台中,Kafka Connect 服务通常用于提供 Kafka 数据迁移、数据同步的管道能力,其默认 HTTP API 开放于 8083 端口。

    因现场kafka选用版本较低,安全扫描时触发安全风险告警,低于 Kafka 升级3.4.0版本,涉及【Apache Kafka JNDI注入漏洞(CVE-2023-25194)】漏洞,该漏洞可允许远程代码执行,当攻击者可控制kafka-clients连接时的属性,可通过设置 ’ sasl.jaas.config ’ 属性为 ’ com.sun.security.auth.module.JndiLoginModule ’ 进行JNDI注入或反序列化利用,当JDK版本过低或者存在Gadgets时可导致远程代码执行。现场版本kafka_2.13-2.8.0,java version “1.8.0_361”,sasl.jaas.config 配置采用:

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule;

☬ 漏洞复现: 执行创建文件,相关安全软件会报:JNDI注入的告警;影响版本: 2.3.0 至 3.3.2 版本Kafka Connect,原则上不影响 Kafka server (broker),但是会级联影响,最好还是升级到3.4.0及以上版本,升级JDK版本,可采用OpenJDK替换,相关经验已验证:OpenJDK1.8.0_362 + Zookeeper3.6.3 + Kafka3.4.0。

在这里插入图片描述

关联资源:官网升级指导、kafka部署快走

二、软件升级

本次软件要升级到3.4.0版本,版本说明见Kafka - Version 3.4.0,升级步骤也可参考官网升级指导。注意这里咱们是从2.1.x升级到3.x,不同于3.x升级,需注意存储 consumer offsets的schema和inter.broker.protocol.version里的版本,一旦升级后不支持降级。Apache Kafka 3.4.0以来,新增了一个系统属性:org.apache.kafka.disallowed.login.modules,用来在SASL JAAS配置中禁用有问题的登录模块,另外默认com.sun.security.auth.module.JndiLoginModule 在该版本中被禁用;另外需注意的是, Kafka 3.0中 Java 8 已注明废弃, 在Apache Kafka 4.0将不再支持;如果启用TLS,Java 11及更高版本的性能会明显更好,因此强烈建议使用它们。对应的zk版本稳定版为 3.5,注意zk需要足够的堆空间(3-5G,看数据量大小);另zk集群不宜过大,尤其是在写操作频繁的使用模式中,意味着会造成大量的集群内通信(写操作和随后的集群成员更新的配额),尽量让ZooKeeper系统尽可能小,并尽可能保持其独立性,以处理负载。

1)滚动升级步骤

1、升级前注意:在待升级节点server.properties文件中添加:inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 3.3, 3.2, etc.),如果是从0.11.0.x或更高版本升级的,并且没有重写message.format.version,那么只需要配置覆写: inter-broker protocol version参数的kafka版本即可,否者还需要设置:log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION,现场我们只需要添加::inter.broker.protocol.version=3.4;

2、对Broker滚动升级,一次升级一个节点或实例:关闭待升级的broker,解压新版本,然后迁移更新配置,重启新的代理,验证数据同步;这时,最新版本的broker程序会运行,之后可以验证kafka集群的业务行为和性能是否符合预期。如果出现任何问题,目前还可以进行降级回滚。Kafka集群的完整升级过程涵盖了broker侧和client侧,因broker是向下兼容的,升级过程中必需先成功升级所有的broker,对于Client(producer 和 consumer)在broker完成升级之后再升级。

3、一旦验证了集群的业务行为和性能满足预期,就可以通过更改协议版本来应用:inter.broker.protocol.version=3.4

4、然后逐个重启kafka brokers,以让inter.broker.protocol.version=3.4生效;这是,就不再支持降级了;

5、最后完成kafka整个集群状态及数据分布验证。inter.broker.protocol.version的值可参考如下,但官网升级指导里看也可直接写成:inter.broker.protocol.version=2.8(针对本次待升级的2.8版本也可),之后再修改为inter.broker.protocol.version=3.4;这种格式是可以的。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2)关停替换升级

即: 如果可接受停机,可将所有broker关闭后,更新版本替换后重启启动。但是会存在一个问题,关闭老版本,启动新版本的过程中,存在部分线上数据丢失的情况,此种情况建议在凌晨数据量少的时候进行;

3)升级计划或流程

在这里插入图片描述
测试方案:
在这里插入图片描述

2.1、软件下载

#MD5: CF 6B 8B 1C A1 12 9E 69  41 39 92 99 B6 CC 47 8Cwget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.12-3.4.0.tgz
md5sum kafka_2.12-3.4.0.tgz  #输出
cf6b8b1ca1129e6941399299b6cc478c  ./kafka_2.12-3.4.0.tgz

2.2、单节点/实例kafka升级

注意: 停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper 集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程。特别注意的是,broker之间的通讯协议和message的传输协议要与旧版本的一致,否则升级完的broker会因为通讯协议版本不一致导致节点一直报错(Connection to “broker id” was disconnected before the response was read),即升级新版本需现将旧版本的信息写入新版的配置文件中,以兼容当前(旧版)环境适配。从2.6.0版开始,对于Java 11或更新版本,TLSv1.3是默认启用的。客户机和服务器将协商是否支持TLSv1.3,否则将退回到TLSv1.2。

#1、解压缩新版本kafkatar-xzf kafka_2.12-3.4.0.tgz
cd ./kafka_2.12-3.4.0/config

#2、修改新旧版配置文件,添加版本参数,broker启动时会读取server.propertiesvim config/server.properties  #新增inter.broker.protocol.version=2.8#旧版本号(即官网说的当前的kafka版本),完成后重启当前kafka实例#顺序重启三个kafka实例
./bin/kafka-server-stop.sh ./config/server.properties
./bin/kafka-server-start.sh -daemon ./config/server.properties

#3、旧版查看topic
./bin/zkCli.sh -server10.100.1.94:2181  #查看节点[zk: 10.100.1.94:2181(CONNECTED)10] get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"INTERNAL":"SASL_PLAINTEXT"},"endpoints":["INTERNAL://10.100.1.94:9092"],"jmx_port":-1,"port":-1,"host":null,"version":5,"timestamp":"1709263320647"}

bin/kafka-topics.sh --bootstrap-server 10.100.1.94:9092 --list#或
./bin/kafka-topics.sh --zookeeper10.100.1.94:2181 --list
CCOPS-ZYZX-ENC-001-AVG-1DAY
CCOPS-ZYZX-ENC-001-AVG-1DAY-PROVINCE
CCOPS-ZYZX-ENC-001-AVG-1HOUR
CCOPS-ZYZX-ENC-001-AVG-1HOUR-PROVINCE
#基数
./bin/kafka-topics.sh --zookeeper10.100.1.94:2181 --list|wc-l257#查看kafka集群主题列表和分区信息
./bin/kafka-topics.sh --bootstrap-server 10.100.1.94:9092 --topic YWPT-ZYZX-YYP-001-AVG-5MIN --describe#或
./bin/kafka-topics.sh --zookeeper10.100.1.94:2181 --list#
./bin/kafka-server-start.sh ./config/server.properties

2.3、剩余2节点kafka升级

2.4、集群状态确认

2.5、kafka性能测试

#使用 8 个线程向名为 test-update-perf 的主题发送 500000 条大小为 50000 字节的消息,并将性能统计信息写入位于 ./perf-test 目录中的 CSV 文件中。性能统计信息将每 3000 毫秒报告一次。

./bin/kafka-producer-perf-test.sh --messages500000 --message-size 50000--topic test-update-perf --threads8 --broker-list * —show-detailed-stats --csv-reporter-enabled --metrics-dir ./perf-test --reporting-interval 3000#参数说明--messages500000:指定要发送的消息总数。
--message-size 50000:指定要发送的每条消息的大小(以字节为单位)。
--topic test-update-perf:指定要发送消息的目标主题。
--threads8:指定用于发送消息的线程数。
--broker-list *:指定 Kafka 集群中所有代理的列表。星号* 表示所有代理。
--show-detailed-stats:启用详细统计信息的显示,包括每个线程的统计信息。
--csv-reporter-enabled:启用 CSV 报告程序,它将性能统计信息写入 CSV 文件。
--metrics-dir ./perf-test:指定用于存储 CSV 报告程序输出的目录。
--reporting-interval 3000:指定性能统计信息报告的间隔(以毫秒为单位)

#示例2:perf-consumer-t4单线程向test-update-perf主题请求消费500000 条大小为 50000 字节的消息,测试性能
/bin/kafka-consumer-perf-test.sh --topic test-update-perf --zookeeper10.100.1.94:2183 --threads1--group perf-consumer-t4 --message-size 50000--messages10

三、附录

3.1、Kafka消息发送流程

kafka在消息发送的过程中,涉及到两个线程:main 线程和 Sender 线程。其中,main 线程中会创建了一个队列 RecordAccumulator,main 线程将消息发送给 RecordAccumulator;Sender 线程则不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。示意如下:

在这里插入图片描述

3.2、kafka应用场景

在这里插入图片描述

3.3、kafka架构回顾

在这里插入图片描述

3.4、zookeeper中kafka信息结构

在这里插入图片描述

3.5、Kafka Broker 总工作流程

在这里插入图片描述

3.6、消费者组消费流程

在这里插入图片描述

标签: kafka 分布式

本文转载自: https://blog.csdn.net/ximenjianxue/article/details/136315754
版权归原作者 羌俊恩 所有, 如有侵权,请联系我们删除。

“kafka三节点集群2.8.0平滑升级到3.4.0过程指导”的评论:

还没有评论