面向具身智能的大规模轨迹自动化清洗与数据重打标Pipeline
核心工作: 开发一套 Python 工具包。输入是几百 GB 脏乱差的 HDF5 原始数据,你的脚本能自动并行化地:筛出机械臂受力异常的无效帧。做相机图像与动作的时间戳对齐。调用 VLM(如免费的 API)给数据自动打高质量的文本标签(这叫 Data Relabeling,现在极其吃香)。交付物: 不是虚假的成功率,而是清洗速度(如处理100GB数据仅需XX分钟),以及清洗前后数据质量的可视化对比
文章目录
一、项目简介
- 核心工作: 开发一套 Python 工具包。输入是几百 GB 脏乱差的 HDF5 原始数据,你的脚本能自动并行化地:
-
筛出机械臂受力异常的无效帧。
-
做相机图像与动作的时间戳对齐。
-
调用 VLM(如免费的 API)给数据自动打高质量的文本标签(这叫 Data Relabeling,现在极其吃香)。
-
交付物: 不是虚假的成功率,而是清洗速度(如处理100GB数据仅需XX分钟),以及清洗前后数据质量的可视化对比图。
-
核心技术栈:
-
数据处理: h5py (处理HDF5), zarr, numpy, scipy (插值对齐)
-
多线程/并发: multiprocessing 或 concurrent.futures (展示处理速度)
-
视觉大模型 (VLM): 调用免费的 API (如 Gemini 1.5 Flash 或开源的 Qwen-VL) 进行自动化文本打标
注意,本项目不太需要租赁GPU来实现,即使需要租,也是去租一台高 CPU 核心数(比如 32 核或 64 核)、大内存(64GB+)、最重要的是必须配备 NVMe 固态硬盘的机器(读取几十GB的 HDF5,如果是机械硬盘会让你等到绝望)。
另外,如果你需要本地跑 VLM,带一张 RTX 3090 或 4090(大约 2 块钱/小时)即可。
二、环境搭建
具身智能的数据工程师,每天打交道最多的格式就是 HDF5 (.hdf5 或 .h5)。它就像是一个带压缩功能的“虚拟文件夹”,里面可以嵌套存放大规模的多维数组(视频帧、传感器数据)。同时,在写代码之前,我们需要建立一个概念:HDF5 文件的内部结构非常像电脑里的文件系统,它由两部分组成:
-
Groups(组): 相当于“文件夹”。
-
Datasets(数据集): 相当于“文件”,里面真正装着 NumPy 数组。
在具身智能的场景中,一个典型的文件(比如一条机器人抓取杯子的轨迹)通常长这样:
-
/data/demo_1/obs/camera_rgb (存放 [总帧数, 高, 宽, 3] 的图像数组)
-
/data/demo_1/obs/joint_positions (存放 [总帧数, 6] 的机械臂6个关节角度)
-
/data/demo_1/actions (存放 [总帧数, 7] 的动作指令,包含6个关节增量+1个夹爪开合度)
我们现在本地打开anaconda prompt, 创建一个干净的虚拟环境并安装核心依赖(注意本地操作时关闭VPN):
# 创建并激活虚拟环境
conda create -n vla_pipeline python=3.10 -y
conda activate vla_pipeline
# 安装基础数据处理和可视化库
pip install h5py numpy pandas matplotlib opencv-python tqdm
pip install huggingface_hub
pip install scipy
pip install pillow
不同实验室(如 Stanford 的 ALOHA、Berkeley 的 Bridge)开源的数据集,内部的键名(Keys)都不一样。数据工程师接手新数据集的第一件事,就是写个脚本把它的“骨架”打印出来。接下来我们编写能读取和解析 HDF5 格式轨迹数据集的脚本:
此时我们打开VSCode ,新建项目文件夹vla_data_pipline,先激活环境:conda activate vla_pipeline,再在该文件夹下新建一个 Python 文件 bridge_parser.py,将以下代码复制进去:
import h5py
import numpy as np
import cv2
import os
class VLADatasetParser:
def __init__(self, h5_path):
self.h5_path = h5_path
self.data_dict = {}
def extract_trajectory(self):
"""
核心能力:智能搜索并提取 HDF5 中的核心字段
支持格式:Robomimic / Bridge V2 / ALOHA / 我们的测试用例
"""
print(f"🔄 正在解析数据集: {self.h5_path}")
try:
with h5py.File(self.h5_path, 'r') as f:
# ==========================================
# 智能路由:判断根目录结构
# ==========================================
if 'data' in f:
demo_key = list(f['data'].keys())[0] # 找到类似 demo_0 的组
demo = f['data'][demo_key]
else:
demo = f # ALOHA 格式,直接在根目录
# ==========================================
# 1. 提取图像 (加入对多种键名的兼容)
# ==========================================
if 'obs' in demo and 'agentview_image' in demo['obs']:
self.data_dict['images'] = demo['obs']['agentview_image'][:]
elif 'observations' in demo and 'images0' in demo['observations']:
self.data_dict['images'] = demo['observations']['images0'][:]
# 兼容我们生成的测试数据: observations/images/top
elif 'observations' in demo and 'images' in demo['observations'] and 'top' in demo['observations']['images']:
self.data_dict['images'] = demo['observations']['images']['top'][:]
else:
raise ValueError("未找到标准图像字段!")
# ==========================================
# 2. 提取本体状态 (ee_pose 或 qpos)
# ==========================================
if 'obs' in demo and 'robot0_eef_pos' in demo['obs']:
self.data_dict['ee_pose'] = demo['obs']['robot0_eef_pos'][:]
elif 'obs' in demo and 'joint_positions' in demo['obs']:
self.data_dict['ee_pose'] = demo['obs']['joint_positions'][:]
# 兼容 ALOHA 测试数据: observations/qpos
elif 'observations' in demo and 'qpos' in demo['observations']:
self.data_dict['ee_pose'] = demo['observations']['qpos'][:]
else:
raise ValueError("未找到机械臂状态字段!")
# ==========================================
# 3. 提取动作 (actions 或 action)
# ==========================================
if 'actions' in demo:
self.data_dict['actions'] = demo['actions'][:]
# 兼容 ALOHA 测试数据: action
elif 'action' in demo:
self.data_dict['actions'] = demo['action'][:]
else:
raise ValueError("未找到动作字段!")
# ==========================================
# 4. 提取语言指令 (如果存在)
# ==========================================
if 'language_instruction' in demo:
raw_text = demo['language_instruction'][()]
self.data_dict['instruction'] = raw_text.decode('utf-8') if isinstance(raw_text, bytes) else str(raw_text)
else:
self.data_dict['instruction'] = "No instruction provided."
except Exception as e:
print(f"❌ 解析失败: {e}")
def export_to_mp4(self, output_filename="trajectory_output.mp4", fps=30):
"""
将提取到的图像序列合成为可视化的 MP4 视频
对应目标:把轨迹保存为 .mp4 视频用来肉眼观察
"""
if 'images' not in self.data_dict:
print("❌ 字典中没有图像数据,无法生成视频!")
return
images = self.data_dict['images']
num_frames, height, width, channels = images.shape
print(f"\n🎬 正在生成视频: {output_filename} ({num_frames} 帧)...")
# 定义视频编码器 (mp4v 是广泛支持的 mp4 编码)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_filename, fourcc, fps, (width, height))
for i in range(num_frames):
frame = images[i]
# OpenCV 默认使用 BGR 色彩空间,如果原图是 RGB,需要转换
# 真实具身数据通常是 RGB,这里做一次容错转换
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
out.write(frame_bgr)
out.release()
print(f"✅ 视频生成完毕!已保存至: {os.path.abspath(output_filename)}")
if __name__ == "__main__":
# 使用我们刚刚的通用解析器
target_file = "sample_dataset.hdf5" # 稍后我们会换成真正的 Bridge 数据
parser = VLADatasetParser(target_file)
parser.extract_trajectory()
parser.export_to_mp4("result.mp4")
这个脚本实现了:
-
容错提取(Data Ingestion): 它不仅能解析我们之前的结构,还能智能识别 observations/images0(BridgeData 的习惯命名)和 robot0_eef_pos(末端执行器位姿),把它们干净地塞进 Python 字典里。
-
文本解码: 它处理了 HDF5 中恼人的二进制字符串(Byte strings)问题。
-
肉眼观察(MP4 导出): 它调用了 cv2.VideoWriter,把你提取出来的多维数组,直接压成了一个你双击就能播放的 .mp4 视频。
在实际使用时,我们将下载到的hdf5 文件,直接拖进 vla_data_pipeline 文件夹里,将最后name函数中的target_file与想得到demp4格式文件名字进行修改即可。我在尝试跑通demo时选择使用了ALOHA的数据集。
三、物理规则与异常值过滤
目前具身智能最主流的托管平台就是 Hugging Face(HF)。我们将使用官方的 huggingface_hub 库,直接从斯坦福大学著名的 ALOHA 项目中精准下载一个真实的抓取轨迹文件(大约几 MB,包含真实的机械臂视角和动作)。没有选择BridgeData V2是因为BridgeData V2 的存储格式非常“重”: 在 Hugging Face 上,官方的 BridgeData V2 (rail-berkeley/bridge_v2) 并不是以单个的小 .hdf5 文件存放的。它是为了谷歌 TPU 和大规模集群训练设计的,被打包成了巨大的 RLDS (TFRecord) 格式,或者是动辄几十 GB 的 .tar 压缩包。你没法只用几行代码就从中单独“抠”出一条 5MB 的轨迹来做快速测试。
但是考虑到真实的成功轨迹是完美的,反而测不出我们后续写的“异常过滤”代码。我们要亲手捏造出包含三种致命异常的假数据,来看看我们的清洗器能不能把它们全揪出来:
在vla_data_pipline下新建generate_test_cases.py,代码如下:
import h5py
import numpy as np
import os
def create_fake_aloha_hdf5(filename, num_frames, qpos_data, action_data):
"""按照真实的 ALOHA 格式生成 HDF5 文件"""
with h5py.File(filename, 'w') as f:
f.create_dataset('action', data=action_data)
obs = f.create_group('observations')
obs.create_dataset('qpos', data=qpos_data)
# 顺便塞点假图像进去充数
obs.create_dataset('images/top', data=np.zeros((num_frames, 224, 224, 3), dtype=np.uint8))
print(f"📦 生成测试文件: {filename} (帧数: {num_frames})")
# 准备测试目录
os.makedirs("test_data", exist_ok=True)
# ---------------------------------------------------------
# Case 1: 完美的正常轨迹 (应该 ✅ 质检通过)
# ---------------------------------------------------------
frames = 150
# 模拟机械臂平滑移动:利用 np.linspace 生成渐变的数据
qpos_good = np.linspace(0, 1.5, frames).reshape(-1, 1) * np.ones((1, 14))
action_good = np.copy(qpos_good)
create_fake_aloha_hdf5("test_data/case1_perfect.hdf5", frames, qpos_good, action_good)
# ---------------------------------------------------------
# Case 2: 轨迹过短,刚启动就断开 (应该 ❌ 质检失败)
# ---------------------------------------------------------
frames_short = 10
qpos_short = np.zeros((frames_short, 14))
action_short = np.zeros((frames_short, 14))
create_fake_aloha_hdf5("test_data/case2_too_short.hdf5", frames_short, qpos_short, action_short)
# ---------------------------------------------------------
# Case 3: 原地发呆,没有产生位移 (应该 ❌ 质检失败)
# ---------------------------------------------------------
frames_lazy = 100
# 数据全是 0,代表机械臂根本没动
qpos_lazy = np.zeros((frames_lazy, 14))
action_lazy = np.zeros((frames_lazy, 14))
create_fake_aloha_hdf5("test_data/case3_lazy.hdf5", frames_lazy, qpos_lazy, action_lazy)
# ---------------------------------------------------------
# Case 4: 传感器跳变,速度爆炸 (应该 ❌ 质检失败)
# ---------------------------------------------------------
frames_spike = 100
qpos_spike = np.linspace(0, 0.5, frames_spike).reshape(-1, 1) * np.ones((1, 14))
# 💥 投毒:在第 50 帧,某个关节的数值突然暴增!
qpos_spike[50, 3] = 10.0
action_spike = np.copy(qpos_spike)
create_fake_aloha_hdf5("test_data/case4_spike.hdf5", frames_spike, qpos_spike, action_spike)
print("\n🎯 4个极限测试用例已生成在 test_data/ 目录下!")
再简历kinematic_filter.py用来过滤脏数据,代码如下:
import h5py
import numpy as np
import glob
class KinematicFilter:
def __init__(self, min_length=15, movement_threshold=0.01, velocity_limit=1.0):
# 规则 1:最短有效帧数 (太短的通常是误触录制)
self.min_length = min_length
# 规则 2:首尾位置最小位移 (判断是否在原地发呆,未完成任务)
self.movement_threshold = movement_threshold
# 规则 3:相邻两帧最大允许的关节角速度 (判断传感器是否发生数值爆炸/突变)
self.velocity_limit = velocity_limit
def run_checks(self, qpos, actions):
"""
运行三大物理规则质检
qpos: 本体状态数组 [帧数, 关节数]
actions: 动作数组 [帧数, 动作维度]
返回: (是否通过, 失败原因列表, 总位移量, 最大角速度)
"""
num_frames = qpos.shape[0]
reasons = []
# 1. 轨迹过短剔除
if num_frames < self.min_length:
reasons.append(f"轨迹过短: 仅 {num_frames} 帧 (要求 >= {self.min_length})")
# 2. 静止发呆剔除
# 计算首帧和尾帧关节角度的 L2 距离 (欧氏距离),衡量整段轨迹的变化量
total_movement = np.linalg.norm(qpos[-1] - qpos[0])
if total_movement < self.movement_threshold:
reasons.append(f"轨迹疑似静止: 总位移量 {total_movement:.4f} < {self.movement_threshold}")
# 3. 奇异点/突变剔除 (面试高频考点!)
# np.diff 计算相邻两帧的差值,这就代表了瞬时速度
velocities = np.diff(qpos, axis=0)
max_velocity = np.max(np.abs(velocities))
if max_velocity > self.velocity_limit:
reasons.append(f"发现速度突变: 最大瞬时角速度 {max_velocity:.4f} > 阈值 {self.velocity_limit}")
passed = len(reasons) == 0
return passed, reasons, total_movement, max_velocity
def process_single_file(h5_path):
print(f"🛡️ 开始质检文件: {h5_path}")
try:
with h5py.File(h5_path, 'r') as f:
# 智能路由提取数据 (兼容昨天写的两种格式)
if 'action' in f and 'observations' in f:
actions = f['action'][:]
qpos = f['observations']['qpos'][:]
elif 'data' in f:
demo = f['data'][list(f['data'].keys())[0]]
actions = demo['actions'][:]
qpos = demo['obs']['joint_positions'][:]
else:
print("❌ 数据集格式无法识别!")
return
# 初始化过滤器并运行质检
# 这里的阈值(0.01和1.0)是经验值,大厂里通常会根据具体机械臂的物理极限来设置
data_filter = KinematicFilter(min_length=15, movement_threshold=0.01, velocity_limit=1.0)
passed, reasons, movement, max_vel = data_filter.run_checks(qpos, actions)
print("\n=== 📝 质检报告 ===")
print(f"轨迹长度: {qpos.shape[0]} 帧")
print(f"总位移量: {movement:.4f}")
print(f"最大角速度: {max_vel:.4f}")
if passed:
print("✅ 结论: 质检通过!这是一条高质量的有效轨迹。")
else:
print("❌ 结论: 质检失败!该轨迹将被丢弃。")
for r in reasons:
print(f" - 原因: {r}")
except Exception as e:
print(f"读取文件时发生错误: {e}")
if __name__ == "__main__":
# 批量寻找 test_data 文件夹下的所有 hdf5 文件
test_files = glob.glob("test_data/*.hdf5")
print("🚀 启动数据清洗管线批量质检...\n")
for file_path in sorted(test_files):
process_single_file(file_path)
print("-" * 50)
接下来在终端执行:
python generate_test_cases.py
你的 VS Code 左侧目录会多出一个叫 test_data 的文件夹,里面躺着 4 个刚造好的 .hdf5 文件(分别代表完美数据、过短数据、发呆数据、抽搐数据)。
再执行:
python kinematic_filter.py
此时终端会滚动输出这 4 个文件的质检报告,大概长下面的样子:
(vla_pipeline) PS C:\Users\admin\Desktop\vla_data_pipeline> python kinematic_filter.py
🚀 启动数据清洗管线批量质检...
🛡️ 开始质检文件: test_data\case1_perfect.hdf5
=== 📝 质检报告 ===
轨迹长度: 150 帧
总位移量: 5.6125
最大角速度: 0.0101
✅ 结论: 质检通过!这是一条高质量的有效轨迹。
--------------------------------------------------
🛡️ 开始质检文件: test_data\case2_too_short.hdf5
=== 📝 质检报告 ===
轨迹长度: 10 帧
总位移量: 0.0000
最大角速度: 0.0000
❌ 结论: 质检失败!该轨迹将被丢弃。
- 原因: 轨迹过短: 仅 10 帧 (要求 >= 15)
- 原因: 轨迹疑似静止: 总位移量 0.0000 < 0.01
--------------------------------------------------
🛡️ 开始质检文件: test_data\case3_lazy.hdf5
=== 📝 质检报告 ===
轨迹长度: 100 帧
总位移量: 0.0000
最大角速度: 0.0000
❌ 结论: 质检失败!该轨迹将被丢弃。
- 原因: 轨迹疑似静止: 总位移量 0.0000 < 0.01
--------------------------------------------------
🛡️ 开始质检文件: test_data\case4_spike.hdf5
=== 📝 质检报告 ===
轨迹长度: 100 帧
总位移量: 1.8708
最大角速度: 9.7525
❌ 结论: 质检失败!该轨迹将被丢弃。
- 原因: 发现速度突变: 最大瞬时角速度 9.7525 > 阈值 1.0
--------------------------------------------------
四、多模态时间戳精准对齐
在具身智能硬件系统里:
-
相机(视觉): 通常以 30Hz 或 60Hz 的固定频率拍照。
-
机械臂(本体状态与动作): 通常以 50Hz、100Hz 甚至 500Hz 的超高频率发送控制指令。
这就导致了: 同一时间点,你有图片,但没有对应的机械臂位置;或者你有机械臂位置,但相机还没来得及拍照。 如果你不把它们对齐到同一条时间轴上,模型学到的就是“错位”的因果关系(看到苹果,但手已经在抓空气了)。此时我们以频率最低的传感器(通常是相机)为基准时间轴,用数学插值法,把高频的动作数据“强行拉伸/压缩”到相机的频率上。我们将使用科学计算库 SciPy 来实现,在环境搭建环节已经安装。
我们建立文件time_sync_show.py,方便大家理解基本的处理逻辑,代码如下:
import numpy as np
from scipy.interpolate import interp1d
def synchronize_multimodal_data():
print("⏳ 开始多模态时间戳对齐模拟...")
# ==========================================
# 1. 制造一组极度不对齐的原始数据
# ==========================================
duration = 1.0 # 录制了 1 秒的轨迹
cam_fps = 30 # 相机 30Hz
robot_hz = 50 # 机器人 50Hz
# 生成时间戳 (相机有30个时间点,机器人有50个时间点)
cam_timestamps = np.linspace(0, duration, int(duration * cam_fps))
robot_timestamps = np.linspace(0, duration, int(duration * robot_hz))
# 模拟机械臂的连续动作:7个关节都在做正弦波运动
robot_qpos = np.sin(robot_timestamps * 2 * np.pi).reshape(-1, 1) * np.ones((1, 7))
# 模拟夹爪的离散动作:前0.5秒张开(0),后0.5秒闭合(1)
robot_gripper = np.where(robot_timestamps < 0.5, 0.0, 1.0).reshape(-1, 1)
print(f"🚨 对齐前灾难现场:")
print(f" 📷 相机帧数: {len(cam_timestamps)} 帧")
print(f" 🦾 关节帧数: {len(robot_qpos)} 帧")
print(f" 🤏 夹爪帧数: {len(robot_gripper)} 帧")
# ==========================================
# 2. 核心算法:SciPy 时间轴重采样与插值
# ==========================================
# 针对连续物理量 (关节角 qpos):使用【线性插值 linear】
# 原理:如果在 0.1秒和0.2秒之间有个相机帧,那就取两帧动作的平均渐变值
interp_func_qpos = interp1d(
robot_timestamps,
robot_qpos,
kind='linear',
axis=0,
fill_value="extrapolate" # 如果超出边界就外推,防止报错
)
synced_qpos = interp_func_qpos(cam_timestamps)
# 针对离散指令 (夹爪 gripper):使用【最近邻插值 nearest】
# 面试加分项:绝对不能对夹爪用线性插值!否则 0 和 1 之间会插出 0.5 的半开状态,导致机械臂硬件报错!
interp_func_gripper = interp1d(
robot_timestamps,
robot_gripper,
kind='nearest',
axis=0,
fill_value="extrapolate"
)
synced_gripper = interp_func_gripper(cam_timestamps)
# ==========================================
# 3. 验证对齐结果
# ==========================================
print(f"\n✅ 对齐后完美状态:(以相机时间轴为准)")
print(f" 📷 相机帧数: {len(cam_timestamps)} 帧")
print(f" 🦾 同步后关节: {len(synced_qpos)} 帧")
print(f" 🤏 同步后夹爪: {len(synced_gripper)} 帧")
# 拼接成最终送入 VLA 模型的数据集格式 [30, 8]
final_robot_state = np.hstack([synced_qpos, synced_gripper])
print(f"\n🎉 最终拼接完成的机器人状态矩阵形状: {final_robot_state.shape}")
if __name__ == "__main__":
synchronize_multimodal_data()
运行后日志大概是这样:
⏳ 开始多模态时间戳对齐模拟...
🚨 对齐前灾难现场:
📷 相机帧数: 30 帧
🦾 关节帧数: 50 帧
🤏 夹爪帧数: 50 帧
✅ 对齐后完美状态:(以相机时间轴为准)
📷 相机帧数: 30 帧
🦾 同步后关节: 30 帧
🤏 同步后夹爪: 30 帧
🎉 最终拼接完成的机器人状态矩阵形状: (30, 8)
其中,30代表对齐后的统一帧率,8代表的是机械臂的状态由8维向量构成,分别是1-3维度的空间坐标,4-7维度的位姿,使用四元数来表示,最后第8维是抓夹状态的开合,通常0代表张开。
实际使用时我们使用另外的文件time_sync.py:
import numpy as np
from scipy.interpolate import interp1d
class TimeSynchronizer:
def __init__(self, target_fps=30):
# 目标频率,通常对齐到相机的频率 (30Hz)
self.target_fps = target_fps
def synchronize(self, images, qpos, actions, original_robot_hz=50):
"""
通用的多模态对齐函数
images: 视觉数组 [相机帧数, H, W, C]
qpos: 关节角度数组 [机器人帧数, 关节维数]
actions: 动作指令数组 [机器人帧数, 动作维数 (包含夹爪)]
original_robot_hz: 原始机器人数据采集频率
"""
num_cam_frames = images.shape[0]
num_robot_frames = qpos.shape[0]
# 如果帧数天然一致,直接放行(节省算力)
if num_cam_frames == num_robot_frames:
return images, qpos, actions
# 1. 根据帧数反推时间轴
duration = num_cam_frames / self.target_fps
cam_timestamps = np.linspace(0, duration, num_cam_frames)
robot_timestamps = np.linspace(0, duration, num_robot_frames)
# 2. 分离连续动作(关节)和离散动作(夹爪)
# 假设 actions 的最后一列是夹爪 (0 或 1)
continuous_actions = actions[:, :-1]
gripper_actions = actions[:, -1:]
# 3. 对本体状态 (qpos) 进行线性插值
interp_qpos = interp1d(robot_timestamps, qpos, kind='linear', axis=0, fill_value="extrapolate")
synced_qpos = interp_qpos(cam_timestamps)
# 4. 对连续动作 进行线性插值
interp_actions = interp1d(robot_timestamps, continuous_actions, kind='linear', axis=0, fill_value="extrapolate")
synced_continuous_actions = interp_actions(cam_timestamps)
# 5. 对夹爪指令 进行最近邻插值 (极其重要,保持 0/1 状态不被破坏)
interp_gripper = interp1d(robot_timestamps, gripper_actions, kind='nearest', axis=0, fill_value="extrapolate")
synced_gripper = interp_gripper(cam_timestamps)
# 重新拼接动作
synced_actions = np.hstack([synced_continuous_actions, synced_gripper])
return images, synced_qpos, synced_actions
五、引入 VLM 进行自动化重打标
在传统的开源数据集(比如早期的 BridgeData 或 ALOHA)里,人类标的文本往往非常敷衍,可能整个数据集几十万条轨迹,文本标签全是单调的:“pick up the cup”(拿起杯子)。
如果用这种干瘪的数据去训练 VLA 模型,模型就会变成一个“文盲”,稍微换个指令(比如“把那个红色的陶瓷马克杯抓起来”)它就听不懂了。
现在的前沿做法是利用多模态大模型(VLM,如 Gemini 1.5 Flash 或 GPT-4o)自动看图说话,为每一条轨迹生成极度丰富、细致的语言标签。考虑到可能还没配置真实的 API Key,我在这里做了一个工业级 Mock(模拟)+ 真实 API 预留的双轨代码:
新建文件vlm_relabel.py,代码如下:
import h5py
import numpy as np
from PIL import Image
import time
class VLMRelabeler:
def __init__(self, api_key=None):
self.api_key = api_key
# 如果你有真实的 API Key,这里会初始化真正的 VLM 客户端
if self.api_key:
print("🔗 已连接到真实的 VLM API...")
# import google.generativeai as genai
# genai.configure(api_key=self.api_key)
# self.model = genai.GenerativeModel('gemini-1.5-flash')
else:
print("⚠️ 未检测到 API Key,启用本地离线模拟 VLM 模式...")
def extract_start_end_frames(self, h5_path):
"""从 HDF5 中提取轨迹的第一帧(初始状态)和最后一帧(结束状态)"""
print(f"📦 正在打开数据文件: {h5_path}")
try:
with h5py.File(h5_path, 'r') as f:
# 兼容我们之前生成的 test_data 结构
images = f['observations']['images/top'][:]
# 提取首尾两帧
start_frame_np = images[0]
end_frame_np = images[-1]
# 将 numpy 数组转换为 PIL Image 格式 (VLM 接口通常需要这种格式)
start_image = Image.fromarray(start_frame_np)
end_image = Image.fromarray(end_frame_np)
return start_image, end_image
except Exception as e:
print(f"❌ 提取图像失败: {e}")
return None, None
def generate_rich_instruction(self, start_img, end_img):
"""调用 VLM 生成高质量的动作描述文本"""
prompt = """
You are an expert roboticist. I will provide you with the start and end camera frames of a robot manipulation task.
Please describe the task the robot just completed in a single, highly detailed sentence.
Include the objects involved, their colors, and the spatial relationship changes.
"""
if self.api_key:
# === 这里是真实的 API 调用逻辑 (留给以后用) ===
# response = self.model.generate_content([prompt, start_img, end_img])
# return response.text.strip()
pass
else:
# === 这里是模拟 VLM 思考和输出的过程 ===
print("🧠 [模拟 VLM] 正在分析视觉差异特征...")
time.sleep(1.5) # 模拟网络延迟
# 我们随机返回一些高质量的伪造指令,展示重打标的效果
mock_instructions = [
"The robot arm successfully grasped the transparent plastic bottle and placed it upright inside the blue storage bin.",
"The robotic manipulator picked up the red apple from the wooden table and gently dropped it into the metallic sink.",
"The end-effector smoothly slid the yellow sponge across the white plate to clean its surface."
]
import random
return random.choice(mock_instructions)
if __name__ == "__main__":
# 我们用生成的第一个完美测试文件来做实验
target_file = "test_data/case1_perfect.hdf5"
# 实例化打标器 (如果不传 api_key,就会走本地模拟逻辑,确保代码必能跑通)
relabeler = VLMRelabeler(api_key=None)
print("\n" + "="*50)
print("🎥 步骤 1: 提取首尾关键帧")
img_start, img_end = relabeler.extract_start_end_frames(target_file)
if img_start and img_end:
print("\n✍️ 步骤 2: 将多模态图像送入大模型请求重打标")
new_instruction = relabeler.generate_rich_instruction(img_start, img_end)
print("\n✅ VLM 重打标完成!")
print(f"👉 生成的高质量指令: \n\033[92m\"{new_instruction}\"\033[0m")
print("="*50)
当我们真实调用api时,操作如下(以google的为例):
- 安装 Google Gemini 官方库。在你的终端(确保带有 (vla_pipeline))里运行:
pip install google-generativeai
- 获取一个免费的 API Key。打开浏览器,访问 Google AI Studio: https://aistudio.google.com/,登录你的 Google 账号,点击左侧的 “Get API key”,然后点击 “Create API key”,复制那一长串生成的字符(比如 AIzaSyB…)。
- 打开 vlm_relabel.py,把被 # 注释掉的代码释放出来。主要修改三个地方:
修改点一:初始化真实模型
找到__init__函数,改成这样:
def __init__(self, api_key=None):
self.api_key = api_key
if self.api_key:
print("🔗 已连接到真实的 VLM API...")
import google.generativeai as genai
genai.configure(api_key=self.api_key)
# 使用最新的多模态模型 gemini-1.5-flash
self.model = genai.GenerativeModel('gemini-1.5-flash')
else:
print("⚠️ 未检测到 API Key,启用本地离线模拟 VLM 模式...")
修改点二:发送真实请求
找到 generate_rich_instruction 函数,把 if self.api_key: 下面的代码改成这样:
if self.api_key:
print("🧠 [真实 VLM] 正在将图像发送至云端大模型进行解析...")
try:
# 真实调用:同时传入提示词、起始图、结束图
response = self.model.generate_content([prompt, start_img, end_img])
return response.text.strip()
except Exception as e:
return f"API 调用失败: {e}"
修改点 3:填入你的 API Key
在代码最底部的 if name == “main”: 下方,将你的 Key 填进去:
if __name__ == "__main__":
target_file = "test_data/case1_perfect.hdf5"
# ⚠️ 把下面的 "替换成你的真实API_KEY" 换成你刚才复制的那串字符
REAL_API_KEY = "替换成你的真实API_KEY"
# 实例化打标器,传入真实的 Key
relabeler = VLMRelabeler(api_key=REAL_API_KEY)
# ... 后面的执行代码保持不变 ...
- 设置代理并运行
由于调用 Google 的 API 也需要经过海外网络,你在运行代码前,必须确保你的代理处于 TUN 模式(全局路由),或者在代码最顶端加上 os.environ[“HTTPS_PROXY”] = “http://127.0.0.1:你的端口”。比如我使用的clash,端口通常为7890.
六、工程打包与标准化
新建文件main_pipeline.py,代码如下:
import os
import h5py
import glob
from tqdm import tqdm # 进度条神器
# 导入之前写的模块
from bridge_parser import VLADatasetParser
from kinematic_filter import KinematicFilter
from time_sync import TimeSynchronizer
from vlm_relabel import VLMRelabeler
def run_vla_data_pipeline(input_folder, output_folder):
print("🚀 启动具身智能大规模数据清洗管线 (VLA Data Pipeline) 🚀\n")
os.makedirs(output_folder, exist_ok=True)
# 实例化四大组件
filter_engine = KinematicFilter(min_length=15)
sync_engine = TimeSynchronizer(target_fps=30)
vlm_engine = VLMRelabeler() # 离线模拟模式
# 找到所有 HDF5 文件
all_files = glob.glob(os.path.join(input_folder, "*.hdf5"))
print(f"📂 扫描到 {len(all_files)} 个待处理的原始轨迹文件。\n")
valid_count = 0
# tqdm 提供一个高大上的进度条
for file_path in tqdm(all_files, desc="Processing Trajectories"):
file_name = os.path.basename(file_path)
# ==========================================
# 步骤 1:解析提取
# ==========================================
parser = VLADatasetParser(file_path)
parser.extract_trajectory()
data = parser.data_dict
# 如果解析失败(缺少核心字段),直接跳过
if 'images' not in data or 'ee_pose' not in data or 'actions' not in data:
continue
# ==========================================
# 步骤 2:物理规则质检
# ==========================================
passed, reasons, _, _ = filter_engine.run_checks(data['ee_pose'], data['actions'])
if not passed:
# 真实业务中,脏数据会被直接丢弃,不占用后续算力
continue
# ==========================================
# 步骤 3:多模态时间戳对齐
# ==========================================
# 把原始的图片、关节、动作喂给对齐引擎,吐出完美对齐的新数据
synced_imgs, synced_qpos, synced_actions = sync_engine.synchronize(
data['images'], data['ee_pose'], data['actions']
)
# ==========================================
# 步骤 4:VLM 自动化重打标
# ==========================================
# 提取对齐后的第一帧和最后一帧
from PIL import Image
img_start = Image.fromarray(synced_imgs[0])
img_end = Image.fromarray(synced_imgs[-1])
new_instruction = vlm_engine.generate_rich_instruction(img_start, img_end)
# ==========================================
# 步骤 5:落盘存储 (保存为干净的数据集)
# ==========================================
out_path = os.path.join(output_folder, f"clean_{file_name}")
with h5py.File(out_path, 'w') as f:
f.create_dataset('images', data=synced_imgs)
f.create_dataset('qpos', data=synced_qpos)
f.create_dataset('actions', data=synced_actions)
f.create_dataset('instruction', data=new_instruction)
valid_count += 1
print("\n" + "="*50)
print("📊 数据清洗任务完成报告")
print(f"总计输入文件数: {len(all_files)}")
print(f"成功清洗并对齐保留的文件数: {valid_count}")
print(f"数据清洗率 (留存率): {(valid_count/len(all_files))*100:.1f}%")
print(f"干净的数据已存入: {os.path.abspath(output_folder)}")
print("="*50)
if __name__ == "__main__":
# 从我们造的测试数据文件夹读取,输出到干净的产物文件夹
run_vla_data_pipeline(input_folder="test_data", output_folder="clean_data")
运行后终端日志大概内容如下:
(vla_pipeline) PS C:\Users\admin\Desktop\vla_data_pipeline> python main_pipeline.py
🚀 启动具身智能大规模数据清洗管线 (VLA Data Pipeline) 🚀
⚠️ 未检测到 API Key,启用本地离线模拟 VLM 模式...
📂 扫描到 4 个待处理的原始轨迹文件。
Processing Trajectories: 0%| | 0/4 [00:00<?, ?it/s]� 正在解析数据集: test_data\case1_perfect.hdf5
🧠 [模拟 VLM] 正在分析视觉差异特征...
Processing Trajectories: 25%|███████████████████████████████████████▌ | 1/4 [00:01<00:04, 1.56s/it]� 正在解析数据集: test_data\case2_too_short.hdf5
🔄 正在解析数据集: test_data\case3_lazy.hdf5
🔄 正在解析数据集: test_data\case4_spike.hdf5
Processing Trajectories: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4/4 [00:01<00:00, 2.51it/s]
==================================================
📊 数据清洗任务完成报告
总计输入文件数: 4
成功清洗并对齐保留的文件数: 1
数据清洗率 (留存率): 25.0%
干净的数据已存入: C:\Users\admin\Desktop\vla_data_pipeline\clean_data
==================================================
七、可视化报告
新建文件夹inspect_clean_data.py,代码如下:
import h5py
import os
def inspect_clean_data(file_path):
print(f"🕵️ 开始开箱检验成品数据: {file_path}")
if not os.path.exists(file_path):
print("❌ 找不到文件!请确认你的 main_pipeline 成功生成了 clean_data 目录。")
return
with h5py.File(file_path, 'r') as f:
print("\n=== 📦 数据集内部结构 ===")
print(f"📷 图像 (images): {f['images'].shape}, 类型: {f['images'].dtype}")
print(f"🦾 关节 (qpos): {f['qpos'].shape}, 类型: {f['qpos'].dtype}")
print(f"⚡ 动作 (actions): {f['actions'].shape}, 类型: {f['actions'].dtype}")
# 提取并解码 VLM 生成的标签
instruction = f['instruction'][()]
if isinstance(instruction, bytes):
instruction = instruction.decode('utf-8')
print("\n=== 🏷️ AI 自动生成的高质量语义标签 ===")
print(f"\033[92m\"{instruction}\"\033[0m\n")
if __name__ == "__main__":
# 指向我们刚清洗出来的成品文件
target_file = "clean_data/clean_case1_perfect.hdf5"
inspect_clean_data(target_file)
后续使用把name函数中的路径进行修改即可。运行后的终端日志大概如下:
(vla_pipeline) PS C:\Users\admin\Desktop\vla_data_pipeline> python .\inspect_clean_data.py
🕵️ 开始开箱检验成品数据: clean_data/clean_case1_perfect.hdf5
=== 📦 数据集内部结构 ===
📷 图像 (images): (150, 224, 224, 3), 类型: uint8
🦾 关节 (qpos): (150, 14), 类型: float64
⚡ 动作 (actions): (150, 14), 类型: float64
=== 🏷️ AI 自动生成的高质量语义标签 ===
"The robot arm successfully grasped the transparent plastic bottle and placed it upright inside the blue storage bin."
八、处理真实的 ALOHA 物理世界数据集
我们以episode_0.hdf5为例,将它下载到vla_data_pipeline目录下新建的real_data文件夹下,根据前文把API进行配置,相关文件路径进行修改,最后打开main_pipeline.py,把最后一行读取的文件夹改掉:
if __name__ == "__main__":
# 将 input_folder 改为存放真实数据的 real_data 文件夹
run_parallel_pipeline(input_folder="real_data", output_folder="clean_data")
在终端运行即可:
python main_pipeline.py
PS: 我们可以写一个专门的“质检播放器”,把成品数据里隐藏的 VLM 标签提取出来,像电影字幕一样直接打在机器人操作的画面上。新建文件visualize_result.py,代码如下:
import h5py
import cv2
import os
import numpy as np
def generate_demo_video(clean_h5_path, output_mp4="final_demo.mp4", fps=30):
print(f"🎬 开始为清洗后的成品数据生成可视化 Demo: {clean_h5_path}")
if not os.path.exists(clean_h5_path):
print("❌ 找不到成品文件!请确认你的 main_pipeline 已经成功跑完。")
return
try:
with h5py.File(clean_h5_path, 'r') as f:
# 读取清洗后标准化的高质量数组
images = f['images'][:]
instruction = f['instruction'][()]
# 解码 VLM 文本
if isinstance(instruction, bytes):
instruction = instruction.decode('utf-8')
except Exception as e:
print(f"❌ 读取文件失败: {e}")
return
num_frames, height, width, channels = images.shape
# 设置视频编码器
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_mp4, fourcc, fps, (width, height))
print(f"✍️ 正在将 AI 标签和质检水印烧录进 {num_frames} 帧画面中...")
for i in range(num_frames):
# 转换色彩空间用于 OpenCV 写入
frame_bgr = cv2.cvtColor(images[i], cv2.COLOR_RGB2BGR)
# 1. 在左上角打上绿色的“质检通过”水印 (证明 Day 2 和 Day 3 的工作)
cv2.putText(frame_bgr, "STATUS: Cleaned & Aligned", (20, 40),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
# 2. 在底部打上黄色的 VLM 重打标文本 (证明 Day 4 的工作)
# 为了防止文字太长超出屏幕,我们简单截断一下展示
display_text = f"VLM: {instruction[:60]}..." if len(instruction) > 60 else f"VLM: {instruction}"
# 为了让文字更清晰,先画一个黑色的半透明背景条 (高级 UI 技巧)
cv2.rectangle(frame_bgr, (0, height - 50), (width, height), (0, 0, 0), -1)
cv2.putText(frame_bgr, display_text, (20, height - 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
out.write(frame_bgr)
out.release()
print(f"✅ 完美!带有 VLM 标签的绝赞展示视频已保存为: {os.path.abspath(output_mp4)}")
if __name__ == "__main__":
# 指向流水线刚刚输出的成品文件
target_clean_file = "clean_data/clean_episode_0.hdf5"
generate_demo_video(target_clean_file, "aloha_cleaned_demo.mp4")
在终端运行即可:
python visualize_result.py
更多推荐



所有评论(0)