文章目录
背景信息
Canal是Github中开源的ETL(Extract Transform Load)软件,其功能原理及详细说明请参见Canal。
使用流程
使用canal将MySQL数据同步到表格存储的使用流程如下:
- 在MySQL中,准备待同步的数据。
- 在阿里云Elasticsearch实例中,创建索引。要求Mapping中定义的字段名称和类型与待同步数据保持一致。
- 安装Canal-server,然后修改配置文件关联MySQL,Canal-server模拟MySQL集群的一个slave,获取MySQL集群Master节点的二进制日志(binary log),并将日志推送给Canal-adapter。 该服务负责从上游拉取binlog数据、记录位点等。
- 安装Canal-adapter,然后修改配置文件关联MySQL和Elasticsearch,以及定义MySQL数据到Elasticsearch数据的映射字段,用来将数据同步到Elasticsearch。 该服务负责对接Deployer解析过的数据,并将数据传输到目标库中
- 在MySQL中新增、修改或删除数据,查看数据同步结果。
待全量数据同步完成后,canal会自动开始增量同步。
说明:本文部署方式仅针对于1.1.4版本,其他版本未进行测试,酌情处理
步骤一:准备MySQL数据源
进入MySQL配置文件,修改打开binlog,准备好需要抽取的表。
本文创建的表名称为es_log_demo,包含的字段如下所示。
步骤二:创建索引
- 登录服务器,根据数据库表结构创建索引(索引若已存在则不需要创建)。
- 通过curl执行创建索引命令curl -H ‘Content-Type: application/json’ --user ES用户名:ES密码 -XPUT http://127.0.0.1:9200/需要创建的索引名 -d {索引配置}
curl -H 'Content-Type: application/json' --user elastic:ErtyuioP!@$ -XPUT http://127.0.0.1:9200/es_log_demo -d '{ "type": "es_log_demo", "mappings": { "es_log_demo": { "properties": { "id": { "type": "keyword" }, "log_date": { "type": "date", "format": "strict_date_optional_time||epoch_millis" }, "log_level": { "type": "text" }, "log_class": { "type": "keyword" }, "line_number": { "type": "integer" }, "log_detail": { "type": "text" } } } }, "settings": { "index": { "max_result_window": "2000000000", "number_of_shards": 3, "number_of_replicas": 2 } }}'
创建成功后,返回如下结果。{"acknowledged":true,"shards_acknowledged":true,"index":"索引名"}
步骤三:安装并启动Canal-server(Deployer)
- 下载canal.deployer-1.1.4.tar.gz包并解压。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
解压后,可以看到解压路径下的bin、conf、plugin、lib文件夹(解压默认没有外部文件夹,需要提前新建。或者使用 -C指定文件夹路径)。 - canal实例名称默认为example,如果需要自定义canal实例名称,例如改为demo,请执行以下操作(可不修改)。1. 修改canal.properties文件中canal.destinations(默认为第96行)的值为自定义的canal实例名称,其他配置均保持默认即可。
canal.destinations = demo
2. 在conf路径下创建以canal实例名称命名的文件夹demo,并将conf/example路径下的instance.properties文件复制到conf/demo/路径下。 - 修改instance.properties文件(默认在conf/example文件夹下)。1. 修改需要抽取的数据库地址,canal.instance.master.address(默认在第9行),例如:127.0.0.1:33062. 修改需要抽取的数据库用户名,canal.instance.dbUsername(默认在第33行),例如:root3. 修改需要抽取的数据库密码,canal.instance.dbPassword(默认在第34行),例如:QwertyuioP!@$
- instance.properties修改后如下
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=QwertyuioP!@$
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
instance.properties文件中的主要配置项说明请参见下表。
配置项是否必填示例值描述canal.instance.master.address是rm-bp15p07134rkvf7****.mysql.rds.aliyuncs.com:****
127.0.0.1:3306canal监听的数据库地址,格式为
host:port
。canal.instance.rds.accesskey否LTAn***当MySQL为阿里云产品RDS库时,填写登录账号的AccessKey ID和AccessKey Secret,获取方式请参见为RAM用户创建访问密钥。如果非RDS库,无需填写此项。*canal.instance.rds.secretkey否zbnK*****canal.instance.rds.instanceId否rm-bp15p0713当MySQL为阿里云产品RDS库时,填写实例ID。如果非RDS库,无需填写此项。**canal.instance.dbUsername是root数据库账号用户名。canal.instance.dbPassword是QwertyuioP!@$数据库账号密码。canal.instance.filter.regex是.\…canal实例关注的表。通过正则表达式匹配。此处表示匹配所有数据库下的所有表。canal.destinations是democanal实例名称,必须与配置文件所在上层路径相同。例如配置文件的路径为conf/demo/instance.properties,则canal实例名称为demo。
- 通过bin路径下的sh脚本启动项目
./bin/startup.sh或sh bin/startup.sh
启动完成后查看logs/canal/canal.log出现the canal server is running now …即为启动成功
步骤四:部署Client-Adapter
- 下载canal.adapter-1.1.4.tar.gz包并解压。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
解压后,可以看到项目路径下的bin、conf、plugin、lib文件夹(解压默认没有外部文件夹,需要提前新建。或者使用 -C指定文件夹路径)。 - 修改配置文件。i. 修改conf路径下application.yml文件中表格存储相关的配置信息1. 修改端口server:port,默认为8081canal.conf:canalServerHost,默认在第11行,Deployer的IP和端口2. 修改数据库配置1. canal.conf:srcDataSources:defaultDS:url,默认在23行,示例:jdbc:mysql://127.0.0.1/demo?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai2. canal.conf:srcDataSources:defaultDS:username,默认在24行,示例:root3. canal.conf:srcDataSources:defaultDS:password,默认在25行,示例:QwertyuioP!@$3. 修改实例名称canal.conf:canalAdapters:- instance,默认是example,需要与Deployer的实例名称(canal.destinations)保持一致4. 修改推送数据源1. 推送到ES 1. 推送到哪个数据库canal.conf:canalAdapters.groups:outerAdapters:name,默认在60行,示例:es,可不修改2. ES地址:canal.conf:canalAdapters.groups:outerAdapters:hosts,默认在61行,示例:127.0.0.1:92003. ES配置:canal.conf:canalAdapters.groups:outerAdapters:properties:mode,默认在63行,示例:rest(可为rest与transport,但需要固定为rest)4. ES用户名密码:canal.conf:canalAdapters.groups:outerAdapters:properties:security.auth,默认在64行,示例(格式:用户名:密码):elastic:ErtyuioP!@$5. canal.conf:canalAdapters.groups:outerAdapters:properties:cluster.name,默认在65行,示例:elasticsearch其他配置项的说明请参见ClientAdapter配置项说明。配置项说明canal.conf.canalServerHostDeployer访问地址canal.conf.srcDataSources.defaultDS.url需要设置为jdbc:mysql://<地址>:<端口>/<数据库名称>,例如jdbc:mysql://127.0.0.1/demo?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai****canal.conf.srcDataSources.defaultDS.usernameMySQL数据库的账号名称canal.conf.srcDataSources.defaultDS.passwordMySQL数据库的密码。canal.conf.canalAdapters.groups.outerAdapters.hosts定位到name:es的位置,将hosts替换为**<Elasticsearch实例的内网地址>:<内网端口>,相关信息可在Elasticsearch实例的基本信息页面获取。例如es-cn-v64xxxxxxxxx3medp.elasticsearch.aliyuncs.com:9200。canal.conf.canalAdapters.groups.outerAdapters.mode必须设置为rest。canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth需要设置为<Elasticsearch实例的账号>:<密码>。例如elastic:es_password。canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.nameElasticsearch实例的ID,没有可填写elasticsearchapplication.yml文件的完整配置示例如下:
server:port:8081spring:jackson:date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf:mode: tcp # kafka rocketMQcanalServerHost: 127.0.0.1:11111# zookeeperHosts: slave1:2181# mqServers: 127.0.0.1:9092 #or rocketmq# flatMessage: truebatchSize:500syncBatchSize:1000retries:0timeout:accessKey:secretKey:srcDataSources:defaultDS:url: jdbc:mysql://127.0.0.1/demo?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghaiusername: root password: QwertyuioP!@$canalAdapters:-instance: example # canal instance Name or mq topic namegroups:-groupId: g1 outerAdapters:-name: logger # - name: rdb# key: mysql1# properties:# jdbc.driverClassName: com.mysql.jdbc.Driver# jdbc.url: jdbc:mysql://127.0.0.1:3306/my2?useUnicode=true# jdbc.username: root# jdbc.password: 121212# - name: rdb# key: oracle1# properties:# jdbc.driverClassName: oracle.jdbc.OracleDriver# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE# jdbc.username: mytest# jdbc.password: m121212# - name: rdb# key: postgres1# properties:# jdbc.driverClassName: org.postgresql.Driver# jdbc.url: jdbc:postgresql://localhost:5432/postgres# jdbc.username: postgres# jdbc.password: 121212# threads: 1# commitSize: 3000# - name: hbase# properties:# hbase.zookeeper.quorum: 127.0.0.1# hbase.zookeeper.property.clientPort: 2181# zookeeper.znode.parent: /hbase-name: es hosts: 127.0.0.1:9200# 127.0.0.1:9200 for rest modeproperties:mode: rest # or transportsecurity.auth: elastic:ErtyuioP!@$# only used for rest modecluster.name: elasticsearch
ii.同样的方式,修改conf/es/*.yml文件,定义MySQL数据到Elasticsearch数据的映射字段。说明** 如果不存在conf/es路径,请手动创建该路径。1. 在conf/es路径下创建.yml格式的文件1. biz_order.yml2. customer.yml3. mytest_user.yml(全量推送数据时使用)2. 在biz_order.yml中填写以下内容并配置相应参数。1. dataSourceKey,与application.yml中的canal.conf.srcDataSources.defaultDS一致,示例:defaultDS2. destination,与deployer的实例名一致,示例:example3. groupId,与application.yml中的canal.conf.canalAdapters.groups:-groupId一致,示例:g1dataSourceKey: defaultDSdestination: examplegroupId: g1esMapping:_index: es_log_demo _type: _doc _id: _id# relations:# customer_order:# name: order# parent: customer_idsql:"select id _id, id, log_date, log_level, log_class, line_number, log_detail from es_log_demo"# skips:# - customer_id# etlCondition: "where t.c_time>={}"commitBatch:3000
3. 在customer.yml中填写以下内容并配置相应参数。dataSourceKey: defaultDSdestination: examplegroupId: g1esMapping:_index: es_log_demo _type: _doc _id: _id# relations:# customer_order:# name: customersql:"select id _id, id, log_date, log_level, log_class, line_number, log_detail from es_log_demo"# etlCondition: "where t.c_time>={}"commitBatch:3000
4. 在mytest_user.yml中填写以下内容并配置相应参数。dataSourceKey: defaultDSdestination: examplegroupId: g1esMapping:_index: es_log_demo _type: _doc _id: _id upsert:true# pk: idsql:"select id _id, id, log_date, log_level, log_class, line_number, log_detail from es_log_demo"# objFields:# _labels: array:;# etlCondition: "where a.c_time>={}"commitBatch:1000
配置项说明esMapping._indexesMapping._typeesMapping._id需要同步到Elasticsearch实例的文档的id,可自定义。本文使用**_id。esMapping.sqlSQL语句,用来查询需要同步到Elasticsearch中的字段。本文使用select id _id, id, log_date, log_level, log_class, line_number, log_detail from es_log_demo
。esMapping.commitBatch**批量提交条数,可设置为默认的3000 - 修改完成后,启动Canal-adapter服务,并查看日志。
./bin/startup.sh或sh bin/startup.sh
启动完成后查看logs/adapter/adapter.log出现c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in * seconds (JVM running for *)即为启动成功
步骤五:验证增量数据同步
- 在MySQL数据库对应的表中,新增、修改或删除表的记录。
- 登录Elasticsearch-head查看对应的索引中是否有数据变更。
步骤六:同步MySQL全量数据(如果不需要同步全量数据,可忽略这一步)
执行以下命令调用Client-Adapter服务的方法触发同步任务。此时,canal会先中止增量数据传输,然后同步全量数据。待全量数据同步完成后,canal会自动进行增量数据同步。
- 命令格式
curl "hostip:port/etl/type/task"-XPOST
- 示例
curl "127.0.0.1:8081/etl/es/mytest_user.yml"-XPOST
执行后需要等待,成功后会显示以下内容:
{"succeeded":true,"resultMessage":"导入ES 数据:* 条"}
详细配置项说明请参见下表。
配置项是否必选示例描述hostip是localhost部署canal服务的机器IP地址。当在部署canal服务的机器上执行此命令时,可设置hostip为localhost。port是8081adapter端口type是es下游数据库类型task是mytest_user.yml
常见问题及解决方式
1. 部署后启动出现(src-data-sources******)
Failedtobind properties under 'canal.conf'tocom.alibaba.otter.canal.adapter.launcher.config.AdapterCanalConfig:Reason:Unabletoset value for property src-data-sources
解决方案1:
MySQL使用的是5.7以上版本,驱动问题,lib文件夹下的mysql-connector-java-5.1.40.jar替换为mysql-connector-java-5.1.47.jar即可解决。
canal.adapter-1.1.4数据库驱动默认为5.1.40版本,随本文附带的adapter安装包驱动已修改为5.1.47。
并且5.1.47版本也放在了jar文件夹内,需要可手动替换。
解决方案2:
jdbcurl: jdbc:mysql://127.0.0.1/demo
修改为
jdbcurl: jdbc:mysql://127.0.0.1/demo?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&serverTimezone=Asia/Shanghai
2. 启动adapter出现以下问题(Not found the mapping info of index:******):
java.lang.RuntimeException:java.lang.RuntimeException:java.lang.IllegalArgumentException:Not found the mapping info of index: es_log_demo
at com.alibaba.otter.canal.client.adapter.support.Util.sqlRS(Util.java:65)
at com.alibaba.otter.canal.client.adapter.es.service.ESEtlService.executeSqlImport(ESEtlService.java:80)
at com.alibaba.otter.canal.client.adapter.support.AbstractEtlService.lambda$importData$1(AbstractEtlService.java:91)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)Caused by:java.lang.RuntimeException:java.lang.IllegalArgumentException:Not found the mapping info of index: es_log_demo
at com.alibaba.otter.canal.client.adapter.es.service.ESEtlService.lambda$executeSqlImport$1(ESEtlService.java:206)
at com.alibaba.otter.canal.client.adapter.support.Util.sqlRS(Util.java:60)...6 common frames omitted
Caused by:java.lang.IllegalArgumentException:Not found the mapping info of index: es_log_demo
at com.alibaba.otter.canal.client.adapter.es.support.ESTemplate.getEsType(ESTemplate.java:497)
at com.alibaba.otter.canal.client.adapter.es.support.ESTemplate.getValFromRS(ESTemplate.java:262)
at com.alibaba.otter.canal.client.adapter.es.service.ESEtlService.lambda$executeSqlImport$1(ESEtlService.java:98)...7 common frames omitted
解决方案1:
解决方案2:
已创建索引,但索引字段未设置mapping,请根据实际结构设置对应的mapping字段及类型
示例:
curl -H 'Content-Type: application/json' --user elastic:ErtyuioP!@$ -XPOST http://127.0.0.1:9200/es_log_demo/_doc/_mapping?pretty -d '
{
"_doc" : {
"properties" : {
"id": {
"type": "integer"
},
"log_date": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
"log_level" : {
"type" : "text"
},
"log_class" : {
"type" : "keyword"
},
"line_number" : {
"type" : "integer"
},
"log_detail" : {
"type" : "text"
}
}
}
}'
配置项说明http://127.0.0.1:9200ES:ip+端口es_log_demo索引名称_doctype_mapping固定为_mappingpretty固定为pretty
2022-04-1114:43:30.101[destination = example , address =/192.168.7.181:33307,EventParser] ERROR c.a.o.canal.parse.inbound.mysql.dbsync.DirectLogFetcher-I/O error while reading from client socket
java.io.IOException:Received error packet: errno =1236, sqlstate =HY000 errmsg =Could not find first log file name in binary log index file
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102)~[canal.parse-1.1.4.jar:na]
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235)[canal.parse-1.1.4.jar:na]
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:265)[canal.parse-1.1.4.jar:na]
at java.lang.Thread.run(Thread.java:748)[na:1.8.0_151]2022-04-1114:43:30.101[destination = example , address =/192.168.7.181:33307,EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy- dump address /192.168.7.181:33307 has an error, retrying. caused by
java.io.IOException:Received error packet: errno =1236, sqlstate =HY000 errmsg =Could not find first log file name in binary log index file
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102)~[canal.parse-1.1.4.jar:na]
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235)~[canal.parse-1.1.4.jar:na]
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:265)~[canal.parse-1.1.4.jar:na]
at java.lang.Thread.run(Thread.java:748)[na:1.8.0_151]2022-04-1114:43:30.101[destination = example , address =/192.168.7.181:33307,EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler- destination:example[java.io.IOException:Received error packet: errno =1236, sqlstate =HY000 errmsg =Could not find first log file name in binary log index file
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:265)
at java.lang.Thread.run(Thread.java:748)]
原因:
Canal同步是模拟MySQL集群的一个slave
当mysql主备切换时,无论binlog文件名是否相同,如果原主节点position大于主备切换后主库当前binlog的position
解决方案:
删除canal.deployer/conf/example下的meta.dat文件,重启:canal.deployer即可
2022-04-1217:24:18.095[MultiStageCoprocessor-Parser-example-6] ERROR com.alibaba.otter.canal.common.utils.NamedThreadFactory- from MultiStageCoprocessor-Parser-example-6com.alibaba.otter.canal.parse.exception.CanalParseException:com.alibaba.otter.canal.parse.exception.CanalParseException:com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by:com.alibaba.otter.canal.parse.exception.CanalParseException:com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by:com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by:com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:zzddt_dev.sszvcl_team_month_count,27 vs 252022-04-1217:24:18.099[MultiStageCoprocessor-Parser-example-8] ERROR com.alibaba.otter.canal.common.utils.NamedThreadFactory- from MultiStageCoprocessor-Parser-example-8com.alibaba.otter.canal.parse.exception.CanalParseException:com.alibaba.otter.canal.parse.exception.CanalParseException:com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by:com.alibaba.otter.canal.parse.exception.CanalParseException:com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by:com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by:com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:zzddt_dev.sszvcl_team_month_count,27 vs 25
原因:
修改了数据库表结构,同时也没有开启ddl同步
解决方案:
1. 修改canal.deployer中,conf下canal.properties配置文件,
canal.instance.filter.druid.ddl = true;(默认在第56行)
2. 删除canal.deployer/conf/example下的meta.dat文件,
3. 重启canal.deployer
说明:
- 开启canal.instance.filter.druid.ddl后,后续ddl更新将会自动同步。
版权归原作者 gt-it 所有, 如有侵权,请联系我们删除。