tianfeng_task_modules/migrations/versions/81b754397c49_增加字段.py

119 lines
3.2 KiB
Python
Raw Permalink Normal View History

2025-03-17 14:58:05 +08:00
"""增加字段
Revision ID: 81b754397c49
Revises: 001
Create Date: 2025-03-14 18:01:04.421172
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import text
from sqlalchemy.engine import reflection
import logging
from sqlalchemy.dialects import mysql
# Configure logging: request INFO-level output via basicConfig and create the
# named logger that the helpers in this migration write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('alembic.migration')
# revision identifiers, used by Alembic.
# `revision` identifies this migration; `down_revision` names the migration it
# is applied on top of ('001').
revision = '81b754397c49'
down_revision = '001'
branch_labels = None
depends_on = None
def safe_execute(func, *args, **kwargs):
    """Call ``func`` and log, rather than propagate, any exception it raises.

    Args:
        func: The callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns, or ``None`` if it raised an ``Exception``.
    """
    try:
        return func(*args, **kwargs)
    except Exception as e:
        # Not every callable has a __name__ (functools.partial objects and
        # instances defining __call__ do not). Fall back to repr() so that
        # building the log message cannot itself raise and mask the original
        # error.
        func_name = getattr(func, '__name__', None)
        if func_name is None:
            func_name = repr(func)
        logger.error(f"执行 {func_name} 时发生错误: {e}")
        return None
def column_exists(table_name, column_name):
    """Report whether ``table_name`` has a column named ``column_name``.

    The check is made against the live database through the connection that
    Alembic is currently bound to.

    Args:
        table_name: Name of the table to inspect.
        column_name: Name of the column to look for.

    Returns:
        bool: ``True`` if the column is present, ``False`` otherwise.
    """
    # sa.inspect() is the supported way to obtain an Inspector;
    # Inspector.from_engine() has been deprecated since SQLAlchemy 1.4.
    inspector = sa.inspect(op.get_bind())
    return any(
        column['name'] == column_name
        for column in inspector.get_columns(table_name)
    )
def index_exists(table_name, index_name):
    """Report whether ``table_name`` has an index named ``index_name``.

    The check is made against the live database through the connection that
    Alembic is currently bound to.

    Args:
        table_name: Name of the table to inspect.
        index_name: Name of the index to look for.

    Returns:
        bool: ``True`` if the index is present, ``False`` otherwise.
    """
    # sa.inspect() is the supported way to obtain an Inspector;
    # Inspector.from_engine() has been deprecated since SQLAlchemy 1.4.
    inspector = sa.inspect(op.get_bind())
    return any(
        index['name'] == index_name
        for index in inspector.get_indexes(table_name)
    )
def table_exists(table_name):
    """Report whether a table named ``table_name`` exists.

    The check is made against the live database through the connection that
    Alembic is currently bound to.

    Args:
        table_name: Name of the table to look for.

    Returns:
        bool: ``True`` if the table is listed by the inspector, ``False``
        otherwise.
    """
    # sa.inspect() is the supported way to obtain an Inspector;
    # Inspector.from_engine() has been deprecated since SQLAlchemy 1.4.
    inspector = sa.inspect(op.get_bind())
    return table_name in inspector.get_table_names()
def upgrade():
    """Add ``tasks.task_id`` (unique, NOT NULL) and widen ``tasks.task_type``.

    The end state matches the auto-generated migration: ``task_id`` is a
    NOT NULL ``String(36)`` with a unique index ``ix_tasks_task_id``, and
    ``task_type`` changes from the MySQL ENUM to ``String(100)``.

    The column is introduced in three steps instead of one. Adding it as
    NOT NULL with no default gives every existing row the same value, so
    creating the UNIQUE index on a table that already holds more than one row
    fails with a duplicate-key error.
    """
    task_id_type = sa.String(length=36)
    task_id_comment = '任务UUID用于外部引用'

    # Step 1: add the column as nullable so existing rows are accepted.
    with op.batch_alter_table('tasks', schema=None) as batch_op:
        batch_op.add_column(
            sa.Column('task_id', task_id_type, nullable=True, comment=task_id_comment)
        )

    # Step 2: give every pre-existing row its own identifier. UUID() is a
    # MySQL function evaluated per row; this migration is already
    # MySQL-specific (see the mysql.ENUM below).
    op.execute(text("UPDATE tasks SET task_id = UUID() WHERE task_id IS NULL"))

    # Step 3: all rows now hold distinct values, so the NOT NULL constraint
    # and the unique index can be applied safely.
    with op.batch_alter_table('tasks', schema=None) as batch_op:
        batch_op.alter_column('task_id',
                              existing_type=task_id_type,
                              existing_comment=task_id_comment,
                              nullable=False)
        batch_op.alter_column('task_type',
                              existing_type=mysql.ENUM('NORMAL', 'SCHEDULED', collation='utf8mb4_unicode_ci'),
                              type_=sa.String(length=100),
                              existing_comment='任务类型',
                              existing_nullable=False)
        batch_op.create_index(batch_op.f('ix_tasks_task_id'), ['task_id'], unique=True)
def downgrade():
    """Undo this revision on the ``tasks`` table.

    In order: drop the ``ix_tasks_task_id`` index, change ``task_type`` from
    ``String(100)`` back to the MySQL ENUM, and drop the ``task_id`` column.

    NOTE(review): narrowing ``task_type`` back to the ENUM presumably requires
    every stored value to be 'NORMAL' or 'SCHEDULED' -- confirm before running
    this against real data.
    """
    widened_type = sa.String(length=100)
    legacy_enum = mysql.ENUM('NORMAL', 'SCHEDULED', collation='utf8mb4_unicode_ci')

    with op.batch_alter_table('tasks', schema=None) as tasks_batch:
        # The index goes first, while the column it covers still exists.
        task_id_index = tasks_batch.f('ix_tasks_task_id')
        tasks_batch.drop_index(task_id_index)

        tasks_batch.alter_column(
            'task_type',
            existing_type=widened_type,
            type_=legacy_enum,
            existing_comment='任务类型',
            existing_nullable=False,
        )

        tasks_batch.drop_column('task_id')