#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Convert a SEER (Xiangong) SMAP map into a Huarui map package.

Inputs are a SMAP file and a Scene file; outputs are a Huarui map package
(zip) and a CSV table mapping Scene point names to ids.
"""

import argparse
import csv
import io
import json
import math
import os
import sys
import zipfile
from datetime import datetime
from typing import Optional

import numpy as np

try:
    from PIL import Image
except ImportError:
    # Pillow is only needed to render background.jpg; everything else in this
    # module stays importable and usable without it.
    Image = None


class SmapToHuaruiConverter:
    """Stateful converter: load Scene + SMAP, then emit a Huarui map package."""

    # Rasterization settings used when building the PGM from the point cloud.
    PGM_RESOLUTION = 0.02  # metres per pixel (2 cm)
    PGM_MIN_SIZE = 512  # minimum image width/height in pixels
    OBSTACLE_RING_RADIUS = 2  # half-size (pixels) of the square ring around a point
    OBSTACLE_PROBABILITY = 0.1  # chance that a ring cell becomes an obstacle

    # SMAP className -> Huarui station type information.
    _TYPE_MAPPING = {
        'ActionPoint': {'type': 1, 'prefix': '01', 'name': '动作站点'},
        'LocationMark': {'type': 0, 'prefix': '02', 'name': '路径站点'},
        'ParkPoint': {'type': 7, 'prefix': '03', 'name': '停靠站点'},
        'ChargePoint': {'type': 6, 'prefix': '04', 'name': '充电站点'},
    }
    # Fallback for any className not listed above.
    _DEFAULT_STATION_INFO = {'type': 0, 'prefix': '02', 'name': '普通站点'}

    def __init__(self):
        self.smap_data = None
        self.scene_data = None
        self.map_name = ""
        self.map_width = 0
        self.map_height = 0
        self.x_attr_min = 0
        self.y_attr_min = 0
        self.pgm_data = None
        self.huarui_stations = []
        self.station_mapping = []
        self.csv_mapping = {}  # point name -> id, extracted from the Scene file
        self.points_mapping = []  # ordered rows for the CSV report
        # Paths of the artifacts written by the last successful convert().
        self.csv_report_path = None
        self.map_package_path = None

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------
    @staticmethod
    def _load_json_file(path, label):
        """Read a UTF-8 JSON file, printing progress/errors tagged with *label*.

        Returns:
            tuple: ``(True, data)`` on success, ``(False, None)`` on failure.
        """
        try:
            with open(path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            print(f"文件不存在: {path}")
            return False, None
        except json.JSONDecodeError as e:
            print(f"{label}文件格式错误: {e}")
            return False, None
        except Exception as e:
            # Loader boundary: report any other failure instead of crashing.
            print(f"加载{label}文件失败: {e}")
            return False, None
        print(f"{label}文件加载成功: {path}")
        return True, data

    def load_scene_file(self, scene_path) -> bool:
        """Load the Scene JSON file into ``self.scene_data``.

        Returns:
            True on success; False (after printing the reason) otherwise.
        """
        ok, data = self._load_json_file(scene_path, "Scene")
        if ok:
            self.scene_data = data
        return ok

    def load_smap_file(self, smap_path) -> bool:
        """Load the SMAP JSON file into ``self.smap_data``.

        Returns:
            True on success; False (after printing the reason) otherwise.
        """
        ok, data = self._load_json_file(smap_path, "SMAP")
        if ok:
            self.smap_data = data
        return ok

    # ------------------------------------------------------------------
    # Scene point mapping
    # ------------------------------------------------------------------
    def extract_points_mapping(self) -> bool:
        """Extract the name -> id mapping from the Scene ``points`` field.

        Fills ``self.points_mapping`` (one row per point, with a 1-based
        ``sequence``) and ``self.csv_mapping`` (name -> id, skipping points
        with an empty name).

        Returns:
            True when at least one point was found, False otherwise.
        """
        if not self.scene_data or 'points' not in self.scene_data:
            print("Scene数据中未找到points字段")
            return False

        points = self.scene_data['points']
        if not points:
            print("points字段为空")
            return False

        print(f"开始提取站点映射关系,共{len(points)}个站点")

        # Reset both containers so that re-running on a reused converter does
        # not keep entries from a previously loaded Scene.
        self.points_mapping = []
        self.csv_mapping = {}
        for sequence, point in enumerate(points, 1):
            mapping = {
                'sequence': sequence,
                'name': point.get('name', ''),
                'id': point.get('id', ''),
            }
            self.points_mapping.append(mapping)
            # Only named points can be matched against SMAP instance names.
            if mapping['name']:
                self.csv_mapping[mapping['name']] = mapping['id']

        print(f"站点映射提取完成,共{len(self.points_mapping)}个站点")
        return True

    def generate_csv_report(self, output_path) -> bool:
        """Write ``self.points_mapping`` to *output_path* as CSV.

        The file is written as UTF-8 with a BOM (``utf-8-sig``) and has the
        columns ``sequence``, ``name``, ``id``.

        Returns:
            True on success; False if there is nothing to export or the
            write fails.
        """
        if not self.points_mapping:
            print("没有映射数据可导出")
            return False

        try:
            with open(output_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=['sequence', 'name', 'id'])
                writer.writeheader()
                writer.writerows(self.points_mapping)
        except Exception as e:
            print(f"生成CSV文件失败: {e}")
            return False

        print(f"CSV映射表生成完成: {output_path}")
        return True

    def set_map_parameters(self, map_name, map_width, map_height, x_attr_min, y_attr_min):
        """Store the map name and the extents written into topo.json."""
        self.map_name = map_name
        self.map_width = map_width
        self.map_height = map_height
        self.x_attr_min = x_attr_min
        self.y_attr_min = y_attr_min
        print(f"地图参数设置完成: {map_name} ({map_width}x{map_height})")

    # ------------------------------------------------------------------
    # PGM generation
    # ------------------------------------------------------------------
    @staticmethod
    def _collect_valid_points(normal_pos_list):
        """Return ``(x, y)`` float tuples for every usable point-cloud entry.

        An entry is usable when it is a dict with ``x`` and ``y`` values that
        convert to finite floats. The first five rejected entries are printed.
        """
        valid_points = []
        invalid_count = 0
        for i, point in enumerate(normal_pos_list):
            if isinstance(point, dict) and 'x' in point and 'y' in point:
                try:
                    x = float(point['x'])
                    y = float(point['y'])
                    # NaN/inf survive float() but cannot be mapped to a pixel.
                    if not (math.isfinite(x) and math.isfinite(y)):
                        raise ValueError("non-finite coordinate")
                    valid_points.append((x, y))
                except (ValueError, TypeError):
                    invalid_count += 1
                    if invalid_count <= 5:  # only show the first 5 errors
                        print(f"点 {i} 坐标无效: {point}")
            else:
                invalid_count += 1
                if invalid_count <= 5:  # only show the first 5 errors
                    print(f"点 {i} 格式无效: {point}")

        if invalid_count > 0:
            print(f"发现 {invalid_count} 个无效点,已过滤掉")
        return valid_points

    def generate_pgm_from_pointcloud(self) -> bool:
        """Rasterize ``normalPosList`` from the SMAP data into ``self.pgm_data``.

        The result is a dict with ``width``, ``height``, ``data`` (flat uint8
        array, row-major, Y axis flipped), ``resolution``, ``origin_x`` and
        ``origin_y``.

        Returns:
            True on success; False if the point cloud is missing or contains
            no valid points.
        """
        if not self.smap_data or 'normalPosList' not in self.smap_data:
            print("SMAP数据中未找到normalPosList")
            return False

        normal_pos_list = self.smap_data['normalPosList']
        if not normal_pos_list:
            print("normalPosList为空")
            return False

        print(f"开始生成PGM地图,点云数量: {len(normal_pos_list)}")

        valid_points = self._collect_valid_points(normal_pos_list)
        if not valid_points:
            print("没有有效的点云数据")
            return False

        print(f"有效点云数量: {len(valid_points)}")

        # Bounding box of the point cloud.
        xs = [x for x, _ in valid_points]
        ys = [y for _, y in valid_points]
        min_x, max_x = min(xs), max(xs)
        min_y, max_y = min(ys), max(ys)

        print(f"点云范围: X[{min_x:.2f}, {max_x:.2f}], Y[{min_y:.2f}, {max_y:.2f}]")

        resolution = self.PGM_RESOLUTION
        width = max(self.PGM_MIN_SIZE, int(np.ceil((max_x - min_x) / resolution)))
        height = max(self.PGM_MIN_SIZE, int(np.ceil((max_y - min_y) / resolution)))

        print(f"PGM尺寸: {width}x{height}, 分辨率: {resolution}m/pixel")

        # Flat image buffer, initialised to 255 (free space).
        image_data = np.full(width * height, 255, dtype=np.uint8)

        # Offsets of the cells on the border of the square around a point.
        radius = self.OBSTACLE_RING_RADIUS
        ring_offsets = [
            (dx, dy)
            for dx in range(-radius, radius + 1)
            for dy in range(-radius, radius + 1)
            if abs(dx) == radius or abs(dy) == radius
        ]

        # NOTE(review): point-cloud cells are written as free space and
        # obstacles are scattered randomly around them ("for testing" in the
        # original comments) — this looks like a placeholder; confirm the
        # intended occupancy semantics before relying on the PGM.
        for x, y in valid_points:
            pixel_x = int(np.floor((x - min_x) / resolution))
            pixel_y = int(np.floor((y - min_y) / resolution))
            if not (0 <= pixel_x < width and 0 <= pixel_y < height):
                continue

            # Image rows grow downwards, map Y grows upwards: flip the Y axis.
            image_data[(height - 1 - pixel_y) * width + pixel_x] = 255

            for dx, dy in ring_offsets:
                nx = pixel_x + dx
                ny = pixel_y + dy
                if (0 <= nx < width and 0 <= ny < height
                        and np.random.random() < self.OBSTACLE_PROBABILITY):
                    image_data[(height - 1 - ny) * width + nx] = 0

        self.pgm_data = {
            'width': width,
            'height': height,
            'data': image_data,
            'resolution': resolution,
            'origin_x': min_x,
            'origin_y': min_y,
        }

        # Pixel value histogram, for the log only.
        unique, counts = np.unique(image_data, return_counts=True)
        pixel_stats = dict(zip(unique, counts))
        stats_str = ', '.join([
            f'{"自由空间" if k == 255 else "障碍物" if k == 0 else str(k)}:{v}'
            for k, v in pixel_stats.items()
        ])
        print(f"像素统计: {stats_str}")

        print("PGM地图生成完成")
        return True

    # ------------------------------------------------------------------
    # Stations and edges
    # ------------------------------------------------------------------
    @staticmethod
    def _build_station(station_id, x, y, station_type):
        """Return a Huarui node dict for one station (coordinates in mm)."""
        is_action = station_type == 1
        return {
            "content": station_id,
            "coordinate": {"x": x, "y": y},
            "name": station_id,
            "type": station_type,
            "extraTypes": [],
            "isTurn": 0,
            "shelfModel": 1,
            "evadeNode": 1,
            "shelfIsTurn": 1,
            "isHandoverArea": 0,
            "isVirtualPoint": False,
            "isTurnRotatableRange": [0, 360000],
            "QRCodeAngle": 0,
            "isNavigationMarkPoint": 0,
            "contentEdges": [],
            "isTextureInitPoint": True,
            "navigationMarkPoint": -1,
            "navTypeV3": ["QRCode", "Texture", "Laser", "Mileage"],
            "isIgnorePayload": False,
            "isIgnoreAGV": False,
            "machineAngle": 0,
            "slopeDistance": [1000, 1000, 1000, 1000],
            "actionObstacleType": [],
            "obstacleType": [1, 1, 1, 1, 2, 8] if is_action else [0, 0, 0, 0, 2, 8],
            "slope": [0, 0, 0, 0],
            "shelfModelAndAngles": [{
                "model": 2 if is_action else 1,
                "angle": [999000],
                "forkToothRadio": "front",
                "forkToothFrontToStackDistance": 0,
                "forkToothBackToStackDistance": 999,
            }],
            "movPrecExpand": True,
            "areaType": 1,
            "driftSet": {"x": 0, "y": 0},
            "identifyType": [0],
            "pickupRelease": 0,
            "extraRotExpansion": True,
            "palletAlignment": "center",
            "palletAlignmentType": 0,
            "custom": "",
            "navSignType": "QRCode",
            "highlightDetection": False,
            "AvoidObstaclesDeceleration": [1000, 1000, 1000, 1000],
            "AvoidObstaclesEmergencyStop": [50, 50, 50, 50],
            "isNavSignPoint": False,
            "shelfMode": 2 if is_action else 1,
            "id": station_id,
            "code": "1" if is_action else "0",
            "desp": 0,
            "shelfAngle": [999000],
            "SafetyDistance": [[3000, 1000], [3000, 1000], [3000, 1000], [3000, 1000]],
            "SafetyShape": [["Rectage"], ["Rectage"], ["Rectage"], ["Rectage"]],
            "barrierSwitch": 1,
            "edges": [],
        }

    @staticmethod
    def _build_edge(destination, control_points=None):
        """Return a Huarui contentEdge dict pointing at *destination*.

        ``controlPoints`` is only added when *control_points* is given.
        """
        edge = {
            "destination": destination,
            "leftWidth": -1,
            "rightWidth": -1,
            "startExpandDistance": -1,
            "endExpandDistance": -1,
            "needFollow": 1,
            "weight": 1,
            "avoidScene": 1,
            "aspect": "3",
            "turningMode": 0,
            "freePlan": 0,
            "edgeTurnAble": 1,
            "speed": 2000,
            "tangentAngle": 999000,
            "outTangentAngle": 999000,
            "navMode": 0,
            "trackId": 0,
            "bypass": False,
            "pointAccuracy": -1,
            "angleAccuracy": -1000,
            "orientObject": "",
            "deviceTravelDirection": "unlimited",
            "retrograde": False,
            "ignoreSensor": [],
            "shelfIsPass": 1,
            "trafficAMR": [],
            "trafficType": 0,
            "trafficLoad": [],
            "extraLockExpansion": True,
            "headstock": "withLine",
            "unloadReachMode": 0,
            "loadReachMode": 0,
            "useSingleLaserLocate": False,
            "custom": "",
            "loadPass": 0,
        }
        if control_points is not None:
            # Copy so that edges never share the nested lists.
            edge["controlPoints"] = [list(p) for p in control_points]
        return edge

    @staticmethod
    def _control_points_mm(path, reverse=False):
        """Return the path's two Bezier control points in mm, or None.

        With ``reverse=True`` the two points are swapped, as needed for the
        edge that runs from the path's end back to its start.
        """
        first = path.get('controlPos1')
        second = path.get('controlPos2')
        if not (first and second):
            return None
        if reverse:
            first, second = second, first
        # Metres -> millimetres.
        return [
            [round(first['x'] * 1000), round(first['y'] * 1000)],
            [round(second['x'] * 1000), round(second['y'] * 1000)],
        ]

    def generate_huarui_stations(self) -> bool:
        """Build ``self.huarui_stations`` from the SMAP ``advancedPointList``.

        Stations whose ``instanceName`` is not present in ``self.csv_mapping``
        are skipped with a warning. ``self.station_mapping`` receives one
        SMAP -> Huarui record per generated station, and path connections are
        generated afterwards.

        Returns:
            True on success; False if the SMAP station list or the Scene
            mapping is missing/empty.
        """
        if not self.smap_data or 'advancedPointList' not in self.smap_data:
            print("SMAP数据中未找到advancedPointList")
            return False

        advanced_point_list = self.smap_data['advancedPointList']
        if not advanced_point_list:
            print("advancedPointList为空")
            return False

        if not self.csv_mapping:
            print("CSV映射数据为空,请先加载CSV映射文件")
            return False

        print(f"开始生成华睿站点,仙工站点数量: {len(advanced_point_list)}")

        self.huarui_stations = []
        self.station_mapping = []

        for smap_station in advanced_point_list:
            station_info = self._TYPE_MAPPING.get(
                smap_station['className'], self._DEFAULT_STATION_INFO)

            # The Huarui id comes from the Scene mapping; unmapped stations
            # are dropped.
            smap_name = smap_station['instanceName']
            if smap_name not in self.csv_mapping:
                print(f"警告: 站点 {smap_name} 未在CSV映射文件中找到,跳过")
                continue
            station_id = self.csv_mapping[smap_name]

            # Metres -> millimetres.
            x = round(smap_station['pos']['x'] * 1000)
            y = round(smap_station['pos']['y'] * 1000)

            self.huarui_stations.append(
                self._build_station(station_id, x, y, station_info['type']))
            self.station_mapping.append({
                'smap_name': smap_name,
                'smap_type': smap_station['className'],
                'smap_type_desc': station_info['name'],
                'huarui_id': station_id,
                'huarui_type': station_info['type'],
                'coordinate': {'x': x, 'y': y},
            })

        self._generate_path_connections()

        print(f"华睿站点生成完成,共{len(self.huarui_stations)}个站点")
        return True

    def _generate_path_connections(self):
        """Fill ``contentEdges`` and ``edges`` of every generated station.

        Every ``DegenerateBezier`` curve in ``advancedCurveList`` is treated
        as bidirectional: it yields an edge start -> end and an edge
        end -> start (with the control points swapped). Per station, only the
        first edge to each destination is kept.
        """
        if not self.smap_data or 'advancedCurveList' not in self.smap_data:
            return

        # SMAP instanceName -> Huarui station id.
        instance_to_station = {
            mapping['smap_name']: mapping['huarui_id']
            for mapping in self.station_mapping
        }

        paths = [
            curve for curve in (self.smap_data['advancedCurveList'] or [])
            if curve.get('className') == 'DegenerateBezier'
        ]
        print(f"处理路径连接,共{len(paths)}条路径")

        # One pass over the paths: station id -> [(destination, control_points)]
        # in path order, forward link before backward link for the same path.
        station_ids = {station['name'] for station in self.huarui_stations}
        links = {}
        for path in paths:
            if not path.get('startPos') or not path.get('endPos'):
                continue
            start_name = path['startPos'].get('instanceName')
            end_name = path['endPos'].get('instanceName')
            if not start_name or not end_name:
                continue

            start_id = instance_to_station.get(start_name)
            end_id = instance_to_station.get(end_name)

            if start_id in station_ids and end_id:
                links.setdefault(start_id, []).append(
                    (end_id, self._control_points_mm(path)))
            if end_id in station_ids and start_id:
                links.setdefault(end_id, []).append(
                    (start_id, self._control_points_mm(path, reverse=True)))

        for station in self.huarui_stations:
            content_edges = []
            seen_destinations = set()
            for destination, control_points in links.get(station['name'], ()):
                if destination in seen_destinations:
                    continue
                seen_destinations.add(destination)
                content_edges.append(self._build_edge(destination, control_points))

            station['contentEdges'] = content_edges
            station['edges'] = [
                {"destination": edge['destination'], "weight": edge['weight']}
                for edge in content_edges
            ]

    # ------------------------------------------------------------------
    # Package file contents
    # ------------------------------------------------------------------
    def generate_topo_json(self) -> str:
        """Return the topo.json document (as a JSON string) for the package."""
        topo = {
            "map": {
                "name": self.map_name,
                "type": "topo",
                "width": self.map_width,
                "height": self.map_height,
                "xAttrMin": self.x_attr_min,
                "yAttrMin": self.y_attr_min,
                "initSpeed": 2,
                "obstacleType": [0, 0, 0, 0, 7, 8],
                "actionObstacleType": [],
                "confidence": [],
                "yawThreshold": [],
                "forbiddenArea": [],
                "operableArea": [],
                "freePlan": 0,
                "turningMode": -1,
                "margin": 0,
                "rcsImageName": "",
                "relocationImageName": "",
                "shelfModel": 0,
                "version": "3.0",
                "navType": "qrcode",
                "devicePartition": None,
                "saveDistance": 1000,
                "brakeDistance": 1000,
                "initSimple": "Rectage",
                "timeStamp": int(datetime.now().timestamp() * 1000),
                "times": 34,
                "icsImageName": "",
                "widthFor-1": 800,
                "expandFor-1": 100,
                "mp_widthFor-1": 800,
                "mp_expandFor-1": 0,
                "mutiple_pallet_generate": True,
            },
            "nodes": self.huarui_stations,
            "attrArea": [],
            "relatingAreas": [],
            "virtualPoint": [],
            "guidelineConfigs": [
                {
                    "lineId": 1,
                    "sensor": "camera",
                    "direction": 0,
                    "width": 200,
                    "line": [1000, 0],
                    "offset": 150,
                }
            ],
        }
        return json.dumps(topo, indent=2, ensure_ascii=False)

    def generate_yaml_config(self) -> str:
        """Return the YAML map description, or "" if no PGM was generated."""
        if not self.pgm_data:
            return ""

        return (
            f"image: {self.map_name}.pgm\n"
            f"resolution: {self.pgm_data['resolution']:.6f}\n"
            f"origin: [{self.pgm_data['origin_x']:.6f}, {self.pgm_data['origin_y']:.6f}, 0.000000]\n"
            "negate: 0\n"
            "occupied_thresh: 0.65\n"
            "free_thresh: 0.196\n"
            f"width: {self.pgm_data['width']}\n"
            f"height: {self.pgm_data['height']}\n"
            "intensity_threshold: 1000.000000\n"
            "intensities: \n"
        )

    def generate_pgm_file(self) -> Optional[bytes]:
        """Return the binary (P5) PGM file content, or None without PGM data."""
        if not self.pgm_data:
            return None

        header = (
            f"P5\n# Generated by SMAP to Huarui Converter\n"
            f"{self.pgm_data['width']} {self.pgm_data['height']}\n255\n"
        )
        # Force uint8 so that each pixel is exactly one byte.
        image_array = np.array(self.pgm_data['data'], dtype=np.uint8)
        return header.encode('ascii') + image_array.tobytes()

    @staticmethod
    def _pgm_to_rgb_array(pgm_values, width, height):
        """Map flat PGM values to a ``(height, width, 3)`` uint8 RGB array.

        255 (free space) becomes white; every other value becomes black.
        """
        gray = np.asarray(pgm_values, dtype=np.uint8).reshape(height, width)
        black_white = np.where(gray == 255, 255, 0).astype(np.uint8)
        return np.repeat(black_white[:, :, np.newaxis], 3, axis=2)

    def generate_background_jpg(self):
        """Return the background image (a PIL ``Image``), or None without PGM data.

        Raises:
            ImportError: if Pillow is not installed.
        """
        if not self.pgm_data:
            return None
        if Image is None:
            raise ImportError("Pillow is required to generate background.jpg")

        rgb_data = self._pgm_to_rgb_array(
            self.pgm_data['data'], self.pgm_data['width'], self.pgm_data['height'])
        # A (h, w, 3) uint8 array is interpreted as RGB.
        return Image.fromarray(rgb_data)

    def create_map_package(self, output_dir) -> str:
        """Write ``<map_name>.zip`` into *output_dir* and return its path.

        The archive contains topo.json, the PGM and YAML files (when PGM data
        exists), background.jpg and dbVersion.txt.
        """
        os.makedirs(output_dir, exist_ok=True)

        print(f"开始创建华睿地图包到目录: {output_dir}")

        zip_path = os.path.join(output_dir, f"{self.map_name}.zip")

        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            # 1. topo.json
            zipf.writestr("topo.json", self.generate_topo_json())
            print("topo.json")

            # 2. PGM file
            pgm_content = self.generate_pgm_file()
            if pgm_content:
                zipf.writestr(f"{self.map_name}.pgm", pgm_content)
                print(f"{self.map_name}.pgm")

            # 3. YAML config
            yaml_content = self.generate_yaml_config()
            if yaml_content:
                zipf.writestr(f"{self.map_name}.yaml", yaml_content)
                print(f"{self.map_name}.yaml")

            # 4. background.jpg
            bg_image = self.generate_background_jpg()
            if bg_image is not None:
                img_buffer = io.BytesIO()
                bg_image.save(img_buffer, format='JPEG', quality=95)
                zipf.writestr("background.jpg", img_buffer.getvalue())
                print("background.jpg")

            # 5. dbVersion.txt
            zipf.writestr("dbVersion.txt", datetime.now().strftime('%Y%m%d%H%M'))
            print("dbVersion.txt")

        print(f"华睿地图包创建完成: {zip_path}")
        return zip_path

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def convert(self, scene_path, smap_path, map_width, map_height,
                x_attr_min, y_attr_min, output_dir="output") -> bool:
        """Run the full conversion pipeline.

        The map name is the SMAP file name without extension. On success the
        written artifact paths are available as ``self.csv_report_path`` and
        ``self.map_package_path``.

        Returns:
            True when every step succeeded, False as soon as one step fails.
        """
        print("开始SMAP到华睿地图包转换")
        print("=" * 60)

        # 1. Load the Scene file.
        if not self.load_scene_file(scene_path):
            return False

        # 2. Extract the point mapping from the Scene file.
        if not self.extract_points_mapping():
            return False

        # 3. Load the SMAP file.
        if not self.load_smap_file(smap_path):
            return False

        # 4. Use the SMAP file name as the map name.
        smap_filename = os.path.splitext(os.path.basename(smap_path))[0]
        self.set_map_parameters(smap_filename, map_width, map_height, x_attr_min, y_attr_min)

        # 5. Generate the PGM map.
        if not self.generate_pgm_from_pointcloud():
            return False

        # 6. Generate the Huarui stations.
        if not self.generate_huarui_stations():
            return False

        # 7. Write the CSV mapping table.
        os.makedirs(output_dir, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        csv_path = os.path.join(output_dir, f"{smap_filename}_PointsMapping_{timestamp}.csv")
        self.generate_csv_report(csv_path)

        # 8. Create the Huarui map package.
        map_package_path = self.create_map_package(output_dir)

        # Remember what was written so callers do not have to re-derive the
        # timestamped CSV name.
        self.csv_report_path = csv_path
        self.map_package_path = map_package_path

        print("=" * 60)
        print("转换完成!")
        print(f"华睿地图包: {map_package_path}")
        print(f"站点映射表: {csv_path}")

        return True


def build_arg_parser() -> argparse.ArgumentParser:
    """Return the command-line parser used by :func:`main`."""
    parser = argparse.ArgumentParser(description='仙工SMAP到华睿地图包转换器(集成Scene点位映射提取)')
    parser.add_argument('scene_path', help='Scene文件路径')
    parser.add_argument('smap_path', help='仙工SMAP文件路径')
    parser.add_argument('map_width', type=int, help='地图宽度(mm)')
    parser.add_argument('map_height', type=int, help='地图高度(mm)')
    parser.add_argument('x_attr_min', type=int, help='X最小值(mm)')
    parser.add_argument('y_attr_min', type=int, help='Y最小值(mm)')
    # main() reads args.output, so the option has to exist.
    parser.add_argument('-o', '--output', default='output', help='输出目录(默认: output)')
    return parser


def main():
    """Command-line entry point; exits with 0 on success and 1 on failure."""
    args = build_arg_parser().parse_args()

    if not os.path.exists(args.scene_path):
        print(f"Scene文件不存在: {args.scene_path}")
        sys.exit(1)

    if not os.path.exists(args.smap_path):
        print(f"SMAP文件不存在: {args.smap_path}")
        sys.exit(1)

    converter = SmapToHuaruiConverter()
    success = converter.convert(
        args.scene_path,
        args.smap_path,
        args.map_width,
        args.map_height,
        args.x_attr_min,
        args.y_attr_min,
        args.output,
    )

    if success:
        print("\n✅ 转换成功完成!")
        sys.exit(0)
    else:
        print("\n❌ 转换失败!")
        sys.exit(1)


def convert_smap_to_iray(scene_path, smap_path, map_width, map_height,
                         x_attr_min, y_attr_min, output_dir="iray_output"):
    """API entry point: convert SMAP + Scene files into a Huarui (IRAY) map package.

    Args:
        scene_path: Path of the Scene file.
        smap_path: Path of the SMAP file.
        map_width: Map width (mm).
        map_height: Map height (mm).
        x_attr_min: Minimum X (mm).
        y_attr_min: Minimum Y (mm).
        output_dir: Output directory, defaults to "iray_output".

    Returns:
        dict: On success ``success``, ``message``, ``map_package_path``,
        ``csv_mapping_path`` and ``station_count``; on failure ``success``,
        ``message`` and ``error``. Exceptions are reported in ``error``
        rather than raised.
    """
    try:
        converter = SmapToHuaruiConverter()
        success = converter.convert(
            scene_path,
            smap_path,
            map_width,
            map_height,
            x_attr_min,
            y_attr_min,
            output_dir,
        )

        if not success:
            return {
                "success": False,
                "message": "转换失败",
                "error": "转换过程中出现错误",
            }

        # Report the paths convert() actually wrote; rebuilding the CSV name
        # from a fresh timestamp could point at a file that does not exist.
        return {
            "success": True,
            "message": "转换成功完成",
            "map_package_path": converter.map_package_path,
            "csv_mapping_path": converter.csv_report_path,
            "station_count": len(converter.huarui_stations) if converter.huarui_stations else 0,
        }

    except Exception as e:
        return {
            "success": False,
            "message": "转换失败",
            "error": str(e),
        }


if __name__ == "__main__":
    main()