0


使用DataX和sqoop将数据从MySQL导入Hive

使用DataX和sqoop将数据从MySQL导入Hive

一、DataX简述

DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。
源码地址:https://github.com/alibaba/DataX

二、sqoop简述

Apache Sqoop(SQL-to-Hadoop)项目旨在协助RDBMS与Hadoop之间进行高效的大数据交流。用户可以在 Sqoop 的帮助下,轻松地把关系型数据库的数据导入到 Hadoop 与其相关的系统 (如HBase和Hive)中;同时也可以把数据从 Hadoop 系统里抽取并导出到关系型数据库里。
Sqoop是一个在结构化数据和Hadoop之间进行批量数据迁移的工具,结构化数据可以是MySQL、Oracle等RDBMS。Sqoop底层用MapReduce程序实现抽取、转换、加载,MapReduce天生的特性保证了并行化和高容错率,而且相比Kettle等传统ETL工具,任务跑在Hadoop集群上,减少了ETL服务器资源的使用情况。在特定场景下,抽取过程会有很大的性能提升。

三、需求背景

MySQL中的库ecommerce有以下表,需要将以下表导入Hive中的ecommerce库中作为ODS层以后进一步处理。

  • 用户信息表(t_member)
  • 用户信息表(t_member)
  • 用户地址表(t_member_addr)
  • 商品信息表(t_commodity)
  • 商品类别信息表(t_commodity_cate)
  • 订单表(t_order)
  • 订单商品表(t_order_commodity)
  • 优惠券表(t_coupon)
  • 用户优惠券表(t_coupon_member)
  • 订单优惠券表(t_coupon_order)
  • 快递表(t_delivery)
  • 反馈表(t_feedback)
  • 商店表(t_shop)
  • 商家订单表(t_shop_order,订单对于卖家也有记录)
  • 后台用户表(t_user,员工表)

四、实现方式

可以通过sqoop和DataX两种方式来实现数据从MySQL导入Hive功能。

3.1 使用DataX将数据从MySQL导入Hive

在Hive里建库 ecommerce

create database ecommerce;

使用Python写入DataX配置文件批量生成脚本gen_import_config.py

# coding=utf-8import json
import getopt
import os
import sys
import MySQLdb

#MySQL相关配置,需根据实际情况作出修改
mysql_host ="hadoop102"
mysql_port ="3306"
mysql_user ="root"
mysql_passwd ="123456"#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host ="hadoop102"
hdfs_nn_port ="8020"#生成配置文件的目标路径,可根据实际情况作出修改
output_path ="/opt/module/datax/job/import"#获取mysql连接defget_connection():return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)#获取表格的元数据  包含列名和数据类型defget_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql ="SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql,[database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()return fetchall

#获取mysql表的列名defget_mysql_columns(database, table):returnmap(lambda x: x[0], get_mysql_meta(database, table))#将获取的元数据中mysql的数据类型转换为hive的数据类型  写入到hdfswriter中defget_hive_columns(database, table):deftype_mapping(mysql_type):
        mappings ={"bigint":"bigint","int":"bigint","smallint":"bigint","tinyint":"bigint","decimal":"string","double":"double","float":"float","binary":"string","char":"string","varchar":"string","datetime":"string","time":"string","timestamp":"string","date":"string","text":"string","tinytext":"string"}return mappings[mysql_type]

    meta = get_mysql_meta(database, table)returnmap(lambda x:{"name": x[0],"type": type_mapping(x[1].lower())}, meta)#生成json文件defgenerate_json(source_database, source_table):
    job ={"job":{"setting":{"speed":{"channel":3},"errorLimit":{"record":0,"percentage":0.02}},"content":[{"reader":{"name":"mysqlreader","parameter":{"username": mysql_user,"password": mysql_passwd,"column": get_mysql_columns(source_database, source_table),"splitPk":"","connection":[{"table":[source_table],"jdbcUrl":["jdbc:mysql://"+ mysql_host +":"+ mysql_port +"/"+ source_database]}]}},"writer":{"name":"hdfswriter","parameter":{"defaultFS":"hdfs://"+ hdfs_nn_host +":"+ hdfs_nn_port,"fileType":"text","path":"${targetdir}","fileName": source_table,"column": get_hive_columns(source_database, source_table),"writeMode":"append","fieldDelimiter":"\t","compress":"gzip"}}}]}}ifnot os.path.exists(output_path):
        os.makedirs(output_path)withopen(os.path.join(output_path,".".join([source_database, source_table,"json"])),"w")as f:
        json.dump(job, f)defmain(args):
    source_database =""
    source_table =""

    options, arguments = getopt.getopt(args,'-d:-t:',['sourcedb=','sourcetbl='])for opt_name, opt_value in options:if opt_name in('-d','--sourcedb'):
            source_database = opt_value
        if opt_name in('-t','--sourcetbl'):
            source_table = opt_value

    generate_json(source_database, source_table)if __name__ =='__main__':
    main(sys.argv[1:])

由于需要使用Python访问Mysql数据库,故需安装驱动,命令如下:

yum install -y MySQL-python

调用上述的配置文件脚本

#!/bin/bash
python /root/bin/gen_import_config.py -d ecommerce -t t_commodity
python /root/bin/gen_import_config.py -d ecommerce -t t_commodity_cate
python /root/bin/gen_import_config.py -d ecommerce -t t_coupon
python /root/bin/gen_import_config.py -d ecommerce -t t_coupon_member
python /root/bin/gen_import_config.py -d ecommerce -t t_coupon_order
python /root/bin/gen_import_config.py -d ecommerce -t t_delivery
python /root/bin/gen_import_config.py -d ecommerce -t t_feedback
python /root/bin/gen_import_config.py -d ecommerce -t t_member
python /root/bin/gen_import_config.py -d ecommerce -t t_member_addr
python /root/bin/gen_import_config.py -d ecommerce -t t_order
python /root/bin/gen_import_config.py -d ecommerce -t t_order_commodity
python /root/bin/gen_import_config.py -d ecommerce -t t_shop
python /root/bin/gen_import_config.py -d ecommerce -t t_shop_order
python /root/bin/gen_import_config.py -d ecommerce -t t_user

在/opt/module/datax/job/import可以看到json文件
json文件
编写脚本将mysql数据传入HDFS中

#!/bin/bash
DATAX_HOME=/opt/module/datax
#处理目标路径,此处的处理逻辑是,如果目标路径不存在,则创建;若存在,则清空,目的是保证同步任务可重复执行
handle_targetdir(){
  hadoop fs -test -e $1if[[ $? -eq 1 ]]; then
    echo"路径$1不存在,正在创建......"
    hadoop fs -mkdir -p $1elseecho"路径$1已经存在"
    fs_count=$(hadoop fs -count $1)
    content_size=$(echo$fs_count| awk '{print $3}')if[[$content_size-eq 0 ]]; then
      echo"路径$1为空"elseecho"路径$1不为空,正在清空......"
      hadoop fs -rm-r -f $1/*
    fi
  fi
}#数据同步
import_data(){
  datax_config=$1
  target_dir=$2

  handle_targetdir $target_dir
  python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir"$datax_config}

case $1 in
"t_commodity")
  import_data /opt/module/datax/job/import/ecommerce.t_commodity.json /ecommerce/t_commodity
  ;;"t_commodity_cate")
  import_data /opt/module/datax/job/import/ecommerce.t_commodity_cate.json /ecommerce/t_commodity_cate
  ;;"t_coupon")
  import_data /opt/module/datax/job/import/ecommerce.t_coupon.json /ecommerce/t_coupon
  ;;"t_coupon_member")
  import_data /opt/module/datax/job/import/ecommerce.t_coupon_member.json /ecommerce/t_coupon_member
  ;;"t_coupon_order")
  import_data /opt/module/datax/job/import/ecommerce.t_coupon_order.json /ecommerce/t_coupon_order
  ;;"t_delivery")
  import_data /opt/module/datax/job/import/ecommerce.t_delivery.json /ecommerce/t_delivery
  ;;"t_feack")
  import_data /opt/module/datax/job/import/ecommerce.t_feack.json /ecommerce/t_feack
  ;;"t_member")
  import_data /opt/module/datax/job/import/ecommerce.t_member.json /ecommerce/t_member
  ;;"t_member_addr")
  import_data /opt/module/datax/job/import/ecommerce.t_member_addr.json /ecommerce/t_member_addr
  ;;"t_order")
  import_data /opt/module/datax/job/import/ecommerce.t_order.json /ecommerce/t_order
  ;;"t_order_commodity")
  import_data /opt/module/datax/job/import/ecommerce.t_order_commodity.json /ecommerce/t_order_commodity
  ;;"t_shop")
  import_data /opt/module/datax/job/import/ecommerce.t_shop.json /ecommerce/t_shop
  ;;"t_shop_order")
  import_data /opt/module/datax/job/import/ecommerce.t_shop_order.json /ecommerce/t_shop_order
  ;;"t_user")
  import_data /opt/module/datax/job/import/ecommerce.t_user.json /ecommerce/t_user
  ;;"all")
  import_data /opt/module/datax/job/import/ecommerce.t_commodity.json /ecommerce/t_commodity
  import_data /opt/module/datax/job/import/ecommerce.t_commodity_cate.json /ecommerce/t_commodity_cate
  import_data /opt/module/datax/job/import/ecommerce.t_coupon.json /ecommerce/t_coupon
  import_data /opt/module/datax/job/import/ecommerce.t_coupon_member.json /ecommerce/t_coupon_member
  import_data /opt/module/datax/job/import/ecommerce.t_coupon_order.json /ecommerce/t_coupon_order
  import_data /opt/module/datax/job/import/ecommerce.t_delivery.json /ecommerce/t_delivery
  import_data /opt/module/datax/job/import/ecommerce.t_feedback.json /ecommerce/t_feedback
  import_data /opt/module/datax/job/import/ecommerce.t_member.json /ecommerce/t_member
  import_data /opt/module/datax/job/import/ecommerce.t_member_addr.json /ecommerce/t_member_addr
  import_data /opt/module/datax/job/import/ecommerce.t_order.json /ecommerce/t_order
  import_data /opt/module/datax/job/import/ecommerce.t_order_commodity.json /ecommerce/t_order_commodity
  import_data /opt/module/datax/job/import/ecommerce.t_shop.json /ecommerce/t_shop
  import_data /opt/module/datax/job/import/ecommerce.t_shop_order.json /ecommerce/t_shop_order
  import_data /opt/module/datax/job/import/ecommerce.t_user.json /ecommerce/t_user
  ;;
esac

调用上述函数可以在HDFS上看到数据库文件
HDFS/ecommerce下的数据
启动hiveserver2在Hive中建表,与MySQL字段映射关系如下
Hive中字段数据类型对比
写入SQL建表及加载HDFS上的数据

DROPTABLEIFEXISTS`t_commodity`;CREATETABLE`t_commodity`(`id`BIGINT,`commodity_name` STRING,`commodity_price`DECIMAL(10,2),`commodity_cate_one`BIGINT,`commodity_cate_two`BIGINT,`create_user_id`BIGINT,`status`BIGINT,`create_time` STRING,`update_time` STRING
)COMMENT'商品信息表'ROW FORMAT DELIMITED FIELDSTERMINATEDBY'\t'NULL DEFINED AS''
LOCATION '/warehouse/ecommerce/t_commodity/';loaddata inpath 'hdfs:/ecommerce/t_commodity'intotable`t_commodity`;DROPTABLEIFEXISTS`t_commodity_cate`;CREATETABLE`t_commodity_cate`(`id`BIGINT,`cate_name` STRING,`cate_parent_id`BIGINT,`create_user_id`BIGINT,`status`BIGINT,`create_time` STRING,`update_time` STRING
)COMMENT'商品类别信息表'ROW FORMAT DELIMITED FIELDSTERMINATEDBY'\t'NULL DEFINED AS''
LOCATION '/warehouse/ecommerce/t_commodity_cate/';loaddata inpath 'hdfs:/ecommerce/t_commodity_cate'intotable`t_commodity_cate`;DROPTABLEIFEXISTS`t_coupon`;CREATETABLE`t_coupon`(`id`BIGINT,`coupon_name` STRING,`coupon_price`decimal(10,2),`create_user_id`BIGINT,`create_time` STRING,`update_time` STRING
)COMMENT'优惠券表'ROW FORMAT DELIMITED FIELDSTERMINATEDBY'\t'NULL DEFINED AS''
LOCATION '/warehouse/ecommerce/t_coupon/';loaddata inpath 'hdfs:/ecommerce/t_coupon'intotable`t_coupon`;DROPTABLEIFEXISTS`t_coupon_member`;CREATETABLE`t_coupon_member`(`id`BIGINT,`coupon_id`BIGINT,`member_id`BIGINT,`coupon_channel`BIGINTCOMMENT'1 用户购买 2 公司发放',`create_time` STRING,`update_time` STRING
)COMMENT'用户优惠券表'ROW FORMAT DELIMITED FIELDSTERMINATEDBY'\t'NULL DEFINED AS''
LOCATION '/warehouse/ecommerce/t_coupon_member/';loaddata inpath 'hdfs:/ecommerce/t_coupon_member'intotable`t_coupon_member`;DROPTABLEIFEXISTS`t_member`;CREATETABLE`t_member`(`id`BIGINT,`name` STRING,`password` STRING,`sex` STRING,`phone` STRING,`address_default_id`BIGINT,`member_channel`BIGINTCOMMENT'1 IOS 2 android 3 微信小程序 4 微信公众号 5 h5',`mp_open_id` STRING COMMENT'微信公众号openId',`status`BIGINT,`create_time` STRING,`update_time` STRING
)COMMENT'用户信息表'ROW FORMAT DELIMITED FIELDSTERMINATEDBY'\t'NULL DEFINED AS''
LOCATION '/warehouse/ecommerce/t_member/';loaddata inpath 'hdfs:/ecommerce/t_member'intotable`t_member`;DROPTABLEIFEXISTS`t_member_addr`;CREATETABLE`t_member_addr`(`id`BIGINT,`member_id`BIGINT,`contact_person` STRING,`contact_phone` STRING,`address` STRING,`create_time` STRING,`update_time` STRING
)COMMENT'用户地址表'ROW FORMAT DELIMITED FIELDSTERMINATEDBY'\t'NULL DEFINED AS''
LOCATION '/warehouse/ecommerce/t_member_addr/';loaddata inpath 'hdfs:/ecommerce/t_member_addr'intotable`t_member_addr`;DROPTABLEIFEXISTS`t_delivery`;CREATETABLE`t_delivery`(`id`BIGINT,`delivery_no` STRING,`order_id`BIGINT,`shop_id`BIGINT,`postman`BIGINT,`pick_time` STRING,`arrive_time` STRING,`member_id`BIGINT,`member_addr_id`BIGINT)COMMENT'快递表'ROW FORMAT DELIMITED FIELDSTERMINATEDBY'\t'NULL DEFINED AS''
LOCATION '/warehouse/ecommerce/t_delivery/';loaddata inpath 'hdfs:/ecommerce/t_delivery'intotable`t_delivery`;DROPTABLEIFEXISTS`t_feedback`;CREATETABLE`t_feedback`(`id`BIGINT,`member_id`BIGINT,`create_user_id`BIGINT,`feedback_content` STRING,`feedback_type`BIGINTCOMMENT'1 破损 2 缺货 3 错货 4 投诉')COMMENT'反馈表'ROW FORMAT DELIMITED FIELDSTERMINATEDBY'\t'NULL DEFINED AS''
LOCATION '/warehouse/ecommerce/t_feedback/';loaddata inpath 'hdfs:/ecommerce/t_feedback'intotable`t_feedback`;DROPTABLEIFEXISTS`t_shop`;CREATETABLE`t_shop`(`id`BIGINT,`shop_name` STRING,`city_id`BIGINT,`city_name` STRING,`area_id`BIGINT,`area_name` STRING,`charge_user` STRING,`create_time` STRING,`update_time` STRING
)COMMENT'商店表'ROW FORMAT DELIMITED FIELDSTERMINATEDBY'\t'NULL DEFINED AS''
LOCATION '/warehouse/ecommerce/t_shop/';loaddata inpath 'hdfs:/ecommerce/t_shop'intotable`t_shop`;DROPTABLEIFEXISTS`t_shop_order`;CREATETABLE`t_shop_order`(`id`BIGINT,`shop_id`BIGINT,`order_id`BIGINT,`start_time` STRING,`done_time` STRING
)COMMENT'商家订单表'ROW FORMAT DELIMITED FIELDSTERMINATEDBY'\t'NULL DEFINED AS''
LOCATION '/warehouse/ecommerce/t_shop_order/';loaddata inpath 'hdfs:/ecommerce/t_shop_order'intotable`t_shop_order`;DROPTABLEIFEXISTS`t_user`;CREATETABLE`t_user`(`id`BIGINT,`user_name` STRING,`user_password` STRING,`user_phone` STRING,`create_time` STRING,`update_time` STRING
)COMMENT'后台用户表'ROW FORMAT DELIMITED FIELDSTERMINATEDBY'\t'NULL DEFINED AS''
LOCATION '/warehouse/ecommerce/t_user/';loaddata inpath 'hdfs:/ecommerce/t_user'intotable`t_user`;DROPTABLEIFEXISTS`t_order`;CREATETABLE`t_order`(`order_id`BIGINT,`member_id`BIGINT,`origin_price`decimal(10,2),`pay_price`decimal(10,2),`shop_id`BIGINT,`shop_name` STRING,`order_status` STRING COMMENT'1,进行中 2 已完成 3 已取消',`create_time` STRING,`update_time` STRING
)COMMENT'订单表'ROW FORMAT DELIMITED FIELDSTERMINATEDBY'\t'NULL DEFINED AS''
LOCATION '/warehouse/ecommerce/t_order/';loaddata inpath 'hdfs:/ecommerce/t_order'intotable`t_order`;DROPTABLEIFEXISTS`t_order_commodity`;CREATETABLE`t_order_commodity`(`id`BIGINT,`order_id`BIGINT,`commodity_id`BIGINT,`commodity_name` STRING,`commodity_num`BIGINT,`commodity_price`decimal(10,2),`create_time` STRING,`update_time` STRING
)COMMENT'订单商品表'ROW FORMAT DELIMITED FIELDSTERMINATEDBY'\t'NULL DEFINED AS''
LOCATION '/warehouse/ecommerce/t_order_commodity/';loaddata inpath 'hdfs:/ecommerce/t_order_commodity'intotable`t_order_commodity`;

在Hive中可以看到如下表格
表格
数据也都进入了Hive表中
表格数据

3.2 通过sqoop将数据从MySQL导入Hive

在Hive里建库 ecommerce

create database ecommerce;

运行下面的函数可以将数据从MySQL导入Hive

#!/bin/bash# 定义一个函数,执行sqoop命令,所以执行脚本应该在sqoop/bin下面
sq(){./sqoop import \
--connect jdbc:mysql://hadoop102:3306/ecommerce \
--username root \
--password 000000 \
--table $1 \
--num-mappers 1 \
--hive-import \
--fields-terminated-by "\t" \
--hive-overwrite \
--hive-database ecommerce \
--hive-table $1}
sq t_commodity
sq t_commodity_cate
sq t_coupon
sq t_coupon_member
sq t_coupon_order
sq t_delivery
sq t_feedback
sq t_member
sq t_member_addr
sq t_order
sq t_order_commodity
sq t_shop
sq t_shop_order
sq t_user

四、总结

4.1 Datax主要特点

1、异构数据库 和 文件系统 之间的数据交换;
2、采用 Framework + plugin 架构构建,
Framework 处理了缓冲,流控,并发,上下文加载
等高速数据交换的大部分技术问题,提供了简单的接口与插件交互,
插件仅需实现对数据处理系统的访问;
3、数据传输过程在单进程内完成,全内存操作,不读写磁盘,也没有 IPC;
4、开放式的框架,开发者可以在极短的时间开发一个新插件
以快速支持新的数据库/文件系统。

4.2 Sqoop主要特点

1、可以将关系型数据库中的数据导入 hdfs、hive 或者 hbase 等 hadoop 组件中,
也可将 hadoop 组件中的数据导入到关系型数据库中;
2、sqoop 在导入导出数据时,充分采用了 map-reduce 计算框架,
根据输入条件生成一个 map-reduce 作业,在 hadoop 集群中运行。
采用 map-reduce 框架同时在多个节点进行 import 或者 export 操作,
速度比单节点运行多个并行导入导出效率高,同时提供了良好的并发性和容错性;
3、支持 insert、update 模式,可以选择参数,若内容存在就更新,若不存在就插入;
4、对国外的主流关系型数据库支持性更好。

4.3 Sqoop 和 Datax的区别

1、sqoop 采用 map-reduce 计算框架进行导入导出,而 datax 仅仅在运行 datax 的单台机器上进行数据的抽取和加载,速度比 sqoop 慢了许多;
2、sqoop 只可以在关系型数据库和 hadoop 组件之间进行数据迁移,
而在 hadoop 相关组件之间,比如 hive 和 hbase 之间就无法使用 sqoop 互相导入导出数据,同时在关系型数据库之间,比如 mysql 和 oracle 之间也无法通过 sqoop 导入导出数据。
datax 能够分别实现关系型数据库 hadoop 组件之间、关系型数据库之间、hadoop 组件之间的数据迁移;
3、sqoop 是专门为 hadoop 而生,对 hadoop 支持度好,而 datax 可能会出现不支持高版本 hadoop 的现象;
4、sqoop 只支持官方提供的指定几种关系型数据库和 hadoop 组件之间的数据交换,而在 datax 中,用户只需根据自身需求修改文件,生成相应 rpm 包,自行安装之后就可以使用自己定制的插件;

标签: hive sqoop mysql

本文转载自: https://blog.csdn.net/Davidchou3165/article/details/127472181
版权归原作者 Davidchou3165 所有, 如有侵权,请联系我们删除。

“使用DataX和sqoop将数据从MySQL导入Hive”的评论:

还没有评论