0


使用Python实现对接Hadoop集群(通过Hive)并提供API接口

安装必要的库

首先,确保已经安装了以下库:

pip install flask
pip install pyhive

代码实现

1. app.py(主应用文件)

from flask import Flask, jsonify, request, abort
from pyhive import hive
import re
from datetime import datetime

app = Flask(__name__)# Hive连接配置
HIVE_HOST ="hadoop-cluster-ip"
HIVE_PORT =10000
HIVE_USERNAME ="your_username"
HIVE_PASSWORD ="your_password"# 日期格式正则表达式,用于校验输入的日期格式
DATE_FORMAT_REGEX = re.compile(r'^\d{4}-\d{2}-\d{2}$')defvalidate_date(date_str):"""
    校验日期字符串是否符合指定格式(YYYY-MM-DD)
    """ifnot DATE_FORMAT_REGEX.match(date_str):raise ValueError("日期格式不正确,请使用YYYY-MM-DD格式。")try:
        datetime.strptime(date_str,'%Y-%m-%d')returnTrueexcept ValueError:raise ValueError("日期格式不正确,请使用YYYY-MM-DD格式。")@app.route('/api/orders', methods=['GET'])defget_orders():
    start_date = request.args.get('start_date')
    end_date = request.args.get('end_date')# 校验日期参数格式try:if start_date:
            validate_date(start_date)if end_date:
            validate_date(end_date)except ValueError as e:
        abort(400, description=str(e))try:# 连接Hive
        connection = hive.connect(
            host=HIVE_HOST,
            port=HIVE_PORT,
            username=HIVE_USERNAME,
            password=HIVE_PASSWORD
        )

        cursor = connection.cursor()# 构建查询语句,添加必要的防止SQL注入的处理
        query =f"SELECT order_id, order_date, order_amount FROM orders WHERE order_date BETWEEN '{start_date}' AND '{end_date}'"
        query = query.replace("'","''")# 将单引号替换为两个单引号,防止SQL注入

        cursor.execute(query)
        results = cursor.fetchall()# 将结果转换为字典列表形式
        orders =[]for row in results:
            order ={"order_id": row[0],"order_date": row[1],"order_amount": row[2]}
            orders.append(order)

        cursor.close()
        connection.close()return jsonify(orders)except hive.DatabaseError as e:# 针对Hive数据库相关错误进行更详细的错误处理
        abort(500, description=f"Hive数据库错误: {str(e)}")except Exception as e:
        abort(500, description=f"其他错误: {str(e)}")if __name__ =='__main__':
    app.run(debug=True)

代码解析

输入参数校验

定义了validate_date函数,通过正则表达式和datetime.strptime来严格校验输入的日期参数是否符合YYYY-MM-DD格式。如果不符合格式,将直接返回400错误给客户端,提示正确的日期格式要求。

错误处理

●    在get_orders函数中,对可能出现的不同类型的错误进行了更细致的处理。对于Hive数据库相关的错误(如连接失败、查询失败等),会返回500错误并明确告知是Hive数据库错误及具体错误信息。对于其他一般性的错误,同样返回500错误并给出相应的错误描述。

安全防护(防止SQL注入)

●    在构建查询语句时,对输入的日期参数进行了处理,将单引号替换为两个单引号。这样可以在一定程度上防止SQL注入攻击,确保查询语句的安全性。

单元测试

  • 以下是使用Python的unittest模块对代码进行单元测试的示例:
import unittest
from unittest.mock import patch
from app import app, validate_date

classTestApp(unittest.TestCase):defsetUp(self):
        self.app = app.test_client()deftest_validate_date_valid(self):
        self.assertTrue(validate_date('2024-11-10'))deftest_validate_date_invalid_format(self):with self.assertRaises(ValueError)as context:
            validate_date('2024/11/10')
        self.assertEqual(str(context.exception),"日期格式不正确,请使用YYYY-MM-DD格式。")deftest_validate_date_invalid_value(self):with self.assertRaises(ValueError)as context:
            validate_date('2024-13-32')
        self.assertEqual(str(context.exception),"日期格式不正确,请使用YYYY-MM-DD格式。")@patch('app.hive.connect')deftest_get_orders_success(self, mock_connect):# 模拟查询结果
        mock_cursor = mock_connect.return_value.cursor.return_value
        mock_cursor.fetchall.return_value =[(1,'2024-11-10',100.0)]

        response = self.app.get('/api/orders?start_date=2024-11-10&end_date=2024-11-10')
        self.assertEqual(response.status_code,200)
        self.assertEqual(response.get_json(),[{"order_id":1,"order_date":"2024-11-10","order_amount":100.0}])deftest_get_orders_missing_parameters(self):
        response = self.app.get('/api/orders')
        self.assertEqual(response.status_code,400)
        self.assertEqual(response.get_json()['description'],"日期格式不正确,请使用YYYY-MM-DD格式。")@patch('app.hive.connect')deftest_get_orders_database_error(self, mock_connect):
        mock_connect.side_effect = hive.DatabaseError("模拟数据库错误")

        response = self.app.get('/api/orders?start_date=2024-11-10&end_date=2024-11-10')
        self.assertEqual(response.status_code,500)
        self.assertEqual(response.get_json()['description'],"Hive数据库错误: 模拟数据库错误")@patch('app.hive.connect')deftest_get_orders_general_error(self, mock_connect):
        mock_connect.side_effect = Exception("模拟一般错误")

        response = self.app.get('/api/orders?start_date=2024-11-10&end_date=2024-11-10')
        self.assertEqual(response.status_code,500)
        self.assertEqual(response.get_json()['description'],"其他错误: 模拟一般错误")if __name__ =='__main__':
    unittest.main()

代码块解析

上述单元测试代码主要涵盖了以下几个方面:

测试validate_date函数

• test_validate_date_valid测试了validate_date函数对于有效日期格式的验证是否正确。

• test_validate_date_invalid_format和test_validate_date_invalid_value分别测试了对于无效日期格式和无效日期值的情况,是否能正确抛出ValueError异常并给出正确的错误信息。

测试get_orders函数

• test_get_orders_success通过patch模拟了hive.connect和查询结果,测试了get_orders函数在正常情况下是否能正确返回查询结果和状态码200 。

• test_get_orders_missing_parameters测试了在缺少查询参数时,是否能正确返回400错误及相应的错误描述。

• test_get_orders_database_error和test_get_orders_general_error分别模拟了hive.DatabaseError和一般Exception的情况,测试了get_orders函数在出现不同类型错误时是否能正确返回500错误及相应的错误描述。

通过这些单元测试,可以较为全面地验证优化后的代码的正确性和可靠性,确保各个功能模块能够按照预期工作。

标签: spark 大数据 python

本文转载自: https://blog.csdn.net/qq_38617531/article/details/143757750
版权归原作者 脸ル粉嘟嘟 所有, 如有侵权,请联系我们删除。

“使用Python实现对接Hadoop集群(通过Hive)并提供API接口”的评论:

还没有评论