Preface
When building an offline data warehouse or migrating data, we usually rely on tools such as Sqoop and DataX. Each has its strengths; DataX's are obvious: it processes data in memory, so it is fast. For a full synchronization, however, writing the DataX JSON job files by hand is tedious. Can a script turn that chore into something simple? Below I share a Python script that automatically generates the JSON files for a full MySQL-to-Hive synchronization.
I. The Script
# coding=utf-8
import json
import getopt
import os
import sys
import pymysql

# MySQL connection settings; change to match your environment
mysql_host = "XXXXXX"
mysql_port = "XXXX"
mysql_user = "XXX"
mysql_passwd = "XXXXXX"
# HDFS NameNode settings; change to match your environment
hdfs_nn_host = "XXXXXX"
hdfs_nn_port = "XXXX"
# Target path for the generated configuration files; change as needed
output_path = "/XXX/XXX/XXX"


def get_connection():
    return pymysql.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, password=mysql_passwd)


def get_mysql_meta(database, table):
    # Read the column names and data types from information_schema, in ordinal order
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table):
    return list(map(lambda x: x[0], get_mysql_meta(database, table)))


def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        # Map MySQL column types to Hive column types
        mappings = {"bigint": "bigint", "int": "bigint", "smallint": "bigint", "tinyint": "bigint",
                    "decimal": "string", "double": "double", "float": "float", "binary": "string",
                    "char": "string", "varchar": "string", "datetime": "string", "time": "string",
                    "timestamp": "string", "date": "string", "text": "string"}
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta))


def generate_json(source_database, source_table):
    # Assemble the DataX job: mysqlreader -> hdfswriter
    job = {"job": {
        "setting": {"speed": {"channel": 3}, "errorLimit": {"record": 0, "percentage": 0.02}},
        "content": [{
            "reader": {"name": "mysqlreader",
                       "parameter": {"username": mysql_user, "password": mysql_passwd,
                                     "column": get_mysql_columns(source_database, source_table),
                                     "splitPk": "",
                                     "connection": [{"table": [source_table],
                                                     "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]}]}},
            "writer": {"name": "hdfswriter",
                       "parameter": {"defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                                     "fileType": "text", "path": "${targetdir}", "fileName": source_table,
                                     "column": get_hive_columns(source_database, source_table),
                                     "writeMode": "append", "fieldDelimiter": "\t", "compress": "gzip"}}
        }]
    }}
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    source_database = ""
    source_table = ""
    options, arguments = getopt.getopt(args, 'd:t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value
    generate_json(source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])
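When run with -d and -t, the script writes a file named <database>.<table>.json into output_path. Note that the writer's path is left as the literal placeholder ${targetdir}; DataX fills it in at launch time via the -p option shown in section III. A minimal, hypothetical invocation (test_db and user_info are made-up names):

python3 gen_import_config.py -d test_db -t user_info
# expected output file: <output_path>/test_db.user_info.json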
II. Preparation
1. Install a Python environment
Here I install Python 3:
sudo yum install -y python3
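Optionally, verify that the interpreter and pip are available (on some CentOS versions pip3 is packaged separately as python3-pip):

python3 --version
pip3 --version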
2. Install EPEL
EPEL (Extra Packages for Enterprise Linux) is a software repository maintained by the Fedora Special Interest Group that provides a large number of packages not available in the official RHEL or CentOS repositories. When you need software on CentOS or RHEL that is not in the official repositories, you usually install epel-release first:
sudo yum install -y epel-release
3. Install the third-party modules the script needs
pip3 install pymysql
pip3 install cryptography
If cryptography fails to install here, it is usually a version compatibility problem; upgrade pip and setuptools first:
pip3 install --upgrade pip
pip3 install --upgrade setuptools
Then reinstall cryptography:
pip3 install cryptography
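A quick sanity check (just an illustration) that both modules now import under Python 3:

python3 -c "import pymysql, cryptography; print(pymysql.__version__, cryptography.__version__)"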
III. How to Use the Script
1. Configure the script
First, edit the configuration at the top of the script to match your own servers, for example:
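A hypothetical configuration; every hostname, port, credential, and path below is a placeholder to replace with your own values:

mysql_host = "hadoop102"        # hypothetical MySQL host
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "123456"
hdfs_nn_host = "hadoop102"      # hypothetical NameNode host
hdfs_nn_port = "8020"
output_path = "/opt/module/datax/job/import"   # where the generated JSON files go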
2. Create the .py file
vim /xxx/xxx/xxx/gen_import_config.py
3. Run the script
python3 /path/to/gen_import_config.py -d <database_name> -t <table_name>
4. Test that the generated JSON file works
datax.py -p"-Dtargetdir=/path/of/the/table/on/hdfs" /path/to/the/generated/json/file
Before running, make sure the targetdir path already exists on HDFS; if it does not, create it first and then rerun the command, for example:
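A hypothetical end-to-end test, assuming the script produced /opt/module/datax/job/import/test_db.user_info.json and the table's data should land under /origin_data/test_db/user_info_full (both paths are examples):

hadoop fs -mkdir -p /origin_data/test_db/user_info_full
datax.py -p"-Dtargetdir=/origin_data/test_db/user_info_full" /opt/module/datax/job/import/test_db.user_info.json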