1. Full Synchronization
Data flow: MySQL --- Sqoop script ---> HDFS
1.1 Business Tables Requiring Full Synchronization
| No. | Table Name | Description |
| --- | --- | --- |
| 1 | sku_info | SKU (stock keeping unit) table |
| 2 | base_category1 | Level-1 category table |
| 3 | base_category2 | Level-2 category table |
| 4 | base_category3 | Level-3 category table |
| 5 | base_province | Province table |
| 6 | base_trademark | Brand table |
| 7 | spu_info | SPU (product) table |
| 8 | favor_info | Product favorites table |
| 9 | cart_info | Shopping cart table |
| 10 | coupon_info | Coupon table |
| 11 | activity_info | Activity table |
| 12 | activity_rule | Activity rule table |
| 13 | base_dic | Dictionary table |
| 14 | sku_attr_value | SKU platform attribute value table |
| 15 | sku_sale_attr_value | SKU sale attribute value table |
| 16 | base_region | Region table |
| 17 | user_info | User table |
| 18 | order_info | Order table |
| 19 | coupon_use | Coupon usage table |
| 20 | order_status_log | Order status log table |
| 21 | order_detail | Order detail table |
| 22 | payment_info | Payment info table |
| 23 | comment_info | Product review table |
| 24 | order_refund_info | Order refund table |
| 25 | order_detail_activity | Order detail activity table |
| 26 | order_detail_coupon | Order detail coupon table |
| 27 | refund_payment | Refund payment table |
1.2 Sqoop: MySQL To HDFS
1.2.1 Develop the Script
Why the script lives in /home/hadoop/bin: the script is meant to be run by the regular user hadoop. When logged in as hadoop, running echo $PATH shows that /home/hadoop/bin is on the PATH, so hadoop can invoke executables in that directory from anywhere.
[hadoop@hadoop102 ~]$ cd /home/hadoop/
[hadoop@hadoop102 ~]$ mkdir bin
[hadoop@hadoop102 ~]$ vim /home/hadoop/bin/mysql_to_hdfs_full.sh
Script contents:
--delete-target-dir: if the target directory already exists, delete it first and then import.
#! /bin/bashif[-n"$2"];thendo_date=$2elsedo_date=`date-d'-1 day' +%F`fiimport_data(){
/opt/module/sqoop-1.4.6.bin__hadoop-2.0.4-alpha/bin/sqoop import\--connect jdbc:mysql://mall:3306/gmall \--username root \--password123456\
--target-dir /warehouse/db/gmall/$1_full/$do_date\
--delete-target-dir \--query"$2 and \$CONDITIONS"\
--num-mappers 1\
--fields-terminated-by '\t'\--compress\
--compression-codec gzip\
--null-string '\\N'\
--null-non-string '\\N'}import_sku_info(){
import_data "sku_info""select
id,
spu_id,
price,
sku_name,
sku_desc,
weight,
tm_id,
category3_id,
is_sale,
create_time
from sku_info where 1=1"}import_base_category1(){
import_data "base_category1""select
id,
name
from base_category1 where 1=1"
}

import_base_category2(){
import_data "base_category2" "select
id,
name,
category1_id
from base_category2 where 1=1"
}

import_base_category3(){
import_data "base_category3" "select
id,
name,
category2_id
from base_category3 where 1=1"
}

import_base_province(){
import_data "base_province" "select
id,
name,
region_id,
area_code,
iso_code,
iso_3166_2
from base_province
where 1=1"}import_base_region(){
import_data "base_region""select
id,
region_name
from base_region
where 1=1"}import_base_trademark(){
import_data "base_trademark""select
id,
tm_name
from base_trademark
where 1=1"}import_spu_info(){
import_data "spu_info""select
id,
spu_name,
category3_id,
tm_id
from spu_info
where 1=1"}import_favor_info(){
import_data "favor_info""select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from favor_info
where 1=1"}import_cart_info(){
import_data "cart_info""select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time,
source_type,
source_id
from cart_info
where 1=1"}import_coupon_info(){
import_data "coupon_info""select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
limit_num,
taken_count,
start_time,
end_time,
operate_time,
expire_time
from coupon_info
where 1=1"}import_activity_info(){
import_data "activity_info""select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time,
activity_desc
from activity_info
where 1=1"}import_activity_rule(){
import_data "activity_rule""select
id,
activity_id,
activity_type,
condition_amount,
condition_num,
benefit_amount,
benefit_discount,
benefit_level
from activity_rule
where 1=1"}import_base_dic(){
import_data "base_dic""select
dic_code,
dic_name,
parent_code,
create_time,
operate_time
from base_dic
where 1=1"}import_sku_attr_value(){
import_data "sku_attr_value""select
id,
attr_id,
value_id,
sku_id,
attr_name,
value_name
from sku_attr_value
where 1=1"}import_sku_sale_attr_value(){
import_data "sku_sale_attr_value""select
id,
sku_id,
spu_id,
sale_attr_value_id,
sale_attr_id,
sale_attr_name,
sale_attr_value_name
from sku_sale_attr_value
where 1=1"}import_user_info(){
import_data "user_info""select
id,
login_name,
nick_name,
passwd,
name,
phone_num,
email,
head_img,
user_level,
birthday,
gender,
create_time,
operate_time,
status
from user_info
where 1=1"}import_order_info(){
import_data "order_info""select
id,
consignee,
consignee_tel,
total_amount,
order_status,
user_id,
payment_way,
delivery_address,
order_comment,
out_trade_no,
trade_body,
create_time,
operate_time,
expire_time,
process_status,
tracking_no,
parent_order_id,
img_url,
province_id,
activity_reduce_amount,
coupon_reduce_amount,
original_total_amount,
feight_fee,
feight_fee_reduce,
refundable_time
from order_info
where 1=1"}import_coupon_use(){
import_data "coupon_use""select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time,
expire_time
from coupon_use
where 1=1"}import_order_status_log(){
import_data "order_status_log""select
id,
order_id,
order_status,
operate_time
from order_status_log
where 1=1"}import_order_detail(){
import_data "order_detail""select
id,
order_id,
sku_id,
sku_name,
img_url,
order_price,
sku_num,
create_time,
source_type,
source_id,
split_total_amount,
split_activity_amount,
split_coupon_amount
from order_detail
where 1=1"}import_payment_info(){
import_data "payment_info""select
id,
out_trade_no,
order_id,
user_id,
payment_type,
trade_no,
total_amount,
subject,
payment_status,
create_time,
callback_time,
callback_content
from payment_info
where 1=1"}import_comment_info(){
import_data "comment_info""select
id,
user_id,
nick_name,
head_img,
sku_id,
spu_id,
order_id,
appraise,
comment_txt,
create_time,
operate_time
from comment_info
where 1=1"}import_order_refund_info(){
import_data "order_refund_info""select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
refund_reason_txt,
refund_status,
create_time
from order_refund_info
where 1=1"}import_order_detail_activity(){
import_data "order_detail_activity""select
id,
order_id,
order_detail_id,
activity_id,
activity_rule_id,
sku_id,
create_time
from order_detail_activity
where 1=1"}import_order_detail_coupon(){
import_data "order_detail_coupon""select
id,
order_id,
order_detail_id,
coupon_id,
coupon_use_id,
sku_id,
create_time
from order_detail_coupon
where 1=1"}import_refund_payment(){
import_data "refund_payment""select
id,
out_trade_no,
order_id,
sku_id,
payment_type,
trade_no,
total_amount,
subject,
refund_status,
create_time,
callback_time,
callback_content
from refund_payment
where 1=1"}case$1in"sku_info")
import_sku_info
;;"base_category1")
import_base_category1
;;"base_category2")
import_base_category2
;;"base_category3")
import_base_category3
;;"base_province")
import_base_province
;;"base_region")
import_base_region
;;"base_trademark")
import_base_trademark
;;"spu_info")
import_spu_info
;;"favor_info")
import_favor_info
;;"cart_info")
import_cart_info
;;"coupon_info")
import_coupon_info
;;"activity_info")
import_activity_info
;;"activity_rule")
import_activity_rule
;;"base_dic")
import_base_dic
;;"sku_attr_value")
import_sku_attr_value
;;"sku_sale_attr_value")
import_sku_sale_attr_value
;;"user_info")
import_user_info
;;"order_info")
import_order_info
;;"coupon_use")
import_coupon_use
;;"order_status_log")
import_order_status_log
;;"order_detail")
import_order_detail
;;"payment_info")
import_payment_info
;;"comment_info")
import_comment_info
;;"order_refund_info")
import_order_refund_info
;;"order_detail_activity")
import_order_detail_activity
;;"order_detail_coupon")
import_order_detail_coupon
;;"refund_payment")
import_refund_payment
;;"all")
import_sku_info
import_base_category1
import_base_category2
import_base_category3
import_base_province
import_base_region
import_base_trademark
import_spu_info
import_favor_info
import_cart_info
import_coupon_info
import_activity_info
import_activity_rule
import_base_dic
import_sku_attr_value
import_sku_sale_attr_value
import_user_info
import_order_info
import_coupon_use
import_order_status_log
import_order_detail
import_payment_info
import_comment_info
import_order_refund_info
import_order_detail_activity
import_order_detail_coupon
import_refund_payment
;;esac
1.2.2 Grant Execute Permission on the Script
[hadoop@hadoop102 bin]$ chmod +x mysql_to_hdfs_full.sh
1.2.3 Run the Script
For the regular early-morning daily run, do not pass the date argument (the script then defaults to yesterday's date).
[hadoop@hadoop102 ~]$ mysql_to_hdfs_full.sh all 2023-12-02
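After a run, it is worth spot-checking that the files actually landed under the path built from --target-dir in the script. A minimal check, using the table and date from the command above:
[hadoop@hadoop102 ~]$ hadoop fs -ls /warehouse/db/gmall/sku_info_full/2023-12-02
[hadoop@hadoop102 ~]$ hadoop fs -du -h /warehouse/db/gmall/sku_info_full/2023-12-02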
1.2.4 Scheduled Execution
Runs in the early hours of the morning, pulling the full MySQL data into yesterday's partition on HDFS.
2. Incremental Synchronization
Data flow: MySQL --- Maxwell (binlog capture) ---> Kafka --- Flume agent ---> HDFS
How should deletes and updates be handled? (See the example Maxwell records below.)
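For context, Maxwell emits one JSON record per row change, and the record's type and old fields carry enough information for downstream layers to replay updates and deletes. Illustrative records for the cart_info table (field values are made up):
{"database":"gmall","table":"cart_info","type":"insert","ts":1701446400,"xid":1098,"commit":true,"data":{"id":100,"user_id":"26","sku_id":11,"sku_num":1}}
{"database":"gmall","table":"cart_info","type":"update","ts":1701446460,"xid":1120,"commit":true,"data":{"id":100,"user_id":"26","sku_id":11,"sku_num":3},"old":{"sku_num":1}}
{"database":"gmall","table":"cart_info","type":"delete","ts":1701446520,"xid":1150,"commit":true,"data":{"id":100,"user_id":"26","sku_id":11,"sku_num":3}}
An update carries the previous values of the changed columns in old, and a delete carries the deleted row in data, so the records landed on HDFS keep that information and the downstream layers can decide how to merge it.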
2.1 Business Tables Requiring Incremental Synchronization
| No. | Table Name | Description |
| --- | --- | --- |
| 1 | order_info | Order table |
| 2 | coupon_use | Coupon usage table |
| 3 | order_status_log | Order status log table |
| 4 | user_info | User info table |
| 5 | order_detail | Order detail table |
| 6 | payment_info | Payment info table |
| 7 | comment_info | Product review table |
| 8 | order_refund_info | Order refund table |
| 9 | order_detail_activity | Order detail activity table |
| 10 | order_detail_coupon | Order detail coupon table |
| 11 | refund_payment | Refund payment table |
| 12 | cart_info | Shopping cart table |
2.2 Maxwell: MySQL To Kafka
2.2.1 Initial Full Synchronization
Initial full synchronization: use the maxwell-bootstrap feature to load the full historical MySQL data into Kafka.
Note: do not enable Maxwell incremental capture for a table before its full initialization, otherwise the table's data will be duplicated in Kafka! Only turn on incremental capture for the table after the initialization has finished! (Example bootstrap records are shown below.)
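For reference, maxwell-bootstrap replays the existing rows of a table as bootstrap-typed records on the same topic, which is why running it while incremental capture for that table is already on can leave duplicate rows in Kafka. An illustrative sequence (values are made up):
{"database":"gmall","table":"user_info","type":"bootstrap-start","ts":1701446400,"data":{}}
{"database":"gmall","table":"user_info","type":"bootstrap-insert","ts":1701446400,"data":{"id":1,"login_name":"ek2u7q"}}
{"database":"gmall","table":"user_info","type":"bootstrap-complete","ts":1701446401,"data":{}}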
[mall@mall ~]$ mkdir bin
[mall@mall bin]$ vim /home/mall/bin/mysql_to_kafka_inc_init.sh
Configuration file:
See 2.2.2.1 below; the initial load and the incremental sync share the same configuration file.
Script contents:
#!/bin/bash

# This script initializes the incremental tables; it only needs to be run once
import_data(){
/opt/module/maxwell-1.29.2/bin/maxwell-bootstrap --database gmall --table $1 --config /opt/module/maxwell-1.29.2/config.properties
}

case $1 in
"order_info")
import_data order_info
;;
"coupon_use")
import_data coupon_use
;;
"order_status_log")
import_data order_status_log
;;
"user_info")
import_data user_info
;;
"order_detail")
import_data order_detail
;;
"payment_info")
import_data payment_info
;;
"comment_info")
import_data comment_info
;;
"order_refund_info")
import_data order_refund_info
;;
"order_detail_activity")
import_data order_detail_activity
;;
"order_detail_coupon")
import_data order_detail_coupon
;;
"refund_payment")
import_data refund_payment
;;
"cart_info")
import_data cart_info
;;
"all")
import_data order_info
import_data coupon_use
import_data order_status_log
import_data user_info
import_data order_detail
import_data payment_info
import_data comment_info
import_data order_refund_info
import_data order_detail_activity
import_data order_detail_coupon
import_data refund_payment
import_data cart_info
;;
esac
Grant execute permission on the script:
[mall@mall bin]$ chmod +x mysql_to_kafka_inc_init.sh
Run the script:
[mall@mall ~]$ mysql_to_kafka_inc_init.sh all
2.2.2 Daily Incremental Synchronization
2.2.2.1 Write the Maxwell Configuration File
[mall@mall maxwell-1.29.2]$ vim config.properties
Contents:
# Where Maxwell sends its data; options are stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka

# Target Kafka cluster
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092

# Target Kafka topic; can be static, e.g. maxwell, or dynamic, e.g. %{database}_%{table}
kafka_topic=topic_mall_db_binlog

# MySQL connection settings
host=mall
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
2.2.2.2 Start Maxwell
# Start
[mall@mall ~]$ /opt/module/maxwell-1.29.2/bin/maxwell --config /opt/module/maxwell-1.29.2/config.properties --daemon
# Stop (find the PID, then kill it)
[mall@mall ~]$ ps -ef | grep maxwell | grep -v grep | awk '{print $2}'
[mall@mall ~]$ kill <PID printed above>
2.3 Flume: Kafka To HDFS
2.3.1 Custom Interceptor
The general pattern: take the collected record out of the event body, parse out the useful fields, and put them into the header; the Flume configuration file can then reference values from the header.
Purpose 1: put the business table name from the JSON string consumed from Kafka into the header.
Purpose 2: convert the ts timestamp in the JSON string consumed from Kafka to milliseconds and put it into the header.
Code:
package com.songshuang.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * @date 2023/12/1 15:24
 */
public class TimestampAndTableNameInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // 1. Parse the JSON in the body and extract the fields we need
        byte[] body = event.getBody();
        String db_binlog = new String(body, StandardCharsets.UTF_8);
        JSONObject jsonObject = JSONObject.parseObject(db_binlog);

        Long ts = jsonObject.getLong("ts");
        // The ts field emitted by Maxwell is in seconds; the Flume HDFS Sink expects milliseconds
        String timeMills = String.valueOf(ts * 1000);

        String tableName = jsonObject.getString("table");

        // 2. Put them into the header
        Map<String, String> headers = event.getHeaders();
        headers.put("timestamp", timeMills);
        headers.put("tableName", tableName);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    // Builder pattern
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
2.3.2 Write the Flume Configuration File
[hadoop@hadoop104 ~]$ cd /opt/module/apache-flume-1.9.0-bin/job/
[hadoop@hadoop104 job]$ vim kafka_to_hdfs_db.conf
Contents:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_mall_db_binlog
a1.sources.r1.kafka.consumer.group.id = consumer_group_flume_mall_db_binlog
# Which offset the consumer starts from; the default is latest
# a1.sources.r1.kafka.consumer.auto.offset.reset = earliest
# Custom interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.songshuang.flume.interceptor.TimestampAndTableNameInterceptor$Builder
# Configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /warehouse/db/gmall/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
# File roll policy
a1.sinks.k1.hdfs.rollInterval = 20
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
# Configure the channel
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /opt/module/apache-flume-1.9.0-bin/data/kafka_to_hdfs_mall_db_binlog
a1.channels.c1.checkpointDir = /opt/module/apache-flume-1.9.0-bin/checkpoint/kafka_to_hdfs_mall_db_binlog
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.3.3 Create the Kafka Topic
The advantage of creating the topic manually: you can choose the number of partitions and the replication factor yourself.
[hadoop@hadoop102 ~]$ cd /opt/module/kafka_2.11-2.4.1/
[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 2 --partitions 3 --topic topic_mall_db_binlog
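To confirm the partition and replica layout took effect, describing the topic is a quick check:
[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --describe --topic topic_mall_db_binlog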
2.3.4 Start/Stop Flume
# Start
[hadoop@hadoop104 job]$ cd /opt/module/apache-flume-1.9.0-bin/
[hadoop@hadoop104 apache-flume-1.9.0-bin]$ nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &
# Stop (find the PID, then kill it)
[hadoop@hadoop104 apache-flume-1.9.0-bin]$ ps -ef | grep kafka_to_hdfs_db.conf | grep -v grep | awk '{print $2}'
[hadoop@hadoop104 apache-flume-1.9.0-bin]$ kill <PID printed above>
3. Going Live
3.1 Delete the Business Test Data on HDFS
Delete the HDFS directory /warehouse/db/gmall.
3.2 Delete the HDFS Data Behind the Hive External Tables
Delete the HDFS directories /warehouse/dw_ods.db/ods_<business table name>.
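A sketch of the cleanup with the HDFS CLI, assuming the paths above (ods_order_info stands in for each ODS table directory — repeat per table, and double-check paths before deleting):
[hadoop@hadoop102 ~]$ hadoop fs -rm -r /warehouse/db/gmall
[hadoop@hadoop102 ~]$ hadoop fs -rm -r /warehouse/dw_ods.db/ods_order_info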
3.3 Sqoop Full Synchronization
3.3.1 First Run
Note: the data from the first synchronization is not entirely accurate — partition data may end up misplaced!
[hadoop@hadoop102 ~]$ mysql_to_hdfs_full.sh all
3.3.2 Scheduled Execution
By default, crontab -e edits the scheduled tasks of the current user.
[hadoop@hadoop102 ~]$ crontab -e
Contents:
Run the Sqoop full-sync script at 00:00 every day!
Note: add the environment variables the script needs, ideally matching the login user's PATH!
Reason: crontab uses its own environment variables and does not load /etc/profile, ~/.bash_profile, and so on!
How to verify: run echo $PATH from an interactive shell and from a crontab job, then compare (one way to capture the cron environment is shown after the listing below).
# JAVA_HOME
JAVA_HOME=/opt/module/jdk1.8.0_291
# HADOOP_HOME
HADOOP_HOME=/opt/module/hadoop-3.1.3
# KAFKA_HOME
KAFKA_HOME=/opt/module/kafka_2.11-2.4.1
# kafkaEagle
KE_HOME=/opt/module/kafka-eagle-web-1.4.8
# HIVE_HOME
HIVE_HOME=/opt/module/apache-hive-3.1.2-bin
# PATH
PATH=/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/opt/module/jdk1.8.0_291/bin:/opt/module/hadoop-3.1.3/bin:/opt/module/hadoop-3.1.3/sbin:/opt/module/kafka_2.11-2.4.1/bin:/opt/module/kafka-eagle-web-1.4.8/bin:/opt/module/apache-hive-3.1.2-bin/bin:/home/hadoop/.local/bin:/home/hadoop/bin
0 0 * * * mysql_to_hdfs_full.sh all >/opt/module/sqoop-1.4.6.bin__hadoop-2.0.4-alpha/logs/crontab_mysql_to_hdfs_full_all.log 2>&1
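One way to capture the environment cron actually runs with, so it can be compared against echo $PATH from a login shell (a temporary entry — remove it once checked):
* * * * * env > /tmp/crontab_env.txt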
List the current user's scheduled tasks:
[hadoop@hadoop102 ~]$ crontab -l
Check the crontab execution history:
[hadoop@hadoop102 ~]$ sudo cat /var/log/cron | grep mysql_to_hdfs_full.sh
3.4 Maxwell Initial Full Synchronization
[mall@mall ~]$ mysql_to_kafka_inc_init.sh all