0


Spark高级用法-数据源的读取与写入


数据读取

  • 读文件- read.json- read.csv- csv文件有两个部分构成 头部数据,也就是字段数据,行数数据- read.orc

  • 读数据库- read.jdbc(jdbc连接地址,table='表名',properties={'user'=用户名,'password'=密码,'driver'='驱动信息'})

缺少连接驱动的错误

拷贝连接驱动包

将MySQL驱动包放入/export/server/spark/jars/目录下

cp /export/server/hive/lib/mysql-connector-java-5.1.32.jar /export/server/spark/jars/

数据库创建测试数据

  1. create database itcast charset=utf8;
  2. create table itcast.tb_user(
  3. id int,
  4. name varchar(20),
  5. age int,
  6. gender varchar(20)
  7. );
  8. insert into itcast.tb_user values (1,'张三',20,'男');

pyspark读取数据库数据

  1. from pyspark.sql import SparkSession
  2. ss = SparkSession.builder.getOrCreate()
  3. # 获取外部
  4. df = ss.read.text("hdfs://node1:8020/data/students.txt")
  5. df.show()
  6. # 获取外部数据库数据 采用jdbc方式读取,只要是支持jdbc连接的的数据库都可读
  7. # url参数1 jdbc的连接地址
  8. # table 指定连接的表
  9. # properties 属性参数,指定连接的账户密码及驱动信息
  10. df2 = ss.read.jdbc(
  11. url='jdbc:mysql://192.168.88.100:3306/itcast',table='tb_user',
  12. properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'}
  13. )
  14. df2.show()

数据写入

  • 因为数据是在df中存储,所以使用dataframe进行数据写入 - 使用dtaframe的的write方法

  • 写入文件有个模式,覆盖和追加两种方式,用mode参数指定 - 覆盖 overwrite- 追加 oppend

  • 写入文件 - write.json- write.csv- write.orc

  • 写入数据库 - write.jdbc(jdbc连接地址,table='表名',properties={'user'=用户名,'password'=密码,'driver'='驱动信息'},mode='写入方式')

数据库创建表

pyspark写入数据库数据

  1. # 数据写入
  2. from pyspark.sql import SparkSession,Row
  3. ss = SparkSession.builder.getOrCreate()
  4. df = ss.createDataFrame([
  5. Row(id = 1,name = '张三',age = 20),
  6. Row(id = 2,name = '李松',age = 20),
  7. Row(id = 3,name = '荔枝',age = 20)
  8. ],
  9. schema = 'id int,name string,age int'
  10. )
  11. # 将df数据写入hdfs中
  12. df.write.json('hdfs://node1:8020/data/data_json',mode='overwrite')
  13. # 写入数据库
  14. df.write.jdbc('jdbc:mysql://192.168.88.100:3306/itcast?characterEncoding=utf8',table='tb_stu',mode='overwrite',
  15. properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})

验证hdfs是否写入数据

验证数据库是否传入数据

总结

使用read和write实现数据导入导出

读取mysql数据库的原始数据表

df = ss.read.jdbc()

在将读取到的数据导入数仓中

df.write.orc(hdfs://node1:8020/ods/tb_user


本文转载自: https://blog.csdn.net/weixin_58305115/article/details/142902985
版权归原作者 木鬼与槐 所有, 如有侵权,请联系我们删除。

“Spark高级用法-数据源的读取与写入”的评论:

还没有评论