VWED_server/services/smap_to_iray_converter.py

836 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
仙工SMAP到华睿地图包转换器
输入仙工SMAP文件和Scene文件输出华睿地图包和站点映射表
"""
import json
import os
import sys
import argparse
import zipfile
import csv
from datetime import datetime
from PIL import Image
import numpy as np
class SmapToHuaruiConverter:
def __init__(self):
self.smap_data = None
self.scene_data = None
self.map_name = ""
self.map_width = 0
self.map_height = 0
self.x_attr_min = 0
self.y_attr_min = 0
self.pgm_data = None
self.huarui_stations = []
self.station_mapping = []
self.csv_mapping = {} # 从Scene文件提取的站点映射
self.points_mapping = [] # Scene文件中的点位映射
def load_scene_file(self, scene_path):
"""加载Scene文件"""
try:
with open(scene_path, 'r', encoding='utf-8') as f:
self.scene_data = json.load(f)
print(f"Scene文件加载成功: {scene_path}")
return True
except FileNotFoundError:
print(f"文件不存在: {scene_path}")
return False
except json.JSONDecodeError as e:
print(f"Scene文件格式错误: {e}")
return False
except Exception as e:
print(f"加载Scene文件失败: {e}")
return False
def extract_points_mapping(self):
"""提取points字段的映射关系"""
if not self.scene_data or 'points' not in self.scene_data:
print("Scene数据中未找到points字段")
return False
points = self.scene_data['points']
if not points:
print("points字段为空")
return False
print(f"开始提取站点映射关系,共{len(points)}个站点")
self.points_mapping = []
for i, point in enumerate(points, 1):
mapping = {
'sequence': i,
'name': point.get('name', ''),
'id': point.get('id', '')
}
self.points_mapping.append(mapping)
# 同时填充csv_mapping用于后续处理
if mapping['name']:
self.csv_mapping[mapping['name']] = mapping['id']
print(f"站点映射提取完成,共{len(self.points_mapping)}个站点")
return True
def generate_csv_report(self, output_path):
"""生成CSV格式的映射表"""
if not self.points_mapping:
print("没有映射数据可导出")
return False
try:
with open(output_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
fieldnames = ['sequence', 'name', 'id']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
# 写入表头
writer.writeheader()
# 写入数据
for mapping in self.points_mapping:
writer.writerow(mapping)
print(f"CSV映射表生成完成: {output_path}")
return True
except Exception as e:
print(f"生成CSV文件失败: {e}")
return False
def load_smap_file(self, smap_path):
"""加载SMAP文件"""
try:
with open(smap_path, 'r', encoding='utf-8') as f:
self.smap_data = json.load(f)
print(f"SMAP文件加载成功: {smap_path}")
return True
except FileNotFoundError:
print(f"文件不存在: {smap_path}")
return False
except json.JSONDecodeError as e:
print(f"SMAP文件格式错误: {e}")
return False
except Exception as e:
print(f"加载SMAP文件失败: {e}")
return False
def set_map_parameters(self, map_name, map_width, map_height, x_attr_min, y_attr_min):
"""设置地图参数"""
self.map_name = map_name
self.map_width = map_width
self.map_height = map_height
self.x_attr_min = x_attr_min
self.y_attr_min = y_attr_min
print(f"地图参数设置完成: {map_name} ({map_width}x{map_height})")
def generate_pgm_from_pointcloud(self):
"""从点云数据生成PGM地图参照map_generator.html的方法"""
if not self.smap_data or 'normalPosList' not in self.smap_data:
print("SMAP数据中未找到normalPosList")
return False
normal_pos_list = self.smap_data['normalPosList']
if not normal_pos_list:
print("normalPosList为空")
return False
print(f"开始生成PGM地图点云数量: {len(normal_pos_list)}")
# 过滤和验证点云数据
valid_points = []
invalid_count = 0
for i, point in enumerate(normal_pos_list):
# 检查点是否有必要的坐标字段
if isinstance(point, dict) and 'x' in point and 'y' in point:
# 检查坐标是否为有效数字
try:
x = float(point['x'])
y = float(point['y'])
valid_points.append({'x': x, 'y': y})
except (ValueError, TypeError):
invalid_count += 1
if invalid_count <= 5: # 只显示前5个错误
print(f"{i} 坐标无效: {point}")
else:
invalid_count += 1
if invalid_count <= 5: # 只显示前5个错误
print(f"{i} 格式无效: {point}")
if invalid_count > 0:
print(f"发现 {invalid_count} 个无效点,已过滤掉")
if not valid_points:
print("没有有效的点云数据")
return False
print(f"有效点云数量: {len(valid_points)}")
# 计算点云范围
xs = [p['x'] for p in valid_points]
ys = [p['y'] for p in valid_points]
min_x, max_x = min(xs), max(xs)
min_y, max_y = min(ys), max(ys)
range_x = max_x - min_x
range_y = max_y - min_y
print(f"点云范围: X[{min_x:.2f}, {max_x:.2f}], Y[{min_y:.2f}, {max_y:.2f}]")
# 设置分辨率和图像尺寸参照map_generator.html的方法
resolution = 0.02 # 2cm/pixel
width = max(512, int(np.ceil(range_x / resolution)))
height = max(512, int(np.ceil(range_y / resolution)))
print(f"PGM尺寸: {width}x{height}, 分辨率: {resolution}m/pixel")
# 创建图像数据初始化为255自由空间
image_data = np.full(width * height, 255, dtype=np.uint8)
# 将点云数据映射到图像参照map_generator.html的逻辑
for point in valid_points:
pixel_x = int(np.floor((point['x'] - min_x) / resolution))
pixel_y = int(np.floor((point['y'] - min_y) / resolution))
if 0 <= pixel_x < width and 0 <= pixel_y < height:
# Y轴翻转
index = (height - 1 - pixel_y) * width + pixel_x
image_data[index] = 255 # 自由空间
# 在周围添加一些障碍物用于测试参照map_generator.html
radius = 2 # 模拟障碍物半径
for dx in range(-radius, radius + 1):
for dy in range(-radius, radius + 1):
nx = pixel_x + dx
ny = pixel_y + dy
if 0 <= nx < width and 0 <= ny < height:
n_index = (height - 1 - ny) * width + nx
# 只在边界处添加障碍物
if dx == -radius or dx == radius or dy == -radius or dy == radius:
if np.random.random() < 0.1: # 10%的概率添加障碍物
image_data[n_index] = 0 # 障碍物
# 保存PGM数据
self.pgm_data = {
'width': width,
'height': height,
'data': image_data,
'resolution': resolution,
'origin_x': min_x,
'origin_y': min_y
}
# 统计像素值分布
unique, counts = np.unique(image_data, return_counts=True)
pixel_stats = dict(zip(unique, counts))
stats_str = ', '.join([f'{"自由空间" if k == 255 else "障碍物" if k == 0 else str(k)}:{v}' for k, v in pixel_stats.items()])
print(f"像素统计: {stats_str}")
print("PGM地图生成完成")
return True
def generate_huarui_stations(self):
"""生成华睿站点数据"""
if not self.smap_data or 'advancedPointList' not in self.smap_data:
print("SMAP数据中未找到advancedPointList")
return False
advanced_point_list = self.smap_data['advancedPointList']
if not advanced_point_list:
print("advancedPointList为空")
return False
if not self.csv_mapping:
print("CSV映射数据为空请先加载CSV映射文件")
return False
print(f"开始生成华睿站点,仙工站点数量: {len(advanced_point_list)}")
# 类型映射
type_mapping = {
'ActionPoint': {'type': 1, 'prefix': '01', 'name': '动作站点'},
'LocationMark': {'type': 0, 'prefix': '02', 'name': '路径站点'},
'ParkPoint': {'type': 7, 'prefix': '03', 'name': '停靠站点'},
'ChargePoint': {'type': 6, 'prefix': '04', 'name': '充电站点'}
}
self.huarui_stations = []
self.station_mapping = []
for smap_station in advanced_point_list:
station_info = type_mapping.get(smap_station['className'], {'type': 0, 'prefix': '02', 'name': '普通站点'})
# 使用CSV映射中的ID如果找不到则跳过该站点
smap_name = smap_station['instanceName']
if smap_name not in self.csv_mapping:
print(f"警告: 站点 {smap_name} 未在CSV映射文件中找到跳过")
continue
station_id = self.csv_mapping[smap_name]
# 坐标转换(米转毫米)
x = round(smap_station['pos']['x'] * 1000)
y = round(smap_station['pos']['y'] * 1000)
# 创建华睿站点使用CSV中的ID作为content和name
huarui_station = {
"content": station_id,
"coordinate": {"x": x, "y": y},
"name": station_id,
"type": station_info['type'],
"extraTypes": [],
"isTurn": 0,
"shelfModel": 1,
"evadeNode": 1,
"shelfIsTurn": 1,
"isHandoverArea": 0,
"isVirtualPoint": False,
"isTurnRotatableRange": [0, 360000],
"QRCodeAngle": 0,
"isNavigationMarkPoint": 0,
"contentEdges": [],
"isTextureInitPoint": True,
"navigationMarkPoint": -1,
"navTypeV3": ["QRCode", "Texture", "Laser", "Mileage"],
"isIgnorePayload": False,
"isIgnoreAGV": False,
"machineAngle": 0,
"slopeDistance": [1000, 1000, 1000, 1000],
"actionObstacleType": [],
"obstacleType": [1, 1, 1, 1, 2, 8] if station_info['type'] == 1 else [0, 0, 0, 0, 2, 8],
"slope": [0, 0, 0, 0],
"shelfModelAndAngles": [{
"model": 2 if station_info['type'] == 1 else 1,
"angle": [999000],
"forkToothRadio": "front",
"forkToothFrontToStackDistance": 0,
"forkToothBackToStackDistance": 999
}],
"movPrecExpand": True,
"areaType": 1,
"driftSet": {"x": 0, "y": 0},
"identifyType": [0],
"pickupRelease": 0,
"extraRotExpansion": True,
"palletAlignment": "center",
"palletAlignmentType": 0,
"custom": "",
"navSignType": "QRCode",
"highlightDetection": False,
"AvoidObstaclesDeceleration": [1000, 1000, 1000, 1000],
"AvoidObstaclesEmergencyStop": [50, 50, 50, 50],
"isNavSignPoint": False,
"shelfMode": 2 if station_info['type'] == 1 else 1,
"id": station_id,
"code": "1" if station_info['type'] == 1 else "0",
"desp": 0,
"shelfAngle": [999000],
"SafetyDistance": [[3000, 1000], [3000, 1000], [3000, 1000], [3000, 1000]],
"SafetyShape": [["Rectage"], ["Rectage"], ["Rectage"], ["Rectage"]],
"barrierSwitch": 1,
"edges": []
}
self.huarui_stations.append(huarui_station)
# 创建映射关系
mapping = {
'smap_name': smap_station['instanceName'],
'smap_type': smap_station['className'],
'smap_type_desc': station_info['name'],
'huarui_id': station_id,
'huarui_type': station_info['type'],
'coordinate': {'x': x, 'y': y}
}
self.station_mapping.append(mapping)
# 生成路径连接
self._generate_path_connections()
print(f"华睿站点生成完成,共{len(self.huarui_stations)}个站点")
return True
def _generate_path_connections(self):
"""生成站点之间的路径连接"""
if not self.smap_data or 'advancedCurveList' not in self.smap_data:
return
# 创建instanceName到stationId的映射
instance_to_station = {}
for i, mapping in enumerate(self.station_mapping):
instance_to_station[mapping['smap_name']] = mapping['huarui_id']
# 获取所有DegenerateBezier类型的路径
paths = [curve for curve in self.smap_data['advancedCurveList']
if curve.get('className') == 'DegenerateBezier']
print(f"处理路径连接,共{len(paths)}条路径")
# 为每个站点生成contentEdges
for station in self.huarui_stations:
station['contentEdges'] = []
for path in paths:
if not path.get('startPos') or not path.get('endPos'):
continue
start_name = path['startPos'].get('instanceName')
end_name = path['endPos'].get('instanceName')
if not start_name or not end_name:
continue
start_station_id = instance_to_station.get(start_name)
end_station_id = instance_to_station.get(end_name)
# 如果当前站点是路径的起点,添加到终点的连接
if start_station_id == station['name'] and end_station_id:
target_station = next((s for s in self.huarui_stations if s['name'] == end_station_id), None)
if target_station:
edge = {
"destination": target_station['content'],
"leftWidth": -1,
"rightWidth": -1,
"startExpandDistance": -1,
"endExpandDistance": -1,
"needFollow": 1,
"weight": 1,
"avoidScene": 1,
"aspect": "3",
"turningMode": 0,
"freePlan": 0,
"edgeTurnAble": 1,
"speed": 2000,
"tangentAngle": 999000,
"outTangentAngle": 999000,
"navMode": 0,
"trackId": 0,
"bypass": False,
"pointAccuracy": -1,
"angleAccuracy": -1000,
"orientObject": "",
"deviceTravelDirection": "unlimited",
"retrograde": False,
"ignoreSensor": [],
"shelfIsPass": 1,
"trafficAMR": [],
"trafficType": 0,
"trafficLoad": [],
"extraLockExpansion": True,
"headstock": "withLine",
"unloadReachMode": 0,
"loadReachMode": 0,
"useSingleLaserLocate": False,
"custom": "",
"loadPass": 0
}
# 添加DegenerateBezier的controlPoints支持
if path.get('controlPos1') and path.get('controlPos2'):
# 转换控制点坐标(米转毫米)
control_points = [
[
round(path['controlPos1']['x'] * 1000),
round(path['controlPos1']['y'] * 1000)
],
[
round(path['controlPos2']['x'] * 1000),
round(path['controlPos2']['y'] * 1000)
]
]
edge["controlPoints"] = control_points
station['contentEdges'].append(edge)
# 如果当前站点是路径的终点,添加到起点的连接(双向路径)
if end_station_id == station['name'] and start_station_id:
target_station = next((s for s in self.huarui_stations if s['name'] == start_station_id), None)
if target_station:
edge = {
"destination": target_station['content'],
"leftWidth": -1,
"rightWidth": -1,
"startExpandDistance": -1,
"endExpandDistance": -1,
"needFollow": 1,
"weight": 1,
"avoidScene": 1,
"aspect": "3",
"turningMode": 0,
"freePlan": 0,
"edgeTurnAble": 1,
"speed": 2000,
"tangentAngle": 999000,
"outTangentAngle": 999000,
"navMode": 0,
"trackId": 0,
"bypass": False,
"pointAccuracy": -1,
"angleAccuracy": -1000,
"orientObject": "",
"deviceTravelDirection": "unlimited",
"retrograde": False,
"ignoreSensor": [],
"shelfIsPass": 1,
"trafficAMR": [],
"trafficType": 0,
"trafficLoad": [],
"extraLockExpansion": True,
"headstock": "withLine",
"unloadReachMode": 0,
"loadReachMode": 0,
"useSingleLaserLocate": False,
"custom": "",
"loadPass": 0
}
# 添加DegenerateBezier的controlPoints支持反向路径
if path.get('controlPos1') and path.get('controlPos2'):
# 反向路径的控制点顺序相反
control_points = [
[
round(path['controlPos2']['x'] * 1000),
round(path['controlPos2']['y'] * 1000)
],
[
round(path['controlPos1']['x'] * 1000),
round(path['controlPos1']['y'] * 1000)
]
]
edge["controlPoints"] = control_points
station['contentEdges'].append(edge)
# 去重contentEdges
unique_edges = []
seen_destinations = set()
for edge in station['contentEdges']:
if edge['destination'] not in seen_destinations:
seen_destinations.add(edge['destination'])
unique_edges.append(edge)
station['contentEdges'] = unique_edges
# 生成edges字段
station['edges'] = []
for edge in station['contentEdges']:
station['edges'].append({
"destination": edge['destination'],
"weight": edge['weight']
})
def generate_topo_json(self):
"""生成topo.json文件"""
topo = {
"map": {
"name": self.map_name,
"type": "topo",
"width": self.map_width,
"height": self.map_height,
"xAttrMin": self.x_attr_min,
"yAttrMin": self.y_attr_min,
"initSpeed": 2,
"obstacleType": [0, 0, 0, 0, 7, 8],
"actionObstacleType": [],
"confidence": [],
"yawThreshold": [],
"forbiddenArea": [],
"operableArea": [],
"freePlan": 0,
"turningMode": -1,
"margin": 0,
"rcsImageName": "",
"relocationImageName": "",
"shelfModel": 0,
"version": "3.0",
"navType": "qrcode",
"devicePartition": None,
"saveDistance": 1000,
"brakeDistance": 1000,
"initSimple": "Rectage",
"timeStamp": int(datetime.now().timestamp() * 1000),
"times": 34,
"icsImageName": "",
"widthFor-1": 800,
"expandFor-1": 100,
"mp_widthFor-1": 800,
"mp_expandFor-1": 0,
"mutiple_pallet_generate": True
},
"nodes": self.huarui_stations,
"attrArea": [],
"relatingAreas": [],
"virtualPoint": [],
"guidelineConfigs": [
{
"lineId": 1,
"sensor": "camera",
"direction": 0,
"width": 200,
"line": [1000, 0],
"offset": 150
}
]
}
return json.dumps(topo, indent=2, ensure_ascii=False)
def generate_yaml_config(self):
"""生成YAML配置文件"""
if not self.pgm_data:
return ""
yaml_content = f"""image: {self.map_name}.pgm
resolution: {self.pgm_data['resolution']:.6f}
origin: [{self.pgm_data['origin_x']:.6f}, {self.pgm_data['origin_y']:.6f}, 0.000000]
negate: 0
occupied_thresh: 0.65
free_thresh: 0.196
width: {self.pgm_data['width']}
height: {self.pgm_data['height']}
intensity_threshold: 1000.000000
intensities:
"""
return yaml_content
def generate_pgm_file(self):
"""生成PGM文件内容"""
if not self.pgm_data:
return None
header = f"P5\n# Generated by SMAP to Huarui Converter\n{self.pgm_data['width']} {self.pgm_data['height']}\n255\n"
header_bytes = header.encode('ascii')
# 确保数据格式正确
image_array = np.array(self.pgm_data['data'], dtype=np.uint8)
pgm_content = header_bytes + image_array.tobytes()
return pgm_content
def generate_background_jpg(self):
"""生成background.jpg文件参照map_generator.html的转换逻辑"""
if not self.pgm_data:
return None
# 获取PGM数据
width = self.pgm_data['width']
height = self.pgm_data['height']
pgm_array = np.array(self.pgm_data['data'], dtype=np.uint8)
# 创建RGB图像数据
rgb_data = np.zeros((height, width, 3), dtype=np.uint8)
# 按照map_generator.html的逻辑转换
# 255 (自由空间) -> 白色 (255, 255, 255)
# 0 (障碍物) -> 黑色 (0, 0, 0)
for i in range(len(pgm_array)):
y = i // width
x = i % width
value = pgm_array[i]
if value == 255:
# 自由空间 -> 白色
rgb_data[y, x] = [255, 255, 255]
else:
# 障碍物 -> 黑色
rgb_data[y, x] = [0, 0, 0]
# 创建PIL图像
image = Image.fromarray(rgb_data, mode='RGB')
return image
def create_map_package(self, output_dir):
"""创建华睿地图包"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
print(f"开始创建华睿地图包到目录: {output_dir}")
# 创建ZIP文件
zip_path = os.path.join(output_dir, f"{self.map_name}.zip")
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
# 1. topo.json
topo_content = self.generate_topo_json()
zipf.writestr("topo.json", topo_content)
print("topo.json")
# 2. PGM文件
pgm_content = self.generate_pgm_file()
if pgm_content:
zipf.writestr(f"{self.map_name}.pgm", pgm_content)
print(f"{self.map_name}.pgm")
# 3. YAML配置文件
yaml_content = self.generate_yaml_config()
if yaml_content:
zipf.writestr(f"{self.map_name}.yaml", yaml_content)
print(f"{self.map_name}.yaml")
# 4. background.jpg
bg_image = self.generate_background_jpg()
if bg_image:
import io
img_buffer = io.BytesIO()
bg_image.save(img_buffer, format='JPEG', quality=95)
zipf.writestr("background.jpg", img_buffer.getvalue())
print("background.jpg")
# 5. dbVersion.txt
db_version = datetime.now().strftime('%Y%m%d%H%M')
zipf.writestr("dbVersion.txt", db_version)
print("dbVersion.txt")
print(f"华睿地图包创建完成: {zip_path}")
return zip_path
def convert(self, scene_path, smap_path, map_width, map_height, x_attr_min, y_attr_min, output_dir="output"):
"""执行完整的转换流程"""
print("开始SMAP到华睿地图包转换")
print("=" * 60)
# 1. 加载Scene文件
if not self.load_scene_file(scene_path):
return False
# 2. 提取Scene文件中的点位映射
if not self.extract_points_mapping():
return False
# 3. 加载SMAP文件
if not self.load_smap_file(smap_path):
return False
# 4. 使用smap文件名作为地图名称
smap_filename = os.path.splitext(os.path.basename(smap_path))[0]
self.set_map_parameters(smap_filename, map_width, map_height, x_attr_min, y_attr_min)
# 5. 生成PGM地图
if not self.generate_pgm_from_pointcloud():
return False
# 6. 生成华睿站点
if not self.generate_huarui_stations():
return False
# 7. 生成CSV映射表
if not os.path.exists(output_dir):
os.makedirs(output_dir)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
csv_path = os.path.join(output_dir, f"{smap_filename}_PointsMapping_{timestamp}.csv")
self.generate_csv_report(csv_path)
# 8. 创建华睿地图包
map_package_path = self.create_map_package(output_dir)
print("=" * 60)
print("转换完成!")
print(f"华睿地图包: {map_package_path}")
print(f"站点映射表: {csv_path}")
return True
def main():
parser = argparse.ArgumentParser(description='仙工SMAP到华睿地图包转换器集成Scene点位映射提取')
parser.add_argument('scene_path', help='Scene文件路径')
parser.add_argument('smap_path', help='仙工SMAP文件路径')
parser.add_argument('map_width', type=int, help='地图宽度mm')
parser.add_argument('map_height', type=int, help='地图高度mm')
parser.add_argument('x_attr_min', type=int, help='X最小值mm')
parser.add_argument('y_attr_min', type=int, help='Y最小值mm')
args = parser.parse_args()
# 检查Scene文件是否存在
if not os.path.exists(args.scene_path):
print(f"Scene文件不存在: {args.scene_path}")
sys.exit(1)
# 检查SMAP文件是否存在
if not os.path.exists(args.smap_path):
print(f"SMAP文件不存在: {args.smap_path}")
sys.exit(1)
# 创建转换器并执行转换
converter = SmapToHuaruiConverter()
success = converter.convert(
args.scene_path,
args.smap_path,
args.map_width,
args.map_height,
args.x_attr_min,
args.y_attr_min,
args.output
)
if success:
print("\n✅ 转换成功完成!")
sys.exit(0)
else:
print("\n❌ 转换失败!")
sys.exit(1)
def convert_smap_to_iray(scene_path, smap_path, map_width, map_height, x_attr_min, y_attr_min, output_dir="iray_output"):
"""
API调用接口将SMAP和Scene文件转换为华睿IRAY地图包
Args:
scene_path: Scene文件路径
smap_path: SMAP文件路径
map_width: 地图宽度mm
map_height: 地图高度mm
x_attr_min: X最小值mm
y_attr_min: Y最小值mm
output_dir: 输出目录,默认为 "iray_output"
Returns:
dict: 包含转换结果的字典
"""
try:
# 创建转换器实例
converter = SmapToHuaruiConverter()
# 执行转换
success = converter.convert(
scene_path,
smap_path,
map_width,
map_height,
x_attr_min,
y_attr_min,
output_dir
)
if success:
# 获取生成的文件信息
smap_filename = os.path.splitext(os.path.basename(smap_path))[0]
map_package_path = os.path.join(output_dir, f"{smap_filename}.zip")
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
csv_path = os.path.join(output_dir, f"{smap_filename}_PointsMapping_{timestamp}.csv")
return {
"success": True,
"message": "转换成功完成",
"map_package_path": map_package_path,
"csv_mapping_path": csv_path,
"station_count": len(converter.huarui_stations) if converter.huarui_stations else 0
}
else:
return {
"success": False,
"message": "转换失败",
"error": "转换过程中出现错误"
}
except Exception as e:
return {
"success": False,
"message": "转换失败",
"error": str(e)
}
if __name__ == "__main__":
main()