0


kafka的安装,用于数据库同步数据

1.0 背景调研

因业务需求,需要查询其他部门的数据库数据,不方便直连数据库,所以要定时将他们的数据同步到我们的环境中,技术选型选中了kafka+CDC

Kafka是Apache旗下的一款分布式流媒体平台,Kafka是一种高吞吐量、持久性、分布式的发布订阅的消息队列系统。 它最初由LinkedIn(领英)公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级子项目。 它主要用于处理消费者规模网站中的所有动作流数据。动作指(网页浏览、搜索和其它用户行动所产生的数据)。

1.1 kafka核心

Kafka 的核心概念是消息系统,其中包含以下几个重要组件:

Topic(主题):消息的分类或者说是逻辑上的区域。数据被发布到 Kafka 集群的 Topic 中,并且消费者对特定 Topic 进行订阅来消费这些数据。

Producer(生产者):负责向 Kafka 集群中的 Topic 发布消息,可以是任何发送数据的应用程序。

Consumer(消费者):订阅一个或多个 Topic,从 Broker 中拉取数据并进行处理。消费者可以以组的形式组织,每个组只能有一个 Consumer 对 Topic 的每个分区进行消费。

Broker(代理服务器):Kafka 集群中的每个节点都是一个 Broker,负责管理数据的存储和传输,处理生产者和消费者的请求。

1.2 技术比对

相比于 Cattle,Kafka 具备以下优势:

高吞吐量和低延迟:Kafka 使用简单而高效的数据存储机制,具备高度可伸缩性,能够处理大规模的数据流。它的设计目标是支持每秒数百万条消息的处理。

分布式和可扩展:Kafka 可以在多个 Broker 节点上进行水平扩展,通过分区机制实现负载均衡和容错性。

持久化存储:Kafka 使用日志结构存储消息,提供了高效的磁盘持久化功能,确保数据的可靠性和持久性。

多语言支持:Kafka 提供丰富的客户端库,支持多种编程语言,包括 Java、Python、Go、C++ 等,方便开发者进行集成和使用。

生态系统丰富:Kafka 生态系统提供了许多与之配套的工具和服务,例如 Kafka Connect 用于数据集成,KSQL 实现流处理,以及一些第三方工具用于监控、管理和运维等。

总的来说,Kafka 是一个高性能、可靠性强且具备良好扩展性的分布式流处理平台,适用于实时数据流处理、消息队列、日志收集等各种场景。相比之下,Cattle 在数据传输和流处理方面的功能可能较为有限,但具体选择还需根据业务需求和技术栈来进行评估和决策。

2.0 安装环境

Linux服务器

Xshell工具

Jdk

Kafka

Docker

Windows服务器

消费者系统(我们用的是.net,可使用其他语言代替)

。。。。。。

3.0 目标源数据库配置步骤

需要在目标源数据库增加个账号给maxwell使用,这里以mysql为例

新建mysql的帐号

CREATE USER ' maxwell用户名'@'%' IDENTIFIED BY '密码';

CREATE USER 'maxwell用户名'@'localhost' IDENTIFIED BY '密码';

给新账号赋权限(这个是全部权限)

GRANT ALL ON maxwell用户名.* TO 'maxwell用户名'@'%';

GRANT ALL ON maxwell用户名.* TO 'maxwell用户名'@'localhost';

给新账号赋权限(这个是指定权限,两个执行一个就行)

GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON . TO 'maxwell用户名'@'%';

GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON . TO 'maxwell用户名'@'localhost';

4.0 Linux服务器配置步骤

4.1 安装基础工具

4.1.1 lrzsz(文件传输工具)

第1****步:安装lrzsz,这个工具可以直接拖拽文件上传到服务器

yum install lrzsz –y

执行结果示例

第2****步:新建目录,用来存储安装包

mkdir /home/software

第3****步:把安装包传输到linux服务器上,拖拽到/home/software

CD /home/software

Kafka安装包本次使用的是2.13版本,开始拖拽文件到Xshell窗口下

kafka_2.13-3.5.0.tgz

执行结果示例

执行ls查看目录下是否有文件

Ls

执行结果示例

4.1.2 vim(文本编辑器)

第1****步:安装vim,这个是文本编辑器

yum install vim –y

执行结果示例

可通过vim命令打开文本,配置文件等

4.2 安装Java

第1****步:安装jdk,执行命令,代表安装所有相关的包,yum install -y java-1.8.0-openjdk.x86_64如果执行这个命令,则只安装jdk本身,后续的软件安装运行会出现依赖包不全等问题,建议使用来安装

yum install -y java-1.8.0-openjdk*

执行结果示例

第2****步:查询java版本,检验第一步是否安装成功,同时查出java版本号,为后面配置环境变量做准备

rpm -qa | grep java

执行结果示例

第3****步:配置java环境变量

输入以下命令打开配置文件

vi /etc/profile

按键pgdn跳到最后一行 按键O 新插下一行

将下面文件添加到配置文件中 注意:替换java版本号

export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.382.b05-1.el7_9.x86_64

export JRE_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.382.b05-1.el7_9.x86_64

export CLASSPATH=.:$JRE_HOME/lib

export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin

按键esc切换命令模式,输入:wq退出保存

第4****步:刷新环境变量,让新加的java配置生效

source /etc/profile

第5****步:检查java是否安装成功,输入后会提示java版本号,如果第3步配置错误,会提示java命令不存在

java –version

执行结果示例

4.3 安装kafka

第1****步:新建目录,用来存储kafka

mkdir /home/kafka

第2****步:要解压 tgz 文件,可以使用 Linux 系统自带的 tar 命令。文件在4.1.1中上传到了服务器

tar –xzvf kafka_2.13-3.5.0.tgz -C /home/kafka

执行结果示例

tar命令解释(无特殊情况不需要第3步后续操作,了解就好):

tar -xzvf kafka_2.13-3.5.0.tgz

x:表示解压

z:表示使用 gzip 压缩

v:表示显示详细的解压过程

f:表示指定要解压的文件

解压完成后,解压出来的文件会放在当前目录下。如果要将解压出来的文件放在其他目录,可以在命令中使用 -C 选项指定目录,例如:

tar -xzvf kafka_2.13-3.5.0.tgz -C /home/kafka

这样就可以将解压出来的文件放在 /home/kafka 目录下。

第4****步:因为解压后,多了一层目录【mv kafka_2.13-3.5.0】,所以要移动目录

mv kafka_2.13-3.5.0/* ./

然后删除空目录

rmdir kafka_2.13-3.5.0

第3****步:配置kafka,打开kafka配置文件

vim /home/kafka/config/server.properties

修改一下配置,注释掉的要打开注释,删除掉#

socket服务端地址
listeners=PLAINTEXT://【本机IP自行替换】:9092

侦听器名称、主机名和端口代理将通知给客户端。

advertised.listeners=PLAINTEXT:// 【本机IP自行替换】:9092

日志文件路径
log.dirs=/home/kafka_data/logs

zookeeper地址,zookeeper是用来监听kafka源数据变化
zookeeper.connect=【本机IP自行替换】:2181

第4****步:创建日志的文件夹,如果有就不用创建了

mkdir -p /home/kafka_data/logs

-p 是创建多级目录

4.4 安装docker

sudo是一个用于在Linux系统上获得超级用户权限的命令。它允许普通用户以超级用户身份执行特权命令。

第1****步:安装yum-utils,yum-utils是yum的一个附加软件包,提供了一些额外的功能和工具。

sudo yum install -y yum-utils

执行结果示例

第2****步:添加存储库的地址

sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo

执行结果示例

第3****步:安装docker相关程序

sudo yum install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin

docker-ce是Docker的主要软件包,包含了Docker的核心功能;

docker-ce-cli是Docker的命令行界面工具;

containerd.io是Docker的底层容器运行时,负责管理容器的生命周期;

docker-buildx-plugin是Docker的多平台构建插件,可以跨多种平台构建容器镜像;

docker-compose-plugin是Docker的应用程序容器编排工具,可以快速构建、启动和管理多个Docker容器应用。

执行结果示例

输入y继续

输入y继续

安装完成!!!

第4****步:配置docker

vi /etc/docker/daemon.json

添加配置项文件

{

"registry-mirrors": ["https://5twf62k1.mirror.aliyuncs.com"],

"insecure-registries": ["sinoeyes.io","hub.sinoeyes.com"]

}

按键esc切换命令模式,输入:wq退出保存,保存后会生成文件

registry-mirrors属性用于指定Docker镜像的加速器地址。由于Docker镜像可能分布在全球不同的地方,而各地的网络环境和速度也不同,因此使用镜像加速器可以显著提高拉取镜像的速度和稳定性。例如,将registry-mirrors属性设置为http://f1361db2.m.daocloud.io即可使用DaoCloud镜像加速器。

insecure-registries属性用于指定Docker容器镜像的非安全注册表地址。如果您使用的是私有Docker注册表,并且此注册表的TLS证书未经过验证或过期,则可以添加该注册表的URL以允许使用不安全的HTTP协议进行拉取和推送镜像。例如,将insecure-registries属性设置为["myregistry.example.com:5000"]即可允许使用不安全的HTTP协议访问myregistry.example.com:5000注册表。

第5****步:输入ls查看是否有文件生成

CD /etc/docker/

Ls

执行结果示例

第6****步:docker-compose部署,使用curl工具下载Docker Compose程序,并将其保存到/usr/local/bin/docker-compose路径下。

curl -L https://github.com/docker/compose/releases/download/1.21.1/docker-compose-`uname -s-uname -m` -o /usr/local/bin/docker-compose

执行结果示例

这里我因网络原因,下载失败,如果命令正常执行,不用进行第6步的后续。

手动上传文件到目录下,将docker-compose里面的文件全部上传到这个路径下【/usr/local/bin】。

第7****步:该命令的作用就是将/usr/local/bin/docker-compose文件的权限设置为可执行权限,以便用户可以直接运行这个文件并使用Docker Compose的功能。

chmod +x /usr/local/bin/docker-compose

第8****步:检查Docker Compose是否安装成功

docker-compose -v

执行结果示例

4.5 运行kafka

第1****步:新建目录

mkdir -p /home/app/maxwell/conf

第2****步:在/home/app/maxwell下创建文件docker-compose.yml

vim docker-compose.yml

按键I进入编辑模式输入以下文件内容

version: '3.5'

services:

maxwell:

restart: always

image: zendesk/maxwell

container_name: maxwell

network_mode: "host"

command: bin/maxwell --config /etc/maxwell/config.properties

#command: bin/maxwell-bootstrap --config /etc/maxwell/config.properties

volumes:

  - ./conf:/etc/maxwell/

environment:

  - "TZ=Asia/Shanghai"

按键esc切换命令模式,输入:wq退出保存,保存后会生成文件

maxwell容器的编排文件,以后启动maxwell时更方便

第3****步:在/home/app/maxwell/conf下创建文件config.properties

vim config.properties

按键I进入编辑模式,输入以下内容,根据情况替换信息,关键信息红色标出

daemon=true

第一次启动时建议改为debug,可以开到mysql数据与kafka请求,稳定后再改为info

#log_level=info

log_level=info

producer=kafka

kafka.bootstrap.servers=【本机IP自行替换】:9092

会往 kafka下主题为'test'的分区下推送数据

kafka_topic=maxwell

#当producer_partition_by设置为table时,Maxwell会将生成的消息根据表名称进行分区,不同的表将会被分配到不同的分区中,默认为database

producer_partition_by=table

client_id=maxwell_1

client_id=sddi-consumer-group-1-client-1

mysql login info 需要先在mysql创建maxwell用户

host=【目标源数据库IP】

port=33066

port=3306

user=【数据库账号】

password=【数据库密码】

schema_database=maxwell

replication_host=【目标源数据库IP】

replication_user=【数据库账号】

replication_password=【数据库密码】

replication_port=3306

jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

exclude_dbs=*

同步的数据表

include_dbs=【sddi 数据库名】

inlcude_tables=【sddi.b_missfile_info,sddi.b_pharm_info, e,sddi.f_pbt_file,同步的表,逗号隔开】

#inlcude_tables=sddi.b_missfile_info,sddi.b_pharm_info

~

按键esc切换命令模式,输入:wq退出保存,保存后会生成文件

第4****步:启动docker

systemctl start docker

第5****步:启动zookeeper,以守护线程的方式,这个是kafka集成的消息队列

bin/zookeeper-server-start.sh -daemon /home/kafka/config/zookeeper.properties

如果出现错误,没有那个文件或目录,则进行一下操作

先把zookeeper-server-start.sh添加到环境变量

echo "export PATH=$PATH:/home/kafka/bin" >> /root/.bash_profile

source /root/.bash_profile

然后执行zookeeper命令

zookeeper-server-start.sh -daemon /home/kafka/config/zookeeper.properties

第6****步:启动kafka,以守护线程的方式

kafka-server-start.sh -daemon /home/kafka/config/server.properties

第7****步:在 Kafka 中创建一个名为 maxwell 的新主题(Topic)

kafka-topics.sh --bootstrap-server 192.168.180.31:9092 --create --topic maxwell

第8****步:启动maxwell

cd /home/app/maxwell

因为第1步和第2步创建了配置文件,所以这里可以省略很多参数,直接启动maxwell

docker-compose up -d

执行结果示例

第9****步:查看docker的maxwell容器

列出所有容器

docker ps

查询指定容器的日志

docker logs --tail 100 -f 51b978d72157

第10****步:查看maxwell的下发消息(数据量大慎用)

kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server 192.168.180.31:9092

5.0 后期运维

查询所有的消费组

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --list

查看某个消费组的消费情况

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --describe --group sddi-consumer-group-1

属性介绍

列头

实例值

描述

GROUP

sddi-consumer-group-1

消费者组的名称

TOPIC

maxwell

所订阅的主题。

PARTITION

0

主题的分区编号。

CURRENT-OFFSET

666520

消费者当前的偏移量(已经消费到的消息的偏移量)。

LOG-END-OFFSET

698913

当前分区日志的最新偏移量(该分区中的消息总数)。

LAG

32393

当前落后的偏移量数量(LAG = LOG-END-OFFSET - CURRENT-OFFSET),表示消费者还未消费的消息数量。

CONSUMER-ID

sddi-consumer-group-1-client-1-8cc14037-bf1c-43eb-8094-2c4e45e5cc04

消费者的唯一标识符。

HOST

/192.168.180.18

消费者所在的主机名。

CLIENT-ID

sddi-consumer-group-1-client-1

消费者的客户端标识符。

这些信息对于监控和管理 Kafka 消费者组以及消费状态非常有用。可以根据需要使用不同的选项来查看和管理消费者组的状态。

初始化maxwell 测试环境 命令是单个表初始化同步

docker run -it --network host --rm zendesk/maxwell bin/maxwell-bootstrap --user=maxwell --password=rPq60r4BUA19@ --host=192.168.17.22 -database sddi --table b_business_info --client_id=sddi-consumer-group-1-client-1

停止maxwell (只是记录命令,可以不执行)

cd /home/app/maxwell

docker-compose down

停止kafka

kafka-server-stop.sh

停止zookeeper

zookeeper-server-stop.sh

查看容器的配置

docker inspect maxwell

查看主题列表

kafka-topics.sh --list --bootstrap-server 192.168.180.31:9092

创建主题

kafka-topics.sh --bootstrap-server 192.168.180.31:9092 --create --topic maxwell --replication-factor 1 --partitions 3

查看主题信息

kafka-topics.sh --bootstrap-server 192.168.180.31:9092 --describe --topic maxwell

使用生产者发送消息

kafka-console-producer.sh --broker-list 192.168.180.31:9092 --topic maxwell

使用消息者接受消息(从起始位置开始查看)

kafka-console-consumer.sh --bootstrap-server 192.168.180.31:9092 --topic maxwell --from-beginning

使用消息者接受消息(从当前位置开始查看)

kafka-console-consumer.sh --bootstrap-server 192.168.180.31:9092 --topic maxwell

查看kafka的所有消费组

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --list

查看kafka日志

tail -10000f /home/kafka/logs/server.log

查看zookeeper日志

tail -1000f /home/kafka/logs/zookeeper.out

查看maxwell日志

cd /home/app/maxwell

docker-compose logs -f --tail 100

Earliest 策略直接指定–to-earliest

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-earliest –execute

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --topics maxwell --to-earliest –execute

Latest 策略直接指定–to-latest

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-latest --execute

Current 策略直接指定–to-current

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-current --execute

Specified-Offset 策略直接指定–to-offset

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-offset 51691 --execute

Shift-By-N 策略直接指定–shift-by N

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --topic maxwell --shift-by 1 --execute

DateTime 策略直接指定–to-datetime

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute

最后是实现 Duration 策略,我们直接指定–by-duration

kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --by-duration PT0H30M0S --execute

Earliest 策略表示将位移调整到主题当前最早位移处。这个最早位移不一定就是 0,因为在生产环境中,很久远的消息会被 Kafka 自动删除,所以当前最早位移很可能是一个大于 0 的值。如果你想要重新消费主题的所有消息,那么可以使用 Earliest 策略。

Latest 策略表示把位移重设成最新末端位移。如果你总共向某个主题发送了 15 条消息,那么最新末端位移就是 15。如果你想跳过所有历史消息,打算从最新的消息处开始消费的话,可以使用 Latest 策略。

Current 策略表示将位移调整成消费者当前提交的最新位移。有时候你可能会碰到这样的场景:你修改了消费者程序代码,并重启了消费者,结果发现代码有问题,你需要回滚之前的代码变更,同时也要把位移重设到消费者重启时的位置,那么,Current 策略就可以帮你实现这个功能。

表中第 4 行的 Specified-Offset 策略则是比较通用的策略,表示消费者把位移值调整到你指定的位移处。这个策略的典型使用场景是,消费者程序在处理某条错误消息时,你可以手动地“跳过”此消息的处理。在实际使用过程中,可能会出现 corrupted 消息无法被消费的情形,此时消费者程序会抛出异常,无法继续工作。一旦碰到这个问题,你就可以尝试使用 Specified-Offset 策略来规避。

如果说 Specified-Offset 策略要求你指定位移的绝对数值的话,那么 Shift-By-N 策略指定的就是位移的相对数值,即你给出要跳过的一段消息的距离即可。这里的“跳”是双向的,你既可以向前“跳”,也可以向后“跳”。比如,你想把位移重设成当前位移的前 100 条位移处,此时你需要指定 N 为 -100。

刚刚讲到的这几种策略都是位移维度的,下面我们来聊聊从时间维度重设位移的 DateTime 和 Duration 策略。

DateTime 允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。常见的使用场景是,你想重新消费昨天的数据,那么你可以使用该策略重设位移到昨天 0 点。

Duration 策略则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS。如果你熟悉 Java 8 引入的 Duration 类的话,你应该不会对这个格式感到陌生。它就是一个符合 ISO-8601 规范的 Duration 格式,以字母 P 开头,后面由 4 部分组成,即 D、H、M 和 S,分别表示天、小时、分钟和秒。举个例子,如果你想将位移调回到 15 分钟前,那么你就可以指定 PT0H15M0S。

6.0 消费者系统

我们这里采用的是.net平台的c#语言,编写了windows server程序,来处理队列数据

使用的类库是Confluent.Kafka

6.1 Confluent.Kafka

Confluent.Kafka是一个流行的开源Kafka客户端库,它提供了在.NET应用程序中与Apache Kafka进行交互的功能。下面是对Confluent.Kafka类库的介绍:

高级API:Confluent.Kafka为开发人员提供了一组简单易用的高级API,用于连接到Kafka集群、读取和写入消息、管理消费者组等。你可以方便地使用它来发送消息到Kafka主题或从主题中消费消息。

完全支持Kafka协议:Confluent.Kafka完全支持Kafka协议,包括Kafka 1.0.0及更高版本。它与最新的Kafka版本保持同步,并提供了一致性和稳定性。

高性能:Confluent.Kafka经过优化,具有出色的性能表现。它采用了异步、无锁的设计,支持高并发的消息处理。

配置灵活:Confluent.Kafka提供了丰富的配置选项,可以根据实际需求进行调整。你可以配置消息的传递语义、消费者的批量读取、消息序列化和反序列化方式等。

支持消息引擎扩展:Confluent.Kafka还支持通过插件机制扩展消息引擎,例如支持Avro、Protobuf等消息格式的插件。

社区活跃:Confluent.Kafka是一个受欢迎且活跃的开源项目,拥有强大的社区支持。你可以在社区中获得帮助、提出问题或提交改进建议。

总之,Confluent.Kafka是一个功能强大、性能优越的Kafka客户端库,为.NET开发人员提供了与Apache Kafka无缝集成的能力,使他们能够轻松地构建可靠的消息流应用程序。

6.2 相关连接

https://github.com/confluentinc/confluent-kafka-dotnet/

http://www.bilibili996.com/Course?id=1159444000368

https://zhuanlan.zhihu.com/p/139101754?utm_id=0


本文转载自: https://blog.csdn.net/qq_41192896/article/details/136173558
版权归原作者 Libby博仙 所有, 如有侵权,请联系我们删除。

“kafka的安装,用于数据库同步数据”的评论:

还没有评论