在阿里云odps中有相关操作,dataworks中也可操作,odps可以在服务器通过账号访问,dataworks是可视化的界面,可以写sql,spark,python,当然表的相关操作都可以进行,默认前提是自己拥有权限。
dataworks可以实现端到端的操作,从数据获取到数据处理,数据应用,完全不需要额外的空间进行数据操作(预处理,编码,建模,等等)。当然不是我说说这么简单,下面是我的一些记录:
For Recommendation in Deep learning QQ Group 277356808
For deep learning QQ Second Group 629530787
I'm here waiting for you
1-pyodps创建与读取表,参考资源
cmd = odps.ODPS(access_id='Lkssdmndiamk92idm02kd,soskx,xpdmd;a,cdsa2', # 账号
secret_access_key='993js9392dd,slamclcajwqoqwmdod,swpEfr', # 密码
project="xiaomingge", # odps上的项目名称
endpoint='http://xiaomingbrother/api') #
cmd.execute_sql("drop TABLE if EXISTS {};".format(table_name))#个人完全拥有权限
columns=[]
for col in df.columns.to_list():
columns.append(odps.models.Column(name="{0}".format(col),type="string",comment="the {0}".format(col)))
#partitions=[odps.models.Partition(name="dt",type="string",comment="the partition")]
schema=odps.models.Schema(columns=columns,)#partitions=partitions)
print(schema.columns)
table=cmd.create_table(name=table_name,schema=schema,project="dalao",if_not_exists=True)
上述的表创建之前都要删除之前的同名表(如果存在),因为我不确定 insert overwrite这种写法有啥用,以及有啥影响。
读取表:
t = cmd.get_table(table_name)
with t.open_reader() as reader:
count = reader.count
print("table rows' number :{}".format(count))
pd_df=reader.to_pandas()
2-odps写表,参考上述资源
依旧是直接pd-dataframe操作,分区可选
t=cmd.get_table(table_name)
with t.open_writer() as writer:#partition="dt=test",create_partition=True
res=df.values.tolist()
writer.write(res)
注意:如果创建的表不是分区的,则写入的时候分区可能不成功,应保持一致,不能朝三暮四
3-spark-sql随机的任意dataframe或数组创建表并存储在odps
这个问题直接问死老师傅,大部分都是从另一个表到新的表,随机的几行几列数组,比如10行2列,也是可以直接存储的(读取为spark的dataframe即可)
spark = SparkSession.builder \
.appName("xiaoming_gege_project") \
.config("spark.sql.catalogImplementation", "odps") \
.getOrCreate()
keys=["laji_user_id","user_id","swru","sqmu","haby","user_gender",
"banum","ba_gender","city_level_id","province_name","work_month",
"user_status"]
values=[10000000,1000000,20000,20000,3,3,
10,3,8,39,20,10]
encoder_tuple=[]#元素为()
for i in range(1,len(keys)+1):
key=keys[i]
value=list(range(values[i]))
value=",".join(value)
encoder_tuple.append((i,key,value))
encoder_table= "myproject.dssm_encoder"
# 表结构
schema = StructType([StructField("keys", StringType(), True),
StructField("raw_data", StringType(), True),])
rdd = spark.sparkContext.parallelize(encoder_tuple)
encoder_df = spark.createDataFrame(rdd, schema)
encoder_df.toDF().insertInto(encoder_table)
似乎也可是如下的形式:py-dataframe,原来的列表形式改成字典形式(如下,也可是其他py-pandas能处理的形式)
# encoder_df=pd.DataFrame(encoder_dict.items(),columns=["keys","raw_data"])
# encoder_df=spark.createDataFrame(encoder_df)
4-追加内容SQL
如果是原来表的内容不要了,那么可以采用 insert overwrite table myproject.dalao_table
如果是追加表内容,把overwrite换成into即可。
5-SQL建表
CREATE TABLE myproject.dalao_table
(
user_id STRING
)
版权归原作者 小李飞刀李寻欢 所有, 如有侵权,请联系我们删除。