Soma Zero Tutorials
🔍 搜索功能尚未开启,敬请期待。

3.4 ROS2 Topic 通信编程实战

一、本章目标

在上一章中,我们学习了 ROS2 的核心概念(Nodes、Topics、Services、Actions、TF2),并通过命令行工具进行了实践。本章将带你动手编写代码,用 Python 实现自己的 Publisher 和 Subscriber 节点。

1.1 学习路线

按照以下顺序逐步掌握 ROS2 开发:

  1. 使用 colcon 构建工具 - ROS2 官方推荐的构建系统
  2. 创建工作空间(Workspace) - 管理 ROS2 项目的目录结构
  3. 创建功能包(Package) - ROS2 代码的组织单元
  4. 编写 Publisher 和 Subscriber - 实现节点间通信

1.2 为什么要学习这些?

回顾外卖系统类比

还记得第三章的外卖系统吗?

  • 骑手节点持续广播 GPS 位置(Publisher)
  • 顾客节点实时接收位置信息(Subscriber)

在实际机器人项目中:

  • 相机节点发布图像数据(Publisher)
  • 视觉识别节点接收并处理图像(Subscriber)
  • 机械臂控制节点发布关节状态(Publisher)
  • **可视化节点(RViz)**接收并显示状态(Subscriber)

本章将教你如何编写这样节点的代码。

二、使用 colcon 构建工具

2.1 什么是 colcon?

colcon 是 ROS2 官方推荐的构建工具,用于编译和管理 ROS2 工作空间。它是 ROS1 构建工具(catkin_make、catkin_tools)的迭代版本。

主要功能

  • 编译 C++ 和 Python 功能包
  • 管理包之间的依赖关系
  • 并行构建多个包(提高效率)
  • 支持多种构建类型(ament_cmake、ament_python)

2.1.1 colcon 本质上在做什么?

理解 colcon 的工作原理能帮助你更好地调试问题和优化构建流程。

对于 C++ 代码

colcon 本质上是在调用 CMakeMake 来编译 C++ 代码:

# colcon build 背后的操作流程: 1. 读取 CMakeLists.txt └─> 配置编译选项、依赖库 2. 生成 Makefile(CMake 阶段) └─> 在 build/ 目录生成构建脚本 3. 编译源代码(Make 阶段) └─> g++ 或 clang++ 编译 .cpp 文件生成 .o 对象文件 4. 链接生成可执行文件或库 └─> 将 .o 文件和依赖库链接成最终的可执行文件 5. 安装到 install/ 目录 └─> 复制可执行文件、库文件、头文件到安装目录

类比理解

  • CMakeLists.txt = 施工图纸(告诉 colcon 如何构建)
  • build/ 目录 = 施工现场(中间产物)
  • install/ 目录 = 交付的成品房(可直接使用)

为什么 C++ 需要编译?

  • C++ 是编译型语言,需要转换成机器码
  • 每次修改代码都需要重新编译
  • 编译后的程序运行速度快

对于 Python 代码

colcon 对 Python 的处理完全不同,它主要做的是文件组织和安装

# colcon build 对 Python 包的操作: 1. 读取 setup.py └─> 了解包的元信息和入口点 2. 不编译代码(Python 是解释型语言) └─> 跳过编译步骤 3. 安装 Python 模块 ├─> 默认模式:复制 .py 文件到 install/lib/python3.x/site-packages/ └─> --symlink-install:创建符号链接(推荐) 4. 创建可执行脚本 └─> 根据 entry_points 在 install/lib/<package_name>/ 创建启动脚本 5. 设置环境变量 └─> 修改 PYTHONPATH,让 Python 能找到你的包

关键区别

特性 C++ 包 Python 包
是否编译 需要编译成机器码 不需要,直接运行源码
修改后 必须重新 colcon build 使用 --symlink-install 无需重新构建
build/ 目录 包含 .o 对象文件、Makefile 几乎为空,只有元数据
install/ 目录 复制编译后的二进制文件 复制或链接 .py 源文件
运行速度 快(机器码) 相对慢(解释执行)

2.2 安装 colcon

如果你按照前面教程安装了 ROS2,colcon 应该已经安装。验证安装:

colcon version-check

如果未安装,运行:

# Ubuntu/Debian sudo apt install python3-colcon-common-extensions

2.3 colcon 工作空间结构

一个典型的 ROS2 工作空间包含以下目录:

ros2_ws/ # 工作空间根目录 ├── src/ # 源代码目录(你的功能包放这里) │ ├── package_1/ │ ├── package_2/ │ └── ... ├── build/ # 构建过程的中间文件(自动生成) ├── install/ # 安装目录,包含可执行文件(自动生成) └── log/ # 构建日志(自动生成)

比如 Episode1 包编译后:

. ├── build ├── install ├── log ├── README.md └── src ├── episode1_urdf_1113 ├── episode_controller └── robot_arm_interfaces 8 directories, 1 file

目录说明

  • src/:唯一需要手动创建和编辑的目录
  • build/:CMake 构建的中间文件
  • install/:编译后的可执行文件和库
  • log/:构建过程的日志信息

2.4 colcon 常用命令

# 1. 构建整个工作空间 colcon build # 2. 只构建指定的包 colcon build --packages-select <package_name> # 3. 构建指定包及其依赖 colcon build --packages-up-to <package_name> # 4. Python 开发模式(修改代码无需重新编译) colcon build --symlink-install # 5. 显示构建过程的详细输出 colcon build --event-handlers console_direct+ # 6. 串行构建(适合资源受限的设备) colcon build --executor sequential

推荐组合(Python 开发):

colcon build --symlink-install --packages-select <package_name>

--symlink-install 会创建符号链接而不是复制文件,这样修改 Python 代码后无需重新编译。

三、创建工作空间

3.1 理解工作空间的概念

工作空间(Workspace) 是一个包含 ROS2 功能包的目录。ROS2 使用分层工作空间概念:

  • Underlay(底层):ROS2 系统安装目录(如 /opt/ros/jazzy
  • Overlay(叠加层):你的开发工作空间(如 ~/ros2_ws

类比理解

  • Underlay 就像操作系统(提供基础功能)
  • Overlay 就像用户安装的应用(你的自定义功能)
  • Overlay 会覆盖 Underlay 中同名的包(方便开发和测试)

3.2 创建工作空间步骤

步骤 1:创建工作空间目录

# 创建工作空间根目录和 src 目录 mkdir -p ~/ros2_ws/src # 进入工作空间 cd ~/ros2_ws

步骤 2:Source ROS2 环境(Underlay)

在使用工作空间前,必须先 source ROS2 的安装环境:

source /opt/ros/jazzy/setup.bash

重要:每次打开新终端都需要 source,或者将这行命令添加到 ~/.bashrc 中:

echo "source /opt/ros/jazzy/setup.bash" >> ~/.bashrc

步骤 3:验证工作空间

此时工作空间是空的,但我们可以测试构建系统:

cd ~/ros2_ws colcon build

输出应该显示:

Starting >>> [工作空间初始化] Summary: 0 packages finished [0.XXs]

构建完成后,会生成 build/install/log/ 目录:

ls ~/ros2_ws # 输出:build install log src

3.3 Source 工作空间(Overlay)

构建工作空间后,需要 source 才能使用其中的功能包:

# 这两种方式完全等价: # 方式 1(分两步) source /opt/ros/jazzy/setup.bash # 前面做过了 source ~/ros2_ws/install/local_setup.bash # 方式 2(一步到位) source ~/ros2_ws/install/setup.bash # 推荐,简单一些

关于 setup.bash vs local_setup.bash 的区别,请参见附录:A.1 setup.bash vs local_setup.bash 的区别

四、创建功能包(Package)

4.1 什么是功能包?

功能包(Package) 是 ROS2 代码的基本组织单元,包含:

  • 节点的源代码
  • 配置文件
  • 依赖声明
  • 构建指令

类比理解

  • 功能包 = Python 模块(module)
  • 工作空间 = Python 虚拟环境(venv)

4.2 Python 功能包的结构

一个标准的 Python 功能包包含以下文件:

my_package/ ├── package.xml # 功能包元信息(名称、版本、依赖) ├── setup.py # Python 安装脚本 ├── setup.cfg # 安装配置 ├── resource/ │ └── my_package # 空标记文件 ├── my_package/ # Python 模块目录 │ ├── __init__.py # 模块初始化文件 │ └── my_node.py # 你的节点代码 └── test/ # 测试文件 ├── test_copyright.py ├── test_flake8.py └── test_pep257.py

比如 Episode1 interface 包:

./src/episode_controller/ ├── demo │ ├── client_demo.py # 机械臂控制测试 │ └── __init__.py ├── episode_controller │ ├── __init__.py │ └── robot_interface_node.py # 机械臂通讯节点 ├── MANIFEST.in ├── package.xml ├── resource │ └── episode_controller ├── setup.cfg ├── setup.py └── test ├── test_copyright.py ├── test_flake8.py └── test_pep257.py

4.3 创建功能包

步骤 1:进入 src 目录

cd ~/ros2_ws/src

步骤 2:创建功能包

ros2 pkg create --build-type ament_python --license Apache-2.0 py_episode

命令解析

  • ros2 pkg create:创建功能包命令
  • --build-type ament_python:指定为 Python 包
  • --license Apache-2.0:指定开源许可证
  • py_episode:功能包名称

输出示例:

going to create a new package package name: py_episode destination directory: /home/enpei/ros2_ws package format: 3 version: 0.0.0 description: TODO: Package description maintainer: ['<name> <email>'] licenses: ['Apache-2.0'] build type: ament_python dependencies: [] creating folder ./py_episode creating ./py_episode/package.xml creating source folder creating folder ./py_episode/py_episode creating ./py_episode/setup.py creating ./py_episode/setup.cfg creating folder ./py_episode/resource creating ./py_episode/resource/py_episode creating ./py_episode/py_episode/__init__.py creating folder ./py_episode/test creating ./py_episode/test/test_copyright.py creating ./py_episode/test/test_flake8.py creating ./py_episode/test/test_pep257.py

步骤 3:验证功能包结构

tree # 你应该看到下面的目录结构 . └── py_episode ├── LICENSE ├── package.xml ├── py_episode │ └── __init__.py ├── resource │ └── py_episode ├── setup.cfg ├── setup.py └── test ├── test_copyright.py ├── test_flake8.py └── test_pep257.py 5 directories, 9 files

4.4 自定义功能包信息

编辑 package.xml

打开 package.xml 文件:

cd ~/ros2_ws/src/py_episode nano package.xml # 或使用你喜欢的编辑器

修改以下字段:

<?xml version="1.0"?> <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> <package format="3"> <name>py_episode</name> <version>0.0.0</version> <!-- 修改描述 --> <description>TODO: Package description</description> <!-- 修改维护者信息 --> <maintainer email="[email protected]">enpei</maintainer> <license>Apache-2.0</license> <!-- 添加依赖(后续会用到) --> <exec_depend>rclpy</exec_depend> <exec_depend>std_msgs</exec_depend> <test_depend>ament_copyright</test_depend> <test_depend>ament_flake8</test_depend> <test_depend>ament_pep257</test_depend> <test_depend>python3-pytest</test_depend> <export> <build_type>ament_python</build_type> </export> </package>

依赖说明

  • rclpy:ROS2 的 Python 客户端库
  • std_msgs:标准消息类型(如 String、Int32 等)

编辑 setup.py

打开 setup.py

nano setup.py

修改以下字段:

from setuptools import find_packages, setup package_name = 'py_episode' setup( name=package_name, version='0.0.0', packages=find_packages(exclude=['test']), data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), ], install_requires=['setuptools'], zip_safe=True, maintainer='enpei', maintainer_email='[email protected]', description='TODO: Package description', license='Apache-2.0', extras_require={ 'test': [ 'pytest', ], }, entry_points={ 'console_scripts': [ # 后续会在这里添加可执行文件 ], }, )

五、编写 Publisher 和 Subscriber

本节采用循序渐进的方式,从最简单的代码开始,逐步完善:

阶段 目标 消息类型
阶段 1 发布单个关节角度 std_msgs/Float64
阶段 2 发布 6 个关节角度 sensor_msgs/JointState
阶段 3 思考:如何优化? 引出自定义 Interface

5.1 阶段 1:极简代码 - 发布单个关节

目标

用最少的代码实现 Topic 通信:

  • Publisher:发布 joint1 的角度(一个浮点数)
  • Subscriber:接收并打印角度

5.1.1 编写 Publisher

创建文件 ~/ros2_ws/src/py_episode/py_episode/simple_publisher.py

#!/usr/bin/env python3 """ 极简 Publisher:发布单个关节角度(使用 Timer 回调模式) """ import rclpy # ROS2 Python 客户端库 from rclpy.node import Node # 节点基类 from std_msgs.msg import Float64 # 标准消息类型:64位浮点数 class SimplePublisher(Node): # 继承 Node 类,获得其所有方法 """简单发布者节点类""" def __init__(self): # super() 调用父类 Node 的构造函数,完成节点初始化 # 这样我们才能使用 self.create_publisher()、self.get_logger() 等父类方法 super().__init__('simple_publisher') # 参数是节点名称,ros2 node list 会显示这个名字 # 创建 Publisher:话题名 /joint1_angle,消息类型 Float64,队列长度 10 # 队列长度 10 表示:如果 Subscriber 处理消息太慢,最多缓存 10 条消息 # 超过 10 条后,最旧的消息会被丢弃(防止内存无限增长) self.pub = self.create_publisher(Float64, '/joint1_angle', 10) # 创建定时器:每 0.5 秒(2Hz)调用一次 timer_callback self.timer = self.create_timer(0.5, self.timer_callback) self.angle = 0.0 # 初始角度 self.get_logger().info('Publisher 已启动,开始发布 joint1 角度...') def timer_callback(self): """定时器回调函数:每次被调用时发布一条消息""" msg = Float64() # 创建消息对象 msg.data = self.angle # 设置消息内容 self.pub.publish(msg) # 发布消息到话题 self.get_logger().info(f'发布: joint1 = {self.angle:.1f}°') self.angle += 5.0 # 每次增加 5 度 if self.angle > 90.0: # 超过 90 度后重置 self.angle = 0.0 def main(): rclpy.init() # 初始化 ROS2 node = SimplePublisher() # 创建节点实例 try: rclpy.spin(node) # 保持节点运行,处理回调 except KeyboardInterrupt: pass # Ctrl+C 优雅退出 finally: node.destroy_node() # 销毁节点 rclpy.shutdown() # 关闭 ROS2 if __name__ == '__main__': # Python 入口点 main()

核心代码只有 3 行

# 在 __init__ 中创建 Publisher self.pub = self.create_publisher(Float64, '/joint1_angle', 10) # 在回调函数中发布消息 msg = Float64() msg.data = self.angle self.pub.publish(msg)

什么是回调函数(Callback)?

回调函数是一种"被动触发"的函数,不是你主动调用它,而是在某个事件发生时,系统自动调用它

类比理解

  • 普通函数:你打电话给别人(主动调用)
  • 回调函数:你留了个电话号码,别人有事时打给你(被动触发)

在 ROS2 Publisher 的定时器中:

  • 当定时器时间到达时(每 0.5 秒)
  • ROS2 系统自动调用你定义的 timer_callback 函数
  • 你在回调函数中构建消息并发布

这种模式叫做事件驱动编程,在 GUI 编程(按钮点击事件)、网络编程(收到数据包)中都很常见。

Float64() 是 ROS2 标准消息库 std_msgs 中定义的消息类型,本质上是一个只包含一个 64 位浮点数的简单结构。

查看消息结构

ros2 interface show std_msgs/msg/Float64 # 输出: # float64 data

可以看到,Float64 只有一个字段 data,所以我们用 msg.data = 值 来设置内容。

常用的 std_msgs 消息类型

消息类型 字段 用途
Bool bool data 布尔值
Int32 int32 data 32 位整数
Float64 float64 data 64 位浮点数
String string data 字符串
Float64MultiArray float64[] data 浮点数数组

查看所有可用消息

ros2 interface list | grep std_msgs

5.1.2 编写 Subscriber

创建文件 ~/ros2_ws/src/py_episode/py_episode/simple_subscriber.py

#!/usr/bin/env python3 """ 极简 Subscriber:接收单个关节角度(使用类风格) """ import rclpy # ROS2 Python 客户端库 from rclpy.node import Node # 节点基类 from std_msgs.msg import Float64 # 标准消息类型:64位浮点数 class SimpleSubscriber(Node): """简单订阅者节点类""" def __init__(self): super().__init__('simple_subscriber') # 调用父类构造函数,设置节点名称 # 创建 Subscriber:订阅 /joint1_angle 话题,收到消息后调用 callback self.subscription = self.create_subscription( Float64, # 消息类型 '/joint1_angle', # 话题名 self.callback, # 回调函数 10) # 队列长度 self.get_logger().info('Subscriber 已启动,等待数据...') def callback(self, msg: Float64): """回调函数:每次收到消息时自动执行""" self.get_logger().info(f'收到: joint1 = {msg.data:.1f}°') def main(): rclpy.init() # 初始化 ROS2 node = SimpleSubscriber() # 创建节点实例 try: rclpy.spin(node) # 保持节点运行,处理回调 except KeyboardInterrupt: pass # Ctrl +C 优雅退出 finally: node.destroy_node() # 销毁节点 rclpy.shutdown() # 关闭 ROS2 if __name__ == '__main__': # Python 入口点 main()

核心代码只有 2 行

# 在 __init__ 中创建 Subscriber self.subscription = self.create_subscription(Float64, '/joint1_angle', self.callback, 10) # 回调函数处理消息 def callback(self, msg): self.get_logger().info(f'{msg.data}')

5.1.3 配置并运行

添加入口点setup.py):

entry_points={ 'console_scripts': [ 'simple_pub = py_episode.simple_publisher:main', 'simple_sub = py_episode.simple_subscriber:main', ], },

入口点语法解释

格式:'命令名 = 包名.模块名:函数名'

  • simple_pub:你在终端输入的命令名(ros2 run py_episode simple_pub
  • py_episode:包名(对应 py_episode/ 目录)
  • simple_publisher:模块名(对应 simple_publisher.py 文件)
  • main:要执行的函数名(文件中的 def main():

colcon 会自动生成可执行脚本,将命令映射到 Python 函数。

添加依赖package.xml):

<exec_depend>rclpy</exec_depend> <exec_depend>std_msgs</exec_depend>

为什么要添加依赖?

和 Python 的 requirements.txtimport 类似,但 ROS2 依赖有特殊作用:

  1. 告诉 colcon:这个包需要哪些其他 ROS2 包才能运行
  2. 自动安装:使用 rosdep install 命令时,会自动安装缺失的依赖
  3. 构建顺序:colcon 会根据依赖关系决定包的构建顺序

<exec_depend> 表示运行时依赖(执行时需要),还有 <build_depend>(构建时需要)、<test_depend>(测试时需要)等类型。

为什么注释掉依赖有时也能运行?

因为你的系统已经安装了 ROS2,rclpystd_msgs 已经存在,Python 可以直接 import。但声明依赖仍然重要:

  • 团队协作:其他人克隆你的代码时,rosdep install --from-paths src --ignore-src -r -y 会自动安装依赖
  • CI/CD:持续集成环境会根据依赖自动配置
  • 依赖链检查:当其他包依赖你的包时,colcon 会检查完整的依赖关系

所以即使"能运行",也应该正确声明依赖,这是 ROS2 开发的最佳实践。

构建并运行

cd ~/ros2_ws colcon build --packages-select py_episode --symlink-install source install/local_setup.bash # 终端 1 ros2 run py_episode simple_pub # 终端 2 ros2 run py_episode simple_sub

--symlink-install 的作用

创建符号链接而不是复制文件。对于 Python 包:

  • 不使用colcon build 会把 .py 文件复制到 install/ 目录,修改源码后需要重新构建
  • 使用后:只创建链接指向 src/ 中的源码,修改代码后无需重新 build,直接生效

推荐:Python 开发时始终使用此参数,提高开发效率(只需要 build 一次)。

运行效果

# Publisher 输出 [INFO] [simple_publisher]: 发布: joint1 = 0.0° [INFO] [simple_publisher]: 发布: joint1 = 5.0° [INFO] [simple_publisher]: 发布: joint1 = 10.0° # Subscriber 输出 [INFO] [simple_subscriber]: 收到: joint1 = 0.0° [INFO] [simple_subscriber]: 收到: joint1 = 5.0° [INFO] [simple_subscriber]: 收到: joint1 = 10.0°

成功! 你已经用最简代码实现了 Topic 通信。

5.2 阶段 2:发布 6 个关节 - 引入 JointState

问题来了

单个关节可以用 Float64,但机械臂有 6 个关节,怎么办?

方案对比

方案 实现方式 问题
方案 A 创建 6 个话题:/joint1_angle, /joint2_angle... 管理复杂,不优雅
方案 B 使用数组类型 Float64MultiArray 没有关节名称,不知道哪个是哪个
方案 C 使用标准消息 sensor_msgs/JointState 推荐!包含名称和角度

5.2.1 了解 Interface(消息类型)

在 ROS2 中,Interface 定义了消息的数据结构。使用命令查看:

# 查看 JointState 的结构 ros2 interface show sensor_msgs/msg/JointState

输出:

std_msgs/Header header builtin_interfaces/Time stamp string frame_id string[] name # 关节名称数组 float64[] position # 关节位置(弧度) float64[] velocity # 关节速度 float64[] effort # 关节力矩

解读

  • name:关节名称列表,如 ['joint1', 'joint2', ...]
  • position:对应的角度列表(弧度),如 [0.1, 0.2, ...]
  • velocityeffort:可选字段

为什么用 JointState?

  • ROS2 机械臂的标准消息类型
  • RViz、MoveIt 等工具都使用它
  • 名称和角度一一对应,清晰明了

5.2.2 修改 Publisher

只需改动几行代码,就能发布 6 个关节:

创建文件 ~/ros2_ws/src/py_episode/py_episode/joint_state_publisher.py

#!/usr/bin/env python3 """ 机械臂关节状态发布器:发布 6 个关节角度(使用类风格) """ import rclpy # ROS2 Python 客户端库 from rclpy.node import Node # 节点基类 from sensor_msgs.msg import JointState # 机械臂关节状态消息类型 import math # 数学库,用于 sin 函数 class JointStatePublisher(Node): """关节状态发布者节点类""" def __init__(self): super().__init__('joint_state_publisher') # 调用父类构造函数 # 创建 Publisher:话题名 /joint_states,消息类型 JointState self.pub = self.create_publisher(JointState, '/joint_states', 10) # 创建定时器:每 0.1 秒(10Hz)调用一次 timer_callback self.timer = self.create_timer(0.1, self.timer_callback) # 定义 6 个关节名称(与 URDF 模型中的关节名对应) self.joint_names = ['joint1', 'joint2', 'joint3', 'joint4', 'joint5', 'joint6'] self.step = 0 # 步数计数器,用于生成正弦波 self.get_logger().info('关节状态发布器已启动') def timer_callback(self): """定时器回调函数:每次被调用时发布一条消息""" # 构建 JointState 消息 msg = JointState() # 创建消息对象 msg.header.stamp = self.get_clock().now().to_msg() # 设置时间戳 msg.name = self.joint_names # 设置关节名称列表 # 模拟 6 个关节的运动(正弦波,每个关节频率不同) msg.position = [math.sin(self.step * 0.05 * (i + 1)) * 0.5 for i in range(6)] msg.velocity = [] # 速度字段,可选,这里留空 msg.effort = [] # 力矩字段,可选,这里留空 self.pub.publish(msg) # 发布消息 # 日志(每秒打印一次,10Hz 所以每 10 步打印) if self.step % 10 == 0: angles = [f'{math.degrees(p):.1f}°' for p in msg.position] self.get_logger().info(f'发布: {angles}') self.step += 1 # 步数 +1 def main(): rclpy.init() # 初始化 ROS2 node = JointStatePublisher() # 创建节点实例 try: rclpy.spin(node) # 保持节点运行,处理回调 except KeyboardInterrupt: pass # Ctrl+C 优雅退出 finally: node.destroy_node() # 销毁节点 rclpy.shutdown() # 关闭 ROS2 if __name__ == '__main__': # Python 入口点 main()

对比阶段 1 的改动

改动点 阶段 1 阶段 2
消息类型 Float64 JointState
话题名 /joint1_angle /joint_states
数据 msg.data = 角度 msg.name = [...]``msg.position = [...]

5.2.3 修改 Subscriber

创建文件 ~/ros2_ws/src/py_episode/py_episode/joint_state_subscriber.py

#!/usr/bin/env python3 """ 机械臂关节状态监听器:接收 6 个关节角度(使用类风格) """ import rclpy # ROS2 Python 客户端库 from rclpy.node import Node # 节点基类 from sensor_msgs.msg import JointState # 机械臂关节状态消息类型 import math # 数学库,用于弧度转角度 class JointStateSubscriber(Node): """关节状态订阅者节点类""" def __init__(self): super().__init__('joint_state_subscriber') # 调用父类构造函数 # 创建 Subscriber:订阅 /joint_states 话题 self.subscription = self.create_subscription( JointState, # 消息类型 '/joint_states', # 话题名 self.callback, # 回调函数 10) # 队列长度 self.get_logger().info('关节状态监听器已启动,等待数据...') def callback(self, msg: JointState): """回调函数:每次收到 JointState 消息时执行""" # 将弧度转为角度,方便阅读 angles = [f'{math.degrees(p):.1f}°' for p in msg.position] # 格式化输出:joint1: 10.0° | joint2: 20.0° | ... output = ' | '.join([f'{n}: {a}' for n, a in zip(msg.name, angles)]) self.get_logger().info(output) # 打印到终端 def main(): rclpy.init() # 初始化 ROS2 node = JointStateSubscriber() # 创建节点实例 try: rclpy.spin(node) # 保持节点运行,处理回调 except KeyboardInterrupt: pass # Ctrl+C 优雅退出 finally: node.destroy_node() # 销毁节点 rclpy.shutdown() # 关闭 ROS2 if __name__ == '__main__': # Python 入口点 main()

5.2.4 配置并运行

添加依赖package.xml):

<exec_depend>rclpy</exec_depend> <exec_depend>std_msgs</exec_depend> <exec_depend>sensor_msgs</exec_depend> <!-- 新增 -->

添加入口点setup.py):

entry_points={ 'console_scripts': [ 'simple_pub = py_episode.simple_publisher:main', 'simple_sub = py_episode.simple_subscriber:main', 'joint_pub = py_episode.joint_state_publisher:main', # 新增 'joint_sub = py_episode.joint_state_subscriber:main', # 新增 ], },

构建并运行

cd ~/ros2_ws colcon build --packages-select py_episode --symlink-install source install/local_setup.bash # 终端 1 ros2 run py_episode joint_pub # 终端 2 ros2 run py_episode joint_sub

运行效果

# Publisher 输出 [INFO] [joint_state_publisher]: 发布: ['0.0°', '0.0°', '0.0°', '0.0°', '0.0°', '0.0°'] [INFO] [joint_state_publisher]: 发布: ['14.3°', '27.6°', '38.9°', '47.6°', '53.5°', '56.4°'] # Subscriber 输出 [INFO] [joint_state_subscriber]: joint1: 14.3° | joint2: 27.6° | joint3: 38.9° | joint4: 47.6° | joint5: 53.5° | joint6: 56.4°

rqt 检查这个正弦波 Topic

5.2.5 验证话题

# 查看话题列表 ros2 topic list # 输出: /joint_states # 查看话题详情 ros2 topic info /joint_states # 输出: Type: sensor_msgs/msg/JointState Publisher count: 1 Subscription count: 7 # 包含rqt # 命令行监听 ros2 topic echo /joint_states

5.3 阶段 3:思考 - 自定义 Interface 的必要性

问题:JointState 有冗余字段

再看一次 JointState 的结构:

ros2 interface show sensor_msgs/msg/JointState
std_msgs/Header header string[] name float64[] position # ← 我们需要这个 float64[] velocity # ← 我们不需要 float64[] effort # ← 我们不需要

在我们的场景中:

  • 只需要 nameposition
  • velocityeffort 是多余的

这会带来什么问题?

  1. 带宽浪费:每次发布都包含空的 velocity 和 effort 字段
  2. 语义不清:其他开发者看到 JointState,会以为有速度和力矩数据
  3. 灵活性差:如果想添加其他信息(如关节温度),JointState 不支持

解决方案:自定义 Interface

ROS2 允许你定义自己的消息类型

比如,我们可以创建一个精简版的 ArmJointAngles.msg

# 文件:ArmJointAngles.msg string[] name # 关节名称 float64[] position # 关节角度(弧度) float64 timestamp # 时间戳

甚至可以添加自定义字段:

# 更丰富的自定义消息 string[] name float64[] position float64[] temperature # 关节温度 bool[] is_enabled # 是否使能

自定义 Interface 将在下一章详细讲解!

本章先使用标准的 JointState,因为:

  • RViz、MoveIt 等工具默认支持它
  • 能快速跑通整个流程
  • 理解标准消息后,自定义才更有意义

5.4 本节总结

三阶段回顾

阶段 1:Float64(单个关节) ↓ 问题:6 个关节怎么办? 阶段 2:JointState(6 个关节) ↓ 问题:字段冗余怎么办? 阶段 3:自定义 Interface(按需定义) → 下一章详解

Publisher vs Subscriber 核心对比

特性 Publisher Subscriber
创建方法 create_publisher(类型, 话题, 队列) create_subscription(类型, 话题, 回调, 队列)
发送/接收 pub.publish(msg) 自动调用 callback(msg)
主动/被动 主动发送 被动接收
运行方式 定时器 + create_timer() rclpy.spin(node)

六、调试与验证

6.1 使用命令行工具调试

本节演示如何用 ROS2 命令行工具调试我们编写的节点。

6.1.1 调试阶段 1(Float64 单关节)

启动 simple_pub 后:

# 查看节点 ros2 node list # 输出: /simple_publisher # 查看话题 ros2 topic list # 输出: /joint1_angle # 查看话题类型 ros2 topic info /joint1_angle # 输出: Type: std_msgs/msg/Float64 # 监听话题 ros2 topic echo /joint1_angle # 输出: # data: 15.0 # --- # data: 20.0 # --- # 查看发布频率 ros2 topic hz /joint1_angle # 输出: average rate: 2.000

6.1.2 调试阶段 2(JointState 六关节)

启动 joint_pub 后:

# 查看节点 ros2 node list # 输出: # /joint_state_publisher # /joint_state_subscriber # 查看话题 ros2 topic list # 输出: /joint_states # 查看话题类型 ros2 topic info /joint_states # 输出: Type: sensor_msgs/msg/JointState # 监听话题 ros2 topic echo /joint_states

输出:

header: stamp: sec: 1704520000 nanosec: 123456789 frame_id: '' name: - joint1 - joint2 - joint3 - joint4 - joint5 - joint6 position: - 0.25 - 0.48 - 0.67 - 0.82 - 0.92 - 0.97 velocity: [] effort: [] ---
# 查看发布频率 ros2 topic hz /joint_states # 输出: average rate: 10.000

输出:

average rate: 10.000 min: 0.099s max: 0.101s std dev: 0.00071s window: 50

6.2 使用 rqt_graph 可视化

启动可视化工具:

ros2 run rqt_graph rqt_graph

你会看到:

  • 两个节点:/joint_state_publisher/joint_state_subscriber
  • 一个话题:/joint_states
  • 箭头显示数据流方向

6.3 常见问题排查

问题 1:Subscriber 收不到消息

可能原因

  • 话题名称不匹配
  • 消息类型不匹配
  • Publisher 还未启动

排查步骤

# 1. 检查话题是否存在 ros2 topic list # 2. 检查话题类型 ros2 topic info /joint_states # 3. 检查节点是否运行 ros2 node list

问题 2:节点启动失败

可能原因

  • 未 source 工作空间
  • 未构建功能包
  • 代码有语法错误

排查步骤

# 1. 重新 source source /opt/ros/jazzy/setup.bash source ~/ros2_ws/install/local_setup.bash # 2. 重新构建 cd ~/ros2_ws colcon build --packages-select py_episode # 3. 检查 Python 语法 python3 ~/ros2_ws/src/py_episode/py_episode/joint_state_publisher.py

问题 3:修改代码后没有生效

原因:未使用 --symlink-install 或未重新构建。

解决方法

# 方法 1:使用 symlink 模式(推荐 Python) colcon build --packages-select py_episode --symlink-install # 方法 2:每次修改后重新构建 colcon build --packages-select py_episode source install/local_setup.bash

七、总结

7.1 本章收获

完成本章后,你应该能够:

  • ✓ 理解 colcon 构建系统的工作原理
  • ✓ 创建和管理 ROS2 工作空间
  • ✓ 创建 Python 功能包
  • ✓ 编写 Publisher 节点发布机械臂关节状态
  • ✓ 编写 Subscriber 节点接收关节状态
  • ✓ 使用命令行工具调试节点和话题
  • ✓ 使用 sensor_msgs/JointState 标准消息类型

7.2 关键概念回顾

概念 说明 类比
colcon ROS2 构建工具 Make、CMake
Workspace 包含多个功能包的目录 Python 虚拟环境
Package ROS2 代码的组织单元 Python 模块
Publisher 发布数据到话题 骑手广播位置
Subscriber 订阅话题接收数据 顾客查看位置
Topic 数据传输的通道 广播频道
JointState 关节状态消息 机械臂的"体检报告"

7.3 常用命令速查

# 工作空间管理 mkdir -p ~/ros2_ws/src cd ~/ros2_ws colcon build --packages-select <package_name> --symlink-install source install/local_setup.bash # 功能包创建 cd ~/ros2_ws/src ros2 pkg create --build-type ament_python --license Apache-2.0 <package_name> # 节点运行 ros2 run <package_name> <executable_name> # 调试命令 ros2 node list # 查看节点 ros2 topic list # 查看话题 ros2 topic echo /joint_states # 监听关节状态 ros2 topic info /joint_states # 查看话题详情 ros2 topic hz /joint_states # 查看发布频率 ros2 run rqt_graph rqt_graph # 可视化节点关系

附录

A.1 setup.bash vs local_setup.bash 的区别

在 ROS2 工作空间中,install/ 目录下有两个重要的环境脚本:

~/ros2_ws/install/ ├── setup.bash # 完整环境脚本 └── local_setup.bash # 仅本工作空间脚本

它们的区别

脚本 作用 适用场景
setup.bash 同时 source 本工作空间 + 所有底层依赖(Underlay) 新开终端时,一步到位
local_setup.bash source 本工作空间,不包含底层依赖 已经 source 过 Underlay 时

工作原理

当你执行 colcon build 时,colcon 会记录当前环境中已有的工作空间(Underlay),并把这些信息写入 setup.bash

示例场景

# 假设你的工作空间依赖 ROS2 Jazzy # 方式 1:使用 setup.bash(一步到位) source ~/ros2_ws/install/setup.bash # 效果:同时 source 了 /opt/ros/jazzy + ~/ros2_ws # 方式 2:使用 local_setup.bash(分步) source /opt/ros/jazzy/setup.bash # 先 source Underlay source ~/ros2_ws/install/local_setup.bash # 再 source 本工作空间 # 效果:与方式 1 完全相同

何时用哪个?

推荐使用 setup.bash(大多数情况):

# 简单直接,适合日常开发 source ~/ros2_ws/install/setup.bash

使用 local_setup.bash 的场景

  1. 多工作空间叠加:当你有多个 Overlay 工作空间,需要精确控制加载顺序时

    # 假设有三层工作空间 source /opt/ros/jazzy/setup.bash # 系统层 source ~/base_ws/install/local_setup.bash # 基础层 source ~/dev_ws/install/local_setup.bash # 开发层
  2. 避免重复 source:如果你已经 source 过 ROS2 环境,使用 local_setup.bash 可以避免重复加载

  3. 隔离环境:在测试或 CI 中精确控制环境变量

查看区别

你可以查看两个脚本的内容来理解区别:

# setup.bash 会调用 local_setup.bash + Underlay cat ~/ros2_ws/install/setup.bash # local_setup.bash 只设置本工作空间的路径 cat ~/ros2_ws/install/local_setup.bash

总结

场景 推荐脚本
新开终端,开始开发 setup.bash
已 source ROS2,只加载当前工作空间 local_setup.bash
多工作空间精确控制 local_setup.bash
不确定用哪个 setup.bash(更安全)

小贴士:如果你只有一个工作空间(大多数初学者的情况),直接用 setup.bash 就对了!