0


Python——脚本实现datax全量同步mysql到hive

文章目录


前言

在我们构建离线数仓时或者迁移数据时,通常选用sqoop和datax等工具进行操作,sqoop和datax各有优点,datax优点也很明显,基于内存,所以速度上很快,那么在进行全量同步时编写json文件是一项很繁琐的事,是否可以编写脚本来把繁琐事来简单化,接下来我将分享这样一个mysql全量同步到hive自动生成json文件的python脚本。


一、展示脚本

# coding=utf-8import json
import getopt
import os
import sys
import pymysql

# MySQL 相关配置,需根据实际情况作出修改
mysql_host ="XXXXXX"
mysql_port ="XXXX"
mysql_user ="XXX"
mysql_passwd ="XXXXXX"# HDFS NameNode 相关配置,需根据实际情况作出修改
hdfs_nn_host ="XXXXXX"
hdfs_nn_port ="XXXX"# 生成配置文件的目标路径,可根据实际情况作出修改
output_path ="/XXX/XXX/XXX"defget_connection():return pymysql.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, password=mysql_passwd)defget_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql ="SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql,[database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()return fetchall

defget_mysql_columns(database, table):returnlist(map(lambda x: x[0], get_mysql_meta(database, table)))defget_hive_columns(database, table):deftype_mapping(mysql_type):
        mappings ={"bigint":"bigint","int":"bigint","smallint":"bigint","tinyint":"bigint","decimal":"string","double":"double","float":"float","binary":"string","char":"string","varchar":"string","datetime":"string","time":"string","timestamp":"string","date":"string","text":"string"}return mappings[mysql_type]

    meta = get_mysql_meta(database, table)returnlist(map(lambda x:{"name": x[0],"type": type_mapping(x[1].lower())}, meta))defgenerate_json(source_database, source_table):
    job ={"job":{"setting":{"speed":{"channel":3},"errorLimit":{"record":0,"percentage":0.02}},"content":[{"reader":{"name":"mysqlreader","parameter":{"username": mysql_user,"password": mysql_passwd,"column": get_mysql_columns(source_database, source_table),"splitPk":"","connection":[{"table":[source_table],"jdbcUrl":["jdbc:mysql://"+ mysql_host +":"+ mysql_port +"/"+ source_database]}]}},"writer":{"name":"hdfswriter","parameter":{"defaultFS":"hdfs://"+ hdfs_nn_host +":"+ hdfs_nn_port,"fileType":"text","path":"${targetdir}","fileName": source_table,"column": get_hive_columns(source_database, source_table),"writeMode":"append","fieldDelimiter":"\t","compress":"gzip"}}}]}}ifnot os.path.exists(output_path):
        os.makedirs(output_path)withopen(os.path.join(output_path,".".join([source_database, source_table,"json"])),"w")as f:
        json.dump(job, f)defmain(args):
    source_database =""
    source_table =""

    options, arguments = getopt.getopt(args,'-d:-t:',['sourcedb=','sourcetbl='])for opt_name, opt_value in options:if opt_name in('-d','--sourcedb'):
            source_database = opt_value
        if opt_name in('-t','--sourcetbl'):
            source_table = opt_value

    generate_json(source_database, source_table)if __name__ =='__main__':
    main(sys.argv[1:])

二、使用准备

1、安装python环境

这里我安装的是python3环境

sudo yum install -y python3

2、安装EPEL

EPEL(Extra Packages for Enterprise Linux)是一个由 Fedora Special Interest Group 维护的软件仓库,提供了大量在官方 RHEL 或 CentOS 软件仓库中没有的软件包。当你在 CentOS 或 RHEL 系统上需要安装一些不在官方软件仓库中的软件时,通常会先安装epel - release

sudo yum install -y epel-release

3、安装脚本执行需要的第三方模块

pip3 install pymysql
pip3 install cryptography
这里可能由于斑纹问题cryptography安装不上去更新一下pip和setuptools
pip3 install --upgrade pip
pip3 install --upgrade setuptools

重新安装cryptography

pip3 install cryptography

三、脚本使用方法

1、配置脚本

首先根据自己服务器修改脚本相关配置

2、创建.py文件

vim /xxx/xxx/xxx/gen_import_config.py

3、执行脚本

python3 /脚本路径/gen_import_config.py -d 数据库名 -t 表名

4、测试生成json文件是否可用

datax.py -p"-Dtargetdir=/表在hdfs存放路径" /生成的json文件路径

执行时首先要确保targetdir目标地址在hdfs上存在,如果没有需要创建后再次执行

标签: python mysql hive

本文转载自: https://blog.csdn.net/qq_68076599/article/details/143166317
版权归原作者 大数据编程之光 所有, 如有侵权,请联系我们删除。

“Python——脚本实现datax全量同步mysql到hive”的评论:

还没有评论