3.7 ROS2 Action 长时间任务与机械臂集成
一、本章目标
在前面的章节中,我们学习了 Topic(持续数据流)和 Service(一问一答)两种通信方式。但有些场景需要更复杂的交互模式:
- 控制机械臂运动到目标位置(需要知道进度,可能需要取消)
- 执行导航任务(需要持续反馈当前位置)
- 执行复杂的抓取流程(多步骤任务,每步都需要状态更新)
这些长时间运行的任务用 Service 就不够了,因为:
- Service 调用是阻塞的,在等待结果期间什么都做不了
- 没有进度反馈机制
- 无法中途取消任务
这就是 Action(动作) 通信模式的用武之地。
1.1 学习路线
- 理解 Action 的核心概念 - 为什么需要 Action?
- 自定义 Action 接口 - 定义
.action文件 - 编写 Action Server - 处理长时间任务
- 编写 Action Client - 发送目标、接收反馈
- 机械臂通信实战 - 实现角度模式运动控制
1.2 本章目标
完成本章后,你将能够:
- ✓ 理解 Action 的目标-反馈-结果通信模式
- ✓ 创建自定义
.action动作接口 - ✓ 编写 Action Server 和 Client 节点
- ✓ 实现机械臂角度模式运动控制
二、Action 核心概念
2.1 回顾三种通信方式
| 通信方式 | 特点 | 典型场景 |
|---|---|---|
| Topic | 单向、持续、异步 | 传感器数据流、关节状态发布 |
| Service | 双向、一次性、同步 | 读取配置、夹爪开关 |
| Action | 双向、持续反馈、可取消 | 运动控制、导航任务 |
2.2 Action 的工作原理
Action 采用目标-反馈-结果(Goal-Feedback-Result) 模式:
┌─────────────┐ ┌─────────────┐
│ Client │ ─── Goal Request ─────> │ Server │
│ (客户端) │ │ (服务端) │
│ │ <── Goal Response ───── │ │
│ │ │ │
│ │ <── Feedback ────────── │ (执行中...) │
│ │ <── Feedback ────────── │ │
│ │ <── Feedback ────────── │ │
│ │ │ │
│ │ <── Result ──────────── │ (完成) │
└─────────────┘ └─────────────┘
通信流程:
- Client 发送目标(Goal):告诉 Server 要做什么
- Server 接受/拒绝目标:验证目标是否可执行
- Server 持续发送反馈(Feedback):报告执行进度
- Server 返回结果(Result):任务完成或失败的最终状态
2.3 外卖系统类比
继续用外卖系统来理解 Action:
| 通信方式 | 外卖类比 | 说明 |
|---|---|---|
| Topic | 骑手持续广播 GPS | 单向推送 |
| Service | 顾客查询骑手位置 | 一问一答 |
| Action | 顾客下单 → 商家接单 → 骑手配送中(多次位置更新) → 送达 | 完整订单生命周期 |
Action 的订单流程:
- Goal:顾客下单(目标地址、菜品)
- Goal Response:商家接单/拒单
- Feedback:骑手实时位置更新(多次)
- Result:订单完成(送达时间、签收状态)
2.4 机械臂场景对比
| 功能 | 通信方式 | 理由 |
|---|---|---|
| 发布关节角度 | Topic | 持续、高频发布 |
| 读取当前角度 | Service | 按需查询,立即返回 |
| 控制夹爪开合 | Service | 快速操作,需要结果反馈 |
| 运动到目标角度 | Action | 需要进度反馈、可能需要取消 |
| 执行轨迹运动 | Action | 长时间任务,需要实时反馈 |
| 直线运动到目标点 | Action | 多步骤任务,需要进度更新 |
2.5 Action 接口的结构
Action 接口由 .action 文件定义,包含三个部分,用 --- 分隔:
# 目标(Goal):Client 发送给 Server
int32 order
---
# 结果(Result):Server 返回给 Client(任务完成时)
int32[] sequence
---
# 反馈(Feedback):Server 持续发送给 Client(执行过程中)
int32[] partial_sequence
对比 Service 的两部分结构:
- Service:请求 + 响应(2 部分)
- Action:目标 + 结果 + 反馈(3 部分)
三、自定义 Action 接口
3.1 设计机械臂角度控制 Action
我们要实现一个角度模式运动控制的 Action,让机械臂运动到指定的 6 个关节角度。
需求分析:
| 部分 | 内容 | 说明 |
|---|---|---|
| Goal | 目标角度数组、速度比例 | 告诉机械臂要去哪里 |
| Result | 是否成功、最终角度、执行时间 | 任务完成后的状态 |
| Feedback | 当前角度、进度百分比、已用时间 | 执行过程中的实时状态 |
3.2 创建 .action 文件
步骤 1:创建 action 目录
cd ~/ros2_ws/src/robot_arm_interfaces
mkdir -p action
步骤 2:创建 AngleMode.action
nano action/AngleMode.action
写入以下内容:
# AngleMode.action
# 机械臂角度模式运动控制
# ========== 目标部分(Goal)==========
# 6个电机的目标角度(单位:度)
float64[] angles # 动态数组,服务端验证长度必须为6
# 运动速度比例(0.0-1.0)
float64 speed_ratio
---
# ========== 结果部分(Result)==========
# 运动是否成功
bool success
# 错误信息或成功提示
string message
# 运动耗时(秒)
float64 execution_time
# 最终达到的角度
float64[6] final_angles
---
# ========== 反馈部分(Feedback)==========
# 自开始以来经过的时间(秒)
float64 elapsed_time
# 运动进度(0.0-1.0)
float64 progress
# 当前电机角度
float64[6] current_angles
# 可读的状态信息
string status_message
设计说明:
| 字段 | 类型 | 说明 |
|---|---|---|
| Goal(目标) | ||
angles |
float64[6] |
6 个电机的目标角度(度) |
speed_ratio |
float64 |
运动速度比例 0.0-1.0 |
| Result(结果) | ||
success |
bool |
运动是否成功 |
message |
string |
错误信息或成功提示 |
execution_time |
float64 |
运动耗时(秒) |
final_angles |
float64[6] |
最终达到的角度 |
| Feedback(反馈) | ||
elapsed_time |
float64 |
经过的时间(秒) |
progress |
float64 |
运动进度 0.0-1.0 |
current_angles |
float64[6] |
当前电机角度 |
status_message |
string |
可读的状态信息 |
3.3 修改 CMakeLists.txt
打开 ~/ros2_ws/src/robot_arm_interfaces/CMakeLists.txt,添加 .action 文件:
cmake_minimum_required(VERSION 3.8)
project(robot_arm_interfaces)
# 编译器警告选项(C++ 代码质量检查)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# ========== 查找依赖包 ==========
find_package(ament_cmake REQUIRED) # ROS2 CMake 构建工具
find_package(builtin_interfaces REQUIRED) # 内置接口类型(Time, Duration 等)
# ========== 自定义消息生成 ==========
find_package(rosidl_default_generators REQUIRED) # 消息代码生成器
# 声明要生成的消息文件(可以添加多个 .msg 文件)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/MotorAngles.msg"
"srv/ServoGripper.srv" # 新增:舵机夹爪服务
"action/AngleMode.action" # 新增:角度模式动作
DEPENDENCIES builtin_interfaces
)
# ========== 测试配置(可选)==========
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# 以下行跳过版权检查(如果没有添加版权信息,可以取消注释)
# set(ament_cmake_copyright_FOUND TRUE)
# 以下行跳过 cpplint 检查(如果不是 git 仓库,可以取消注释)
# set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
# ========== 导出包信息 ==========
ament_package()
3.5 编译并验证
cd ~/ros2_ws
colcon build --packages-select robot_arm_interfaces
source install/setup.bash
# 验证 Action 接口
ros2 interface show robot_arm_interfaces/action/AngleMode
输出:
# AngleMode.action
# 机械臂角度模式运动控制
# ========== 目标部分(Goal)==========
# 6个电机的目标角度(单位:度)
float64[6] angles
# 运动速度比例(0.0-1.0)
float64 speed_ratio
---
# ========== 结果部分(Result)==========
# 运动是否成功
bool success
# 错误信息或成功提示
string message
# 运动耗时(秒)
float64 execution_time
# 最终达到的角度
float64[6] final_angles
---
# ========== 反馈部分(Feedback)==========
# 自开始以来经过的时间(秒)
float64 elapsed_time
# 运动进度(0.0-1.0)
float64 progress
# 当前电机角度
float64[6] current_angles
# 可读的状态信息
string status_message
成功! 自定义 Action 接口已经可以使用了。
四、编写 Action Server
4.1 Action Server 的核心职责
Action Server 负责:
- 接收目标:验证并决定是否接受
- 执行任务:实际控制机械臂运动
- 发布反馈:持续报告进度
- 返回结果:任务完成后的最终状态
4.2 编写角度模式 Action Server(模拟版)
我们将逐步构建 Action Server,每一步都添加新功能并说明必要性。
版本 0:最基础的框架
先创建最基础的 Action Server 框架,只接收目标但不做任何事情。
创建文件 ~/ros2_ws/src/py_episode/py_episode/angle_mode_server.py:
#!/usr/bin/env python3
"""
机械臂角度模式 Action Server(模拟版 - 版本 0)
最基础框架:只接收目标
"""
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from robot_arm_interfaces.action import AngleMode
class AngleModeServer(Node):
"""角度模式动作服务器节点"""
def __init__(self):
super().__init__('angle_mode_server')
# 创建 Action Server
self._action_server = ActionServer(
self, # 节点
AngleMode, # Action 类型
'angle_mode', # Action 名称
self.execute_callback # 执行回调函数
)
self.get_logger().info('角度模式 Action Server 已启动')
def execute_callback(self, goal_handle):
"""
执行回调函数 - 当目标被接受后执行
"""
self.get_logger().info('接收到角度模式目标...')
# 获取目标参数
target_angles = list(goal_handle.request.angles)
speed_ratio = goal_handle.request.speed_ratio
self.get_logger().info(f'目标角度: {target_angles}')
self.get_logger().info(f'速度比例: {speed_ratio}')
# 创建并返回结果(空的)
result = AngleMode.Result()
return result
def main(args=None):
rclpy.init(args=args)
node = AngleModeServer()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Action Server 业务流程图(版本 0):
Client Server
│ │
│ ① 发送目标 (Goal Request) │
│ ─────────────────────────────────────────────> │
│ angles=[150,80,90,45,100,60] │
│ speed_ratio=0.5 │
│ │
│ ┌──────────┴──────────┐
│ │ execute_callback() │
│ │ ┌───────────────┐ │
│ │ │ 1. 获取目标参数 │ │
│ │ │ 打印日志 │ │
│ │ └───────────────┘ │
│ │ ↓ │
│ │ ┌───────────────┐ │
│ ② 返回结果 (Result) │ │ 2. 返回空结果 │ │
│ <───────────────────────────────────│ │ (未标记状态) │ │
│ success=false (默认) │ └───────────────┘ │
│ 状态: ABORTED └──────────┬──────────┘
│ │
▼ ▼
说明:版本 0 只是最基础的框架,仅接收目标并打印,没有标记成功状态,
因此 ROS 2 默认将目标视为 ABORTED。
测试版本 0:
# 终端 1:启动 Server
python3 src/py_episode/py_episode/angle_mode_server.py
# 终端 2:发送目标
ros2 action send_goal /angle_mode robot_arm_interfaces/action/AngleMode \
"{angles: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio: 0.5}"
观察现象:
-
Server 接收到目标并打印信息
# server [INFO] [1768462342.655779639] [angle_mode_server]: 接收到角度模式目标... [INFO] [1768462342.656104552] [angle_mode_server]: 目标角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768462342.656420821] [angle_mode_server]: 速度比例: 0.5 [WARN] [1768462342.656854170] [angle_mode_server.action_server]: Goal state not set, assuming aborted. Goal ID: [195 169 41 254 153 129 74 163 175 150 10 173 15 204 203 117] # client Waiting for an action server to become available... Sending goal: angles: - 150.0 - 80.0 - 90.0 - 45.0 - 100.0 - 60.0 speed_ratio: 0.5 Goal accepted with ID: 35cdb3182da8481a9ea77aedcb39f0f4 Result: success: false message: '' execution_time: 0.0 final_angles: - 0.0 - 0.0 - 0.0 - 0.0 - 0.0 - 0.0 Goal finished with status: ABORTED -
目标立即完成,但状态显示为 ABORTED(中止)
问题:为什么是 ABORTED?因为我们没有明确告诉 Server 目标的最终状态。
版本 1:标记目标成功
新增功能:调用 goal_handle.succeed() 明确标记目标成功。
修改 execute_callback:
def execute_callback(self, goal_handle):
"""执行回调函数 - 版本 1"""
self.get_logger().info('接收到角度模式目标...')
# 获取目标参数
target_angles = list(goal_handle.request.angles)
speed_ratio = goal_handle.request.speed_ratio
self.get_logger().info(f'目标角度: {target_angles}')
self.get_logger().info(f'速度比例: {speed_ratio}')
# 【新增】标记目标成功
goal_handle.succeed()
# 创建并返回结果
result = AngleMode.Result()
result.success = True
result.message = '目标已接收'
return result
Action Server 业务流程图(版本 1):
Client Server
│ │
│ ① 发送目标 (Goal Request) │
│ ─────────────────────────────────────────────> │
│ angles=[150,80,90,45,100,60] │
│ speed_ratio=0.5 │
│ │
│ ┌──────────┴──────────┐
│ │ execute_callback() │
│ │ ┌───────────────┐ │
│ │ │ 1. 获取目标参数 │ │
│ │ │ 打印日志 │ │
│ │ └───────────────┘ │
│ │ ↓ │
│ │ ┌───────────────┐ │
│ │ │ 2. 标记成功 │ │
│ │ │ succeed() ✓ │ │
│ │ └───────────────┘ │
│ │ ↓ │
│ │ ┌───────────────┐ │
│ ② 返回结果 (Result) │ │ 3. 返回结果 │ │
│ <───────────────────────────────────│ │ success=True │ │
│ success=true │ │ message=... │ │
│ message='目标已接收' │ └───────────────┘ │
│ 状态: SUCCEEDED ✓ └──────────┬──────────┘
│ │
▼ ▼
版本 1 改进:相比版本 0,新增了
goal_handle.succeed()调用,
明确告诉 ROS 2 目标已成功完成,状态从 ABORTED 变为 SUCCEEDED。
再次测试:
ros2 action send_goal /angle_mode robot_arm_interfaces/action/AngleMode \
"{angles: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio: 0.5}"
观察现象:
-
目标状态变为 SUCCEEDED(成功)
# server [INFO] [1768462503.414477447] [angle_mode_server]: 角度模式 Action Server 已启动 [INFO] [1768462506.902255680] [angle_mode_server]: 接收到角度模式目标... [INFO] [1768462506.903605467] [angle_mode_server]: 目标角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768462506.904861474] [angle_mode_server]: 速度比例: 0.5 # client Waiting for an action server to become available... Sending goal: angles: - 150.0 - 80.0 - 90.0 - 45.0 - 100.0 - 60.0 speed_ratio: 0.5 Goal accepted with ID: be8ac3c601124e08a100cc0013ecb69d Result: success: true message: 目标已接收 execution_time: 0.0 final_angles: - 0.0 - 0.0 - 0.0 - 0.0 - 0.0 - 0.0 Goal finished with status: SUCCEEDED -
但还是立即完成,没有实际运动
必要性说明:
goal_handle.succeed()- 标记目标成功goal_handle.abort()- 标记目标失败goal_handle.canceled()- 标记目标被取消
如果不调用这些方法,ROS 2 默认认为目标 ABORTED(执行异常)。
版本 2:添加实际运动逻辑
新增功能:
- 验证参数有效性
- 模拟运动过程(时间延迟)
- 计算并返回完整结果
完整代码:
#!/usr/bin/env python3
"""
机械臂角度模式 Action Server(模拟版 - 版本 2)
添加运动逻辑和参数验证
"""
import time
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from robot_arm_interfaces.action import AngleMode
class AngleModeServer(Node):
"""角度模式动作服务器节点"""
def __init__(self):
super().__init__('angle_mode_server')
# 创建 Action Server
self._action_server = ActionServer(
self,
AngleMode,
'angle_mode',
self.execute_callback
)
# 【新增】模拟当前关节角度
self.current_angles = [180.0, 90.0, 83.0, 30.0, 110.0, 30.0]
self.get_logger().info('角度模式 Action Server 已启动')
def execute_callback(self, goal_handle):
"""执行回调函数 - 版本 2"""
self.get_logger().info('接收到角度模式目标...')
# 获取目标参数
target_angles = list(goal_handle.request.angles)
speed_ratio = goal_handle.request.speed_ratio
self.get_logger().info(f'目标角度: {target_angles}')
self.get_logger().info(f'速度比例: {speed_ratio}')
# 【新增】验证参数
if len(target_angles) != 6:
result = AngleMode.Result()
result.success = False
result.message = f'错误:需要6个关节角度,收到{len(target_angles)}个'
result.execution_time = 0.0
result.final_angles = self.current_angles
goal_handle.abort() # 标记失败
return result
# 【新增】记录开始时间
start_time = time.time()
# 【新增】计算预计执行时间(模拟真实运动)
estimated_time = 3.0
self.get_logger().info(f'预计执行时间: {estimated_time:.2f}秒')
# 【新增】模拟运动过程
time.sleep(estimated_time)
# 【新增】更新当前角度
self.current_angles = target_angles.copy()
# 标记成功
goal_handle.succeed()
# 【新增】返回完整结果
result = AngleMode.Result()
result.success = True
result.message = '角度模式运动完成'
result.execution_time = time.time() - start_time
result.final_angles = self.current_angles.copy()
self.get_logger().info(f'运动完成,耗时: {result.execution_time:.2f}秒')
return result
def main(args=None):
rclpy.init(args=args)
node = AngleModeServer()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Action Server 业务流程图(版本 2):
Client Server
│ │
│ ① 发送目标 (Goal Request) │
│ ─────────────────────────────────────────────> │
│ angles=[150,80,90,45,100,60] │
│ speed_ratio=0.5 │
│ │
│ ┌──────────┴──────────┐
│ │ execute_callback() │
│ │ ┌───────────────┐ │
│ │ │ 1. 获取目标参数 │ │
│ │ └───────────────┘ │
│ │ ↓ │
│ │ ┌───────────────┐ │
│ │ │ 2. 验证参数 ✓ │ │
│ │ │ (6个角度?) │ │
│ │ └───────────────┘ │
│ │ ↓ │
│ │ ┌───────────────┐ │
│ │ │ 3. 模拟运动 │ │
│ │ │ time.sleep() │ │
│ │ │ (3秒延迟) │ │
│ │ └───────────────┘ │
│ │ ↓ │
│ │ ┌───────────────┐ │
│ │ │ 4. 标记成功 │ │
│ │ │ succeed() ✓ │ │
│ │ └───────────────┘ │
│ │ ↓ │
│ ② 返回结果 (Result) │ ┌───────────────┐ │
│ <───────────────────────────────────│ │ 5. 返回完整结果│ │
│ success=true │ │ execution_time│ │
│ execution_time=3.0s │ │ final_angles │ │
│ final_angles=[...] │ └───────────────┘ │
│ 状态: SUCCEEDED ✓ └──────────┬──────────┘
│ │
▼ ▼
版本 2 改进:相比版本 1,新增了参数验证、模拟运动延迟(
time.sleep()),
以及返回完整结果(execution_time、final_angles)。但仍无进度反馈。
测试版本 2:
ros2 action send_goal /angle_mode robot_arm_interfaces/action/AngleMode \
"{angles: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio: 0.5}"
观察现象:
-
Server 打印 "预计执行时间"
-
等待一段时间后才返回结果
-
结果包含
execution_time和final_angles[INFO] [1768462880.563510789] [angle_mode_server]: 接收到角度模式目标... [INFO] [1768462880.564861362] [angle_mode_server]: 目标角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768462880.566304870] [angle_mode_server]: 速度比例: 0.5 [INFO] [1768462880.567711707] [angle_mode_server]: 预计执行时间: 3.00秒 [INFO] [1768462883.569919271] [angle_mode_server]: 运动完成,耗时: 3.00秒
必要性说明:
- 参数验证:防止非法输入(如角度数量错误)
- 时间计算:模拟真实机械臂运动的耗时
- 完整结果:提供任务执行的详细信息
-
顺便检查一下参数验证,只发送 3 个角度:
# client请求 ros2 action send_goal /angle_mode robot_arm_interfaces/action/AngleMode \ "{angles: [150.0, 80.0, 90.0, 45.0, speed_ratio: 0.5}" # client输出 Waiting for an action server to become available... Sending goal: angles: - 150.0 - 80.0 - 90.0 speed_ratio: 0.5 Goal accepted with ID: aa39bd94e4c3463c9a8ee2b3cd9c76a2 Result: success: false message: 错误:需要6个关节角度,收到3个 execution_time: 0.0 final_angles: - 180.0 - 90.0 - 83.0 - 30.0 - 110.0 - 30.0 Goal finished with status: ABORTED
版本 3:添加进度反馈
新增功能:在运动过程中持续发布反馈,让 Client 知道当前进度。
修改 execute_callback(只展示新增部分):
def execute_callback(self, goal_handle):
"""执行回调函数 - 版本 3(完整版)"""
self.get_logger().info('接收到角度模式目标...')
# 获取目标参数
target_angles = list(goal_handle.request.angles)
speed_ratio = goal_handle.request.speed_ratio
self.get_logger().info(f'目标角度: {target_angles}')
self.get_logger().info(f'速度比例: {speed_ratio}')
# 验证参数
if len(target_angles) != 6:
result = AngleMode.Result()
result.success = False
result.message = f'错误:需要6个关节角度,收到{len(target_angles)}个'
result.execution_time = 0.0
result.final_angles = self.current_angles
goal_handle.abort()
return result
# 记录开始时间
start_time = time.time()
# 计算预计执行时间
estimated_time = 3.0
# 【新增】创建反馈消息
feedback_msg = AngleMode.Feedback()
# 【新增】模拟运动过程(分10步,每步发布反馈)
steps = 10
for i in range(steps + 1):
# 计算当前进度
progress = i / steps
# 【新增】模拟插值计算当前角度
for j in range(6):
self.current_angles[j] = (
self.current_angles[j] * (1 - progress) +
target_angles[j] * progress
)
# 【新增】填充反馈消息
feedback_msg.elapsed_time = time.time() - start_time
feedback_msg.progress = progress
feedback_msg.current_angles = self.current_angles.copy()
feedback_msg.status_message = f'运动中... {int(progress * 100)}%'
# 【新增】发布反馈
goal_handle.publish_feedback(feedback_msg)
self.get_logger().info(
f'进度: {progress:.1%}, 角度: {[round(a, 1) for a in self.current_angles]}'
)
# 模拟运动时间
time.sleep(estimated_time / steps)
# 更新最终角度
self.current_angles = target_angles.copy()
# 标记成功
goal_handle.succeed()
# 返回结果
result = AngleMode.Result()
result.success = True
result.message = '角度模式运动完成'
result.execution_time = time.time() - start_time
result.final_angles = self.current_angles.copy()
self.get_logger().info(f'运动完成,耗时: {result.execution_time:.2f}秒')
return result
Action Server 业务流程图(版本 3):
Client Server
│ │
│ ① 发送目标 (Goal Request) │
│ ─────────────────────────────────────────────> │
│ angles=[150,80,90,45,100,60] │
│ speed_ratio=0.5 │
│ │
│ ┌──────────┴──────────┐
│ │ execute_callback() │
│ │ ┌───────────────┐ │
│ │ │ 1. 获取目标参数 │ │
│ │ └───────────────┘ │
│ │ ↓ │
│ │ ┌───────────────┐ │
│ │ │ 2. 验证参数 │ │
│ │ └───────────────┘ │
│ │ ↓ │
│ │ ┌───────────────┐ │
│ ② 反馈 (Feedback) [多次] │ │ 3. 循环执行 │ │
│ <───────────────────────────────────│ │ ┌──────────┐ │ │
│ progress=10% │ │ │ 计算进度 │ │ │
│ <───────────────────────────────────│ │ │ 插值角度 │ │ │
│ progress=20% │ │ │ 发布反馈 │ │ │
│ <───────────────────────────────────│ │ │ sleep() │ │ │
│ ... │ │ └──────────┘ │ │
│ <───────────────────────────────────│ │ ↓ 重复10次 │ │
│ progress=100% │ └───────────────┘ │
│ │ ↓ │
│ │ ┌───────────────┐ │
│ │ │ 4. 标记成功 │ │
│ │ │ succeed() ✓ │ │
│ │ └───────────────┘ │
│ │ ↓ │
│ ③ 返回结果 (Result) │ ┌───────────────┐ │
│ <───────────────────────────────────│ │ 5. 返回完整结果│ │
│ success=true │ └───────────────┘ │
│ execution_time=3.0s └──────────┬──────────┘
│ 状态: SUCCEEDED ✓ │
▼ ▼
版本 3 改进:相比版本 2,新增了进度反馈机制,通过循环分 10 步执行,
每步调用goal_handle.publish_feedback()向 Client 发送实时进度。
这是 Action 相比 Service 的核心优势!
测试版本 3(带反馈):
ros2 action send_goal --feedback /angle_mode robot_arm_interfaces/action/AngleMode \
"{angles: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio: 0.5}"
观察现象:
- Server 每隔一段时间打印进度信息
- Client 终端显示实时反馈:
# server
[INFO] [1768463108.487006795] [angle_mode_server]: 接收到角度模式目标...
[INFO] [1768463108.488352635] [angle_mode_server]: 目标角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0]
[INFO] [1768463108.489707628] [angle_mode_server]: 速度比例: 0.5
[INFO] [1768463108.491886186] [angle_mode_server]: 进度: 0.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0]
[INFO] [1768463108.794439967] [angle_mode_server]: 进度: 10.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0]
[INFO] [1768463109.096877635] [angle_mode_server]: 进度: 20.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0]
[INFO] [1768463109.399424508] [angle_mode_server]: 进度: 30.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0]
[INFO] [1768463109.701977627] [angle_mode_server]: 进度: 40.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0]
[INFO] [1768463110.004390988] [angle_mode_server]: 进度: 50.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0]
[INFO] [1768463110.306924879] [angle_mode_server]: 进度: 60.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0]
[INFO] [1768463110.609436269] [angle_mode_server]: 进度: 70.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0]
[INFO] [1768463110.912071532] [angle_mode_server]: 进度: 80.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0]
[INFO] [1768463111.214520061] [angle_mode_server]: 进度: 90.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0]
[INFO] [1768463111.517010608] [angle_mode_server]: 进度: 100.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0]
[INFO] [1768463111.819415956] [angle_mode_server]: 运动完成,耗时: 3.33秒
# client
Feedback:
elapsed_time: 2.420010566711426
progress: 0.8
current_angles:
- 150.0
- 80.0
- 90.0
- 45.0
- 100.0
- 60.0
status_message: 运动中... 80%
Feedback:
elapsed_time: 2.722559690475464
progress: 0.9
current_angles:
- 150.0
- 80.0
- 90.0
- 45.0
- 100.0
- 60.0
status_message: 运动中... 90%
必要性说明:
- 反馈机制是 Action 相比 Service 的核心优势
- 让 Client 知道任务进展,可以显示进度条
- 在真实场景中,反馈可以包含实时位置、速度、力矩等数据
版本 4(可选):支持取消目标
新增功能:检测 Client 是否请求取消,及时响应。
在循环中添加取消检测:
#!/usr/bin/env python3
"""
机械臂角度模式 Action Server(模拟版 - 版本 4)
支持取消目标
"""
import time
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, CancelResponse, GoalResponse
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import ReentrantCallbackGroup
from robot_arm_interfaces.action import AngleMode
class AngleModeServer(Node):
"""角度模式动作服务器节点"""
def __init__(self):
super().__init__('angle_mode_server')
# 创建可重入回调组(允许并行处理回调)
self._callback_group = ReentrantCallbackGroup()
# 创建 Action Server(添加 cancel 回调)
self._action_server = ActionServer(
self,
AngleMode,
'angle_mode',
self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback,
callback_group=self._callback_group
)
# 模拟当前关节角度
self.current_angles = [180.0, 90.0, 83.0, 30.0, 110.0, 30.0]
self.get_logger().info('角度模式 Action Server 已启动')
def goal_callback(self, goal_request):
"""接收目标请求的回调"""
self.get_logger().info('收到目标请求')
return GoalResponse.ACCEPT
def cancel_callback(self, goal_handle):
"""接收取消请求的回调 - 必须返回 ACCEPT 才能取消"""
self.get_logger().info('收到取消请求,接受取消')
return CancelResponse.ACCEPT
def execute_callback(self, goal_handle):
"""执行回调函数 - 版本 3(完整版)"""
self.get_logger().info('接收到角度模式目标...')
# 获取目标参数
target_angles = list(goal_handle.request.angles)
speed_ratio = goal_handle.request.speed_ratio
self.get_logger().info(f'目标角度: {target_angles}')
self.get_logger().info(f'速度比例: {speed_ratio}')
# 验证参数
if len(target_angles) != 6:
result = AngleMode.Result()
result.success = False
result.message = f'错误:需要6个关节角度,收到{len(target_angles)}个'
result.execution_time = 0.0
result.final_angles = self.current_angles
goal_handle.abort()
return result
# 记录开始时间
start_time = time.time()
# 计算预计执行时间
estimated_time = 20.0
# 【新增】创建反馈消息
feedback_msg = AngleMode.Feedback()
# 【新增】模拟运动过程(分10步,每步发布反馈)
steps = 10
for i in range(steps + 1):
# 【新增】检查是否被取消
if goal_handle.is_cancel_requested:
goal_handle.canceled()
result = AngleMode.Result()
result.success = False
result.message = '任务被取消'
result.execution_time = time.time() - start_time
result.final_angles = self.current_angles
self.get_logger().info('目标被取消')
return result
# 计算当前进度
progress = i / steps
# 【新增】模拟插值计算当前角度
for j in range(6):
self.current_angles[j] = (
self.current_angles[j] * (1 - progress) +
target_angles[j] * progress
)
# 【新增】填充反馈消息
feedback_msg.elapsed_time = time.time() - start_time
feedback_msg.progress = progress
feedback_msg.current_angles = self.current_angles.copy()
feedback_msg.status_message = f'运动中... {int(progress * 100)}%'
# 【新增】发布反馈
goal_handle.publish_feedback(feedback_msg)
self.get_logger().info(
f'进度: {progress:.1%}, 角度: {[round(a, 1) for a in self.current_angles]}'
)
# 模拟运动时间(分成小片段,频繁检查取消状态)
sleep_time = estimated_time / steps
sleep_interval = 0.1 # 每 0.1 秒检查一次
elapsed = 0.0
while elapsed < sleep_time:
time.sleep(min(sleep_interval, sleep_time - elapsed))
elapsed += sleep_interval
# 在 sleep 过程中也检查取消
if goal_handle.is_cancel_requested:
break
# 更新最终角度
self.current_angles = target_angles.copy()
# 标记成功
goal_handle.succeed()
# 返回结果
result = AngleMode.Result()
result.success = True
result.message = '角度模式运动完成'
result.execution_time = time.time() - start_time
result.final_angles = self.current_angles.copy()
self.get_logger().info(f'运动完成,耗时: {result.execution_time:.2f}秒')
return result
def main(args=None):
rclpy.init(args=args)
node = AngleModeServer()
# 使用多线程执行器,使取消回调能在执行时被处理
executor = MultiThreadedExecutor()
executor.add_node(node)
try:
executor.spin()
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
新增导入说明:
| 导入项 | 作用 |
|---|---|
CancelResponse |
取消回调的返回值类型(ACCEPT 或 REJECT) |
GoalResponse |
目标回调的返回值类型(ACCEPT 或 REJECT) |
MultiThreadedExecutor |
多线程执行器,允许在执行任务时处理取消请求 |
ReentrantCallbackGroup |
可重入回调组,允许多个回调并行执行 |
三个回调函数的调用时序:
Client 发送目标
│
▼
┌─────────────────┐
│ goal_callback() │ ← 第1步:决定是否接受目标
│ 返回 ACCEPT/ │ - ACCEPT:目标被接受,进入执行
│ REJECT │ - REJECT:目标被拒绝,流程结束
└────────┬────────┘
│ ACCEPT
▼
┌─────────────────┐
│execute_callback()│ ← 第2步:执行任务(长时间运行)
│ - 发布反馈 │
│ - 检查取消 │
│ - 返回结果 │
└────────┬────────┘
│
│ ← 如果 Client 请求取消
▼
┌─────────────────┐
│cancel_callback()│ ← 第3步(可选):决定是否允许取消
│ 返回 ACCEPT/ │ - ACCEPT:允许取消,execute_callback
│ REJECT │ 中 is_cancel_requested=True
└─────────────────┘ - REJECT:拒绝取消,继续执行
为什么需要 MultiThreadedExecutor + ReentrantCallbackGroup?
问题:默认单线程执行器 ┌─────────────────────────────────────────┐ │ 单线程执行器 (默认) │ │ ┌─────────────────────────────────┐ │ │ │ execute_callback() 运行中 │ │ │ │ (阻塞整个线程) │ │ │ └─────────────────────────────────┘ │ │ │ │ cancel_callback() 无法执行! ❌ │ │ (必须等 execute_callback 完成) │ └─────────────────────────────────────────┘ 解决:多线程执行器 + 可重入回调组 ┌─────────────────────────────────────────┐ │ 多线程执行器 + 可重入回调组 │ │ ┌──────────────┐ ┌──────────────┐ │ │ │ 线程 1 │ │ 线程 2 │ │ │ │ execute_ │ │ cancel_ │ │ │ │ callback() │ │ callback() │ │ │ │ 运行中 │ │ 可以执行 ✓ │ │ │ └──────────────┘ └──────────────┘ │ └─────────────────────────────────────────┘
Action Server 业务流程图(版本 4 - 完整版):
Client Server
│ │
│ ① 发送目标 (Goal Request) │
│ ─────────────────────────────────────────────> │
│ angles=[150,80,90,45,100,60] │
│ speed_ratio=0.5 │
│ ┌──────────┴──────────┐
│ │ goal_callback() │
│ │ 返回 ACCEPT ✓ │
│ └──────────┬──────────┘
│ │
│ ┌──────────┴──────────┐
│ │ execute_callback() │
│ │ ┌───────────────┐ │
│ │ │ 1. 获取并验证 │ │
│ │ └───────────────┘ │
│ │ ↓ │
│ ② 反馈 (Feedback) [多次] │ ┌───────────────┐ │
│ <───────────────────────────────────│ │ 2. 循环执行 │ │
│ progress=10% │ │ ┌────────────┐ │ │
│ │ │ │检查取消请求│ │ │
│ ③ 取消请求 (Cancel) [可选] │ │ │is_cancel_ │ │ │
│ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ > │ │ │requested? │ │ │
│ │ │ └─────┬──────┘ │ │
│ ┌─────────────│ │ │ │ │
│ │ │ │ 是 ↓ 否 │ │
│ ▼ │ │ ┌────┐ ┌────┐ │ │
│ cancel_callback() │ │ │取消│ │继续│ │ │
│ 返回 ACCEPT ✓ │ │ └────┘ └────┘ │ │
│ │ └───────────────┘ │
│ │ ↓ │
│ ④ 返回结果 (Result) │ ┌───────────────┐ │
│ <───────────────────────────────────│ │ 3. 标记状态 │ │
│ 成功: succeed() → SUCCEEDED │ │ succeed() 或 │ │
│ 取消: canceled() → CANCELED │ │ canceled() │ │
│ 失败: abort() → ABORTED │ └───────────────┘ │
│ └──────────┬──────────┘
▼ ▼
版本 4 改进:相比版本 3,新增了取消支持:
goal_callback():接收目标时决定是否接受cancel_callback():接收取消请求时决定是否允许取消is_cancel_requested:在循环中频繁检查取消状态MultiThreadedExecutor:使取消回调能在执行时被处理
测试版本 4(带取消):
# 终端 1:启动 Server
python3 src/py_episode/py_episode/angle_mode_server.py
# 终端 2:发送目标
ros2 action send_goal --feedback /angle_mode robot_arm_interfaces/action/AngleMode \
"{angles: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio: 0.5}"
# 终端 3:在任务执行过程中按Ctrl+C取消
# 输出类似:
Feedback:
elapsed_time: 4.009111166000366
progress: 0.2
current_angles:
- 154.7029248
- 81.56764160000002
- 88.90265088
- 42.6485376
- 101.56764160000002
- 55.29707520000001
status_message: 运动中... 20%
^CCanceling goal...
Goal canceled.
Result:
success: false
message: 任务被取消
execution_time: 5.013190507888794
final_angles:
- 154.7029248
- 81.56764160000002
- 88.90265088
- 42.6485376
- 101.56764160000002
- 55.29707520000001
Goal finished with status: CANCELED
必要性说明:
- 在真实场景中,用户可能需要紧急停止机械臂运动
- 检测取消请求并快速响应是安全性的重要保障
版本演进总结
| 版本 | 新增功能 | 核心知识点 | 流程图变化 |
|---|---|---|---|
| 版本 0 | 基础框架 | ActionServer 创建、execute_callback 结构 |
仅接收目标 → 返回空结果(ABORTED) |
| 版本 1 | 标记成功 | goal_handle.succeed() / .abort() |
新增状态标记 → 返回成功(SUCCEEDED) |
| 版本 2 | 运动逻辑 | 参数验证、time.sleep() 模拟、完整结果 |
新增验证 + 延迟 → 返回 execution_time |
| 版本 3 | 进度反馈 | goal_handle.publish_feedback() |
新增反馈循环 ← 多次 Feedback 箭头 |
| 版本 4 | 取消支持 | cancel_callback、is_cancel_requested、MultiThreadedExecutor |
新增取消分支 → CANCELED 状态 |
五、编写 Action Client
5.1 Action Client 的核心职责
Action Client 负责:
- 发送目标:告诉 Server 要执行的任务
- 处理接受/拒绝:目标是否被接受
- 接收反馈:处理执行过程中的进度更新
- 获取结果:任务完成后的最终状态
5.2 编写角度模式 Action Client
创建文件 ~/ros2_ws/src/py_episode/py_episode/angle_mode_client.py:
#!/usr/bin/env python3
"""
机械臂角度模式 Action Client(完整版)
功能:发送目标、接收反馈、获取结果、支持取消
"""
import sys
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from robot_arm_interfaces.action import AngleMode
class AngleModeClient(Node):
"""角度模式动作客户端节点"""
def __init__(self):
super().__init__('angle_mode_client')
# 创建 Action Client
self._action_client = ActionClient(
self, # 节点
AngleMode, # Action 类型
'angle_mode' # Action 名称(必须与 Server 一致)
)
# 用于跟踪目标句柄(取消功能需要)
self._goal_handle = None
self.get_logger().info('角度模式 Action Client 已创建')
def send_goal(self, angles, speed_ratio=0.5):
"""发送角度模式目标"""
self.get_logger().info('等待 Action Server...')
self._action_client.wait_for_server()
# 创建目标消息
goal_msg = AngleMode.Goal()
goal_msg.angles = angles
goal_msg.speed_ratio = speed_ratio
self.get_logger().info(f'发送目标: angles={angles}, speed_ratio={speed_ratio}')
# 异步发送目标,注册反馈回调
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback # 反馈回调
)
# 注册目标响应回调
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
"""
目标响应回调 - 当 Server 接受/拒绝目标时调用
"""
self._goal_handle = future.result()
if not self._goal_handle.accepted:
self.get_logger().error('✗ 目标被拒绝')
rclpy.shutdown()
return
self.get_logger().info('✓ 目标被接受,等待结果...')
# 异步获取结果
self._get_result_future = self._goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def feedback_callback(self, feedback_msg):
"""
反馈回调 - 接收 Server 发送的进度反馈(多次调用)
"""
feedback = feedback_msg.feedback
# 显示进度条
progress_bar = self._create_progress_bar(feedback.progress)
self.get_logger().info(
f'反馈: {progress_bar} {feedback.progress*100:.0f}% | '
f'已用时: {feedback.elapsed_time:.1f}s | '
f'{feedback.status_message}'
)
def get_result_callback(self, future):
"""
结果回调 - 当任务完成时调用
"""
result = future.result().result
self.get_logger().info('='*50)
if result.success:
self.get_logger().info('✓ 任务成功!')
self.get_logger().info(f' 消息: {result.message}')
self.get_logger().info(f' 执行时间: {result.execution_time:.2f} 秒')
self.get_logger().info(f' 最终角度: {list(result.final_angles)}')
else:
self.get_logger().error('✗ 任务失败!')
self.get_logger().error(f' 消息: {result.message}')
self.get_logger().info('='*50)
# 任务完成,关闭节点
rclpy.shutdown()
def _create_progress_bar(self, progress, width=20):
"""创建文本进度条"""
filled = int(width * progress)
bar = '█' * filled + '░' * (width - filled)
return f'[{bar}]'
def main(args=None):
rclpy.init(args=args)
node = AngleModeClient()
# 从命令行获取目标角度
if len(sys.argv) >= 7:
try:
angles = [float(sys.argv[i]) for i in range(1, 7)]
except ValueError:
print(f'错误: 角度参数必须是数字')
sys.exit(1)
else:
# 默认值
angles = [150.0, 80.0, 90.0, 45.0, 100.0, 60.0]
# 解析速度比例(可选)
speed_ratio = 0.5
if len(sys.argv) >= 8:
try:
speed_ratio = float(sys.argv[7])
if not (0.0 < speed_ratio <= 1.0):
print(f'警告: 速度比例应在 0.0-1.0 之间,已自动限制')
speed_ratio = max(0.1, min(speed_ratio, 1.0))
except ValueError:
print(f'警告: 无法解析速度比例,使用默认值 0.5')
try:
# 发送目标
node.send_goal(angles, speed_ratio)
# 保持节点运行直到完成
rclpy.spin(node)
except KeyboardInterrupt:
# 用户按 Ctrl+C 取消
node.get_logger().warn('\n检测到 Ctrl+C,正在取消任务...')
if node._goal_handle is not None:
# 发送取消请求
cancel_future = node._goal_handle.cancel_goal_async()
# 等待取消完成
rclpy.spin_until_future_complete(node, cancel_future, timeout_sec=2.0)
if cancel_future.result():
node.get_logger().info('取消请求已发送')
else:
node.get_logger().error('取消请求发送失败')
finally:
if rclpy.ok():
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Action Client 回调函数的调用时序:
Client 函数一览表:
| 函数名 | 类型 | 功能 | 何时注册 | 何时触发 |
|---|---|---|---|---|
send_goal() |
主动调用 | 发送目标到 Server | - | 用户调用 |
goal_response_callback |
回调函数 | 处理目标接受/拒绝响应 | send_goal_async().add_done_callback() |
Server 响应目标请求时(1 次) |
feedback_callback |
回调函数 | 处理执行过程中的进度反馈 | send_goal_async(feedback_callback=...) |
Server 每次发布反馈时(多次) |
get_result_callback |
回调函数 | 处理任务完成后的最终结果 | get_result_async().add_done_callback() |
Server 返回最终结果时(1 次) |
回调注册与触发时序图:
Client Server
│ │
│ ══════════════════════════════════════════════ │
│ ║ send_goal() 中完成两个回调的注册: ║ │
│ ║ 1. feedback_callback (在 send_goal_async) ║ │
│ ║ 2. goal_response_callback (add_done_callback)║ │
│ ══════════════════════════════════════════════ │
│ │
│ ① send_goal_async() 发送目标 │
│ ───────────────────────────────────────────────> │
│ goal_msg.angles=[150,80,90,45,100,60] │
│ goal_msg.speed_ratio=0.5 │
│ ┌─────────────────────────────────────────┐ │
│ │ 【注册】feedback_callback │ │
│ │ 【注册】goal_response_callback │ │
│ └─────────────────────────────────────────┘ │
│ │
│ ┌──────────┴──────────┐
│ │ goal_callback() │
│ │ 返回 ACCEPT ✓ │
│ └──────────┬──────────┘
│ │
│ ② Goal Response(目标接受/拒绝) │
│ <─────────────────────────────────────────────── │
│ │
┌─┴────────────────────────┐ │
│ goal_response_callback() │ ← 【触发】第1个回调 │
│ ┌──────────────────────┐ │ │
│ │ 检查 goal_handle │ │ │
│ │ .accepted │ │ │
│ │ - True: 继续 │ │ │
│ │ - False: 结束 │ │ │
│ └──────────────────────┘ │ │
│ ↓ │ │
│ ┌──────────────────────┐ │ │
│ │ get_result_async() │ │ │
│ │ ┌──────────────────┐ │ │ │
│ │ │【注册】 │ │ │ │
│ │ │get_result_callback│ │ │ │
│ │ └──────────────────┘ │ │ │
│ └──────────────────────┘ │ │
└─┬────────────────────────┘ ┌──────────┴──────────┐
│ │ execute_callback() │
│ │ 开始执行任务 │
│ └──────────┬──────────┘
│ │
│ ③ Feedback(反馈消息)[多次] │
│ <─────────────────────────────────────────────── │
│ progress=0%, elapsed_time=0.0s │
│ │
┌─┴────────────────────────┐ │
│ feedback_callback() │ ← 【触发】第2个回调(多次)│
│ ┌──────────────────────┐ │ │
│ │ 获取 feedback 数据 │ │ │
│ │ - progress │ │ │
│ │ - elapsed_time │ │ │
│ │ - current_angles │ │ │
│ │ - status_message │ │ │
│ └──────────────────────┘ │ │
│ ↓ │ │
│ ┌──────────────────────┐ │ │
│ │ 显示进度信息 │ │ │
│ │ (如: 进度=10%) │ │ │
│ └──────────────────────┘ │ │
└─┬────────────────────────┘ │
│ │
│ <─────────────────────────────────────────────── │
│ progress=10%, elapsed_time=0.3s │
│ │
┌─┴────────────────────────┐ │
│ feedback_callback() │ ← 【触发】再次 │
└─┬────────────────────────┘ │
│ ... │
│ <─────────────────────────────────────────────── │
│ progress=100%, elapsed_time=3.0s │
│ │
┌─┴────────────────────────┐ │
│ feedback_callback() │ ← 【触发】最后一次反馈 │
└─┬────────────────────────┘ ┌──────────┴──────────┐
│ │ succeed() 或 │
│ │ canceled() 或 │
│ │ abort() │
│ └──────────┬──────────┘
│ │
│ ④ Result(最终结果) │
│ <─────────────────────────────────────────────── │
│ success=true, message="完成" │
│ execution_time=3.3s │
│ final_angles=[150,80,90,45,100,60] │
│ │
┌─┴────────────────────────┐ │
│ get_result_callback() │ ← 【触发】第3个回调 │
│ ┌──────────────────────┐ │ │
│ │ 获取 result 数据 │ │ │
│ │ - success │ │ │
│ │ - message │ │ │
│ │ - execution_time │ │ │
│ │ - final_angles │ │ │
│ └──────────────────────┘ │ │
│ ↓ │ │
│ ┌──────────────────────┐ │ │
│ │ 处理最终结果 │ │ │
│ │ 成功: 打印结果 │ │ │
│ │ 失败: 显示错误 │ │ │
│ └──────────────────────┘ │ │
│ ↓ │ │
│ ┌──────────────────────┐ │ │
│ │ rclpy.shutdown() │ │ │
│ │ 结束节点 │ │ │
│ └──────────────────────┘ │ │
└─┬────────────────────────┘ │
│ │
▼ ▼
为什么 Action Client 设计得这么复杂?
你可能会问:为什么需要这么多回调函数?直接 send_goal() 然后等结果不行吗?
答案是:为了不阻塞!
| 方式 | 代码 | 问题 |
|---|---|---|
| 同步阻塞 | result = client.send_goal(goal) |
等待期间什么都做不了,无法接收反馈、无法取消 |
| 异步回调 | send_goal_async() + 回调 |
发送后立即返回,通过回调处理各阶段事件 |
类比:点外卖
同步阻塞(不推荐):
你: 下单 → 站在门口等 → 等 → 等 → 骑手到了 → 拿到外卖
问题: 等待期间你什么都干不了,也不知道骑手到哪了
异步回调(Action 方式):
你: 下单 → 继续做自己的事
↓
[商家接单通知] → goal_response_callback() → "好的,已接单"
↓
[骑手位置更新] → feedback_callback() → "骑手还有2公里"
[骑手位置更新] → feedback_callback() → "骑手已到小区"
↓
[送达通知] → get_result_callback() → "外卖到了!"
三个回调对应三个阶段:
| 阶段 | 回调函数 | 外卖类比 | 必要性 |
|---|---|---|---|
| 1. 目标响应 | goal_response_callback |
商家是否接单 | 知道任务是否开始 |
| 2. 过程反馈 | feedback_callback |
骑手实时位置 | 了解进度,可决定是否取消 |
| 3. 最终结果 | get_result_callback |
外卖送达确认 | 获取任务结果 |
总结:复杂的回调设计是为了实现非阻塞的长时间任务管理。
这样 Client 在等待过程中可以:
- 继续处理其他消息
- 实时显示进度
- 随时取消任务
- 响应用户交互
代码改进说明:
| 特性 | 改进内容 | 说明 |
|---|---|---|
| 进度条显示 | _create_progress_bar() |
可视化进度,比数字更直观 |
| 目标句柄跟踪 | self._goal_handle = None |
保存目标句柄,用于取消功能 |
| 取消支持 | KeyboardInterrupt + cancel_goal_async() |
按 Ctrl+C 取消任务 |
| 参数验证 | try-except 处理 |
友好的错误提示 |
| 清晰输出 | ✓/✗ 符号、分隔线 |
提升用户体验 |
测试 Action Client:
# 终端 1:启动 Server
python3 src/py_episode/py_episode/angle_mode_server.py
# 终端 2:启动 Client
python3 src/py_episode/py_episode/angle_mode_client.py 150.0 80.0 90.0 45.0 100.0 60.0 0.5
# 在运行过程中按 Ctrl+C 测试取消功能
运行效果:
[INFO] [angle_mode_client]: 角度模式 Action Client 已创建 [INFO] [angle_mode_client]: 等待 Action Server... [INFO] [angle_mode_client]: 发送目标: angles=[150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio=0.5 [INFO] [angle_mode_client]: ✓ 目标被接受,等待结果... [INFO] [angle_mode_client]: 反馈: [██░░░░░░░░░░░░░░░░░░] 10% | 已用时: 0.3s | 运动中... 10% [INFO] [angle_mode_client]: 反馈: [████░░░░░░░░░░░░░░░░] 20% | 已用时: 0.6s | 运动中... 20% ... [INFO] [angle_mode_client]: 反馈: [████████████████████] 100% | 已用时: 3.0s | 运动中... 100% [INFO] [angle_mode_client]: ================================================== [INFO] [angle_mode_client]: ✓ 任务成功! [INFO] [angle_mode_client]: 消息: 角度模式运动完成 [INFO] [angle_mode_client]: 执行时间: 3.30 秒 [INFO] [angle_mode_client]: 最终角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [angle_mode_client]: ==================================================
测试取消功能(在运行过程中按 Ctrl+C):
[INFO] [angle_mode_client]: 反馈: [████░░░░░░░░░░░░░░░░] 20% | 已用时: 0.6s | 运动中... 20% ^C [WARN] [angle_mode_client]: 检测到 Ctrl+C,正在取消任务... [INFO] [angle_mode_client]: 取消请求已发送 [INFO] [angle_mode_client]: ================================================== [ERROR] [angle_mode_client]: ✗ 任务失败! [ERROR] [angle_mode_client]: 消息: 任务被取消 [INFO] [angle_mode_client]: ==================================================
六、配置并运行
6.1 添加依赖
编辑 ~/ros2_ws/src/py_episode/package.xml,添加依赖:
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>sensor_msgs</exec_depend>
<exec_depend>robot_arm_interfaces</exec_depend>
<exec_depend>rclpy_action</exec_depend> <!-- Action 支持 -->
6.2 添加入口点
编辑 ~/ros2_ws/src/py_episode/setup.py:
entry_points={
'console_scripts': [
'simple_pub = py_episode.simple_publisher:main',
'simple_sub = py_episode.simple_subscriber:main',
'joint_pub = py_episode.joint_state_publisher:main',
'joint_sub = py_episode.joint_state_subscriber:main',
'motor_pub = py_episode.motor_angles_publisher:main',
'motor_sub = py_episode.motor_angles_subscriber:main',
'simple_srv = py_episode.simple_service:main',
'simple_cli = py_episode.simple_client:main',
'gripper_srv = py_episode.gripper_service:main',
'gripper_cli = py_episode.gripper_client:main',
'angle_srv = py_episode.angle_mode_server:main', # 新增
'angle_cli = py_episode.angle_mode_client:main', # 新增
],
},
6.3 构建并运行
cd ~/ros2_ws
colcon build --packages-select py_episode --symlink-install
source install/setup.bash
# 终端 1:启动 Action Server
ros2 run py_episode angle_srv
# 终端 2:启动 Action Client
ros2 run py_episode angle_cli 150.0 80.0 90.0 45.0 100.0 60.0 0.5
6.4 运行效果
Server 输出:
[INFO] [angle_mode_server]: 角度模式 Action Server 已启动 [INFO] [angle_mode_server]: 接收到角度模式目标... [INFO] [angle_mode_server]: 目标角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [angle_mode_server]: 速度比例: 0.5 [INFO] [angle_mode_server]: 进度: 0.0%, 角度: [180.0, 90.0, 83.0, 30.0, 110.0, 30.0] [INFO] [angle_mode_server]: 进度: 10.0%, 角度: [177.0, 89.0, 83.7, 31.5, 109.0, 33.0] ... [INFO] [angle_mode_server]: 进度: 100.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [angle_mode_server]: 运动完成,耗时: 0.67秒
Client 输出:
[INFO] [angle_mode_client]: 角度模式 Action Client 已创建
[INFO] [angle_mode_client]: 等待 Action Server...
[INFO] [angle_mode_client]: 发送目标: angles=[150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio=0.5
[INFO] [angle_mode_client]: 目标被接受,等待结果...
[INFO] [angle_mode_client]: 反馈: 进度=0.0%, 已用时间=0.00s, 状态="运动中... 0%"
[INFO] [angle_mode_client]: 反馈: 进度=10.0%, 已用时间=0.07s, 状态="运动中... 10%"
...
[INFO] [angle_mode_client]: 任务成功!
消息: 角度模式运动完成
执行时间: 0.67秒
最终角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0]
6.5 使用命令行工具调试
# 查看 Action 列表
ros2 action list
# 输出: /angle_mode
# 查看 Action 类型
ros2 action info /angle_mode
# 输出:
# Action: /angle_mode
# Action clients: 1
# Action servers: 1
# 命令行发送目标(带反馈)
ros2 action send_goal --feedback /angle_mode robot_arm_interfaces/action/AngleMode \
"{angles: [160.0, 85.0, 80.0, 40.0, 105.0, 50.0], speed_ratio: 0.8}"
七、真实机械臂集成
7.1 理解真实场景的差异
模拟版和真实版的主要区别:
| 项目 | 模拟版 | 真实版 |
|---|---|---|
| 角度读取 | 内存变量 | CAN 总线通信 |
| 运动控制 | time.sleep() |
电机控制 API |
| 反馈数据 | 计算插值 | 读取实际传感器 |
| 错误处理 | 简单 | 需要处理通信超时、电机故障等 |
7.2 整体架构
在上一章(Service)的代码基础上,我们添加 Action Server 功能,实现一个同时支持 Topic、Service、Action 的完整机械臂接口节点:
┌─────────────────────────────────────────────────┐
│ RobotInterfaceNode │
│ ┌──────────────────────────────────────────┐ │
│ │ Topic Publisher │ │
│ │ ┌─────────────┐ │ │
│ │ │ /motor_ │ 30Hz 发布电机角度 │ │
│ │ │ angles │ │ │
│ │ └──────┬──────┘ │ │
│ └─────────┼────────────────────────────────┘ │
│ │ │
│ ┌─────────┼────────────────────────────────┐ │
│ │ │ Service Server │ │
│ │ │ ┌─────────────────┐ │ │
│ │ │ │ /servo_gripper │ │ │
│ │ │ │ 舵机夹爪控制 │ │ │
│ │ │ └────────┬────────┘ │ │
│ └─────────┼─────────────┼──────────────────┘ │
│ │ │ │
│ ┌─────────┼─────────────┼──────────────────┐ │
│ │ │ Action Server (新增) │ │
│ │ │ ┌─────────────────┐ │ │
│ │ │ │ /angle_mode │ │ │
│ │ │ │ 角度模式运动 │ │ │
│ │ │ └────────┬────────┘ │ │
│ └─────────┼─────────────┼──────────────────┘ │
│ │ │ │
│ └──────┬──────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ MotorControl │ │
│ │ (CAN 通信) │ │
│ └─────────────┘ │
└─────────────────────────────────────────────────┘
7.3 更新机械臂接口节点
在上一章的代码基础上,添加角度模式 Action Server:
创建文件 ~/ros2_ws/src/py_episode/py_episode/robot_interface_with_action.py:
#!/usr/bin/env python3
"""
机械臂接口节点 - 初始化 + 发布电机角度 + 舵机夹爪服务 + 角度模式动作
在上一章基础上新增 Action Server 功能
演示 Topic + Service + Action 三种通信方式的综合应用
"""
import time
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, CancelResponse, GoalResponse # 新增:取消和目标响应
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import MultiThreadedExecutor
from robot_arm_interfaces.msg import MotorAngles # Topic 消息
from robot_arm_interfaces.srv import ServoGripper # Service 接口
from robot_arm_interfaces.action import AngleMode # Action 接口(新增)
# 导入电机控制核心库
from episode_controller_core.controller_core import MotorControl
class RobotInterfaceNode(Node):
"""机械臂接口节点:Topic + Service + Action 完整演示"""
def __init__(self):
super().__init__('robot_interface_node')
# ========== 新增:可重入回调组(支持 Action 取消) ==========
self.callback_group = ReentrantCallbackGroup()
# ========== 配置参数 ==========
self.declare_parameter('usb_index', 1)
self.declare_parameter('init_mode', 1)
self.declare_parameter('publish_rate', 30.0)
self.usb_index = self.get_parameter('usb_index').get_parameter_value().integer_value
self.init_mode = self.get_parameter('init_mode').get_parameter_value().integer_value
self.publish_rate = self.get_parameter('publish_rate').get_parameter_value().double_value
self.home_position = [180, 90, 83, 30, 110, 30]
self.calibration_timeout = 120.0
self.cached_motor_angles = [0.0] * 6
# ========== 初始化电机控制 ==========
self.motor_control = None
self._initialize_robot()
# ========== 创建 Topic Publisher ==========
self.publisher = self.create_publisher(
MotorAngles,
'motor_angles',
10
)
self.get_logger().info('Topic 发布器已创建: /motor_angles')
# ========== 创建 Service Server(舵机夹爪) ==========
self.servo_gripper_srv = self.create_service(
ServoGripper,
'servo_gripper',
self.servo_gripper_callback,
callback_group=self.callback_group
)
self.get_logger().info('Service 服务已创建: /servo_gripper')
# ========== 新增:创建 Action Server(角度模式,支持取消) ==========
self.angle_mode_action_server = ActionServer(
self,
AngleMode, # Action 类型
'angle_mode', # Action 名称
self.angle_mode_callback, # 执行回调
callback_group=self.callback_group, # 使用可重入回调组
goal_callback=self.goal_callback, # 目标接收回调(新增)
cancel_callback=self.cancel_callback # 取消请求回调(新增)
)
self.get_logger().info('Action 服务已创建: /angle_mode (支持取消)')
# ========== 创建定时器 ==========
timer_period = 1.0 / self.publish_rate
self.timer = self.create_timer(timer_period, self.publish_motor_angles)
self.get_logger().info('='*50)
self.get_logger().info('机械臂接口节点已启动')
self.get_logger().info(f' - Topic: /motor_angles ({self.publish_rate} Hz)')
self.get_logger().info(f' - Service: /servo_gripper')
self.get_logger().info(f' - Action: /angle_mode')
self.get_logger().info('='*50)
# ==================== 初始化相关方法 ====================
def _initialize_robot(self):
"""初始化机械臂"""
mode_name = "回零校准" if self.init_mode == 0 else "从当前位置恢复"
self.get_logger().info(f'开始初始化机械臂,usb_index={self.usb_index}, init_mode={self.init_mode} ({mode_name})')
self.motor_control = MotorControl(self.usb_index)
if not self.motor_control.initialized:
self.get_logger().error('CAN 设备初始化失败')
raise RuntimeError('CAN 设备初始化失败')
self.get_logger().info('CAN 通信初始化成功')
if self.init_mode == 1:
self._recover_from_current()
else:
self._perform_calibration()
def _recover_from_current(self):
"""从当前位置恢复"""
self.get_logger().info('从当前位置恢复中...')
current_angles = self.motor_control.read_motor_angles()
if not current_angles:
raise RuntimeError('无法读取电机角度')
self.motor_control.last_degrees = [
current_angles[i] * self.motor_control.motor_ratios[i]
for i in range(6)
]
self.cached_motor_angles = list(current_angles)
self.get_logger().info(f'成功从当前位置恢复: {current_angles}')
def _perform_calibration(self):
"""执行回零校准"""
self.get_logger().info('开始回零校准流程...')
for joint_index, addr in enumerate(self.motor_control.motor_address):
self.get_logger().info(f'正在校准关节 {joint_index + 1}/6 ...')
self.motor_control.trigger_home(addr, 0x02)
time.sleep(0.05)
start_time = time.perf_counter()
while True:
if self.motor_control.check_home_status(addr):
self.get_logger().info(f'关节 {joint_index + 1} 校准完成')
break
if time.perf_counter() - start_time > self.calibration_timeout:
raise RuntimeError(f'关节 {joint_index + 1} 校准超时')
time.sleep(0.5)
self.get_logger().info(f'校准完成,正在移动到 Home 位置: {self.home_position}')
time.sleep(1.0)
move_time = self.motor_control.execute_motors_degrees_normal(self.home_position)
time.sleep(move_time)
self.cached_motor_angles = list(self.home_position)
self.get_logger().info('机械臂初始化完成')
# ==================== Topic 发布 ====================
def publish_motor_angles(self):
"""定时器回调:发布电机角度"""
if self.motor_control is None:
return
msg = MotorAngles()
msg.timestamp = self.get_clock().now().to_msg()
try:
angles = self.motor_control.read_motor_angles()
if angles and all(angle is not None for angle in angles):
msg.angles = angles
msg.success = True
msg.message = "读取成功"
self.cached_motor_angles = list(angles)
else:
msg.angles = self.cached_motor_angles
msg.success = False
msg.message = "读取失败"
except Exception as e:
msg.angles = self.cached_motor_angles
msg.success = False
msg.message = f"读取异常: {str(e)}"
self.publisher.publish(msg)
# ==================== Service 回调 ====================
def servo_gripper_callback(self, request, response):
"""舵机夹爪服务回调函数"""
try:
if self.motor_control is None:
response.success = False
response.message = "电机控制未初始化"
response.actual_angle = 0
return response
target_angle = request.angle
clamped_angle = max(0, min(target_angle, 110))
self.motor_control.servo_gripper(clamped_angle)
response.success = True
response.actual_angle = clamped_angle
if target_angle != clamped_angle:
response.message = f'角度已限制: 请求 {target_angle}° -> 实际 {clamped_angle}°'
else:
response.message = f'夹爪已设置到 {clamped_angle}°'
self.get_logger().info(f'[Service] {response.message}')
except Exception as e:
response.success = False
response.message = f'控制失败: {str(e)}'
response.actual_angle = 0
self.get_logger().error(f'[Service] {response.message}')
return response
# ==================== 新增:Action 回调 ====================
def goal_callback(self, goal_request):
"""
目标接收回调:决定是否接受目标
参数:
goal_request: 目标请求(包含 angles 和 speed_ratio)
返回:
GoalResponse.ACCEPT 或 GoalResponse.REJECT
"""
self.get_logger().info('[Action] 收到目标请求')
# 可以在这里添加目标验证逻辑
angles = goal_request.angles
if len(angles) != 6:
self.get_logger().warn(f'[Action] 拒绝目标:角度数量错误(期望6个,收到{len(angles)}个)')
return GoalResponse.REJECT
# 接受目标
return GoalResponse.ACCEPT
def cancel_callback(self, goal_handle):
"""
取消请求回调:决定是否允许取消
参数:
goal_handle: 目标句柄
返回:
CancelResponse.ACCEPT 或 CancelResponse.REJECT
"""
self.get_logger().info('[Action] 收到取消请求,准备紧急停止')
# 触发紧急停止
if self.motor_control is not None:
try:
self.motor_control.set_emergency_stop(active=True)
self.get_logger().warn('[Action] 已触发紧急停止')
# 更新电机控制器的角度记录,确保下次运动计算准确
# 使用缓存的角度(Topic 持续更新)
self.motor_control.last_degrees = [
self.cached_motor_angles[i] * self.motor_control.motor_ratios[i]
for i in range(6)
]
self.get_logger().info('[Action] 已更新电机角度记录')
except Exception as e:
self.get_logger().error(f'[Action] 紧急停止失败: {str(e)}')
# 接受取消请求
return CancelResponse.ACCEPT
def angle_mode_callback(self, goal_handle):
"""
角度模式动作回调函数
参数:
goal_handle: 目标句柄,包含请求参数和状态控制方法
返回:
AngleMode.Result: 执行结果
"""
self.get_logger().info('[Action] 接收到角度模式目标...')
# 创建结果消息
result = AngleMode.Result()
# 检查电机控制是否已初始化
if self.motor_control is None:
result.success = False
result.message = "电机控制未初始化"
result.execution_time = 0.0
result.final_angles = self.cached_motor_angles
self.get_logger().error(f'[Action] {result.message}')
goal_handle.abort()
return result
# 获取目标参数
angles = list(goal_handle.request.angles)
speed_ratio = goal_handle.request.speed_ratio
# 参数验证
if len(angles) != 6:
result.success = False
result.message = f"角度数量错误: 期望 6 个,实际 {len(angles)} 个"
result.execution_time = 0.0
result.final_angles = self.cached_motor_angles
self.get_logger().error(f'[Action] {result.message}')
goal_handle.abort()
return result
if not (0.0 < speed_ratio <= 1.0):
speed_ratio = max(0.1, min(speed_ratio, 1.0))
self.get_logger().warn(f'[Action] 速度比例已限制为: {speed_ratio}')
self.get_logger().info(f'[Action] 目标角度: {angles}')
self.get_logger().info(f'[Action] 速度比例: {speed_ratio}')
start_time = time.perf_counter()
try:
# 调用电机控制 API 执行运动
execution_time = float(self.motor_control.execute_motors_degrees_normal(
angles, speed_ratio
))
self.get_logger().info(f'[Action] 预计执行时间: {execution_time:.2f} 秒')
# 发送进度反馈
steps = 10
for i in range(steps + 1):
# 检查是否请求取消
if goal_handle.is_cancel_requested:
# 解除紧急停止,允许后续运动命令
if self.motor_control is not None:
try:
self.motor_control.set_emergency_stop(active=False)
self.get_logger().info('[Action] 已解除紧急停止')
except Exception as e:
self.get_logger().error(f'[Action] 解除紧急停止失败: {str(e)}')
result.success = False
result.message = "任务被取消"
result.execution_time = float(time.perf_counter() - start_time)
result.final_angles = self.cached_motor_angles
self.get_logger().warn(f'[Action] {result.message}')
goal_handle.canceled()
return result
# 创建并发布反馈
feedback_msg = AngleMode.Feedback()
feedback_msg.elapsed_time = float(time.perf_counter() - start_time)
feedback_msg.progress = float(i / steps)
feedback_msg.current_angles = self.cached_motor_angles
feedback_msg.status_message = f"运动中... {int(i / steps * 100)}%"
goal_handle.publish_feedback(feedback_msg)
self.get_logger().info(f'[Action] 进度: {int(i / steps * 100)}%')
# 等待(除了最后一步)
if i < steps:
time.sleep(execution_time / steps)
# 运动完成
result.success = True
result.message = f"角度模式运动完成"
result.execution_time = float(time.perf_counter() - start_time)
result.final_angles = self.cached_motor_angles
self.get_logger().info(f'[Action] {result.message},耗时: {result.execution_time:.2f} 秒')
goal_handle.succeed()
return result
except Exception as e:
result.success = False
result.message = f"执行失败: {str(e)}"
result.execution_time = float(time.perf_counter() - start_time)
result.final_angles = self.cached_motor_angles
self.get_logger().error(f'[Action] {result.message}')
goal_handle.abort()
return result
def main():
rclpy.init()
try:
node = RobotInterfaceNode()
# 使用多线程执行器,支持 Action 取消功能
executor = MultiThreadedExecutor(num_threads=4)
executor.add_node(node)
try:
executor.spin()
finally:
executor.shutdown()
node.destroy_node()
except RuntimeError as e:
print(f'启动失败: {e}')
except KeyboardInterrupt:
pass
finally:
rclpy.shutdown()
if __name__ == '__main__':
main()
对比上一章的代码,新增了:
| 新增内容 | 代码 | 说明 |
|---|---|---|
| 导入 Action 相关模块 | from rclpy.action import ActionServer, CancelResponse, GoalResponse |
Action 服务端和取消支持 |
| 导入回调组 | from rclpy.callback_groups import ReentrantCallbackGroup |
支持并发 |
| 导入执行器 | from rclpy.executors import MultiThreadedExecutor |
多线程执行 |
| 导入 Action 接口 | from robot_arm_interfaces.action import AngleMode |
自定义 Action |
| 创建回调组 | self.callback_group = ReentrantCallbackGroup() |
允许并发回调 |
| 创建 Action Server | ActionServer(self, AngleMode, 'angle_mode', ..., goal_callback=..., cancel_callback=...) |
角度模式 Action(支持取消) |
| 目标接收回调 | goal_callback(self, goal_request) |
决定是否接受目标 |
| 取消请求回调 | cancel_callback(self, goal_handle) |
触发紧急停止 |
| Action 执行回调 | angle_mode_callback(self, goal_handle) |
处理运动请求 |
| 多线程执行器 | MultiThreadedExecutor(num_threads=4) |
支持取消功能 |
7.4 实现角度控制客户端
为了方便测试和使用,我们创建一个独立的客户端节点,可以通过命令行参数控制机械臂运动到指定角度。
创建文件 ~/ros2_ws/src/py_episode/py_episode/angles_control_client.py:
#!/usr/bin/env python3
"""
角度模式控制客户端 - 通过命令行参数控制机械臂运动到指定角度
演示如何在独立节点中使用 Action Client
"""
import sys
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from robot_arm_interfaces.action import AngleMode
class AnglesControlClient(Node):
"""角度模式控制客户端节点"""
def __init__(self):
super().__init__('angles_control_client')
# 创建 Action Client
self.action_client = ActionClient(self, AngleMode, 'angle_mode')
# 等待 Action Server 可用
self.get_logger().info('等待角度模式 Action Server...')
while not self.action_client.wait_for_server(timeout_sec=1.0):
self.get_logger().info('服务不可用,继续等待...')
self.get_logger().info('角度模式 Action Server 已连接')
# 用于跟踪目标句柄
self._goal_handle = None
def send_goal(self, angles, speed_ratio=0.5):
"""
发送角度模式目标
参数:
angles: 6 个关节的目标角度列表
speed_ratio: 运动速度比例(0.0-1.0)
"""
# 创建目标消息
goal_msg = AngleMode.Goal()
goal_msg.angles = angles
goal_msg.speed_ratio = speed_ratio
self.get_logger().info(f'发送目标: angles={angles}, speed_ratio={speed_ratio}')
# 异步发送目标,注册反馈回调
self._send_goal_future = self.action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
# 注册目标响应回调
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
"""目标响应回调:处理目标是否被接受"""
self._goal_handle = future.result()
if not self._goal_handle.accepted:
self.get_logger().error('✗ 目标被拒绝')
rclpy.shutdown()
return
self.get_logger().info('✓ 目标被接受,等待执行...')
# 获取结果
self._get_result_future = self._goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def feedback_callback(self, feedback_msg):
"""反馈回调:处理执行过程中的进度更新"""
feedback = feedback_msg.feedback
# 显示进度条
progress_bar = self._create_progress_bar(feedback.progress)
self.get_logger().info(
f'反馈: {progress_bar} {feedback.progress*100:.0f}% | '
f'已用时: {feedback.elapsed_time:.1f}s | '
f'{feedback.status_message}'
)
def get_result_callback(self, future):
"""结果回调:处理任务完成后的最终结果"""
result = future.result().result
self.get_logger().info('='*50)
if result.success:
self.get_logger().info('✓ 任务成功!')
self.get_logger().info(f' 消息: {result.message}')
self.get_logger().info(f' 执行时间: {result.execution_time:.2f} 秒')
self.get_logger().info(f' 最终角度: {list(result.final_angles)}')
else:
self.get_logger().error('✗ 任务失败!')
self.get_logger().error(f' 消息: {result.message}')
self.get_logger().info('='*50)
# 任务完成,关闭节点
rclpy.shutdown()
def _create_progress_bar(self, progress, width=20):
"""创建文本进度条"""
filled = int(width * progress)
bar = '█' * filled + '░' * (width - filled)
return f'[{bar}]'
def main():
# 解析命令行参数
if len(sys.argv) < 7:
print('用法: ros2 run py_episode angles_ctrl <j1> <j2> <j3> <j4> <j5> <j6> [speed_ratio]')
print()
print('参数说明:')
print(' j1-j6: 6 个关节的目标角度(单位:度)')
print(' speed_ratio: 运动速度比例 0.0-1.0(可选,默认 0.5)')
print()
print('示例:')
print(' ros2 run py_episode angles_ctrl 150 80 90 45 100 60 # 默认速度')
print(' ros2 run py_episode angles_ctrl 150 80 90 45 100 60 0.3 # 慢速')
print(' ros2 run py_episode angles_ctrl 180 90 83 30 110 30 0.8 # 快速回到 Home')
sys.exit(1)
# 解析角度参数
try:
angles = [float(sys.argv[i]) for i in range(1, 7)]
except ValueError as e:
print(f'错误: 角度参数必须是数字,但收到: {sys.argv[1:7]}')
sys.exit(1)
# 解析速度比例(可选)
speed_ratio = 0.5
if len(sys.argv) >= 8:
try:
speed_ratio = float(sys.argv[7])
if not (0.0 < speed_ratio <= 1.0):
print(f'警告: 速度比例应在 0.0-1.0 之间,已自动限制')
speed_ratio = max(0.1, min(speed_ratio, 1.0))
except ValueError:
print(f'警告: 无法解析速度比例 "{sys.argv[7]}",使用默认值 0.5')
# 初始化节点
rclpy.init()
node = AnglesControlClient()
try:
# 发送目标
node.send_goal(angles, speed_ratio)
# 保持节点运行,等待回调
rclpy.spin(node)
except KeyboardInterrupt:
# 用户按 Ctrl+C 取消
node.get_logger().warn('\n检测到 Ctrl+C,正在取消任务...')
if node._goal_handle is not None:
# 发送取消请求
cancel_future = node._goal_handle.cancel_goal_async()
# 等待取消完成
rclpy.spin_until_future_complete(node, cancel_future, timeout_sec=2.0)
if cancel_future.result():
node.get_logger().info('取消请求已发送')
else:
node.get_logger().error('取消请求发送失败')
finally:
if rclpy.ok():
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
代码说明:
| 特性 | 实现 | 说明 |
|---|---|---|
| 命令行参数 | sys.argv[1:7] 获取角度 |
支持 6 个关节角度 + 可选速度 |
| 服务等待 | wait_for_server() |
确保 Action Server 可用 |
| 异步发送 | send_goal_async() |
非阻塞发送目标 |
| 反馈处理 | feedback_callback |
实时显示进度条 |
| 结果处理 | get_result_callback |
显示最终执行结果 |
| 取消支持 | KeyboardInterrupt 处理 + cancel_goal_async() |
Ctrl+C 触发紧急停止并取消任务 |
7.5 添加入口点并运行
更新 setup.py:
entry_points={
'console_scripts': [
'simple_pub = py_episode.simple_publisher:main',
'simple_sub = py_episode.simple_subscriber:main',
'joint_pub = py_episode.joint_state_publisher:main',
'joint_sub = py_episode.joint_state_subscriber:main',
'motor_pub = py_episode.motor_angles_publisher:main',
'motor_sub = py_episode.motor_angles_subscriber:main',
'simple_srv = py_episode.simple_service:main',
'simple_cli = py_episode.simple_client:main',
'gripper_srv = py_episode.gripper_service:main',
'gripper_cli = py_episode.gripper_client:main',
'robot_interface_srv = py_episode.robot_interface_with_service:main',
'gripper_ctrl = py_episode.gripper_control_client:main',
'angle_srv = py_episode.angle_mode_server:main',
'angle_cli = py_episode.angle_mode_client:main',
'robot_interface_action = py_episode.robot_interface_with_action:main', # 新增
'angles_ctrl = py_episode.angles_control_client:main', # 新增
],
},
构建并运行:
cd ~/ros2_ws
colcon build --packages-select py_episode --symlink-install
source install/setup.bash
# 终端 1:启动机械臂接口节点(使用模式 1 跳过回零)
ros2 run py_episode robot_interface_action --ros-args -p usb_index:=1 -p init_mode:=1
# 终端 2:使用客户端控制机械臂运动
ros2 run py_episode angles_ctrl 150 80 90 45 100 60 # 默认速度
ros2 run py_episode angles_ctrl 180 90 83 30 110 30 0.8 # 快速回到 Home
ros2 run py_episode angles_ctrl 160 85 80 40 105 50 0.3 # 慢速运动
7.6 运行效果
Server 输出:
[INFO] [robot_interface_node]: ================================================ [INFO] [robot_interface_node]: 机械臂接口节点已启动 [INFO] [robot_interface_node]: - Topic: /motor_angles (30.0 Hz) [INFO] [robot_interface_node]: - Service: /servo_gripper [INFO] [robot_interface_node]: - Action: /angle_mode [INFO] [robot_interface_node]: ================================================ [INFO] [robot_interface_node]: [Action] 接收到角度模式目标... [INFO] [robot_interface_node]: [Action] 目标角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [robot_interface_node]: [Action] 速度比例: 0.5 [INFO] [robot_interface_node]: [Action] 预计执行时间: 0.67 秒 [INFO] [robot_interface_node]: [Action] 进度: 0% [INFO] [robot_interface_node]: [Action] 进度: 10% ... [INFO] [robot_interface_node]: [Action] 进度: 100% [INFO] [robot_interface_node]: [Action] 角度模式运动完成,耗时: 0.70 秒
Client 输出:
[INFO] [angles_control_client]: 等待角度模式 Action Server... [INFO] [angles_control_client]: 角度模式 Action Server 已连接 [INFO] [angles_control_client]: 发送目标: angles=[150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio=0.5 [INFO] [angles_control_client]: ✓ 目标被接受,等待执行... [INFO] [angles_control_client]: 反馈: [██░░░░░░░░░░░░░░░░░░] 10% | 已用时: 0.1s | 运动中... 10% [INFO] [angles_control_client]: 反馈: [████░░░░░░░░░░░░░░░░] 20% | 已用时: 0.2s | 运动中... 20% ... [INFO] [angles_control_client]: 反馈: [████████████████████] 100% | 已用时: 0.7s | 运动中... 100% [INFO] [angles_control_client]: ================================================== [INFO] [angles_control_client]: ✓ 任务成功! [INFO] [angles_control_client]: 消息: 角度模式运动完成 [INFO] [angles_control_client]: 执行时间: 0.70 秒 [INFO] [angles_control_client]: 最终角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [angles_control_client]: ==================================================
7.7 取消功能测试
Action 的一个重要特性是支持中途取消任务。当需要紧急停止机械臂运动时,可以通过取消功能触发紧急停止。
取消功能的工作流程:
Client Server
│ │
│ ① 发送目标 │
│ ──────────────────────────────────────────────> │
│ │
│ ② 目标被接受 │
│ <────────────────────────────────────────────── │
│ │
│ ┌──────────┴──────────┐
│ │ execute_callback() │
│ │ 开始执行任务 │
│ └──────────┬──────────┘
│ ③ 持续反馈 │
│ <────────────────────────────────────────────── │
│ progress=10%, 20%, 30%... │
│ │
【用户按 Ctrl+C】 │
│ │
│ ④ 发送取消请求 │
│ ──────────────────────────────────────────────> │
│ │
│ ┌──────────┴──────────┐
│ │ cancel_callback() │
│ │ 触发紧急停止 │
│ │ set_emergency_ │
│ │ stop(True) │
│ └──────────┬──────────┘
│ ⑤ 取消确认 │
│ <────────────────────────────────────────────── │
│ │
│ ┌──────────┴──────────┐
│ │ execute_callback() │
│ │ 检测到取消请求 │
│ │ 解除紧急停止 │
│ │ set_emergency_ │
│ │ stop(False) │
│ └──────────┬──────────┘
│ ⑥ 返回取消结果 │
│ <────────────────────────────────────────────── │
│ success=false, message="任务被取消" │
│ │
▼ ▼
测试取消功能:
# 终端 1:启动 Server
ros2 run py_episode robot_interface_action --ros-args -p usb_index:=1 -p init_mode:=1
# 终端 2:启动 Client,在运动过程中按 Ctrl+C
ros2 run py_episode angles_ctrl 150 80 90 45 100 60 0.3 # 使用慢速便于测试取消
# 等待几秒后按 Ctrl+C
取消时的输出效果:
Client 输出:
[INFO] [angles_control_client]: 发送目标: angles=[150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio=0.3 [INFO] [angles_control_client]: ✓ 目标被接受,等待执行... [INFO] [angles_control_client]: 反馈: [██░░░░░░░░░░░░░░░░░░] 10% | 已用时: 0.3s | 运动中... 10% [INFO] [angles_control_client]: 反馈: [████░░░░░░░░░░░░░░░░] 20% | 已用时: 0.6s | 运动中... 20% ^C [WARN] [angles_control_client]: 检测到 Ctrl+C,正在取消任务... [INFO] [angles_control_client]: 取消请求已发送 [INFO] [angles_control_client]: ================================================== [ERROR] [angles_control_client]: ✗ 任务失败! [ERROR] [angles_control_client]: 消息: 任务被取消 [INFO] [angles_control_client]: ==================================================
Server 输出:
[INFO] [robot_interface_node]: [Action] 收到目标请求 [INFO] [robot_interface_node]: [Action] 接收到角度模式目标... [INFO] [robot_interface_node]: [Action] 目标角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [robot_interface_node]: [Action] 速度比例: 0.3 [INFO] [robot_interface_node]: [Action] 预计执行时间: 1.20 秒 [INFO] [robot_interface_node]: [Action] 进度: 0% [INFO] [robot_interface_node]: [Action] 进度: 10% [INFO] [robot_interface_node]: [Action] 进度: 20% [INFO] [robot_interface_node]: [Action] 收到取消请求,准备紧急停止 [WARN] [robot_interface_node]: [Action] 已触发紧急停止 [INFO] [robot_interface_node]: [Action] 已解除紧急停止 [WARN] [robot_interface_node]: [Action] 任务被取消
取消功能的关键实现:
| 组件 | 关键代码 | 说明 |
|---|---|---|
| Server: 接受取消 | cancel_callback() 返回 CancelResponse.ACCEPT |
允许取消 |
| Server: 紧急停止 | self.motor_control.set_emergency_stop(active=True) |
立即停止机械臂 |
| Server: 更新角度 | self.motor_control.last_degrees = [...] |
记录停止时的角度,确保下次运动计算准确 |
| Server: 检测取消 | if goal_handle.is_cancel_requested: |
循环中频繁检查 |
| Server: 解除停止 | self.motor_control.set_emergency_stop(active=False) |
允许后续运动 |
| Server: 标记取消 | goal_handle.canceled() |
设置状态为 CANCELED |
| Client: 发送取消 | goal_handle.cancel_goal_async() |
请求取消任务 |
| Client: 等待确认 | rclpy.spin_until_future_complete() |
等待取消完成 |
为什么需要更新角度记录?
# 在 cancel_callback 中
self.motor_control.set_emergency_stop(active=True) # 紧急停止
# 更新角度记录(使用 Topic 持续更新的缓存角度)
self.motor_control.last_degrees = [
self.cached_motor_angles[i] * self.motor_control.motor_ratios[i]
for i in range(6)
]
# ↑ 这一步很重要!
# 如果不更新,电机控制器还记录着"停止前应该到达的目标角度"
# 下次运动时会从错误的起始位置计算,导致突然的大幅运动
为什么需要解除紧急停止?
# 在检测到取消时
if goal_handle.is_cancel_requested:
# 1. 触发紧急停止(在 cancel_callback 中已完成)
# 机械臂立即停止,角度已更新
# 2. 解除紧急停止
self.motor_control.set_emergency_stop(active=False)
# ↑ 这一步很重要!
# 如果不解除,后续的运动命令都会被阻塞
# 3. 返回取消结果
result.message = "任务被取消"
goal_handle.canceled()
return result
如果不解除紧急停止,机械臂将保持锁定状态,无法响应新的运动命令。
7.8 综合测试
现在我们有了一个同时支持 Topic + Service + Action 的完整节点,可以进行综合测试:
# 终端 1:启动机械臂接口节点
ros2 run py_episode robot_interface_action --ros-args -p usb_index:=1 -p init_mode:=1
# 终端 2:订阅 Topic 查看电机角度
ros2 topic echo /motor_angles
# 终端 3:调用 Service 控制夹爪
ros2 run py_episode gripper_ctrl 50 # 设置夹爪到 50 度
# 终端 4:调用 Action 控制机械臂运动
ros2 run py_episode angles_ctrl 150 80 90 45 100 60 0.5
也可以使用命令行工具测试:
# 查看所有 Topic
ros2 topic list
# 查看所有 Service
ros2 service list
# 查看所有 Action
ros2 action list
# 命令行调用 Service
ros2 service call /servo_gripper robot_arm_interfaces/srv/ServoGripper "{angle: 60}"
# 命令行调用 Action(带反馈)
ros2 action send_goal --feedback /angle_mode robot_arm_interfaces/action/AngleMode \
"{angles: [160.0, 85.0, 80.0, 40.0, 105.0, 50.0], speed_ratio: 0.5}"
7.9 三种通信方式对比总结
通过本节的实战,我们在一个节点中实现了三种通信方式:
| 通信方式 | 接口名称 | 功能 | 使用场景 | 取消支持 |
|---|---|---|---|---|
| Topic | /motor_angles |
30Hz 发布电机角度 | 持续监控、数据记录 | 不适用 |
| Service | /servo_gripper |
控制舵机夹爪角度 | 快速命令、需要结果确认 | 不支持 |
| Action | /angle_mode |
控制机械臂运动到目标角度 | 长时间任务、需要进度反馈 | ✓ 支持 |
┌─────────────────────────────────────────────────────────────┐
│ 通信方式选择指南 │
├─────────────────────────────────────────────────────────────┤
│ 数据类型 推荐方式 原因 │
├─────────────────────────────────────────────────────────────┤
│ 传感器数据流 Topic 持续、高频、单向 │
│ 状态查询 Service 按需、快速、需要返回值 │
│ 快速命令 Service 一次性、需要确认结果 │
│ 运动控制 Action 长时间、需要反馈、可取消 │
│ 导航任务 Action 多步骤、需要进度更新 │
│ 紧急停止场景 Action 需要中途取消能力 │
└─────────────────────────────────────────────────────────────┘
Action 取消功能的价值:
在机械臂控制场景中,取消功能不仅是一个"nice to have"特性,而是安全性的关键需求:
| 场景 | 为什么需要取消 |
|---|---|
| 碰撞风险 | 发现即将碰撞障碍物,需要立即停止 |
| 路径错误 | 发现规划路径有误,需要重新规划 |
| 紧急情况 | 操作人员需要介入处理 |
| 任务变更 | 需要执行更高优先级的任务 |
通过 set_emergency_stop(active=True/False) 机制,我们实现了:
- 立即响应:收到取消请求时立即停止机械臂
- 状态恢复:完成取消后解除紧急停止,允许后续运动
- 安全保障:避免失控运动导致的设备损坏或人员伤害
八、通信方式总结对比
8.1 三种通信方式完整对比
| 特性 | Topic | Service | Action |
|---|---|---|---|
| 通信模式 | 发布-订阅 | 请求-响应 | 目标-反馈-结果 |
| 方向 | 单向 | 双向(一次) | 双向(多次) |
| 同步/异步 | 异步 | 同步 | 异步 |
| 反馈机制 | 无 | 无 | 有(持续) |
| 取消支持 | 不适用 | 不支持 | 支持 |
| 接口文件 | .msg |
.srv(2 部分) |
.action(3 部分) |
| 典型场景 | 传感器数据、状态发布 | 查询、快速命令 | 长时间任务、运动控制 |
8.2 创建方法对比
# Topic Publisher
self.pub = self.create_publisher(MsgType, 'topic_name', 10)
self.pub.publish(msg)
# Topic Subscriber
self.sub = self.create_subscription(MsgType, 'topic_name', self.callback, 10)
# Service Server
self.srv = self.create_service(SrvType, 'service_name', self.callback)
# Service Client
self.cli = self.create_client(SrvType, 'service_name')
future = self.cli.call_async(request)
# Action Server
from rclpy.action import ActionServer
self.action_srv = ActionServer(self, ActionType, 'action_name', self.execute_callback)
# Action Client
from rclpy.action import ActionClient
self.action_cli = ActionClient(self, ActionType, 'action_name')
future = self.action_cli.send_goal_async(goal, feedback_callback=self.fb_callback)
8.3 回调函数对比
| 类型 | 回调签名 | 返回值 |
|---|---|---|
| Topic Subscriber | callback(msg) |
无 |
| Service Server | callback(request, response) |
response |
| Action Server | execute_callback(goal_handle) |
result |
| Action Client | 三个回调:goal_response、feedback、get_result |
无 |
九、本章小结
9.1 学到了什么?
-
Action 的核心概念
- 目标(Goal)→ 反馈(Feedback)→ 结果(Result)
- 适用于长时间运行、需要进度反馈、可能被取消的任务
-
自定义 Action 接口
.action文件包含三部分,用---分隔- 需要在
CMakeLists.txt中声明并编译
-
Action Server 编写
- 使用
ActionServer类创建 execute_callback负责执行任务、发布反馈、返回结果- 使用
goal_handle.publish_feedback()发送进度 - 使用
goal_handle.succeed()/.abort()/.canceled()标记状态
- 使用
-
Action Client 编写
- 使用
ActionClient类创建 send_goal_async()异步发送目标- 通过回调函数处理响应、反馈和结果
- 使用
十、附录
10.1 常见错误排查
| 错误现象 | 可能原因 | 解决方法 |
|---|---|---|
| "Action server not available" | Server 未启动或名称不匹配 | 检查 Server 是否运行,确认 Action 名称一致 |
| "Goal rejected" | Server 拒绝了目标 | 检查目标参数是否有效 |
| 反馈回调不触发 | 未注册 feedback_callback | 确认 send_goal_async() 传入了回调函数 |
| 结果回调不触发 | 未调用 goal_handle.succeed() |
确认 Server 正确标记了目标状态 |
10.2 调试命令
# 查看所有 Action
ros2 action list
# 查看 Action 详情
ros2 action info /angle_mode
# 查看 Action 接口定义
ros2 interface show robot_arm_interfaces/action/AngleMode
# 发送目标(带反馈)
ros2 action send_goal --feedback /angle_mode robot_arm_interfaces/action/AngleMode \
"{angles: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio: 0.5}"