#!/usr/bin/env python # -*- coding: utf-8 -*- """ Alembic 环境配置 用于设置数据库连接和模型导入 """ from logging.config import fileConfig import os import sys import logging from sqlalchemy import engine_from_config from sqlalchemy import pool from alembic import context # 配置日志 logger = logging.getLogger('alembic.env') # 将项目根目录添加到 Python 路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # 导入配置和模型 from config.database import DATABASE_URL, Base from data.models.task import Task # 在这里导入其他模型 # from data.models.other_model import OtherModel # 读取配置 config = context.config # 设置数据库连接 URL config.set_main_option("sqlalchemy.url", DATABASE_URL) # 解析 alembic.ini 中的日志配置 if config.config_file_name is not None: fileConfig(config.config_file_name) # 设置 MetaData 对象,用于生成迁移脚本 target_metadata = Base.metadata # 检查是否指定了特定表 specified_tables = os.environ.get('ALEMBIC_TABLES') if specified_tables: specified_tables = [table.strip() for table in specified_tables.split(',')] print(f"只迁移指定的表: {', '.join(specified_tables)}") def include_object(object, name, type_, reflected, compare_to): """ 决定是否包含对象在迁移中 Args: object: 数据库对象 name: 对象名称 type_: 对象类型 ('table', 'column', 等) reflected: 是否是反射的对象 compare_to: 比较对象 Returns: bool: 是否包含该对象 """ # 如果指定了表,且当前对象是表,检查是否在指定表列表中 if specified_tables and type_ == 'table': include = name in specified_tables if not include: print(f"跳过表 {name},因为它不在指定的表列表中") return include # 如果指定了表,且当前对象是列,检查其所属表是否在指定表列表中 if specified_tables and type_ == 'column': # object.table.name 获取列所属的表名 include = object.table.name in specified_tables if not include: print(f"跳过列 {object.table.name}.{name},因为它所属的表不在指定的表列表中") return include # 默认包含所有对象 return True def run_migrations_offline(): """ 在离线模式下运行迁移 这个函数不会实际连接数据库,而是生成 SQL 脚本 """ url = config.get_main_option("sqlalchemy.url") context.configure( url=url, target_metadata=target_metadata, literal_binds=True, dialect_opts={"paramstyle": "named"}, include_object=include_object, ) with context.begin_transaction(): context.run_migrations() def run_migrations_online(): """ 在在线模式下运行迁移 这个函数会连接数据库并执行迁移 """ connectable = engine_from_config( config.get_section(config.config_ini_section), prefix="sqlalchemy.", poolclass=pool.NullPool, ) with connectable.connect() as connection: context.configure( connection=connection, target_metadata=target_metadata, # 比较类型,如果类型变更则生成迁移脚本 compare_type=True, # 比较默认值,如果默认值变更则生成迁移脚本 compare_server_default=True, # 包含注释变更 include_object=include_object, # 包含表名前缀 include_schemas=True, # 生成可读性更好的迁移脚本 render_as_batch=True, ) with context.begin_transaction(): context.run_migrations() if context.is_offline_mode(): run_migrations_offline() else: run_migrations_online()