#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Xiangong (仙工) SMAP to Huarui (华睿) map package converter.

Takes a Xiangong SMAP file and a Scene file as input and produces a Huarui
map package together with a station mapping table.
"""
|
||
|
||
import json
|
||
import os
|
||
import sys
|
||
import argparse
|
||
import zipfile
|
||
import csv
|
||
from datetime import datetime
|
||
from PIL import Image
|
||
import numpy as np
|
||
|
||
class SmapToHuaruiConverter:
    """Convert a Xiangong (仙工) SMAP map plus Scene file into a Huarui map package.

    Typical use is a single call to :meth:`convert`, which loads both input
    files, rasterises the point cloud into a PGM image, translates the
    stations and paths into Huarui topology nodes, and writes a ZIP package
    plus a CSV station-mapping table into the output directory.
    """

    # SMAP station class name -> Huarui node type information.
    _TYPE_MAPPING = {
        'ActionPoint': {'type': 1, 'prefix': '01', 'name': '动作站点'},
        'LocationMark': {'type': 0, 'prefix': '02', 'name': '路径站点'},
        'ParkPoint': {'type': 7, 'prefix': '03', 'name': '停靠站点'},
        'ChargePoint': {'type': 6, 'prefix': '04', 'name': '充电站点'},
    }
    # Used for any SMAP class name that is not listed in _TYPE_MAPPING.
    _DEFAULT_STATION_INFO = {'type': 0, 'prefix': '02', 'name': '普通站点'}

    def __init__(self):
        self.smap_data = None
        self.scene_data = None
        self.map_name = ""
        self.map_width = 0
        self.map_height = 0
        self.x_attr_min = 0
        self.y_attr_min = 0
        self.pgm_data = None
        self.huarui_stations = []
        self.station_mapping = []
        self.csv_mapping = {}  # station name -> id, extracted from the Scene file
        self.points_mapping = []  # ordered point records from the Scene file

    # ------------------------------------------------------------------
    # Input loading
    # ------------------------------------------------------------------
    @staticmethod
    def _load_json(path, label):
        """Read a JSON file and report the outcome on stdout.

        Args:
            path: File to read.
            label: File kind used in the printed messages ("Scene" / "SMAP").

        Returns:
            ``(True, data)`` on success, ``(False, None)`` on any failure.
        """
        try:
            # utf-8-sig also accepts plain UTF-8 and strips a BOM if present;
            # json.load rejects text that still starts with a BOM.
            with open(path, 'r', encoding='utf-8-sig') as f:
                data = json.load(f)
        except FileNotFoundError:
            print(f"文件不存在: {path}")
            return False, None
        except json.JSONDecodeError as e:
            print(f"{label}文件格式错误: {e}")
            return False, None
        except Exception as e:
            print(f"加载{label}文件失败: {e}")
            return False, None
        print(f"{label}文件加载成功: {path}")
        return True, data

    def load_scene_file(self, scene_path):
        """Load the Scene file into ``self.scene_data``; return True on success."""
        ok, data = self._load_json(scene_path, "Scene")
        if ok:
            self.scene_data = data
        return ok

    def load_smap_file(self, smap_path):
        """Load the SMAP file into ``self.smap_data``; return True on success."""
        ok, data = self._load_json(smap_path, "SMAP")
        if ok:
            self.smap_data = data
        return ok

    # ------------------------------------------------------------------
    # Scene point mapping
    # ------------------------------------------------------------------
    def extract_points_mapping(self):
        """Extract the name/id mapping from the Scene ``points`` field.

        Fills ``self.points_mapping`` with ``{'sequence', 'name', 'id'}``
        records (sequence starts at 1) and ``self.csv_mapping`` with
        ``name -> id`` for every point that has a non-empty name.

        Returns:
            True if at least the ``points`` field was present and non-empty.
        """
        if not self.scene_data or 'points' not in self.scene_data:
            print("Scene数据中未找到points字段")
            return False

        points = self.scene_data['points']
        if not points:
            print("points字段为空")
            return False

        print(f"开始提取站点映射关系,共{len(points)}个站点")

        # Reset both containers so that re-running on a new Scene file does
        # not leak entries from the previous one.
        self.points_mapping = []
        self.csv_mapping = {}

        for sequence, point in enumerate(points, 1):
            if not isinstance(point, dict):
                print(f"警告: 第{sequence}个点位格式无效,跳过")
                continue
            record = {
                'sequence': sequence,
                'name': point.get('name', ''),
                'id': point.get('id', ''),
            }
            self.points_mapping.append(record)
            if record['name']:
                self.csv_mapping[record['name']] = record['id']

        print(f"站点映射提取完成,共{len(self.points_mapping)}个站点")
        return True

    def generate_csv_report(self, output_path):
        """Write ``self.points_mapping`` to *output_path* as a CSV table.

        The file is written as UTF-8 with BOM (``utf-8-sig``).

        Returns:
            True if the file was written, False if there was nothing to
            export or writing failed.
        """
        if not self.points_mapping:
            print("没有映射数据可导出")
            return False

        try:
            with open(output_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=['sequence', 'name', 'id'])
                writer.writeheader()
                writer.writerows(self.points_mapping)
        except Exception as e:
            print(f"生成CSV文件失败: {e}")
            return False

        print(f"CSV映射表生成完成: {output_path}")
        return True

    def set_map_parameters(self, map_name, map_width, map_height, x_attr_min, y_attr_min):
        """Store the map name and the extent values written into topo.json."""
        self.map_name = map_name
        self.map_width = map_width
        self.map_height = map_height
        self.x_attr_min = x_attr_min
        self.y_attr_min = y_attr_min
        print(f"地图参数设置完成: {map_name} ({map_width}x{map_height})")

    # ------------------------------------------------------------------
    # PGM generation
    # ------------------------------------------------------------------
    def generate_pgm_from_pointcloud(self):
        """Rasterise ``smap_data['normalPosList']`` into ``self.pgm_data``.

        Entries that are not dicts, lack ``x``/``y``, or whose coordinates
        are not finite numbers are counted as invalid and ignored. The
        image uses 0.02 m per pixel and is at least 512x512. Each point's
        pixel is set to 255 and, on the border of a radius-2 square around
        it, pixels are set to 0 with 10% probability (``np.random``), so the
        output is not deterministic unless the global NumPy RNG is seeded.

        Returns:
            True on success; False if there is no usable point cloud.
        """
        if not self.smap_data or 'normalPosList' not in self.smap_data:
            print("SMAP数据中未找到normalPosList")
            return False

        normal_pos_list = self.smap_data['normalPosList']
        if not normal_pos_list:
            print("normalPosList为空")
            return False

        print(f"开始生成PGM地图,点云数量: {len(normal_pos_list)}")

        valid_points = []
        invalid_count = 0
        for i, point in enumerate(normal_pos_list):
            if isinstance(point, dict) and 'x' in point and 'y' in point:
                try:
                    x = float(point['x'])
                    y = float(point['y'])
                except (ValueError, TypeError):
                    reason = "坐标无效"
                else:
                    # NaN / infinity would poison min/max and make the image
                    # size computation raise, so treat them as invalid too.
                    if np.isfinite(x) and np.isfinite(y):
                        valid_points.append((x, y))
                        continue
                    reason = "坐标无效"
            else:
                reason = "格式无效"

            invalid_count += 1
            if invalid_count <= 5:  # only show the first five problems
                print(f"点 {i} {reason}: {point}")

        if invalid_count > 0:
            print(f"发现 {invalid_count} 个无效点,已过滤掉")

        if not valid_points:
            print("没有有效的点云数据")
            return False

        print(f"有效点云数量: {len(valid_points)}")

        xs, ys = zip(*valid_points)
        min_x, max_x = min(xs), max(xs)
        min_y, max_y = min(ys), max(ys)
        range_x = max_x - min_x
        range_y = max_y - min_y

        print(f"点云范围: X[{min_x:.2f}, {max_x:.2f}], Y[{min_y:.2f}, {max_y:.2f}]")

        resolution = 0.02  # metres per pixel
        width = max(512, int(np.ceil(range_x / resolution)))
        height = max(512, int(np.ceil(range_y / resolution)))

        print(f"PGM尺寸: {width}x{height}, 分辨率: {resolution}m/pixel")

        # Flat buffer initialised to 255; `grid` is a 2-D view on the same
        # memory with row 0 at the top of the image.
        image_data = np.full(width * height, 255, dtype=np.uint8)
        grid = image_data.reshape(height, width)

        radius = 2
        # Offsets on the border of the (2*radius+1)^2 square, in the same
        # dx-major order as a nested dx/dy loop.
        ring_offsets = [
            (dx, dy)
            for dx in range(-radius, radius + 1)
            for dy in range(-radius, radius + 1)
            if abs(dx) == radius or abs(dy) == radius
        ]

        for x, y in valid_points:
            # Clamp so that a point lying exactly on the max edge lands in
            # the last pixel instead of falling outside the image.
            pixel_x = min(int(np.floor((x - min_x) / resolution)), width - 1)
            pixel_y = min(int(np.floor((y - min_y) / resolution)), height - 1)

            # Y axis is flipped: image rows grow downwards.
            grid[height - 1 - pixel_y, pixel_x] = 255

            for dx, dy in ring_offsets:
                nx = pixel_x + dx
                ny = pixel_y + dy
                if 0 <= nx < width and 0 <= ny < height:
                    if np.random.random() < 0.1:
                        grid[height - 1 - ny, nx] = 0

        self.pgm_data = {
            'width': width,
            'height': height,
            'data': image_data,
            'resolution': resolution,
            'origin_x': min_x,
            'origin_y': min_y,
        }

        unique, counts = np.unique(image_data, return_counts=True)
        pixel_stats = dict(zip(unique, counts))
        stats_str = ', '.join([f'{"自由空间" if k == 255 else "障碍物" if k == 0 else str(k)}:{v}' for k, v in pixel_stats.items()])
        print(f"像素统计: {stats_str}")

        print("PGM地图生成完成")
        return True

    # ------------------------------------------------------------------
    # Stations and paths
    # ------------------------------------------------------------------
    @staticmethod
    def _build_station(station_id, x, y, station_type):
        """Return a Huarui node dict for one station (coordinates in mm)."""
        is_action = station_type == 1
        return {
            "content": station_id,
            "coordinate": {"x": x, "y": y},
            "name": station_id,
            "type": station_type,
            "extraTypes": [],
            "isTurn": 0,
            "shelfModel": 1,
            "evadeNode": 1,
            "shelfIsTurn": 1,
            "isHandoverArea": 0,
            "isVirtualPoint": False,
            "isTurnRotatableRange": [0, 360000],
            "QRCodeAngle": 0,
            "isNavigationMarkPoint": 0,
            "contentEdges": [],
            "isTextureInitPoint": True,
            "navigationMarkPoint": -1,
            "navTypeV3": ["QRCode", "Texture", "Laser", "Mileage"],
            "isIgnorePayload": False,
            "isIgnoreAGV": False,
            "machineAngle": 0,
            "slopeDistance": [1000, 1000, 1000, 1000],
            "actionObstacleType": [],
            "obstacleType": [1, 1, 1, 1, 2, 8] if is_action else [0, 0, 0, 0, 2, 8],
            "slope": [0, 0, 0, 0],
            "shelfModelAndAngles": [{
                "model": 2 if is_action else 1,
                "angle": [999000],
                "forkToothRadio": "front",
                "forkToothFrontToStackDistance": 0,
                "forkToothBackToStackDistance": 999
            }],
            "movPrecExpand": True,
            "areaType": 1,
            "driftSet": {"x": 0, "y": 0},
            "identifyType": [0],
            "pickupRelease": 0,
            "extraRotExpansion": True,
            "palletAlignment": "center",
            "palletAlignmentType": 0,
            "custom": "",
            "navSignType": "QRCode",
            "highlightDetection": False,
            "AvoidObstaclesDeceleration": [1000, 1000, 1000, 1000],
            "AvoidObstaclesEmergencyStop": [50, 50, 50, 50],
            "isNavSignPoint": False,
            "shelfMode": 2 if is_action else 1,
            "id": station_id,
            "code": "1" if is_action else "0",
            "desp": 0,
            "shelfAngle": [999000],
            "SafetyDistance": [[3000, 1000], [3000, 1000], [3000, 1000], [3000, 1000]],
            "SafetyShape": [["Rectage"], ["Rectage"], ["Rectage"], ["Rectage"]],
            "barrierSwitch": 1,
            "edges": []
        }

    def generate_huarui_stations(self):
        """Translate ``smap_data['advancedPointList']`` into Huarui nodes.

        Only stations whose ``instanceName`` appears in ``self.csv_mapping``
        are converted; the mapped id becomes the node's ``content``, ``name``
        and ``id``. Coordinates are converted from metres to millimetres.
        Fills ``self.huarui_stations`` and ``self.station_mapping`` and then
        builds the path connections.

        Returns:
            True on success; False if the station list or mapping is missing.
        """
        if not self.smap_data or 'advancedPointList' not in self.smap_data:
            print("SMAP数据中未找到advancedPointList")
            return False

        advanced_point_list = self.smap_data['advancedPointList']
        if not advanced_point_list:
            print("advancedPointList为空")
            return False

        if not self.csv_mapping:
            print("CSV映射数据为空,请先加载CSV映射文件")
            return False

        print(f"开始生成华睿站点,仙工站点数量: {len(advanced_point_list)}")

        self.huarui_stations = []
        self.station_mapping = []

        for smap_station in advanced_point_list:
            class_name = smap_station.get('className')
            smap_name = smap_station.get('instanceName')
            station_info = self._TYPE_MAPPING.get(class_name, self._DEFAULT_STATION_INFO)

            if smap_name not in self.csv_mapping:
                print(f"警告: 站点 {smap_name} 未在CSV映射文件中找到,跳过")
                continue

            station_id = self.csv_mapping[smap_name]

            # NOTE(review): a missing x/y is treated as 0 instead of raising
            # KeyError; this assumes the exporter omits zero-valued fields -
            # confirm against the SMAP format.
            pos = smap_station.get('pos') or {}
            x = round(pos.get('x', 0) * 1000)  # metres -> millimetres
            y = round(pos.get('y', 0) * 1000)

            self.huarui_stations.append(
                self._build_station(station_id, x, y, station_info['type'])
            )
            self.station_mapping.append({
                'smap_name': smap_name,
                'smap_type': class_name,
                'smap_type_desc': station_info['name'],
                'huarui_id': station_id,
                'huarui_type': station_info['type'],
                'coordinate': {'x': x, 'y': y}
            })

        self._generate_path_connections()

        print(f"华睿站点生成完成,共{len(self.huarui_stations)}个站点")
        return True

    @staticmethod
    def _control_points_mm(path, reverse=False):
        """Return the path's two control points in mm, or None if absent.

        With ``reverse=True`` the two points are returned in swapped order.
        A fresh list is built on every call so edges never share lists.
        """
        first = path.get('controlPos1')
        second = path.get('controlPos2')
        if not first or not second:
            return None
        if reverse:
            first, second = second, first
        return [
            [round(p.get('x', 0) * 1000), round(p.get('y', 0) * 1000)]
            for p in (first, second)
        ]

    @staticmethod
    def _build_edge(destination, control_points=None):
        """Return a Huarui ``contentEdges`` entry pointing at *destination*."""
        edge = {
            "destination": destination,
            "leftWidth": -1,
            "rightWidth": -1,
            "startExpandDistance": -1,
            "endExpandDistance": -1,
            "needFollow": 1,
            "weight": 1,
            "avoidScene": 1,
            "aspect": "3",
            "turningMode": 0,
            "freePlan": 0,
            "edgeTurnAble": 1,
            "speed": 2000,
            "tangentAngle": 999000,
            "outTangentAngle": 999000,
            "navMode": 0,
            "trackId": 0,
            "bypass": False,
            "pointAccuracy": -1,
            "angleAccuracy": -1000,
            "orientObject": "",
            "deviceTravelDirection": "unlimited",
            "retrograde": False,
            "ignoreSensor": [],
            "shelfIsPass": 1,
            "trafficAMR": [],
            "trafficType": 0,
            "trafficLoad": [],
            "extraLockExpansion": True,
            "headstock": "withLine",
            "unloadReachMode": 0,
            "loadReachMode": 0,
            "useSingleLaserLocate": False,
            "custom": "",
            "loadPass": 0
        }
        if control_points is not None:
            edge["controlPoints"] = control_points
        return edge

    def _add_edges(self, stations_by_id, source_id, target_id, path, reverse):
        """Append an edge source -> target to every station with *source_id*."""
        sources = stations_by_id.get(source_id)
        targets = stations_by_id.get(target_id)
        if not sources or not targets:
            return
        destination = targets[0]['content']
        for station in sources:
            station['contentEdges'].append(
                self._build_edge(destination, self._control_points_mm(path, reverse))
            )

    def _generate_path_connections(self):
        """Fill ``contentEdges`` / ``edges`` of every Huarui station.

        Every ``DegenerateBezier`` curve in ``smap_data['advancedCurveList']``
        whose two endpoints are both mapped produces an edge in each
        direction (the reverse edge carries the control points in swapped
        order). Per station, only the first edge to a given destination is
        kept. Does nothing if the curve list is absent.
        """
        if not self.smap_data or 'advancedCurveList' not in self.smap_data:
            return

        instance_to_station = {
            mapping['smap_name']: mapping['huarui_id']
            for mapping in self.station_mapping
        }

        paths = [
            curve for curve in self.smap_data['advancedCurveList']
            if curve.get('className') == 'DegenerateBezier'
        ]
        print(f"处理路径连接,共{len(paths)}条路径")

        # Index stations by id once so each path is resolved in O(1) instead
        # of rescanning all stations for every (station, path) pair.
        stations_by_id = {}
        for station in self.huarui_stations:
            station['contentEdges'] = []
            stations_by_id.setdefault(station['name'], []).append(station)

        for path in paths:
            if not path.get('startPos') or not path.get('endPos'):
                continue

            start_name = path['startPos'].get('instanceName')
            end_name = path['endPos'].get('instanceName')
            if not start_name or not end_name:
                continue

            start_id = instance_to_station.get(start_name)
            end_id = instance_to_station.get(end_name)

            # Forward edge first, then the reverse edge, so a station's edge
            # list keeps path order with forward-before-reverse per path.
            if end_id:
                self._add_edges(stations_by_id, start_id, end_id, path, reverse=False)
            if start_id:
                self._add_edges(stations_by_id, end_id, start_id, path, reverse=True)

        for station in self.huarui_stations:
            unique_edges = []
            seen_destinations = set()
            for edge in station['contentEdges']:
                if edge['destination'] not in seen_destinations:
                    seen_destinations.add(edge['destination'])
                    unique_edges.append(edge)
            station['contentEdges'] = unique_edges
            station['edges'] = [
                {"destination": edge['destination'], "weight": edge['weight']}
                for edge in unique_edges
            ]

    # ------------------------------------------------------------------
    # Package file generators
    # ------------------------------------------------------------------
    def generate_topo_json(self):
        """Return the topo.json document (map header plus nodes) as a string."""
        topo = {
            "map": {
                "name": self.map_name,
                "type": "topo",
                "width": self.map_width,
                "height": self.map_height,
                "xAttrMin": self.x_attr_min,
                "yAttrMin": self.y_attr_min,
                "initSpeed": 2,
                "obstacleType": [0, 0, 0, 0, 7, 8],
                "actionObstacleType": [],
                "confidence": [],
                "yawThreshold": [],
                "forbiddenArea": [],
                "operableArea": [],
                "freePlan": 0,
                "turningMode": -1,
                "margin": 0,
                "rcsImageName": "",
                "relocationImageName": "",
                "shelfModel": 0,
                "version": "3.0",
                "navType": "qrcode",
                "devicePartition": None,
                "saveDistance": 1000,
                "brakeDistance": 1000,
                "initSimple": "Rectage",
                "timeStamp": int(datetime.now().timestamp() * 1000),
                "times": 34,
                "icsImageName": "",
                "widthFor-1": 800,
                "expandFor-1": 100,
                "mp_widthFor-1": 800,
                "mp_expandFor-1": 0,
                "mutiple_pallet_generate": True
            },
            "nodes": self.huarui_stations,
            "attrArea": [],
            "relatingAreas": [],
            "virtualPoint": [],
            "guidelineConfigs": [
                {
                    "lineId": 1,
                    "sensor": "camera",
                    "direction": 0,
                    "width": 200,
                    "line": [1000, 0],
                    "offset": 150
                }
            ]
        }
        return json.dumps(topo, indent=2, ensure_ascii=False)

    def generate_yaml_config(self):
        """Return the YAML sidecar for the PGM image, or "" without PGM data."""
        if not self.pgm_data:
            return ""

        pgm = self.pgm_data
        # Built line by line so the keys always start at column 0 regardless
        # of how this method is indented.
        lines = [
            f"image: {self.map_name}.pgm",
            f"resolution: {pgm['resolution']:.6f}",
            f"origin: [{pgm['origin_x']:.6f}, {pgm['origin_y']:.6f}, 0.000000]",
            "negate: 0",
            "occupied_thresh: 0.65",
            "free_thresh: 0.196",
            f"width: {pgm['width']}",
            f"height: {pgm['height']}",
            "intensity_threshold: 1000.000000",
            "intensities:",
        ]
        return "\n".join(lines) + "\n"

    def generate_pgm_file(self):
        """Return the binary (P5) PGM file content, or None without PGM data."""
        if not self.pgm_data:
            return None

        header = f"P5\n# Generated by SMAP to Huarui Converter\n{self.pgm_data['width']} {self.pgm_data['height']}\n255\n"
        pixels = np.asarray(self.pgm_data['data'], dtype=np.uint8)
        return header.encode('ascii') + pixels.tobytes()

    def generate_background_jpg(self):
        """Return the map as a black/white RGB PIL image, or None without PGM data.

        Pixels equal to 255 become white; every other value becomes black.
        """
        if not self.pgm_data:
            return None

        width = self.pgm_data['width']
        height = self.pgm_data['height']
        pgm_array = np.asarray(self.pgm_data['data'], dtype=np.uint8)

        # Vectorised threshold instead of a per-pixel Python loop.
        gray = np.where(pgm_array.reshape(height, width) == 255, 255, 0).astype(np.uint8)
        rgb_data = np.repeat(gray[:, :, np.newaxis], 3, axis=2)

        # A (height, width, 3) uint8 array is interpreted as RGB.
        return Image.fromarray(rgb_data)

    def create_map_package(self, output_dir):
        """Write ``<map_name>.zip`` into *output_dir* and return its path.

        The archive holds topo.json, dbVersion.txt and - when PGM data is
        available - the PGM image, its YAML sidecar and background.jpg.
        """
        import io

        os.makedirs(output_dir, exist_ok=True)

        print(f"开始创建华睿地图包到目录: {output_dir}")

        zip_path = os.path.join(output_dir, f"{self.map_name}.zip")

        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # 1. topo.json
            zipf.writestr("topo.json", self.generate_topo_json())
            print("topo.json")

            # 2. PGM image
            pgm_content = self.generate_pgm_file()
            if pgm_content:
                zipf.writestr(f"{self.map_name}.pgm", pgm_content)
                print(f"{self.map_name}.pgm")

            # 3. YAML sidecar
            yaml_content = self.generate_yaml_config()
            if yaml_content:
                zipf.writestr(f"{self.map_name}.yaml", yaml_content)
                print(f"{self.map_name}.yaml")

            # 4. background.jpg
            bg_image = self.generate_background_jpg()
            if bg_image:
                img_buffer = io.BytesIO()
                bg_image.save(img_buffer, format='JPEG', quality=95)
                zipf.writestr("background.jpg", img_buffer.getvalue())
                print("background.jpg")

            # 5. dbVersion.txt
            zipf.writestr("dbVersion.txt", datetime.now().strftime('%Y%m%d%H%M'))
            print("dbVersion.txt")

        print(f"华睿地图包创建完成: {zip_path}")
        return zip_path

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def convert(self, scene_path, smap_path, map_width, map_height, x_attr_min, y_attr_min, output_dir="output"):
        """Run the complete conversion.

        Args:
            scene_path: Path of the Scene file (station name/id mapping).
            smap_path: Path of the SMAP file; its base name becomes the map name.
            map_width: Map width written to topo.json.
            map_height: Map height written to topo.json.
            x_attr_min: ``xAttrMin`` value written to topo.json.
            y_attr_min: ``yAttrMin`` value written to topo.json.
            output_dir: Directory receiving the ZIP package and the CSV table.

        Returns:
            True if the map package was created, False if any required
            step failed (details are printed).
        """
        print("开始SMAP到华睿地图包转换")
        print("=" * 60)

        # 1-3. Load inputs and extract the Scene point mapping.
        if not self.load_scene_file(scene_path):
            return False
        if not self.extract_points_mapping():
            return False
        if not self.load_smap_file(smap_path):
            return False

        # 4. The SMAP file name (without extension) is the map name.
        smap_filename = os.path.splitext(os.path.basename(smap_path))[0]
        self.set_map_parameters(smap_filename, map_width, map_height, x_attr_min, y_attr_min)

        # 5-6. Raster map and stations.
        if not self.generate_pgm_from_pointcloud():
            return False
        if not self.generate_huarui_stations():
            return False

        # 7. CSV mapping table (best effort: a failure does not abort).
        os.makedirs(output_dir, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        csv_path = os.path.join(output_dir, f"{smap_filename}_PointsMapping_{timestamp}.csv")
        csv_written = self.generate_csv_report(csv_path)

        # 8. Map package.
        map_package_path = self.create_map_package(output_dir)

        print("=" * 60)
        print("转换完成!")
        print(f"华睿地图包: {map_package_path}")
        if csv_written:
            # Only advertise the CSV path when the file really exists.
            print(f"站点映射表: {csv_path}")

        return True
|
||
|
||
def main():
    """Command-line entry point.

    Parses the arguments, checks that both input files exist, runs the
    conversion, and exits with status 0 on success or 1 on failure.
    """
    parser = argparse.ArgumentParser(description='仙工SMAP到华睿地图包转换器(集成Scene点位映射提取)')
    parser.add_argument('scene_path', help='Scene文件路径')
    parser.add_argument('smap_path', help='仙工SMAP文件路径')
    parser.add_argument('map_width', type=int, help='地图宽度(mm)')
    parser.add_argument('map_height', type=int, help='地图高度(mm)')
    parser.add_argument('x_attr_min', type=int, help='X最小值(mm)')
    parser.add_argument('y_attr_min', type=int, help='Y最小值(mm)')
    # `args.output` is read below, so the option must be registered here;
    # without it every run ended in AttributeError.
    parser.add_argument('-o', '--output', default='output', help='输出目录(默认: output)')

    args = parser.parse_args()

    # Fail early with a clear message when an input file is missing.
    if not os.path.exists(args.scene_path):
        print(f"Scene文件不存在: {args.scene_path}")
        sys.exit(1)

    if not os.path.exists(args.smap_path):
        print(f"SMAP文件不存在: {args.smap_path}")
        sys.exit(1)

    converter = SmapToHuaruiConverter()
    success = converter.convert(
        args.scene_path,
        args.smap_path,
        args.map_width,
        args.map_height,
        args.x_attr_min,
        args.y_attr_min,
        args.output,
    )

    if success:
        print("\n✅ 转换成功完成!")
        sys.exit(0)
    else:
        print("\n❌ 转换失败!")
        sys.exit(1)
|
||
|
||
def convert_smap_to_iray(scene_path, smap_path, map_width, map_height, x_attr_min, y_attr_min, output_dir="iray_output"):
    """Programmatic entry point: convert SMAP + Scene files to a Huarui (IRAY) map package.

    Args:
        scene_path: Path of the Scene file.
        smap_path: Path of the SMAP file.
        map_width: Map width (mm).
        map_height: Map height (mm).
        x_attr_min: Minimum X value (mm).
        y_attr_min: Minimum Y value (mm).
        output_dir: Output directory, defaults to "iray_output".

    Returns:
        dict: On success ``success`` (True), ``message``,
        ``map_package_path``, ``csv_mapping_path`` and ``station_count``;
        on failure ``success`` (False), ``message`` and ``error``. No
        exception derived from ``Exception`` escapes this function.
    """
    stamp_format = '%Y%m%d_%H%M%S'
    try:
        # Taken before the run: a CSV written by this call carries this
        # stamp or a later one in its file name.
        started_stamp = datetime.now().strftime(stamp_format)

        converter = SmapToHuaruiConverter()
        success = converter.convert(
            scene_path,
            smap_path,
            map_width,
            map_height,
            x_attr_min,
            y_attr_min,
            output_dir,
        )

        if not success:
            return {
                "success": False,
                "message": "转换失败",
                "error": "转换过程中出现错误"
            }

        smap_filename = os.path.splitext(os.path.basename(smap_path))[0]
        map_package_path = os.path.join(output_dir, f"{smap_filename}.zip")

        # Find the CSV that was actually written instead of rebuilding its
        # name from a fresh clock reading, which may differ from the stamp
        # used during conversion. The stamp is fixed-width, so plain string
        # comparison orders file names chronologically.
        csv_prefix = f"{smap_filename}_PointsMapping_"
        earliest_name = f"{csv_prefix}{started_stamp}.csv"
        produced = sorted(
            name for name in os.listdir(output_dir)
            if name.startswith(csv_prefix)
            and name.endswith(".csv")
            and len(name) == len(earliest_name)
            and name >= earliest_name
        )
        if produced:
            csv_path = os.path.join(output_dir, produced[-1])
        else:
            # No CSV from this run was found; keep returning a path in the
            # expected naming scheme so the result shape is unchanged.
            fallback_stamp = datetime.now().strftime(stamp_format)
            csv_path = os.path.join(output_dir, f"{csv_prefix}{fallback_stamp}.csv")

        return {
            "success": True,
            "message": "转换成功完成",
            "map_package_path": map_package_path,
            "csv_mapping_path": csv_path,
            "station_count": len(converter.huarui_stations or [])
        }

    except Exception as e:
        return {
            "success": False,
            "message": "转换失败",
            "error": str(e)
        }
|
||
|
||
if __name__ == "__main__":
|
||
main() |