spark 读取数据
**除了可以用spark.sql(sql语句如select...)外,对于不同的格式的数据也有相应的接口api ,得到spark dataframe的数据。 **
如果csv : spark.read.csv('path') 或者 spark.createDataFrame(pandas.read_csv('path'))
如果是json : spark.read.json(path);
如果是parquet :
spark.read.parquet(path);
如果是text: spark.read.text(path) ;
注: 上面的spark 是pyspark 建的一个spark session 。 相关配置可以忽略,一般都是复用公司固定的。
spark = SparkSession.builder.appName(VERSION).config(
'spark.yarn.queue', '具体的队列名' ## 这只是队列名-可忽略
).config(
'spark.jars', 'lib/spark-tensorflow-connector_2.11-1.10.0.jar'
).config(
'spark.dynamicAllocation.maxExecutors', '1000'
).config(
'spark.executor.cores', '2'
).config(
'spark.sql.autoBroadcastJoinThreshold', '-1'
).config("dfs.client.use.datanode.hostname", "true"
).config('spark.default.parallelism', '100'
).config('spark.yarn.executor.memoryOverhead', '4096'
).config('spark.core.connection.ack.wait.timeout', '300'
).config('spark.network.timeout', '120s'
).config('spark.executor.memory', '8G'
).config('spark.sql.merge.enabled', 'true'
).config('spark.driver.memory', '6G'
).enableHiveSupport().getOrCreate()
spark从某hive表选取数据写入另一个表的一个模板 概述:
table_name = 'name' # 要写入的目标表
date = '2023-01-21' # 取数据的日期
create_tabel(table_name) # 建表函数,表结构(要写入的数据表) ,建表时注意常用日期来分区
drop_partition(spark, table_name) #删除原有函数, 如果原来有相关分区数据则进行删除
generate_data(date, table_name) # 读取数据函数并写入目标表
add_partition(spark, table_name) # 调整写入的分区 ,完成
create_tabel建表函数,定义日期分区
def create_tabel(table_name) :
create_table_sql = """
CREATE TABLE IF NOT EXISTS DB_NAME.{table_name} (
column1 数据类型,
dt_test ,string , .....
count ,float
) PARTITIONED BY ( year string,
month string,
day string
) STORED AS ORC
LOCATION 'DB_PATH/{table_name}'
""".format(table_name = table_name)
spark.sql(create_table_sql) # 执行建表语句。注意上面的分区形式。
删除原有分区drop_partition函数
def drop_partition(spark, table_name):
alter_table_sql = '''
ALTER TABLE ${DB_NAME}.{table_name} DROP IF EXISTS
PARTITION (year = '{year}',month = '{month}', day = '{day}')
'''.format(year = date[:4], month = date[5:7], day = date[8:10], table_name = table_name)
spark.sql(alter_table_sql)
generate_data 数据处理函数,将相关数据写入定义的表中
def generate_data(date, table_name):
get_data_part = spark.sql("""
select
concat_ws('-', year, month, day) as dt_test ,
count ,...更多数据列 (可以是经过sql处理解析后后得到的数据列,如json可以直接字典解析param['city'] as city_id
from 已有old表A
where
各种限制条件或者过滤条件 ,如时间限制 concat_ws('-', year, month, day) = '{date}'
""".format(date= date )
).cache() # 缓存数据到内存,后期数据不用再反复执行,减少耗时
可以对Dataframe get_data_part再进行各种处理得到spark dataframe get_data_part_final 。
** aim_columns** = [ 'col1', 'col2' , 'col3'...] **# aim_columns 传变量的形式,这里的数据column 一定要和建表时的数据一致,写入前select 后直接写入,以这种写法这样便可以减少对写入数据时列名对应不上等报错问题。另外要注意最后处理的得到的各列数据类型也一致。 **
写入数据
file_path = 'DB_PATH/{table_name}/{year}/{moth}/{day}'.format(year = date[:4],month=date[5:7],day=date[8:10],tabel_name= table_name) # 定义路径
**#方式1 : 直接用spark Dataframe 的write来写入。 **
get_data_part_final.select(aim_columns).**coalesce(5).write.orc(path=file_path,mode='overwrite') **
上面写入语句中: .coalesce(5)是将数据文件写为指定个数5,这样可以减少数据倾斜现象,和.repartition(5)的功能无差别,这样就能控制保存的文件数量**。 mode= 'overwrite'会覆盖之前的数据,如果将overwrite改为'append'会追加到表中。 **
关于数据倾斜和小文件过多,可以见自己的另一个总结:http://t.csdn.cn/i7nv0
# 方式2 可以用spark sql 的方式来写
get_data_part_final.**createOrReplaceTempView("test_temp")**
spark.sql("""**insert overwrite table db_name.{table_name} partition(date={date}) **select aim_columns from test_temp""".format(table_name= table_name,date=date )
上面insert overwrite会重写数据,既先进行删除,再写入,对应方式1 的mode为overwrite 的形式。如果是追加写入表中(方式1重mode为append),则此时应该改为insert into直接追加到表中数据的尾部。
**insert into 和overwrite **
spark.sql中 注意 insert into 后的 table 标识可写可不写,但是insert overwrite 不行!
INSERT****INTO [TABLE] [db_name.]table_name
INSERT OVERWRITE TABLE [db_name.]table_name
另外: ''' insert into tableNew [ partition(...)] select 字段 from 已有表''' 时,注意select 后的字段要确认和tableNew 中的表字段的先后顺序对应的上!**不要随意用
select *
这个习惯不好! 另外注意字段的数量是否一致。**
**注: 关于 insert overwrite/into 中partition时容易出的分区报错问题: **
**关于partition : **
关于 partition的分区,如果是想以指定的某一天的日期进行分区,在insert into 语句 的partition里直接指定 分区的取值,如(year='2022'),插入的数据的分区则为指定取值的分区,此时下面的select语句中不用包括year,month,day字段了。注意此时 因为 insert 中的partition后已经指定了字段值,如果select 中再加入这几个字段就会显示字段值不匹配,会多了而报错,指定字段的值如下.如果想根据 选出的字段中的值进行分区,则 partition()里加入字段即可,注意select中要包括partition括号的字段,如 insert into tableA partition(year, month,day) select ... year,month,day ....
此处参考:pyspark--写hive分区表覆盖指定分区数据
一文搞定hive之insert into 和 insert overwrite与数据分区
添加分区函数add_partition
def add_partition(spark, table_name):
alter_table_sql = '''
ALTER TABLE DB_NAME.{table_name} ADD IF NOT EXISTS
PARTITION (year = '{year}',month = '{month}', day = '{day}')
LOCATION 'DB_PATH/{table_name}/{year}/{month}/{day}'
'''.format(year = date[:4], month = date[5:7], day = date[8:10], table_name = table_name)
spark.sql(alter_table_sql)
这个如果在原来generate_data写入数据函数中有进行分区,其实可以不用再调用。
部分经验:多用传变量的形式; select最后的逗号要去掉;
之前自己遇到过的一些低级错误记录下,
- 多处用到的名字尽量用变量传入进去到sql语句部分。前后用的view 和 register 相语句中tempTable 表名要一致! 多处用到的名字尽量用变量传入进去到sql语句部分,这样一个地方定义了就不会因为多次更改而容易出错,减少犯错的概率,养成良好的代码习惯。 在pyspark中形式如 spark.sql( ```sql 语句 中涉及变量为{var1}`,注意带着{}``.format(var1=值) ,如果在函数中的尽量用函数参数变量给值,这样当变量值涉及多个函数用到时方便同意给值。
- 括号要对应上,select 语句中的最后一个字段不能有逗号!有时候日志不一定能报错出来!要注意检查。通常报错 “ org.apache.spark.sql.AnalysisException: cannot resolve '
reward
(第一个字段)' given input columns: [].... 'InsertIntoTable 'UnresolvedRelationdb.table
, Map(字段)...False”时要注意去看是否字段不对应,是否最后一个字段多了多余的分割符号等!
参考:
Hive/Spark小文件解决方案(企业级实战) - 腾讯云开发者社区-腾讯云
PySaprk 将 DataFrame 数据保存为 Hive 分区表 | XinanCSD.github.io
版权归原作者 旺仔的算法coding笔记 所有, 如有侵权,请联系我们删除。