3.6 ROS2 Service 通信编程与机械臂集成
一、本章目标
在前两章中,我们学习了 Topic 通信(Publisher/Subscriber)和自定义消息(.msg)。Topic 适合持续发布的数据流(如关节角度),但有些场景需要一问一答的模式:
- 查询机械臂当前角度(问一次,答一次)
- 控制夹爪张合(发送指令,等待执行结果)
- 读取传感器数据(请求时才读取)
这就是 Service(服务) 通信模式。
1.1 学习路线
- 理解 Service vs Topic - 什么时候用哪种通信方式
- 编写极简 Service 和 Client - 用标准接口快速入门
- 自定义 Service 接口 - 定义
.srv文件 - 机械臂通信实战 - 实现舵机夹爪控制服务
1.2 本章目标
完成本章后,你将能够:
- ✓ 理解 Service 的请求-响应通信模式
- ✓ 编写 Service Server 和 Client 节点
- ✓ 创建自定义
.srv服务接口 - ✓ 实现机械臂舵机夹爪控制服务
二、Service vs Topic:什么时候用哪种?
2.1 回顾外卖系统类比
在第三章,我们用外卖系统类比 ROS2 通信:
| 通信方式 | 外卖系统类比 | 特点 |
|---|---|---|
| Topic | 骑手持续广播 GPS 位置 | 单向、持续、多对多 |
| Service | 顾客点击查询骑手位置 | 双向、一次性、一对一 |
Topic 模式:
骑手节点 ──────> /rider_gps ──────> 顾客节点
持续广播 持续接收
Service 模式:
顾客节点 ──请求──> /get_rider_location ──响应──> 顾客节点
"骑手在哪?" "骑手在 xx 路"
2.2 Service 的核心概念
Service 采用请求-响应(Request-Response) 模式:
┌─────────────┐ ┌─────────────┐ │ Client │ ─── Request ──────> │ Server │ │ (客户端) │ │ (服务端) │ │ │ <── Response ────── │ │ └─────────────┘ └─────────────┘
- Server(服务端):提供服务,等待请求,处理后返回响应
- Client(客户端):发起请求,等待响应
- Service Name:服务名称(类似话题名称)
- Service Type:服务类型(由
.srv文件定义)
2.3 何时使用 Service?
| 场景 | 推荐方式 | 原因 |
|---|---|---|
| 持续数据流(关节角度、图像) | Topic | 数据持续产生,需要实时传输 |
| 一次性操作(查询状态、执行命令) | Service | 请求时才需要,有明确的返回值 |
| 需要确认结果(夹爪是否夹紧) | Service | 需要知道操作是否成功 |
| 长时间任务(移动到目标点) | Action | 需要反馈进度,支持取消 |
机械臂场景:
| 功能 | 通信方式 | 理由 |
|---|---|---|
| 发布关节角度 | Topic | 持续、高频发布 |
| 读取当前角度 | Service | 按需查询 |
| 控制夹爪开合 | Service | 一次性命令,需要结果反馈 |
| 执行轨迹运动 | Action | 长时间任务,需要进度反馈 |
2.4 Service 接口的结构
Service 接口由 .srv 文件定义,包含请求(Request) 和响应(Response) 两部分,用 --- 分隔:
# 请求部分(Client 发送给 Server)
int64 a
int64 b
---
# 响应部分(Server 返回给 Client)
int64 sum
类比理解:
- 请求 = 顾客的点单内容
- 响应 = 餐厅返回的菜品
三、编写极简 Service 和 Client
和 Topic 一样,我们先用标准接口快速入门,再引入自定义接口。
3.1 使用标准服务接口
ROS2 提供了一些标准服务接口用于学习和测试。我们使用 example_interfaces/srv/AddTwoInts:
# 查看服务结构
ros2 interface show example_interfaces/srv/AddTwoInts
输出:
int64 a
int64 b
---
int64 sum
含义:
- 请求:两个整数
a和b - 响应:它们的和
sum
3.2 编写 Service Server
创建文件 ~/ros2_ws/src/py_episode/py_episode/simple_service.py:
#!/usr/bin/env python3
"""
极简 Service Server:提供两数相加服务
"""
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts # 标准服务接口
class SimpleService(Node):
"""简单服务端节点"""
def __init__(self):
super().__init__('simple_service')
# 创建 Service:服务名 /add_two_ints,类型 AddTwoInts,回调函数 callback
self.srv = self.create_service(
AddTwoInts, # 服务类型
'add_two_ints', # 服务名称
self.callback # 回调函数
)
self.get_logger().info('Service Server 已启动,等待请求...')
def callback(self, request, response):
"""
服务回调函数
参数:
request: 客户端发来的请求(包含 a 和 b)
response: 要返回给客户端的响应(包含 sum)
返回:
填充好的 response 对象
"""
response.sum = request.a + request.b
self.get_logger().info(f'收到请求: {request.a} + {request.b} = {response.sum}')
return response
def main():
rclpy.init()
node = SimpleService()
try:
rclpy.spin(node) # 保持节点运行,等待请求
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
核心代码解析:
# 创建 Service(3 个参数)
self.srv = self.create_service(
AddTwoInts, # 1. 服务类型
'add_two_ints', # 2. 服务名称
self.callback # 3. 回调函数
)
# 回调函数:处理请求,返回响应
def callback(self, request, response):
response.sum = request.a + request.b
return response # 必须返回 response!
对比 Topic Subscriber:
- Subscriber 回调只有
msg参数- Service 回调有
request和response两个参数- Service 回调必须 return response
3.3 编写 Service Client
创建文件 ~/ros2_ws/src/py_episode/py_episode/simple_client.py:
#!/usr/bin/env python3
"""
极简 Service Client:调用两数相加服务
"""
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts # 标准服务接口
class SimpleClient(Node):
"""简单客户端节点"""
def __init__(self):
super().__init__('simple_client')
# 创建 Client:服务名和类型必须与 Server 一致
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
# 等待服务可用
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('服务不可用,等待中...')
self.get_logger().info('Service Client 已就绪')
def send_request(self, a, b):
"""发送请求并等待响应"""
# 创建请求对象
request = AddTwoInts.Request()
request.a = a
request.b = b
self.get_logger().info(f'发送请求: {a} + {b} = ?')
# 异步调用服务
future = self.cli.call_async(request)
# 等待响应完成
rclpy.spin_until_future_complete(self, future)
# 获取结果
response = future.result()
self.get_logger().info(f'收到响应: {a} + {b} = {response.sum}')
return response.sum
def main():
rclpy.init()
node = SimpleClient()
try:
# 发送几次请求测试
node.send_request(3, 5)
node.send_request(10, 20)
node.send_request(100, 200)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Client 与 Server 交互流程:
┌──────────────────────────────────────────────────────────────────────────────┐
│ Service Client 调用流程 │
└──────────────────────────────────────────────────────────────────────────────┘
Client 节点 Server 节点
────────── ──────────
│ │
│ ① create_client(ServiceType, 'name') │
│ 创建客户端 │
├────────────────────────────────────────────────────►│
│ │
│ ② wait_for_service(timeout_sec=1.0) │
│ 等待服务可用(循环检查) │
│◄───────────────────────────────────────────────────►│
│ │
│ ③ request = ServiceType.Request() │
│ 创建请求对象,填充字段 │
│ │
│ ④ future = cli.call_async(request) │
│ 发送异步请求 │
├────────────────────────────────────────────────────►│
│ ┌────────────────┤
│ │ callback() │
│ │ 处理请求 │
│ │ return response│
│ └────────────────┤
│ ⑤ spin_until_future_complete(node, future) │
│ 阻塞等待响应完成 │
│◄────────────────────────────────────────────────────┤
│ │
│ ⑥ response = future.result() │
│ 获取响应结果 │
│ │
▼ ▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ 关键函数说明: │
│ • create_client() - 创建服务客户端,指定类型和服务名 │
│ • wait_for_service() - 阻塞等待服务端可用,返回 True/False │
│ • call_async() - 异步发送请求,返回 Future 对象 │
│ • spin_until_future_complete() - 阻塞直到 Future 完成(收到响应) │
│ • future.result() - 获取响应对象 │
└──────────────────────────────────────────────────────────────────────────────┘
核心代码解析:
# 创建 Client(2 个参数)
self.cli = self.create_client(AddTwoInts, 'add_two_ints')
# 等待服务可用(重要!否则调用会失败)
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('服务不可用,等待中...')
# 创建请求对象
request = AddTwoInts.Request()
request.a = a
request.b = b
# 异步调用并等待结果
future = self.cli.call_async(request)
rclpy.spin_until_future_complete(self, future)
response = future.result()
为什么用异步调用?
ROS2 推荐使用
call_async()异步调用服务,原因:
- 不会阻塞节点的其他回调
- 避免死锁问题
- 更灵活的超时控制
rclpy.spin_until_future_complete()会等待直到收到响应。
3.4 配置并运行
添加依赖(package.xml):
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>sensor_msgs</exec_depend>
<exec_depend>robot_arm_interfaces</exec_depend>
<exec_depend>example_interfaces</exec_depend> <!-- 新增 -->
添加入口点(setup.py):
entry_points={
'console_scripts': [
'simple_pub = py_episode.simple_publisher:main',
'simple_sub = py_episode.simple_subscriber:main',
'joint_pub = py_episode.joint_state_publisher:main',
'joint_sub = py_episode.joint_state_subscriber:main',
'motor_pub = py_episode.motor_angles_publisher:main',
'motor_sub = py_episode.motor_angles_subscriber:main',
'simple_srv = py_episode.simple_service:main', # 新增
'simple_cli = py_episode.simple_client:main', # 新增
],
},
构建并运行:
cd ~/ros2_ws
colcon build --packages-select py_episode --symlink-install
source install/setup.bash
# 终端 1:启动 Service Server
ros2 run py_episode simple_srv
# 终端 2:启动 Service Client
ros2 run py_episode simple_cli
运行效果:
# Server 输出
[INFO] [simple_service]: Service Server 已启动,等待请求...
[INFO] [simple_service]: 收到请求: 3 + 5 = 8
[INFO] [simple_service]: 收到请求: 10 + 20 = 30
[INFO] [simple_service]: 收到请求: 100 + 200 = 300
# Client 输出
[INFO] [simple_client]: Service Client 已就绪
[INFO] [simple_client]: 发送请求: 3 + 5 = ?
[INFO] [simple_client]: 收到响应: 3 + 5 = 8
[INFO] [simple_client]: 发送请求: 10 + 20 = ?
[INFO] [simple_client]: 收到响应: 10 + 20 = 30
[INFO] [simple_client]: 发送请求: 100 + 200 = ?
[INFO] [simple_client]: 收到响应: 100 + 200 = 300
3.5 使用命令行工具调试
# 查看服务列表
ros2 service list
# 输出: /add_two_ints
# 查看服务类型
ros2 service type /add_two_ints
# 输出: example_interfaces/srv/AddTwoInts
# 命令行调用服务
ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 10, b: 32}"
# 输出:
# requester: making request: example_interfaces.srv.AddTwoInts_Request(a=10, b=32)
# response: example_interfaces.srv.AddTwoInts_Response(sum=42)
3.6 Service vs Topic 核心对比
| 特性 | Topic | Service |
|---|---|---|
| 通信模式 | 发布-订阅(单向) | 请求-响应(双向) |
| 创建方法 | create_publisher / create_subscription |
create_service / create_client |
| 回调参数 | msg |
request, response |
| 返回值 | 无 | 必须 return response |
| 调用方式 | publish(msg) |
call_async(request) |
| 适用场景 | 持续数据流 | 一次性操作 |
四、自定义 Service 接口
和自定义消息(.msg)类似,我们可以创建自定义服务接口(.srv)来满足特定需求。
4.1 极简设计 vs 自定义设计
先对比一下两种接口设计风格:
AddTwoInts(极简设计) ServoGripper(真实场景设计)
──────────────────── ────────────────────────
# 请求 # 请求
int64 a int32 angle
int64 b
--- ---
# 响应 # 响应
int64 sum bool success ← 状态标志
string message ← 错误/状态信息
int32 actual_angle ← 实际执行值
真实场景响应中始终包含
success和message字段,让调用方知道操作是否成功以及失败原因。
4.2 为什么需要自定义 Service?
标准接口 AddTwoInts 只能做加法,但机械臂控制需要更丰富的功能:
| 需求 | 标准接口能否满足? |
|---|---|
| 控制夹爪角度(0-110 度) | 需要自定义请求字段 |
| 返回实际角度和执行状态 | 需要自定义响应字段 |
| 错误信息反馈 | 需要自定义错误描述 |
4.3 创建 .srv 文件
我们来创建一个 ServoGripper.srv,用于控制舵机夹爪:
步骤 1:创建 srv 目录
cd ~/ros2_ws/src/robot_arm_interfaces
mkdir -p srv
步骤 2:创建 ServoGripper.srv
nano srv/ServoGripper.srv
写入以下内容:
# ServoGripper.srv
# 舵机夹爪控制服务
# ========== 请求部分 ==========
# 目标夹爪角度(0-110度,0=完全张开,110=完全闭合)
int32 angle
---
# ========== 响应部分 ==========
# 是否执行成功
bool success
# 状态信息或错误说明
string message
# 实际设置的角度(经过 0-110 范围限制后的值)
int32 actual_angle
设计说明:
| 字段 | 类型 | 说明 |
|---|---|---|
| 请求 | ||
angle |
int32 |
目标夹爪角度(0-110 度) |
| 响应 | ||
success |
bool |
执行是否成功 |
message |
string |
状态信息或错误描述 |
actual_angle |
int32 |
实际设置的角度(经过范围限制后) |
为什么响应包含
actual_angle?用户可能请求
angle=200(超出范围),Server 会将其限制到110。
返回actual_angle让 Client 知道实际执行的值。
4.4 修改 CMakeLists.txt
打开 ~/ros2_ws/src/robot_arm_interfaces/CMakeLists.txt,添加 .srv 文件:
cmake_minimum_required(VERSION 3.8)
project(robot_arm_interfaces)
# 编译器警告选项(C++ 代码质量检查)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# ========== 查找依赖包 ==========
find_package(ament_cmake REQUIRED) # ROS2 CMake 构建工具
find_package(builtin_interfaces REQUIRED) # 内置接口类型(Time, Duration 等)
# ========== 自定义消息生成 ==========
find_package(rosidl_default_generators REQUIRED) # 消息代码生成器
# 声明要生成的消息文件(可以添加多个 .msg 文件)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/MotorAngles.msg"
"srv/ServoGripper.srv" # 新增:舵机夹爪服务
DEPENDENCIES builtin_interfaces
)
# ========== 测试配置(可选)==========
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# 以下行跳过版权检查(如果没有添加版权信息,可以取消注释)
# set(ament_cmake_copyright_FOUND TRUE)
# 以下行跳过 cpplint 检查(如果不是 git 仓库,可以取消注释)
# set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
# ========== 导出包信息 ==========
ament_package()
4.5 编译并验证
cd ~/ros2_ws
colcon build --packages-select robot_arm_interfaces
source install/setup.bash
# 验证服务接口
ros2 interface show robot_arm_interfaces/srv/ServoGripper
输出:
# ========== 请求部分 ==========
# 目标夹爪角度(0-110度,0=完全张开,110=完全闭合)
int32 angle
---
# ========== 响应部分 ==========
# 是否执行成功
bool success
# 状态信息或错误说明
string message
# 实际设置的角度(经过 0-110 范围限制后的值)
int32 actual_angle
成功! 自定义服务接口已经可以使用了。
4.6 查看生成的文件(可选)
# Python 生成的代码位置
ls ~/ros2_ws/install/robot_arm_interfaces/lib/python3.*/site-packages/robot_arm_interfaces/srv/
# 应该看到:
# _servo_gripper.py
# __init__.py
生成的代码包含两个类:
ServoGripper.Request:请求类(包含angle字段)ServoGripper.Response:响应类(包含success、message、actual_angle字段)
五、使用自定义 Service 重构代码
现在我们用自定义的 ServoGripper 服务来控制夹爪。
5.1 编写 Gripper Service Server(模拟版)
先用模拟版验证接口,不连接真实硬件:
创建文件 ~/ros2_ws/src/py_episode/py_episode/gripper_service.py:
#!/usr/bin/env python3
"""
舵机夹爪服务端(模拟版)
演示如何使用自定义 Service 接口
"""
import rclpy
from rclpy.node import Node
from robot_arm_interfaces.srv import ServoGripper # 自定义服务接口
class GripperService(Node):
"""舵机夹爪服务端节点"""
def __init__(self):
super().__init__('gripper_service')
# 创建 Service
self.srv = self.create_service(
ServoGripper, # 服务类型(自定义)
'servo_gripper', # 服务名称
self.servo_gripper_callback # 回调函数
)
# 记录当前夹爪角度
self.current_angle = 0
self.get_logger().info('舵机夹爪服务已启动(模拟版)')
def servo_gripper_callback(self, request, response):
"""
舵机夹爪控制回调函数
参数:
request.angle: 目标角度(0-110度)
返回:
response.success: 是否成功
response.message: 状态信息
response.actual_angle: 实际设置的角度
"""
try:
# 限制角度范围到 0-110 度
target_angle = request.angle
clamped_angle = max(0, min(target_angle, 110))
# 模拟执行(实际硬件时这里调用 motor_control.servo_gripper())
self.current_angle = clamped_angle
# 填充响应
response.success = True
response.actual_angle = clamped_angle
if target_angle != clamped_angle:
response.message = f'角度已限制: 请求 {target_angle}° -> 实际 {clamped_angle}°'
else:
response.message = f'夹爪已设置到 {clamped_angle}°'
self.get_logger().info(response.message)
except Exception as e:
response.success = False
response.actual_angle = self.current_angle
response.message = f'执行失败: {str(e)}'
self.get_logger().error(response.message)
return response
def main():
rclpy.init()
node = GripperService()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
5.2 编写 Gripper Service Client
创建文件 ~/ros2_ws/src/py_episode/py_episode/gripper_client.py:
#!/usr/bin/env python3
"""
舵机夹爪客户端
演示如何调用自定义 Service
"""
import sys
import rclpy
from rclpy.node import Node
from robot_arm_interfaces.srv import ServoGripper # 自定义服务接口
class GripperClient(Node):
"""舵机夹爪客户端节点"""
def __init__(self):
super().__init__('gripper_client')
# 创建 Client
self.cli = self.create_client(ServoGripper, 'servo_gripper')
# 等待服务可用
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('等待舵机夹爪服务...')
self.get_logger().info('舵机夹爪客户端已就绪')
def send_request(self, angle):
"""
发送夹爪控制请求
参数:
angle: 目标角度(0-110度)
返回:
(success, actual_angle, message) 元组
"""
# 创建请求
request = ServoGripper.Request()
request.angle = angle
self.get_logger().info(f'发送请求: 设置夹爪角度为 {angle}°')
# 异步调用
future = self.cli.call_async(request)
rclpy.spin_until_future_complete(self, future)
# 获取响应
response = future.result()
if response.success:
self.get_logger().info(f'成功: {response.message}')
else:
self.get_logger().error(f'失败: {response.message}')
return response.success, response.actual_angle, response.message
def main():
rclpy.init()
node = GripperClient()
# 从命令行获取角度,默认为 50
angle = int(sys.argv[1]) if len(sys.argv) > 1 else 50
try:
node.send_request(angle)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
5.3 配置并运行
更新入口点(setup.py):
entry_points={
'console_scripts': [
'simple_pub = py_episode.simple_publisher:main',
'simple_sub = py_episode.simple_subscriber:main',
'joint_pub = py_episode.joint_state_publisher:main',
'joint_sub = py_episode.joint_state_subscriber:main',
'motor_pub = py_episode.motor_angles_publisher:main',
'motor_sub = py_episode.motor_angles_subscriber:main',
'simple_srv = py_episode.simple_service:main',
'simple_cli = py_episode.simple_client:main',
'gripper_srv = py_episode.gripper_service:main', # 新增
'gripper_cli = py_episode.gripper_client:main', # 新增
],
},
构建并运行:
cd ~/ros2_ws
colcon build --packages-select py_episode --symlink-install
source install/setup.bash
# 终端 1:启动 Service Server
ros2 run py_episode gripper_srv
# 终端 2:调用服务
ros2 run py_episode gripper_cli 50 # 设置 50 度
ros2 run py_episode gripper_cli 0 # 完全张开
ros2 run py_episode gripper_cli 110 # 完全闭合
ros2 run py_episode gripper_cli 200 # 测试范围限制
运行效果:
# Server 输出
[INFO] [gripper_service]: 舵机夹爪服务已启动(模拟版)
[INFO] [gripper_service]: 夹爪已设置到 50°
[INFO] [gripper_service]: 夹爪已设置到 0°
[INFO] [gripper_service]: 夹爪已设置到 110°
[INFO] [gripper_service]: 角度已限制: 请求 200° -> 实际 110°
# Client 输出
[INFO] [gripper_client]: 舵机夹爪客户端已就绪
[INFO] [gripper_client]: 发送请求: 设置夹爪角度为 50°
[INFO] [gripper_client]: 成功: 夹爪已设置到 50°
5.4 使用命令行调用服务
# 查看服务
ros2 service list
# 输出: /servo_gripper
# 命令行调用
ros2 service call /servo_gripper robot_arm_interfaces/srv/ServoGripper "{angle: 75}"
# 输出:
waiting for service to become available...
requester: making request: robot_arm_interfaces.srv.ServoGripper_Request(angle=75)
response:
robot_arm_interfaces.srv.ServoGripper_Response(success=True, message='夹爪已设置到 75°', actual_angle=75)
六、机械臂 Service 通信实战
现在,我们将自定义 Service 集成到真实的机械臂控制节点中。
注意舵机夹爪长时间堵转、夹取物体,可能温度会过高烧坏舵机!
6.1 集成架构
我们将在上一章的 RobotInterfaceNode 基础上,添加舵机夹爪服务:
┌─────────────────────────────────────────────────┐
│ RobotInterfaceNode │
│ │
│ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Topic │ │ Service │ │
│ │ Publisher │ │ Server │ │
│ │ │ │ │ │
│ │ /motor_ │ │ /servo_gripper │ │
│ │ angles │ │ │ │
│ └──────┬──────┘ └───────────┬─────────────┘ │
│ │ │ │
│ └───────────┬───────────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ MotorControl │ │
│ │ (CAN 通信) │ │
│ └─────────────┘ │
└─────────────────────────────────────────────────┘
6.2 更新机械臂接口节点
在上一章的代码基础上,添加舵机夹爪服务:
创建文件 ~/ros2_ws/src/py_episode/py_episode/robot_interface_with_service.py:
#!/usr/bin/env python3
"""
机械臂接口节点 - 初始化 + 发布电机角度 + 舵机夹爪服务
在上一章基础上新增 Service 功能
"""
import time
import rclpy
from rclpy.node import Node
from robot_arm_interfaces.msg import MotorAngles
from robot_arm_interfaces.srv import ServoGripper # 新增:导入服务接口
# 导入电机控制核心库
from episode_controller_core.controller_core import MotorControl
class RobotInterfaceNode(Node):
"""机械臂接口节点:初始化 + 发布关节角度 + 舵机夹爪服务"""
def __init__(self):
super().__init__('robot_interface_node')
# ========== 配置参数 ==========
self.declare_parameter('usb_index', 1)
self.declare_parameter('init_mode', 1)
self.declare_parameter('publish_rate', 30.0)
self.usb_index = self.get_parameter('usb_index').get_parameter_value().integer_value
self.init_mode = self.get_parameter('init_mode').get_parameter_value().integer_value
self.publish_rate = self.get_parameter('publish_rate').get_parameter_value().double_value
self.home_position = [180, 90, 83, 30, 110, 30]
self.calibration_timeout = 120.0
self.cached_motor_angles = [0.0] * 6
# ========== 初始化电机控制 ==========
self.motor_control = None
self._initialize_robot()
# ========== 创建 Publisher ==========
self.publisher = self.create_publisher(
MotorAngles,
'motor_angles',
10
)
# ========== 新增:创建舵机夹爪服务 ==========
self.servo_gripper_srv = self.create_service(
ServoGripper, # 服务类型
'servo_gripper', # 服务名称
self.servo_gripper_callback # 回调函数
)
self.get_logger().info('舵机夹爪服务已创建: /servo_gripper')
# ========== 创建定时器 ==========
timer_period = 1.0 / self.publish_rate
self.timer = self.create_timer(timer_period, self.publish_motor_angles)
self.get_logger().info(f'机械臂接口节点已启动,发布频率: {self.publish_rate} Hz')
def _initialize_robot(self):
"""初始化机械臂"""
mode_name = "回零校准" if self.init_mode == 0 else "从当前位置恢复"
self.get_logger().info(f'开始初始化机械臂,usb_index={self.usb_index}, init_mode={self.init_mode} ({mode_name})')
self.motor_control = MotorControl(self.usb_index)
if not self.motor_control.initialized:
self.get_logger().error('CAN 设备初始化失败')
raise RuntimeError('CAN 设备初始化失败')
self.get_logger().info('CAN 通信初始化成功')
if self.init_mode == 1:
self._recover_from_current()
else:
self._perform_calibration()
def _recover_from_current(self):
"""从当前位置恢复"""
self.get_logger().info('从当前位置恢复中...')
current_angles = self.motor_control.read_motor_angles()
if not current_angles:
raise RuntimeError('无法读取电机角度')
self.motor_control.last_degrees = [
current_angles[i] * self.motor_control.motor_ratios[i]
for i in range(6)
]
self.get_logger().info(f'成功从当前位置恢复: {current_angles}')
def _perform_calibration(self):
"""执行回零校准"""
self.get_logger().info('开始回零校准流程...')
for joint_index, addr in enumerate(self.motor_control.motor_address):
self.get_logger().info(f'正在校准关节 {joint_index + 1}/6 ...')
self.motor_control.trigger_home(addr, 0x02)
time.sleep(0.05)
start_time = time.perf_counter()
while True:
if self.motor_control.check_home_status(addr):
self.get_logger().info(f'关节 {joint_index + 1} 校准完成')
break
if time.perf_counter() - start_time > self.calibration_timeout:
raise RuntimeError(f'关节 {joint_index + 1} 校准超时')
time.sleep(0.5)
self.get_logger().info(f'校准完成,正在移动到 Home 位置: {self.home_position}')
time.sleep(1.0)
move_time = self.motor_control.execute_motors_degrees_normal(self.home_position)
time.sleep(move_time)
self.get_logger().info('机械臂初始化完成')
def publish_motor_angles(self):
"""定时器回调:发布电机角度"""
if self.motor_control is None:
return
msg = MotorAngles()
msg.timestamp = self.get_clock().now().to_msg()
try:
angles = self.motor_control.read_motor_angles()
if angles and all(angle is not None for angle in angles):
msg.angles = angles
msg.success = True
msg.message = "读取成功"
self.cached_motor_angles = list(angles)
else:
msg.angles = self.cached_motor_angles
msg.success = False
msg.message = "读取失败"
except Exception as e:
msg.angles = self.cached_motor_angles
msg.success = False
msg.message = f"读取异常: {str(e)}"
self.publisher.publish(msg)
# ========== 新增:舵机夹爪服务回调 ==========
def servo_gripper_callback(self, request, response):
"""
舵机夹爪服务回调函数
参数:
request.angle: 目标角度(0-110度)
返回:
response.success: 是否成功
response.actual_angle: 实际角度
response.message: 状态信息
"""
try:
# 检查电机控制是否已初始化
if self.motor_control is None:
response.success = False
response.message = "电机控制未初始化"
response.actual_angle = 0
self.get_logger().error(response.message)
return response
# 限制角度范围到 0-110 度
target_angle = request.angle
clamped_angle = max(0, min(target_angle, 110))
# 调用舵机夹爪控制方法
self.motor_control.servo_gripper(clamped_angle)
# 填充响应
response.success = True
response.actual_angle = clamped_angle
if target_angle != clamped_angle:
response.message = f'角度已限制: 请求 {target_angle}° -> 实际 {clamped_angle}°'
else:
response.message = f'夹爪已设置到 {clamped_angle}°'
self.get_logger().info(response.message)
except Exception as e:
response.success = False
response.message = f'控制失败: {str(e)}'
response.actual_angle = 0
self.get_logger().error(response.message)
return response
def main():
rclpy.init()
try:
node = RobotInterfaceNode()
rclpy.spin(node)
except RuntimeError as e:
print(f'启动失败: {e}')
except KeyboardInterrupt:
pass
finally:
rclpy.shutdown()
if __name__ == '__main__':
main()
对比上一章的代码,新增了:
| 新增内容 | 代码 |
|---|---|
| 导入服务接口 | from robot_arm_interfaces.srv import ServoGripper |
| 创建服务 | self.create_service(ServoGripper, 'servo_gripper', ...) |
| 服务回调 | servo_gripper_callback(self, request, response) |
6.3 实现夹爪控制客户端
为了方便测试和使用,我们创建一个独立的客户端节点,可以通过命令行参数控制夹爪。
创建文件 ~/ros2_ws/src/py_episode/py_episode/gripper_control_client.py:
#!/usr/bin/env python3
"""
舵机夹爪控制客户端 - 通过命令行参数控制夹爪
演示如何在独立节点中调用 Service
"""
import sys
import rclpy
from rclpy.node import Node
from robot_arm_interfaces.srv import ServoGripper
class GripperControlClient(Node):
"""夹爪控制客户端节点"""
def __init__(self):
super().__init__('gripper_control_client')
# 创建 Service Client
self.client = self.create_client(ServoGripper, 'servo_gripper')
# 等待服务可用
self.get_logger().info('等待舵机夹爪服务...')
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('服务不可用,继续等待...')
self.get_logger().info('舵机夹爪服务已连接')
def control_gripper(self, angle):
"""
控制夹爪到指定角度
参数:
angle: 目标角度(0-110度)
"""
# 创建请求
request = ServoGripper.Request()
request.angle = angle
self.get_logger().info(f'发送请求: 设置夹爪角度为 {angle}°')
# 异步调用服务
future = self.client.call_async(request)
# 等待响应
rclpy.spin_until_future_complete(self, future)
# 处理响应
if future.result() is not None:
response = future.result()
if response.success:
self.get_logger().info(f'✓ 成功: {response.message}')
self.get_logger().info(f' 实际角度: {response.actual_angle}°')
# 显示角度差异
if response.actual_angle != angle:
diff = response.actual_angle - angle
self.get_logger().warn(f' 注意: 请求角度已被限制 (差异: {diff:+.1f}°)')
else:
self.get_logger().error(f'✗ 失败: {response.message}')
else:
self.get_logger().error('✗ 服务调用失败')
def main():
# 解析命令行参数
if len(sys.argv) < 2:
print('用法: ros2 run py_episode gripper_ctrl <angle>')
print('示例:')
print(' ros2 run py_episode gripper_ctrl 0 # 完全张开')
print(' ros2 run py_episode gripper_ctrl 50 # 半开')
print(' ros2 run py_episode gripper_ctrl 110 # 完全闭合')
sys.exit(1)
try:
angle = int(sys.argv[1])
except ValueError:
print(f'错误: 角度必须是整数,但收到: {sys.argv[1]}')
sys.exit(1)
# 初始化节点
rclpy.init()
node = GripperControlClient()
try:
# 控制夹爪
node.control_gripper(angle)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
代码说明:
| 特性 | 实现 |
|---|---|
| 命令行参数 | sys.argv[1] 获取角度 |
| 服务等待 | wait_for_service() 确保 Server 可用 |
| 用户反馈 | 清晰的成功/失败提示,显示实际角度 |
| 错误处理 | 参数验证、异常捕获 |
6.4 添加入口点并运行
更新 setup.py:
entry_points={
'console_scripts': [
'simple_pub = py_episode.simple_publisher:main',
'simple_sub = py_episode.simple_subscriber:main',
'joint_pub = py_episode.joint_state_publisher:main',
'joint_sub = py_episode.joint_state_subscriber:main',
'motor_pub = py_episode.motor_angles_publisher:main',
'motor_sub = py_episode.motor_angles_subscriber:main',
'simple_srv = py_episode.simple_service:main',
'simple_cli = py_episode.simple_client:main',
'gripper_srv = py_episode.gripper_service:main',
'gripper_cli = py_episode.gripper_client:main',
'robot_interface_srv = py_episode.robot_interface_with_service:main', # 新增
'gripper_ctrl = py_episode.gripper_control_client:main', # 新增
],
},
构建并运行:
cd ~/ros2_ws
colcon build --packages-select py_episode --symlink-install
source install/setup.bash
# 终端 1:启动机械臂接口节点(使用模式 1 跳过回零)
ros2 run py_episode robot_interface_srv --ros-args -p usb_index:=1 -p init_mode:=1
# 终端 2:使用客户端控制夹爪
ros2 run py_episode gripper_ctrl 0 # 完全张开
ros2 run py_episode gripper_ctrl 50 # 半开
ros2 run py_episode gripper_ctrl 110 # 完全闭合
ros2 run py_episode gripper_ctrl 200 # 测试范围限制
6.5 运行效果
机械臂节点输出(终端 1):
[INFO] [robot_interface_node]: 开始初始化机械臂,usb_index=1, init_mode=1 (从当前位置恢复) [INFO] [robot_interface_node]: CAN 通信初始化成功 [INFO] [robot_interface_node]: 成功从当前位置恢复: [180.0, 90.0, 83.0, 30.0, 110.0, 30.0] [INFO] [robot_interface_node]: 舵机夹爪服务已创建: /servo_gripper [INFO] [robot_interface_node]: 机械臂接口节点已启动,发布频率: 30.0 Hz [INFO] [robot_interface_node]: 夹爪已设置到 50° [INFO] [robot_interface_node]: 夹爪已设置到 110° [INFO] [robot_interface_node]: 角度已限制: 请求 200° -> 实际 110°
客户端输出(终端 2):
# 设置角度为 50°
$ ros2 run py_episode gripper_ctrl 50
[INFO] [gripper_control_client]: 等待舵机夹爪服务...
[INFO] [gripper_control_client]: 舵机夹爪服务已连接
[INFO] [gripper_control_client]: 发送请求: 设置夹爪角度为 50°
[INFO] [gripper_control_client]: ✓ 成功: 夹爪已设置到 50°
[INFO] [gripper_control_client]: 实际角度: 50°
# 完全闭合(110°)
$ ros2 run py_episode gripper_ctrl 110
[INFO] [gripper_control_client]: 发送请求: 设置夹爪角度为 110°
[INFO] [gripper_control_client]: ✓ 成功: 夹爪已设置到 110°
[INFO] [gripper_control_client]: 实际角度: 110°
# 测试范围限制(200° 超出范围)
$ ros2 run py_episode gripper_ctrl 200
[INFO] [gripper_control_client]: 发送请求: 设置夹爪角度为 200°
[INFO] [gripper_control_client]: ✓ 成功: 角度已限制: 请求 200° -> 实际 110°
[INFO] [gripper_control_client]: 实际角度: 110°
[WARN] [gripper_control_client]: 注意: 请求角度已被限制 (差异: -90.0°)
使用命令行调用(对比):
# 也可以使用 ROS2 命令行工具直接调用
ros2 service call /servo_gripper robot_arm_interfaces/srv/ServoGripper "{angle: 50}"
# 输出:
waiting for service to become available...
requester: making request: robot_arm_interfaces.srv.ServoGripper_Request(angle=50)
response:
robot_arm_interfaces.srv.ServoGripper_Response(success=True, message='夹爪已设置到 50°', actual_angle=50)
对比两种调用方式:
| 方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 命令行工具 | 快速测试,无需编码 | 输出原始,不易阅读 | 调试、快速验证 |
| 客户端节点 | 友好输出,可扩展逻辑 | 需要编写代码 | 生产使用、复杂逻辑 |
6.6 验证系统功能
# 1. 查看节点列表
ros2 node list
# 输出:
# /robot_interface_node
# /gripper_control_client (运行时)
# 2. 查看节点详细信息
ros2 node info /robot_interface_node
# 输出包含:
# Subscribers: (无)
# Publishers:
# /motor_angles: robot_arm_interfaces/msg/MotorAngles
# Service Servers:
# /servo_gripper: robot_arm_interfaces/srv/ServoGripper
# ...
# 3. 查看服务列表
ros2 service list
# 输出包含:
# /servo_gripper
# 4. 查看服务类型
ros2 service type /servo_gripper
# 输出:
# robot_arm_interfaces/srv/ServoGripper
# 5. 查看服务接口定义
ros2 interface show robot_arm_interfaces/srv/ServoGripper
6.7 客户端节点的优势
相比直接使用 ros2 service call 命令,独立的客户端节点有以下优势:
| 优势 | 说明 |
|---|---|
| 友好的输出 | 清晰的成功/失败提示,彩色日志 |
| 参数验证 | 命令行参数检查、类型转换 |
| 复杂逻辑 | 可添加重试、超时、批量操作 |
| 集成能力 | 可与其他 ROS2 功能结合(Topic、Action) |
| 脚本化 | 易于集成到自动化脚本 |
扩展示例(批量控制):
# 在 main() 函数中添加批量控制逻辑
def main():
rclpy.init()
node = GripperControlClient()
try:
# 批量测试不同角度
test_angles = [0, 30, 60, 90, 110]
for angle in test_angles:
node.get_logger().info(f'\n=== 测试角度: {angle}° ===')
node.control_gripper(angle)
time.sleep(1) # 间隔 1 秒
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
6.8 常见问题排查
| 问题 | 可能原因 | 解决方法 |
|---|---|---|
| 服务调用超时 | Server 节点未启动 | 先启动机械臂接口节点 |
| "Service not available" | 服务名称不匹配 | 检查 ros2 service list |
| 夹爪无响应 | 舵机未连接 | 检查舵机接线 |
| 角度不准确 | 舵机需要校准 | 检查舵机零位 |
| 服务类型不匹配 | 未重新编译接口包 | colcon build --packages-select robot_arm_interfaces |
| 客户端立即退出 | 没有参数或参数错误 | 检查命令行用法 |
七、进阶:添加更多服务
在实际机械臂控制中,通常需要多个服务。以下是扩展思路:
具体实现代码可查看 Episode ROS2 包代码:
robot_interface_node.py
7.1 Episode1 机械臂实现的服务:
| 服务名称 | 请求 | 响应 | 用途 |
|---|---|---|---|
/read_motor_angles |
无 | angles[], success, message |
读取当前角度 |
/servo_gripper |
angle |
success, actual_angle, message |
控制舵机夹爪 |
/gripper_control |
enable |
success, message |
负压吸盘开关 |
/set_free_mode |
enable |
success, message |
设置自由拖动模式 |
7.2 服务接口定义示例
ReadMotorAngles.srv:
# 请求部分(空,不需要参数)
---
# 响应部分
float64[6] angles
bool success
string message
SetFreeMode.srv:
# 请求部分
bool enable # true=自由模式, false=锁定模式
---
# 响应部分
bool success
string message
7.3 在同一节点中添加多个服务
# 创建多个 Service
self.read_angles_srv = self.create_service(
ReadMotorAngles, 'read_motor_angles', self.read_angles_callback)
self.servo_gripper_srv = self.create_service(
ServoGripper, 'servo_gripper', self.servo_gripper_callback)
self.free_mode_srv = self.create_service(
SetFreeMode, 'set_free_mode', self.free_mode_callback)
八、总结
8.1 本章收获
完成本章后,你应该能够:
- ✓ 理解 Service 的请求-响应通信模式
- ✓ 编写 Service Server 和 Client 节点
- ✓ 创建自定义
.srv服务接口 - ✓ 在机械臂节点中集成 Service 功能
- ✓ 实现舵机夹爪控制服务
8.2 Topic vs Service 对比总结
| 特性 | Topic | Service |
|---|---|---|
| 通信模式 | 发布-订阅(单向) | 请求-响应(双向) |
| 数据流 | 持续流 | 一次性 |
| 接口文件 | .msg |
.srv |
| 创建 Server | create_publisher |
create_service |
| 创建 Client | create_subscription |
create_client |
| 回调参数 | msg |
request, response |
| 适用场景 | 传感器数据、状态发布 | 命令执行、查询操作 |
8.3 关键代码模板
Service Server:
# 创建服务
self.srv = self.create_service(ServiceType, 'service_name', self.callback)
# 回调函数
def callback(self, request, response):
# 处理请求
response.field = ...
return response # 必须返回!
Service Client:
# 创建客户端
self.cli = self.create_client(ServiceType, 'service_name')
# 等待服务
self.cli.wait_for_service()
# 调用服务
request = ServiceType.Request()
future = self.cli.call_async(request)
rclpy.spin_until_future_complete(self, future)
response = future.result()
8.4 常用命令速查
# 查看服务
ros2 service list
ros2 service type /servo_gripper
ros2 interface show robot_arm_interfaces/srv/ServoGripper
# 命令行调用服务
ros2 service call /servo_gripper robot_arm_interfaces/srv/ServoGripper "{angle: 50}"
# 编译接口包
colcon build --packages-select robot_arm_interfaces
8.5 下一步
在下一章中,我们将学习:
- Action 通信:适合长时间任务(如轨迹执行)
- 自定义 Action:定义
.action文件 - 机械臂轨迹控制:通过 Action 执行运动轨迹