0


PySpark 读写Hive数据源

一、环境配置

本文在Windows下配置Spark访问Hive。如需在Linux上配置,请对应Linux上同样的目录即可。

  1. 检查PySpark环境正常运行;检查Hive环境正常运行;启动Hive元数据服务

hive –service metastore

  1. 先将%HIVE_HOME%\conf\hive-site.xml拷贝到%SPARK_HOME%\conf。此步骤是为了Spark能读取Hive相应的配置;
  2. 再将%HIVE_HOME%\lib下的MySQL连接驱动的Jar包(mysql-connector-java-5.1.36-bin.jar)拷贝到%SPARK_HOME%\jars目录下。Jar包的版本与MySQL数据库的版本配套。此步骤是为了Spark能够访问Hive的元数据库。

此时,正常启动PySpark交互程序,可在交互模式下正常访问Hive了。进入交互环境,在提示符后直接输入以下代码:

spark.sql(‘show tables’).show()

正常执行后,应该能够看到default库中的表。这时,可以配置Python IDE的开发环境了

  1. 检查pycharm或其他IDE中的PySpark的开发环境正常;
  2. 再将%HIVE_HOME%\lib下的MySQL连接驱动的Jar包(mysql-connector-java-5.1.36-bin.jar)拷贝到%PYTHONDIR%\Lib\site-packages\pyspark\jars目录下。Jar包的版本与MySQL数据库的版本配套。此步骤是为了在IDE环境中能够访问Hive的元数据库;
  3. 添加环境变量SPARK_CONF_DIR,变量值为%SPARK_HOME%\conf。此步骤是为了在IDE中运行Spark程序时,能够读取Spark配置目录下的相应配置信息;

此时,正常启动pycharm。可在IDE环境下正常访问Hive了。在pycharm的工程中新建一个Python文件,输入以下代码:

from pyspark.sql import SparkSession

spark=SparkSession.builder.appName('sparkhive').master('local[*]').enableHiveSupport().getOrCreate()

spark.sql('show tables').show()

正常执行后,应该能够看到default库中的表。

二、读写Hive数据源

从Spark2.0开始,引入SparkSession 作为 DataSet 和 DataFrame API 的切入点,SparkSession封装了SparkConf、SparkContext 和 SQLContext。为了向后兼容,SQLContext 和 HiveContext也被保存下来。在实际写程序时,只需要定义一个SparkSession对象就可以了。不用使用SQLContext 和 HiveContext。

  1. SQLContext 和 HiveContext方式读写Hive数据

(1)读取数据

from pyspark.sql import HiveContext

from pyspark import SparkConf,SparkContext

conf=SparkConf().setMaster("local").setAppName("sparkhive")

sc=SparkContext(conf=conf)

创建HiveContext实例

hive_context = HiveContext(sc)

读取default.stocks表

stocks_df = hive_context.sql("SELECT * FROM stocks")

显示数据

stocks_df.show(10)

(2)写入数据

from pyspark.sql import HiveContext

from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType

from pyspark import SparkConf,SparkContext

conf=SparkConf().setMaster("local").setAppName("sparkhive")

sc=SparkContext(conf=conf)

定义DataFrame的结构(与stocks表的结构一致)

schema = StructType([

StructField("exchange_e", StringType(), True),

StructField("symbol", StringType(), True),

StructField("ymd", StringType(), True),

StructField("price_open", FloatType(), True),

StructField("price_high", FloatType(), True),

StructField("price_low", FloatType(), True),

StructField("price_close", FloatType(), True),

StructField("volume", IntegerType(), True),

StructField("price_adj_close", FloatType(), True)

])

hive_context = HiveContext(sc)

创建DataFrame

new_data = [("BJSSE","AAPL", "2023-07-04", 150.0, 155.0, 148.0, 152.0, 1000000, 151.0),

        ("SSE", "GOOG","2023-07-04", 2600.0, 2650.0, 2590.0, 2630.0, 500000, 2620.0)]

df_to_write = hive_context.createDataFrame(new_data, schema=schema)

注册为临时表以便进行后续操作

df_to_write.registerTempTable("temp_stocks")

将临时表中的数据插入到stocks表

hive_context.sql('''

INSERT INTO TABLE stocks

SELECT * FROM temp_stocks

''')

hive_context.sql("select * from stocks where exchange_e='SSE'").show()

此方法的读写操作也可以参看Spark2.1.0入门:连接Hive读写数据(DataFrame)(Python版)_厦大数据库实验室博客 Spark2.1.0入门:连接Hive读写数据(DataFrame)(Python版)_厦大数据库实验室博客

  1. SparkSession方式读取Hive数据

在Spark中,使用SparkSession(从Spark 2.0开始)可以方便地读取和写入Hive表。以下是如何在Python中使用PySpark进行操作的例子:

(1)读取数据

from pyspark.sql import SparkSession

初始化SparkSession并启用Hive支持

spark = SparkSession.builder\

.appName("StocksDataWriteExample")\

.enableHiveSupport()\

.getOrCreate()

读取并显示stocks表的数据

spark.sql("SELECT * FROM stocks").show(10)

(2)写入数据

from pyspark.sql import SparkSession

初始化SparkSession并启用Hive支持

spark = SparkSession.builder \

    .appName("StocksDataWriteExample") \

    .enableHiveSupport() \

    .getOrCreate()

定义数据和列结构(与stocks表结构一致)

columns = ["exchange_e", "symbol", "ymd", "price_open", "price_high", "price_low", "price_close", "volume", "price_adj_close"]

new_data = [("BJSSE","AAPL", "2023-07-04", 150.0, 155.0, 148.0, 152.0, 1000000, 151.0),

        ("SSE", "GOOG","2023-07-04", 2600.0, 2650.0, 2590.0, 2630.0, 500000, 2620.0)]

创建DataFrame

df_to_write = spark.createDataFrame(new_data, schema=columns)

写入数据到stocks表,这里假设mode为'append'(追加模式)

df_to_write.write \

.mode('append') \

.format('Hive') \

.saveAsTable('default.stocks')

(3)要注意的问题

Hive 3.0以后,默认建立的表是ORC格式的(不用在hive-site.xml中开启行级事务支持)。即可以支持INSERT,DELETE和UPDATE行级事务操作。但如果是在Hive交互命令行创建的表,在spark程序看来都是HiveFileFormat格式的表。因此,上面的代码中采用.format('Hive')。Spark会匹配相应的schema。要回避这个问题,也可以采用以下代码,即从一个临时表向目标表追加数据的方法。

创建一个与stocks表结构相同的临时表

df_to_write.createOrReplaceTempView("temp_stocks")

使用Hive SQL语句将临时表数据插入到stocks表

spark.sql("""

INSERT INTO TABLE default.stocks

SELECT * FROM temp_stocks

""")

spark.sql('select * from stocks limit 10').show()


本文转载自: https://blog.csdn.net/alan_lin/article/details/136317019
版权归原作者 alan_lin 所有, 如有侵权,请联系我们删除。

“PySpark 读写Hive数据源”的评论:

还没有评论