FlinkX Overview
FlinkX is a Flink-based distributed data synchronization framework for both offline and real-time workloads. It is widely used inside DTStack (袋鼠云) and enables efficient data migration between many heterogeneous data sources.
As a data synchronization tool, FlinkX can collect both static data, such as MySQL and HDFS, and continuously changing data, such as MySQL binlog and Kafka.
Installing FlinkX
1. Upload and extract the package
unzip flinkX-1.10.zip -d /usr/local/soft/
2. Configure environment variables
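For example, the FlinkX home directory can be added to the PATH so that the flinkx command is available from any directory (a minimal sketch; the FLINKX_HOME variable name and the use of /etc/profile are assumptions, adjust to your environment):
# append FlinkX to the system profile (assumed variable name FLINKX_HOME)
echo 'export FLINKX_HOME=/usr/local/soft/flinkx-1.10' >> /etc/profile
echo 'export PATH=$FLINKX_HOME/bin:$PATH' >> /etc/profile
# reload the profile so the current shell picks up the change
source /etc/profile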
3. Make the bin/flinkx script executable
chmod a+x flinkx
4. Edit the configuration file to set the web port
vim flinkconf/flink-conf.yaml
5. Web service port; if it is not specified, a random port is generated
rest.bind-port: 8888
Basic Usage of FlinkX
https://github.com/oceanos/flinkx/blob/1.8_release/README_OLD.
The introduction documentation can be found in README_CH.md in the FlinkX repository.
MySqlToHdfs
1. Configuration file
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "12345678",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/students?useUnicode=true&characterEncoding=utf8&useSSL=false"
],
"table": [
"biaomin"
]
}
],
"column": [
"*"
],
"where": "id > 90",
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hdfswriter",
"parameter": {
"path": "hdfs://master:9000/bigdata30/flinkx/out1",
"defaultFS": "hdfs://master:9000",
"column": [
{
"name": "col1",
"index": 0,
"type": "string"
},{
"name": "col2",
"index": 1,
"type": "string"
},{
"name": "col3",
"index": 2,
"type": "string"
},{
"name": "col4",
"index": 3,
"type": "string"
},{
"name": "col1",
"index": 4,
"type": "string"
},{
"name": "col2",
"index": 5,
"type": "string"
}
],
"fieldDelimiter": ",",
"fileType": "text",
"writeMode": "append"
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 1
}
}
}
}
2. Start the job
flinkx -mode local -job ./mysql2hdfs.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/
3. Monitor the logs
After a FlinkX job is started, a nohup.out file is generated in the directory where the command was executed.
tail -F nohup.out
Check the job status through the web UI:
http://master:8888
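After the job finishes, the output files can also be checked directly on HDFS (a sketch, using the output path from the writer configuration above):
# list and inspect the files written by the hdfswriter
hdfs dfs -ls hdfs://master:9000/bigdata30/flinkx/out1
hdfs dfs -cat hdfs://master:9000/bigdata30/flinkx/out1/*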
MysqlToHive
Start the Hive metastore and HiveServer2
nohup hive --service metastore &
nohup hiveserver2 &
1. Configuration file
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "12345678",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/studentdb?useUnicode=true&characterEncoding=utf8&useSSL=false"
],
"table": [
"jd_goods"
]
}
],
"column": [
"*"
],
"where": "id > 90",
"requestAccumulatorInterval": 1
},
"name": "mysqlreader"
},
"writer": {
"name": "hivewriter",
"parameter": {
"jdbcUrl": "jdbc:hive2://master:10000/bigdata30",
"username": "",
"password": "",
"fileType": "text",
"fieldDelimiter": ",",
"writeMode": "overwrite",
"charsetName": "UTF-8",
"tablesColumn": "",
"defaultFS": "hdfs://master:9000"
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 1
}
}
}
}
2. In Hive, create the testflinkx database and a partitioned table (note that the table below is created in the bigdata30 database, which is the database referenced by the writer's jdbcUrl)
create database testflinkx;
use testflinkx;
CREATE TABLE `bigdata30`.`datax_tb1`(
`id` STRING,
`gname` STRING,
`price` STRING,
`commit` STRING,
`shop` STRING,
`icons` STRING)
PARTITIONED BY (
`pt` string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
3. Start the job
flinkx -mode local -job ./mysql2hive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
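Once the job has finished, the synchronized rows can be checked from the Hive command line (a sketch, assuming the data was written into the datax_tb1 table created above):
hive -e "select * from bigdata30.datax_tb1 limit 10;"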
MysqlToHbase
1. Configuration file
{
"job": {
"content": [
{
"reader": {
"parameter": {
"username": "root",
"password": "12345678",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/studentdb?useUnicode=true&characterEncoding=utf8&useSSL=false"
],
"table": [
"jd_goods"
]
}
],
"column": [
"*"
],
"where": "id > 90",
"requestAccumulatorInterval": 2
},
"name": "mysqlreader"
},
"writer": {
"name": "hbasewriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.property.clientPort": "2181",
"hbase.rootdir": "hdfs://master:9000/hbase",
"hbase.cluster.distributed": "true",
"hbase.zookeeper.quorum": "master,node1,node2",
"zookeeper.znode.parent": "/hbase"
},
"table": "flinkx_jd_goods",
"rowkeyColumn": "$(cf1:id)",
"column": [
{
"name": "cf1:id",
"type": "string"
},
{
"name": "cf1:gname",
"type": "string"
},
{
"name": "cf1:price",
"type": "string"
},
{
"name": "cf1:commit",
"type": "string"
},
{
"name": "cf1:shop",
"type": "string"
},
{
"name": "cf1:icons",
"type": "string"
}
]
}
}
}
],
"setting": {
"restore": {
"isRestore": false,
"isStream": false
},
"errorLimit": {},
"speed": {
"channel": 1
}
}
}
}
2. Start HBase and create the target table; the table name and column family must match the writer configuration
create 'flinkx_jd_goods','cf1'
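The create statement is executed inside the HBase shell; after the job finishes, the same shell can be used to verify the data (a sketch):
# open the HBase shell, run the create statement, and later scan a few rows
hbase shell
scan 'flinkx_jd_goods', {LIMIT => 10}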
3. Start the job
flinkx -mode local -job ./mysql2hbase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
4. Check the logs
tail -F nohup.out
MysqlToMysql
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "gender",
"type": "string"
},
{
"name": "clazz",
"type": "string"
}
],
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://master:3306/student?useSSL=false"
],
"table": [
"student"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/student?useSSL=false",
"table": [
"student2"
]
}
],
"writeMode": "insert",
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "gender",
"type": "string"
},
{
"name": "clazz",
"type": "string"
}
]
}
}
}
],
"setting": {
"speed": {
"channel": 1,
"bytes": 0
}
}
}
}
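As with the previous examples, the target table student2 must exist before the job runs, and the job is then launched in local mode. A minimal sketch, assuming the configuration above is saved as mysql2mysql.json and that student2 mirrors the schema of student:
# create the target table with the same schema as the source (assumption)
mysql -uroot -p123456 -e "CREATE TABLE IF NOT EXISTS student.student2 LIKE student.student;"
# start the job
flinkx -mode local -job ./mysql2mysql.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/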