VWED_server/services/map_data_service.py

1113 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
地图数据推送服务
处理地图数据推送时的动作点和库区数据存储
"""
import uuid
import datetime
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from sqlalchemy import and_
from data.models import StorageArea, OperatePoint, OperatePointLayer, StorageAreaType
from routes.model.map_model import (
MapDataPushRequest, MapDataPushResponse, MapDataQueryRequest,
MapDataQueryResponse, StorageAreaData, OperatePointData,
OperatePointLayerData, StorageAreaTypeEnum
)
from utils.logger import get_logger
from config.settings import settings
logger = get_logger("services.map_data_service")
class MapDataService:
"""地图数据推送服务"""
@staticmethod
def push_map_data(db: Session, request: MapDataPushRequest) -> MapDataPushResponse:
"""
推送地图数据 - 状态迁移模式
该方法会智能处理场景更新:
1. 检测场景是否已存在
2. 如果存在:
- 将老场景与新场景进行比对
- 对于相同的库区/站点/库位,保留其业务状态(占用、货物、锁定等)
- 删除新场景中不存在的老数据
- 新增的数据使用初始状态
3. 如果不存在:全新推送所有数据
Args:
db: 数据库会话
request: 地图数据推送请求
Returns:
MapDataPushResponse: 推送结果
"""
try:
# 初始化计数器
stats = {
'storage_areas_created': 0,
'storage_areas_updated': 0,
'storage_areas_deleted': 0,
'operate_points_created': 0,
'operate_points_updated': 0,
'operate_points_deleted': 0,
'layers_created': 0,
'layers_updated': 0,
'layers_deleted': 0,
}
logger.info(f"开始推送地图数据: 场景ID={request.scene_id}")
# 检查场景是否已存在
existing_storage_areas = db.query(StorageArea).filter(
and_(
StorageArea.scene_id == request.scene_id,
StorageArea.is_deleted == False
)
).all()
scene_exists = len(existing_storage_areas) > 0
if scene_exists:
logger.info(f"场景已存在,进入状态迁移模式")
MapDataService._migrate_scene_data(db, request, stats)
else:
logger.info(f"场景不存在,进入全新推送模式")
MapDataService._create_new_scene_data(db, request, stats)
# 提交事务
db.commit()
logger.info(f"地图数据推送成功: 场景ID={request.scene_id}, "
f"库区(新增={stats['storage_areas_created']},更新={stats['storage_areas_updated']},删除={stats['storage_areas_deleted']}), "
f"动作点(新增={stats['operate_points_created']},更新={stats['operate_points_updated']},删除={stats['operate_points_deleted']}), "
f"分层(新增={stats['layers_created']},更新={stats['layers_updated']},删除={stats['layers_deleted']})")
# 生成响应消息
mode = "状态迁移" if scene_exists else "全新推送"
result_message = f"{mode}成功。"
result_message += f"库区:新增{stats['storage_areas_created']}个,更新{stats['storage_areas_updated']}个,删除{stats['storage_areas_deleted']}个;"
result_message += f"动作点:新增{stats['operate_points_created']}个,更新{stats['operate_points_updated']}个,删除{stats['operate_points_deleted']}个;"
result_message += f"分层:新增{stats['layers_created']}个,更新{stats['layers_updated']}个,删除{stats['layers_deleted']}"
return MapDataPushResponse(
scene_id=request.scene_id,
storage_areas_count=stats['storage_areas_created'] + stats['storage_areas_updated'],
operate_points_count=stats['operate_points_created'] + stats['operate_points_updated'],
layers_count=stats['layers_created'] + stats['layers_updated'],
message=result_message
)
except Exception as e:
db.rollback()
logger.error(f"地图数据推送失败: {str(e)}")
raise
@staticmethod
def _migrate_scene_data(db: Session, request: MapDataPushRequest, stats: Dict[str, int]):
"""
状态迁移模式 - 智能处理场景更新
核心逻辑:
1. 构建新场景的数据映射(库区名、站点名、库位名)
2. 获取老场景的所有数据
3. 对于交集部分:保留业务状态,更新配置信息
4. 删除老场景中不在新场景的数据
5. 新增新场景中新增的数据
Args:
db: 数据库会话
request: 地图推送请求
stats: 统计数据字典
"""
scene_id = request.scene_id
# ===== 第一步:构建新场景的数据映射 =====
new_area_names = {area.area_name for area in request.storage_areas}
new_station_names = {point.station_name for point in request.operate_points}
# 构建 站点名->库位名列表 的映射
new_station_layers_map = {}
for point in request.operate_points:
if point.layers:
new_station_layers_map[point.station_name] = {
layer.layer_name for layer in point.layers if layer.layer_name
}
logger.info(f"新场景包含: 库区{len(new_area_names)}个, 站点{len(new_station_names)}")
# ===== 第二步:获取老场景的所有数据 =====
old_storage_areas = db.query(StorageArea).filter(
and_(StorageArea.scene_id == scene_id, StorageArea.is_deleted == False)
).all()
old_operate_points = db.query(OperatePoint).filter(
and_(OperatePoint.scene_id == scene_id, OperatePoint.is_deleted == False)
).all()
# 构建老数据映射
old_area_map = {area.area_name: area for area in old_storage_areas}
old_point_map = {point.station_name: point for point in old_operate_points}
logger.info(f"老场景包含: 库区{len(old_area_map)}个, 站点{len(old_point_map)}")
# ===== 第三步:处理库区数据 =====
logger.info("开始处理库区迁移...")
for area_data in request.storage_areas:
old_area = old_area_map.get(area_data.area_name)
if old_area:
# 交集:更新库区配置,保留业务状态
MapDataService._migrate_storage_area(
db, old_area, area_data, request.operate_points
)
stats['storage_areas_updated'] += 1
else:
# 新增:创建新库区
MapDataService._create_storage_area(
db, area_data, scene_id, request.operate_points
)
stats['storage_areas_created'] += 1
# 删除老场景中不在新场景的库区
for area_name, old_area in old_area_map.items():
if area_name not in new_area_names:
db.delete(old_area)
stats['storage_areas_deleted'] += 1
logger.info(f"删除库区: {area_name}")
# 刷新库区操作到数据库,确保新库区可以被动作点查询到
db.flush()
logger.info("库区处理完成,已刷新到数据库")
# ===== 第四步:处理动作点和库位层数据 =====
logger.info("开始处理动作点和库位层迁移...")
# 检查并过滤重复的站点名称
seen_station_names = set()
valid_operate_points = []
for point_data in request.operate_points:
if point_data.station_name in seen_station_names:
logger.warning(f"发现重复的站点名称,跳过: {point_data.station_name}")
continue
seen_station_names.add(point_data.station_name)
valid_operate_points.append(point_data)
for point_data in valid_operate_points:
old_point = old_point_map.get(point_data.station_name)
if old_point:
# 交集:更新动作点配置,保留业务状态,迁移库位层状态
MapDataService._migrate_operate_point(
db, old_point, point_data, scene_id, stats
)
stats['operate_points_updated'] += 1
else:
# 新增:创建新动作点和库位层
new_point = MapDataService._create_operate_point(
db, point_data, scene_id
)
# 创建库位层
if point_data.layers:
for index, layer_data in enumerate(point_data.layers, 1):
MapDataService._create_layer(
db, new_point, layer_data, index
)
stats['layers_created'] += 1
stats['operate_points_created'] += 1
# 删除老场景中不在新场景的动作点(级联删除库位层)
for station_name, old_point in old_point_map.items():
if station_name not in new_station_names:
# 统计被删除的库位层数量
layer_count = db.query(OperatePointLayer).filter(
and_(
OperatePointLayer.operate_point_id == old_point.id,
OperatePointLayer.is_deleted == False
)
).count()
db.delete(old_point) # 级联删除相关的库位层
stats['operate_points_deleted'] += 1
stats['layers_deleted'] += layer_count
logger.info(f"删除动作点及其{layer_count}个库位层: {station_name}")
@staticmethod
def _create_new_scene_data(db: Session, request: MapDataPushRequest, stats: Dict[str, int]):
"""
全新推送模式 - 创建全新的场景数据
Args:
db: 数据库会话
request: 地图推送请求
stats: 统计数据字典
"""
scene_id = request.scene_id
# 创建所有库区
for area_data in request.storage_areas:
MapDataService._create_storage_area(
db, area_data, scene_id, request.operate_points
)
stats['storage_areas_created'] += 1
# 刷新库区操作到数据库,确保新库区可以被动作点查询到
db.flush()
logger.info("库区创建完成,已刷新到数据库")
# 检查并过滤重复的站点名称
seen_station_names = set()
valid_operate_points = []
for point_data in request.operate_points:
if point_data.station_name in seen_station_names:
logger.warning(f"发现重复的站点名称,跳过: {point_data.station_name}")
continue
seen_station_names.add(point_data.station_name)
valid_operate_points.append(point_data)
# 创建所有动作点和库位层
for point_data in valid_operate_points:
new_point = MapDataService._create_operate_point(db, point_data, scene_id)
stats['operate_points_created'] += 1
# 创建库位层
if point_data.layers:
for index, layer_data in enumerate(point_data.layers, 1):
MapDataService._create_layer(db, new_point, layer_data, index)
stats['layers_created'] += 1
@staticmethod
def _upsert_storage_area(db: Session, area_data: StorageAreaData, scene_id: str,
operate_points_data: List[OperatePointData]) -> bool:
"""
增量更新库区数据
Args:
db: 数据库会话
area_data: 库区数据
scene_id: 场景ID
operate_points_data: 动作点数据列表
Returns:
bool: True表示新增False表示更新
"""
try:
# 查找现有库区基于area_name和scene_id
existing_area = db.query(StorageArea).filter(
and_(
StorageArea.area_name == area_data.area_name,
StorageArea.scene_id == scene_id,
StorageArea.is_deleted == False
)
).first()
# 筛选属于该库区的动作点
area_points = [point for point in operate_points_data if point.area_name == area_data.area_name]
# 系统自动计算容量
max_capacity = MapDataService._calculate_storage_area_capacity(
area_data.area_type.value, area_points
)
if existing_area:
# 更新现有库区
existing_area.area_type = StorageAreaType(area_data.area_type)
existing_area.max_capacity = max_capacity
existing_area.description = area_data.description
existing_area.tags = area_data.tags
existing_area.select_logic = area_data.select_logic
existing_area.updated_at = datetime.datetime.now()
logger.info(f"更新库区: {area_data.area_name}")
return False
else:
# 创建新库区
new_area = StorageArea(
id=str(uuid.uuid4()),
area_name=area_data.area_name,
area_type=StorageAreaType(area_data.area_type),
scene_id=scene_id,
max_capacity=max_capacity,
description=area_data.description,
tags=area_data.tags,
select_logic=area_data.select_logic
)
db.add(new_area)
logger.info(f"创建新库区: {area_data.area_name}")
return True
except Exception as e:
logger.error(f"更新库区失败 - {area_data.area_name}: {str(e)}")
raise ValueError(f"库区数据处理失败: {str(e)}")
@staticmethod
def _upsert_operate_point(db: Session, point_data: OperatePointData, scene_id: str) -> tuple[bool, OperatePoint]:
"""
增量更新动作点数据
支持以下功能:
1. 库位可以不绑定库区area_name为空或None
2. 基于station_name判断是否为已存在的库位
3. 如果station_name存在但库位名称不同则更新库位名称
4. 如果库区名称不同,则更新库区关联
Args:
db: 数据库会话
point_data: 动作点数据
scene_id: 场景ID
Returns:
tuple[bool, OperatePoint]: (是否新增, 动作点对象)
"""
try:
# 查找现有动作点基于station_name和scene_id
existing_point = db.query(OperatePoint).filter(
and_(
OperatePoint.station_name == point_data.station_name,
OperatePoint.scene_id == scene_id,
OperatePoint.is_deleted == False
)
).first()
# 根据库区名称获取库区信息(支持库位不绑定库区的情况)
storage_area = None
if point_data.area_name:
storage_area = db.query(StorageArea).filter(
and_(
StorageArea.area_name == point_data.area_name,
StorageArea.scene_id == scene_id,
StorageArea.is_deleted == False
)
).first()
if existing_point:
# 更新现有动作点
# 支持库区关联的修改
existing_point.storage_area_id = storage_area.id if storage_area else None
existing_point.storage_area_type = storage_area.area_type if storage_area else None
existing_point.area_name = point_data.area_name # 支持库区名称为空的情况
# 更新其他属性
existing_point.max_layers = point_data.max_layers
existing_point.position_x = point_data.position_x
existing_point.position_y = point_data.position_y
existing_point.position_z = point_data.position_z
existing_point.content = point_data.content or ""
existing_point.tags = point_data.tags or ""
existing_point.description = point_data.description
existing_point.updated_at = datetime.datetime.now()
# 记录更新信息
area_info = f"库区: {point_data.area_name}" if point_data.area_name else "未绑定库区"
logger.info(f"更新动作点: {point_data.station_name}, {area_info}")
return False, existing_point
else:
# 创建新动作点
new_point = OperatePoint(
id=str(uuid.uuid4()),
station_name=point_data.station_name,
scene_id=scene_id,
storage_area_id=storage_area.id if storage_area else None,
storage_area_type=storage_area.area_type if storage_area else None,
area_name=point_data.area_name, # 支持为None的情况
max_layers=point_data.max_layers,
position_x=point_data.position_x,
position_y=point_data.position_y,
position_z=point_data.position_z,
content=point_data.content or "",
tags=point_data.tags or "",
description=point_data.description
)
db.add(new_point)
# 记录创建信息
area_info = f"库区: {point_data.area_name}" if point_data.area_name else "未绑定库区"
logger.info(f"创建新动作点: {point_data.station_name}, {area_info}")
return True, new_point
except Exception as e:
logger.error(f"更新动作点失败 - {point_data.station_name}: {str(e)}")
raise ValueError(f"动作点数据处理失败: {str(e)}")
@staticmethod
def _upsert_layers(db: Session, operate_point: OperatePoint,
layers_data: List[OperatePointLayerData]) -> Dict[str, int]:
"""
增量更新分层数据
支持以下功能:
1. 基于动作点ID和层索引确定唯一层
2. 支持库位名称更新(基于层位置)
3. 更新库区关联信息
4. 避免唯一约束冲突
5. 确保操作点库区变更时所有层的库区信息同步更新
Args:
db: 数据库会话
operate_point: 动作点对象
layers_data: 分层数据列表
Returns:
Dict[str, int]: 创建和更新的分层数量统计
"""
try:
created_count = 0
updated_count = 0
# 获取该动作点所有现有的层按layer_index排序
existing_layers = db.query(OperatePointLayer).filter(
and_(
OperatePointLayer.operate_point_id == operate_point.id,
OperatePointLayer.is_deleted == False
)
).order_by(OperatePointLayer.layer_index).all()
# 重要修复:首先更新所有现有层的库区信息,确保库区变更时不会出现重复绑定
# 这解决了库位从一个库区转移到另一个库区时,旧层仍绑定原库区的问题
for existing_layer in existing_layers:
if existing_layer.area_name != operate_point.area_name:
existing_layer.area_name = operate_point.area_name # 同步更新库区名称
existing_layer.station_name = operate_point.station_name # 确保站点名称一致
existing_layer.updated_at = datetime.datetime.now()
logger.debug(f"同步层库区信息: 站点={operate_point.station_name}, 层={existing_layer.layer_index}, 库区={operate_point.area_name}")
# 如果没有提供层数据,但仍需要更新现有层的库区信息,直接返回
if not layers_data:
return {'created': created_count, 'updated': updated_count}
# 创建层索引到层对象的映射
existing_layers_map = {layer.layer_index: layer for layer in existing_layers}
for index, layer_data in enumerate(layers_data, 1):
# 层索引从1开始
layer_index = index
if layer_index in existing_layers_map:
# 更新现有层
existing_layer = existing_layers_map[layer_index]
existing_layer.layer_name = layer_data.layer_name # 支持库位名称更新
existing_layer.area_name = operate_point.area_name # 更新库区名称支持为None
existing_layer.station_name = operate_point.station_name # 确保站点名称一致
existing_layer.max_weight = layer_data.max_weight
existing_layer.max_volume = layer_data.max_volume
existing_layer.layer_height = layer_data.layer_height
existing_layer.description = layer_data.description
existing_layer.tags = layer_data.tags
existing_layer.updated_at = datetime.datetime.now()
# 记录更新信息
area_info = f", 库区: {operate_point.area_name}" if operate_point.area_name else ", 未绑定库区"
logger.debug(f"更新分层: 站点={operate_point.station_name}, 库位={layer_data.layer_name}, 层={layer_index}{area_info}")
updated_count += 1
else:
# 创建新层 - 只有在该位置没有层时才创建
new_layer = OperatePointLayer(
id=str(uuid.uuid4()),
operate_point_id=operate_point.id,
station_name=operate_point.station_name,
area_name=operate_point.area_name, # 支持为None的情况
scene_id=operate_point.scene_id,
layer_index=layer_index,
layer_name=layer_data.layer_name,
max_weight=layer_data.max_weight,
max_volume=layer_data.max_volume,
layer_height=layer_data.layer_height,
description=layer_data.description,
tags=layer_data.tags
)
db.add(new_layer)
# 为新创建的库位层同步扩展属性
try:
MapDataService._sync_extended_properties_to_new_layer(db, new_layer)
logger.debug(f"为新库位层 {new_layer.id} 同步扩展属性成功")
except Exception as e:
logger.error(f"为新库位层 {new_layer.id} 同步扩展属性失败: {str(e)}")
raise ValueError(f"库位层扩展属性同步失败: {str(e)}")
# 记录创建信息
area_info = f", 库区: {operate_point.area_name}" if operate_point.area_name else ", 未绑定库区"
logger.debug(f"创建新分层: 站点={operate_point.station_name}, 库位={layer_data.layer_name}, 层={layer_index}{area_info}")
created_count += 1
return {'created': created_count, 'updated': updated_count}
except Exception as e:
logger.error(f"更新分层数据失败 - 动作点{operate_point.station_name}: {str(e)}")
raise ValueError(f"分层数据处理失败: {str(e)}")
@staticmethod
def query_map_data(db: Session, request: MapDataQueryRequest) -> MapDataQueryResponse:
"""
查询地图数据
Args:
db: 数据库会话
request: 地图数据查询请求
Returns:
MapDataQueryResponse: 查询结果
"""
try:
# 查询库区数据
storage_areas_query = db.query(StorageArea).filter(
and_(
StorageArea.scene_id == request.scene_id,
StorageArea.is_deleted == False
)
)
if request.area_type:
storage_areas_query = storage_areas_query.filter(
StorageArea.area_type == request.area_type
)
storage_areas = storage_areas_query.all()
# 查询动作点数据
operate_points_query = db.query(OperatePoint).filter(
and_(
OperatePoint.scene_id == request.scene_id,
OperatePoint.is_deleted == False
)
)
operate_points = operate_points_query.all()
# 统计数据
total_capacity = sum(area.max_capacity for area in storage_areas)
used_capacity = sum(area.current_usage for area in storage_areas)
dense_areas_count = sum(1 for area in storage_areas if area.area_type == StorageAreaType.DENSE)
general_areas_count = len(storage_areas) - dense_areas_count
# 查询分层数据
total_layers = 0
occupied_layers = 0
if request.include_layers:
for point in operate_points:
layers = db.query(OperatePointLayer).filter(
and_(
OperatePointLayer.operate_point_id == point.id,
OperatePointLayer.is_deleted == False
)
).all()
total_layers += len(layers)
occupied_layers += sum(1 for layer in layers if layer.is_occupied)
# 转换为响应格式
storage_areas_data = []
for area in storage_areas:
area_dict = area.to_dict()
area_dict['area_type'] = area.area_type.value
storage_areas_data.append(area_dict)
operate_points_data = []
for point in operate_points:
point_dict = point.to_dict()
# 添加库区类型信息
if point.storage_area_type:
point_dict['storage_area_type'] = point.storage_area_type.value
# 添加库区名称信息
if point.area_name:
point_dict['area_name'] = point.area_name
if request.include_layers:
# 包含分层数据
layers = db.query(OperatePointLayer).filter(
and_(
OperatePointLayer.operate_point_id == point.id,
OperatePointLayer.is_deleted == False
)
).order_by(OperatePointLayer.layer_index).all()
point_dict['layers'] = [layer.to_dict() for layer in layers]
operate_points_data.append(point_dict)
return MapDataQueryResponse(
scene_id=request.scene_id,
storage_areas=storage_areas_data,
operate_points=operate_points_data,
total_capacity=total_capacity,
used_capacity=used_capacity,
dense_areas_count=dense_areas_count,
general_areas_count=general_areas_count,
total_layers=total_layers,
occupied_layers=occupied_layers
)
except Exception as e:
logger.error(f"查询地图数据失败: {str(e)}")
raise
@staticmethod
def _delete_existing_data(db: Session, scene_id: str):
"""删除现有数据"""
# 先获取需要删除的动作点ID列表
operate_point_ids = db.query(OperatePoint.id).filter(
and_(
OperatePoint.scene_id == scene_id,
OperatePoint.is_deleted == False
)
).all()
operate_point_ids = [point_id[0] for point_id in operate_point_ids]
# 物理删除动作点分层(为了避免主键冲突)
if operate_point_ids:
db.query(OperatePointLayer).filter(
OperatePointLayer.operate_point_id.in_(operate_point_ids)
).delete(synchronize_session=False)
# 物理删除动作点(为了避免主键冲突)
db.query(OperatePoint).filter(
and_(
OperatePoint.scene_id == scene_id,
OperatePoint.is_deleted == False
)
).delete(synchronize_session=False)
# 物理删除库区(为了避免主键冲突)
db.query(StorageArea).filter(
and_(
StorageArea.scene_id == scene_id,
StorageArea.is_deleted == False
)
).delete(synchronize_session=False)
@staticmethod
def _calculate_storage_area_capacity(area_type: str, operate_points_data: List) -> int:
"""
计算库区容量
Args:
area_type: 库区类型
operate_points_data: 属于该库区的动作点数据列表
Returns:
int: 计算出的容量
"""
# 根据库区类型从配置中获取参数
if area_type == "dense":
base_capacity = settings.MAP_DENSE_STORAGE_BASE_CAPACITY
capacity_per_point = settings.MAP_DENSE_STORAGE_CAPACITY_PER_POINT
layer_multiplier = settings.MAP_DENSE_STORAGE_LAYER_MULTIPLIER
else: # general
base_capacity = settings.MAP_GENERAL_STORAGE_BASE_CAPACITY
capacity_per_point = settings.MAP_GENERAL_STORAGE_CAPACITY_PER_POINT
layer_multiplier = settings.MAP_GENERAL_STORAGE_LAYER_MULTIPLIER
# 基础容量
total_capacity = base_capacity
# 根据动作点数量和层数计算额外容量
for point_data in operate_points_data:
point_capacity = capacity_per_point
# 如果有多层,应用层数倍数
if point_data.max_layers > 1:
point_capacity = int(point_capacity * layer_multiplier * point_data.max_layers)
total_capacity += point_capacity
return total_capacity
@staticmethod
def _sync_extended_properties_to_new_layer(db: Session, layer: OperatePointLayer):
"""
将所有已启用的扩展属性同步到新创建的库位层
Args:
db: 数据库会话
layer: 新创建的库位层对象
"""
try:
# 导入扩展属性模型(在方法内导入避免循环导入)
from data.models import ExtendedProperty
import json
import datetime
# 获取所有已启用的扩展属性
extended_properties = db.query(ExtendedProperty).filter(
ExtendedProperty.is_deleted == False,
ExtendedProperty.is_enabled == True
).all()
if not extended_properties:
# 如果没有扩展属性,则不需要处理
return
# 解析现有的config_json
config = {}
if layer.config_json:
try:
config = json.loads(layer.config_json)
except Exception as e:
logger.error(f"解析库位层 {layer.id} 的config_json失败: {str(e)}")
raise ValueError(f"库位层配置数据格式错误: {str(e)}")
# 确保extended_fields字段存在
if 'extended_fields' not in config:
config['extended_fields'] = {}
# 同步所有扩展属性
for prop in extended_properties:
config['extended_fields'][prop.property_name] = {
'value': prop.default_value,
'type': prop.property_type.value,
'is_required': prop.is_required,
'updated_at': datetime.datetime.now().isoformat()
}
# 更新config_json
layer.config_json = json.dumps(config, ensure_ascii=False, indent=2)
logger.debug(f"为库位层 {layer.id} 同步了 {len(extended_properties)} 个扩展属性")
except Exception as e:
logger.error(f"同步扩展属性到库位层失败: {str(e)}")
raise
# ========== 状态迁移相关辅助方法 ==========
@staticmethod
def _migrate_storage_area(db: Session, old_area: StorageArea,
area_data: StorageAreaData,
operate_points_data: List[OperatePointData]):
"""
迁移库区数据 - 更新配置,保留业务状态
保留的状态:
- current_usage: 当前使用量
- is_active: 是否激活
- is_maintenance: 是否维护中
更新的配置:
- area_type: 库区类型
- max_capacity: 最大容量(重新计算)
- description: 描述
- tags: 标签
- select_logic: 选择逻辑
Args:
db: 数据库会话
old_area: 老库区对象
area_data: 新库区数据
operate_points_data: 动作点数据列表
"""
# 筛选属于该库区的动作点
area_points = [point for point in operate_points_data if point.area_name == area_data.area_name]
# 重新计算容量
max_capacity = MapDataService._calculate_storage_area_capacity(
area_data.area_type.value, area_points
)
# 更新配置信息
old_area.area_type = StorageAreaType(area_data.area_type)
old_area.max_capacity = max_capacity
old_area.description = area_data.description
old_area.tags = area_data.tags
old_area.select_logic = area_data.select_logic
old_area.updated_at = datetime.datetime.now()
# 保留业务状态current_usage, is_active, is_maintenance 不变)
logger.info(f"迁移库区: {area_data.area_name}, 保留使用量={old_area.current_usage}")
@staticmethod
def _migrate_operate_point(db: Session, old_point: OperatePoint,
point_data: OperatePointData,
scene_id: str, stats: Dict[str, int]):
"""
迁移动作点数据 - 更新配置,保留业务状态,迁移库位层
保留的状态:
- is_disabled: 是否禁用
更新的配置:
- storage_area_id: 库区ID可能变更
- storage_area_type: 库区类型(可能变更)
- area_name: 库区名称(可能变更)
- max_layers: 最大层数
- position_x/y/z: 位置坐标
- content: 内容
- tags: 标签
- description: 描述
Args:
db: 数据库会话
old_point: 老动作点对象
point_data: 新动作点数据
scene_id: 场景ID
stats: 统计数据字典
"""
# 根据库区名称获取库区信息(支持库位不绑定库区的情况)
storage_area = None
if point_data.area_name:
storage_area = db.query(StorageArea).filter(
and_(
StorageArea.area_name == point_data.area_name,
StorageArea.scene_id == scene_id,
StorageArea.is_deleted == False
)
).first()
# 更新动作点配置
old_point.storage_area_id = storage_area.id if storage_area else None
old_point.storage_area_type = storage_area.area_type if storage_area else None
old_point.area_name = point_data.area_name
old_point.max_layers = point_data.max_layers
old_point.position_x = point_data.position_x
old_point.position_y = point_data.position_y
old_point.position_z = point_data.position_z
old_point.content = point_data.content or ""
old_point.tags = point_data.tags or ""
old_point.description = point_data.description
old_point.updated_at = datetime.datetime.now()
# 保留业务状态is_disabled 不变)
logger.info(f"迁移动作点: {point_data.station_name}")
# 迁移库位层数据
MapDataService._migrate_layers(db, old_point, point_data.layers or [], stats)
@staticmethod
def _migrate_layers(db: Session, operate_point: OperatePoint,
layers_data: List[OperatePointLayerData],
stats: Dict[str, int]):
"""
迁移库位层数据 - 基于库位名称匹配,保留货物状态
匹配规则:基于 layer_name 进行匹配
保留的状态(对于匹配的库位层):
- is_occupied: 是否占用
- goods_content: 货物内容
- goods_weight: 货物重量
- goods_volume: 货物体积
- is_locked: 是否锁定
- is_disabled: 是否禁用
- is_empty_tray: 是否空托盘
- locked_by: 锁定者
- goods_stored_at: 货物存放时间
- goods_retrieved_at: 货物取出时间
- last_access_at: 最后访问时间
- config_json: 配置JSON包含扩展属性
更新的配置:
- layer_index: 层索引(可能变更)
- max_weight: 最大承重
- max_volume: 最大体积
- layer_height: 层高
- description: 描述
- tags: 标签
Args:
db: 数据库会话
operate_point: 动作点对象
layers_data: 新的库位层数据列表
stats: 统计数据字典
"""
# 获取该动作点的所有老库位层
old_layers = db.query(OperatePointLayer).filter(
and_(
OperatePointLayer.operate_point_id == operate_point.id,
OperatePointLayer.is_deleted == False
)
).all()
# 构建老库位层映射layer_name -> layer对象
old_layer_map = {layer.layer_name: layer for layer in old_layers if layer.layer_name}
# 构建新库位名称集合
new_layer_names = {layer.layer_name for layer in layers_data if layer.layer_name}
logger.debug(f"动作点 {operate_point.station_name} 老库位层: {len(old_layer_map)}个, 新库位层: {len(new_layer_names)}")
# ===== 第一步:先删除老库位层中不在新场景的,避免唯一约束冲突 =====
layers_to_delete = []
for layer_name, old_layer in old_layer_map.items():
if layer_name not in new_layer_names:
layers_to_delete.append(old_layer)
logger.debug(f"标记删除库位层: {layer_name}")
# 执行删除并立即刷新到数据库
for old_layer in layers_to_delete:
db.delete(old_layer)
stats['layers_deleted'] += 1
# 刷新删除操作,确保唯一约束释放
if layers_to_delete:
db.flush()
logger.debug(f"已刷新删除操作,释放 {len(layers_to_delete)} 个库位层的唯一约束")
# ===== 第二步:处理新库位层数据(更新或创建)=====
for index, layer_data in enumerate(layers_data, 1):
if not layer_data.layer_name:
continue
old_layer = old_layer_map.get(layer_data.layer_name)
if old_layer:
# 交集:更新配置,保留所有业务状态
old_layer.layer_index = index # 更新层索引
old_layer.area_name = operate_point.area_name # 更新库区名称(跟随动作点)
old_layer.station_name = operate_point.station_name # 更新站点名称
old_layer.max_weight = layer_data.max_weight
old_layer.max_volume = layer_data.max_volume
old_layer.layer_height = layer_data.layer_height
old_layer.description = layer_data.description
old_layer.tags = layer_data.tags
old_layer.updated_at = datetime.datetime.now()
# 保留所有业务状态is_occupied, goods_*, is_locked, locked_by, config_json 等都不变)
stats['layers_updated'] += 1
logger.debug(f"迁移库位层: {layer_data.layer_name}, 占用状态={old_layer.is_occupied}, 货物={old_layer.goods_content}")
else:
# 新增:创建新库位层
MapDataService._create_layer(db, operate_point, layer_data, index)
stats['layers_created'] += 1
@staticmethod
def _create_storage_area(db: Session, area_data: StorageAreaData,
scene_id: str,
operate_points_data: List[OperatePointData]) -> StorageArea:
"""
创建新库区
Args:
db: 数据库会话
area_data: 库区数据
scene_id: 场景ID
operate_points_data: 动作点数据列表
Returns:
StorageArea: 创建的库区对象
"""
# 筛选属于该库区的动作点
area_points = [point for point in operate_points_data if point.area_name == area_data.area_name]
# 计算容量
max_capacity = MapDataService._calculate_storage_area_capacity(
area_data.area_type.value, area_points
)
new_area = StorageArea(
id=str(uuid.uuid4()),
area_name=area_data.area_name,
area_type=StorageAreaType(area_data.area_type),
scene_id=scene_id,
max_capacity=max_capacity,
description=area_data.description,
tags=area_data.tags,
select_logic=area_data.select_logic
)
db.add(new_area)
logger.info(f"创建新库区: {area_data.area_name}")
return new_area
@staticmethod
def _create_operate_point(db: Session, point_data: OperatePointData,
scene_id: str) -> OperatePoint:
"""
创建新动作点
Args:
db: 数据库会话
point_data: 动作点数据
scene_id: 场景ID
Returns:
OperatePoint: 创建的动作点对象
"""
# 根据库区名称获取库区信息(支持库位不绑定库区的情况)
storage_area = None
if point_data.area_name:
storage_area = db.query(StorageArea).filter(
and_(
StorageArea.area_name == point_data.area_name,
StorageArea.scene_id == scene_id,
StorageArea.is_deleted == False
)
).first()
new_point = OperatePoint(
id=str(uuid.uuid4()),
station_name=point_data.station_name,
scene_id=scene_id,
storage_area_id=storage_area.id if storage_area else None,
storage_area_type=storage_area.area_type if storage_area else None,
area_name=point_data.area_name,
max_layers=point_data.max_layers,
position_x=point_data.position_x,
position_y=point_data.position_y,
position_z=point_data.position_z,
content=point_data.content or "",
tags=point_data.tags or "",
description=point_data.description
)
db.add(new_point)
area_info = f"库区: {point_data.area_name}" if point_data.area_name else "未绑定库区"
logger.info(f"创建新动作点: {point_data.station_name}, {area_info}")
return new_point
@staticmethod
def _create_layer(db: Session, operate_point: OperatePoint,
layer_data: OperatePointLayerData, layer_index: int) -> OperatePointLayer:
"""
创建新库位层
Args:
db: 数据库会话
operate_point: 动作点对象
layer_data: 库位层数据
layer_index: 层索引
Returns:
OperatePointLayer: 创建的库位层对象
"""
new_layer = OperatePointLayer(
id=str(uuid.uuid4()),
operate_point_id=operate_point.id,
station_name=operate_point.station_name,
area_name=operate_point.area_name,
scene_id=operate_point.scene_id,
layer_index=layer_index,
layer_name=layer_data.layer_name,
max_weight=layer_data.max_weight,
max_volume=layer_data.max_volume,
layer_height=layer_data.layer_height,
description=layer_data.description,
tags=layer_data.tags
)
db.add(new_layer)
# 为新创建的库位层同步扩展属性
try:
MapDataService._sync_extended_properties_to_new_layer(db, new_layer)
logger.debug(f"为新库位层 {new_layer.layer_name} 同步扩展属性成功")
except Exception as e:
logger.error(f"为新库位层 {new_layer.layer_name} 同步扩展属性失败: {str(e)}")
raise ValueError(f"库位层扩展属性同步失败: {str(e)}")
area_info = f", 库区: {operate_point.area_name}" if operate_point.area_name else ", 未绑定库区"
logger.debug(f"创建新库位层: 站点={operate_point.station_name}, 库位={layer_data.layer_name}, 层={layer_index}{area_info}")
return new_layer