136 lines
3.9 KiB
Python
136 lines
3.9 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Alembic 环境配置
|
|
用于设置数据库连接和模型导入
|
|
"""
|
|
|
|
from logging.config import fileConfig
|
|
import os
|
|
import sys
|
|
import logging
|
|
|
|
from sqlalchemy import engine_from_config
|
|
from sqlalchemy import pool
|
|
|
|
from alembic import context
|
|
|
|
# 配置日志
|
|
logger = logging.getLogger('alembic.env')
|
|
|
|
# 将项目根目录添加到 Python 路径
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
# 导入配置和模型
|
|
from config.database import DATABASE_URL, Base
|
|
from data.models.task import Task
|
|
# 在这里导入其他模型
|
|
# from data.models.other_model import OtherModel
|
|
|
|
# 读取配置
|
|
config = context.config
|
|
|
|
# 设置数据库连接 URL
|
|
config.set_main_option("sqlalchemy.url", DATABASE_URL)
|
|
|
|
# 解析 alembic.ini 中的日志配置
|
|
if config.config_file_name is not None:
|
|
fileConfig(config.config_file_name)
|
|
|
|
# 设置 MetaData 对象,用于生成迁移脚本
|
|
target_metadata = Base.metadata
|
|
|
|
# 检查是否指定了特定表
|
|
specified_tables = os.environ.get('ALEMBIC_TABLES')
|
|
if specified_tables:
|
|
specified_tables = [table.strip() for table in specified_tables.split(',')]
|
|
print(f"只迁移指定的表: {', '.join(specified_tables)}")
|
|
|
|
def include_object(object, name, type_, reflected, compare_to):
|
|
"""
|
|
决定是否包含对象在迁移中
|
|
|
|
Args:
|
|
object: 数据库对象
|
|
name: 对象名称
|
|
type_: 对象类型 ('table', 'column', 等)
|
|
reflected: 是否是反射的对象
|
|
compare_to: 比较对象
|
|
|
|
Returns:
|
|
bool: 是否包含该对象
|
|
"""
|
|
# 如果指定了表,且当前对象是表,检查是否在指定表列表中
|
|
if specified_tables and type_ == 'table':
|
|
include = name in specified_tables
|
|
if not include:
|
|
print(f"跳过表 {name},因为它不在指定的表列表中")
|
|
return include
|
|
|
|
# 如果指定了表,且当前对象是列,检查其所属表是否在指定表列表中
|
|
if specified_tables and type_ == 'column':
|
|
# object.table.name 获取列所属的表名
|
|
include = object.table.name in specified_tables
|
|
if not include:
|
|
print(f"跳过列 {object.table.name}.{name},因为它所属的表不在指定的表列表中")
|
|
return include
|
|
|
|
# 默认包含所有对象
|
|
return True
|
|
|
|
def run_migrations_offline():
|
|
"""
|
|
在离线模式下运行迁移
|
|
|
|
这个函数不会实际连接数据库,而是生成 SQL 脚本
|
|
"""
|
|
url = config.get_main_option("sqlalchemy.url")
|
|
context.configure(
|
|
url=url,
|
|
target_metadata=target_metadata,
|
|
literal_binds=True,
|
|
dialect_opts={"paramstyle": "named"},
|
|
include_object=include_object,
|
|
)
|
|
|
|
with context.begin_transaction():
|
|
context.run_migrations()
|
|
|
|
|
|
def run_migrations_online():
|
|
"""
|
|
在在线模式下运行迁移
|
|
|
|
这个函数会连接数据库并执行迁移
|
|
"""
|
|
connectable = engine_from_config(
|
|
config.get_section(config.config_ini_section),
|
|
prefix="sqlalchemy.",
|
|
poolclass=pool.NullPool,
|
|
)
|
|
|
|
with connectable.connect() as connection:
|
|
context.configure(
|
|
connection=connection,
|
|
target_metadata=target_metadata,
|
|
# 比较类型,如果类型变更则生成迁移脚本
|
|
compare_type=True,
|
|
# 比较默认值,如果默认值变更则生成迁移脚本
|
|
compare_server_default=True,
|
|
# 包含注释变更
|
|
include_object=include_object,
|
|
# 包含表名前缀
|
|
include_schemas=True,
|
|
# 生成可读性更好的迁移脚本
|
|
render_as_batch=True,
|
|
)
|
|
|
|
with context.begin_transaction():
|
|
context.run_migrations()
|
|
|
|
|
|
if context.is_offline_mode():
|
|
run_migrations_offline()
|
|
else:
|
|
run_migrations_online() |