1. 项目概述
今天我们来聊聊如何基于SAM3模型搭建一个完整的视觉处理服务端,实现机械臂3D抓取的前端视觉部分。这个方案特别适合需要精准抓取不规则物体的场景,比如物流分拣、工业装配等。
整个系统的工作流程是这样的:首先通过RGB-D相机获取场景的彩色和深度图像,然后使用SAM3模型进行物体分割,接着通过点云处理算法计算物体的3D位置和姿态,最后将这些信息转换为机械臂可以理解的坐标和角度。下面我会详细拆解每个环节的实现细节。
2. 环境准备与模型下载
2.1 模型文件获取
在开始之前,我们需要准备好SAM3的模型文件。这里有两种获取方式:
-
百度网盘下载:
- 链接:https://pan.baidu.com/s/1MiYRLo5K3lS_KuRT9QNSXQ
- 提取码:k38s
-
官网下载:
- 访问ModelScope官网:https://www.modelscope.cn/models/facebook/sam3/files
提示:建议将模型文件放在项目根目录下的models文件夹中,这样后续代码中的路径配置会更方便。
2.2 Python环境配置
我们需要安装以下Python库:
bash复制pip install opencv-python numpy open3d pyrealsense2 flask ultralytics
如果你使用的是其他品牌的RGB-D相机,需要安装对应的SDK包替代pyrealsense2。
3. 相机服务端核心实现
3.1 Flask应用框架搭建
我们使用Flask作为服务端框架,因为它轻量且易于扩展。下面是基本的应用结构:
python复制from flask import Flask, Response, jsonify, render_template
import threading
app = Flask(__name__)
frame_lock = threading.Lock() # 用于线程安全的帧数据访问
# 全局变量
processed_frame = None
current_mask = None
latest_color_frame = None
latest_depth_frame = None
intr = None
depth_scale = None
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, threaded=True)
3.2 SAM3模型初始化
模型初始化时需要特别注意预加载(预热)的问题:
python复制from ultralytics.models.sam import SAM3SemanticPredictor
import numpy as np
# 模型初始化
predictor = SAM3SemanticPredictor(overrides=dict(
model="models/sam3.pt", # 模型路径
conf=0.3 # 置信度阈值
))
# 模型预热
dummy = np.zeros((640, 480, 3), dtype=np.uint8)
predictor.set_image(dummy)
predictor(text=["object"])
print("SAM3模型预热完成")
经验分享:模型预热这个步骤很关键,可以避免第一次推理时的延迟。在实际应用中,这个延迟可能会达到2-3秒,而预热后通常能在500ms内完成推理。
3.3 相机配置与线程
我们使用单独的线程来处理相机数据流,避免阻塞主线程:
python复制import pyrealsense2 as rs
# 相机配置
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
# 手眼标定矩阵(眼在手上)
T_cam_to_tool = np.array([
[-1, 0, 0, 0.04],
[0, 1, 0, -0.06],
[0, 0, -1, 0.176],
[0, 0, 0, 1]
])
def camera_thread():
global latest_color_frame, latest_depth_frame, intr, depth_scale
# 启动相机
profile = pipeline.start(config)
align_to = rs.stream.color
align = rs.align(align_to)
# 获取相机参数
depth_sensor = profile.get_device().first_depth_sensor()
depth_scale = depth_sensor.get_depth_scale()
intr = profile.get_stream(rs.stream.color).as_video_stream_profile().get_intrinsics()
try:
while True:
frames = pipeline.wait_for_frames()
aligned_frames = align.process(frames)
# 更新帧数据
with frame_lock:
latest_color_frame = np.asanyarray(aligned_frames.get_color_frame().get_data())
latest_depth_frame = np.asanyarray(aligned_frames.get_depth_frame().get_data())
time.sleep(0.033) # ~30fps
finally:
pipeline.stop()
注意事项:手眼标定矩阵需要根据你的实际安装位置进行调整。建议先用棋盘格或Aruco码进行标定,然后再微调。
4. 核心算法实现
4.1 SAM3推理接口
python复制@app.route('/sam')
def sam():
global current_mask, processed_frame
with frame_lock:
img = latest_color_frame.copy()
# 执行推理
predictor.set_image(img)
results = predictor(text=["object"])
# 选择最佳掩码
best_mask = None
if results and results[0].masks is not None:
masks = results[0].masks.data.cpu().numpy()
boxes = results[0].boxes.xyxy.cpu().numpy() if results[0].boxes else []
confs = results[0].boxes.conf.cpu().numpy() if results[0].boxes else []
# 按置信度和面积排序
if len(masks) > 0:
areas = [np.sum(mask) for mask in masks]
scored = [(-confs[i] if i < len(confs) else 0.0, -areas[i], i)
for i in range(len(masks))]
scored.sort()
best_mask = masks[scored[0][2]]
# 更新全局变量
with frame_lock:
current_mask = best_mask
vis = img.copy()
if best_mask is not None:
# 可视化掩码
alpha = 0.5
green = np.array([0, 255, 0], dtype=np.uint8)
mask_bool = best_mask.astype(bool)
vis[mask_bool] = (vis[mask_bool] * (1 - alpha) + green * alpha).astype(np.uint8)
# 绘制边界框
if len(boxes) > scored[0][2]:
x1, y1, x2, y2 = boxes[scored[0][2]].astype(int)
cv2.rectangle(vis, (x1, y1), (x2, y2), (0, 255, 0), 2)
processed_frame = vis
return jsonify({"status": "sam_done", "mask_found": best_mask is not None})
4.2 点云处理核心算法
python复制def get_mask_pointcloud_center_and_normal(depth, mask, intr):
# 从掩码区域生成点云
points = []
for v in range(0, depth.shape[0], 2): # 步长为2降采样
for u in range(0, depth.shape[1], 2):
if mask[v, u] == 0:
continue
z = depth[v, u] * depth_scale
if z <= 0 or z > 1.2: # 深度范围过滤
continue
# 像素坐标转3D坐标
x = (u - intr.ppx) * z / intr.fx
y = (v - intr.ppy) * z / intr.fy
points.append([x, y, z])
if len(points) < 100:
return None, None, None, None
pts = np.array(points)
# 去除桌面点云
z_min = np.min(pts[:, 2])
pts = pts[pts[:, 2] < z_min + 0.1] # 保留比桌面高0.1m内的点
if len(pts) < 30:
return None, None, None, None
# 点云聚类
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(pts)
labels = np.array(pcd.cluster_dbscan(eps=0.02, min_points=50))
if labels.max() < 0: # 没有有效聚类
return None, None, None, None
# 选择最大聚类
largest_label = max(set(labels), key=list(labels).count)
obj_pts = pts[labels == largest_label]
# 计算中心点
center = obj_pts.mean(axis=0)
# 计算法向量
pcd_obj = o3d.geometry.PointCloud()
pcd_obj.points = o3d.utility.Vector3dVector(obj_pts)
pcd_obj.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamKNN(knn=30))
normal = np.asarray(pcd_obj.normals).mean(axis=0)
normal = normal / np.linalg.norm(normal) # 归一化
# 计算像素坐标
u = int(center[0] * intr.fx / center[2] + intr.ppx)
v = int(center[1] * intr.fy / center[2] + intr.ppy)
return center, normal, (u, v), obj_pts
4.3 角度计算与坐标转换
python复制@app.route('/pointcloud')
def pointcloud():
global processed_frame, current_mask
with frame_lock:
if current_mask is None:
return jsonify({"error": "请先执行SAM分割"}), 400
img = latest_color_frame.copy()
depth = latest_depth_frame.copy()
mask = current_mask.copy()
# 获取点云信息
center, normal, uv, obj_pts = get_mask_pointcloud_center_and_normal(depth, mask, intr)
if center is None:
return jsonify({"status": "no_object"})
# 计算掩码主方向角度
ys, xs = np.where(mask > 0)
pts_2d = np.stack([xs, ys], axis=1)
cov = np.cov(pts_2d.T - np.mean(pts_2d, axis=0))
eigvals, eigvecs = np.linalg.eig(cov)
main_vec = eigvecs[:, np.argmax(eigvals)]
# 角度处理(锁定到0~π/2范围)
angle_rad = np.arctan2(main_vec[1], main_vec[0]) % np.pi
if angle_rad > np.pi/2:
angle_rad = np.pi - angle_rad
grasp_theta = angle_rad - np.pi/2
# 坐标转换(相机→工具)
point_cam = np.array([*center, 1])
point_tool = T_cam_to_tool @ point_cam
# 可视化
vis = img.copy()
u, v = uv
if 0 <= u < 640 and 0 <= v < 480:
cv2.circle(vis, (u, v), 6, (0, 255, 0), -1)
dx = int(np.cos(grasp_theta) * 80)
dy = int(np.sin(grasp_theta) * 80)
cv2.arrowedLine(vis, (u, v), (u + dx, v + dy), (255, 0, 0), 2)
with frame_lock:
processed_frame = vis
return jsonify({
"status": "ok",
"center_camera": {"x": float(center[0]), "y": float(center[1]), "z": float(center[2])},
"normal": {"nx": float(normal[0]), "ny": float(normal[1]), "nz": float(normal[2])},
"tool_xyz": {"x": float(point_tool[0]), "y": float(point_tool[1]), "z": float(point_tool[2])},
"theta_rad": float(grasp_theta)
})
5. 前端界面与视频流
5.1 HTML页面设计
html复制<!DOCTYPE html>
<html>
<head>
<title>视觉检测系统</title>
<style>
body {
font-family: Arial, sans-serif;
background: #f0f2f5;
margin: 0;
padding: 20px;
}
.container {
display: flex;
gap: 20px;
margin-bottom: 20px;
}
.card {
background: white;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
padding: 15px;
flex: 1;
}
.controls {
display: flex;
justify-content: center;
gap: 15px;
margin-top: 20px;
}
button {
padding: 10px 20px;
background: #4CAF50;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 16px;
}
button:hover {
background: #45a049;
}
</style>
</head>
<body>
<h1>视觉检测系统</h1>
<div class="container">
<div class="card">
<h2>实时画面</h2>
<img src="{{ url_for('video_feed') }}" width="640" height="480">
</div>
<div class="card">
<h2>处理结果</h2>
<img src="{{ url_for('processed_feed') }}" width="640" height="480">
</div>
</div>
<div class="controls">
<button onclick="run('/capture')">拍照</button>
<button onclick="run('/sam')">SAM分割</button>
<button onclick="run('/pointcloud')">点云分析</button>
</div>
<script>
function run(url) {
fetch(url)
.then(res => res.json())
.then(data => console.log(data))
.catch(err => console.error(err));
}
</script>
</body>
</html>
5.2 视频流接口
python复制@app.route('/video_feed')
def video_feed():
def generate():
while True:
with frame_lock:
if latest_color_frame is None:
continue
ret, jpeg = cv2.imencode('.jpg', latest_color_frame)
frame = jpeg.tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')
return Response(generate(), mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/processed_feed')
def processed_feed():
def generate():
while True:
with frame_lock:
if processed_frame is None:
continue
ret, jpeg = cv2.imencode('.jpg', processed_frame)
frame = jpeg.tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')
return Response(generate(), mimetype='multipart/x-mixed-replace; boundary=frame')
6. 系统部署与优化建议
6.1 性能优化技巧
-
模型推理加速:
- 使用ONNX或TensorRT格式的模型
- 启用半精度推理(FP16)
- 对于固定场景,可以减小输入图像分辨率
-
点云处理优化:
- 使用Open3D的GPU加速功能
- 对点云进行体素降采样
- 提前过滤掉无效深度区域
-
多线程处理:
- 将图像采集、模型推理、点云计算放在不同线程
- 使用线程池处理并发请求
6.2 常见问题排查
-
模型加载失败:
- 检查模型文件路径是否正确
- 确认模型文件完整(可以尝试重新下载)
- 检查PyTorch版本是否兼容
-
点云结果不稳定:
- 检查深度相机的标定是否准确
- 调整点云聚类的参数(eps和min_points)
- 增加点云过滤的阈值
-
手眼标定误差:
- 使用更高精度的标定板
- 增加标定样本数量(建议至少15组)
- 考虑使用手眼标定工具箱(如MATLAB的Camera Calibrator)
7. 扩展功能建议
-
多物体抓取:
- 修改SAM3的prompt为复数形式
- 对每个检测到的物体分别计算抓取点
- 添加抓取优先级策略
-
抓取姿态评估:
- 基于点云曲率分析最佳抓取位置
- 考虑机械臂的避障约束
- 添加抓取成功率预测模型
-
动态场景处理:
- 添加目标跟踪算法
- 预测物体运动轨迹
- 实现实时抓取规划
在实际部署时,我发现锁定角度象限的处理特别重要。最初没有做这个处理时,机械臂经常会因为角度突变而产生抖动。通过将角度限制在0~π/2范围内,不仅提高了稳定性,还简化了后续的运动规划。