

FlinkX Installation and Usage

FlinkX Overview

FlinkX is a Flink-based distributed data synchronization framework for both offline and real-time workloads, used widely inside 袋鼠云 (DTStack). It implements efficient data migration between many kinds of heterogeneous data sources.

FlinkX is a data synchronization tool: it can collect static data, such as MySQL and HDFS, as well as data that changes in real time, such as MySQL binlog and Kafka.

Installing FlinkX

1. Upload and unzip

unzip flinkX-1.10.zip -d /usr/local/soft/

2. Configure environment variables
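
The original does not list the exact variable settings; a minimal sketch, assuming FlinkX was unpacked to /usr/local/soft/flinkx-1.10 and the settings are appended to /etc/profile (or ~/.bashrc) and then sourced:

# append to /etc/profile or ~/.bashrc, then run: source /etc/profile
export FLINKX_HOME=/usr/local/soft/flinkx-1.10
export PATH=$FLINKX_HOME/bin:$PATH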

3. Grant execute permission to the bin/flinkx file

chmod a+x flinkx

4. Edit the configuration file to set the service port

vim flinkconf/flink-conf.yaml

5. Web service port (if not specified, a random port will be generated)

rest.bind-port: 8888

Basic Usage of FlinkX

https://github.com/oceanos/flinkx/blob/1.8_release/README_OLD.

See README_CH.md on the FlinkX official site for the introduction documentation.

MySqlToHdfs

1. Configuration file

{
    "job": {
        "content": [
            {
                "reader": {
                    "parameter": {
                        "username": "root",
                        "password": "12345678",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/students?useUnicode=true&characterEncoding=utf8&useSSL=false"
                                ],
                                "table": [
                                    "biaomin"
                                ]
                            }
                        ],
                        "column": [
                            "*"
                        ],
                        "where": "id > 90",
                        "requestAccumulatorInterval": 2
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "path": "hdfs://master:9000/bigdata30/flinkx/out1",
                        "defaultFS": "hdfs://master:9000",
                        "column": [
                            {
                                "name": "col1",
                                "index": 0,
                                "type": "string"
                            },{
                                "name": "col2",
                                "index": 1,
                                "type": "string"
                            },{
                                "name": "col3",
                                "index": 2,
                                "type": "string"
                            },{
                                "name": "col4",
                                "index": 3,
                                "type": "string"
                            },{
                                "name": "col1",
                                "index": 4,
                                "type": "string"
                            },{
                                "name": "col2",
                                "index": 5,
                                "type": "string"
                            }
                        ],
                        "fieldDelimiter": ",",
                        "fileType": "text",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "restore": {
                "isRestore": false,
                "isStream": false
            },
            "errorLimit": {},
            "speed": {
                "channel": 1
            }
        }
    }
}

2. Start the task

flinkx -mode local -job ./mysql2hdfs.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/

3. Monitor the logs

After a flinkx task is started, a nohup.out file is generated in the directory where the command was executed.

tail -F nohup.out

View the task status through the web UI:

http://master:8888
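
The output can also be inspected directly on HDFS once the job finishes (a sketch, using the path from the hdfswriter configuration above):

hdfs dfs -ls hdfs://master:9000/bigdata30/flinkx/out1
hdfs dfs -cat hdfs://master:9000/bigdata30/flinkx/out1/*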

MysqlToHive

Start the Hive metastore and hiveserver2

nohup hive --service metastore &
nohup hiveserver2 &
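
A quick check that hiveserver2 is accepting connections (a sketch; assumes the default port 10000, matching the hivewriter jdbcUrl below, and that beeline is on the PATH):

beeline -u jdbc:hive2://master:10000 -e "show databases;"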

1. Configuration file

{
    "job": {
        "content": [
            {
                "reader": {
                    "parameter": {
                        "username": "root",
                        "password": "12345678",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/studentdb?useUnicode=true&characterEncoding=utf8&useSSL=false"
                                ],
                                "table": [
                                    "jd_goods"
                                ]
                            }
                        ],
                        "column": [
                            "*"
                        ],
                        "where": "id > 90",
                        "requestAccumulatorInterval": 1
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "name": "hivewriter",
                    "parameter": {
                        "jdbcUrl": "jdbc:hive2://master:10000/bigdata30",
                        "username": "",
                        "password": "",
                        "fileType": "text",
                        "fieldDelimiter": ",",
                        "writeMode": "overwrite",
                        "charsetName": "UTF-8",
                            "tablesColumn": "",
                        "defaultFS": "hdfs://master:9000"
                    }
                }
            }
        ],
        "setting": {
            "restore": {
                "isRestore": false,
                "isStream": false
            },
            "errorLimit": {},
            "speed": {
                "channel": 1
            }
        }
    }
}

2. In Hive, create the testflinkx database and a partitioned table

create database testflinkx;
use testflinkx;
CREATE TABLE `bigdata30`.`datax_tb1`(
    `id` STRING,
    `gname` STRING,
    `price` STRING,
    `commit` STRING,
    `shop` STRING,
    `icons` STRING)
PARTITIONED BY ( 
  `pt` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',';

3. Start the task

flinkx -mode local -job ./mysql2hive.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/
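
After the task completes, the synchronized rows can be verified from Hive (a sketch, assuming the datax_tb1 table created above):

beeline -u jdbc:hive2://master:10000/bigdata30 -e "select * from datax_tb1 limit 10;"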

MysqlToHbase

1. Configuration file

{
    "job": {
        "content": [
            {
                "reader": {
                    "parameter": {
                        "username": "root",
                        "password": "12345678",
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/studentdb?useUnicode=true&characterEncoding=utf8&useSSL=false"
                                ],
                                "table": [
                                    "jd_goods"
                                ]
                            }
                        ],
                        "column": [
                            "*"
                        ],
                        "where": "id > 90",
                        "requestAccumulatorInterval": 2
                    },
                    "name": "mysqlreader"
                },
                "writer": {
                    "name": "hbasewriter",
                    "parameter": {
                        "hbaseConfig": {
                            "hbase.zookeeper.property.clientPort": "2181",
                            "hbase.rootdir": "hdfs://master:9000/hbase",
                            "hbase.cluster.distributed": "true",
                            "hbase.zookeeper.quorum": "master,node1,node2",
                            "zookeeper.znode.parent": "/hbase"
                        },
                        "table": "flinkx_jd_goods",
                        "rowkeyColumn": "$(cf1:id)",
                        "column": [
                            {
                                "name": "cf1:id",
                                "type": "string"
                            },
                            {
                                "name": "cf1:gname",
                                "type": "string"
                            },
                            {
                                "name": "cf1:price",
                                "type": "string"
                            },
                            {
                                "name": "cf1:commit",
                                "type": "string"
                            },
                            {
                                "name": "cf1:shop",
                                "type": "string"
                            },
                            {
                                "name": "cf1:icons",
                                "type": "string"
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "restore": {
                "isRestore": false,
                "isStream": false
            },
            "errorLimit": {},
            "speed": {
                "channel": 1
            }
        }
    }
}

2. Start HBase and create the target table in the HBase shell

create 'flinkx_jd_goods','cf1'

3. Start the task

flinkx -mode local -job ./mysql2hbase.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/

4. Check the logs

tail -F nohup.out
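
When the job finishes, the rows can be checked from the HBase shell (a sketch, assuming the flinkx_jd_goods table from the hbasewriter configuration):

echo "scan 'flinkx_jd_goods', {LIMIT => 5}" | hbase shell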

MysqlToMysql

1. Configuration file

{
    "job": {
      "content": [
        {
          "reader": {
            "name": "mysqlreader",
            "parameter": {
              "column": [
                {
                  "name": "id",
                  "type": "int"
                },
                {
                  "name": "name",
                  "type": "string"
                },
                {
                  "name": "age",
                  "type": "int"
                },
                {
                  "name": "gender",
                  "type": "string"
                },
                {
                  "name": "clazz",
                  "type": "string"
                }
              ],
              "username": "root",
              "password": "123456",
              "connection": [
                {
                  "jdbcUrl": [
                    "jdbc:mysql://master:3306/student?useSSL=false"
                  ],
                  "table": [
                    "student"
                  ]
                }
              ]
            }
          },
          "writer": {
            "name": "mysqlwriter",
            "parameter": {
              "username": "root",
              "password": "123456",
              "connection": [
                {
                  "jdbcUrl": "jdbc:mysql://master:3306/student?useSSL=false",
                  "table": [
                    "student2"
                  ]
                }
              ],
              "writeMode": "insert",
              "column": [
                {
                    "name": "id",
                    "type": "int"
                  },
                  {
                    "name": "name",
                    "type": "string"
                  },
                  {
                    "name": "age",
                    "type": "int"
                  },
                  {
                    "name": "gender",
                    "type": "string"
                  },
                  {
                    "name": "clazz",
                    "type": "string"
                  }
              ]
            }
          }
        }
      ],
      "setting": {
        "speed": {
          "channel": 1,
          "bytes": 0
        }
      }
    }
  }
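
2. Start the task

The original does not include the launch command for this job; a sketch consistent with the earlier examples, assuming the configuration above is saved as mysql2mysql.json (a hypothetical file name):

flinkx -mode local -job ./mysql2mysql.json -pluginRoot /usr/local/soft/flinkx-1.10/syncplugins/ -flinkconf /usr/local/soft/flinkx-1.10/flinkconf/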

