Soma Zero Tutorials
🔍 搜索功能尚未开启,敬请期待。

3.6 ROS2 Service 通信编程与机械臂集成

一、本章目标

在前两章中,我们学习了 Topic 通信(Publisher/Subscriber)和自定义消息(.msg)。Topic 适合持续发布的数据流(如关节角度),但有些场景需要一问一答的模式:

  • 查询机械臂当前角度(问一次,答一次)
  • 控制夹爪张合(发送指令,等待执行结果)
  • 读取传感器数据(请求时才读取)

这就是 Service(服务) 通信模式。

1.1 学习路线

  1. 理解 Service vs Topic - 什么时候用哪种通信方式
  2. 编写极简 Service 和 Client - 用标准接口快速入门
  3. 自定义 Service 接口 - 定义 .srv 文件
  4. 机械臂通信实战 - 实现舵机夹爪控制服务

1.2 本章目标

完成本章后,你将能够:

  • ✓ 理解 Service 的请求-响应通信模式
  • ✓ 编写 Service Server 和 Client 节点
  • ✓ 创建自定义 .srv 服务接口
  • ✓ 实现机械臂舵机夹爪控制服务

二、Service vs Topic:什么时候用哪种?

2.1 回顾外卖系统类比

在第三章,我们用外卖系统类比 ROS2 通信:

通信方式 外卖系统类比 特点
Topic 骑手持续广播 GPS 位置 单向、持续、多对多
Service 顾客点击查询骑手位置 双向、一次性、一对一

Topic 模式

骑手节点 ──────> /rider_gps ──────> 顾客节点 持续广播 持续接收

Service 模式

顾客节点 ──请求──> /get_rider_location ──响应──> 顾客节点 "骑手在哪?" "骑手在 xx 路"

2.2 Service 的核心概念

Service 采用请求-响应(Request-Response) 模式:

┌─────────────┐ ┌─────────────┐ │ Client │ ─── Request ──────> │ Server │ │ (客户端) │ │ (服务端) │ │ │ <── Response ────── │ │ └─────────────┘ └─────────────┘
  • Server(服务端):提供服务,等待请求,处理后返回响应
  • Client(客户端):发起请求,等待响应
  • Service Name:服务名称(类似话题名称)
  • Service Type:服务类型(由 .srv 文件定义)

2.3 何时使用 Service?

场景 推荐方式 原因
持续数据流(关节角度、图像) Topic 数据持续产生,需要实时传输
一次性操作(查询状态、执行命令) Service 请求时才需要,有明确的返回值
需要确认结果(夹爪是否夹紧) Service 需要知道操作是否成功
长时间任务(移动到目标点) Action 需要反馈进度,支持取消

机械臂场景

功能 通信方式 理由
发布关节角度 Topic 持续、高频发布
读取当前角度 Service 按需查询
控制夹爪开合 Service 一次性命令,需要结果反馈
执行轨迹运动 Action 长时间任务,需要进度反馈

2.4 Service 接口的结构

Service 接口由 .srv 文件定义,包含请求(Request)响应(Response) 两部分,用 --- 分隔:

# 请求部分(Client 发送给 Server) int64 a int64 b --- # 响应部分(Server 返回给 Client) int64 sum

类比理解

  • 请求 = 顾客的点单内容
  • 响应 = 餐厅返回的菜品

三、编写极简 Service 和 Client

和 Topic 一样,我们先用标准接口快速入门,再引入自定义接口。

3.1 使用标准服务接口

ROS2 提供了一些标准服务接口用于学习和测试。我们使用 example_interfaces/srv/AddTwoInts

# 查看服务结构 ros2 interface show example_interfaces/srv/AddTwoInts

输出:

int64 a int64 b --- int64 sum

含义

  • 请求:两个整数 ab
  • 响应:它们的和 sum

3.2 编写 Service Server

创建文件 ~/ros2_ws/src/py_episode/py_episode/simple_service.py

#!/usr/bin/env python3 """ 极简 Service Server:提供两数相加服务 """ import rclpy from rclpy.node import Node from example_interfaces.srv import AddTwoInts # 标准服务接口 class SimpleService(Node): """简单服务端节点""" def __init__(self): super().__init__('simple_service') # 创建 Service:服务名 /add_two_ints,类型 AddTwoInts,回调函数 callback self.srv = self.create_service( AddTwoInts, # 服务类型 'add_two_ints', # 服务名称 self.callback # 回调函数 ) self.get_logger().info('Service Server 已启动,等待请求...') def callback(self, request, response): """ 服务回调函数 参数: request: 客户端发来的请求(包含 a 和 b) response: 要返回给客户端的响应(包含 sum) 返回: 填充好的 response 对象 """ response.sum = request.a + request.b self.get_logger().info(f'收到请求: {request.a} + {request.b} = {response.sum}') return response def main(): rclpy.init() node = SimpleService() try: rclpy.spin(node) # 保持节点运行,等待请求 except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()

核心代码解析

# 创建 Service(3 个参数) self.srv = self.create_service( AddTwoInts, # 1. 服务类型 'add_two_ints', # 2. 服务名称 self.callback # 3. 回调函数 ) # 回调函数:处理请求,返回响应 def callback(self, request, response): response.sum = request.a + request.b return response # 必须返回 response!

对比 Topic Subscriber

  • Subscriber 回调只有 msg 参数
  • Service 回调有 requestresponse 两个参数
  • Service 回调必须 return response

3.3 编写 Service Client

创建文件 ~/ros2_ws/src/py_episode/py_episode/simple_client.py

#!/usr/bin/env python3 """ 极简 Service Client:调用两数相加服务 """ import rclpy from rclpy.node import Node from example_interfaces.srv import AddTwoInts # 标准服务接口 class SimpleClient(Node): """简单客户端节点""" def __init__(self): super().__init__('simple_client') # 创建 Client:服务名和类型必须与 Server 一致 self.cli = self.create_client(AddTwoInts, 'add_two_ints') # 等待服务可用 while not self.cli.wait_for_service(timeout_sec=1.0): self.get_logger().info('服务不可用,等待中...') self.get_logger().info('Service Client 已就绪') def send_request(self, a, b): """发送请求并等待响应""" # 创建请求对象 request = AddTwoInts.Request() request.a = a request.b = b self.get_logger().info(f'发送请求: {a} + {b} = ?') # 异步调用服务 future = self.cli.call_async(request) # 等待响应完成 rclpy.spin_until_future_complete(self, future) # 获取结果 response = future.result() self.get_logger().info(f'收到响应: {a} + {b} = {response.sum}') return response.sum def main(): rclpy.init() node = SimpleClient() try: # 发送几次请求测试 node.send_request(3, 5) node.send_request(10, 20) node.send_request(100, 200) except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()

Client 与 Server 交互流程

┌──────────────────────────────────────────────────────────────────────────────┐ │ Service Client 调用流程 │ └──────────────────────────────────────────────────────────────────────────────┘ Client 节点 Server 节点 ────────── ────────── │ │ │ ① create_client(ServiceType, 'name') │ │ 创建客户端 │ ├────────────────────────────────────────────────────►│ │ │ │ ② wait_for_service(timeout_sec=1.0) │ │ 等待服务可用(循环检查) │ │◄───────────────────────────────────────────────────►│ │ │ │ ③ request = ServiceType.Request() │ │ 创建请求对象,填充字段 │ │ │ │ ④ future = cli.call_async(request) │ │ 发送异步请求 │ ├────────────────────────────────────────────────────►│ │ ┌────────────────┤ │ │ callback() │ │ │ 处理请求 │ │ │ return response│ │ └────────────────┤ │ ⑤ spin_until_future_complete(node, future) │ │ 阻塞等待响应完成 │ │◄────────────────────────────────────────────────────┤ │ │ │ ⑥ response = future.result() │ │ 获取响应结果 │ │ │ ▼ ▼ ┌──────────────────────────────────────────────────────────────────────────────┐ │ 关键函数说明: │ │ • create_client() - 创建服务客户端,指定类型和服务名 │ │ • wait_for_service() - 阻塞等待服务端可用,返回 True/False │ │ • call_async() - 异步发送请求,返回 Future 对象 │ │ • spin_until_future_complete() - 阻塞直到 Future 完成(收到响应) │ │ • future.result() - 获取响应对象 │ └──────────────────────────────────────────────────────────────────────────────┘

核心代码解析

# 创建 Client(2 个参数) self.cli = self.create_client(AddTwoInts, 'add_two_ints') # 等待服务可用(重要!否则调用会失败) while not self.cli.wait_for_service(timeout_sec=1.0): self.get_logger().info('服务不可用,等待中...') # 创建请求对象 request = AddTwoInts.Request() request.a = a request.b = b # 异步调用并等待结果 future = self.cli.call_async(request) rclpy.spin_until_future_complete(self, future) response = future.result()

为什么用异步调用?

ROS2 推荐使用 call_async() 异步调用服务,原因:

  • 不会阻塞节点的其他回调
  • 避免死锁问题
  • 更灵活的超时控制

rclpy.spin_until_future_complete() 会等待直到收到响应。

3.4 配置并运行

添加依赖package.xml):

<exec_depend>rclpy</exec_depend> <exec_depend>std_msgs</exec_depend> <exec_depend>sensor_msgs</exec_depend> <exec_depend>robot_arm_interfaces</exec_depend> <exec_depend>example_interfaces</exec_depend> <!-- 新增 -->

添加入口点setup.py):

entry_points={ 'console_scripts': [ 'simple_pub = py_episode.simple_publisher:main', 'simple_sub = py_episode.simple_subscriber:main', 'joint_pub = py_episode.joint_state_publisher:main', 'joint_sub = py_episode.joint_state_subscriber:main', 'motor_pub = py_episode.motor_angles_publisher:main', 'motor_sub = py_episode.motor_angles_subscriber:main', 'simple_srv = py_episode.simple_service:main', # 新增 'simple_cli = py_episode.simple_client:main', # 新增 ], },

构建并运行

cd ~/ros2_ws colcon build --packages-select py_episode --symlink-install source install/setup.bash # 终端 1:启动 Service Server ros2 run py_episode simple_srv # 终端 2:启动 Service Client ros2 run py_episode simple_cli

运行效果

# Server 输出 [INFO] [simple_service]: Service Server 已启动,等待请求... [INFO] [simple_service]: 收到请求: 3 + 5 = 8 [INFO] [simple_service]: 收到请求: 10 + 20 = 30 [INFO] [simple_service]: 收到请求: 100 + 200 = 300 # Client 输出 [INFO] [simple_client]: Service Client 已就绪 [INFO] [simple_client]: 发送请求: 3 + 5 = ? [INFO] [simple_client]: 收到响应: 3 + 5 = 8 [INFO] [simple_client]: 发送请求: 10 + 20 = ? [INFO] [simple_client]: 收到响应: 10 + 20 = 30 [INFO] [simple_client]: 发送请求: 100 + 200 = ? [INFO] [simple_client]: 收到响应: 100 + 200 = 300

3.5 使用命令行工具调试

# 查看服务列表 ros2 service list # 输出: /add_two_ints # 查看服务类型 ros2 service type /add_two_ints # 输出: example_interfaces/srv/AddTwoInts # 命令行调用服务 ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 10, b: 32}" # 输出: # requester: making request: example_interfaces.srv.AddTwoInts_Request(a=10, b=32) # response: example_interfaces.srv.AddTwoInts_Response(sum=42)

3.6 Service vs Topic 核心对比

特性 Topic Service
通信模式 发布-订阅(单向) 请求-响应(双向)
创建方法 create_publisher / create_subscription create_service / create_client
回调参数 msg request, response
返回值 必须 return response
调用方式 publish(msg) call_async(request)
适用场景 持续数据流 一次性操作

四、自定义 Service 接口

和自定义消息(.msg)类似,我们可以创建自定义服务接口(.srv)来满足特定需求。

4.1 极简设计 vs 自定义设计

先对比一下两种接口设计风格:

AddTwoInts(极简设计) ServoGripper(真实场景设计) ──────────────────── ──────────────────────── # 请求 # 请求 int64 a int32 angle int64 b --- --- # 响应 # 响应 int64 sum bool success ← 状态标志 string message ← 错误/状态信息 int32 actual_angle ← 实际执行值

真实场景响应中始终包含 successmessage 字段,让调用方知道操作是否成功以及失败原因。

4.2 为什么需要自定义 Service?

标准接口 AddTwoInts 只能做加法,但机械臂控制需要更丰富的功能:

需求 标准接口能否满足?
控制夹爪角度(0-110 度) 需要自定义请求字段
返回实际角度和执行状态 需要自定义响应字段
错误信息反馈 需要自定义错误描述

4.3 创建 .srv 文件

我们来创建一个 ServoGripper.srv,用于控制舵机夹爪:

步骤 1:创建 srv 目录

cd ~/ros2_ws/src/robot_arm_interfaces mkdir -p srv

步骤 2:创建 ServoGripper.srv

nano srv/ServoGripper.srv

写入以下内容:

# ServoGripper.srv # 舵机夹爪控制服务 # ========== 请求部分 ========== # 目标夹爪角度(0-110度,0=完全张开,110=完全闭合) int32 angle --- # ========== 响应部分 ========== # 是否执行成功 bool success # 状态信息或错误说明 string message # 实际设置的角度(经过 0-110 范围限制后的值) int32 actual_angle

设计说明

字段 类型 说明
请求
angle int32 目标夹爪角度(0-110 度)
响应
success bool 执行是否成功
message string 状态信息或错误描述
actual_angle int32 实际设置的角度(经过范围限制后)

为什么响应包含 actual_angle

用户可能请求 angle=200(超出范围),Server 会将其限制到 110
返回 actual_angle 让 Client 知道实际执行的值。

4.4 修改 CMakeLists.txt

打开 ~/ros2_ws/src/robot_arm_interfaces/CMakeLists.txt,添加 .srv 文件:

cmake_minimum_required(VERSION 3.8) project(robot_arm_interfaces) # 编译器警告选项(C++ 代码质量检查) if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif() # ========== 查找依赖包 ========== find_package(ament_cmake REQUIRED) # ROS2 CMake 构建工具 find_package(builtin_interfaces REQUIRED) # 内置接口类型(Time, Duration 等) # ========== 自定义消息生成 ========== find_package(rosidl_default_generators REQUIRED) # 消息代码生成器 # 声明要生成的消息文件(可以添加多个 .msg 文件) rosidl_generate_interfaces(${PROJECT_NAME} "msg/MotorAngles.msg" "srv/ServoGripper.srv" # 新增:舵机夹爪服务 DEPENDENCIES builtin_interfaces ) # ========== 测试配置(可选)========== if(BUILD_TESTING) find_package(ament_lint_auto REQUIRED) # 以下行跳过版权检查(如果没有添加版权信息,可以取消注释) # set(ament_cmake_copyright_FOUND TRUE) # 以下行跳过 cpplint 检查(如果不是 git 仓库,可以取消注释) # set(ament_cmake_cpplint_FOUND TRUE) ament_lint_auto_find_test_dependencies() endif() # ========== 导出包信息 ========== ament_package()

4.5 编译并验证

cd ~/ros2_ws colcon build --packages-select robot_arm_interfaces source install/setup.bash # 验证服务接口 ros2 interface show robot_arm_interfaces/srv/ServoGripper

输出:

# ========== 请求部分 ========== # 目标夹爪角度(0-110度,0=完全张开,110=完全闭合) int32 angle --- # ========== 响应部分 ========== # 是否执行成功 bool success # 状态信息或错误说明 string message # 实际设置的角度(经过 0-110 范围限制后的值) int32 actual_angle

成功! 自定义服务接口已经可以使用了。

4.6 查看生成的文件(可选)

# Python 生成的代码位置 ls ~/ros2_ws/install/robot_arm_interfaces/lib/python3.*/site-packages/robot_arm_interfaces/srv/ # 应该看到: # _servo_gripper.py # __init__.py

生成的代码包含两个类:

  • ServoGripper.Request:请求类(包含 angle 字段)
  • ServoGripper.Response:响应类(包含 successmessageactual_angle 字段)

五、使用自定义 Service 重构代码

现在我们用自定义的 ServoGripper 服务来控制夹爪。

5.1 编写 Gripper Service Server(模拟版)

先用模拟版验证接口,不连接真实硬件:

创建文件 ~/ros2_ws/src/py_episode/py_episode/gripper_service.py

#!/usr/bin/env python3 """ 舵机夹爪服务端(模拟版) 演示如何使用自定义 Service 接口 """ import rclpy from rclpy.node import Node from robot_arm_interfaces.srv import ServoGripper # 自定义服务接口 class GripperService(Node): """舵机夹爪服务端节点""" def __init__(self): super().__init__('gripper_service') # 创建 Service self.srv = self.create_service( ServoGripper, # 服务类型(自定义) 'servo_gripper', # 服务名称 self.servo_gripper_callback # 回调函数 ) # 记录当前夹爪角度 self.current_angle = 0 self.get_logger().info('舵机夹爪服务已启动(模拟版)') def servo_gripper_callback(self, request, response): """ 舵机夹爪控制回调函数 参数: request.angle: 目标角度(0-110度) 返回: response.success: 是否成功 response.message: 状态信息 response.actual_angle: 实际设置的角度 """ try: # 限制角度范围到 0-110 度 target_angle = request.angle clamped_angle = max(0, min(target_angle, 110)) # 模拟执行(实际硬件时这里调用 motor_control.servo_gripper()) self.current_angle = clamped_angle # 填充响应 response.success = True response.actual_angle = clamped_angle if target_angle != clamped_angle: response.message = f'角度已限制: 请求 {target_angle}° -> 实际 {clamped_angle}°' else: response.message = f'夹爪已设置到 {clamped_angle}°' self.get_logger().info(response.message) except Exception as e: response.success = False response.actual_angle = self.current_angle response.message = f'执行失败: {str(e)}' self.get_logger().error(response.message) return response def main(): rclpy.init() node = GripperService() try: rclpy.spin(node) except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()

5.2 编写 Gripper Service Client

创建文件 ~/ros2_ws/src/py_episode/py_episode/gripper_client.py

#!/usr/bin/env python3 """ 舵机夹爪客户端 演示如何调用自定义 Service """ import sys import rclpy from rclpy.node import Node from robot_arm_interfaces.srv import ServoGripper # 自定义服务接口 class GripperClient(Node): """舵机夹爪客户端节点""" def __init__(self): super().__init__('gripper_client') # 创建 Client self.cli = self.create_client(ServoGripper, 'servo_gripper') # 等待服务可用 while not self.cli.wait_for_service(timeout_sec=1.0): self.get_logger().info('等待舵机夹爪服务...') self.get_logger().info('舵机夹爪客户端已就绪') def send_request(self, angle): """ 发送夹爪控制请求 参数: angle: 目标角度(0-110度) 返回: (success, actual_angle, message) 元组 """ # 创建请求 request = ServoGripper.Request() request.angle = angle self.get_logger().info(f'发送请求: 设置夹爪角度为 {angle}°') # 异步调用 future = self.cli.call_async(request) rclpy.spin_until_future_complete(self, future) # 获取响应 response = future.result() if response.success: self.get_logger().info(f'成功: {response.message}') else: self.get_logger().error(f'失败: {response.message}') return response.success, response.actual_angle, response.message def main(): rclpy.init() node = GripperClient() # 从命令行获取角度,默认为 50 angle = int(sys.argv[1]) if len(sys.argv) > 1 else 50 try: node.send_request(angle) except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()

5.3 配置并运行

更新入口点setup.py):

entry_points={ 'console_scripts': [ 'simple_pub = py_episode.simple_publisher:main', 'simple_sub = py_episode.simple_subscriber:main', 'joint_pub = py_episode.joint_state_publisher:main', 'joint_sub = py_episode.joint_state_subscriber:main', 'motor_pub = py_episode.motor_angles_publisher:main', 'motor_sub = py_episode.motor_angles_subscriber:main', 'simple_srv = py_episode.simple_service:main', 'simple_cli = py_episode.simple_client:main', 'gripper_srv = py_episode.gripper_service:main', # 新增 'gripper_cli = py_episode.gripper_client:main', # 新增 ], },

构建并运行

cd ~/ros2_ws colcon build --packages-select py_episode --symlink-install source install/setup.bash # 终端 1:启动 Service Server ros2 run py_episode gripper_srv # 终端 2:调用服务 ros2 run py_episode gripper_cli 50 # 设置 50 度 ros2 run py_episode gripper_cli 0 # 完全张开 ros2 run py_episode gripper_cli 110 # 完全闭合 ros2 run py_episode gripper_cli 200 # 测试范围限制

运行效果

# Server 输出 [INFO] [gripper_service]: 舵机夹爪服务已启动(模拟版) [INFO] [gripper_service]: 夹爪已设置到 50° [INFO] [gripper_service]: 夹爪已设置到 0° [INFO] [gripper_service]: 夹爪已设置到 110° [INFO] [gripper_service]: 角度已限制: 请求 200° -> 实际 110° # Client 输出 [INFO] [gripper_client]: 舵机夹爪客户端已就绪 [INFO] [gripper_client]: 发送请求: 设置夹爪角度为 50° [INFO] [gripper_client]: 成功: 夹爪已设置到 50°

5.4 使用命令行调用服务

# 查看服务 ros2 service list # 输出: /servo_gripper # 命令行调用 ros2 service call /servo_gripper robot_arm_interfaces/srv/ServoGripper "{angle: 75}" # 输出: waiting for service to become available... requester: making request: robot_arm_interfaces.srv.ServoGripper_Request(angle=75) response: robot_arm_interfaces.srv.ServoGripper_Response(success=True, message='夹爪已设置到 75°', actual_angle=75)

六、机械臂 Service 通信实战

现在,我们将自定义 Service 集成到真实的机械臂控制节点中。

注意舵机夹爪长时间堵转、夹取物体,可能温度会过高烧坏舵机!

6.1 集成架构

我们将在上一章的 RobotInterfaceNode 基础上,添加舵机夹爪服务:

┌─────────────────────────────────────────────────┐ │ RobotInterfaceNode │ │ │ │ ┌─────────────┐ ┌─────────────────────────┐ │ │ │ Topic │ │ Service │ │ │ │ Publisher │ │ Server │ │ │ │ │ │ │ │ │ │ /motor_ │ │ /servo_gripper │ │ │ │ angles │ │ │ │ │ └──────┬──────┘ └───────────┬─────────────┘ │ │ │ │ │ │ └───────────┬───────────┘ │ │ │ │ │ ┌──────▼──────┐ │ │ │ MotorControl │ │ │ │ (CAN 通信) │ │ │ └─────────────┘ │ └─────────────────────────────────────────────────┘

6.2 更新机械臂接口节点

在上一章的代码基础上,添加舵机夹爪服务:

创建文件 ~/ros2_ws/src/py_episode/py_episode/robot_interface_with_service.py

#!/usr/bin/env python3 """ 机械臂接口节点 - 初始化 + 发布电机角度 + 舵机夹爪服务 在上一章基础上新增 Service 功能 """ import time import rclpy from rclpy.node import Node from robot_arm_interfaces.msg import MotorAngles from robot_arm_interfaces.srv import ServoGripper # 新增:导入服务接口 # 导入电机控制核心库 from episode_controller_core.controller_core import MotorControl class RobotInterfaceNode(Node): """机械臂接口节点:初始化 + 发布关节角度 + 舵机夹爪服务""" def __init__(self): super().__init__('robot_interface_node') # ========== 配置参数 ========== self.declare_parameter('usb_index', 1) self.declare_parameter('init_mode', 1) self.declare_parameter('publish_rate', 30.0) self.usb_index = self.get_parameter('usb_index').get_parameter_value().integer_value self.init_mode = self.get_parameter('init_mode').get_parameter_value().integer_value self.publish_rate = self.get_parameter('publish_rate').get_parameter_value().double_value self.home_position = [180, 90, 83, 30, 110, 30] self.calibration_timeout = 120.0 self.cached_motor_angles = [0.0] * 6 # ========== 初始化电机控制 ========== self.motor_control = None self._initialize_robot() # ========== 创建 Publisher ========== self.publisher = self.create_publisher( MotorAngles, 'motor_angles', 10 ) # ========== 新增:创建舵机夹爪服务 ========== self.servo_gripper_srv = self.create_service( ServoGripper, # 服务类型 'servo_gripper', # 服务名称 self.servo_gripper_callback # 回调函数 ) self.get_logger().info('舵机夹爪服务已创建: /servo_gripper') # ========== 创建定时器 ========== timer_period = 1.0 / self.publish_rate self.timer = self.create_timer(timer_period, self.publish_motor_angles) self.get_logger().info(f'机械臂接口节点已启动,发布频率: {self.publish_rate} Hz') def _initialize_robot(self): """初始化机械臂""" mode_name = "回零校准" if self.init_mode == 0 else "从当前位置恢复" self.get_logger().info(f'开始初始化机械臂,usb_index={self.usb_index}, init_mode={self.init_mode} ({mode_name})') self.motor_control = MotorControl(self.usb_index) if not self.motor_control.initialized: self.get_logger().error('CAN 设备初始化失败') raise RuntimeError('CAN 设备初始化失败') self.get_logger().info('CAN 通信初始化成功') if self.init_mode == 1: self._recover_from_current() else: self._perform_calibration() def _recover_from_current(self): """从当前位置恢复""" self.get_logger().info('从当前位置恢复中...') current_angles = self.motor_control.read_motor_angles() if not current_angles: raise RuntimeError('无法读取电机角度') self.motor_control.last_degrees = [ current_angles[i] * self.motor_control.motor_ratios[i] for i in range(6) ] self.get_logger().info(f'成功从当前位置恢复: {current_angles}') def _perform_calibration(self): """执行回零校准""" self.get_logger().info('开始回零校准流程...') for joint_index, addr in enumerate(self.motor_control.motor_address): self.get_logger().info(f'正在校准关节 {joint_index + 1}/6 ...') self.motor_control.trigger_home(addr, 0x02) time.sleep(0.05) start_time = time.perf_counter() while True: if self.motor_control.check_home_status(addr): self.get_logger().info(f'关节 {joint_index + 1} 校准完成') break if time.perf_counter() - start_time > self.calibration_timeout: raise RuntimeError(f'关节 {joint_index + 1} 校准超时') time.sleep(0.5) self.get_logger().info(f'校准完成,正在移动到 Home 位置: {self.home_position}') time.sleep(1.0) move_time = self.motor_control.execute_motors_degrees_normal(self.home_position) time.sleep(move_time) self.get_logger().info('机械臂初始化完成') def publish_motor_angles(self): """定时器回调:发布电机角度""" if self.motor_control is None: return msg = MotorAngles() msg.timestamp = self.get_clock().now().to_msg() try: angles = self.motor_control.read_motor_angles() if angles and all(angle is not None for angle in angles): msg.angles = angles msg.success = True msg.message = "读取成功" self.cached_motor_angles = list(angles) else: msg.angles = self.cached_motor_angles msg.success = False msg.message = "读取失败" except Exception as e: msg.angles = self.cached_motor_angles msg.success = False msg.message = f"读取异常: {str(e)}" self.publisher.publish(msg) # ========== 新增:舵机夹爪服务回调 ========== def servo_gripper_callback(self, request, response): """ 舵机夹爪服务回调函数 参数: request.angle: 目标角度(0-110度) 返回: response.success: 是否成功 response.actual_angle: 实际角度 response.message: 状态信息 """ try: # 检查电机控制是否已初始化 if self.motor_control is None: response.success = False response.message = "电机控制未初始化" response.actual_angle = 0 self.get_logger().error(response.message) return response # 限制角度范围到 0-110 度 target_angle = request.angle clamped_angle = max(0, min(target_angle, 110)) # 调用舵机夹爪控制方法 self.motor_control.servo_gripper(clamped_angle) # 填充响应 response.success = True response.actual_angle = clamped_angle if target_angle != clamped_angle: response.message = f'角度已限制: 请求 {target_angle}° -> 实际 {clamped_angle}°' else: response.message = f'夹爪已设置到 {clamped_angle}°' self.get_logger().info(response.message) except Exception as e: response.success = False response.message = f'控制失败: {str(e)}' response.actual_angle = 0 self.get_logger().error(response.message) return response def main(): rclpy.init() try: node = RobotInterfaceNode() rclpy.spin(node) except RuntimeError as e: print(f'启动失败: {e}') except KeyboardInterrupt: pass finally: rclpy.shutdown() if __name__ == '__main__': main()

对比上一章的代码,新增了

新增内容 代码
导入服务接口 from robot_arm_interfaces.srv import ServoGripper
创建服务 self.create_service(ServoGripper, 'servo_gripper', ...)
服务回调 servo_gripper_callback(self, request, response)

6.3 实现夹爪控制客户端

为了方便测试和使用,我们创建一个独立的客户端节点,可以通过命令行参数控制夹爪。

创建文件 ~/ros2_ws/src/py_episode/py_episode/gripper_control_client.py

#!/usr/bin/env python3 """ 舵机夹爪控制客户端 - 通过命令行参数控制夹爪 演示如何在独立节点中调用 Service """ import sys import rclpy from rclpy.node import Node from robot_arm_interfaces.srv import ServoGripper class GripperControlClient(Node): """夹爪控制客户端节点""" def __init__(self): super().__init__('gripper_control_client') # 创建 Service Client self.client = self.create_client(ServoGripper, 'servo_gripper') # 等待服务可用 self.get_logger().info('等待舵机夹爪服务...') while not self.client.wait_for_service(timeout_sec=1.0): self.get_logger().info('服务不可用,继续等待...') self.get_logger().info('舵机夹爪服务已连接') def control_gripper(self, angle): """ 控制夹爪到指定角度 参数: angle: 目标角度(0-110度) """ # 创建请求 request = ServoGripper.Request() request.angle = angle self.get_logger().info(f'发送请求: 设置夹爪角度为 {angle}°') # 异步调用服务 future = self.client.call_async(request) # 等待响应 rclpy.spin_until_future_complete(self, future) # 处理响应 if future.result() is not None: response = future.result() if response.success: self.get_logger().info(f'✓ 成功: {response.message}') self.get_logger().info(f' 实际角度: {response.actual_angle}°') # 显示角度差异 if response.actual_angle != angle: diff = response.actual_angle - angle self.get_logger().warn(f' 注意: 请求角度已被限制 (差异: {diff:+.1f}°)') else: self.get_logger().error(f'✗ 失败: {response.message}') else: self.get_logger().error('✗ 服务调用失败') def main(): # 解析命令行参数 if len(sys.argv) < 2: print('用法: ros2 run py_episode gripper_ctrl <angle>') print('示例:') print(' ros2 run py_episode gripper_ctrl 0 # 完全张开') print(' ros2 run py_episode gripper_ctrl 50 # 半开') print(' ros2 run py_episode gripper_ctrl 110 # 完全闭合') sys.exit(1) try: angle = int(sys.argv[1]) except ValueError: print(f'错误: 角度必须是整数,但收到: {sys.argv[1]}') sys.exit(1) # 初始化节点 rclpy.init() node = GripperControlClient() try: # 控制夹爪 node.control_gripper(angle) except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()

代码说明

特性 实现
命令行参数 sys.argv[1] 获取角度
服务等待 wait_for_service() 确保 Server 可用
用户反馈 清晰的成功/失败提示,显示实际角度
错误处理 参数验证、异常捕获

6.4 添加入口点并运行

更新 setup.py

entry_points={ 'console_scripts': [ 'simple_pub = py_episode.simple_publisher:main', 'simple_sub = py_episode.simple_subscriber:main', 'joint_pub = py_episode.joint_state_publisher:main', 'joint_sub = py_episode.joint_state_subscriber:main', 'motor_pub = py_episode.motor_angles_publisher:main', 'motor_sub = py_episode.motor_angles_subscriber:main', 'simple_srv = py_episode.simple_service:main', 'simple_cli = py_episode.simple_client:main', 'gripper_srv = py_episode.gripper_service:main', 'gripper_cli = py_episode.gripper_client:main', 'robot_interface_srv = py_episode.robot_interface_with_service:main', # 新增 'gripper_ctrl = py_episode.gripper_control_client:main', # 新增 ], },

构建并运行

cd ~/ros2_ws colcon build --packages-select py_episode --symlink-install source install/setup.bash # 终端 1:启动机械臂接口节点(使用模式 1 跳过回零) ros2 run py_episode robot_interface_srv --ros-args -p usb_index:=1 -p init_mode:=1 # 终端 2:使用客户端控制夹爪 ros2 run py_episode gripper_ctrl 0 # 完全张开 ros2 run py_episode gripper_ctrl 50 # 半开 ros2 run py_episode gripper_ctrl 110 # 完全闭合 ros2 run py_episode gripper_ctrl 200 # 测试范围限制

6.5 运行效果

机械臂节点输出(终端 1):

[INFO] [robot_interface_node]: 开始初始化机械臂,usb_index=1, init_mode=1 (从当前位置恢复) [INFO] [robot_interface_node]: CAN 通信初始化成功 [INFO] [robot_interface_node]: 成功从当前位置恢复: [180.0, 90.0, 83.0, 30.0, 110.0, 30.0] [INFO] [robot_interface_node]: 舵机夹爪服务已创建: /servo_gripper [INFO] [robot_interface_node]: 机械臂接口节点已启动,发布频率: 30.0 Hz [INFO] [robot_interface_node]: 夹爪已设置到 50° [INFO] [robot_interface_node]: 夹爪已设置到 110° [INFO] [robot_interface_node]: 角度已限制: 请求 200° -> 实际 110°

客户端输出(终端 2):

# 设置角度为 50° $ ros2 run py_episode gripper_ctrl 50 [INFO] [gripper_control_client]: 等待舵机夹爪服务... [INFO] [gripper_control_client]: 舵机夹爪服务已连接 [INFO] [gripper_control_client]: 发送请求: 设置夹爪角度为 50° [INFO] [gripper_control_client]: ✓ 成功: 夹爪已设置到 50° [INFO] [gripper_control_client]: 实际角度: 50° # 完全闭合(110°) $ ros2 run py_episode gripper_ctrl 110 [INFO] [gripper_control_client]: 发送请求: 设置夹爪角度为 110° [INFO] [gripper_control_client]: ✓ 成功: 夹爪已设置到 110° [INFO] [gripper_control_client]: 实际角度: 110° # 测试范围限制(200° 超出范围) $ ros2 run py_episode gripper_ctrl 200 [INFO] [gripper_control_client]: 发送请求: 设置夹爪角度为 200° [INFO] [gripper_control_client]: ✓ 成功: 角度已限制: 请求 200° -> 实际 110° [INFO] [gripper_control_client]: 实际角度: 110° [WARN] [gripper_control_client]: 注意: 请求角度已被限制 (差异: -90.0°)

使用命令行调用(对比):

# 也可以使用 ROS2 命令行工具直接调用 ros2 service call /servo_gripper robot_arm_interfaces/srv/ServoGripper "{angle: 50}" # 输出: waiting for service to become available... requester: making request: robot_arm_interfaces.srv.ServoGripper_Request(angle=50) response: robot_arm_interfaces.srv.ServoGripper_Response(success=True, message='夹爪已设置到 50°', actual_angle=50)

对比两种调用方式

方式 优点 缺点 适用场景
命令行工具 快速测试,无需编码 输出原始,不易阅读 调试、快速验证
客户端节点 友好输出,可扩展逻辑 需要编写代码 生产使用、复杂逻辑

6.6 验证系统功能

# 1. 查看节点列表 ros2 node list # 输出: # /robot_interface_node # /gripper_control_client (运行时) # 2. 查看节点详细信息 ros2 node info /robot_interface_node # 输出包含: # Subscribers: (无) # Publishers: # /motor_angles: robot_arm_interfaces/msg/MotorAngles # Service Servers: # /servo_gripper: robot_arm_interfaces/srv/ServoGripper # ... # 3. 查看服务列表 ros2 service list # 输出包含: # /servo_gripper # 4. 查看服务类型 ros2 service type /servo_gripper # 输出: # robot_arm_interfaces/srv/ServoGripper # 5. 查看服务接口定义 ros2 interface show robot_arm_interfaces/srv/ServoGripper

6.7 客户端节点的优势

相比直接使用 ros2 service call 命令,独立的客户端节点有以下优势:

优势 说明
友好的输出 清晰的成功/失败提示,彩色日志
参数验证 命令行参数检查、类型转换
复杂逻辑 可添加重试、超时、批量操作
集成能力 可与其他 ROS2 功能结合(Topic、Action)
脚本化 易于集成到自动化脚本

扩展示例(批量控制):

# 在 main() 函数中添加批量控制逻辑 def main(): rclpy.init() node = GripperControlClient() try: # 批量测试不同角度 test_angles = [0, 30, 60, 90, 110] for angle in test_angles: node.get_logger().info(f'\n=== 测试角度: {angle}° ===') node.control_gripper(angle) time.sleep(1) # 间隔 1 秒 except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown()

6.8 常见问题排查

问题 可能原因 解决方法
服务调用超时 Server 节点未启动 先启动机械臂接口节点
"Service not available" 服务名称不匹配 检查 ros2 service list
夹爪无响应 舵机未连接 检查舵机接线
角度不准确 舵机需要校准 检查舵机零位
服务类型不匹配 未重新编译接口包 colcon build --packages-select robot_arm_interfaces
客户端立即退出 没有参数或参数错误 检查命令行用法

七、进阶:添加更多服务

在实际机械臂控制中,通常需要多个服务。以下是扩展思路:

具体实现代码可查看 Episode ROS2 包代码:robot_interface_node.py

7.1 Episode1 机械臂实现的服务:

服务名称 请求 响应 用途
/read_motor_angles angles[], success, message 读取当前角度
/servo_gripper angle success, actual_angle, message 控制舵机夹爪
/gripper_control enable success, message 负压吸盘开关
/set_free_mode enable success, message 设置自由拖动模式

7.2 服务接口定义示例

ReadMotorAngles.srv

# 请求部分(空,不需要参数) --- # 响应部分 float64[6] angles bool success string message

SetFreeMode.srv

# 请求部分 bool enable # true=自由模式, false=锁定模式 --- # 响应部分 bool success string message

7.3 在同一节点中添加多个服务

# 创建多个 Service self.read_angles_srv = self.create_service( ReadMotorAngles, 'read_motor_angles', self.read_angles_callback) self.servo_gripper_srv = self.create_service( ServoGripper, 'servo_gripper', self.servo_gripper_callback) self.free_mode_srv = self.create_service( SetFreeMode, 'set_free_mode', self.free_mode_callback)

八、总结

8.1 本章收获

完成本章后,你应该能够:

  • ✓ 理解 Service 的请求-响应通信模式
  • ✓ 编写 Service Server 和 Client 节点
  • ✓ 创建自定义 .srv 服务接口
  • ✓ 在机械臂节点中集成 Service 功能
  • ✓ 实现舵机夹爪控制服务

8.2 Topic vs Service 对比总结

特性 Topic Service
通信模式 发布-订阅(单向) 请求-响应(双向)
数据流 持续流 一次性
接口文件 .msg .srv
创建 Server create_publisher create_service
创建 Client create_subscription create_client
回调参数 msg request, response
适用场景 传感器数据、状态发布 命令执行、查询操作

8.3 关键代码模板

Service Server

# 创建服务 self.srv = self.create_service(ServiceType, 'service_name', self.callback) # 回调函数 def callback(self, request, response): # 处理请求 response.field = ... return response # 必须返回!

Service Client

# 创建客户端 self.cli = self.create_client(ServiceType, 'service_name') # 等待服务 self.cli.wait_for_service() # 调用服务 request = ServiceType.Request() future = self.cli.call_async(request) rclpy.spin_until_future_complete(self, future) response = future.result()

8.4 常用命令速查

# 查看服务 ros2 service list ros2 service type /servo_gripper ros2 interface show robot_arm_interfaces/srv/ServoGripper # 命令行调用服务 ros2 service call /servo_gripper robot_arm_interfaces/srv/ServoGripper "{angle: 50}" # 编译接口包 colcon build --packages-select robot_arm_interfaces

8.5 下一步

在下一章中,我们将学习:

  • Action 通信:适合长时间任务(如轨迹执行)
  • 自定义 Action:定义 .action 文件
  • 机械臂轨迹控制:通过 Action 执行运动轨迹