2025-04-30 16:57:46 +08:00

136 lines
3.9 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Alembic 环境配置
用于设置数据库连接和模型导入
"""
from logging.config import fileConfig
import os
import sys
import logging
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# 配置日志
logger = logging.getLogger('alembic.env')
# 将项目根目录添加到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 导入配置和模型
from config.database import DATABASE_URL, Base
from data.models.task import Task
# 在这里导入其他模型
# from data.models.other_model import OtherModel
# 读取配置
config = context.config
# 设置数据库连接 URL
config.set_main_option("sqlalchemy.url", DATABASE_URL)
# 解析 alembic.ini 中的日志配置
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# 设置 MetaData 对象,用于生成迁移脚本
target_metadata = Base.metadata
# 检查是否指定了特定表
specified_tables = os.environ.get('ALEMBIC_TABLES')
if specified_tables:
specified_tables = [table.strip() for table in specified_tables.split(',')]
print(f"只迁移指定的表: {', '.join(specified_tables)}")
def include_object(object, name, type_, reflected, compare_to):
"""
决定是否包含对象在迁移中
Args:
object: 数据库对象
name: 对象名称
type_: 对象类型 ('table', 'column', 等)
reflected: 是否是反射的对象
compare_to: 比较对象
Returns:
bool: 是否包含该对象
"""
# 如果指定了表,且当前对象是表,检查是否在指定表列表中
if specified_tables and type_ == 'table':
include = name in specified_tables
if not include:
print(f"跳过表 {name},因为它不在指定的表列表中")
return include
# 如果指定了表,且当前对象是列,检查其所属表是否在指定表列表中
if specified_tables and type_ == 'column':
# object.table.name 获取列所属的表名
include = object.table.name in specified_tables
if not include:
print(f"跳过列 {object.table.name}.{name},因为它所属的表不在指定的表列表中")
return include
# 默认包含所有对象
return True
def run_migrations_offline():
"""
在离线模式下运行迁移
这个函数不会实际连接数据库,而是生成 SQL 脚本
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
include_object=include_object,
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""
在在线模式下运行迁移
这个函数会连接数据库并执行迁移
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
# 比较类型,如果类型变更则生成迁移脚本
compare_type=True,
# 比较默认值,如果默认值变更则生成迁移脚本
compare_server_default=True,
# 包含注释变更
include_object=include_object,
# 包含表名前缀
include_schemas=True,
# 生成可读性更好的迁移脚本
render_as_batch=True,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()