ROS与STM32通信(二)-pyserial
ROS与STM32通信一般分为两种,
- STM32上运行ros节点实现通信
- 使用普通的串口库进行通信,然后以话题方式发布
第一种方式具体实现过程可参考上篇文章ROS与STM32通信-rosserial,上述文章中的收发频率不一致情况,目前还没解决,所以本篇文章采用第二种方式来实现STM32与ROS通信,C++实现方式可参看这篇文章ROS与STM32通信,其利用ros serial库数据格式为C/C++共用体实现解析与发布。Python实现方式可使用pyserial库来实现通信,pyserial的用法可参考我之前写的文章python与stm32通信,数据格式我们采用Json格式来解析与发布。
以STM32读取MPU6050,然后ROS发布与订阅为例
下位机
参考之前写的文章STM32HAL库驱动MPU6050
main.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| while (1) {
while (mpu_dmp_get_data(&pitch, &roll, &yaw));
printf("{\"roll\":%.4f,\"pitch\":%.4f,\"yaw\":%.4f}",roll, pitch, yaw); sprintf(oledBuf, "roll :%.2f", roll); OLED_ShowString(0, 28, (u8*)oledBuf, 12); sprintf(oledBuf, "pitch:%.2f", pitch); OLED_ShowString(0, 40, (u8*)oledBuf, 12); sprintf(oledBuf, "yaw :%.2f", yaw); OLED_ShowString(0, 52, (u8*)oledBuf, 12); OLED_Refresh(); }
|
使用printf重定向发送json字符串,注意C语言转义字符:
1
| printf("{\"roll\":%.4f,\"pitch\":%.4f,\"yaw\":%.4f",roll, pitch, yaw);
|
可使用cutecom查看发送的消息

上位机
自定义msg消息
在功能包下新建文件夹为msg
新建文件Imu.msg(首字母大写),输入以下内容
1 2 3
| float32 pitch float32 roll float32 yaw
|
package.xml添加依赖
1 2
| <build_depend>message_generation</build_depend> <exec_depend>message_runtime</exec_depend>
|
CMakeList.txt编辑msg相关配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| find_package(catkin REQUIRED COMPONENTS roscpp rospy std_msgs message_generation )
add_message_files( FILES Imu.msg )
generate_messages( DEPENDENCIES std_msgs )
catkin_package(
CATKIN_DEPENDS roscpp rospy std_msgs message_runtime
)
|
然后编译整个工作空间catkin_make
Python 需要调用的中间文件(…/工作空间/devel/lib/python3/dist-packages/包名/msg)

vscode配置
将前面生成的 python 文件路径配置进settings.json
1 2 3 4 5 6 7 8 9
| { "python.autoComplete.extraPaths": [ "/opt/ros/noetic/lib/python2.7/dist-packages" ], "python.analysis.extraPaths": [ "/opt/ros/noetic/lib/python3/dist-packages", "/home/ghigher/ROS_SW/demo01_ws/devel/lib/python3/dist-packages" ] }
|
发布
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| import serial import rospy import json from hello_vscode.msg import Imu
def is_json(test_str): try: json_object = json.loads(test_str) except Exception as e: return False return True
if __name__ == '__main__': try: port = '/dev/ttyUSB0' baud = 115200 rospy.init_node("serial_node") ser = serial.Serial(port, baud, timeout=0.5) imu_pub = rospy.Publisher("imu", Imu, queue_size=10) flag = ser.isOpen() if flag: rospy.loginfo("Succeed to open port") while not rospy.is_shutdown(): data = ser.readline().decode('gbk') imu_msg = Imu() if data != '' and is_json(data): imu_data = json.loads(data) imu_msg.pitch = imu_data["pitch"] imu_msg.roll = imu_data["roll"] imu_msg.yaw = imu_data["yaw"] imu_pub.publish(imu_msg) rospy.loginfo("pitch:%.2f, roll:%.2f, yaw:%.2f", imu_msg.pitch, imu_msg.roll, imu_msg.yaw) except Exception as exc: rospy.loginfo("Failed to open port")
|
python文件赋予权限并添加到CmakeList.txt
1 2 3 4
| catkin_install_python(PROGRAMS scripts/ros_pyserial_pub.py DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION} )
|
连接stm32
赋予串口权限
1
| sudo chmod 777 /dev/ttyUSB0
|
运行发布文件
1 2 3
| roscore source ./devel/setup.bash rosrun hello_vscode ros_pyserial_pub.py
|

订阅
查看话题
1 2 3
| /imu /rosout /rosout_agg
|
订阅话题

python实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
import rospy from hello_vscode.msg import Imu
def doImu(imu_msg): rospy.loginfo("--------------------------") rospy.loginfo("Pitch: %.4f", imu_msg.pitch) rospy.loginfo("Roll: %.4f", imu_msg.roll) rospy.loginfo("Yaw: %.4f", imu_msg.yaw)
if __name__=="__main__":
rospy.init_node("imu_sub") sub = rospy.Subscriber("imu", Imu, doImu, queue_size=10) rospy.spin()
|
运行
1 2 3
| roscore source ./devel/setup.bash rosrun hello_vscode ros_pyserial_sub.py
|

