Big Data Comparison: Combining Shell Scripts with Hive

Requirements

Fetch encrypted data from a source host, decrypt it (JSON parsing may be involved), insert it into another database, and then compare the same tables in the original database and the new one for data consistency.

Implementing the data consistency comparison

With hundreds of millions of rows, how do you compare two tables and find their differences?
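The core trick, which the comparison script later in this post applies at full scale, is a FULL OUTER JOIN on the primary key plus a COALESCE on every compared column, so that a NULL on either side still counts as a difference. A minimal sketch of the pattern (db.t_old, db.t_new and the columns id/col_a/col_b are illustrative placeholders, not the real schemas):

#!/bin/bash
# Minimal illustration of the join-and-diff pattern used throughout this post.
sql="
select o.id as old_id, n.id as new_id
  from db.t_old o
  full outer join db.t_new n
    on o.id = n.id
 where COALESCE(o.col_a, '0') <> COALESCE(n.col_a, '0')
    or COALESCE(o.col_b, '0') <> COALESCE(n.col_b, '0');"
echo "${sql}
!quit" | beeline -u 'jdbc:hive2://G034:11001,G035:11001,G036:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n e3base

Rows present on only one side survive the full outer join with NULLs on the missing side, so the same query surfaces both missing rows and changed rows in one pass.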

Workflow

1. Fetch the bulk data files (zip format) from the remote host.
2. Decrypt and unpack the files.
3. Create the corresponding comparison tables in the new database.
4. Put the unpacked, decrypted file contents directly onto the HDFS paths of those tables and refresh the partitions.
5. Write the comparison scripts.
6. Run multiple scripts as parallel processes, each producing its own log (a driver sketch follows this list).
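Strung end to end, the whole flow can be driven by one wrapper script. The step script names below are hypothetical stand-ins for the scripts shown in the rest of this post, and the province/date arguments follow the conventions used there:

#!/bin/bash
# Hypothetical driver; each step corresponds to one script below.
set -e
sh fetchSftpFiles.sh                        # 1. pull the zip/csv files
sh decryptFiles.sh 20240101                 # 2. decrypt and unpack
sh createHiveTables.sh                      # 3. create the comparison tables
sh putFilesToHdfs.sh heilongjiang 20240101  # 4. upload to HDFS, refresh partitions
mkdir -p ./finalLog
# 5./6. one comparison script per table, run in parallel, one log each
for t in ac_contract_info ac_account_rel; do
    nohup sh diff_${t}.sh > ./finalLog/diff_${t}.log 2>&1 &
done
wait    # block until every comparison finishes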

The scripts

Fetching the bulk data files

#!/bin/bash
# https://blog.csdn.net/axing2015/article/details/89313460
# SFTP: 10.230.105.47/48/49; user and password provided offline
# Full (stock) data:   /data1/etl/csv/<province_pinyin>/<batch>/<table>.csv
# Incremental data:    /data1/etl/stream/<province_pinyin>/<batch>/<table>
# Load tools:          /data1/etl/tool
# (In my case someone had already downloaded the files, so the download itself was optional.)
data_home=/data1/etl/csv/省拼音/批次/
sftp_path=/data1/etl/csv/heilongjiang/0/

sftp_ip=xxxx
sftp_user=xxxx
sftp_passwd=xxx
sftp_port=22
file_name=*.csv

mkdir -p ${data_home}

# lftp heredoc: lcd switches the local directory, cd the remote one,
# so the downloads land in the chosen local directory
lftp -u ${sftp_user},${sftp_passwd} sftp://${sftp_ip}:${sftp_port} <<!
lcd ${data_home}
cd ${sftp_path}
# download multiple files at once
mget ${file_name}
bye
!

cd ${data_home}
# create the Hive table structures and load the files into Hive
sh putCsvLoadHive.sh
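Before moving on it is worth checking that the transfer is complete. A small sketch, reusing the same connection variables, compares the remote CSV count against the local one (lftp's cls lists remote files, one per line with -1):

#!/bin/bash
# Hedged sketch: verify the mget actually fetched every file.
remote_count=$(lftp -u ${sftp_user},${sftp_passwd} sftp://${sftp_ip}:${sftp_port} \
    -e "cls -1 ${sftp_path}*.csv; bye" | wc -l)
local_count=$(ls ${data_home}/*.csv | wc -l)
if [ "$remote_count" -ne "$local_count" ]; then
    echo "file count mismatch: remote=$remote_count local=$local_count"
fi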

Decryption script (the transferred files are encrypted and must be decrypted)

#!/bin/bash
# Decrypt the transferred incremental files
beginTime=$(date +%s)
if [ $# -eq 0 ]; then
    echo "No argument supplied; please pass a time parameter"
    exit
fi
source_path="/data0/e3base/wangsw_a/js_shell/sftp_file/stock_data"
decrypt_path="/data0/e3base/wangsw_a/js_shell/sftp_file/decrypt_data"
password="e3base1"
do_tran_path="/data0/e3base/do_trans/"
cd "$do_tran_path"

for file in "$source_path"/*.des3; do
    if [ -f "$file" ]; then
        filename=$(basename "$file")
        filename_without_ext="${filename%.*}"
        decrypted_file="$decrypt_path/$filename_without_ext"

        ./dzip -pwd "$password" -unzip "$file" "$decrypted_file"
        if [ -f "$decrypted_file" ]; then
            echo "Decrypted successfully: $decrypted_file"
        else
            echo "Decryption failed: $file"
        fi
    fi
done

endTime=$(date +%s)
executionTime=$((endTime - beginTime))
echo "Script run time: ${executionTime}s"
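The loop above decrypts one file at a time. Since dzip is a site-specific tool, treat the following as a sketch only: the same call can be fanned out over several CPU cores with xargs -P when the batch is large:

#!/bin/bash
# Hedged sketch: decrypt up to 4 files concurrently.
source_path="/data0/e3base/wangsw_a/js_shell/sftp_file/stock_data"
decrypt_path="/data0/e3base/wangsw_a/js_shell/sftp_file/decrypt_data"
password="e3base1"
export decrypt_path password
cd /data0/e3base/do_trans/
ls "$source_path"/*.des3 | xargs -P 4 -I{} sh -c '
    f=$(basename "$1")
    ./dzip -pwd "$password" -unzip "$1" "$decrypt_path/${f%.*}"
' _ {}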

Creating the Hive tables

#!/bin/bash
beginTime=$(date +%s)
sql="
drop database if exists radmcsdb_restore_ah cascade;
create database radmcsdb_restore_ah;
-- ac_contract_info
create  table radmcsdb_restore_ah.oracle_ac_contract_info_ah(
\`ACCOUNT_LIMIT\` string,
\`ACCOUNT_TYPE\` string,
\`CONTRACTATT_TYPE\` string,
\`CONTRACT_NAME\` string,
\`CONTRACT_NAME_ENCRYPT\` string,
\`CONTRACT_NO\` string,
\`CONTRACT_PASSWD\` string,
\`CUST_ID\` string,
\`FINISH_FLAG\` string,
\`GROUP_ID\` string,
\`LOGIN_ACCEPT\` string,
\`LOGIN_NO\` string,
\`OP_CODE\` string,
\`OP_TIME\` string,
\`PAY_CODE\` string,
\`REPRESENT_PHONE\` string,
\`STATUS_CODE\` string,
\`STATUS_TIME\` string
)
PARTITIONED BY ( \`pt_day_time\` string)
row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
with serdeproperties (
    'separatorChar' = ',',   
    'quoteChar' = '\\\"',
    'escapeChar' = '\\\\'
) 
stored as textfile tblproperties("skip.header.line.count"="1");
-- location 'hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/oracle_ac_contract_info_ah';

-- diff table
create table radmcsdb_restore_ah.ac_contract_info_ah_diff(
\`oracle_ah_ACCOUNT_LIMIT\` string,
\`oracle_ah_ACCOUNT_TYPE\` string,
\`oracle_ah_CONTRACTATT_TYPE\` string,
\`oracle_ah_CONTRACT_NAME\` string,
\`oracle_ah_CONTRACT_NAME_ENCRYPT\` string,
\`oracle_ah_CONTRACT_NO\` string,
\`oracle_ah_CONTRACT_PASSWD\` string,
\`oracle_ah_CUST_ID\` string,
\`oracle_ah_FINISH_FLAG\` string,
\`oracle_ah_GROUP_ID\` string,
\`oracle_ah_LOGIN_ACCEPT\` string,
\`oracle_ah_LOGIN_NO\` string,
\`oracle_ah_OP_CODE\` string,
\`oracle_ah_OP_TIME\` string,
\`oracle_ah_PAY_CODE\` string,
\`oracle_ah_REPRESENT_PHONE\` string,
\`oracle_ah_STATUS_CODE\` string,
\`oracle_ah_STATUS_TIME\` string,
\`oracle_ah_pt_day_time\` string,
\`restore_ah_ACCOUNT_LIMIT\` string,
\`restore_ah_ACCOUNT_TYPE\` string,
\`restore_ah_CONTRACTATT_TYPE\` string,
\`restore_ah_CONTRACT_NAME\` string,
\`restore_ah_CONTRACT_NAME_ENCRYPT\` string,
\`restore_ah_CONTRACT_NO\` string,
\`restore_ah_CONTRACT_PASSWD\` string,
\`restore_ah_CUST_ID\` string,
\`restore_ah_FINISH_FLAG\` string,
\`restore_ah_GROUP_ID\` string,
\`restore_ah_LOGIN_ACCEPT\` string,
\`restore_ah_LOGIN_NO\` string,
\`restore_ah_OP_CODE\` string,
\`restore_ah_OP_TIME\` string,
\`restore_ah_PAY_CODE\` string,
\`restore_ah_REPRESENT_PHONE\` string,
\`restore_ah_STATUS_CODE\` string,
\`restore_ah_STATUS_TIME\` string
)
PARTITIONED BY ( \`pt_day_time\` string)
row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
with serdeproperties (
    'separatorChar' = ',',   
    'quoteChar' = '\\\"',
    'escapeChar' = '\\\\'
) 
stored as textfile tblproperties("skip.header.line.count"="1");
-- location 'hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/ac_contract_info_ah_diff';

-- primary-key file table
create table radmcsdb_restore_ah.ac_contract_info_ah_primary(
\`primary_key_contract_no\` string
)
partitioned by ( \`pt_day_time\` string)
row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
with serdeproperties (
    'separatorChar' = ',',   
    'quoteChar' = '\\\"',
    'escapeChar' = '\\\\'
) 
stored as textfile tblproperties("skip.header.line.count"="1");
-- location 'hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/ac_contract_info_ah_primary';

..............
"
echo "${sql}
!quit" | beeline -u 'jdbc:hive2://G034:11001,G035:11001,G036:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n e3base

endTime=$(date +%s)
endTimeYMD=$(date +%Y%m%d%H%M%S)
echo "Finished at $endTimeYMD"
echo "The restore-layer tables have been created; please verify all 21 tables. Total elapsed: $((endTime - beginTime))s"
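One OpenCSVSerde detail matters here: it exposes every column as string regardless of the declared types, which suits this comparison since all columns are declared string anyway and compared through COALESCE. A quick post-creation sanity check might look like:

#!/bin/bash
# Sketch: confirm the database and serde landed as expected.
beeline -u 'jdbc:hive2://G034:11001,G035:11001,G036:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n e3base \
    -e "show tables in radmcsdb_restore_ah; describe formatted radmcsdb_restore_ah.oracle_ac_contract_info_ah;"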

Loading the decrypted files to the Hive storage path and refreshing partitions

#!/bin/bash
# Push the provincial data files into Hive and refresh the partitions.
beginTime=$(date +%s)
if [ $# -eq 0 ]; then
    echo "No arguments supplied; please pass the province parameter"
    exit
fi
if [ $# -eq 1 ]; then
    echo "Please confirm both the province and the time parameters were supplied"
    exit
fi
# hive_url="beeline -u 'jdbc:hive2://G034:11001,G035:11001,G036:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n e3base"
# hive_url_e="$hive_url --silent=true --showHeader=false --outputformat=dsv -e"
data_home=/data1/etl/csv/$1

for ((i = 0; i < 21; i++)); do
    {
        cd ${data_home}/$i
        sql=""
        ls *.csv | {
            while read t1; do
                # drop the header row
                # sed -i '1d' $t1
                # does the file name carry a numeric suffix?
                if [[ $t1 == *[0-9]* ]]; then
                    # e.g. PD_USERPRC_INFO_00.csv
                    name2=oracle_${t1%_*}_ah
                    # upper case to lower case
                    name3=${name2,,}
                else
                    name2=oracle_${t1%.*}_ah
                    name3=${name2,,}
                fi
                # partition_primary=$($hive_url_e "show partitions radmcsdb_restore_ah.$name3;" | sort | tail -n 1)
                # hdfs dfs -rm -f hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/$name3/$partition_primary
                if [ $i == 0 ]; then
                    hdfs dfs -mkdir hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/$name3/pt_day_time=$2
                fi
                hdfs dfs -put $t1 hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/$name3/pt_day_time=$2/ &&
                    hdfs dfs -mv hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/$name3/pt_day_time=$2/$t1 \
                        hdfs://drmcluster/apps/hive/warehouse/radmcsdb_restore_ah.db/$name3/pt_day_time=$2/${i}_${t1}
                # on the last folder, accumulate the add-partition statements
                if [ $i == 20 ]; then
                    sql="alter table radmcsdb_restore_ah.$name3 add if not exists partition (pt_day_time=$2);$sql"
                fi
            done
            if [ $i == 20 ]; then
                echo "${sql}
!quit" | beeline -u 'jdbc:hive2://G034:11001,G035:11001,G036:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n e3base
            fi
        }
    }
done
# a trailing wait would make sure every child process finishes
# wait
endTime=$(date +%s)
echo "Uploaded data to HDFS; total elapsed: $((endTime - beginTime))s"
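Because the files are written straight into the warehouse directories, the metastore only learns about them through the ALTER TABLE ... ADD PARTITION statements accumulated above. An alternative sketch is MSCK REPAIR TABLE, which scans the table location and registers any partition directories it finds on its own:

#!/bin/bash
# Sketch: let Hive discover the pt_day_time=... directories itself.
echo "msck repair table radmcsdb_restore_ah.oracle_ac_contract_info_ah;
!quit" | beeline -u 'jdbc:hive2://G034:11001,G035:11001,G036:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n e3base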

Comparing data consistency

#!/bin/bash
# Logic: insert the differing rows into a diff table, extract them, and write the diff primary keys to a target directory.
# diff_table_sh
# string splitting: https://blog.csdn.net/bandaoyu/article/details/120659630
# newest file in a directory: https://blog.csdn.net/sh13661847134/article/details/113757792

# Preliminaries
beginTime=$(date +%s)
beginTimeYMD=$(date +%Y%m%d%H%M%S)
hive_url="beeline -u 'jdbc:hive2://G034:11001,G035:11001,G036:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n e3base"
hive_url_e="$hive_url --silent=true --showHeader=false --outputformat=dsv -e"

# Part 1: pick the partition and the join type
join="full outer join"
partition_orcle=$($hive_url_e "show partitions radmcsdb_restore_ah.oracle_ac_account_rel_ah;" | sort | tail -n 1)
count_num=$($hive_url_e "select count(distinct pt_day_time) as count_num from radmcsdb_restore_ah.oracle_ac_account_rel_ah;")
if [ $count_num -gt 1 ]; then
    join="inner join"
fi

# Part 2: run the comparison SQL
sql="

insert overwrite table radmcsdb_restore_ah.ac_account_rel_ah_diff partition(pt_day_time)
 select left_table.*,
       right_table.*,
       from_unixtime(unix_timestamp(),'yyyyMMddHHmmss') as pt_day_time
  from (
        select *
          from radmcsdb_restore_ah.oracle_ac_account_rel_ah where $partition_orcle
       )left_table
  $join (
        select *
          from restore_ah.ac_account_rel
       )right_table
    on (left_table.CONTRACT_NO = right_table.CONTRACT_NO
    and left_table.ACCT_REL_TYPE = right_table.ACCT_REL_TYPE
    and left_table.REL_CONTRACT_NO = right_table.REL_CONTRACT_NO
    and left_table.ACCT_ITEMS = right_table.ACCT_ITEMS
    )
  where
COALESCE(left_table.login_accept, '0')  <> COALESCE(right_table.login_accept, '0') or
COALESCE(left_table.contract_no, '0')  <> COALESCE(right_table.contract_no, '0') or
COALESCE(left_table.rel_contract_no, '0')  <> COALESCE(right_table.rel_contract_no, '0') or
COALESCE(left_table.acct_rel_type, '0')  <> COALESCE(right_table.acct_rel_type, '0') or
COALESCE(left_table.acct_items, '0')  <> COALESCE(right_table.acct_items, '0') or
COALESCE(left_table.pay_value, '0')  <> COALESCE(right_table.pay_value, '0') or
COALESCE(left_table.pay_pri, '0')  <> COALESCE(right_table.pay_pri, '0') or
-- COALESCE(left_table.eff_date, '0')  <> COALESCE(right_table.eff_date, '0') or
-- COALESCE(left_table.exp_date, '0')  <> COALESCE(right_table.exp_date, '0') or
COALESCE(left_table.login_no, '0')  <> COALESCE(right_table.login_no, '0') or
-- COALESCE(left_table.op_time, '0')  <> COALESCE(right_table.op_time, '0') or
COALESCE(left_table.remark, '0')  <> COALESCE(right_table.remark, '0') or
COALESCE(left_table.busi_type, '0')  <> COALESCE(right_table.busi_type, '0');

insert overwrite table radmcsdb_restore_ah.ac_account_rel_ah_primary partition(pt_day_time)
select
oracle_ah_CONTRACT_NO,
oracle_ah_ACCT_REL_TYPE,
oracle_ah_REL_CONTRACT_NO,
oracle_ah_ACCT_ITEMS,
from_unixtime(unix_timestamp(),'yyyyMMddHHmmss') as pt_day_time
from (
select 
oracle_ah_CONTRACT_NO,
oracle_ah_ACCT_REL_TYPE,
oracle_ah_REL_CONTRACT_NO,
oracle_ah_ACCT_ITEMS
from radmcsdb_restore_ah.ac_account_rel_ah_diff  
where 
oracle_ah_CONTRACT_NO is not null 
and oracle_ah_ACCT_REL_TYPE is not null 
and oracle_ah_REL_CONTRACT_NO is not null 
and oracle_ah_ACCT_ITEMS is not null 
and pt_day_time=(select max(pt_day_time) from radmcsdb_restore_ah.ac_account_rel_ah_diff) 

union

select 
restore_ah_CONTRACT_NO,
restore_ah_ACCT_REL_TYPE,
restore_ah_REL_CONTRACT_NO,
restore_ah_ACCT_ITEMS
from radmcsdb_restore_ah.ac_account_rel_ah_diff 
where 
restore_ah_CONTRACT_NO is not null 
and restore_ah_ACCT_REL_TYPE is not null 
and restore_ah_REL_CONTRACT_NO is not null 
and restore_ah_ACCT_ITEMS is not null 
and pt_day_time=(select max(pt_day_time) from radmcsdb_restore_ah.ac_account_rel_ah_diff)) a;
"
echo "${sql}
!quit" | $hive_url

# Part 3: export the primary-key file
partition_primary=$($hive_url_e "show partitions radmcsdb_restore_ah.ac_account_rel_ah_primary;" | sort | tail -n 1)
pt_day_time=${partition_primary:12}
$hive_url_e "set hive.cli.print.header=true; select * from radmcsdb_restore_ah.ac_account_rel_ah_primary where $partition_primary;" | grep -v "WARN" > ac_account_rel_ah_primary_$pt_day_time.csv
# the primary-key file can then be shipped to its target location

# Part 4: report the number of diff primary keys
count_primary=$($hive_url_e "select count(1) from radmcsdb_restore_ah.ac_account_rel_ah_primary where $partition_primary;")
echo "Remaining diff primary keys: $count_primary"

# Part 5: report the row counts on both sides
sql="select left_table.pv, right_table.pv as pv_diff
  from (
        select count(1) as pv
          from radmcsdb_restore_ah.oracle_ac_account_rel_ah
       ) left_table
  left outer join (
        select count(1) as pv
          from restore_ah.ac_account_rel
       ) right_table
    on 1=1;"
# collapse newlines to spaces so the SQL reads as one line
sql_text="$(echo $sql | tr '\n' ' ')"
count_compare=$($hive_url_e "$sql_text")

# Part 6: report the diff primary-key count once more
count_primary=$($hive_url_e "select count(1) from radmcsdb_restore_ah.ac_account_rel_ah_primary where $partition_primary;")

# Closing banner
endTime=$(date +%s)
endTimeYMD=$(date +%Y%m%d%H%M%S)
echo "Start time: $beginTimeYMD"
echo "ac_account_rel row counts, provincial side vs restore layer: $count_compare"
echo "Remaining diff primary keys: $count_primary"
echo "End time: $endTimeYMD"
echo "ac_account_rel_ah_diff comparison finished; start: $beginTimeYMD, end: $endTimeYMD, total elapsed: $((endTime - beginTime))s"
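A full outer join over hundreds of millions of rows is expensive, so a cheap pre-check can save a run: compare an aggregate fingerprint of each side first, and only fall back to the full diff above when the fingerprints disagree. A sketch using Hive's built-in hash() over the key columns:

#!/bin/bash
# Sketch: per-table fingerprints; identical sums mean a detailed diff is probably unnecessary.
run_sql() {
    beeline -u 'jdbc:hive2://G034:11001,G035:11001,G036:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' \
        -n e3base --silent=true --showHeader=false --outputformat=dsv -e "$1"
}
cols="concat_ws('|', contract_no, acct_rel_type, rel_contract_no, acct_items)"
old_sum=$(run_sql "select sum(hash($cols)) from radmcsdb_restore_ah.oracle_ac_account_rel_ah;")
new_sum=$(run_sql "select sum(hash($cols)) from restore_ah.ac_account_rel;")
if [ "$old_sum" == "$new_sum" ]; then
    echo "fingerprints match: skip the detailed diff"
else
    echo "fingerprints differ: run the full comparison"
fi

Matching sums do not strictly prove equality (hash sums can collide), so this is a triage step, not a replacement for the join.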

Running the scripts as multiple processes with per-script logs

To run it, just launch this master script with nohup (a usage sketch follows the script below).

#!/bin/bash
if [ $# -eq 0 ]; then
    echo "No arguments supplied; please pass a time parameter"
    exit
fi
hive_url="beeline -u 'jdbc:hive2://G034:11001,G035:11001,G036:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n e3base"
hive_url_e="$hive_url --silent=true --showHeader=false --outputformat=dsv -e"
beginTime=$(date +%s)
beginTimeYMD=$(date +%Y%m%d%H%M%S)
mkdir ./finalLog

# table 1
partition_orcle=$($hive_url_e "show partitions radmcsdb.oracle_cs_conuserrel_info_hlj;" | sort | tail -n 1)
count_orcle=$($hive_url_e "select count(1) from radmcsdb.oracle_cs_conuserrel_info_hlj where $partition_orcle;")
partition_primary=$($hive_url_e "show partitions radmcsdb.cs_conuserrel_info_primary;" | sort | tail -n 1)
count_primary=$($hive_url_e "select count(1) from radmcsdb.cs_conuserrel_info_primary where $partition_primary;")
echo "cs_conuserrel_info row count: $count_orcle, remaining diff primary keys: $count_primary"

# ... tables 2 through 20 follow the same pattern ...

# table 21
partition_orcle=$($hive_url_e "show partitions radmcsdb.oracle_ep_organization_hlj;" | sort | tail -n 1)
count_orcle=$($hive_url_e "select count(1) from radmcsdb.oracle_ep_organization_hlj where $partition_orcle;")
partition_primary=$($hive_url_e "show partitions radmcsdb.ep_organization_hlj_primary;" | sort | tail -n 1)
count_primary=$($hive_url_e "select count(1) from radmcsdb.ep_organization_hlj_primary where $partition_primary;")
echo "ep_organization row count: $count_orcle, remaining diff primary keys: $count_primary"

endTime=$(date +%s)
endTimeYMD=$(date +%Y%m%d%H%M%S)
echo "Finished at $endTimeYMD"
echo "All tables checked; total elapsed: $((endTime - beginTime))s"
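A usage sketch, assuming this master script is saved as check_tables.sh: create the log directory, launch the script detached with nohup, and tail the log it writes:

mkdir -p ./finalLog
nohup sh check_tables.sh 20240101 > ./finalLog/summary_$(date +%Y%m%d%H%M%S).log 2>&1 &
tail -f ./finalLog/summary_*.log   # watch progress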

When needed: parsing JSON data and re-emitting it as a CSV file

#!/bin/bash
# Parse JSON records into a CSV file, iterating over the rows of the input file
# refs:
# https://blog.csdn.net/qq_36836950/article/details/131063485
# https://blog.csdn.net/weixin_45842494/article/details/123943756
# https://blog.csdn.net/qq_38250124/article/details/86554834
# https://www.cnblogs.com/bymo/p/7601519.html
# https://blog.csdn.net/weixin_44056331/article/details/102411008

# Preliminaries
beginTime=$(date +%s)
beginTimeYMD=$(date +%Y%m%d%H%M%S)
hive_url="beeline -u 'jdbc:hive2://iot-e3base06:11001,iot-e3base07:11001,iot-e3base08:11001/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk' -n dwdapp"
hive_url_e="$hive_url --silent=true --showHeader=false --outputformat=dsv -e"

# Read all the required table names from a file into an array, then loop over it.
# Step 1: fetch the column names for the table
partition_stock=$($hive_url_e "desc dwdb.ods_soa_mft_bill_d;")
# $0 is the whole line; split it on '|' and keep the first field (the column name)
partition_stock1=$(echo "$partition_stock" | awk '{split($0, arr, "|"); print arr[1]}')
# turn the one-column-per-line output into a bash array
arr=(${partition_stock1//\\n/})

# accumulator for the joined field values
data_txt=''
# read the JSON file line by line
while read -r line; do
    # jq -r strips the double quotes that string output would otherwise carry
    # pull each field out of the COLUMNINFO object
    for s in ${arr[@]}; do
        if [[ $s != 'pt_day' ]]; then
            # append the field value to the row
            if [[ $data_txt == '' ]]; then
                data_txt="$(echo $line | jq -r ".COLUMNINFO.$s")"
            else
                data_txt="$data_txt,$(echo $line | jq -r ".COLUMNINFO.$s")"
            fi
        else
            # the pt_day column marks the end of the row: write it out
            echo $data_txt >> 'AC_CONTRACTADD_INFO_JSON.csv'
            # reset the accumulator before the next input line
            data_txt=''
            break
        fi
    done
# the file to parse
done < ods_soa_mft_bill_d.csv
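Invoking jq once per field per line forks a process for every single cell, which crawls on large files. A faster sketch, assuming the same COLUMNINFO layout, emits each CSV row in one jq pass with its @csv filter (the three column names here are illustrative):

#!/bin/bash
# Sketch: one jq process for the whole newline-delimited JSON file.
jq -r '[.COLUMNINFO.CONTRACT_NO, .COLUMNINFO.CUST_ID, .COLUMNINFO.OP_TIME] | @csv' \
    ods_soa_mft_bill_d.csv > AC_CONTRACTADD_INFO_JSON.csv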

Extra: scp transfer of the decrypted data via expect

#!/usr/bin/expect
# Expect is a free scripting language for automating interactive tasks without human intervention.
# The first line declares the expect interpreter, analogous to /bin/sh in shell scripts.
# spawn: runs the given shell command as a child process.
# expect: only output from the spawned process can be matched, mainly
#   its prompts on standard output, eof and timeout.
# send / send_user: send feeds input to the spawned process, while send_user
#   merely echoes text back to the user, like echo in shell.
spawn scp -r /data0/e3base/wangsw_a/sftp_file/hlj/ e3base@G030:/data1/hlj/stock_data/
expect "*password:"
send "E3base_12#34\n"
expect eof
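Where installing expect is not an option, sshpass gives a one-line equivalent. Either way the password sits in plain text in the script, so treat both as sketches for controlled environments only:

# Sketch: the same transfer without expect.
sshpass -p 'E3base_12#34' scp -r /data0/e3base/wangsw_a/sftp_file/hlj/ e3base@G030:/data1/hlj/stock_data/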
Tags: big data, Hive, Hadoop

Reposted from: https://blog.csdn.net/weixin_45969142/article/details/142914358
Copyright belongs to the original author, 专注学习java. In case of infringement, please contact us for removal.
