0


离线数仓数据导出-hive数据同步到mysql

离线数仓数据导出-hive数据同步到mysql

为方便报表应用使用数据,需将ads各指标的统计结果导出到MySQL数据库中。

datax支持hive同步MySQL:仅仅支持hive存储的hdfs文件导出。所以reader选hdfs-reader,writer选mysql-writer。

在这里插入图片描述
null值 在hive和mysql里的存储格式不一样,需要告诉DataX应该如何转换。
在这里插入图片描述

MySQL建库建表

12.1.1 创建数据库

CREATE DATABASE IF NOT EXISTS gmall_report DEFAULT CHARSET utf8 COLLATE utf8_general_ci;

建mysql表的,
1字段个数要和hive中的ads层数据保持一致,
2字段类型要和hive对等替换,
3字段顺序也要一致
每张表要有主键

1)各活动补贴率
dt activity_id activity_name 三个主键联合而成

DROP TABLE IF EXISTS `ads_activity_stats`;
CREATE TABLE `ads_activity_stats`  (
  `dt` date NOT NULL COMMENT '统计日期',
  `activity_id` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '活动ID',
  `activity_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动名称',
  `start_date` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动开始日期',
  `reduce_rate` decimal(16, 2) NULL DEFAULT NULL COMMENT '补贴率',
  PRIMARY KEY (`dt`, `activity_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '活动统计' ROW_FORMAT = Dynamic;

数据导出

DataX配置文件生成脚本
方便起见,此处提供了DataX配置文件批量生成脚本,脚本内容及使用方式如下。
1)在~/bin目录下创建gen_export_config.py脚本
[atguigu@hadoop102 bin]$ vim ~/bin/gen_export_config.py
脚本内容如下

# coding=utf-8import json
import getopt
import os
import sys
import MySQLdb

#MySQL相关配置,需根据实际情况作出修改
mysql_host ="hadoop102"
mysql_port ="3306"
mysql_user ="root"
mysql_passwd ="000000"

#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host ="hadoop102"
hdfs_nn_port ="8020"

#生成配置文件的目标路径,可根据实际情况作出修改
output_path ="/opt/module/datax/job/export"

def get_connection():return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)

def get_mysql_meta(database, table):
    connection =get_connection()
    cursor = connection.cursor()
    sql ="SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql,[database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()return fetchall

def get_mysql_columns(database, table):returnmap(lambda x: x[0],get_mysql_meta(database, table))

def generate_json(target_database, target_table):
    job ={"job":{"setting":{"speed":{"channel":3},"errorLimit":{"record":0,"percentage":0.02}},"content":[{"reader":{"name":"hdfsreader","parameter":{"path":"${exportdir}","defaultFS":"hdfs://"+ hdfs_nn_host +":"+ hdfs_nn_port,"column":["*"],"fileType":"text","encoding":"UTF-8","fieldDelimiter":"\t","nullFormat":"\\N"}},"writer":{"name":"mysqlwriter","parameter":{"writeMode":"replace","username": mysql_user,"password": mysql_passwd,"column":get_mysql_columns(target_database, target_table),"connection":[{"jdbcUrl":"jdbc:mysql://"+ mysql_host +":"+ mysql_port +"/"+ target_database +"?useUnicode=true&characterEncoding=utf-8","table":[target_table]}]}}}]}}if not os.path.exists(output_path):
        os.makedirs(output_path)withopen(os.path.join(output_path,".".join([target_database, target_table,"json"])),"w")asf:
        json.dump(job, f)

def main(args):
    target_database =""
    target_table =""

    options, arguments = getopt.getopt(args,'-d:-t:',['targetdb=','targettbl='])for opt_name, opt_value inoptions:if opt_name in('-d','--targetdb'):
            target_database = opt_value
        if opt_name in('-t','--targettbl'):
            target_table = opt_value

    generate_json(target_database, target_table)if __name__ =='__main__':main(sys.argv[1:])

在~/bin目录下创建gen_export_config.sh脚本
[atguigu@hadoop102 bin]$ vim ~/bin/gen_export_config.sh
脚本内容如下。

#!/bin/bash

python ~/bin/gen_export_config.py -d gmall_report -t ads_activity_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_coupon_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_new_buyer_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_order_by_province
python ~/bin/gen_export_config.py -d gmall_report -t ads_page_path
python ~/bin/gen_export_config.py -d gmall_report -t ads_repeat_purchase_by_tm
python ~/bin/gen_export_config.py -d gmall_report -t ads_sku_cart_num_top3_by_cate
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_cate
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_tm
python ~/bin/gen_export_config.py -d gmall_report -t ads_traffic_stats_by_channel
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_action
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_change
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_retention
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_stats

3)为gen_export_config.sh脚本增加执行权限
[atguigu@hadoop102 bin]$ chmod +x ~/bin/gen_export_config.sh
4)执行gen_export_config.sh脚本,生成配置文件
[atguigu@hadoop102 bin]$ gen_export_config.sh
5)观察生成的配置文件

[atguigu@hadoop102 bin]$ ls /opt/module/datax/job/export/

编写每日导出脚本
(1)在hadoop102的/home/atguigu/bin目录下创建hdfs_to_mysql.sh
[atguigu@hadoop102 bin]$ vim hdfs_to_mysql.sh
(2)编写如下内容


#!/bin/bash

DATAX_HOME=/opt/module/datax

#DataX导出路径不允许存在空文件,该函数作用为清理空文件
handle_export_path(){
  target_file=$1
  for i in`hadoop fs -ls -R $target_file | awk '{print $8}'`;do
    hadoop fs -test -z $i
    if[[ $?-eq 0]]; then
      echo "$i文件大小为0,正在删除"
      hadoop fs -rm -r -f $i
    fi
  done

}

#数据导出
export_data(){
  datax_config=$1
  export_dir=$2
  hadoop fs -test -e $export_dir
  if[[ $?-eq 0]]
  then
    handle_export_path $export_dir
    file_count=$(hadoop fs -ls $export_dir | wc -l)if[ $file_count -gt 0]
    then
      set -e;
      $DATAX_HOME/bin/datax.py -p"-Dexportdir=$export_dir" $datax_config
      set +e;else 
      echo "$export_dir 目录为空,跳过~"
    fi
  else
    echo "路径 $export_dir 不存在,跳过~"
  fi
}case $1 in"ads_new_buyer_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_new_buyer_stats.json /warehouse/gmall/ads/ads_new_buyer_stats
  ;;"ads_order_by_province")
    export_data /opt/module/datax/job/export/gmall_report.ads_order_by_province.json /warehouse/gmall/ads/ads_order_by_province
  ;;"ads_page_path")
    export_data /opt/module/datax/job/export/gmall_report.ads_page_path.json /warehouse/gmall/ads/ads_page_path
  ;;"ads_repeat_purchase_by_tm")
    export_data /opt/module/datax/job/export/gmall_report.ads_repeat_purchase_by_tm.json /warehouse/gmall/ads/ads_repeat_purchase_by_tm
  ;;"ads_trade_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats.json /warehouse/gmall/ads/ads_trade_stats
  ;;"ads_trade_stats_by_cate")
    export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_cate.json /warehouse/gmall/ads/ads_trade_stats_by_cate
  ;;"ads_trade_stats_by_tm")
    export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_tm.json /warehouse/gmall/ads/ads_trade_stats_by_tm
  ;;"ads_traffic_stats_by_channel")
    export_data /opt/module/datax/job/export/gmall_report.ads_traffic_stats_by_channel.json /warehouse/gmall/ads/ads_traffic_stats_by_channel
  ;;"ads_user_action")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_action.json /warehouse/gmall/ads/ads_user_action
  ;;"ads_user_change")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_change.json /warehouse/gmall/ads/ads_user_change
  ;;"ads_user_retention")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_retention.json /warehouse/gmall/ads/ads_user_retention
  ;;"ads_user_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_user_stats.json /warehouse/gmall/ads/ads_user_stats
  ;;"ads_activity_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_stats
  ;;"ads_coupon_stats")
    export_data /opt/module/datax/job/export/gmall_report.ads_coupon_stats.json /warehouse/gmall/ads/ads_coupon_stats
  ;;"ads_sku_cart_num_top3_by_cate")
    export_data /opt/module/datax/job/export/gmall_report.ads_sku_cart_num_top3_by_cate.json /warehouse/gmall/ads/ads_sku_cart_num_top3_by_cate
  ;;"all")
  export_data /opt/module/datax/job/export/gmall_report.ads_new_buyer_stats.json /warehouse/gmall/ads/ads_new_buyer_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_order_by_province.json /warehouse/gmall/ads/ads_order_by_province
  export_data /opt/module/datax/job/export/gmall_report.ads_page_path.json /warehouse/gmall/ads/ads_page_path
  export_data /opt/module/datax/job/export/gmall_report.ads_repeat_purchase_by_tm.json /warehouse/gmall/ads/ads_repeat_purchase_by_tm
  export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats.json /warehouse/gmall/ads/ads_trade_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_cate.json /warehouse/gmall/ads/ads_trade_stats_by_cate
  export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_tm.json /warehouse/gmall/ads/ads_trade_stats_by_tm
  export_data /opt/module/datax/job/export/gmall_report.ads_traffic_stats_by_channel.json /warehouse/gmall/ads/ads_traffic_stats_by_channel
  export_data /opt/module/datax/job/export/gmall_report.ads_user_action.json /warehouse/gmall/ads/ads_user_action
  export_data /opt/module/datax/job/export/gmall_report.ads_user_change.json /warehouse/gmall/ads/ads_user_change
  export_data /opt/module/datax/job/export/gmall_report.ads_user_retention.json /warehouse/gmall/ads/ads_user_retention
  export_data /opt/module/datax/job/export/gmall_report.ads_user_stats.json /warehouse/gmall/ads/ads_user_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_coupon_stats.json /warehouse/gmall/ads/ads_coupon_stats
  export_data /opt/module/datax/job/export/gmall_report.ads_sku_cart_num_top3_by_cate.json /warehouse/gmall/ads/ads_sku_cart_num_top3_by_cate
  ;;
esac

(3)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod +x hdfs_to_mysql.sh
(4)脚本用法
[atguigu@hadoop102 bin]$ hdfs_to_mysql.sh all

标签: hive mysql hadoop

本文转载自: https://blog.csdn.net/qq_45972323/article/details/137964403
版权归原作者 大数据驱动AI 所有, 如有侵权,请联系我们删除。

“离线数仓数据导出-hive数据同步到mysql”的评论:

还没有评论