做一个公司网站大概要多少钱,织梦快速建站,数码家电商城网站源码,网站ip地址范围No17: 数据库连接与 ORM#xff08;SQLAlchemy 实战#xff09; 摘要 本文深入探讨SQLAlchemy在复杂场景下的高级应用#xff0c;涵盖四大核心主题#xff1a;
会话生命周期管理#xff1a;通过事件钩子实现事务监控与审计追踪混合继承映射#xff1a;结合单表/连接表继…No17: 数据库连接与 ORMSQLAlchemy 实战 摘要 本文深入探讨SQLAlchemy在复杂场景下的高级应用涵盖四大核心主题
会话生命周期管理通过事件钩子实现事务监控与审计追踪混合继承映射结合单表/连接表继承优势实现多态模型高性能操作批量插入与核心SQL构造优化大数据处理异步支持基于asyncmy的MySQL异步引擎实践
实战案例包含电商系统的分库分表实现策略与基于ORM的审计日志系统设计。扩展部分解析多租户架构的三种隔离方案及SQL注入防御的ORM层最佳实践。配套代码演示了从分片路由到异步操作的完整电商系统实现帮助开发者掌握构建高并发、高安全性的企业级数据库应用能力。 核心概念
1. 会话生命周期管理Session事件钩子
通过事件钩子实现精细化的会话控制典型场景包括事务边界管理、变更追踪和审计日志。
from sqlalchemy import event
from sqlalchemy.orm import sessionmakerSession sessionmaker()# 事务提交前事件
event.listens_for(Session, before_commit)
def before_commit(session):print(即将提交事务当前待处理变更, session.dirty)# 事务回滚后事件
event.listens_for(Session, after_rollback)
def after_rollback(session):print(事务已回滚清理临时状态)2. 混合继承映射策略
结合单表继承和连接表继承的优势实现灵活的多态查询
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationshipBase declarative_base()class Product(Base):__tablename__ productid Column(Integer, primary_keyTrue)type Column(String(20)) # 单表继承标识__mapper_args__ {polymorphic_on: type}class Book(Product):__tablename__ bookid Column(Integer, ForeignKey(product.id), primary_keyTrue)isbn Column(String(13))__mapper_args__ {polymorphic_identity: book}class Electronic(Product):__tablename__ electronicid Column(Integer, ForeignKey(product.id), primary_keyTrue)warranty Column(Integer)__mapper_args__ {polymorphic_identity: electronic}3. 批量操作与核心SQL构造
高效处理大数据量操作的两种方式
# ORM批量插入
session.bulk_insert_mappings(User, [{name: Alice, age: 30},{name: Bob, age: 25}
])# 核心SQL构造
from sqlalchemy import select, table, column
users table(users, column(id), column(name), column(age)
)
stmt users.insert().values([{name: Charlie, age: 35},{name: David, age: 40}
])
connection.execute(stmt)4. 异步引擎与greenlet整合
使用asyncmy驱动实现MySQL异步操作
from sqlalchemy.ext.asyncio import create_async_engine
import asyncioasync_engine create_async_engine(mysqlasyncmy://user:passlocalhost/db,pool_size5, max_overflow10
)async def fetch_users():async with async_engine.connect() as conn:result await conn.execute(SELECT * FROM users)return result.fetchall()asyncio.run(fetch_users())实战案例
1. 电商系统的分表分库实现
使用ShardedSession实现用户表水平分片
from sqlalchemy.ext.horizontal_shard import ShardedSessionshards {shard1: create_engine(sqlite:///./shard1.db),shard2: create_engine(sqlite:///./shard2.db)
}def shard_chooser(mapper, instance, clauseNone):if instance:return shard%d % (instance.id % 2 1)else:return shard1session ShardedSession(shardsshards,shard_choosershard_chooser,id_chooserlambda *args: list(shards.keys())
)# 自动路由到对应分片
user User(id101, nameAlice)
session.add(user) # 自动路由到shard22. 版本控制与审计日志实现
通过事件监听实现变更追踪
from sqlalchemy import inspectaudit_logs []event.listens_for(Session, before_flush)
def track_changes(session, context, instances):for obj in session.dirty:state inspect(obj)for attr in state.attrs:hist attr.load_history()if hist.has_changes():audit_logs.append({object: obj,attribute: attr.key,old: hist.deleted,new: hist.added})扩展思考
1. 多租户架构的数据库隔离方案
三种常见实现方式
# 方案1schema隔离
class TenantSpecificQuery(Query):def get(self, ident):return super().filter_by(tenant_idcurrent_tenant.id).get(ident)# 方案2行级隔离
class BaseModel(Base):__abstract__ Truetenant_id Column(Integer, nullableFalse)# 方案3动态schema切换
event.listens_for(Pool, connect)
def connect(dbapi_con, record):dbapi_con.execute(fSET search_path TO tenant_{current_tenant.id})2. SQL注入防御的ORM层实践
安全操作示例
# 错误方式存在注入风险
query User.query.filter(fusername {input_username})# 正确方式
query User.query.filter(User.username input_username)# 动态查询安全构造
from sqlalchemy import text
stmt text(SELECT * FROM users WHERE username :username)
session.execute(stmt, {username: input_username})完整代码示例
包含分表分库、审计日志、异步操作的完整电商系统示例
# 安装依赖
# pip install sqlalchemy aiosqlitefrom sqlalchemy import Column, Integer, String, Float, select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import asyncio# 定义模型
Base declarative_base()class Product(Base):__tablename__ productsid Column(Integer, primary_keyTrue)name Column(String(50))price Column(Float)shard_id Column(Integer, nullableFalse)# 创建异步引擎
engines {shard1: create_async_engine(sqliteaiosqlite:///./shard1.db, echoTrue),shard2: create_async_engine(sqliteaiosqlite:///./shard2.db, echoTrue)
}# 创建异步会话工厂
async_sessions {shard_id: sessionmaker(bindengine,class_AsyncSession,expire_on_commitFalse)for shard_id, engine in engines.items()
}# 根据 shard_id 选择分片
def get_shard_id(shard_id):return fshard{shard_id % 2 1}# 异步操作示例
async def main():# 创建表for engine in engines.values():async with engine.begin() as conn:await conn.run_sync(Base.metadata.create_all)# 插入数据product Product(nameLaptop, price999.99, shard_id101)shard_id get_shard_id(product.shard_id)async with async_sessions[shard_id]() as session:session.add(product)await session.commit()print(f产品已添加到 {shard_id})# 插入数据product Product(name手机, price10000, shard_id102)shard_id get_shard_id(product.shard_id)async with async_sessions[shard_id]() as session:session.add(product)await session.commit()print(f产品已添加到 {shard_id})# 查询所有分片的数据all_products []for shard_id, session_factory in async_sessions.items():async with session_factory() as session:result await session.execute(select(Product))products result.scalars().all()all_products.extend(products)print(f从 {shard_id} 查询到 {len(products)} 个产品)# 显示所有产品print(\n所有产品:)for product in all_products:print(f产品: {product.name}, 价格: {product.price}, 分片ID: {product.shard_id})# 运行异步主函数
if __name__ __main__:asyncio.run(main())输出:
d:\python_projects\jupyter_demo\sqlalchemy_demo.py:11: MovedIn20Warning: The declarative_base() function is now available as sqlalchemy.orm.declarative_base(). (deprecated since: 2.0) (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)Base declarative_base()
2025-03-09 19:50:49,625 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,626 INFO sqlalchemy.engine.Engine PRAGMA main.table_info(products)
2025-03-09 19:50:49,626 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,627 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info(products)
2025-03-09 19:50:49,627 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,628 INFO sqlalchemy.engine.Engine
CREATE TABLE products (id INTEGER NOT NULL,name VARCHAR(50),price FLOAT,shard_id INTEGER NOT NULL,PRIMARY KEY (id)
)
2025-03-09 19:50:49,629 INFO sqlalchemy.engine.Engine [no key 0.00072s] ()
2025-03-09 19:50:49,636 INFO sqlalchemy.engine.Engine COMMIT
2025-03-09 19:50:49,638 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,639 INFO sqlalchemy.engine.Engine PRAGMA main.table_info(products)
2025-03-09 19:50:49,639 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,640 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info(products)
2025-03-09 19:50:49,640 INFO sqlalchemy.engine.Engine [raw sql] ()
2025-03-09 19:50:49,641 INFO sqlalchemy.engine.Engine
CREATE TABLE products (id INTEGER NOT NULL,name VARCHAR(50),price FLOAT,shard_id INTEGER NOT NULL,PRIMARY KEY (id)
)
2025-03-09 19:50:49,642 INFO sqlalchemy.engine.Engine [no key 0.00067s] ()
2025-03-09 19:50:49,647 INFO sqlalchemy.engine.Engine COMMIT
2025-03-09 19:50:49,649 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,650 INFO sqlalchemy.engine.Engine INSERT INTO products (name, price, shard_id) VALUES (?, ?, ?)
2025-03-09 19:50:49,650 INFO sqlalchemy.engine.Engine [generated in 0.00035s] (Laptop, 999.99, 101)
2025-03-09 19:50:49,652 INFO sqlalchemy.engine.Engine COMMIT
产品已添加到 shard2
2025-03-09 19:50:49,657 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,658 INFO sqlalchemy.engine.Engine INSERT INTO products (name, price, shard_id) VALUES (?, ?, ?)
2025-03-09 19:50:49,659 INFO sqlalchemy.engine.Engine [generated in 0.00077s] (手机, 10000.0, 102)
2025-03-09 19:50:49,661 INFO sqlalchemy.engine.Engine COMMIT
产品已添加到 shard1
2025-03-09 19:50:49,665 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,666 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
2025-03-09 19:50:49,665 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,666 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
2025-03-09 19:50:49,666 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
ducts.price, products.shard_id
FROM products
2025-03-09 19:50:49,667 INFO sqlalchemy.engine.Engine [generated in 0.00054s] ()
从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
FROM products
2025-03-09 19:50:49,667 INFO sqlalchemy.engine.Engine [generated in 0.00054s] ()
从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,667 INFO sqlalchemy.engine.Engine [generated in 0.00054s] ()
从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, pro从 shard1 查询到 1 个产品
2025-03-09 19:50:49,668 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
2025-03-09 19:50:49,669 INFO sqlalchemy.engine.Engine SELECT products.id, products.name, products.price, products.shard_id
FROM products
2025-03-09 19:50:49,670 INFO sqlalchemy.engine.Engine [generated in 0.00029s] ()
从 shard2 查询到 1 个产品
ducts.price, products.shard_id
FROM products
2025-03-09 19:50:49,670 INFO sqlalchemy.engine.Engine [generated in 0.00029s] ()
从 shard2 查询到 1 个产品
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
FROM products
2025-03-09 19:50:49,670 INFO sqlalchemy.engine.Engine [generated in 0.00029s] ()
从 shard2 查询到 1 个产品
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
从 shard2 查询到 1 个产品
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
2025-03-09 19:50:49,671 INFO sqlalchemy.engine.Engine ROLLBACK
所有产品:
产品: 手机, 价格: 10000.0, 分片ID: 102
所有产品:
产品: 手机, 价格: 10000.0, 分片ID: 102
产品: Laptop, 价格: 999.99, 分片ID: 101
产品: 手机, 价格: 10000.0, 分片ID: 102
产品: Laptop, 价格: 999.99, 分片ID: 101
产品: Laptop, 价格: 999.99, 分片ID: 101通过本章学习您应该能够
熟练使用SQLAlchemy事件系统管理会话生命周期实现复杂的继承映射策略处理大规模数据操作的性能优化构建高并发的异步数据库应用设计企业级的数据库架构方案