0


大数据高级开发工程师——大数据相关工具之三 Maxwell

文章目录

大数据相关工具

Maxwell数据实时同步工具

Maxwell 简介

  • Maxwell 是一个能实时读取 MySQL 二进制日志文件binlog,并生成 Json格式的消息,作为生产者发送给 Kafka、Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的 DML 指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。
  • 官网地址:http://maxwells-daemon.io
  • GitHub地址:https://github.com/zendesk/maxwell
  • Maxwell 主要功能: - 支持 select * from table 方式进行全量数据初始化;- 支持在主库发生 failover 后,自动恢复 binlog 位置,实现断点续传;- 可以对数据进行分区,解决数据倾斜问题,发送到 Kafka 的数据支持库、表、列等级别的数据分区;- 工作方式是伪装为 slave 接收 binlog events,然后根据 schema 信息拼装,可以接受 ddl、xid、row 等event。

MySQL Binlog 介绍

1. Binlog简介

  • MySQL 中有以下几种日志: - 错误日志:记录在启动,运行或停止mysqld时遇到的问题- 通用日志:记录建立的客户端连接和执行的语句- 二进制日志(bin log):记录更改数据的语句- 中继日志:从服务器 复制 主服务器接收的数据更改- 慢查询日志:记录所有执行时间超过 long_query_time 秒的所有查询或不使用索引的查询- DDL 日志(元数据日志):元数据操作由 DDL 语句执行
  • 在默认情况下,系统仅仅打开错误日志,关闭了其他所有日志,以达到尽可能减少IO损耗提高系统性能的目的,但是在一般稍微重要一点的实际应用场景中,都至少需要打开二进制日志,因为这是MySQL很多存储引擎进行增量备份的基础,也是MySQL实现复制的基本条件。
  • MySQL 的二进制日志 binlog 可以说是 MySQL 最重要的日志,它记录了所有的 DDLDML 语句(除了数据查询语句select、show等),以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。binlog 的主要目的是复制和恢复。 - MySQL主从复制:MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志传递给slaves 来达到 master-slave 数据一致的目的。- 数据恢复:通过使用 mysqlbinlog 工具来使恢复数据。

2. Binlog的日志格式

  • 记录在二进制日志中的事件的格式取决于二进制记录格式,支持三种格式类型: - Statement:基于 SQL 语句的复制(statement-based replication, SBR)- Row:基于行的复制(row-based replication, RBR)- Mixed:混合模式复制(mixed-based replication, MBR)
  • Statement: - 每一条会修改数据的sql都会记录在binlog中。- 优点:不需要记录每一行的变化,减少了binlog日志量,节约了IO,提高了性能。- 缺点:在进行数据同步的过程中有可能出现数据不一致,比如 update tt set create_date=now(),如果用binlog日志进行恢复,由于执行时间不同可能产生的数据就不同。
  • Row: - 它不记录sql语句上下文相关信息,仅保存哪条记录被修改。- 优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,它只记录执行后的效果。- 缺点:每行数据的修改都会记录,最明显的就是update语句,导致更新多少条数据就会产生多少事件,占用较大空间。
  • Mixed: - 从5.1.8版本开始,MySQL提供了Mixed格式,实际上就是Statement与Row的结合。在Mixed模式下,一般的复制使用Statement模式保存binlog,对于Statement模式无法复制的操作使用Row模式保存binlog, MySQL会根据执行的SQL语句选择日志保存方式(因为statement只有sql,没有数据,无法获取原始的变更日志,所以一般建议为Row模式)。- 优点:节省空间,同时兼顾了一定的一致性。- 缺点:还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便。

Mysql 实时数据同步方案对比

  • mysql 数据实时同步可以通过解析mysql的 binlog 的方式来实现,解析binlog可以有多种方式,可以通过canal,或者maxwell等各种方式实现。以下是各种抽取方式的对比介绍:
    对比项CanalMaxwellmysql_streamer开源方阿里巴巴zendeskYelp语言JavaJavaPython活跃度活跃活跃活跃HA支持定制支持数据落地定制Kafka等Kafka分区支持支持不支持bootstrap不支持支持支持数据格式格式自由json(固定)json(固定)文档较详细较详细粗略随机读支持支持支持

  • 其中canal由 Java开发,分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大;canal 需要自己编写客户端来消费 canal 解析到的数据。

  • Maxwell相对于canal的优势是使用简单,Maxwell比Canal更加轻量级,它直接将数据变更输出为json字符串,不需要再编写客户端。对于缺乏基础建设,短时间内需要快速迭代的项目和公司比较合适

  • 另外Maxwell 有一个亮点功能,就是Canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap功能,可以直接引导出完整的历史数据用于初始化,非常好用。

开启MySQL的Binlog

  • mysql的版本尽量不要太低,也不要太高,最好使用5.6及以上版本。
  • 添加mysql普通用户maxwell,因为maxwell这个软件默认用户使用的是maxwell这个用户。
-- 校验级别最低,只校验密码长度
mysql>setglobal validate_password_policy=LOW;
mysql>setglobal validate_password_length=6;-- 创建maxwell库(启动时候会自动创建,不需手动创建)和用户
mysql>CREATEUSER'maxwell'@'%' IDENTIFIED BY'123456';
mysql>GRANTALLON maxwell.*TO'maxwell'@'%';
mysql>GRANTSELECT,REPLICATION CLIENT,REPLICATION SLAVE on*.*to'maxwell'@'%';-- 刷新权限
mysql> flush privileges;
  • 修改配置文件 sudo vim /etc/my.cnf,添加或修改以下三行配置:
# binlog日志名称前缀
log-bin=/var/lib/mysql/mysql-bin
# binlog日志格式
binlog-format=ROW
# 唯一标识,这个值的区间是:1到(2^32)-1server_id=1
  • 重启 MySQL 服务
sudoservice mysqld restart
  • 验证 binlog 是否配置成功
show variables like '%log_bin%';
  • 查看binlog日志文件生成:进入 /var/lib/mysql 目录,查看binlog日志文件

在这里插入图片描述

Maxwell 安装部署

  • 安装包下载&解压:
wget https://github.com/zendesk/maxwell/releases/download/v1.21.1/maxwell-1.21.1.tar.gz
tar -zxvf maxwell-1.21.1.tar.gz -C /bigdata/install
  • 修改maxwell配置文件:
cd /bigdata/install/maxwell-1.21.1/
cp config.properties.example config.properties
vim config.properties
  • 配置文件config.properties 内容如下:
# choose where to produce data toproducer=kafka
# list of kafka brokers
kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
# mysql login infohost=node03
port=3306user=maxwell
password=123456# kafka topic to write tokafka_topic=maxwell

Maxwell 实时采集案例

  • 关于 kafka 的安装和使用,请参考我之前的博文:JavaEE 企业级分布式高级架构师(十二)Kafka学习笔记
  • 启动kafka集群和zookeeper集群
zkServer.sh start
bin/kafka-server-start.sh -daemon config/server.properties
  • 创建 topic
# /usr/apps/kafka
bin/kafka-topics.sh --create --bootstrap-server 192.168.254.128:9092 --replication-factor 2 --partitions 3 --topic maxwell
# 查看topic
bin/kafka-topics.sh --list --bootstrap-server 192.168.254.128:9092

在这里插入图片描述

  • node01 启动 kafka 自带控制台消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.254.128:9092 --topic maxwell --from-beginning
  • node03 启动maxwell服务
maxwell

在这里插入图片描述

  • 插入数据进行测试,向mysql表中插入一条数据,查看kafka是否能够接收到数据
CREATEDATABASE/*!32312 IF NOT EXISTS*/`test_db`/*!40100 DEFAULT CHARACTER SET utf8 */;USE`test_db`;/*Table structure for table `user` */DROPTABLEIFEXISTS`user`;CREATETABLE`user`(`id`varchar(10)NOTNULL,`name`varchar(10)DEFAULTNULL,`age`int(11)DEFAULTNULL,PRIMARYKEY(`id`))ENGINE=InnoDBDEFAULTCHARSET=utf8;/*Data for the table `user` */-- 插入数据insertinto`user`(`id`,`name`,`age`)values('1','xiaokai',20);-- 修改数据update`user`set age=30where id='1';-- 删除数据deletefrom`user`where id='1';

在这里插入图片描述

  • 输出 json 数据字段说明:
    字段说明database数据库名称table表名称type操作类型,包括 insert、update、delete 等ts操作时间戳xid事务idcommit同一个 xid 代表同一个事务,事务的最后一条语句会有 commitdata最新的数据,修改后的数据old旧数据,修改前的数据
标签: mysql 数据库 kafka

本文转载自: https://blog.csdn.net/yangwei234/article/details/122288454
版权归原作者 yangwei_sir 所有, 如有侵权,请联系我们删除。

“大数据高级开发工程师——大数据相关工具之三 Maxwell”的评论:

还没有评论