0


【Python开发】FastAPI 10:SQL 数据库操作

在 FastAPI 中使用 SQL 数据库可以使用多个 ORM 工具,例如 SQLAlchemy、Tortoise ORM 等,类似 Java 的 Mybatis 。这些 ORM 工具可以帮助我们方便地与关系型数据库进行交互,如 MySQL 、PostgreSQL等。本篇文章将介绍如何使用 SQLAlchemy 来完成数据库操作,以便让我们在 FastAPI 项目中方便地进行数据存储和查询。

📌 源码地址:

https://gitee.com/yinyuu/fast-api_study_yinyu

1 介绍

1.1 SQLAlchemy

简单来说,**SQLAlchemy **就是一个 **ORM **工具,提供了灵活的数据模型定义和查询语法,支持多种数据库后端,比如:

  • MySQL
  • SQLite
  • Oracle
  • PostgreSQL
  • Microsoft SQL Server,等等其它数据库

在 **FastAPI **中使用 SQLAlchemy,我们可以通过安装 **SQLAlchemy **和相应的数据库驱动程序(如 mysqlclient,psycopg2 等)来连接到数据库,然后使用 **SQLAlchemy **提供的模型类定义数据表和字段,以及使用查询语法进行数据操作。

本篇文章中,我将以 **MySQL **为例,实现 **SQLAlchemy **的数据库连接及操作。

ORM 具有在代码和数据库表中的对象之间转换的工具,简单来说就是将该数据表映射到项目代码中,然后你通常在 SQL 数据库中创建一个代表映射的类,该类的每个属性代表一个列,具有名称和类型。

1.2 文件结构

项目中包含子目录 sql_app,本篇文章的文件结构如下:

  1. .
  2. └── sql_app
  3. ├── __init__.py
  4. ├── crud.py
  5. ├── database.py
  6. ├── main.py
  7. ├── models.py
  8. └── schemas.py

文件 **init.py **是一个空文件,不过它告诉 **Python **其中 **sql_app 的所有模块(Python **文件)都是一个包,可以拿来调用。

接下来,本文将以 **database.py -> models.py -> schemas.py -> crud.py -> main.py **的顺序开始讲述~

2 数据库连接

涉及到文件 sql_app/database.py,数据库操作的第一步便是连接数据库。

2.1 安装 mysqlclient

因为需求连接到 **mysql **数据库,因此需要预先安装 **mysql **驱动,可直接使用如下命令:

2.2 SQLAlchemy 使用

具体代码如下 👇

  1. #1.导入 SQLAlchemy 部件
  2. from sqlalchemy import create_engine
  3. from sqlalchemy.ext.declarative import declarative_base
  4. from sqlalchemy.orm import sessionmaker
  5. #2.为 SQLAlchemy 定义数据库 URL地址
  6. SQLALCHEMY_DATABASE_URL = "mysql+pymysql://user:password@ip地址:端口/数据表名?charset=utf8mb4"
  7. #3.创建 SQLAlchemy 引擎
  8. engine = create_engine( SQLALCHEMY_DATABASE_URL )
  9. #4.创建一个SessionLocal 数据库会话
  10. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  11. #5.创建一个Base类
  12. Base = declarative_base()

📌 URL 地址

如果你想使用其他数据库,那么就需要对 **SQLALCHEMY_DATABASE_URL **的值进行变更。

比如使用的是 **PostgreSQL **数据库:"postgresql://user:password@postgresserver/db"。

📌 SessionLocal 类

它是一个本地线程存储(thread-local storage)的单例类,用来创建数据库会话。

简单来说,**SessionLocal **类的主要作用是为每个请求创建一个数据库会话,并且确保这个会话在整个请求期间都是唯一的。这样,我们就可以在不同的函数中使用同一个会话,从而避免了在不同函数中反复创建会话的麻烦。

📌 declarative_base()

**declarative_base() **是 **SQLAlchemy **中提供的一个函数,用于创建一个基类,然后通过继承这个基类来定义数据表模型。它可以让我们更加方便地定义数据表模型,而不需要关注底层的SQL语句。具体作用:

  • 自动创建对应的数据表:我们定义了数据表模型之后,可以调用 create_all() 方法来创建对应的数据表。
  • 自动映射数据表和类属性:我们只需要定义类属性,**SQLAlchemy **可以自动将这些属性映射到对应的数据表字段。
  • 提供了更加易读易懂的代码:使用 declarative_base() 可以让我们更加方便地定义类,使代码更加清晰易读。

3 创建模型

接下来便是创建和数据表映射的数据库模型以及 Pydantic 模型数据库模型用以对接数据表,Pydantic 模型则用来作为响应模型(response_model)及请求体。

3.1 数据库模型

涉及到文件 sql_app/models.py

具体代码如下 👇

  1. from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
  2. from sqlalchemy.orm import relationship
  3. #1.用Base类来创建 SQLAlchemy 模型
  4. from .database import Base
  5. class User(Base):
  6. __tablename__ = "users"
  7. #2.创建模型属性/列
  8. id = Column(Integer, primary_key=True, index=True)
  9. phone = Column(String, unique=True, index=True)
  10. hashed_password = Column(String)
  11. is_active = Column(Boolean, default=True)
  12. #3.创建关系
  13. items = relationship("Item", back_populates="owner")
  14. class Item(Base):
  15. __tablename__ = "items"
  16. id = Column(Integer, primary_key=True, index=True)
  17. title = Column(String, index=True)
  18. description = Column(String, index=True)
  19. owner_id = Column(Integer, ForeignKey("users.id"))
  20. owner = relationship("User", back_populates="items")

📌 用 Base 类来创建 SQLAlchemy 模型

Base 类也就是数据库连接时的** declarative_base()**,从 database(来自上面的 database.py 文件)导入 Base,那么它将自动映射数据表和类属性(原因在前边)。

📌 **tablename **

该属性是给模型映射的数据表的名称,比如 User 类的tablename users,那么它映射的数据表名即为** users。**

📌 模型属性/列

比如:

  1. id = Column(Integer, primary_key=True, index=True)
  2. phone = Column(String, unique=True, index=True)
  3. hashed_password = Column(String)
  4. is_active = Column(Boolean, default=True)

Column 表示这些属性中的每一个都代表其相应数据库表中的一列,Column 中的第一个参数,如Integer、**String **和 Boolean,它定义了数据库中的类型。

  • primary_key=True 代表了 **id **为主键;
  • index=True 代表 **id **列和 **email **列为索引,以提高查询性能;
  • unique=True 则是唯一约束,以确保在 **email **列中的每个值都是唯一的;
  • default=True 表示**is_active **的默认值为 True

📌 relationship 关系

  1. class User(Base):
  2. __tablename__ = "users"
  3. ...
  4. items = relationship("Item", back_populates="owner")
  5. class Item(Base):
  6. __tablename__ = "items"
  7. ...
  8. owner_id = Column(Integer, ForeignKey("users.id"))
  9. owner = relationship("User", back_populates="items")

这在 **User **模型和 **Item **模型之间定义了一个关系,使用 **back_populates **参数建立了双向关系。具体来说,是在 **User **模型中定义了一个名为 **items **的属性,并在 **Item 模型中定义了一个名为owner **的属性,这两个属性都与对方的模型相关联。

比如访问 **User **中的属性 **items **时,它将指向一个 **Item **的 **SQLAlchemy **模型列表(来自 items表),同时 **Item **使用 ForeignKey("users.id") 来标记 **owner_id **列为外键,并指定它与 User 模型中的 **id **列相关联。

在使用 **back_populates **参数时,需要注意以下几点:

  • **back_populates **参数必须在两个模型的关系属性中都使用,并且值必须互相对应;
  • 如果您使用了 **backref **参数来定义关系,那么可以使用 **backref **替换 **back_populates **;
  • 如果您的模型之间有多个关系,那么需要使用不同的 **back_populates **值来区分它们。

3.2 Pydantic 模型

涉及到文件 sql_app/schemas.py

具体代码如下 👇

  1. from typing import List, Union
  2. from pydantic import BaseModel
  3. #1.创建一个 ItemBase 和 UserBase 的 Pydantic模型(或者我们说“schema”)
  4. class ItemBase(BaseModel):
  5. title: str
  6. description: Union[str, None] = None
  7. #2.ItemCreate 继承自 ItemBase,他们在创建或读取数据时具有共同的属性。
  8. class ItemCreate(ItemBase):
  9. pass
  10. #3.Item 继承自 ItemCreate,增加 id 和 owner_id 字段
  11. class Item(ItemCreate):
  12. id: int
  13. owner_id: int
  14. class Config:
  15. orm_mode = True #使其包含关系字段
  16. class UserBase(BaseModel):
  17. phone: str
  18. #为了安全起见,password 不会出现在其他同类 Pydantic模型中,例如用户请求时不应该从 API 返回响应中包含它。
  19. class UserCreate(UserBase): #字段名对不上会报错,所以单独搞个
  20. password: str
  21. class User(UserBase):
  22. id: int
  23. is_active: bool
  24. items: List[Item] = []
  25. class Config:
  26. orm_mode = True

注意,SQLAlchemy 模型和 Pydantic 声明属性的方式不一样,前者是 = ,而后者是

📌 orm_mode

此类 **Config **用于为 **Pydantic **提供配置。

  1. class Item(ItemCreate):
  2. id: int
  3. owner_id: int
  4. class Config:
  5. orm_mode = True #使其包含关系字段

**Pydanticorm_mode **将告诉 **Pydantic **模型读取数据,即它不是一个 dict,而是一个 **ORM **模型。这样该 **Pydantic **模型就会尝试从属性中获取它,如 id = data.id。

有了这个,**Pydantic **模型与 **ORM **兼容,您只需在路径操作 **response_model **的参数中声明它,即可返回一个数据库模型,并从中读取数据。

SQLAlchemy 和许多其他默认情况下是“延迟加载”。这意味着,除非您尝试访问包含该数据的属性,否则它们不会从数据库中获取关系数据。

4 数据库操作 CRUD

涉及到文件 sql_app/crud.py,在此文件中,我们将编写可重用的函数用来与数据库中的数据进行交互。

CRUD 分别为:增加、查询、更改和删除,即增删改查。

4.1 查—读取数据

首先从 sqlalchemy.orm 中导入 Session,这将允许您声明 **db **参数的类型,并在您的函数中进行更好的类型检查和完成。 然后导入之前的 models(SQLAlchemy 模型)和 schemas(Pydantic模型/模式)。

创建一些实用函数来完成:

  1. from sqlalchemy.orm import Session
  2. from . import models, schemas
  3. #通过 ID 查询单个用户。
  4. def get_user(db: Session, user_id: int):
  5. return db.query(models.User).filter(models.User.id == user_id).first()
  6. #通过电子邮件查询单个用户。
  7. def get_user_by_email(db: Session, email: str):
  8. return db.query(models.User).filter(models.User.email == email).first()
  9. #查询多个用户
  10. def get_users(db: Session, skip: int = 0, limit: int = 100):
  11. return db.query(models.User).offset(skip).limit(limit).all()
  12. #查询多个项目
  13. def get_items(db: Session, skip: int = 0, limit: int = 100):
  14. return db.query(models.Item).offset(skip).limit(limit).all()

4.2 增—创建数据

现在通过函数来创建数据,它的步骤是:

  • 使用您的数据创建一个 **SQLAlchemy **模型实例;
  • 使用 **add **来将该实例对象添加到您的数据库;
  • 使用 **commit **来对数据库的事务提交(以便保存它们);
  • 使用 **refresh **来刷新您的数据库实例(以便它包含来自数据库的任何新数据,例如生成的 ID)。
  1. def create_user(db: Session, user: schemas.UserCreate):
  2. fake_hashed_password = user.password + "notreallyhashed"
  3. db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
  4. db.add(db_user)
  5. db.commit()
  6. db.refresh(db_user)
  7. return db_user
  8. def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
  9. db_item = models.Item(**item.dict(), owner_id=user_id)
  10. db.add(db_item)
  11. db.commit()
  12. db.refresh(db_item)
  13. return db_item

此示例不安全,因为密码未经过哈希处理,可以查看 Security 登录认证 内容进行完善。

4.3 改—修改数据

现在通过函数来修改数据,比如修改描述,它的步骤是:

  • 使用 **query **查找对应的实例对象,然后修改对应的字段;
  • 使用 **commit **来对数据库的事务提交(以便保存它们);
  • 使用 **refresh **来刷新您的数据库实例(以便它包含来自数据库的任何新数据,例如生成的 ID)。
  1. def update_item_desc_by_id(db: Session, id: int, desc: str):
  2. db_item = db.query(models.Item).filter_by(id=id).first()
  3. db_item.description = desc
  4. db.commit()
  5. db.refresh(db_item)
  6. return db_item

4.4 删—删除数据

现在通过函数来删除数据:

  1. # 批量删除1
  2. def delete_item_by_ownerId1(db: Session, owner_id: int):
  3. db.query(models.Item).filter_by(owner_id=owner_id).delete(synchronize_session=False)
  4. db.commit()
  5. return True
  6. # 批量删除2
  7. def delete_item_by_ownerId2(db: Session, owner_id: int):
  8. db_items = db.query(models.Item).filter_by(owner_id=owner_id).all()
  9. [db.delete(item) for item in db_items]
  10. db.commit()
  11. return True

5 接口创建及运行

5.1 接口创建

最后一步便是创建接口了,涉及到文件 sql_app/main.py,让我们集成和使用我们之前创建的所有其他部分:

  1. from typing import List
  2. from fastapi import Depends, FastAPI, HTTPException
  3. from sqlalchemy.orm import Session
  4. from sql_app import crud, models, schemas
  5. from sql_app.database import SessionLocal, engine
  6. #预先创建数据表
  7. models.Base.metadata.create_all(bind=engine)
  8. app = FastAPI()
  9. # Dependency
  10. def get_db():
  11. db = SessionLocal()
  12. try:
  13. yield db
  14. finally:
  15. db.close()
  16. @app.post("/users/", response_model=schemas.User)
  17. def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
  18. db_user = crud.get_user_by_email(db, phone=user.phone)
  19. if db_user:
  20. raise HTTPException(status_code=400, detail="Email already registered")
  21. return crud.create_user(db=db, user=user)
  22. @app.get("/users/", response_model=List[schemas.User])
  23. def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  24. users = crud.get_users(db, skip=skip, limit=limit)
  25. return users
  26. @app.get("/users/{user_id}", response_model=schemas.User)
  27. def read_user(user_id: int, db: Session = Depends(get_db)):
  28. db_user = crud.get_user_by_id(db, user_id=user_id)
  29. if db_user is None:
  30. raise HTTPException(status_code=404, detail="User not found")
  31. return db_user
  32. @app.post("/users/{user_id}/items/", response_model=schemas.Item)
  33. def create_item_for_user(
  34. user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)
  35. ):
  36. return crud.create_user_item(db=db, item=item, user_id=user_id)
  37. @app.get("/items/", response_model=List[schemas.Item])
  38. def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
  39. items = crud.get_items(db, skip=skip, limit=limit)
  40. return items
  41. @app.put("/update_item/{item_id}/")
  42. def update_item(item_id: int,desc: str, db: Session = Depends(get_db)):
  43. db_item = crud.update_item_desc_by_id(db, id=item_id, desc=desc)
  44. return db_item
  45. @app.delete("/delete_item/{owner_id}/")
  46. def delete_item(owner_id: int, db: Session = Depends(get_db)):
  47. result = crud.delete_item_by_ownerId2(db, owner_id=owner_id)
  48. return result
  49. if __name__ == '__main__':
  50. import uvicorn
  51. uvicorn.run(app, host="127.0.0.1", port=8080)

这样,我们就可以直接从路径操作函数内部调用,如 crud.get_user 并使用该会话,来进行对数据库操作。

📌 get_db 创建依赖项

  1. def get_db():
  2. db = SessionLocal()
  3. try:
  4. yield db
  5. finally:
  6. db.close()

使用我们在 sql_app/database.py 文件中创建的 **SessionLocal **来创建依赖项。 **yield **的作用如该链接:tutorial/dependencies/dependencies-with-yield/

我们将 SessionLocal() 请求的创建和处理放在一个 **try **块中。 然后我们在 **finally **块中关闭它。 通过这种方式,我们确保数据库会话在请求后始终关闭。即使在处理请求时出现异常。

**📌 **Session

然后,当在路径操作函数中使用依赖项时,我们使用 Session,直接从 **SQLAlchemy **导入的类型声明它。如:

  1. db: Session = Depends(get_db)

这将为我们在路径操作函数中提供更好的编辑器支持,因为编辑器将知道 **db **参数的类型 Session

一开始,编辑器并不真正知道提供了哪些方法。 但是通过将类型声明为Session,编辑器现在可以知道可用的方法(.add()、.query()、.commit()等)并且可以提供更好的支持。

**📌 **def 与 async def

本实例未使用 **async def **异步,如需使用请参考:FastApi+sqlalchemy异步操作mysql

5.2 项目运行

此时项目已经构建完成了,我们只需要在 main.py 文件中运行即可,我是使用 **main **方式启动,也可采用命令行的方式启动项目。

  1. if __name__ == '__main__':
  2. import uvicorn
  3. uvicorn.run(app, host="127.0.0.1", port=8000)

启动成功 👇

打开浏览器进入 http://127.0.0.1:8080/docs#/ 👇

这样的话你可以直接与你的 **FastAPI **应用程序交互,从真实数据库中读取数据:

📌 建表脚本

因为你需要与数据库进行交互,那么就要创建相应的数据表,以下是对应的建表脚本:

  1. CREATE TABLE `items` (
  2. `id` int(11) NOT NULL AUTO_INCREMENT,
  3. `title` varchar(64) DEFAULT NULL,
  4. `description` varchar(64) DEFAULT NULL,
  5. `owner_id` int(11) DEFAULT NULL,
  6. PRIMARY KEY (`id`)
  7. ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8
  1. CREATE TABLE `users` (
  2. `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  3. `phone` varchar(64) DEFAULT NULL,
  4. `hashed_password` varchar(64) DEFAULT NULL,
  5. `is_active` tinyint(1) DEFAULT '1',
  6. PRIMARY KEY (`id`)
  7. ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1

📌 创建中间件

可以添加中间件(只是一个函数)将为每个请求创建一个新的 SQLAlchemy SessionLocal,将其添加到请求中,然后在请求完成后关闭它。

  1. @app.middleware("http")
  2. async def db_session_middleware(request: Request, call_next):
  3. response = Response("Internal server error", status_code=500)
  4. try:
  5. request.state.db = SessionLocal()
  6. response = await call_next(request)
  7. finally:
  8. request.state.db.close()
  9. return response
  10. # Dependency
  11. def get_db(request: Request):
  12. return request.state.db

request.state 是每个 **Request **对象的属性。它用于存储附加到请求本身的任意对象,例如本例中的数据库会话。对于这种情况下,它帮助我们确保在所有请求中使用单个数据库会话,然后关闭。

使用 **yield **依赖项与使用中间件,虽然效果类似,但也有一些区别:

  • 中间件必须是一个 **async **函数。 - 如果其中有代码必须“等待”网络,它可能会在那里“阻止”您的应用程序并稍微降低性能。- 尽管这里的 SQLAlchemy 工作方式没问题,但是如果您向等待大量 I/O 的中间件添加更多代码,则可能会出现问题。
  • 每个请求都会运行一个中间件。 - 将为每个请求创建一个连接。- 即使处理该请求的路径操作不需要数据库。

**yield **依赖项足以满足用例时,使用 **yield **依赖项方法会更好。

标签: 数据库 fastapi mysql

本文转载自: https://blog.csdn.net/weixin_51407397/article/details/131152251
版权归原作者 尹煜 所有, 如有侵权,请联系我们删除。

“【Python开发】FastAPI 10:SQL 数据库操作”的评论:

还没有评论