3.4 ROS2 Topic 通信编程实战
一、本章目标
在上一章中,我们学习了 ROS2 的核心概念(Nodes、Topics、Services、Actions、TF2),并通过命令行工具进行了实践。本章将带你动手编写代码,用 Python 实现自己的 Publisher 和 Subscriber 节点。
1.1 学习路线
按照以下顺序逐步掌握 ROS2 开发:
- 使用 colcon 构建工具 - ROS2 官方推荐的构建系统
- 创建工作空间(Workspace) - 管理 ROS2 项目的目录结构
- 创建功能包(Package) - ROS2 代码的组织单元
- 编写 Publisher 和 Subscriber - 实现节点间通信
1.2 为什么要学习这些?
回顾外卖系统类比:
还记得第三章的外卖系统吗?
- 骑手节点持续广播 GPS 位置(Publisher)
- 顾客节点实时接收位置信息(Subscriber)
在实际机器人项目中:
- 相机节点发布图像数据(Publisher)
- 视觉识别节点接收并处理图像(Subscriber)
- 机械臂控制节点发布关节状态(Publisher)
- **可视化节点(RViz)**接收并显示状态(Subscriber)
本章将教你如何编写这样节点的代码。
二、使用 colcon 构建工具
2.1 什么是 colcon?
colcon 是 ROS2 官方推荐的构建工具,用于编译和管理 ROS2 工作空间。它是 ROS1 构建工具(catkin_make、catkin_tools)的迭代版本。
主要功能:
- 编译 C++ 和 Python 功能包
- 管理包之间的依赖关系
- 并行构建多个包(提高效率)
- 支持多种构建类型(ament_cmake、ament_python)
2.1.1 colcon 本质上在做什么?
理解 colcon 的工作原理能帮助你更好地调试问题和优化构建流程。
对于 C++ 代码
colcon 本质上是在调用 CMake 和 Make 来编译 C++ 代码:
# colcon build 背后的操作流程:
1. 读取 CMakeLists.txt
└─> 配置编译选项、依赖库
2. 生成 Makefile(CMake 阶段)
└─> 在 build/ 目录生成构建脚本
3. 编译源代码(Make 阶段)
└─> g++ 或 clang++ 编译 .cpp 文件生成 .o 对象文件
4. 链接生成可执行文件或库
└─> 将 .o 文件和依赖库链接成最终的可执行文件
5. 安装到 install/ 目录
└─> 复制可执行文件、库文件、头文件到安装目录
类比理解:
CMakeLists.txt= 施工图纸(告诉 colcon 如何构建)build/目录 = 施工现场(中间产物)install/目录 = 交付的成品房(可直接使用)
为什么 C++ 需要编译?
- C++ 是编译型语言,需要转换成机器码
- 每次修改代码都需要重新编译
- 编译后的程序运行速度快
对于 Python 代码
colcon 对 Python 的处理完全不同,它主要做的是文件组织和安装:
# colcon build 对 Python 包的操作:
1. 读取 setup.py
└─> 了解包的元信息和入口点
2. 不编译代码(Python 是解释型语言)
└─> 跳过编译步骤
3. 安装 Python 模块
├─> 默认模式:复制 .py 文件到 install/lib/python3.x/site-packages/
└─> --symlink-install:创建符号链接(推荐)
4. 创建可执行脚本
└─> 根据 entry_points 在 install/lib/<package_name>/ 创建启动脚本
5. 设置环境变量
└─> 修改 PYTHONPATH,让 Python 能找到你的包
关键区别:
| 特性 | C++ 包 | Python 包 |
|---|---|---|
| 是否编译 | 需要编译成机器码 | 不需要,直接运行源码 |
| 修改后 | 必须重新 colcon build |
使用 --symlink-install 无需重新构建 |
| build/ 目录 | 包含 .o 对象文件、Makefile | 几乎为空,只有元数据 |
| install/ 目录 | 复制编译后的二进制文件 | 复制或链接 .py 源文件 |
| 运行速度 | 快(机器码) | 相对慢(解释执行) |
2.2 安装 colcon
如果你按照前面教程安装了 ROS2,colcon 应该已经安装。验证安装:
colcon version-check
如果未安装,运行:
# Ubuntu/Debian
sudo apt install python3-colcon-common-extensions
2.3 colcon 工作空间结构
一个典型的 ROS2 工作空间包含以下目录:
ros2_ws/ # 工作空间根目录
├── src/ # 源代码目录(你的功能包放这里)
│ ├── package_1/
│ ├── package_2/
│ └── ...
├── build/ # 构建过程的中间文件(自动生成)
├── install/ # 安装目录,包含可执行文件(自动生成)
└── log/ # 构建日志(自动生成)
比如 Episode1 包编译后:
.
├── build
├── install
├── log
├── README.md
└── src
├── episode1_urdf_1113
├── episode_controller
└── robot_arm_interfaces
8 directories, 1 file
目录说明:
- src/:唯一需要手动创建和编辑的目录
- build/:CMake 构建的中间文件
- install/:编译后的可执行文件和库
- log/:构建过程的日志信息
2.4 colcon 常用命令
# 1. 构建整个工作空间
colcon build
# 2. 只构建指定的包
colcon build --packages-select <package_name>
# 3. 构建指定包及其依赖
colcon build --packages-up-to <package_name>
# 4. Python 开发模式(修改代码无需重新编译)
colcon build --symlink-install
# 5. 显示构建过程的详细输出
colcon build --event-handlers console_direct+
# 6. 串行构建(适合资源受限的设备)
colcon build --executor sequential
推荐组合(Python 开发):
colcon build --symlink-install --packages-select <package_name>
--symlink-install会创建符号链接而不是复制文件,这样修改 Python 代码后无需重新编译。
三、创建工作空间
3.1 理解工作空间的概念
工作空间(Workspace) 是一个包含 ROS2 功能包的目录。ROS2 使用分层工作空间概念:
- Underlay(底层):ROS2 系统安装目录(如
/opt/ros/jazzy) - Overlay(叠加层):你的开发工作空间(如
~/ros2_ws)
类比理解:
- Underlay 就像操作系统(提供基础功能)
- Overlay 就像用户安装的应用(你的自定义功能)
- Overlay 会覆盖 Underlay 中同名的包(方便开发和测试)
3.2 创建工作空间步骤
步骤 1:创建工作空间目录
# 创建工作空间根目录和 src 目录
mkdir -p ~/ros2_ws/src
# 进入工作空间
cd ~/ros2_ws
步骤 2:Source ROS2 环境(Underlay)
在使用工作空间前,必须先 source ROS2 的安装环境:
source /opt/ros/jazzy/setup.bash
重要:每次打开新终端都需要 source,或者将这行命令添加到
~/.bashrc中:echo "source /opt/ros/jazzy/setup.bash" >> ~/.bashrc
步骤 3:验证工作空间
此时工作空间是空的,但我们可以测试构建系统:
cd ~/ros2_ws
colcon build
输出应该显示:
Starting >>> [工作空间初始化]
Summary: 0 packages finished [0.XXs]
构建完成后,会生成 build/、install/、log/ 目录:
ls ~/ros2_ws
# 输出:build install log src
3.3 Source 工作空间(Overlay)
构建工作空间后,需要 source 才能使用其中的功能包:
# 这两种方式完全等价:
# 方式 1(分两步)
source /opt/ros/jazzy/setup.bash # 前面做过了
source ~/ros2_ws/install/local_setup.bash
# 方式 2(一步到位)
source ~/ros2_ws/install/setup.bash # 推荐,简单一些
关于
setup.bash vs local_setup.bash的区别,请参见附录:A.1 setup.bash vs local_setup.bash 的区别
四、创建功能包(Package)
4.1 什么是功能包?
功能包(Package) 是 ROS2 代码的基本组织单元,包含:
- 节点的源代码
- 配置文件
- 依赖声明
- 构建指令
类比理解:
- 功能包 = Python 模块(module)
- 工作空间 = Python 虚拟环境(venv)
4.2 Python 功能包的结构
一个标准的 Python 功能包包含以下文件:
my_package/
├── package.xml # 功能包元信息(名称、版本、依赖)
├── setup.py # Python 安装脚本
├── setup.cfg # 安装配置
├── resource/
│ └── my_package # 空标记文件
├── my_package/ # Python 模块目录
│ ├── __init__.py # 模块初始化文件
│ └── my_node.py # 你的节点代码
└── test/ # 测试文件
├── test_copyright.py
├── test_flake8.py
└── test_pep257.py
比如 Episode1 interface 包:
./src/episode_controller/
├── demo
│ ├── client_demo.py # 机械臂控制测试
│ └── __init__.py
├── episode_controller
│ ├── __init__.py
│ └── robot_interface_node.py # 机械臂通讯节点
├── MANIFEST.in
├── package.xml
├── resource
│ └── episode_controller
├── setup.cfg
├── setup.py
└── test
├── test_copyright.py
├── test_flake8.py
└── test_pep257.py
4.3 创建功能包
步骤 1:进入 src 目录
cd ~/ros2_ws/src
步骤 2:创建功能包
ros2 pkg create --build-type ament_python --license Apache-2.0 py_episode
命令解析:
ros2 pkg create:创建功能包命令--build-type ament_python:指定为 Python 包--license Apache-2.0:指定开源许可证py_episode:功能包名称
输出示例:
going to create a new package
package name: py_episode
destination directory: /home/enpei/ros2_ws
package format: 3
version: 0.0.0
description: TODO: Package description
maintainer: ['<name> <email>']
licenses: ['Apache-2.0']
build type: ament_python
dependencies: []
creating folder ./py_episode
creating ./py_episode/package.xml
creating source folder
creating folder ./py_episode/py_episode
creating ./py_episode/setup.py
creating ./py_episode/setup.cfg
creating folder ./py_episode/resource
creating ./py_episode/resource/py_episode
creating ./py_episode/py_episode/__init__.py
creating folder ./py_episode/test
creating ./py_episode/test/test_copyright.py
creating ./py_episode/test/test_flake8.py
creating ./py_episode/test/test_pep257.py
步骤 3:验证功能包结构
tree
# 你应该看到下面的目录结构
.
└── py_episode
├── LICENSE
├── package.xml
├── py_episode
│ └── __init__.py
├── resource
│ └── py_episode
├── setup.cfg
├── setup.py
└── test
├── test_copyright.py
├── test_flake8.py
└── test_pep257.py
5 directories, 9 files
4.4 自定义功能包信息
编辑 package.xml
打开 package.xml 文件:
cd ~/ros2_ws/src/py_episode
nano package.xml # 或使用你喜欢的编辑器
修改以下字段:
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>py_episode</name>
<version>0.0.0</version>
<!-- 修改描述 -->
<description>TODO: Package description</description>
<!-- 修改维护者信息 -->
<maintainer email="[email protected]">enpei</maintainer>
<license>Apache-2.0</license>
<!-- 添加依赖(后续会用到) -->
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
依赖说明:
rclpy:ROS2 的 Python 客户端库std_msgs:标准消息类型(如 String、Int32 等)
编辑 setup.py
打开 setup.py:
nano setup.py
修改以下字段:
from setuptools import find_packages, setup
package_name = 'py_episode'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='enpei',
maintainer_email='[email protected]',
description='TODO: Package description',
license='Apache-2.0',
extras_require={
'test': [
'pytest',
],
},
entry_points={
'console_scripts': [
# 后续会在这里添加可执行文件
],
},
)
五、编写 Publisher 和 Subscriber
本节采用循序渐进的方式,从最简单的代码开始,逐步完善:
| 阶段 | 目标 | 消息类型 |
|---|---|---|
| 阶段 1 | 发布单个关节角度 | std_msgs/Float64 |
| 阶段 2 | 发布 6 个关节角度 | sensor_msgs/JointState |
| 阶段 3 | 思考:如何优化? | 引出自定义 Interface |
5.1 阶段 1:极简代码 - 发布单个关节
目标
用最少的代码实现 Topic 通信:
- Publisher:发布 joint1 的角度(一个浮点数)
- Subscriber:接收并打印角度
5.1.1 编写 Publisher
创建文件 ~/ros2_ws/src/py_episode/py_episode/simple_publisher.py:
#!/usr/bin/env python3
"""
极简 Publisher:发布单个关节角度(使用 Timer 回调模式)
"""
import rclpy # ROS2 Python 客户端库
from rclpy.node import Node # 节点基类
from std_msgs.msg import Float64 # 标准消息类型:64位浮点数
class SimplePublisher(Node): # 继承 Node 类,获得其所有方法
"""简单发布者节点类"""
def __init__(self):
# super() 调用父类 Node 的构造函数,完成节点初始化
# 这样我们才能使用 self.create_publisher()、self.get_logger() 等父类方法
super().__init__('simple_publisher') # 参数是节点名称,ros2 node list 会显示这个名字
# 创建 Publisher:话题名 /joint1_angle,消息类型 Float64,队列长度 10
# 队列长度 10 表示:如果 Subscriber 处理消息太慢,最多缓存 10 条消息
# 超过 10 条后,最旧的消息会被丢弃(防止内存无限增长)
self.pub = self.create_publisher(Float64, '/joint1_angle', 10)
# 创建定时器:每 0.5 秒(2Hz)调用一次 timer_callback
self.timer = self.create_timer(0.5, self.timer_callback)
self.angle = 0.0 # 初始角度
self.get_logger().info('Publisher 已启动,开始发布 joint1 角度...')
def timer_callback(self):
"""定时器回调函数:每次被调用时发布一条消息"""
msg = Float64() # 创建消息对象
msg.data = self.angle # 设置消息内容
self.pub.publish(msg) # 发布消息到话题
self.get_logger().info(f'发布: joint1 = {self.angle:.1f}°')
self.angle += 5.0 # 每次增加 5 度
if self.angle > 90.0: # 超过 90 度后重置
self.angle = 0.0
def main():
rclpy.init() # 初始化 ROS2
node = SimplePublisher() # 创建节点实例
try:
rclpy.spin(node) # 保持节点运行,处理回调
except KeyboardInterrupt:
pass # Ctrl+C 优雅退出
finally:
node.destroy_node() # 销毁节点
rclpy.shutdown() # 关闭 ROS2
if __name__ == '__main__': # Python 入口点
main()
核心代码只有 3 行:
# 在 __init__ 中创建 Publisher
self.pub = self.create_publisher(Float64, '/joint1_angle', 10)
# 在回调函数中发布消息
msg = Float64()
msg.data = self.angle
self.pub.publish(msg)
什么是回调函数(Callback)?
回调函数是一种"被动触发"的函数,不是你主动调用它,而是在某个事件发生时,系统自动调用它。
类比理解:
- 普通函数:你打电话给别人(主动调用)
- 回调函数:你留了个电话号码,别人有事时打给你(被动触发)
在 ROS2 Publisher 的定时器中:
- 当定时器时间到达时(每 0.5 秒)
- ROS2 系统自动调用你定义的
timer_callback函数- 你在回调函数中构建消息并发布
这种模式叫做事件驱动编程,在 GUI 编程(按钮点击事件)、网络编程(收到数据包)中都很常见。
Float64()是 ROS2 标准消息库std_msgs中定义的消息类型,本质上是一个只包含一个 64 位浮点数的简单结构。查看消息结构:
ros2 interface show std_msgs/msg/Float64 # 输出: # float64 data可以看到,
Float64只有一个字段data,所以我们用msg.data = 值来设置内容。常用的 std_msgs 消息类型:
消息类型 字段 用途 Boolbool data布尔值 Int32int32 data32 位整数 Float64float64 data64 位浮点数 Stringstring data字符串 Float64MultiArrayfloat64[] data浮点数数组 查看所有可用消息:
ros2 interface list | grep std_msgs
5.1.2 编写 Subscriber
创建文件 ~/ros2_ws/src/py_episode/py_episode/simple_subscriber.py:
#!/usr/bin/env python3
"""
极简 Subscriber:接收单个关节角度(使用类风格)
"""
import rclpy # ROS2 Python 客户端库
from rclpy.node import Node # 节点基类
from std_msgs.msg import Float64 # 标准消息类型:64位浮点数
class SimpleSubscriber(Node):
"""简单订阅者节点类"""
def __init__(self):
super().__init__('simple_subscriber') # 调用父类构造函数,设置节点名称
# 创建 Subscriber:订阅 /joint1_angle 话题,收到消息后调用 callback
self.subscription = self.create_subscription(
Float64, # 消息类型
'/joint1_angle', # 话题名
self.callback, # 回调函数
10) # 队列长度
self.get_logger().info('Subscriber 已启动,等待数据...')
def callback(self, msg: Float64):
"""回调函数:每次收到消息时自动执行"""
self.get_logger().info(f'收到: joint1 = {msg.data:.1f}°')
def main():
rclpy.init() # 初始化 ROS2
node = SimpleSubscriber() # 创建节点实例
try:
rclpy.spin(node) # 保持节点运行,处理回调
except KeyboardInterrupt:
pass # Ctrl +C 优雅退出
finally:
node.destroy_node() # 销毁节点
rclpy.shutdown() # 关闭 ROS2
if __name__ == '__main__': # Python 入口点
main()
核心代码只有 2 行:
# 在 __init__ 中创建 Subscriber
self.subscription = self.create_subscription(Float64, '/joint1_angle', self.callback, 10)
# 回调函数处理消息
def callback(self, msg):
self.get_logger().info(f'{msg.data}')
5.1.3 配置并运行
添加入口点(setup.py):
entry_points={
'console_scripts': [
'simple_pub = py_episode.simple_publisher:main',
'simple_sub = py_episode.simple_subscriber:main',
],
},
入口点语法解释:
格式:
'命令名 = 包名.模块名:函数名'
simple_pub:你在终端输入的命令名(ros2 run py_episode simple_pub)py_episode:包名(对应py_episode/目录)simple_publisher:模块名(对应simple_publisher.py文件)main:要执行的函数名(文件中的def main():)colcon 会自动生成可执行脚本,将命令映射到 Python 函数。
添加依赖(package.xml):
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
为什么要添加依赖?
和 Python 的
requirements.txt或import类似,但 ROS2 依赖有特殊作用:
- 告诉 colcon:这个包需要哪些其他 ROS2 包才能运行
- 自动安装:使用
rosdep install命令时,会自动安装缺失的依赖- 构建顺序:colcon 会根据依赖关系决定包的构建顺序
<exec_depend>表示运行时依赖(执行时需要),还有<build_depend>(构建时需要)、<test_depend>(测试时需要)等类型。为什么注释掉依赖有时也能运行?
因为你的系统已经安装了 ROS2,
rclpy和std_msgs已经存在,Python 可以直接 import。但声明依赖仍然重要:
- 团队协作:其他人克隆你的代码时,
rosdep install --from-paths src --ignore-src -r -y会自动安装依赖- CI/CD:持续集成环境会根据依赖自动配置
- 依赖链检查:当其他包依赖你的包时,colcon 会检查完整的依赖关系
所以即使"能运行",也应该正确声明依赖,这是 ROS2 开发的最佳实践。
构建并运行:
cd ~/ros2_ws
colcon build --packages-select py_episode --symlink-install
source install/local_setup.bash
# 终端 1
ros2 run py_episode simple_pub
# 终端 2
ros2 run py_episode simple_sub
--symlink-install的作用:创建符号链接而不是复制文件。对于 Python 包:
- 不使用:
colcon build会把.py文件复制到install/目录,修改源码后需要重新构建- 使用后:只创建链接指向
src/中的源码,修改代码后无需重新 build,直接生效推荐:Python 开发时始终使用此参数,提高开发效率(只需要
build一次)。
运行效果:
# Publisher 输出
[INFO] [simple_publisher]: 发布: joint1 = 0.0°
[INFO] [simple_publisher]: 发布: joint1 = 5.0°
[INFO] [simple_publisher]: 发布: joint1 = 10.0°
# Subscriber 输出
[INFO] [simple_subscriber]: 收到: joint1 = 0.0°
[INFO] [simple_subscriber]: 收到: joint1 = 5.0°
[INFO] [simple_subscriber]: 收到: joint1 = 10.0°
成功! 你已经用最简代码实现了 Topic 通信。
5.2 阶段 2:发布 6 个关节 - 引入 JointState
问题来了
单个关节可以用 Float64,但机械臂有 6 个关节,怎么办?
方案对比:
| 方案 | 实现方式 | 问题 |
|---|---|---|
| 方案 A | 创建 6 个话题:/joint1_angle, /joint2_angle... |
管理复杂,不优雅 |
| 方案 B | 使用数组类型 Float64MultiArray |
没有关节名称,不知道哪个是哪个 |
| 方案 C | 使用标准消息 sensor_msgs/JointState |
推荐!包含名称和角度 |
5.2.1 了解 Interface(消息类型)
在 ROS2 中,Interface 定义了消息的数据结构。使用命令查看:
# 查看 JointState 的结构
ros2 interface show sensor_msgs/msg/JointState
输出:
std_msgs/Header header
builtin_interfaces/Time stamp
string frame_id
string[] name # 关节名称数组
float64[] position # 关节位置(弧度)
float64[] velocity # 关节速度
float64[] effort # 关节力矩
解读:
name:关节名称列表,如['joint1', 'joint2', ...]position:对应的角度列表(弧度),如[0.1, 0.2, ...]velocity、effort:可选字段
为什么用 JointState?
- ROS2 机械臂的标准消息类型
- RViz、MoveIt 等工具都使用它
- 名称和角度一一对应,清晰明了
5.2.2 修改 Publisher
只需改动几行代码,就能发布 6 个关节:
创建文件 ~/ros2_ws/src/py_episode/py_episode/joint_state_publisher.py:
#!/usr/bin/env python3
"""
机械臂关节状态发布器:发布 6 个关节角度(使用类风格)
"""
import rclpy # ROS2 Python 客户端库
from rclpy.node import Node # 节点基类
from sensor_msgs.msg import JointState # 机械臂关节状态消息类型
import math # 数学库,用于 sin 函数
class JointStatePublisher(Node):
"""关节状态发布者节点类"""
def __init__(self):
super().__init__('joint_state_publisher') # 调用父类构造函数
# 创建 Publisher:话题名 /joint_states,消息类型 JointState
self.pub = self.create_publisher(JointState, '/joint_states', 10)
# 创建定时器:每 0.1 秒(10Hz)调用一次 timer_callback
self.timer = self.create_timer(0.1, self.timer_callback)
# 定义 6 个关节名称(与 URDF 模型中的关节名对应)
self.joint_names = ['joint1', 'joint2', 'joint3', 'joint4', 'joint5', 'joint6']
self.step = 0 # 步数计数器,用于生成正弦波
self.get_logger().info('关节状态发布器已启动')
def timer_callback(self):
"""定时器回调函数:每次被调用时发布一条消息"""
# 构建 JointState 消息
msg = JointState() # 创建消息对象
msg.header.stamp = self.get_clock().now().to_msg() # 设置时间戳
msg.name = self.joint_names # 设置关节名称列表
# 模拟 6 个关节的运动(正弦波,每个关节频率不同)
msg.position = [math.sin(self.step * 0.05 * (i + 1)) * 0.5 for i in range(6)]
msg.velocity = [] # 速度字段,可选,这里留空
msg.effort = [] # 力矩字段,可选,这里留空
self.pub.publish(msg) # 发布消息
# 日志(每秒打印一次,10Hz 所以每 10 步打印)
if self.step % 10 == 0:
angles = [f'{math.degrees(p):.1f}°' for p in msg.position]
self.get_logger().info(f'发布: {angles}')
self.step += 1 # 步数 +1
def main():
rclpy.init() # 初始化 ROS2
node = JointStatePublisher() # 创建节点实例
try:
rclpy.spin(node) # 保持节点运行,处理回调
except KeyboardInterrupt:
pass # Ctrl+C 优雅退出
finally:
node.destroy_node() # 销毁节点
rclpy.shutdown() # 关闭 ROS2
if __name__ == '__main__': # Python 入口点
main()
对比阶段 1 的改动:
| 改动点 | 阶段 1 | 阶段 2 |
|---|---|---|
| 消息类型 | Float64 |
JointState |
| 话题名 | /joint1_angle |
/joint_states |
| 数据 | msg.data = 角度 |
msg.name = [...]``msg.position = [...] |
5.2.3 修改 Subscriber
创建文件 ~/ros2_ws/src/py_episode/py_episode/joint_state_subscriber.py:
#!/usr/bin/env python3
"""
机械臂关节状态监听器:接收 6 个关节角度(使用类风格)
"""
import rclpy # ROS2 Python 客户端库
from rclpy.node import Node # 节点基类
from sensor_msgs.msg import JointState # 机械臂关节状态消息类型
import math # 数学库,用于弧度转角度
class JointStateSubscriber(Node):
"""关节状态订阅者节点类"""
def __init__(self):
super().__init__('joint_state_subscriber') # 调用父类构造函数
# 创建 Subscriber:订阅 /joint_states 话题
self.subscription = self.create_subscription(
JointState, # 消息类型
'/joint_states', # 话题名
self.callback, # 回调函数
10) # 队列长度
self.get_logger().info('关节状态监听器已启动,等待数据...')
def callback(self, msg: JointState):
"""回调函数:每次收到 JointState 消息时执行"""
# 将弧度转为角度,方便阅读
angles = [f'{math.degrees(p):.1f}°' for p in msg.position]
# 格式化输出:joint1: 10.0° | joint2: 20.0° | ...
output = ' | '.join([f'{n}: {a}' for n, a in zip(msg.name, angles)])
self.get_logger().info(output) # 打印到终端
def main():
rclpy.init() # 初始化 ROS2
node = JointStateSubscriber() # 创建节点实例
try:
rclpy.spin(node) # 保持节点运行,处理回调
except KeyboardInterrupt:
pass # Ctrl+C 优雅退出
finally:
node.destroy_node() # 销毁节点
rclpy.shutdown() # 关闭 ROS2
if __name__ == '__main__': # Python 入口点
main()
5.2.4 配置并运行
添加依赖(package.xml):
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>sensor_msgs</exec_depend> <!-- 新增 -->
添加入口点(setup.py):
entry_points={
'console_scripts': [
'simple_pub = py_episode.simple_publisher:main',
'simple_sub = py_episode.simple_subscriber:main',
'joint_pub = py_episode.joint_state_publisher:main', # 新增
'joint_sub = py_episode.joint_state_subscriber:main', # 新增
],
},
构建并运行:
cd ~/ros2_ws
colcon build --packages-select py_episode --symlink-install
source install/local_setup.bash
# 终端 1
ros2 run py_episode joint_pub
# 终端 2
ros2 run py_episode joint_sub
运行效果:
# Publisher 输出
[INFO] [joint_state_publisher]: 发布: ['0.0°', '0.0°', '0.0°', '0.0°', '0.0°', '0.0°']
[INFO] [joint_state_publisher]: 发布: ['14.3°', '27.6°', '38.9°', '47.6°', '53.5°', '56.4°']
# Subscriber 输出
[INFO] [joint_state_subscriber]: joint1: 14.3° | joint2: 27.6° | joint3: 38.9° | joint4: 47.6° | joint5: 53.5° | joint6: 56.4°
用 rqt 检查这个正弦波 Topic:

5.2.5 验证话题
# 查看话题列表
ros2 topic list
# 输出: /joint_states
# 查看话题详情
ros2 topic info /joint_states
# 输出:
Type: sensor_msgs/msg/JointState
Publisher count: 1
Subscription count: 7 # 包含rqt
# 命令行监听
ros2 topic echo /joint_states
5.3 阶段 3:思考 - 自定义 Interface 的必要性
问题:JointState 有冗余字段
再看一次 JointState 的结构:
ros2 interface show sensor_msgs/msg/JointState
std_msgs/Header header
string[] name
float64[] position # ← 我们需要这个
float64[] velocity # ← 我们不需要
float64[] effort # ← 我们不需要
在我们的场景中:
- 只需要
name和position velocity和effort是多余的
这会带来什么问题?
- 带宽浪费:每次发布都包含空的 velocity 和 effort 字段
- 语义不清:其他开发者看到 JointState,会以为有速度和力矩数据
- 灵活性差:如果想添加其他信息(如关节温度),JointState 不支持
解决方案:自定义 Interface
ROS2 允许你定义自己的消息类型!
比如,我们可以创建一个精简版的 ArmJointAngles.msg:
# 文件:ArmJointAngles.msg
string[] name # 关节名称
float64[] position # 关节角度(弧度)
float64 timestamp # 时间戳
甚至可以添加自定义字段:
# 更丰富的自定义消息
string[] name
float64[] position
float64[] temperature # 关节温度
bool[] is_enabled # 是否使能
自定义 Interface 将在下一章详细讲解!
本章先使用标准的
JointState,因为:
- RViz、MoveIt 等工具默认支持它
- 能快速跑通整个流程
- 理解标准消息后,自定义才更有意义
5.4 本节总结
三阶段回顾
阶段 1:Float64(单个关节)
↓ 问题:6 个关节怎么办?
阶段 2:JointState(6 个关节)
↓ 问题:字段冗余怎么办?
阶段 3:自定义 Interface(按需定义)
→ 下一章详解
Publisher vs Subscriber 核心对比
| 特性 | Publisher | Subscriber |
|---|---|---|
| 创建方法 | create_publisher(类型, 话题, 队列) |
create_subscription(类型, 话题, 回调, 队列) |
| 发送/接收 | pub.publish(msg) |
自动调用 callback(msg) |
| 主动/被动 | 主动发送 | 被动接收 |
| 运行方式 | 定时器 + create_timer() |
rclpy.spin(node) |
六、调试与验证
6.1 使用命令行工具调试
本节演示如何用 ROS2 命令行工具调试我们编写的节点。
6.1.1 调试阶段 1(Float64 单关节)
启动 simple_pub 后:
# 查看节点
ros2 node list
# 输出: /simple_publisher
# 查看话题
ros2 topic list
# 输出: /joint1_angle
# 查看话题类型
ros2 topic info /joint1_angle
# 输出: Type: std_msgs/msg/Float64
# 监听话题
ros2 topic echo /joint1_angle
# 输出:
# data: 15.0
# ---
# data: 20.0
# ---
# 查看发布频率
ros2 topic hz /joint1_angle
# 输出: average rate: 2.000
6.1.2 调试阶段 2(JointState 六关节)
启动 joint_pub 后:
# 查看节点
ros2 node list
# 输出:
# /joint_state_publisher
# /joint_state_subscriber
# 查看话题
ros2 topic list
# 输出: /joint_states
# 查看话题类型
ros2 topic info /joint_states
# 输出: Type: sensor_msgs/msg/JointState
# 监听话题
ros2 topic echo /joint_states
输出:
header:
stamp:
sec: 1704520000
nanosec: 123456789
frame_id: ''
name:
- joint1
- joint2
- joint3
- joint4
- joint5
- joint6
position:
- 0.25
- 0.48
- 0.67
- 0.82
- 0.92
- 0.97
velocity: []
effort: []
---
# 查看发布频率
ros2 topic hz /joint_states
# 输出: average rate: 10.000
输出:
average rate: 10.000
min: 0.099s max: 0.101s std dev: 0.00071s window: 50
6.2 使用 rqt_graph 可视化
启动可视化工具:
ros2 run rqt_graph rqt_graph

你会看到:
- 两个节点:
/joint_state_publisher和/joint_state_subscriber - 一个话题:
/joint_states - 箭头显示数据流方向
6.3 常见问题排查
问题 1:Subscriber 收不到消息
可能原因:
- 话题名称不匹配
- 消息类型不匹配
- Publisher 还未启动
排查步骤:
# 1. 检查话题是否存在
ros2 topic list
# 2. 检查话题类型
ros2 topic info /joint_states
# 3. 检查节点是否运行
ros2 node list
问题 2:节点启动失败
可能原因:
- 未 source 工作空间
- 未构建功能包
- 代码有语法错误
排查步骤:
# 1. 重新 source
source /opt/ros/jazzy/setup.bash
source ~/ros2_ws/install/local_setup.bash
# 2. 重新构建
cd ~/ros2_ws
colcon build --packages-select py_episode
# 3. 检查 Python 语法
python3 ~/ros2_ws/src/py_episode/py_episode/joint_state_publisher.py
问题 3:修改代码后没有生效
原因:未使用 --symlink-install 或未重新构建。
解决方法:
# 方法 1:使用 symlink 模式(推荐 Python)
colcon build --packages-select py_episode --symlink-install
# 方法 2:每次修改后重新构建
colcon build --packages-select py_episode
source install/local_setup.bash
七、总结
7.1 本章收获
完成本章后,你应该能够:
- ✓ 理解 colcon 构建系统的工作原理
- ✓ 创建和管理 ROS2 工作空间
- ✓ 创建 Python 功能包
- ✓ 编写 Publisher 节点发布机械臂关节状态
- ✓ 编写 Subscriber 节点接收关节状态
- ✓ 使用命令行工具调试节点和话题
- ✓ 使用
sensor_msgs/JointState标准消息类型
7.2 关键概念回顾
| 概念 | 说明 | 类比 |
|---|---|---|
| colcon | ROS2 构建工具 | Make、CMake |
| Workspace | 包含多个功能包的目录 | Python 虚拟环境 |
| Package | ROS2 代码的组织单元 | Python 模块 |
| Publisher | 发布数据到话题 | 骑手广播位置 |
| Subscriber | 订阅话题接收数据 | 顾客查看位置 |
| Topic | 数据传输的通道 | 广播频道 |
| JointState | 关节状态消息 | 机械臂的"体检报告" |
7.3 常用命令速查
# 工作空间管理
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws
colcon build --packages-select <package_name> --symlink-install
source install/local_setup.bash
# 功能包创建
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python --license Apache-2.0 <package_name>
# 节点运行
ros2 run <package_name> <executable_name>
# 调试命令
ros2 node list # 查看节点
ros2 topic list # 查看话题
ros2 topic echo /joint_states # 监听关节状态
ros2 topic info /joint_states # 查看话题详情
ros2 topic hz /joint_states # 查看发布频率
ros2 run rqt_graph rqt_graph # 可视化节点关系
附录
A.1 setup.bash vs local_setup.bash 的区别
在 ROS2 工作空间中,install/ 目录下有两个重要的环境脚本:
~/ros2_ws/install/
├── setup.bash # 完整环境脚本
└── local_setup.bash # 仅本工作空间脚本
它们的区别
| 脚本 | 作用 | 适用场景 |
|---|---|---|
| setup.bash | 同时 source 本工作空间 + 所有底层依赖(Underlay) | 新开终端时,一步到位 |
| local_setup.bash | 仅 source 本工作空间,不包含底层依赖 | 已经 source 过 Underlay 时 |
工作原理
当你执行 colcon build 时,colcon 会记录当前环境中已有的工作空间(Underlay),并把这些信息写入 setup.bash。
示例场景:
# 假设你的工作空间依赖 ROS2 Jazzy
# 方式 1:使用 setup.bash(一步到位)
source ~/ros2_ws/install/setup.bash
# 效果:同时 source 了 /opt/ros/jazzy + ~/ros2_ws
# 方式 2:使用 local_setup.bash(分步)
source /opt/ros/jazzy/setup.bash # 先 source Underlay
source ~/ros2_ws/install/local_setup.bash # 再 source 本工作空间
# 效果:与方式 1 完全相同
何时用哪个?
推荐使用 setup.bash(大多数情况):
# 简单直接,适合日常开发
source ~/ros2_ws/install/setup.bash
使用 local_setup.bash 的场景:
-
多工作空间叠加:当你有多个 Overlay 工作空间,需要精确控制加载顺序时
# 假设有三层工作空间 source /opt/ros/jazzy/setup.bash # 系统层 source ~/base_ws/install/local_setup.bash # 基础层 source ~/dev_ws/install/local_setup.bash # 开发层 -
避免重复 source:如果你已经 source 过 ROS2 环境,使用
local_setup.bash可以避免重复加载 -
隔离环境:在测试或 CI 中精确控制环境变量
查看区别
你可以查看两个脚本的内容来理解区别:
# setup.bash 会调用 local_setup.bash + Underlay
cat ~/ros2_ws/install/setup.bash
# local_setup.bash 只设置本工作空间的路径
cat ~/ros2_ws/install/local_setup.bash
总结
| 场景 | 推荐脚本 |
|---|---|
| 新开终端,开始开发 | setup.bash |
| 已 source ROS2,只加载当前工作空间 | local_setup.bash |
| 多工作空间精确控制 | local_setup.bash |
| 不确定用哪个 | setup.bash(更安全) |
小贴士:如果你只有一个工作空间(大多数初学者的情况),直接用
setup.bash就对了!