Soma Zero Tutorials
🔍 搜索功能尚未开启,敬请期待。

3.7 ROS2 Action 长时间任务与机械臂集成

一、本章目标

在前面的章节中,我们学习了 Topic(持续数据流)和 Service(一问一答)两种通信方式。但有些场景需要更复杂的交互模式

  • 控制机械臂运动到目标位置(需要知道进度,可能需要取消
  • 执行导航任务(需要持续反馈当前位置)
  • 执行复杂的抓取流程(多步骤任务,每步都需要状态更新)

这些长时间运行的任务用 Service 就不够了,因为:

  • Service 调用是阻塞的,在等待结果期间什么都做不了
  • 没有进度反馈机制
  • 无法中途取消任务

这就是 Action(动作) 通信模式的用武之地。

1.1 学习路线

  1. 理解 Action 的核心概念 - 为什么需要 Action?
  2. 自定义 Action 接口 - 定义 .action 文件
  3. 编写 Action Server - 处理长时间任务
  4. 编写 Action Client - 发送目标、接收反馈
  5. 机械臂通信实战 - 实现角度模式运动控制

1.2 本章目标

完成本章后,你将能够:

  • ✓ 理解 Action 的目标-反馈-结果通信模式
  • ✓ 创建自定义 .action 动作接口
  • ✓ 编写 Action Server 和 Client 节点
  • ✓ 实现机械臂角度模式运动控制

二、Action 核心概念

2.1 回顾三种通信方式

通信方式 特点 典型场景
Topic 单向、持续、异步 传感器数据流、关节状态发布
Service 双向、一次性、同步 读取配置、夹爪开关
Action 双向、持续反馈、可取消 运动控制、导航任务

2.2 Action 的工作原理

Action 采用目标-反馈-结果(Goal-Feedback-Result) 模式:

┌─────────────┐ ┌─────────────┐ │ Client │ ─── Goal Request ─────> │ Server │ │ (客户端) │ │ (服务端) │ │ │ <── Goal Response ───── │ │ │ │ │ │ │ │ <── Feedback ────────── │ (执行中...) │ │ │ <── Feedback ────────── │ │ │ │ <── Feedback ────────── │ │ │ │ │ │ │ │ <── Result ──────────── │ (完成) │ └─────────────┘ └─────────────┘

通信流程

  1. Client 发送目标(Goal):告诉 Server 要做什么
  2. Server 接受/拒绝目标:验证目标是否可执行
  3. Server 持续发送反馈(Feedback):报告执行进度
  4. Server 返回结果(Result):任务完成或失败的最终状态

2.3 外卖系统类比

继续用外卖系统来理解 Action:

通信方式 外卖类比 说明
Topic 骑手持续广播 GPS 单向推送
Service 顾客查询骑手位置 一问一答
Action 顾客下单 → 商家接单 → 骑手配送中(多次位置更新) → 送达 完整订单生命周期

Action 的订单流程

  • Goal:顾客下单(目标地址、菜品)
  • Goal Response:商家接单/拒单
  • Feedback:骑手实时位置更新(多次)
  • Result:订单完成(送达时间、签收状态)

2.4 机械臂场景对比

功能 通信方式 理由
发布关节角度 Topic 持续、高频发布
读取当前角度 Service 按需查询,立即返回
控制夹爪开合 Service 快速操作,需要结果反馈
运动到目标角度 Action 需要进度反馈、可能需要取消
执行轨迹运动 Action 长时间任务,需要实时反馈
直线运动到目标点 Action 多步骤任务,需要进度更新

2.5 Action 接口的结构

Action 接口由 .action 文件定义,包含三个部分,用 --- 分隔:

# 目标(Goal):Client 发送给 Server int32 order --- # 结果(Result):Server 返回给 Client(任务完成时) int32[] sequence --- # 反馈(Feedback):Server 持续发送给 Client(执行过程中) int32[] partial_sequence

对比 Service 的两部分结构

  • Service:请求 + 响应(2 部分)
  • Action:目标 + 结果 + 反馈(3 部分)

三、自定义 Action 接口

3.1 设计机械臂角度控制 Action

我们要实现一个角度模式运动控制的 Action,让机械臂运动到指定的 6 个关节角度。

需求分析

部分 内容 说明
Goal 目标角度数组、速度比例 告诉机械臂要去哪里
Result 是否成功、最终角度、执行时间 任务完成后的状态
Feedback 当前角度、进度百分比、已用时间 执行过程中的实时状态

3.2 创建 .action 文件

步骤 1:创建 action 目录

cd ~/ros2_ws/src/robot_arm_interfaces mkdir -p action

步骤 2:创建 AngleMode.action

nano action/AngleMode.action

写入以下内容:

# AngleMode.action # 机械臂角度模式运动控制 # ========== 目标部分(Goal)========== # 6个电机的目标角度(单位:度) float64[] angles # 动态数组,服务端验证长度必须为6 # 运动速度比例(0.0-1.0) float64 speed_ratio --- # ========== 结果部分(Result)========== # 运动是否成功 bool success # 错误信息或成功提示 string message # 运动耗时(秒) float64 execution_time # 最终达到的角度 float64[6] final_angles --- # ========== 反馈部分(Feedback)========== # 自开始以来经过的时间(秒) float64 elapsed_time # 运动进度(0.0-1.0) float64 progress # 当前电机角度 float64[6] current_angles # 可读的状态信息 string status_message

设计说明

字段 类型 说明
Goal(目标)
angles float64[6] 6 个电机的目标角度(度)
speed_ratio float64 运动速度比例 0.0-1.0
Result(结果)
success bool 运动是否成功
message string 错误信息或成功提示
execution_time float64 运动耗时(秒)
final_angles float64[6] 最终达到的角度
Feedback(反馈)
elapsed_time float64 经过的时间(秒)
progress float64 运动进度 0.0-1.0
current_angles float64[6] 当前电机角度
status_message string 可读的状态信息

3.3 修改 CMakeLists.txt

打开 ~/ros2_ws/src/robot_arm_interfaces/CMakeLists.txt,添加 .action 文件:

cmake_minimum_required(VERSION 3.8) project(robot_arm_interfaces) # 编译器警告选项(C++ 代码质量检查) if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif() # ========== 查找依赖包 ========== find_package(ament_cmake REQUIRED) # ROS2 CMake 构建工具 find_package(builtin_interfaces REQUIRED) # 内置接口类型(Time, Duration 等) # ========== 自定义消息生成 ========== find_package(rosidl_default_generators REQUIRED) # 消息代码生成器 # 声明要生成的消息文件(可以添加多个 .msg 文件) rosidl_generate_interfaces(${PROJECT_NAME} "msg/MotorAngles.msg" "srv/ServoGripper.srv" # 新增:舵机夹爪服务 "action/AngleMode.action" # 新增:角度模式动作 DEPENDENCIES builtin_interfaces ) # ========== 测试配置(可选)========== if(BUILD_TESTING) find_package(ament_lint_auto REQUIRED) # 以下行跳过版权检查(如果没有添加版权信息,可以取消注释) # set(ament_cmake_copyright_FOUND TRUE) # 以下行跳过 cpplint 检查(如果不是 git 仓库,可以取消注释) # set(ament_cmake_cpplint_FOUND TRUE) ament_lint_auto_find_test_dependencies() endif() # ========== 导出包信息 ========== ament_package()

3.5 编译并验证

cd ~/ros2_ws colcon build --packages-select robot_arm_interfaces source install/setup.bash # 验证 Action 接口 ros2 interface show robot_arm_interfaces/action/AngleMode

输出:

# AngleMode.action # 机械臂角度模式运动控制 # ========== 目标部分(Goal)========== # 6个电机的目标角度(单位:度) float64[6] angles # 运动速度比例(0.0-1.0) float64 speed_ratio --- # ========== 结果部分(Result)========== # 运动是否成功 bool success # 错误信息或成功提示 string message # 运动耗时(秒) float64 execution_time # 最终达到的角度 float64[6] final_angles --- # ========== 反馈部分(Feedback)========== # 自开始以来经过的时间(秒) float64 elapsed_time # 运动进度(0.0-1.0) float64 progress # 当前电机角度 float64[6] current_angles # 可读的状态信息 string status_message

成功! 自定义 Action 接口已经可以使用了。

四、编写 Action Server

4.1 Action Server 的核心职责

Action Server 负责:

  1. 接收目标:验证并决定是否接受
  2. 执行任务:实际控制机械臂运动
  3. 发布反馈:持续报告进度
  4. 返回结果:任务完成后的最终状态

4.2 编写角度模式 Action Server(模拟版)

我们将逐步构建 Action Server,每一步都添加新功能并说明必要性。

版本 0:最基础的框架

先创建最基础的 Action Server 框架,只接收目标但不做任何事情

创建文件 ~/ros2_ws/src/py_episode/py_episode/angle_mode_server.py

#!/usr/bin/env python3 """ 机械臂角度模式 Action Server(模拟版 - 版本 0) 最基础框架:只接收目标 """ import rclpy from rclpy.node import Node from rclpy.action import ActionServer from robot_arm_interfaces.action import AngleMode class AngleModeServer(Node): """角度模式动作服务器节点""" def __init__(self): super().__init__('angle_mode_server') # 创建 Action Server self._action_server = ActionServer( self, # 节点 AngleMode, # Action 类型 'angle_mode', # Action 名称 self.execute_callback # 执行回调函数 ) self.get_logger().info('角度模式 Action Server 已启动') def execute_callback(self, goal_handle): """ 执行回调函数 - 当目标被接受后执行 """ self.get_logger().info('接收到角度模式目标...') # 获取目标参数 target_angles = list(goal_handle.request.angles) speed_ratio = goal_handle.request.speed_ratio self.get_logger().info(f'目标角度: {target_angles}') self.get_logger().info(f'速度比例: {speed_ratio}') # 创建并返回结果(空的) result = AngleMode.Result() return result def main(args=None): rclpy.init(args=args) node = AngleModeServer() try: rclpy.spin(node) except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()

Action Server 业务流程图(版本 0)

Client Server │ │ │ ① 发送目标 (Goal Request) │ │ ─────────────────────────────────────────────> │ │ angles=[150,80,90,45,100,60] │ │ speed_ratio=0.5 │ │ │ │ ┌──────────┴──────────┐ │ │ execute_callback() │ │ │ ┌───────────────┐ │ │ │ │ 1. 获取目标参数 │ │ │ │ │ 打印日志 │ │ │ │ └───────────────┘ │ │ │ ↓ │ │ │ ┌───────────────┐ │ │ ② 返回结果 (Result) │ │ 2. 返回空结果 │ │ │ <───────────────────────────────────│ │ (未标记状态) │ │ │ success=false (默认) │ └───────────────┘ │ │ 状态: ABORTED └──────────┬──────────┘ │ │ ▼ ▼

说明:版本 0 只是最基础的框架,仅接收目标并打印,没有标记成功状态,
因此 ROS 2 默认将目标视为 ABORTED。

测试版本 0

# 终端 1:启动 Server python3 src/py_episode/py_episode/angle_mode_server.py # 终端 2:发送目标 ros2 action send_goal /angle_mode robot_arm_interfaces/action/AngleMode \ "{angles: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio: 0.5}"

观察现象

  • Server 接收到目标并打印信息

    # server [INFO] [1768462342.655779639] [angle_mode_server]: 接收到角度模式目标... [INFO] [1768462342.656104552] [angle_mode_server]: 目标角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768462342.656420821] [angle_mode_server]: 速度比例: 0.5 [WARN] [1768462342.656854170] [angle_mode_server.action_server]: Goal state not set, assuming aborted. Goal ID: [195 169 41 254 153 129 74 163 175 150 10 173 15 204 203 117] # client Waiting for an action server to become available... Sending goal: angles: - 150.0 - 80.0 - 90.0 - 45.0 - 100.0 - 60.0 speed_ratio: 0.5 Goal accepted with ID: 35cdb3182da8481a9ea77aedcb39f0f4 Result: success: false message: '' execution_time: 0.0 final_angles: - 0.0 - 0.0 - 0.0 - 0.0 - 0.0 - 0.0 Goal finished with status: ABORTED
  • 目标立即完成,但状态显示为 ABORTED(中止)

问题:为什么是 ABORTED?因为我们没有明确告诉 Server 目标的最终状态

版本 1:标记目标成功

新增功能:调用 goal_handle.succeed() 明确标记目标成功。

修改 execute_callback

def execute_callback(self, goal_handle): """执行回调函数 - 版本 1""" self.get_logger().info('接收到角度模式目标...') # 获取目标参数 target_angles = list(goal_handle.request.angles) speed_ratio = goal_handle.request.speed_ratio self.get_logger().info(f'目标角度: {target_angles}') self.get_logger().info(f'速度比例: {speed_ratio}') # 【新增】标记目标成功 goal_handle.succeed() # 创建并返回结果 result = AngleMode.Result() result.success = True result.message = '目标已接收' return result

Action Server 业务流程图(版本 1)

Client Server │ │ │ ① 发送目标 (Goal Request) │ │ ─────────────────────────────────────────────> │ │ angles=[150,80,90,45,100,60] │ │ speed_ratio=0.5 │ │ │ │ ┌──────────┴──────────┐ │ │ execute_callback() │ │ │ ┌───────────────┐ │ │ │ │ 1. 获取目标参数 │ │ │ │ │ 打印日志 │ │ │ │ └───────────────┘ │ │ │ ↓ │ │ │ ┌───────────────┐ │ │ │ │ 2. 标记成功 │ │ │ │ │ succeed() ✓ │ │ │ │ └───────────────┘ │ │ │ ↓ │ │ │ ┌───────────────┐ │ │ ② 返回结果 (Result) │ │ 3. 返回结果 │ │ │ <───────────────────────────────────│ │ success=True │ │ │ success=true │ │ message=... │ │ │ message='目标已接收' │ └───────────────┘ │ │ 状态: SUCCEEDED ✓ └──────────┬──────────┘ │ │ ▼ ▼

版本 1 改进:相比版本 0,新增了 goal_handle.succeed() 调用,
明确告诉 ROS 2 目标已成功完成,状态从 ABORTED 变为 SUCCEEDED。

再次测试

ros2 action send_goal /angle_mode robot_arm_interfaces/action/AngleMode \ "{angles: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio: 0.5}"

观察现象

  • 目标状态变为 SUCCEEDED(成功)

    # server [INFO] [1768462503.414477447] [angle_mode_server]: 角度模式 Action Server 已启动 [INFO] [1768462506.902255680] [angle_mode_server]: 接收到角度模式目标... [INFO] [1768462506.903605467] [angle_mode_server]: 目标角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768462506.904861474] [angle_mode_server]: 速度比例: 0.5 # client Waiting for an action server to become available... Sending goal: angles: - 150.0 - 80.0 - 90.0 - 45.0 - 100.0 - 60.0 speed_ratio: 0.5 Goal accepted with ID: be8ac3c601124e08a100cc0013ecb69d Result: success: true message: 目标已接收 execution_time: 0.0 final_angles: - 0.0 - 0.0 - 0.0 - 0.0 - 0.0 - 0.0 Goal finished with status: SUCCEEDED
  • 但还是立即完成,没有实际运动

必要性说明

  • goal_handle.succeed() - 标记目标成功
  • goal_handle.abort() - 标记目标失败
  • goal_handle.canceled() - 标记目标被取消

如果不调用这些方法,ROS 2 默认认为目标 ABORTED(执行异常)

版本 2:添加实际运动逻辑

新增功能

  1. 验证参数有效性
  2. 模拟运动过程(时间延迟)
  3. 计算并返回完整结果

完整代码:

#!/usr/bin/env python3 """ 机械臂角度模式 Action Server(模拟版 - 版本 2) 添加运动逻辑和参数验证 """ import time import rclpy from rclpy.node import Node from rclpy.action import ActionServer from robot_arm_interfaces.action import AngleMode class AngleModeServer(Node): """角度模式动作服务器节点""" def __init__(self): super().__init__('angle_mode_server') # 创建 Action Server self._action_server = ActionServer( self, AngleMode, 'angle_mode', self.execute_callback ) # 【新增】模拟当前关节角度 self.current_angles = [180.0, 90.0, 83.0, 30.0, 110.0, 30.0] self.get_logger().info('角度模式 Action Server 已启动') def execute_callback(self, goal_handle): """执行回调函数 - 版本 2""" self.get_logger().info('接收到角度模式目标...') # 获取目标参数 target_angles = list(goal_handle.request.angles) speed_ratio = goal_handle.request.speed_ratio self.get_logger().info(f'目标角度: {target_angles}') self.get_logger().info(f'速度比例: {speed_ratio}') # 【新增】验证参数 if len(target_angles) != 6: result = AngleMode.Result() result.success = False result.message = f'错误:需要6个关节角度,收到{len(target_angles)}个' result.execution_time = 0.0 result.final_angles = self.current_angles goal_handle.abort() # 标记失败 return result # 【新增】记录开始时间 start_time = time.time() # 【新增】计算预计执行时间(模拟真实运动) estimated_time = 3.0 self.get_logger().info(f'预计执行时间: {estimated_time:.2f}秒') # 【新增】模拟运动过程 time.sleep(estimated_time) # 【新增】更新当前角度 self.current_angles = target_angles.copy() # 标记成功 goal_handle.succeed() # 【新增】返回完整结果 result = AngleMode.Result() result.success = True result.message = '角度模式运动完成' result.execution_time = time.time() - start_time result.final_angles = self.current_angles.copy() self.get_logger().info(f'运动完成,耗时: {result.execution_time:.2f}秒') return result def main(args=None): rclpy.init(args=args) node = AngleModeServer() try: rclpy.spin(node) except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()

Action Server 业务流程图(版本 2)

Client Server │ │ │ ① 发送目标 (Goal Request) │ │ ─────────────────────────────────────────────> │ │ angles=[150,80,90,45,100,60] │ │ speed_ratio=0.5 │ │ │ │ ┌──────────┴──────────┐ │ │ execute_callback() │ │ │ ┌───────────────┐ │ │ │ │ 1. 获取目标参数 │ │ │ │ └───────────────┘ │ │ │ ↓ │ │ │ ┌───────────────┐ │ │ │ │ 2. 验证参数 ✓ │ │ │ │ │ (6个角度?) │ │ │ │ └───────────────┘ │ │ │ ↓ │ │ │ ┌───────────────┐ │ │ │ │ 3. 模拟运动 │ │ │ │ │ time.sleep() │ │ │ │ │ (3秒延迟) │ │ │ │ └───────────────┘ │ │ │ ↓ │ │ │ ┌───────────────┐ │ │ │ │ 4. 标记成功 │ │ │ │ │ succeed() ✓ │ │ │ │ └───────────────┘ │ │ │ ↓ │ │ ② 返回结果 (Result) │ ┌───────────────┐ │ │ <───────────────────────────────────│ │ 5. 返回完整结果│ │ │ success=true │ │ execution_time│ │ │ execution_time=3.0s │ │ final_angles │ │ │ final_angles=[...] │ └───────────────┘ │ │ 状态: SUCCEEDED ✓ └──────────┬──────────┘ │ │ ▼ ▼

版本 2 改进:相比版本 1,新增了参数验证、模拟运动延迟(time.sleep()),
以及返回完整结果(execution_timefinal_angles)。但仍无进度反馈。

测试版本 2

ros2 action send_goal /angle_mode robot_arm_interfaces/action/AngleMode \ "{angles: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio: 0.5}"

观察现象

  • Server 打印 "预计执行时间"

  • 等待一段时间后才返回结果

  • 结果包含 execution_timefinal_angles

    [INFO] [1768462880.563510789] [angle_mode_server]: 接收到角度模式目标... [INFO] [1768462880.564861362] [angle_mode_server]: 目标角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768462880.566304870] [angle_mode_server]: 速度比例: 0.5 [INFO] [1768462880.567711707] [angle_mode_server]: 预计执行时间: 3.00秒 [INFO] [1768462883.569919271] [angle_mode_server]: 运动完成,耗时: 3.00秒

必要性说明

  • 参数验证:防止非法输入(如角度数量错误)
  • 时间计算:模拟真实机械臂运动的耗时
  • 完整结果:提供任务执行的详细信息
  • 顺便检查一下参数验证,只发送 3 个角度:

    # client请求 ros2 action send_goal /angle_mode robot_arm_interfaces/action/AngleMode \ "{angles: [150.0, 80.0, 90.0, 45.0, speed_ratio: 0.5}" # client输出 Waiting for an action server to become available... Sending goal: angles: - 150.0 - 80.0 - 90.0 speed_ratio: 0.5 Goal accepted with ID: aa39bd94e4c3463c9a8ee2b3cd9c76a2 Result: success: false message: 错误:需要6个关节角度,收到3个 execution_time: 0.0 final_angles: - 180.0 - 90.0 - 83.0 - 30.0 - 110.0 - 30.0 Goal finished with status: ABORTED

版本 3:添加进度反馈

新增功能:在运动过程中持续发布反馈,让 Client 知道当前进度。

修改 execute_callback(只展示新增部分):

def execute_callback(self, goal_handle): """执行回调函数 - 版本 3(完整版)""" self.get_logger().info('接收到角度模式目标...') # 获取目标参数 target_angles = list(goal_handle.request.angles) speed_ratio = goal_handle.request.speed_ratio self.get_logger().info(f'目标角度: {target_angles}') self.get_logger().info(f'速度比例: {speed_ratio}') # 验证参数 if len(target_angles) != 6: result = AngleMode.Result() result.success = False result.message = f'错误:需要6个关节角度,收到{len(target_angles)}个' result.execution_time = 0.0 result.final_angles = self.current_angles goal_handle.abort() return result # 记录开始时间 start_time = time.time() # 计算预计执行时间 estimated_time = 3.0 # 【新增】创建反馈消息 feedback_msg = AngleMode.Feedback() # 【新增】模拟运动过程(分10步,每步发布反馈) steps = 10 for i in range(steps + 1): # 计算当前进度 progress = i / steps # 【新增】模拟插值计算当前角度 for j in range(6): self.current_angles[j] = ( self.current_angles[j] * (1 - progress) + target_angles[j] * progress ) # 【新增】填充反馈消息 feedback_msg.elapsed_time = time.time() - start_time feedback_msg.progress = progress feedback_msg.current_angles = self.current_angles.copy() feedback_msg.status_message = f'运动中... {int(progress * 100)}%' # 【新增】发布反馈 goal_handle.publish_feedback(feedback_msg) self.get_logger().info( f'进度: {progress:.1%}, 角度: {[round(a, 1) for a in self.current_angles]}' ) # 模拟运动时间 time.sleep(estimated_time / steps) # 更新最终角度 self.current_angles = target_angles.copy() # 标记成功 goal_handle.succeed() # 返回结果 result = AngleMode.Result() result.success = True result.message = '角度模式运动完成' result.execution_time = time.time() - start_time result.final_angles = self.current_angles.copy() self.get_logger().info(f'运动完成,耗时: {result.execution_time:.2f}秒') return result

Action Server 业务流程图(版本 3)

Client Server │ │ │ ① 发送目标 (Goal Request) │ │ ─────────────────────────────────────────────> │ │ angles=[150,80,90,45,100,60] │ │ speed_ratio=0.5 │ │ │ │ ┌──────────┴──────────┐ │ │ execute_callback() │ │ │ ┌───────────────┐ │ │ │ │ 1. 获取目标参数 │ │ │ │ └───────────────┘ │ │ │ ↓ │ │ │ ┌───────────────┐ │ │ │ │ 2. 验证参数 │ │ │ │ └───────────────┘ │ │ │ ↓ │ │ │ ┌───────────────┐ │ │ ② 反馈 (Feedback) [多次] │ │ 3. 循环执行 │ │ │ <───────────────────────────────────│ │ ┌──────────┐ │ │ │ progress=10% │ │ │ 计算进度 │ │ │ │ <───────────────────────────────────│ │ │ 插值角度 │ │ │ │ progress=20% │ │ │ 发布反馈 │ │ │ │ <───────────────────────────────────│ │ │ sleep() │ │ │ │ ... │ │ └──────────┘ │ │ │ <───────────────────────────────────│ │ ↓ 重复10次 │ │ │ progress=100% │ └───────────────┘ │ │ │ ↓ │ │ │ ┌───────────────┐ │ │ │ │ 4. 标记成功 │ │ │ │ │ succeed() ✓ │ │ │ │ └───────────────┘ │ │ │ ↓ │ │ ③ 返回结果 (Result) │ ┌───────────────┐ │ │ <───────────────────────────────────│ │ 5. 返回完整结果│ │ │ success=true │ └───────────────┘ │ │ execution_time=3.0s └──────────┬──────────┘ │ 状态: SUCCEEDED ✓ │ ▼ ▼

版本 3 改进:相比版本 2,新增了进度反馈机制,通过循环分 10 步执行,
每步调用 goal_handle.publish_feedback() 向 Client 发送实时进度。
这是 Action 相比 Service 的核心优势!

测试版本 3(带反馈):

ros2 action send_goal --feedback /angle_mode robot_arm_interfaces/action/AngleMode \ "{angles: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio: 0.5}"

观察现象

  • Server 每隔一段时间打印进度信息
  • Client 终端显示实时反馈:
# server [INFO] [1768463108.487006795] [angle_mode_server]: 接收到角度模式目标... [INFO] [1768463108.488352635] [angle_mode_server]: 目标角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768463108.489707628] [angle_mode_server]: 速度比例: 0.5 [INFO] [1768463108.491886186] [angle_mode_server]: 进度: 0.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768463108.794439967] [angle_mode_server]: 进度: 10.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768463109.096877635] [angle_mode_server]: 进度: 20.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768463109.399424508] [angle_mode_server]: 进度: 30.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768463109.701977627] [angle_mode_server]: 进度: 40.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768463110.004390988] [angle_mode_server]: 进度: 50.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768463110.306924879] [angle_mode_server]: 进度: 60.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768463110.609436269] [angle_mode_server]: 进度: 70.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768463110.912071532] [angle_mode_server]: 进度: 80.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768463111.214520061] [angle_mode_server]: 进度: 90.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768463111.517010608] [angle_mode_server]: 进度: 100.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [1768463111.819415956] [angle_mode_server]: 运动完成,耗时: 3.33秒 # client Feedback: elapsed_time: 2.420010566711426 progress: 0.8 current_angles: - 150.0 - 80.0 - 90.0 - 45.0 - 100.0 - 60.0 status_message: 运动中... 80% Feedback: elapsed_time: 2.722559690475464 progress: 0.9 current_angles: - 150.0 - 80.0 - 90.0 - 45.0 - 100.0 - 60.0 status_message: 运动中... 90%

必要性说明

  • 反馈机制是 Action 相比 Service 的核心优势
  • 让 Client 知道任务进展,可以显示进度条
  • 在真实场景中,反馈可以包含实时位置、速度、力矩等数据

版本 4(可选):支持取消目标

新增功能:检测 Client 是否请求取消,及时响应。

在循环中添加取消检测:

#!/usr/bin/env python3 """ 机械臂角度模式 Action Server(模拟版 - 版本 4) 支持取消目标 """ import time import rclpy from rclpy.node import Node from rclpy.action import ActionServer, CancelResponse, GoalResponse from rclpy.executors import MultiThreadedExecutor from rclpy.callback_groups import ReentrantCallbackGroup from robot_arm_interfaces.action import AngleMode class AngleModeServer(Node): """角度模式动作服务器节点""" def __init__(self): super().__init__('angle_mode_server') # 创建可重入回调组(允许并行处理回调) self._callback_group = ReentrantCallbackGroup() # 创建 Action Server(添加 cancel 回调) self._action_server = ActionServer( self, AngleMode, 'angle_mode', self.execute_callback, goal_callback=self.goal_callback, cancel_callback=self.cancel_callback, callback_group=self._callback_group ) # 模拟当前关节角度 self.current_angles = [180.0, 90.0, 83.0, 30.0, 110.0, 30.0] self.get_logger().info('角度模式 Action Server 已启动') def goal_callback(self, goal_request): """接收目标请求的回调""" self.get_logger().info('收到目标请求') return GoalResponse.ACCEPT def cancel_callback(self, goal_handle): """接收取消请求的回调 - 必须返回 ACCEPT 才能取消""" self.get_logger().info('收到取消请求,接受取消') return CancelResponse.ACCEPT def execute_callback(self, goal_handle): """执行回调函数 - 版本 3(完整版)""" self.get_logger().info('接收到角度模式目标...') # 获取目标参数 target_angles = list(goal_handle.request.angles) speed_ratio = goal_handle.request.speed_ratio self.get_logger().info(f'目标角度: {target_angles}') self.get_logger().info(f'速度比例: {speed_ratio}') # 验证参数 if len(target_angles) != 6: result = AngleMode.Result() result.success = False result.message = f'错误:需要6个关节角度,收到{len(target_angles)}个' result.execution_time = 0.0 result.final_angles = self.current_angles goal_handle.abort() return result # 记录开始时间 start_time = time.time() # 计算预计执行时间 estimated_time = 20.0 # 【新增】创建反馈消息 feedback_msg = AngleMode.Feedback() # 【新增】模拟运动过程(分10步,每步发布反馈) steps = 10 for i in range(steps + 1): # 【新增】检查是否被取消 if goal_handle.is_cancel_requested: goal_handle.canceled() result = AngleMode.Result() result.success = False result.message = '任务被取消' result.execution_time = time.time() - start_time result.final_angles = self.current_angles self.get_logger().info('目标被取消') return result # 计算当前进度 progress = i / steps # 【新增】模拟插值计算当前角度 for j in range(6): self.current_angles[j] = ( self.current_angles[j] * (1 - progress) + target_angles[j] * progress ) # 【新增】填充反馈消息 feedback_msg.elapsed_time = time.time() - start_time feedback_msg.progress = progress feedback_msg.current_angles = self.current_angles.copy() feedback_msg.status_message = f'运动中... {int(progress * 100)}%' # 【新增】发布反馈 goal_handle.publish_feedback(feedback_msg) self.get_logger().info( f'进度: {progress:.1%}, 角度: {[round(a, 1) for a in self.current_angles]}' ) # 模拟运动时间(分成小片段,频繁检查取消状态) sleep_time = estimated_time / steps sleep_interval = 0.1 # 每 0.1 秒检查一次 elapsed = 0.0 while elapsed < sleep_time: time.sleep(min(sleep_interval, sleep_time - elapsed)) elapsed += sleep_interval # 在 sleep 过程中也检查取消 if goal_handle.is_cancel_requested: break # 更新最终角度 self.current_angles = target_angles.copy() # 标记成功 goal_handle.succeed() # 返回结果 result = AngleMode.Result() result.success = True result.message = '角度模式运动完成' result.execution_time = time.time() - start_time result.final_angles = self.current_angles.copy() self.get_logger().info(f'运动完成,耗时: {result.execution_time:.2f}秒') return result def main(args=None): rclpy.init(args=args) node = AngleModeServer() # 使用多线程执行器,使取消回调能在执行时被处理 executor = MultiThreadedExecutor() executor.add_node(node) try: executor.spin() except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()

新增导入说明

导入项 作用
CancelResponse 取消回调的返回值类型(ACCEPTREJECT
GoalResponse 目标回调的返回值类型(ACCEPTREJECT
MultiThreadedExecutor 多线程执行器,允许在执行任务时处理取消请求
ReentrantCallbackGroup 可重入回调组,允许多个回调并行执行

三个回调函数的调用时序

Client 发送目标 │ ▼ ┌─────────────────┐ │ goal_callback() │ ← 第1步:决定是否接受目标 │ 返回 ACCEPT/- ACCEPT:目标被接受,进入执行 │ REJECT │ - REJECT:目标被拒绝,流程结束 └────────┬────────┘ │ ACCEPT ▼ ┌─────────────────┐ │execute_callback()│ ← 第2步:执行任务(长时间运行) │ - 发布反馈 │ │ - 检查取消 │ │ - 返回结果 │ └────────┬────────┘ │ │ ← 如果 Client 请求取消 ▼ ┌─────────────────┐ │cancel_callback()│ ← 第3步(可选):决定是否允许取消 │ 返回 ACCEPT/- ACCEPT:允许取消,execute_callback │ REJECT │ 中 is_cancel_requested=True └─────────────────┘ - REJECT:拒绝取消,继续执行

为什么需要 MultiThreadedExecutor + ReentrantCallbackGroup

问题:默认单线程执行器 ┌─────────────────────────────────────────┐ │ 单线程执行器 (默认) │ │ ┌─────────────────────────────────┐ │ │ │ execute_callback() 运行中 │ │ │ │ (阻塞整个线程) │ │ │ └─────────────────────────────────┘ │ │ │ │ cancel_callback() 无法执行! ❌ │ │ (必须等 execute_callback 完成) │ └─────────────────────────────────────────┘ 解决:多线程执行器 + 可重入回调组 ┌─────────────────────────────────────────┐ │ 多线程执行器 + 可重入回调组 │ │ ┌──────────────┐ ┌──────────────┐ │ │ │ 线程 1 │ │ 线程 2 │ │ │ │ execute_ │ │ cancel_ │ │ │ │ callback() │ │ callback() │ │ │ │ 运行中 │ │ 可以执行 ✓ │ │ │ └──────────────┘ └──────────────┘ │ └─────────────────────────────────────────┘

Action Server 业务流程图(版本 4 - 完整版)

Client Server │ │ │ ① 发送目标 (Goal Request) │ │ ─────────────────────────────────────────────> │ │ angles=[150,80,90,45,100,60] │ │ speed_ratio=0.5 │ │ ┌──────────┴──────────┐ │ │ goal_callback() │ │ │ 返回 ACCEPT ✓ │ │ └──────────┬──────────┘ │ │ │ ┌──────────┴──────────┐ │ │ execute_callback() │ │ │ ┌───────────────┐ │ │ │ │ 1. 获取并验证 │ │ │ │ └───────────────┘ │ │ │ ↓ │ │ ② 反馈 (Feedback) [多次] │ ┌───────────────┐ │ │ <───────────────────────────────────│ │ 2. 循环执行 │ │ │ progress=10% │ │ ┌────────────┐ │ │ │ │ │ │检查取消请求│ │ │ │ ③ 取消请求 (Cancel) [可选] │ │ │is_cancel_ │ │ │ │ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ > │ │ │requested? │ │ │ │ │ │ └─────┬──────┘ │ │ │ ┌─────────────│ │ │ │ │ │ │ │ │ 是 ↓ 否 │ │ │ ▼ │ │ ┌────┐ ┌────┐ │ │ │ cancel_callback() │ │ │取消│ │继续│ │ │ │ 返回 ACCEPT ✓ │ │ └────┘ └────┘ │ │ │ │ └───────────────┘ │ │ │ ↓ │ │ ④ 返回结果 (Result) │ ┌───────────────┐ │ │ <───────────────────────────────────│ │ 3. 标记状态 │ │ │ 成功: succeed() → SUCCEEDED │ │ succeed() 或 │ │ │ 取消: canceled() → CANCELED │ │ canceled() │ │ │ 失败: abort() → ABORTED │ └───────────────┘ │ │ └──────────┬──────────┘ ▼ ▼

版本 4 改进:相比版本 3,新增了取消支持

  • goal_callback():接收目标时决定是否接受
  • cancel_callback():接收取消请求时决定是否允许取消
  • is_cancel_requested:在循环中频繁检查取消状态
  • MultiThreadedExecutor:使取消回调能在执行时被处理

测试版本 4(带取消):

# 终端 1:启动 Server python3 src/py_episode/py_episode/angle_mode_server.py # 终端 2:发送目标 ros2 action send_goal --feedback /angle_mode robot_arm_interfaces/action/AngleMode \ "{angles: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio: 0.5}" # 终端 3:在任务执行过程中按Ctrl+C取消 # 输出类似: Feedback: elapsed_time: 4.009111166000366 progress: 0.2 current_angles: - 154.7029248 - 81.56764160000002 - 88.90265088 - 42.6485376 - 101.56764160000002 - 55.29707520000001 status_message: 运动中... 20% ^CCanceling goal... Goal canceled. Result: success: false message: 任务被取消 execution_time: 5.013190507888794 final_angles: - 154.7029248 - 81.56764160000002 - 88.90265088 - 42.6485376 - 101.56764160000002 - 55.29707520000001 Goal finished with status: CANCELED

必要性说明

  • 在真实场景中,用户可能需要紧急停止机械臂运动
  • 检测取消请求并快速响应是安全性的重要保障

版本演进总结

版本 新增功能 核心知识点 流程图变化
版本 0 基础框架 ActionServer 创建、execute_callback 结构 仅接收目标 → 返回空结果(ABORTED)
版本 1 标记成功 goal_handle.succeed() / .abort() 新增状态标记 → 返回成功(SUCCEEDED)
版本 2 运动逻辑 参数验证、time.sleep() 模拟、完整结果 新增验证 + 延迟 → 返回 execution_time
版本 3 进度反馈 goal_handle.publish_feedback() 新增反馈循环 ← 多次 Feedback 箭头
版本 4 取消支持 cancel_callbackis_cancel_requestedMultiThreadedExecutor 新增取消分支 → CANCELED 状态

五、编写 Action Client

5.1 Action Client 的核心职责

Action Client 负责:

  1. 发送目标:告诉 Server 要执行的任务
  2. 处理接受/拒绝:目标是否被接受
  3. 接收反馈:处理执行过程中的进度更新
  4. 获取结果:任务完成后的最终状态

5.2 编写角度模式 Action Client

创建文件 ~/ros2_ws/src/py_episode/py_episode/angle_mode_client.py

#!/usr/bin/env python3 """ 机械臂角度模式 Action Client(完整版) 功能:发送目标、接收反馈、获取结果、支持取消 """ import sys import rclpy from rclpy.node import Node from rclpy.action import ActionClient from robot_arm_interfaces.action import AngleMode class AngleModeClient(Node): """角度模式动作客户端节点""" def __init__(self): super().__init__('angle_mode_client') # 创建 Action Client self._action_client = ActionClient( self, # 节点 AngleMode, # Action 类型 'angle_mode' # Action 名称(必须与 Server 一致) ) # 用于跟踪目标句柄(取消功能需要) self._goal_handle = None self.get_logger().info('角度模式 Action Client 已创建') def send_goal(self, angles, speed_ratio=0.5): """发送角度模式目标""" self.get_logger().info('等待 Action Server...') self._action_client.wait_for_server() # 创建目标消息 goal_msg = AngleMode.Goal() goal_msg.angles = angles goal_msg.speed_ratio = speed_ratio self.get_logger().info(f'发送目标: angles={angles}, speed_ratio={speed_ratio}') # 异步发送目标,注册反馈回调 self._send_goal_future = self._action_client.send_goal_async( goal_msg, feedback_callback=self.feedback_callback # 反馈回调 ) # 注册目标响应回调 self._send_goal_future.add_done_callback(self.goal_response_callback) def goal_response_callback(self, future): """ 目标响应回调 - 当 Server 接受/拒绝目标时调用 """ self._goal_handle = future.result() if not self._goal_handle.accepted: self.get_logger().error('✗ 目标被拒绝') rclpy.shutdown() return self.get_logger().info('✓ 目标被接受,等待结果...') # 异步获取结果 self._get_result_future = self._goal_handle.get_result_async() self._get_result_future.add_done_callback(self.get_result_callback) def feedback_callback(self, feedback_msg): """ 反馈回调 - 接收 Server 发送的进度反馈(多次调用) """ feedback = feedback_msg.feedback # 显示进度条 progress_bar = self._create_progress_bar(feedback.progress) self.get_logger().info( f'反馈: {progress_bar} {feedback.progress*100:.0f}% | ' f'已用时: {feedback.elapsed_time:.1f}s | ' f'{feedback.status_message}' ) def get_result_callback(self, future): """ 结果回调 - 当任务完成时调用 """ result = future.result().result self.get_logger().info('='*50) if result.success: self.get_logger().info('✓ 任务成功!') self.get_logger().info(f' 消息: {result.message}') self.get_logger().info(f' 执行时间: {result.execution_time:.2f} 秒') self.get_logger().info(f' 最终角度: {list(result.final_angles)}') else: self.get_logger().error('✗ 任务失败!') self.get_logger().error(f' 消息: {result.message}') self.get_logger().info('='*50) # 任务完成,关闭节点 rclpy.shutdown() def _create_progress_bar(self, progress, width=20): """创建文本进度条""" filled = int(width * progress) bar = '█' * filled + '░' * (width - filled) return f'[{bar}]' def main(args=None): rclpy.init(args=args) node = AngleModeClient() # 从命令行获取目标角度 if len(sys.argv) >= 7: try: angles = [float(sys.argv[i]) for i in range(1, 7)] except ValueError: print(f'错误: 角度参数必须是数字') sys.exit(1) else: # 默认值 angles = [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] # 解析速度比例(可选) speed_ratio = 0.5 if len(sys.argv) >= 8: try: speed_ratio = float(sys.argv[7]) if not (0.0 < speed_ratio <= 1.0): print(f'警告: 速度比例应在 0.0-1.0 之间,已自动限制') speed_ratio = max(0.1, min(speed_ratio, 1.0)) except ValueError: print(f'警告: 无法解析速度比例,使用默认值 0.5') try: # 发送目标 node.send_goal(angles, speed_ratio) # 保持节点运行直到完成 rclpy.spin(node) except KeyboardInterrupt: # 用户按 Ctrl+C 取消 node.get_logger().warn('\n检测到 Ctrl+C,正在取消任务...') if node._goal_handle is not None: # 发送取消请求 cancel_future = node._goal_handle.cancel_goal_async() # 等待取消完成 rclpy.spin_until_future_complete(node, cancel_future, timeout_sec=2.0) if cancel_future.result(): node.get_logger().info('取消请求已发送') else: node.get_logger().error('取消请求发送失败') finally: if rclpy.ok(): node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()

Action Client 回调函数的调用时序

Client 函数一览表

函数名 类型 功能 何时注册 何时触发
send_goal() 主动调用 发送目标到 Server - 用户调用
goal_response_callback 回调函数 处理目标接受/拒绝响应 send_goal_async().add_done_callback() Server 响应目标请求时(1 次)
feedback_callback 回调函数 处理执行过程中的进度反馈 send_goal_async(feedback_callback=...) Server 每次发布反馈时(多次)
get_result_callback 回调函数 处理任务完成后的最终结果 get_result_async().add_done_callback() Server 返回最终结果时(1 次)

回调注册与触发时序图

Client Server │ │ │ ══════════════════════════════════════════════ │ │ ║ send_goal() 中完成两个回调的注册: ║ │ │ ║ 1. feedback_callback (在 send_goal_async) ║ │ │ ║ 2. goal_response_callback (add_done_callback)║ │ │ ══════════════════════════════════════════════ │ │ │ │ ① send_goal_async() 发送目标 │ │ ───────────────────────────────────────────────> │ │ goal_msg.angles=[150,80,90,45,100,60] │ │ goal_msg.speed_ratio=0.5 │ │ ┌─────────────────────────────────────────┐ │ │ │ 【注册】feedback_callback │ │ │ │ 【注册】goal_response_callback │ │ │ └─────────────────────────────────────────┘ │ │ │ │ ┌──────────┴──────────┐ │ │ goal_callback() │ │ │ 返回 ACCEPT ✓ │ │ └──────────┬──────────┘ │ │ │ ② Goal Response(目标接受/拒绝) │ │ <─────────────────────────────────────────────── │ │ │ ┌─┴────────────────────────┐ │ │ goal_response_callback() │ ← 【触发】第1个回调 │ │ ┌──────────────────────┐ │ │ │ │ 检查 goal_handle │ │ │ │ │ .accepted │ │ │ │ │ - True: 继续 │ │ │ │ │ - False: 结束 │ │ │ │ └──────────────────────┘ │ │ │ ↓ │ │ │ ┌──────────────────────┐ │ │ │ │ get_result_async() │ │ │ │ │ ┌──────────────────┐ │ │ │ │ │ │【注册】 │ │ │ │ │ │ │get_result_callback│ │ │ │ │ │ └──────────────────┘ │ │ │ │ └──────────────────────┘ │ │ └─┬────────────────────────┘ ┌──────────┴──────────┐ │ │ execute_callback() │ │ │ 开始执行任务 │ │ └──────────┬──────────┘ │ │ │ ③ Feedback(反馈消息)[多次] │ │ <─────────────────────────────────────────────── │ │ progress=0%, elapsed_time=0.0s │ │ │ ┌─┴────────────────────────┐ │ │ feedback_callback() │ ← 【触发】第2个回调(多次)│ │ ┌──────────────────────┐ │ │ │ │ 获取 feedback 数据 │ │ │ │ │ - progress │ │ │ │ │ - elapsed_time │ │ │ │ │ - current_angles │ │ │ │ │ - status_message │ │ │ │ └──────────────────────┘ │ │ │ ↓ │ │ │ ┌──────────────────────┐ │ │ │ │ 显示进度信息 │ │ │ │ │ (如: 进度=10%) │ │ │ │ └──────────────────────┘ │ │ └─┬────────────────────────┘ │ │ │ │ <─────────────────────────────────────────────── │ │ progress=10%, elapsed_time=0.3s │ │ │ ┌─┴────────────────────────┐ │ │ feedback_callback() │ ← 【触发】再次 │ └─┬────────────────────────┘ │ │ ... │ │ <─────────────────────────────────────────────── │ │ progress=100%, elapsed_time=3.0s │ │ │ ┌─┴────────────────────────┐ │ │ feedback_callback() │ ← 【触发】最后一次反馈 │ └─┬────────────────────────┘ ┌──────────┴──────────┐ │ │ succeed() 或 │ │ │ canceled() 或 │ │ │ abort() │ │ └──────────┬──────────┘ │ │ │ ④ Result(最终结果) │ │ <─────────────────────────────────────────────── │ │ success=true, message="完成" │ │ execution_time=3.3s │ │ final_angles=[150,80,90,45,100,60] │ │ │ ┌─┴────────────────────────┐ │ │ get_result_callback() │ ← 【触发】第3个回调 │ │ ┌──────────────────────┐ │ │ │ │ 获取 result 数据 │ │ │ │ │ - success │ │ │ │ │ - message │ │ │ │ │ - execution_time │ │ │ │ │ - final_angles │ │ │ │ └──────────────────────┘ │ │ │ ↓ │ │ │ ┌──────────────────────┐ │ │ │ │ 处理最终结果 │ │ │ │ │ 成功: 打印结果 │ │ │ │ │ 失败: 显示错误 │ │ │ │ └──────────────────────┘ │ │ │ ↓ │ │ │ ┌──────────────────────┐ │ │ │ │ rclpy.shutdown() │ │ │ │ │ 结束节点 │ │ │ │ └──────────────────────┘ │ │ └─┬────────────────────────┘ │ │ │ ▼ ▼

为什么 Action Client 设计得这么复杂?

你可能会问:为什么需要这么多回调函数?直接 send_goal() 然后等结果不行吗?

答案是:为了不阻塞!

方式 代码 问题
同步阻塞 result = client.send_goal(goal) 等待期间什么都做不了,无法接收反馈、无法取消
异步回调 send_goal_async() + 回调 发送后立即返回,通过回调处理各阶段事件

类比:点外卖

同步阻塞(不推荐): 你: 下单 → 站在门口等 → 等 → 等 → 骑手到了 → 拿到外卖 问题: 等待期间你什么都干不了,也不知道骑手到哪了 异步回调(Action 方式): 你: 下单 → 继续做自己的事 ↓ [商家接单通知] → goal_response_callback() → "好的,已接单" ↓ [骑手位置更新] → feedback_callback() → "骑手还有2公里" [骑手位置更新] → feedback_callback() → "骑手已到小区" ↓ [送达通知] → get_result_callback() → "外卖到了!"

三个回调对应三个阶段

阶段 回调函数 外卖类比 必要性
1. 目标响应 goal_response_callback 商家是否接单 知道任务是否开始
2. 过程反馈 feedback_callback 骑手实时位置 了解进度,可决定是否取消
3. 最终结果 get_result_callback 外卖送达确认 获取任务结果

总结:复杂的回调设计是为了实现非阻塞的长时间任务管理
这样 Client 在等待过程中可以:

  • 继续处理其他消息
  • 实时显示进度
  • 随时取消任务
  • 响应用户交互

代码改进说明

特性 改进内容 说明
进度条显示 _create_progress_bar() 可视化进度,比数字更直观
目标句柄跟踪 self._goal_handle = None 保存目标句柄,用于取消功能
取消支持 KeyboardInterrupt + cancel_goal_async() 按 Ctrl+C 取消任务
参数验证 try-except 处理 友好的错误提示
清晰输出 ✓/✗ 符号、分隔线 提升用户体验

测试 Action Client

# 终端 1:启动 Server python3 src/py_episode/py_episode/angle_mode_server.py # 终端 2:启动 Client python3 src/py_episode/py_episode/angle_mode_client.py 150.0 80.0 90.0 45.0 100.0 60.0 0.5 # 在运行过程中按 Ctrl+C 测试取消功能

运行效果

[INFO] [angle_mode_client]: 角度模式 Action Client 已创建 [INFO] [angle_mode_client]: 等待 Action Server... [INFO] [angle_mode_client]: 发送目标: angles=[150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio=0.5 [INFO] [angle_mode_client]: ✓ 目标被接受,等待结果... [INFO] [angle_mode_client]: 反馈: [██░░░░░░░░░░░░░░░░░░] 10% | 已用时: 0.3s | 运动中... 10% [INFO] [angle_mode_client]: 反馈: [████░░░░░░░░░░░░░░░░] 20% | 已用时: 0.6s | 运动中... 20% ... [INFO] [angle_mode_client]: 反馈: [████████████████████] 100% | 已用时: 3.0s | 运动中... 100% [INFO] [angle_mode_client]: ================================================== [INFO] [angle_mode_client]: ✓ 任务成功! [INFO] [angle_mode_client]: 消息: 角度模式运动完成 [INFO] [angle_mode_client]: 执行时间: 3.30 秒 [INFO] [angle_mode_client]: 最终角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [angle_mode_client]: ==================================================

测试取消功能(在运行过程中按 Ctrl+C):

[INFO] [angle_mode_client]: 反馈: [████░░░░░░░░░░░░░░░░] 20% | 已用时: 0.6s | 运动中... 20% ^C [WARN] [angle_mode_client]: 检测到 Ctrl+C,正在取消任务... [INFO] [angle_mode_client]: 取消请求已发送 [INFO] [angle_mode_client]: ================================================== [ERROR] [angle_mode_client]: ✗ 任务失败! [ERROR] [angle_mode_client]: 消息: 任务被取消 [INFO] [angle_mode_client]: ==================================================

六、配置并运行

6.1 添加依赖

编辑 ~/ros2_ws/src/py_episode/package.xml,添加依赖:

<exec_depend>rclpy</exec_depend> <exec_depend>std_msgs</exec_depend> <exec_depend>sensor_msgs</exec_depend> <exec_depend>robot_arm_interfaces</exec_depend> <exec_depend>rclpy_action</exec_depend> <!-- Action 支持 -->

6.2 添加入口点

编辑 ~/ros2_ws/src/py_episode/setup.py

entry_points={ 'console_scripts': [ 'simple_pub = py_episode.simple_publisher:main', 'simple_sub = py_episode.simple_subscriber:main', 'joint_pub = py_episode.joint_state_publisher:main', 'joint_sub = py_episode.joint_state_subscriber:main', 'motor_pub = py_episode.motor_angles_publisher:main', 'motor_sub = py_episode.motor_angles_subscriber:main', 'simple_srv = py_episode.simple_service:main', 'simple_cli = py_episode.simple_client:main', 'gripper_srv = py_episode.gripper_service:main', 'gripper_cli = py_episode.gripper_client:main', 'angle_srv = py_episode.angle_mode_server:main', # 新增 'angle_cli = py_episode.angle_mode_client:main', # 新增 ], },

6.3 构建并运行

cd ~/ros2_ws colcon build --packages-select py_episode --symlink-install source install/setup.bash # 终端 1:启动 Action Server ros2 run py_episode angle_srv # 终端 2:启动 Action Client ros2 run py_episode angle_cli 150.0 80.0 90.0 45.0 100.0 60.0 0.5

6.4 运行效果

Server 输出

[INFO] [angle_mode_server]: 角度模式 Action Server 已启动 [INFO] [angle_mode_server]: 接收到角度模式目标... [INFO] [angle_mode_server]: 目标角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [angle_mode_server]: 速度比例: 0.5 [INFO] [angle_mode_server]: 进度: 0.0%, 角度: [180.0, 90.0, 83.0, 30.0, 110.0, 30.0] [INFO] [angle_mode_server]: 进度: 10.0%, 角度: [177.0, 89.0, 83.7, 31.5, 109.0, 33.0] ... [INFO] [angle_mode_server]: 进度: 100.0%, 角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [angle_mode_server]: 运动完成,耗时: 0.67秒

Client 输出

[INFO] [angle_mode_client]: 角度模式 Action Client 已创建 [INFO] [angle_mode_client]: 等待 Action Server... [INFO] [angle_mode_client]: 发送目标: angles=[150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio=0.5 [INFO] [angle_mode_client]: 目标被接受,等待结果... [INFO] [angle_mode_client]: 反馈: 进度=0.0%, 已用时间=0.00s, 状态="运动中... 0%" [INFO] [angle_mode_client]: 反馈: 进度=10.0%, 已用时间=0.07s, 状态="运动中... 10%" ... [INFO] [angle_mode_client]: 任务成功! 消息: 角度模式运动完成 执行时间: 0.67秒 最终角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0]

6.5 使用命令行工具调试

# 查看 Action 列表 ros2 action list # 输出: /angle_mode # 查看 Action 类型 ros2 action info /angle_mode # 输出: # Action: /angle_mode # Action clients: 1 # Action servers: 1 # 命令行发送目标(带反馈) ros2 action send_goal --feedback /angle_mode robot_arm_interfaces/action/AngleMode \ "{angles: [160.0, 85.0, 80.0, 40.0, 105.0, 50.0], speed_ratio: 0.8}"

七、真实机械臂集成

7.1 理解真实场景的差异

模拟版和真实版的主要区别:

项目 模拟版 真实版
角度读取 内存变量 CAN 总线通信
运动控制 time.sleep() 电机控制 API
反馈数据 计算插值 读取实际传感器
错误处理 简单 需要处理通信超时、电机故障等

7.2 整体架构

在上一章(Service)的代码基础上,我们添加 Action Server 功能,实现一个同时支持 Topic、Service、Action 的完整机械臂接口节点

┌─────────────────────────────────────────────────┐ │ RobotInterfaceNode │ │ ┌──────────────────────────────────────────┐ │ │ │ Topic Publisher │ │ │ │ ┌─────────────┐ │ │ │ │ │ /motor_ │ 30Hz 发布电机角度 │ │ │ │ │ angles │ │ │ │ │ └──────┬──────┘ │ │ │ └─────────┼────────────────────────────────┘ │ │ │ │ │ ┌─────────┼────────────────────────────────┐ │ │ │ │ Service Server │ │ │ │ │ ┌─────────────────┐ │ │ │ │ │ │ /servo_gripper │ │ │ │ │ │ │ 舵机夹爪控制 │ │ │ │ │ │ └────────┬────────┘ │ │ │ └─────────┼─────────────┼──────────────────┘ │ │ │ │ │ │ ┌─────────┼─────────────┼──────────────────┐ │ │ │ │ Action Server (新增) │ │ │ │ │ ┌─────────────────┐ │ │ │ │ │ │ /angle_mode │ │ │ │ │ │ │ 角度模式运动 │ │ │ │ │ │ └────────┬────────┘ │ │ │ └─────────┼─────────────┼──────────────────┘ │ │ │ │ │ │ └──────┬──────┘ │ │ │ │ │ ┌──────▼──────┐ │ │ │ MotorControl │ │ │ │ (CAN 通信) │ │ │ └─────────────┘ │ └─────────────────────────────────────────────────┘

7.3 更新机械臂接口节点

在上一章的代码基础上,添加角度模式 Action Server:

创建文件 ~/ros2_ws/src/py_episode/py_episode/robot_interface_with_action.py

#!/usr/bin/env python3 """ 机械臂接口节点 - 初始化 + 发布电机角度 + 舵机夹爪服务 + 角度模式动作 在上一章基础上新增 Action Server 功能 演示 Topic + Service + Action 三种通信方式的综合应用 """ import time import rclpy from rclpy.node import Node from rclpy.action import ActionServer, CancelResponse, GoalResponse # 新增:取消和目标响应 from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.executors import MultiThreadedExecutor from robot_arm_interfaces.msg import MotorAngles # Topic 消息 from robot_arm_interfaces.srv import ServoGripper # Service 接口 from robot_arm_interfaces.action import AngleMode # Action 接口(新增) # 导入电机控制核心库 from episode_controller_core.controller_core import MotorControl class RobotInterfaceNode(Node): """机械臂接口节点:Topic + Service + Action 完整演示""" def __init__(self): super().__init__('robot_interface_node') # ========== 新增:可重入回调组(支持 Action 取消) ========== self.callback_group = ReentrantCallbackGroup() # ========== 配置参数 ========== self.declare_parameter('usb_index', 1) self.declare_parameter('init_mode', 1) self.declare_parameter('publish_rate', 30.0) self.usb_index = self.get_parameter('usb_index').get_parameter_value().integer_value self.init_mode = self.get_parameter('init_mode').get_parameter_value().integer_value self.publish_rate = self.get_parameter('publish_rate').get_parameter_value().double_value self.home_position = [180, 90, 83, 30, 110, 30] self.calibration_timeout = 120.0 self.cached_motor_angles = [0.0] * 6 # ========== 初始化电机控制 ========== self.motor_control = None self._initialize_robot() # ========== 创建 Topic Publisher ========== self.publisher = self.create_publisher( MotorAngles, 'motor_angles', 10 ) self.get_logger().info('Topic 发布器已创建: /motor_angles') # ========== 创建 Service Server(舵机夹爪) ========== self.servo_gripper_srv = self.create_service( ServoGripper, 'servo_gripper', self.servo_gripper_callback, callback_group=self.callback_group ) self.get_logger().info('Service 服务已创建: /servo_gripper') # ========== 新增:创建 Action Server(角度模式,支持取消) ========== self.angle_mode_action_server = ActionServer( self, AngleMode, # Action 类型 'angle_mode', # Action 名称 self.angle_mode_callback, # 执行回调 callback_group=self.callback_group, # 使用可重入回调组 goal_callback=self.goal_callback, # 目标接收回调(新增) cancel_callback=self.cancel_callback # 取消请求回调(新增) ) self.get_logger().info('Action 服务已创建: /angle_mode (支持取消)') # ========== 创建定时器 ========== timer_period = 1.0 / self.publish_rate self.timer = self.create_timer(timer_period, self.publish_motor_angles) self.get_logger().info('='*50) self.get_logger().info('机械臂接口节点已启动') self.get_logger().info(f' - Topic: /motor_angles ({self.publish_rate} Hz)') self.get_logger().info(f' - Service: /servo_gripper') self.get_logger().info(f' - Action: /angle_mode') self.get_logger().info('='*50) # ==================== 初始化相关方法 ==================== def _initialize_robot(self): """初始化机械臂""" mode_name = "回零校准" if self.init_mode == 0 else "从当前位置恢复" self.get_logger().info(f'开始初始化机械臂,usb_index={self.usb_index}, init_mode={self.init_mode} ({mode_name})') self.motor_control = MotorControl(self.usb_index) if not self.motor_control.initialized: self.get_logger().error('CAN 设备初始化失败') raise RuntimeError('CAN 设备初始化失败') self.get_logger().info('CAN 通信初始化成功') if self.init_mode == 1: self._recover_from_current() else: self._perform_calibration() def _recover_from_current(self): """从当前位置恢复""" self.get_logger().info('从当前位置恢复中...') current_angles = self.motor_control.read_motor_angles() if not current_angles: raise RuntimeError('无法读取电机角度') self.motor_control.last_degrees = [ current_angles[i] * self.motor_control.motor_ratios[i] for i in range(6) ] self.cached_motor_angles = list(current_angles) self.get_logger().info(f'成功从当前位置恢复: {current_angles}') def _perform_calibration(self): """执行回零校准""" self.get_logger().info('开始回零校准流程...') for joint_index, addr in enumerate(self.motor_control.motor_address): self.get_logger().info(f'正在校准关节 {joint_index + 1}/6 ...') self.motor_control.trigger_home(addr, 0x02) time.sleep(0.05) start_time = time.perf_counter() while True: if self.motor_control.check_home_status(addr): self.get_logger().info(f'关节 {joint_index + 1} 校准完成') break if time.perf_counter() - start_time > self.calibration_timeout: raise RuntimeError(f'关节 {joint_index + 1} 校准超时') time.sleep(0.5) self.get_logger().info(f'校准完成,正在移动到 Home 位置: {self.home_position}') time.sleep(1.0) move_time = self.motor_control.execute_motors_degrees_normal(self.home_position) time.sleep(move_time) self.cached_motor_angles = list(self.home_position) self.get_logger().info('机械臂初始化完成') # ==================== Topic 发布 ==================== def publish_motor_angles(self): """定时器回调:发布电机角度""" if self.motor_control is None: return msg = MotorAngles() msg.timestamp = self.get_clock().now().to_msg() try: angles = self.motor_control.read_motor_angles() if angles and all(angle is not None for angle in angles): msg.angles = angles msg.success = True msg.message = "读取成功" self.cached_motor_angles = list(angles) else: msg.angles = self.cached_motor_angles msg.success = False msg.message = "读取失败" except Exception as e: msg.angles = self.cached_motor_angles msg.success = False msg.message = f"读取异常: {str(e)}" self.publisher.publish(msg) # ==================== Service 回调 ==================== def servo_gripper_callback(self, request, response): """舵机夹爪服务回调函数""" try: if self.motor_control is None: response.success = False response.message = "电机控制未初始化" response.actual_angle = 0 return response target_angle = request.angle clamped_angle = max(0, min(target_angle, 110)) self.motor_control.servo_gripper(clamped_angle) response.success = True response.actual_angle = clamped_angle if target_angle != clamped_angle: response.message = f'角度已限制: 请求 {target_angle}° -> 实际 {clamped_angle}°' else: response.message = f'夹爪已设置到 {clamped_angle}°' self.get_logger().info(f'[Service] {response.message}') except Exception as e: response.success = False response.message = f'控制失败: {str(e)}' response.actual_angle = 0 self.get_logger().error(f'[Service] {response.message}') return response # ==================== 新增:Action 回调 ==================== def goal_callback(self, goal_request): """ 目标接收回调:决定是否接受目标 参数: goal_request: 目标请求(包含 angles 和 speed_ratio) 返回: GoalResponse.ACCEPT 或 GoalResponse.REJECT """ self.get_logger().info('[Action] 收到目标请求') # 可以在这里添加目标验证逻辑 angles = goal_request.angles if len(angles) != 6: self.get_logger().warn(f'[Action] 拒绝目标:角度数量错误(期望6个,收到{len(angles)}个)') return GoalResponse.REJECT # 接受目标 return GoalResponse.ACCEPT def cancel_callback(self, goal_handle): """ 取消请求回调:决定是否允许取消 参数: goal_handle: 目标句柄 返回: CancelResponse.ACCEPT 或 CancelResponse.REJECT """ self.get_logger().info('[Action] 收到取消请求,准备紧急停止') # 触发紧急停止 if self.motor_control is not None: try: self.motor_control.set_emergency_stop(active=True) self.get_logger().warn('[Action] 已触发紧急停止') # 更新电机控制器的角度记录,确保下次运动计算准确 # 使用缓存的角度(Topic 持续更新) self.motor_control.last_degrees = [ self.cached_motor_angles[i] * self.motor_control.motor_ratios[i] for i in range(6) ] self.get_logger().info('[Action] 已更新电机角度记录') except Exception as e: self.get_logger().error(f'[Action] 紧急停止失败: {str(e)}') # 接受取消请求 return CancelResponse.ACCEPT def angle_mode_callback(self, goal_handle): """ 角度模式动作回调函数 参数: goal_handle: 目标句柄,包含请求参数和状态控制方法 返回: AngleMode.Result: 执行结果 """ self.get_logger().info('[Action] 接收到角度模式目标...') # 创建结果消息 result = AngleMode.Result() # 检查电机控制是否已初始化 if self.motor_control is None: result.success = False result.message = "电机控制未初始化" result.execution_time = 0.0 result.final_angles = self.cached_motor_angles self.get_logger().error(f'[Action] {result.message}') goal_handle.abort() return result # 获取目标参数 angles = list(goal_handle.request.angles) speed_ratio = goal_handle.request.speed_ratio # 参数验证 if len(angles) != 6: result.success = False result.message = f"角度数量错误: 期望 6 个,实际 {len(angles)} 个" result.execution_time = 0.0 result.final_angles = self.cached_motor_angles self.get_logger().error(f'[Action] {result.message}') goal_handle.abort() return result if not (0.0 < speed_ratio <= 1.0): speed_ratio = max(0.1, min(speed_ratio, 1.0)) self.get_logger().warn(f'[Action] 速度比例已限制为: {speed_ratio}') self.get_logger().info(f'[Action] 目标角度: {angles}') self.get_logger().info(f'[Action] 速度比例: {speed_ratio}') start_time = time.perf_counter() try: # 调用电机控制 API 执行运动 execution_time = float(self.motor_control.execute_motors_degrees_normal( angles, speed_ratio )) self.get_logger().info(f'[Action] 预计执行时间: {execution_time:.2f} 秒') # 发送进度反馈 steps = 10 for i in range(steps + 1): # 检查是否请求取消 if goal_handle.is_cancel_requested: # 解除紧急停止,允许后续运动命令 if self.motor_control is not None: try: self.motor_control.set_emergency_stop(active=False) self.get_logger().info('[Action] 已解除紧急停止') except Exception as e: self.get_logger().error(f'[Action] 解除紧急停止失败: {str(e)}') result.success = False result.message = "任务被取消" result.execution_time = float(time.perf_counter() - start_time) result.final_angles = self.cached_motor_angles self.get_logger().warn(f'[Action] {result.message}') goal_handle.canceled() return result # 创建并发布反馈 feedback_msg = AngleMode.Feedback() feedback_msg.elapsed_time = float(time.perf_counter() - start_time) feedback_msg.progress = float(i / steps) feedback_msg.current_angles = self.cached_motor_angles feedback_msg.status_message = f"运动中... {int(i / steps * 100)}%" goal_handle.publish_feedback(feedback_msg) self.get_logger().info(f'[Action] 进度: {int(i / steps * 100)}%') # 等待(除了最后一步) if i < steps: time.sleep(execution_time / steps) # 运动完成 result.success = True result.message = f"角度模式运动完成" result.execution_time = float(time.perf_counter() - start_time) result.final_angles = self.cached_motor_angles self.get_logger().info(f'[Action] {result.message},耗时: {result.execution_time:.2f} 秒') goal_handle.succeed() return result except Exception as e: result.success = False result.message = f"执行失败: {str(e)}" result.execution_time = float(time.perf_counter() - start_time) result.final_angles = self.cached_motor_angles self.get_logger().error(f'[Action] {result.message}') goal_handle.abort() return result def main(): rclpy.init() try: node = RobotInterfaceNode() # 使用多线程执行器,支持 Action 取消功能 executor = MultiThreadedExecutor(num_threads=4) executor.add_node(node) try: executor.spin() finally: executor.shutdown() node.destroy_node() except RuntimeError as e: print(f'启动失败: {e}') except KeyboardInterrupt: pass finally: rclpy.shutdown() if __name__ == '__main__': main()

对比上一章的代码,新增了

新增内容 代码 说明
导入 Action 相关模块 from rclpy.action import ActionServer, CancelResponse, GoalResponse Action 服务端和取消支持
导入回调组 from rclpy.callback_groups import ReentrantCallbackGroup 支持并发
导入执行器 from rclpy.executors import MultiThreadedExecutor 多线程执行
导入 Action 接口 from robot_arm_interfaces.action import AngleMode 自定义 Action
创建回调组 self.callback_group = ReentrantCallbackGroup() 允许并发回调
创建 Action Server ActionServer(self, AngleMode, 'angle_mode', ..., goal_callback=..., cancel_callback=...) 角度模式 Action(支持取消)
目标接收回调 goal_callback(self, goal_request) 决定是否接受目标
取消请求回调 cancel_callback(self, goal_handle) 触发紧急停止
Action 执行回调 angle_mode_callback(self, goal_handle) 处理运动请求
多线程执行器 MultiThreadedExecutor(num_threads=4) 支持取消功能

7.4 实现角度控制客户端

为了方便测试和使用,我们创建一个独立的客户端节点,可以通过命令行参数控制机械臂运动到指定角度。

创建文件 ~/ros2_ws/src/py_episode/py_episode/angles_control_client.py

#!/usr/bin/env python3 """ 角度模式控制客户端 - 通过命令行参数控制机械臂运动到指定角度 演示如何在独立节点中使用 Action Client """ import sys import rclpy from rclpy.node import Node from rclpy.action import ActionClient from robot_arm_interfaces.action import AngleMode class AnglesControlClient(Node): """角度模式控制客户端节点""" def __init__(self): super().__init__('angles_control_client') # 创建 Action Client self.action_client = ActionClient(self, AngleMode, 'angle_mode') # 等待 Action Server 可用 self.get_logger().info('等待角度模式 Action Server...') while not self.action_client.wait_for_server(timeout_sec=1.0): self.get_logger().info('服务不可用,继续等待...') self.get_logger().info('角度模式 Action Server 已连接') # 用于跟踪目标句柄 self._goal_handle = None def send_goal(self, angles, speed_ratio=0.5): """ 发送角度模式目标 参数: angles: 6 个关节的目标角度列表 speed_ratio: 运动速度比例(0.0-1.0) """ # 创建目标消息 goal_msg = AngleMode.Goal() goal_msg.angles = angles goal_msg.speed_ratio = speed_ratio self.get_logger().info(f'发送目标: angles={angles}, speed_ratio={speed_ratio}') # 异步发送目标,注册反馈回调 self._send_goal_future = self.action_client.send_goal_async( goal_msg, feedback_callback=self.feedback_callback ) # 注册目标响应回调 self._send_goal_future.add_done_callback(self.goal_response_callback) def goal_response_callback(self, future): """目标响应回调:处理目标是否被接受""" self._goal_handle = future.result() if not self._goal_handle.accepted: self.get_logger().error('✗ 目标被拒绝') rclpy.shutdown() return self.get_logger().info('✓ 目标被接受,等待执行...') # 获取结果 self._get_result_future = self._goal_handle.get_result_async() self._get_result_future.add_done_callback(self.get_result_callback) def feedback_callback(self, feedback_msg): """反馈回调:处理执行过程中的进度更新""" feedback = feedback_msg.feedback # 显示进度条 progress_bar = self._create_progress_bar(feedback.progress) self.get_logger().info( f'反馈: {progress_bar} {feedback.progress*100:.0f}% | ' f'已用时: {feedback.elapsed_time:.1f}s | ' f'{feedback.status_message}' ) def get_result_callback(self, future): """结果回调:处理任务完成后的最终结果""" result = future.result().result self.get_logger().info('='*50) if result.success: self.get_logger().info('✓ 任务成功!') self.get_logger().info(f' 消息: {result.message}') self.get_logger().info(f' 执行时间: {result.execution_time:.2f} 秒') self.get_logger().info(f' 最终角度: {list(result.final_angles)}') else: self.get_logger().error('✗ 任务失败!') self.get_logger().error(f' 消息: {result.message}') self.get_logger().info('='*50) # 任务完成,关闭节点 rclpy.shutdown() def _create_progress_bar(self, progress, width=20): """创建文本进度条""" filled = int(width * progress) bar = '█' * filled + '░' * (width - filled) return f'[{bar}]' def main(): # 解析命令行参数 if len(sys.argv) < 7: print('用法: ros2 run py_episode angles_ctrl <j1> <j2> <j3> <j4> <j5> <j6> [speed_ratio]') print() print('参数说明:') print(' j1-j6: 6 个关节的目标角度(单位:度)') print(' speed_ratio: 运动速度比例 0.0-1.0(可选,默认 0.5)') print() print('示例:') print(' ros2 run py_episode angles_ctrl 150 80 90 45 100 60 # 默认速度') print(' ros2 run py_episode angles_ctrl 150 80 90 45 100 60 0.3 # 慢速') print(' ros2 run py_episode angles_ctrl 180 90 83 30 110 30 0.8 # 快速回到 Home') sys.exit(1) # 解析角度参数 try: angles = [float(sys.argv[i]) for i in range(1, 7)] except ValueError as e: print(f'错误: 角度参数必须是数字,但收到: {sys.argv[1:7]}') sys.exit(1) # 解析速度比例(可选) speed_ratio = 0.5 if len(sys.argv) >= 8: try: speed_ratio = float(sys.argv[7]) if not (0.0 < speed_ratio <= 1.0): print(f'警告: 速度比例应在 0.0-1.0 之间,已自动限制') speed_ratio = max(0.1, min(speed_ratio, 1.0)) except ValueError: print(f'警告: 无法解析速度比例 "{sys.argv[7]}",使用默认值 0.5') # 初始化节点 rclpy.init() node = AnglesControlClient() try: # 发送目标 node.send_goal(angles, speed_ratio) # 保持节点运行,等待回调 rclpy.spin(node) except KeyboardInterrupt: # 用户按 Ctrl+C 取消 node.get_logger().warn('\n检测到 Ctrl+C,正在取消任务...') if node._goal_handle is not None: # 发送取消请求 cancel_future = node._goal_handle.cancel_goal_async() # 等待取消完成 rclpy.spin_until_future_complete(node, cancel_future, timeout_sec=2.0) if cancel_future.result(): node.get_logger().info('取消请求已发送') else: node.get_logger().error('取消请求发送失败') finally: if rclpy.ok(): node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()

代码说明

特性 实现 说明
命令行参数 sys.argv[1:7] 获取角度 支持 6 个关节角度 + 可选速度
服务等待 wait_for_server() 确保 Action Server 可用
异步发送 send_goal_async() 非阻塞发送目标
反馈处理 feedback_callback 实时显示进度条
结果处理 get_result_callback 显示最终执行结果
取消支持 KeyboardInterrupt 处理 + cancel_goal_async() Ctrl+C 触发紧急停止并取消任务

7.5 添加入口点并运行

更新 setup.py

entry_points={ 'console_scripts': [ 'simple_pub = py_episode.simple_publisher:main', 'simple_sub = py_episode.simple_subscriber:main', 'joint_pub = py_episode.joint_state_publisher:main', 'joint_sub = py_episode.joint_state_subscriber:main', 'motor_pub = py_episode.motor_angles_publisher:main', 'motor_sub = py_episode.motor_angles_subscriber:main', 'simple_srv = py_episode.simple_service:main', 'simple_cli = py_episode.simple_client:main', 'gripper_srv = py_episode.gripper_service:main', 'gripper_cli = py_episode.gripper_client:main', 'robot_interface_srv = py_episode.robot_interface_with_service:main', 'gripper_ctrl = py_episode.gripper_control_client:main', 'angle_srv = py_episode.angle_mode_server:main', 'angle_cli = py_episode.angle_mode_client:main', 'robot_interface_action = py_episode.robot_interface_with_action:main', # 新增 'angles_ctrl = py_episode.angles_control_client:main', # 新增 ], },

构建并运行

cd ~/ros2_ws colcon build --packages-select py_episode --symlink-install source install/setup.bash # 终端 1:启动机械臂接口节点(使用模式 1 跳过回零) ros2 run py_episode robot_interface_action --ros-args -p usb_index:=1 -p init_mode:=1 # 终端 2:使用客户端控制机械臂运动 ros2 run py_episode angles_ctrl 150 80 90 45 100 60 # 默认速度 ros2 run py_episode angles_ctrl 180 90 83 30 110 30 0.8 # 快速回到 Home ros2 run py_episode angles_ctrl 160 85 80 40 105 50 0.3 # 慢速运动

7.6 运行效果

Server 输出

[INFO] [robot_interface_node]: ================================================ [INFO] [robot_interface_node]: 机械臂接口节点已启动 [INFO] [robot_interface_node]: - Topic: /motor_angles (30.0 Hz) [INFO] [robot_interface_node]: - Service: /servo_gripper [INFO] [robot_interface_node]: - Action: /angle_mode [INFO] [robot_interface_node]: ================================================ [INFO] [robot_interface_node]: [Action] 接收到角度模式目标... [INFO] [robot_interface_node]: [Action] 目标角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [robot_interface_node]: [Action] 速度比例: 0.5 [INFO] [robot_interface_node]: [Action] 预计执行时间: 0.67 秒 [INFO] [robot_interface_node]: [Action] 进度: 0% [INFO] [robot_interface_node]: [Action] 进度: 10% ... [INFO] [robot_interface_node]: [Action] 进度: 100% [INFO] [robot_interface_node]: [Action] 角度模式运动完成,耗时: 0.70 秒

Client 输出

[INFO] [angles_control_client]: 等待角度模式 Action Server... [INFO] [angles_control_client]: 角度模式 Action Server 已连接 [INFO] [angles_control_client]: 发送目标: angles=[150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio=0.5 [INFO] [angles_control_client]: ✓ 目标被接受,等待执行... [INFO] [angles_control_client]: 反馈: [██░░░░░░░░░░░░░░░░░░] 10% | 已用时: 0.1s | 运动中... 10% [INFO] [angles_control_client]: 反馈: [████░░░░░░░░░░░░░░░░] 20% | 已用时: 0.2s | 运动中... 20% ... [INFO] [angles_control_client]: 反馈: [████████████████████] 100% | 已用时: 0.7s | 运动中... 100% [INFO] [angles_control_client]: ================================================== [INFO] [angles_control_client]: ✓ 任务成功! [INFO] [angles_control_client]: 消息: 角度模式运动完成 [INFO] [angles_control_client]: 执行时间: 0.70 秒 [INFO] [angles_control_client]: 最终角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [angles_control_client]: ==================================================

7.7 取消功能测试

Action 的一个重要特性是支持中途取消任务。当需要紧急停止机械臂运动时,可以通过取消功能触发紧急停止。

取消功能的工作流程

Client Server │ │ │ ① 发送目标 │ │ ──────────────────────────────────────────────> │ │ │ │ ② 目标被接受 │ │ <────────────────────────────────────────────── │ │ │ │ ┌──────────┴──────────┐ │ │ execute_callback() │ │ │ 开始执行任务 │ │ └──────────┬──────────┘ │ ③ 持续反馈 │ │ <────────────────────────────────────────────── │ │ progress=10%, 20%, 30%... │ │ │ 【用户按 Ctrl+C】 │ │ │ │ ④ 发送取消请求 │ │ ──────────────────────────────────────────────> │ │ │ │ ┌──────────┴──────────┐ │ │ cancel_callback() │ │ │ 触发紧急停止 │ │ │ set_emergency_ │ │ │ stop(True) │ │ └──────────┬──────────┘ │ ⑤ 取消确认 │ │ <────────────────────────────────────────────── │ │ │ │ ┌──────────┴──────────┐ │ │ execute_callback() │ │ │ 检测到取消请求 │ │ │ 解除紧急停止 │ │ │ set_emergency_ │ │ │ stop(False) │ │ └──────────┬──────────┘ │ ⑥ 返回取消结果 │ │ <────────────────────────────────────────────── │ │ success=false, message="任务被取消" │ │ │ ▼ ▼

测试取消功能

# 终端 1:启动 Server ros2 run py_episode robot_interface_action --ros-args -p usb_index:=1 -p init_mode:=1 # 终端 2:启动 Client,在运动过程中按 Ctrl+C ros2 run py_episode angles_ctrl 150 80 90 45 100 60 0.3 # 使用慢速便于测试取消 # 等待几秒后按 Ctrl+C

取消时的输出效果

Client 输出

[INFO] [angles_control_client]: 发送目标: angles=[150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio=0.3 [INFO] [angles_control_client]: ✓ 目标被接受,等待执行... [INFO] [angles_control_client]: 反馈: [██░░░░░░░░░░░░░░░░░░] 10% | 已用时: 0.3s | 运动中... 10% [INFO] [angles_control_client]: 反馈: [████░░░░░░░░░░░░░░░░] 20% | 已用时: 0.6s | 运动中... 20% ^C [WARN] [angles_control_client]: 检测到 Ctrl+C,正在取消任务... [INFO] [angles_control_client]: 取消请求已发送 [INFO] [angles_control_client]: ================================================== [ERROR] [angles_control_client]: ✗ 任务失败! [ERROR] [angles_control_client]: 消息: 任务被取消 [INFO] [angles_control_client]: ==================================================

Server 输出

[INFO] [robot_interface_node]: [Action] 收到目标请求 [INFO] [robot_interface_node]: [Action] 接收到角度模式目标... [INFO] [robot_interface_node]: [Action] 目标角度: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0] [INFO] [robot_interface_node]: [Action] 速度比例: 0.3 [INFO] [robot_interface_node]: [Action] 预计执行时间: 1.20 秒 [INFO] [robot_interface_node]: [Action] 进度: 0% [INFO] [robot_interface_node]: [Action] 进度: 10% [INFO] [robot_interface_node]: [Action] 进度: 20% [INFO] [robot_interface_node]: [Action] 收到取消请求,准备紧急停止 [WARN] [robot_interface_node]: [Action] 已触发紧急停止 [INFO] [robot_interface_node]: [Action] 已解除紧急停止 [WARN] [robot_interface_node]: [Action] 任务被取消

取消功能的关键实现

组件 关键代码 说明
Server: 接受取消 cancel_callback() 返回 CancelResponse.ACCEPT 允许取消
Server: 紧急停止 self.motor_control.set_emergency_stop(active=True) 立即停止机械臂
Server: 更新角度 self.motor_control.last_degrees = [...] 记录停止时的角度,确保下次运动计算准确
Server: 检测取消 if goal_handle.is_cancel_requested: 循环中频繁检查
Server: 解除停止 self.motor_control.set_emergency_stop(active=False) 允许后续运动
Server: 标记取消 goal_handle.canceled() 设置状态为 CANCELED
Client: 发送取消 goal_handle.cancel_goal_async() 请求取消任务
Client: 等待确认 rclpy.spin_until_future_complete() 等待取消完成

为什么需要更新角度记录?

# 在 cancel_callback 中 self.motor_control.set_emergency_stop(active=True) # 紧急停止 # 更新角度记录(使用 Topic 持续更新的缓存角度) self.motor_control.last_degrees = [ self.cached_motor_angles[i] * self.motor_control.motor_ratios[i] for i in range(6) ] # ↑ 这一步很重要! # 如果不更新,电机控制器还记录着"停止前应该到达的目标角度" # 下次运动时会从错误的起始位置计算,导致突然的大幅运动

为什么需要解除紧急停止?

# 在检测到取消时 if goal_handle.is_cancel_requested: # 1. 触发紧急停止(在 cancel_callback 中已完成) # 机械臂立即停止,角度已更新 # 2. 解除紧急停止 self.motor_control.set_emergency_stop(active=False) # ↑ 这一步很重要! # 如果不解除,后续的运动命令都会被阻塞 # 3. 返回取消结果 result.message = "任务被取消" goal_handle.canceled() return result

如果不解除紧急停止,机械臂将保持锁定状态,无法响应新的运动命令。

7.8 综合测试

现在我们有了一个同时支持 Topic + Service + Action 的完整节点,可以进行综合测试:

# 终端 1:启动机械臂接口节点 ros2 run py_episode robot_interface_action --ros-args -p usb_index:=1 -p init_mode:=1 # 终端 2:订阅 Topic 查看电机角度 ros2 topic echo /motor_angles # 终端 3:调用 Service 控制夹爪 ros2 run py_episode gripper_ctrl 50 # 设置夹爪到 50 度 # 终端 4:调用 Action 控制机械臂运动 ros2 run py_episode angles_ctrl 150 80 90 45 100 60 0.5

也可以使用命令行工具测试:

# 查看所有 Topic ros2 topic list # 查看所有 Service ros2 service list # 查看所有 Action ros2 action list # 命令行调用 Service ros2 service call /servo_gripper robot_arm_interfaces/srv/ServoGripper "{angle: 60}" # 命令行调用 Action(带反馈) ros2 action send_goal --feedback /angle_mode robot_arm_interfaces/action/AngleMode \ "{angles: [160.0, 85.0, 80.0, 40.0, 105.0, 50.0], speed_ratio: 0.5}"

7.9 三种通信方式对比总结

通过本节的实战,我们在一个节点中实现了三种通信方式:

通信方式 接口名称 功能 使用场景 取消支持
Topic /motor_angles 30Hz 发布电机角度 持续监控、数据记录 不适用
Service /servo_gripper 控制舵机夹爪角度 快速命令、需要结果确认 不支持
Action /angle_mode 控制机械臂运动到目标角度 长时间任务、需要进度反馈 ✓ 支持
┌─────────────────────────────────────────────────────────────┐ │ 通信方式选择指南 │ ├─────────────────────────────────────────────────────────────┤ │ 数据类型 推荐方式 原因 │ ├─────────────────────────────────────────────────────────────┤ │ 传感器数据流 Topic 持续、高频、单向 │ │ 状态查询 Service 按需、快速、需要返回值 │ │ 快速命令 Service 一次性、需要确认结果 │ │ 运动控制 Action 长时间、需要反馈、可取消 │ │ 导航任务 Action 多步骤、需要进度更新 │ │ 紧急停止场景 Action 需要中途取消能力 │ └─────────────────────────────────────────────────────────────┘

Action 取消功能的价值

在机械臂控制场景中,取消功能不仅是一个"nice to have"特性,而是安全性的关键需求

场景 为什么需要取消
碰撞风险 发现即将碰撞障碍物,需要立即停止
路径错误 发现规划路径有误,需要重新规划
紧急情况 操作人员需要介入处理
任务变更 需要执行更高优先级的任务

通过 set_emergency_stop(active=True/False) 机制,我们实现了:

  • 立即响应:收到取消请求时立即停止机械臂
  • 状态恢复:完成取消后解除紧急停止,允许后续运动
  • 安全保障:避免失控运动导致的设备损坏或人员伤害

八、通信方式总结对比

8.1 三种通信方式完整对比

特性 Topic Service Action
通信模式 发布-订阅 请求-响应 目标-反馈-结果
方向 单向 双向(一次) 双向(多次)
同步/异步 异步 同步 异步
反馈机制 有(持续)
取消支持 不适用 不支持 支持
接口文件 .msg .srv(2 部分) .action(3 部分)
典型场景 传感器数据、状态发布 查询、快速命令 长时间任务、运动控制

8.2 创建方法对比

# Topic Publisher self.pub = self.create_publisher(MsgType, 'topic_name', 10) self.pub.publish(msg) # Topic Subscriber self.sub = self.create_subscription(MsgType, 'topic_name', self.callback, 10) # Service Server self.srv = self.create_service(SrvType, 'service_name', self.callback) # Service Client self.cli = self.create_client(SrvType, 'service_name') future = self.cli.call_async(request) # Action Server from rclpy.action import ActionServer self.action_srv = ActionServer(self, ActionType, 'action_name', self.execute_callback) # Action Client from rclpy.action import ActionClient self.action_cli = ActionClient(self, ActionType, 'action_name') future = self.action_cli.send_goal_async(goal, feedback_callback=self.fb_callback)

8.3 回调函数对比

类型 回调签名 返回值
Topic Subscriber callback(msg)
Service Server callback(request, response) response
Action Server execute_callback(goal_handle) result
Action Client 三个回调:goal_responsefeedbackget_result

九、本章小结

9.1 学到了什么?

  1. Action 的核心概念

    • 目标(Goal)→ 反馈(Feedback)→ 结果(Result)
    • 适用于长时间运行、需要进度反馈、可能被取消的任务
  2. 自定义 Action 接口

    • .action 文件包含三部分,用 --- 分隔
    • 需要在 CMakeLists.txt 中声明并编译
  3. Action Server 编写

    • 使用 ActionServer 类创建
    • execute_callback 负责执行任务、发布反馈、返回结果
    • 使用 goal_handle.publish_feedback() 发送进度
    • 使用 goal_handle.succeed() / .abort() / .canceled() 标记状态
  4. Action Client 编写

    • 使用 ActionClient 类创建
    • send_goal_async() 异步发送目标
    • 通过回调函数处理响应、反馈和结果

十、附录

10.1 常见错误排查

错误现象 可能原因 解决方法
"Action server not available" Server 未启动或名称不匹配 检查 Server 是否运行,确认 Action 名称一致
"Goal rejected" Server 拒绝了目标 检查目标参数是否有效
反馈回调不触发 未注册 feedback_callback 确认 send_goal_async() 传入了回调函数
结果回调不触发 未调用 goal_handle.succeed() 确认 Server 正确标记了目标状态

10.2 调试命令

# 查看所有 Action ros2 action list # 查看 Action 详情 ros2 action info /angle_mode # 查看 Action 接口定义 ros2 interface show robot_arm_interfaces/action/AngleMode # 发送目标(带反馈) ros2 action send_goal --feedback /angle_mode robot_arm_interfaces/action/AngleMode \ "{angles: [150.0, 80.0, 90.0, 45.0, 100.0, 60.0], speed_ratio: 0.5}"