在机器人控制系统中,处理长时间运行任务是一个常见需求。想象一下,当你需要让机器人移动到某个位置时,这个过程可能需要几秒甚至几分钟。传统的服务调用(Service)方式会阻塞整个程序,直到任务完成,这显然不够灵活。ROS2的Action机制就是为了解决这类问题而设计的。
Action由三个关键部分组成,构成了一个完整的任务生命周期管理框架:
Goal(目标):客户端向服务器发送的任务请求。比如让海龟旋转到特定角度,或者让移动机器人导航到某个坐标点。在代码中表现为一个特定的消息类型,包含任务所需的全部参数。
Feedback(反馈):服务器在执行过程中定期发送的进度更新。这种反馈机制使得客户端能够实时了解任务执行情况,比如剩余角度、完成百分比等。反馈频率由服务器决定,通常根据任务特性进行优化。
Result(结果):任务结束时(无论成功或失败)发送的最终状态报告。与Service的响应类似,但增加了更丰富的状态信息,比如任务是否被取消、终止原因等。
理解Action的独特价值,需要将其与ROS2的其他通信机制进行对比:
| 特性 | Topic | Service | Action |
|---|---|---|---|
| 通信模式 | 单向发布/订阅 | 请求/响应 | 目标/反馈/结果 |
| 实时性 | 高频、低延迟 | 一次性交互 | 持续交互 |
| 任务管理 | 无 | 无 | 支持取消和进度跟踪 |
| 典型应用场景 | 传感器数据流 | 即时查询或控制 | 长时间运行的任务 |
| 资源消耗 | 中等(取决于频率) | 低 | 较高(需维护状态) |
Action特别适合以下场景:
在开始编写代码前,通过命令行工具熟悉Action的基本操作是非常有价值的。ROS2提供了一组强大的命令行工具来交互Action:
bash复制# 列出当前系统中所有可用的Action
ros2 action list
# 查看特定Action的详细信息
ros2 action info /turtle1/rotate_absolute
# 发送旋转目标并实时查看反馈
ros2 action send_goal /turtle1/rotate_absolute turtlesim/action/RotateAbsolute "{theta: 3.14}" --feedback
当执行最后一条命令时,你会看到类似以下的反馈输出:
code复制Feedback: {remaining: 2.8}
Feedback: {remaining: 2.5}
Feedback: {remaining: 1.9}
...
Result: {success: true, message: 'Target reached'}
提示:在命令执行过程中,随时可以按Ctrl+C取消任务。这是Action区别于Service的重要特性之一。
以turtlesim的RotateAbsolute Action为例,其消息定义清晰地展现了Action的三段式结构:
python复制# Goal定义
float32 theta # 目标角度(弧度制)
---
# Feedback定义
float32 remaining # 剩余需要旋转的角度
---
# Result定义
bool success # 是否成功完成
string message # 结果描述
这种结构设计使得:
一个完整的Action客户端通常包含以下几个关键部分:
python复制#!/usr/bin/env python3
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from turtlesim.action import RotateAbsolute
class TurtleActionClient(Node):
def __init__(self):
super().__init__('turtle_action_client')
# 创建Action客户端
self._action_client = ActionClient(
self,
RotateAbsolute,
'/turtle1/rotate_absolute'
)
def send_goal(self, angle):
# 等待Action服务器可用
self._action_client.wait_for_server()
# 创建Goal消息
goal_msg = RotateAbsolute.Goal()
goal_msg.theta = angle
# 异步发送Goal并设置回调
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
# 处理服务器对Goal的响应
pass
def feedback_callback(self, feedback_msg):
# 处理反馈信息
pass
def get_result_callback(self, future):
# 处理最终结果
pass
在基础架构上,我们可以实现更复杂的控制策略,比如条件取消:
python复制def feedback_callback(self, feedback_msg):
remaining = feedback_msg.feedback.remaining
self.get_logger().info(f'剩余角度: {remaining:.2f}弧度')
# 当剩余角度小于阈值时自动取消
if remaining < 1.0 and not self._cancel_requested:
self._cancel_requested = True
self.get_logger().warn('触发条件取消!')
# 异步发送取消请求
cancel_future = self._goal_handle.cancel_goal_async()
cancel_future.add_done_callback(self.cancel_done_callback)
def cancel_done_callback(self, future):
# 处理取消操作完成的通知
if future.result().return_code == 1: # GOAL_TERMINAL_STATE
self.get_logger().info('取消请求已被接受')
以下是整合了所有功能的完整Action客户端实现:
python复制#!/usr/bin/env python3
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from turtlesim.action import RotateAbsolute
class SmartTurtleCommander(Node):
def __init__(self):
super().__init__('smart_turtle_commander')
self._action_client = ActionClient(
self,
RotateAbsolute,
'/turtle1/rotate_absolute'
)
self._goal_handle = None
self._cancel_requested = False
def send_goal(self, angle):
# 等待服务器可用
if not self._action_client.wait_for_server(timeout_sec=5.0):
self.get_logger().error('Action服务器未就绪')
return
# 创建Goal
goal_msg = RotateAbsolute.Goal()
goal_msg.theta = angle
# 发送Goal
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
self._goal_handle = future.result()
if not self._goal_handle.accepted:
self.get_logger().info('Goal被拒绝')
return
self.get_logger().info('Goal已接受')
self._get_result_future = self._goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def feedback_callback(self, feedback_msg):
remaining = feedback_msg.feedback.remaining
self.get_logger().info(f'进度: 剩余{remaining:.2f}弧度')
if remaining < 1.0 and not self._cancel_requested:
self._cancel_requested = True
self.get_logger().warn('触发条件取消!')
cancel_future = self._goal_handle.cancel_goal_async()
cancel_future.add_done_callback(self.cancel_done_callback)
def cancel_done_callback(self, future):
cancel_response = future.result()
if cancel_response.return_code == 1:
self.get_logger().info('取消成功')
def get_result_callback(self, future):
result = future.result().result
status = future.result().status
if status == 2: # CANCELED
self.get_logger().info('任务已被取消')
elif status == 1 and result.success: # SUCCEEDED
self.get_logger().info('任务完成')
else:
self.get_logger().info('任务失败')
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
client = SmartTurtleCommander()
client.send_goal(6.28) # 360度
rclpy.spin(client)
if __name__ == '__main__':
main()
在实际机器人系统中,常常需要协调多个Action的执行。例如,让机器人先移动到某个位置,然后执行抓取动作。这可以通过Action的状态机来实现:
python复制class MultiActionController(Node):
def __init__(self):
super().__init__('multi_action_controller')
self._move_client = ActionClient(self, Move, '/move_base')
self._gripper_client = ActionClient(self, GripperCommand, '/gripper_controller')
self._current_state = 'IDLE'
def execute_sequence(self):
if self._current_state == 'IDLE':
self._send_move_goal()
self._current_state = 'MOVING'
def _send_move_goal(self):
goal = Move.Goal()
# 设置目标位置...
self._move_client.send_goal_async(
goal,
feedback_callback=self._move_feedback,
done_callback=self._move_done
)
def _move_done(self, future):
result = future.result().result
if result.success:
self._send_gripper_goal()
self._current_state = 'GRIPPING'
else:
self.get_logger().error('移动失败')
def _send_gripper_goal(self):
goal = GripperCommand.Goal()
# 设置抓取参数...
self._gripper_client.send_goal_async(
goal,
done_callback=self._gripper_done
)
def _gripper_done(self, future):
self._current_state = 'COMPLETE'
self.get_logger().info('任务序列完成')
在实际应用中,网络问题或系统负载可能导致Action执行异常。实现健壮的重试机制非常重要:
python复制def send_goal_with_retry(self, goal_msg, max_retries=3):
retry_count = 0
while retry_count < max_retries:
try:
future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
# 设置超时
rclpy.spin_until_future_complete(self, future, timeout_sec=5.0)
if future.done():
return future.result()
except Exception as e:
self.get_logger().warn(f'尝试 {retry_count+1} 失败: {str(e)}')
retry_count += 1
time.sleep(1)
raise Exception('达到最大重试次数')
反馈频率控制:在Action服务器端合理设置反馈频率,避免过高频率导致系统负载增加。对于海龟旋转这样的简单任务,10Hz的反馈率通常足够;对于导航等复杂任务,1-5Hz可能更合适。
取消响应优化:确保取消请求能够快速响应。在服务器端实现中,应该定期检查取消标志,避免长时间阻塞的操作无法及时响应取消请求。
资源清理:Action客户端在完成任务后应该及时清理资源,特别是在频繁创建和销毁客户端的情况下。
| 问题现象 | 可能原因 | 解决方案 |
|---|---|---|
| 无法找到Action服务器 | 1. 服务器未启动 2. 名称不匹配 3. 网络分区 |
1. 检查服务器是否运行 2. 使用 ros2 action list验证名称3. 检查网络配置 |
| 目标被拒绝 | 1. 参数无效 2. 服务器忙碌 3. 权限问题 |
1. 检查Goal参数 2. 查看服务器状态 3. 检查权限设置 |
| 反馈不更新 | 1. 服务器未发送反馈 2. 网络问题 3. 回调函数错误 |
1. 检查服务器实现 2. 测试网络连接 3. 调试回调函数 |
| 取消无响应 | 1. 服务器未处理取消 2. 取消请求丢失 3. 竞争条件 |
1. 检查服务器实现 2. 添加重试逻辑 3. 检查时序问题 |
命令行监控:
bash复制# 查看Action通信详情
ros2 topic echo /turtle1/rotate_absolute/_action/feedback
ros2 topic echo /turtle1/rotate_absolute/_action/status
可视化工具:
rqt_action:提供图形化界面查看和管理Actionrqt_console:查看节点日志,有助于调试回调函数日志记录:
python复制# 在关键位置添加详细日志
self.get_logger().debug(f'Goal状态更新: {status}')
单元测试:
python复制import unittest
from std_srvs.srv import Trigger
class TestActionClient(unittest.TestCase):
def test_goal_acceptance(self):
# 测试Goal是否被正确接受
pass
Action特别适合机器人导航场景。典型的导航Action接口可能包含:
python复制# Goal
geometry_msgs/PoseStamped target_pose
float32 speed_limit
---
# Feedback
geometry_msgs/PoseStamped current_pose
float32 progress # 0.0-1.0
string current_state
---
# Result
bool success
float32 final_distance
duration execution_time
客户端实现要点:
机械臂控制是另一个典型应用场景:
python复制# Goal
geometry_msgs/Pose target_pose
string object_type
float32 force_limit
---
# Feedback
geometry_msgs/Pose current_pose
float32 applied_force
string current_step # APPROACHING, GRASPING, etc.
---
# Result
bool success
float32 final_force
string error_message
实现注意事项:
结合多个Action实现复杂任务流程:
python复制def execute_pick_and_place(self):
# 第一步:移动到目标位置
move_goal = Move.Goal(target=self.pick_position)
self._move_client.send_goal_async(
move_goal,
done_callback=self._on_move_done
)
def _on_move_done(self, future):
if future.result().success:
# 第二步:执行抓取
grasp_goal = Grasp.Goal(object_id=self.target_object)
self._grasp_client.send_goal_async(
grasp_goal,
done_callback=self._on_grasp_done
)
else:
self.get_logger().error('移动失败,终止任务')
def _on_grasp_done(self, future):
if future.result().success:
# 第三步:移动到放置位置
place_goal = Move.Goal(target=self.place_position)
self._move_client.send_goal_async(
place_goal,
done_callback=self._on_place_done
)
Action机制相比Topic和Service会消耗更多资源,主要体现在:
在资源受限的系统中,需要特别注意:
大规模部署时需要考虑:
经过实际项目验证的Action使用建议:
清晰的接口设计:
健壮的客户端实现:
高效的服务器实现:
全面的文档:
性能监控:
在机器人开发中,Action机制的正确使用可以显著提升系统的可靠性和用户体验。通过本指南介绍的技术和方法,你应该能够设计出高效、健壮的Action接口,满足各种复杂机器人任务的需求。