222 lines
7.0 KiB
Python
222 lines
7.0 KiB
Python
|
||
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
地图格式转换API路由
|
||
提供多种地图格式之间的转换API接口:
|
||
1. SMAP到Scene格式转换
|
||
2. Scene到SMAP格式转换
|
||
3. SMAP到华睿(IRAY)地图包转换
|
||
4. 转换文件下载接口
|
||
"""
|
||
|
||
from fastapi import APIRouter, UploadFile, File, Request, Form
|
||
from fastapi.responses import FileResponse, StreamingResponse, JSONResponse
|
||
import os
|
||
import time
|
||
import tempfile
|
||
import shutil
|
||
from routes.common_api import format_response, error_response
|
||
from utils.logger import get_logger
|
||
from services import smap_to_scene_converter
|
||
from services.scene_to_smap_converter import convert_scene_to_smap
|
||
from services.smap_to_iray_converter import convert_smap_to_iray
|
||
|
||
# 创建路由
|
||
router = APIRouter(prefix="/api/vwed-map-converter", tags=["地图格式转换"])
|
||
|
||
# 设置日志
|
||
logger = get_logger("app.map_converter_api")
|
||
|
||
@router.post("/smap-to-scene")
|
||
async def smap_to_scene(
|
||
smap_file: UploadFile = File(..., description="上传的smap地图文件"),
|
||
scene_file: UploadFile = File(None, description="可选,参考scene文件")
|
||
):
|
||
"""
|
||
SMAP地图转Scene格式
|
||
支持可选参考scene文件,自动完成场景丰富化
|
||
"""
|
||
try:
|
||
# 保存上传的文件到临时目录
|
||
temp_dir = "temp_uploads"
|
||
os.makedirs(temp_dir, exist_ok=True)
|
||
smap_path = os.path.join(temp_dir, f"{int(time.time())}_{smap_file.filename}")
|
||
with open(smap_path, "wb") as f:
|
||
f.write(await smap_file.read())
|
||
scene_path = None
|
||
if scene_file:
|
||
scene_path = os.path.join(temp_dir, f"{int(time.time())}_{scene_file.filename}")
|
||
with open(scene_path, "wb") as f:
|
||
f.write(await scene_file.read())
|
||
# 执行转换
|
||
result = smap_to_scene_converter.convert_smap_to_scene(smap_path, scene_path)
|
||
return format_response(data=result, message="SMAP地图转换成功")
|
||
except Exception as e:
|
||
logger.error(f"SMAP地图转换失败: {str(e)}")
|
||
return error_response(f"SMAP地图转换失败: {str(e)}", 500)
|
||
|
||
|
||
@router.post("/scene-to-smap")
|
||
async def scene_to_smap(
|
||
scene_file: UploadFile = File(..., description="Scene地图文件"),
|
||
smap_file: UploadFile = File(..., description="SMAP地图文件")
|
||
):
|
||
"""
|
||
Scene地图到SMAP地图转换
|
||
根据scene地图中的points信息,更新smap地图中对应的坐标位置
|
||
"""
|
||
try:
|
||
# 创建临时目录
|
||
temp_dir = "temp_uploads"
|
||
os.makedirs(temp_dir, exist_ok=True)
|
||
|
||
# 保存上传的文件
|
||
timestamp = int(time.time())
|
||
scene_path = os.path.join(temp_dir, f"{timestamp}_{scene_file.filename}")
|
||
smap_path = os.path.join(temp_dir, f"{timestamp}_{smap_file.filename}")
|
||
|
||
with open(scene_path, "wb") as f:
|
||
f.write(await scene_file.read())
|
||
with open(smap_path, "wb") as f:
|
||
f.write(await smap_file.read())
|
||
|
||
# 执行转换
|
||
result = convert_scene_to_smap(scene_path, smap_path)
|
||
|
||
# 清理临时文件
|
||
try:
|
||
os.remove(scene_path)
|
||
os.remove(smap_path)
|
||
except:
|
||
pass
|
||
|
||
if result.get('success'):
|
||
return format_response(data=result, message="Scene到SMAP转换成功")
|
||
else:
|
||
return error_response("Scene到SMAP转换失败", 500, result.get('log', []))
|
||
|
||
except Exception as e:
|
||
logger.error(f"Scene到SMAP转换失败: {str(e)}")
|
||
return error_response(f"Scene到SMAP转换失败: {str(e)}", 500)
|
||
|
||
|
||
@router.post("/smap-to-iray")
|
||
async def smap_to_iray(
|
||
scene_file: UploadFile = File(..., description="Scene文件"),
|
||
smap_file: UploadFile = File(..., description="SMAP文件"),
|
||
map_width: int = Form(..., description="地图宽度(mm)"),
|
||
map_height: int = Form(..., description="地图高度(mm)"),
|
||
x_attr_min: int = Form(..., description="X最小值(mm)"),
|
||
y_attr_min: int = Form(..., description="Y最小值(mm)"),
|
||
response_mode: str = Form("binary", description="返回模式: binary 或 json")
|
||
):
|
||
"""SMAP -> 华睿(IRAY) 地图包
|
||
默认直接返回zip二进制( Content-Type=application/zip )。
|
||
若 response_mode=json 则保持旧行为返回JSON元数据。
|
||
"""
|
||
try:
|
||
temp_dir = "temp_uploads"
|
||
output_dir = "iray_output"
|
||
os.makedirs(temp_dir, exist_ok=True)
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
|
||
timestamp = int(time.time())
|
||
scene_path = os.path.join(temp_dir, f"{timestamp}_{scene_file.filename}")
|
||
smap_path = os.path.join(temp_dir, f"{timestamp}_{smap_file.filename}")
|
||
|
||
with open(scene_path, "wb") as f:
|
||
f.write(await scene_file.read())
|
||
with open(smap_path, "wb") as f:
|
||
f.write(await smap_file.read())
|
||
|
||
result = convert_smap_to_iray(
|
||
scene_path=scene_path,
|
||
smap_path=smap_path,
|
||
map_width=map_width,
|
||
map_height=map_height,
|
||
x_attr_min=x_attr_min,
|
||
y_attr_min=y_attr_min,
|
||
output_dir=output_dir
|
||
)
|
||
|
||
# 删除上传原文件(不删除输出)
|
||
try:
|
||
os.remove(scene_path)
|
||
os.remove(smap_path)
|
||
except Exception:
|
||
pass
|
||
|
||
if not result.get('success'):
|
||
return error_response(result.get('message', '转换失败'), 500, result.get('error'))
|
||
|
||
# zip路径(convert函数内部命名: smap文件名 + .zip)
|
||
map_package_path = result.get('map_package_path') or result.get('map_package') or None
|
||
if not map_package_path:
|
||
# 回退推断
|
||
smap_filename = os.path.splitext(os.path.basename(smap_path))[0]
|
||
candidate = os.path.join(output_dir, f"{smap_filename}.zip")
|
||
if os.path.exists(candidate):
|
||
map_package_path = candidate
|
||
|
||
if not map_package_path or not os.path.exists(map_package_path):
|
||
return error_response("生成的地图包未找到", 500)
|
||
|
||
if response_mode == 'json':
|
||
# 返回元数据,供前端自行再下载
|
||
payload = {
|
||
**result,
|
||
"download_url": f"/api/vwed-map-converter/download/iray/{os.path.basename(map_package_path)}"
|
||
}
|
||
return format_response(data=payload, message="SMAP到华睿地图包转换成功")
|
||
|
||
# 二进制直接返回
|
||
filename = os.path.basename(map_package_path)
|
||
def iterfile():
|
||
with open(map_package_path, 'rb') as f:
|
||
while True:
|
||
chunk = f.read(8192)
|
||
if not chunk:
|
||
break
|
||
yield chunk
|
||
headers = {
|
||
"Content-Disposition": f"attachment; filename={filename}",
|
||
"X-Map-Station-Count": str(result.get('station_count', 0))
|
||
}
|
||
return StreamingResponse(iterfile(), media_type='application/zip', headers=headers)
|
||
|
||
except Exception as e:
|
||
logger.error(f"SMAP到华睿地图包转换失败: {str(e)}")
|
||
return error_response(f"SMAP到华睿地图包转换失败: {str(e)}", 500)
|
||
|
||
|
||
@router.get("/download/{file_type}/{filename}")
|
||
async def download_converted_file(file_type: str, filename: str):
|
||
"""
|
||
下载转换后的文件
|
||
file_type: 'scene', 'smap', 'iray'
|
||
filename: 文件名
|
||
"""
|
||
try:
|
||
if file_type == "scene":
|
||
file_path = os.path.join("converted", filename)
|
||
elif file_type == "smap":
|
||
file_path = os.path.join("temp_uploads", filename)
|
||
elif file_type == "iray":
|
||
file_path = os.path.join("iray_output", filename)
|
||
else:
|
||
return error_response("不支持的文件类型", 400)
|
||
|
||
if not os.path.exists(file_path):
|
||
return error_response("文件不存在", 404)
|
||
|
||
return FileResponse(
|
||
path=file_path,
|
||
filename=filename,
|
||
media_type='application/octet-stream'
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"文件下载失败: {str(e)}")
|
||
return error_response(f"文件下载失败: {str(e)}", 500)
|