VWED_server/services/map_tools/smap_to_huarui_converter.py

836 lines
34 KiB
Python
Raw Normal View History

2025-09-12 16:15:13 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
仙工SMAP到华睿地图包转换器
输入仙工SMAP文件输出华睿地图包和站点映射表
"""
import json
import os
import sys
import argparse
import zipfile
from datetime import datetime
from PIL import Image
import numpy as np
class SmapToHuaruiConverter:
    """Convert a Seer SMAP map file into a Huarui map package.

    The usual entry point is :meth:`convert`, which loads the SMAP JSON,
    rasterises its point cloud into a PGM image, turns the SMAP stations and
    Bezier paths into Huarui topology nodes/edges, and finally writes a ZIP
    map package plus a station-mapping SQL script.

    Progress and errors are reported with ``print``. The step methods signal
    failure by returning ``False`` (or ``None``/``""`` for the generators that
    depend on PGM data) instead of raising.
    """

    # SMAP ``className`` -> Huarui node type, station-ID prefix and the
    # description recorded in the station mapping.
    _STATION_TYPES = {
        'ActionPoint': {'type': 1, 'prefix': '01', 'name': '动作站点'},
        'LocationMark': {'type': 0, 'prefix': '02', 'name': '路径站点'},
        'ParkPoint': {'type': 7, 'prefix': '03', 'name': '停靠站点'},
        'ChargePoint': {'type': 6, 'prefix': '04', 'name': '充电站点'},
    }
    # Used for any SMAP class name that is not listed in _STATION_TYPES.
    _DEFAULT_STATION_TYPE = {'type': 0, 'prefix': '02', 'name': '普通站点'}

    def __init__(self):
        """Create an empty converter; all state is filled in by later steps."""
        self.smap_data = None       # parsed SMAP JSON document
        self.map_name = ""
        self.map_width = 0
        self.map_height = 0
        self.x_attr_min = 0
        self.y_attr_min = 0
        self.pgm_data = None        # dict produced by generate_pgm_from_pointcloud
        self.huarui_stations = []   # Huarui node dicts written to topo.json
        self.station_mapping = []   # SMAP station <-> Huarui station records

    # ------------------------------------------------------------------
    # Loading and configuration
    # ------------------------------------------------------------------
    def load_smap_file(self, smap_path):
        """Load and parse the SMAP JSON file at ``smap_path``.

        Returns ``True`` on success. On any failure a message is printed and
        ``False`` is returned; no exception escapes.
        """
        try:
            with open(smap_path, 'r', encoding='utf-8') as f:
                self.smap_data = json.load(f)
        except FileNotFoundError:
            print(f"文件不存在: {smap_path}")
            return False
        except json.JSONDecodeError as e:
            print(f"SMAP文件格式错误: {e}")
            return False
        except Exception as e:
            # Deliberate catch-all: callers rely on a boolean result here.
            print(f"加载SMAP文件失败: {e}")
            return False
        print(f"SMAP文件加载成功: {smap_path}")
        return True

    def set_map_parameters(self, map_name, map_width, map_height, x_attr_min, y_attr_min):
        """Store the map name and extents that are written into topo.json."""
        self.map_name = map_name
        self.map_width = map_width
        self.map_height = map_height
        self.x_attr_min = x_attr_min
        self.y_attr_min = y_attr_min
        print(f"地图参数设置完成: {map_name} ({map_width}x{map_height})")

    # ------------------------------------------------------------------
    # Point cloud -> PGM
    # ------------------------------------------------------------------
    @staticmethod
    def _filter_point_cloud(raw_points):
        """Return ``(valid_points, invalid_count)`` for a raw ``normalPosList``.

        A point is valid when it is a dict with both ``x`` and ``y`` keys whose
        values convert to finite floats. Valid points are returned as
        ``(x, y)`` tuples. Only the first five rejected points are printed.
        """
        valid_points = []
        invalid_count = 0
        for index, point in enumerate(raw_points):
            if isinstance(point, dict) and 'x' in point and 'y' in point:
                try:
                    x = float(point['x'])
                    y = float(point['y'])
                except (ValueError, TypeError, OverflowError):
                    reason = "坐标无效"
                else:
                    # NaN/inf would otherwise break the extent and size
                    # computation further down.
                    if np.isfinite(x) and np.isfinite(y):
                        valid_points.append((x, y))
                        continue
                    reason = "坐标无效"
            else:
                reason = "格式无效"
            invalid_count += 1
            if invalid_count <= 5:
                print(f"{index} {reason}: {point}")
        return valid_points, invalid_count

    def generate_pgm_from_pointcloud(self):
        """Rasterise ``normalPosList`` into ``self.pgm_data``.

        The image uses 0.02 m per pixel and is at least 512x512. It is stored
        as a flat ``uint8`` array (row-major, Y axis flipped) together with its
        size, resolution and the minimum x/y of the point cloud as origin.

        Returns ``True`` on success and ``False`` when the SMAP data has no
        usable points.

        NOTE(review): the image starts out as free space (255) and the only
        obstacles (0) are placed *randomly* on the border of a 5x5 window
        around each point, which the original comment describes as being for
        testing. The output is therefore not reproducible between runs.
        """
        if not self.smap_data or 'normalPosList' not in self.smap_data:
            print("SMAP数据中未找到normalPosList")
            return False
        normal_pos_list = self.smap_data['normalPosList']
        if not normal_pos_list:
            print("normalPosList为空")
            return False
        print(f"开始生成PGM地图点云数量: {len(normal_pos_list)}")

        valid_points, invalid_count = self._filter_point_cloud(normal_pos_list)
        if invalid_count > 0:
            print(f"发现 {invalid_count} 个无效点,已过滤掉")
        if not valid_points:
            print("没有有效的点云数据")
            return False
        print(f"有效点云数量: {len(valid_points)}")

        # Extent of the point cloud.
        xs = [x for x, _ in valid_points]
        ys = [y for _, y in valid_points]
        min_x, max_x = min(xs), max(xs)
        min_y, max_y = min(ys), max(ys)
        range_x = max_x - min_x
        range_y = max_y - min_y
        print(f"点云范围: X[{min_x:.2f}, {max_x:.2f}], Y[{min_y:.2f}, {max_y:.2f}]")

        resolution = 0.02  # metres per pixel
        width = max(512, int(np.ceil(range_x / resolution)))
        height = max(512, int(np.ceil(range_y / resolution)))
        print(f"PGM尺寸: {width}x{height}, 分辨率: {resolution}m/pixel")

        # Flat row-major buffer, initialised to free space.
        image_data = np.full(width * height, 255, dtype=np.uint8)

        # Offsets on the border of the (2*radius+1)^2 window, in the same
        # dx-major order the nested loops would visit them.
        radius = 2
        border_offsets = [
            (dx, dy)
            for dx in range(-radius, radius + 1)
            for dy in range(-radius, radius + 1)
            if abs(dx) == radius or abs(dy) == radius
        ]
        for x, y in valid_points:
            pixel_x = int(np.floor((x - min_x) / resolution))
            pixel_y = int(np.floor((y - min_y) / resolution))
            if not (0 <= pixel_x < width and 0 <= pixel_y < height):
                continue
            # Image rows grow downwards, map Y grows upwards: flip Y.
            image_data[(height - 1 - pixel_y) * width + pixel_x] = 255
            for dx, dy in border_offsets:
                nx = pixel_x + dx
                ny = pixel_y + dy
                # 10% chance of marking an in-bounds border cell as obstacle.
                if 0 <= nx < width and 0 <= ny < height and np.random.random() < 0.1:
                    image_data[(height - 1 - ny) * width + nx] = 0

        self.pgm_data = {
            'width': width,
            'height': height,
            'data': image_data,
            'resolution': resolution,
            'origin_x': min_x,
            'origin_y': min_y
        }

        # Report how many pixels ended up free / occupied.
        unique, counts = np.unique(image_data, return_counts=True)
        pixel_stats = dict(zip(unique, counts))
        stats_str = ', '.join([f'{"自由空间" if k == 255 else "障碍物" if k == 0 else str(k)}:{v}' for k, v in pixel_stats.items()])
        print(f"像素统计: {stats_str}")
        print("PGM地图生成完成")
        return True

    # ------------------------------------------------------------------
    # Stations and path edges
    # ------------------------------------------------------------------
    @staticmethod
    def _point_mm(pos):
        """Return ``(x, y)`` of an SMAP position dict converted from m to mm.

        A missing axis is read as 0 — assumes the SMAP export omits
        zero-valued coordinates; TODO confirm against the SMAP format.
        """
        return round(pos.get('x', 0) * 1000), round(pos.get('y', 0) * 1000)

    @staticmethod
    def _build_station(station_id, station_type, x, y):
        """Return a Huarui node dict for one station (coordinates in mm)."""
        is_action = station_type == 1
        return {
            "content": station_id,
            "coordinate": {"x": x, "y": y},
            "name": station_id,
            "type": station_type,
            "extraTypes": [],
            "isTurn": 0,
            "shelfModel": 1,
            "evadeNode": 1,
            "shelfIsTurn": 1,
            "isHandoverArea": 0,
            "isVirtualPoint": False,
            "isTurnRotatableRange": [0, 360000],
            "QRCodeAngle": 0,
            "isNavigationMarkPoint": 0,
            "contentEdges": [],
            "isTextureInitPoint": True,
            "navigationMarkPoint": -1,
            "navTypeV3": ["QRCode", "Texture", "Laser", "Mileage"],
            "isIgnorePayload": False,
            "isIgnoreAGV": False,
            "machineAngle": 0,
            "slopeDistance": [1000, 1000, 1000, 1000],
            "actionObstacleType": [],
            "obstacleType": [1, 1, 1, 1, 2, 8] if is_action else [0, 0, 0, 0, 2, 8],
            "slope": [0, 0, 0, 0],
            "shelfModelAndAngles": [{
                "model": 2 if is_action else 1,
                "angle": [999000],
                "forkToothRadio": "front",
                "forkToothFrontToStackDistance": 0,
                "forkToothBackToStackDistance": 999
            }],
            "movPrecExpand": True,
            "areaType": 1,
            "driftSet": {"x": 0, "y": 0},
            "identifyType": [0],
            "pickupRelease": 0,
            "extraRotExpansion": True,
            "palletAlignment": "center",
            "palletAlignmentType": 0,
            "custom": "",
            "navSignType": "QRCode",
            "highlightDetection": False,
            "AvoidObstaclesDeceleration": [1000, 1000, 1000, 1000],
            "AvoidObstaclesEmergencyStop": [50, 50, 50, 50],
            "isNavSignPoint": False,
            "shelfMode": 2 if is_action else 1,
            "id": station_id,
            "code": "1" if is_action else "0",
            "desp": 0,
            "shelfAngle": [999000],
            "SafetyDistance": [[3000, 1000], [3000, 1000], [3000, 1000], [3000, 1000]],
            "SafetyShape": [["Rectage"], ["Rectage"], ["Rectage"], ["Rectage"]],
            "barrierSwitch": 1,
            "edges": []
        }

    @staticmethod
    def _build_edge(destination, control_points=None):
        """Return a Huarui ``contentEdges`` entry pointing at ``destination``.

        ``control_points`` (two ``[x, y]`` pairs in mm) is attached as
        ``controlPoints`` when given.
        """
        edge = {
            "destination": destination,
            "leftWidth": -1,
            "rightWidth": -1,
            "startExpandDistance": -1,
            "endExpandDistance": -1,
            "needFollow": 1,
            "weight": 1,
            "avoidScene": 1,
            "aspect": "3",
            "turningMode": 0,
            "freePlan": 0,
            "edgeTurnAble": 1,
            "speed": 2000,
            "tangentAngle": 999000,
            "outTangentAngle": 999000,
            "navMode": 0,
            "trackId": 0,
            "bypass": False,
            "pointAccuracy": -1,
            "angleAccuracy": -1000,
            "orientObject": "",
            "deviceTravelDirection": "unlimited",
            "retrograde": False,
            "ignoreSensor": [],
            "shelfIsPass": 1,
            "trafficAMR": [],
            "trafficType": 0,
            "trafficLoad": [],
            "extraLockExpansion": True,
            "headstock": "withLine",
            "unloadReachMode": 0,
            "loadReachMode": 0,
            "useSingleLaserLocate": False,
            "custom": "",
            "loadPass": 0
        }
        if control_points is not None:
            edge["controlPoints"] = control_points
        return edge

    def _bezier_paths(self):
        """Return the ``DegenerateBezier`` curves of the loaded SMAP data."""
        return [curve for curve in self.smap_data['advancedCurveList']
                if curve.get('className') == 'DegenerateBezier']

    @staticmethod
    def _resolve_path_endpoints(path, instance_to_station):
        """Resolve a path's endpoints to SMAP names and Huarui station IDs.

        Returns ``(start_name, end_name, start_id, end_id)``, or ``None`` when
        either endpoint is missing, unnamed, or not a known station.
        """
        start_pos = path.get('startPos')
        end_pos = path.get('endPos')
        if not start_pos or not end_pos:
            return None
        start_name = start_pos.get('instanceName')
        end_name = end_pos.get('instanceName')
        if not start_name or not end_name:
            return None
        start_id = instance_to_station.get(start_name)
        end_id = instance_to_station.get(end_name)
        if not start_id or not end_id:
            return None
        return start_name, end_name, start_id, end_id

    def generate_huarui_stations(self):
        """Build Huarui nodes and the station mapping from ``advancedPointList``.

        Station IDs are the two-digit type prefix followed by a six-digit
        running counter shared by all types. Coordinates are converted from
        metres to millimetres. Path edges are generated afterwards.

        Returns ``True`` on success, ``False`` when there are no stations.
        """
        if not self.smap_data or 'advancedPointList' not in self.smap_data:
            print("SMAP数据中未找到advancedPointList")
            return False
        advanced_point_list = self.smap_data['advancedPointList']
        if not advanced_point_list:
            print("advancedPointList为空")
            return False
        print(f"开始生成华睿站点,仙工站点数量: {len(advanced_point_list)}")

        self.huarui_stations = []
        self.station_mapping = []
        for station_counter, smap_station in enumerate(advanced_point_list, 1):
            station_info = self._STATION_TYPES.get(
                smap_station['className'], self._DEFAULT_STATION_TYPE)
            station_id = f"{station_info['prefix']}{str(station_counter).zfill(6)}"
            x, y = self._point_mm(smap_station['pos'])

            self.huarui_stations.append(
                self._build_station(station_id, station_info['type'], x, y))
            self.station_mapping.append({
                'smap_name': smap_station['instanceName'],
                'smap_type': smap_station['className'],
                'smap_type_desc': station_info['name'],
                'huarui_id': station_id,
                'huarui_type': station_info['type'],
                'coordinate': {'x': x, 'y': y}
            })

        self._generate_path_connections()
        print(f"华睿站点生成完成,共{len(self.huarui_stations)}个站点")
        return True

    def _generate_path_connections(self):
        """Fill ``contentEdges`` and ``edges`` of every station from the SMAP paths.

        Every ``DegenerateBezier`` path is treated as bidirectional: it adds an
        edge start->end and an edge end->start (with the control points in
        reverse order). Per station only the first edge to each destination is
        kept. Does nothing when the SMAP data has no ``advancedCurveList``.
        """
        if not self.smap_data or 'advancedCurveList' not in self.smap_data:
            return

        instance_to_station = {
            mapping['smap_name']: mapping['huarui_id']
            for mapping in self.station_mapping
        }
        paths = self._bezier_paths()
        print(f"处理路径连接,共{len(paths)}条路径")

        # Index stations once so each path is resolved with dict lookups
        # instead of rescanning every station for every path.
        stations_by_id = {}
        for station in self.huarui_stations:
            stations_by_id.setdefault(station['name'], station)
        edges_by_station = {station_id: [] for station_id in stations_by_id}

        for path in paths:
            endpoints = self._resolve_path_endpoints(path, instance_to_station)
            if endpoints is None:
                continue
            _, _, start_id, end_id = endpoints
            if start_id not in stations_by_id or end_id not in stations_by_id:
                continue

            forward_points = None
            reverse_points = None
            if path.get('controlPos1') and path.get('controlPos2'):
                first = list(self._point_mm(path['controlPos1']))
                second = list(self._point_mm(path['controlPos2']))
                forward_points = [first, second]
                # Travelling the curve backwards swaps the control points.
                reverse_points = [list(second), list(first)]

            edges_by_station[start_id].append(
                self._build_edge(stations_by_id[end_id]['content'], forward_points))
            edges_by_station[end_id].append(
                self._build_edge(stations_by_id[start_id]['content'], reverse_points))

        for station in self.huarui_stations:
            # Keep only the first edge per destination, preserving path order.
            unique_edges = []
            seen_destinations = set()
            for edge in edges_by_station.get(station['name'], []):
                if edge['destination'] not in seen_destinations:
                    seen_destinations.add(edge['destination'])
                    unique_edges.append(edge)
            station['contentEdges'] = unique_edges
            station['edges'] = [
                {"destination": edge['destination'], "weight": edge['weight']}
                for edge in unique_edges
            ]

    # ------------------------------------------------------------------
    # File content generators
    # ------------------------------------------------------------------
    def generate_topo_json(self):
        """Return the topo.json document (as a JSON string) for the current state."""
        topo = {
            "map": {
                "name": self.map_name,
                "type": "topo",
                "width": self.map_width,
                "height": self.map_height,
                "xAttrMin": self.x_attr_min,
                "yAttrMin": self.y_attr_min,
                "initSpeed": 2,
                "obstacleType": [0, 0, 0, 0, 7, 8],
                "actionObstacleType": [],
                "confidence": [],
                "yawThreshold": [],
                "forbiddenArea": [],
                "operableArea": [],
                "freePlan": 0,
                "turningMode": -1,
                "margin": 0,
                "rcsImageName": "",
                "relocationImageName": "",
                "shelfModel": 0,
                "version": "3.0",
                "navType": "qrcode",
                "devicePartition": None,
                "saveDistance": 1000,
                "brakeDistance": 1000,
                "initSimple": "Rectage",
                "timeStamp": int(datetime.now().timestamp() * 1000),
                "times": 34,
                "icsImageName": "",
                "widthFor-1": 800,
                "expandFor-1": 100,
                "mp_widthFor-1": 800,
                "mp_expandFor-1": 0,
                "mutiple_pallet_generate": True
            },
            "nodes": self.huarui_stations,
            "attrArea": [],
            "relatingAreas": [],
            "virtualPoint": [],
            "guidelineConfigs": [
                {
                    "lineId": 1,
                    "sensor": "camera",
                    "direction": 0,
                    "width": 200,
                    "line": [1000, 0],
                    "offset": 150
                }
            ]
        }
        return json.dumps(topo, indent=2, ensure_ascii=False)

    def generate_yaml_config(self):
        """Return the map YAML text, or ``""`` when no PGM has been generated."""
        if not self.pgm_data:
            return ""
        pgm = self.pgm_data
        lines = [
            f"image: {self.map_name}.pgm",
            f"resolution: {pgm['resolution']:.6f}",
            f"origin: [{pgm['origin_x']:.6f}, {pgm['origin_y']:.6f}, 0.000000]",
            "negate: 0",
            "occupied_thresh: 0.65",
            "free_thresh: 0.196",
            f"width: {pgm['width']}",
            f"height: {pgm['height']}",
            "intensity_threshold: 1000.000000",
            "intensities:",
        ]
        return '\n'.join(lines) + '\n'

    def generate_pgm_file(self):
        """Return the binary (P5) PGM file as bytes, or ``None`` without PGM data."""
        if not self.pgm_data:
            return None
        header = f"P5\n# Generated by SMAP to Huarui Converter\n{self.pgm_data['width']} {self.pgm_data['height']}\n255\n"
        image_array = np.array(self.pgm_data['data'], dtype=np.uint8)
        return header.encode('ascii') + image_array.tobytes()

    def generate_background_jpg(self):
        """Return the PGM rendered as an RGB PIL image, or ``None`` without PGM data.

        Pixels equal to 255 (free space) become white; every other value is
        drawn black.
        """
        if not self.pgm_data:
            return None
        width = self.pgm_data['width']
        height = self.pgm_data['height']
        pixels = np.asarray(self.pgm_data['data'], dtype=np.uint8).reshape(height, width)
        # Vectorised threshold instead of a Python loop over every pixel.
        gray = np.where(pixels == 255, 255, 0).astype(np.uint8)
        rgb_data = np.stack([gray, gray, gray], axis=-1)
        # A (H, W, 3) uint8 array is interpreted as RGB by Pillow.
        return Image.fromarray(rgb_data)

    @staticmethod
    def _sql_literal(value):
        """Return ``value`` as a single-quoted SQL string literal.

        Embedded single quotes are doubled so names containing ``'`` cannot
        terminate the literal early.
        """
        return "'" + str(value).replace("'", "''") + "'"

    def generate_station_mapping_sql(self):
        """Return an SQL script with the station and path mapping tables.

        The script creates ``station_mapping`` and ``path_mapping`` if needed,
        deletes their existing rows, inserts one row per station and one row
        per resolvable ``DegenerateBezier`` path, and ends with two SELECTs.
        """
        quote = self._sql_literal
        sql_content = [
            "-- 仙工SMAP到华睿站点映射表",
            f"-- 地图名称: {self.map_name}",
            f"-- 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"-- 总站点数: {len(self.station_mapping)}",
            "",
            "-- 创建站点映射表",
            "CREATE TABLE IF NOT EXISTS station_mapping (",
            " Sequence INT PRIMARY KEY,",
            " SeerNodeId VARCHAR(255) NOT NULL,",
            " HuaruiNodeId VARCHAR(255) NOT NULL,",
            " pot_x INT NOT NULL,",
            " pot_y INT NOT NULL",
            ");",
            "",
            "-- 创建路径映射表(包含贝塞尔曲线控制点)",
            "CREATE TABLE IF NOT EXISTS path_mapping (",
            " Sequence INT PRIMARY KEY,",
            " SourceSeerNodeId VARCHAR(255) NOT NULL,",
            " SourceHuaruiNodeId VARCHAR(255) NOT NULL,",
            " SourceNodeX INT NOT NULL,",
            " SourceNodeY INT NOT NULL,",
            " TargetSeerNodeId VARCHAR(255) NOT NULL,",
            " TargetHuaruiNodeId VARCHAR(255) NOT NULL,",
            " TargetNodeX INT NOT NULL,",
            " TargetNodeY INT NOT NULL,",
            " ControlPoint1X INT DEFAULT NULL,",
            " ControlPoint1Y INT DEFAULT NULL,",
            " ControlPoint2X INT DEFAULT NULL,",
            " ControlPoint2Y INT DEFAULT NULL",
            ");",
            "",
            "-- 清空现有数据",
            "DELETE FROM station_mapping;",
            "DELETE FROM path_mapping;",
            "",
            "-- 插入站点映射数据",
        ]

        for i, mapping in enumerate(self.station_mapping, 1):
            sql_content.append(
                f"INSERT INTO station_mapping (Sequence, SeerNodeId, HuaruiNodeId, pot_x, pot_y) "
                f"VALUES ({i}, {quote(mapping['smap_name'])}, {quote(mapping['huarui_id'])}, "
                f"{mapping['coordinate']['x']}, {mapping['coordinate']['y']});"
            )
        sql_content.append("")
        sql_content.append("-- 插入路径映射数据")

        if self.smap_data and 'advancedCurveList' in self.smap_data:
            station_coords = {
                mapping['huarui_id']: mapping['coordinate']
                for mapping in self.station_mapping
            }
            instance_to_huarui = {
                mapping['smap_name']: mapping['huarui_id']
                for mapping in self.station_mapping
            }
            path_sequence = 1
            for path in self._bezier_paths():
                endpoints = self._resolve_path_endpoints(path, instance_to_huarui)
                if endpoints is None:
                    continue
                start_name, end_name, start_huarui_id, end_huarui_id = endpoints
                start_coord = station_coords[start_huarui_id]
                end_coord = station_coords[end_huarui_id]

                # Each control point is optional; absent ones become NULL.
                control_values = []
                for key in ('controlPos1', 'controlPos2'):
                    if path.get(key):
                        control_values.extend(str(v) for v in self._point_mm(path[key]))
                    else:
                        control_values.extend(("NULL", "NULL"))

                sql_content.append(
                    f"INSERT INTO path_mapping (Sequence, SourceSeerNodeId, SourceHuaruiNodeId, "
                    f"SourceNodeX, SourceNodeY, TargetSeerNodeId, TargetHuaruiNodeId, "
                    f"TargetNodeX, TargetNodeY, ControlPoint1X, ControlPoint1Y, "
                    f"ControlPoint2X, ControlPoint2Y) "
                    f"VALUES ({path_sequence}, {quote(start_name)}, {quote(start_huarui_id)}, "
                    f"{start_coord['x']}, {start_coord['y']}, {quote(end_name)}, {quote(end_huarui_id)}, "
                    f"{end_coord['x']}, {end_coord['y']}, {control_values[0]}, {control_values[1]}, "
                    f"{control_values[2]}, {control_values[3]});"
                )
                path_sequence += 1

        sql_content.extend([
            "",
            "-- 查询所有映射数据",
            "SELECT * FROM station_mapping ORDER BY Sequence;",
            "",
            "-- 查询所有路径数据",
            "SELECT * FROM path_mapping ORDER BY Sequence;",
        ])
        return '\n'.join(sql_content)

    # ------------------------------------------------------------------
    # Output files
    # ------------------------------------------------------------------
    def create_map_package(self, output_dir):
        """Write ``<map_name>.zip`` into ``output_dir`` and return its path.

        The archive contains topo.json, dbVersion.txt and, when PGM data is
        available, the PGM image, its YAML config and background.jpg.
        """
        os.makedirs(output_dir, exist_ok=True)
        print(f"开始创建华睿地图包到目录: {output_dir}")
        zip_path = os.path.join(output_dir, f"{self.map_name}.zip")
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # 1. topo.json
            zipf.writestr("topo.json", self.generate_topo_json())
            print("topo.json")
            # 2. PGM image
            pgm_content = self.generate_pgm_file()
            if pgm_content:
                zipf.writestr(f"{self.map_name}.pgm", pgm_content)
                print(f"{self.map_name}.pgm")
            # 3. YAML config
            yaml_content = self.generate_yaml_config()
            if yaml_content:
                zipf.writestr(f"{self.map_name}.yaml", yaml_content)
                print(f"{self.map_name}.yaml")
            # 4. background.jpg
            bg_image = self.generate_background_jpg()
            if bg_image is not None:
                import io
                img_buffer = io.BytesIO()
                bg_image.save(img_buffer, format='JPEG', quality=95)
                zipf.writestr("background.jpg", img_buffer.getvalue())
                print("background.jpg")
            # 5. dbVersion.txt (minute-resolution timestamp)
            zipf.writestr("dbVersion.txt", datetime.now().strftime('%Y%m%d%H%M'))
            print("dbVersion.txt")
        print(f"华睿地图包创建完成: {zip_path}")
        return zip_path

    def create_station_mapping_sql(self, output_dir):
        """Write the station-mapping SQL script into ``output_dir`` and return its path.

        The file name carries the map name and a second-resolution timestamp.
        """
        os.makedirs(output_dir, exist_ok=True)
        sql_content = self.generate_station_mapping_sql()
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        sql_path = os.path.join(output_dir, f"{self.map_name}_StationMapping_{timestamp}.sql")
        with open(sql_path, 'w', encoding='utf-8') as f:
            f.write(sql_content)
        print(f"站点映射表创建完成: {sql_path}")
        return sql_path

    def convert(self, smap_path, map_name, map_width, map_height, x_attr_min, y_attr_min, output_dir="output"):
        """Run the full conversion and write both output files to ``output_dir``.

        Returns ``True`` when every step succeeded and ``False`` as soon as
        loading, PGM generation or station generation fails.
        """
        print("开始SMAP到华睿地图包转换")
        print("=" * 60)
        # 1. Load the SMAP file.
        if not self.load_smap_file(smap_path):
            return False
        # 2. Record the map parameters.
        self.set_map_parameters(map_name, map_width, map_height, x_attr_min, y_attr_min)
        # 3. Rasterise the point cloud.
        if not self.generate_pgm_from_pointcloud():
            return False
        # 4. Build stations and edges.
        if not self.generate_huarui_stations():
            return False
        # 5. Write the map package.
        map_package_path = self.create_map_package(output_dir)
        # 6. Write the station mapping SQL.
        mapping_sql_path = self.create_station_mapping_sql(output_dir)
        print("=" * 60)
        print("转换完成!")
        print(f"华睿地图包: {map_package_path}")
        print(f"站点映射表: {mapping_sql_path}")
        return True
def main():
    """Command-line entry point.

    Parses the SMAP path, map name and map extents (plus an optional output
    directory), runs the conversion, and terminates the process with exit
    status 0 on success or 1 on failure (including a missing SMAP file).
    """
    parser = argparse.ArgumentParser(description='仙工SMAP到华睿地图包转换器')
    parser.add_argument('smap_path', help='仙工SMAP文件路径')
    parser.add_argument('map_name', help='地图名称')
    parser.add_argument('map_width', type=int, help='地图宽度mm')
    parser.add_argument('map_height', type=int, help='地图高度mm')
    parser.add_argument('x_attr_min', type=int, help='X最小值mm')
    parser.add_argument('y_attr_min', type=int, help='Y最小值mm')
    parser.add_argument('-o', '--output', default='output', help='输出目录(默认: output)')
    args = parser.parse_args()

    # Fail fast with a dedicated message before constructing the converter.
    if not os.path.exists(args.smap_path):
        print(f"SMAP文件不存在: {args.smap_path}")
        sys.exit(1)

    converter = SmapToHuaruiConverter()
    success = converter.convert(
        args.smap_path,
        args.map_name,
        args.map_width,
        args.map_height,
        args.x_attr_min,
        args.y_attr_min,
        args.output
    )
    if success:
        print("\n 转换成功完成!")
        sys.exit(0)
    print("\n 转换失败!")
    sys.exit(1)


if __name__ == "__main__":
    main()