vscode
ros2
pygame
2.4 手动控制小车转向(上)
约 1187 个字 266 行代码 预计阅读时间 7 分钟
功能概述
在模拟器中宿主一个ROS节点服务,该服务可以接收转向消息。当收到转向消息时,重新绘制旋转后的汽车模型图片,从而实现转向效果。利用pyside6开发手动控制界面,并在该界面中寄宿ROS节点服务。当点击界面上的转向按钮时,发送转向消息到模拟器转向服务,从而实现转向控制。
一、对模型添加转向支持
修改model.py
,修改代码见标亮行
model.py import pygame
import os
from importlib.resources import files
class Model :
"""
汽车模型
"""
def __init__ ( self , car ) -> None :
self . car = car
self . win = car . win
self . share_path = car . share_path
self . _load_model_image ()
def _load_model_image ( self ):
model_path = os . path . join ( self . share_path , "images/model.png" )
print ( model_path )
# 加载汽车图片
model_image = pygame . image . load ( model_path )
model_rect = model_image . get_rect ()
self . model_image = pygame . transform . smoothscale (
model_image , ( model_rect . width / 6 , model_rect . height / 6 )
)
def update ( self ):
model_image = pygame . transform . rotate ( self . model_image , - self . car . world_angle )
# 获取图片尺寸
model_rect = model_image . get_rect ()
# 获取窗口尺寸,让图片居中显示
win_rect = self . win . get_rect ()
model_rect . x = ( win_rect . width - model_rect . width ) / 2
model_rect . y = ( win_rect . height - model_rect . height ) / 2
# 在窗口的指定位置上绘制图片
self . win . blit ( model_image , model_rect )
我们通过调用!#python pygame.transform.rotate()
方法,来对模型图像进行旋转支持。这里,我们调用了car
实例的world_angle
变量,来根据模拟器中旋转角度,进行图片旋转操作。
二、ROS消息简述
在ros中,消息类型分为话题、服务、动作三种。
这类消息主要用于发布订阅模型,可以是一对一也可以是一对多模式,如下图:
话题消息就是传递一些字段到订阅者,所以,话题消息的定义文件中就是一些字段的声明,定义文件以.msg
作为后缀。示例如下:
这类消息主要用于客户端服务器模式,可以是单客户端也可以是多客户端形式,如下图:
服务消息定义中,包含请求字段和应答字段两部分组成,这两部分内容以---
分割。定义文件以.srv
作为后缀。示例如下:
float32 add_num_1
float32 add_num_2
---
float32 sum_num
这类消息主要用于需要耗费一定时间的调用服务,这类消息的特点时,可以定期给调用端以反馈,如下图:
动作消息定义中,包含请求字段、应答字段和反馈字段组成,这三部分内容间以---
分割。定义文件以.action
作为后缀。示例如下:
int32 run_command_id
---
int32 run_result
---
int run_process
三、添加转向消息
将inters
包中的,include
和src
目录都删除掉。在inters
包中,新建目录action
目录,在action
目录中,新建Turn.action
文件,内容如下:
int32 direction # 目标方向(0:左;1:右)
float32 angle # 目标角度
---
float32 angle # 目标最终角度
float32 world_angle # 目标最终世界角度
---
float32 angle # 目标当前角度
float32 world_angle # 目标当前世界角度
修改CMakeLists.txt
,添加如下标亮部分代码。
CMakeLists.txt cmake_minimum_required ( VERSION 3.8 )
project ( inters )
if ( CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang" )
add_compile_options ( - Wall - Wextra - Wpedantic )
endif ()
# find dependencies
find_package ( ament_cmake REQUIRED )
# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)
find_package ( rosidl_default_generators REQUIRED )
set ( action_files
"action/Turn.action"
# etc
)
rosidl_generate_interfaces ( $ { PROJECT_NAME }
$ { action_files }
)
if ( BUILD_TESTING )
find_package ( ament_lint_auto REQUIRED )
# the following line skips the linter which checks for copyrights
# comment the line when a copyright and license is added to all source files
set ( ament_cmake_copyright_FOUND TRUE )
# the following line skips cpplint (only works in a git repo)
# comment the line when this package is in a git repo and when
# a copyright and license is added to all source files
set ( ament_cmake_cpplint_FOUND TRUE )
ament_lint_auto_find_test_dependencies ()
endif ()
ament_package ()
修改package.xml
,添加如下标亮行:
package.xml <?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format= "3" >
<name> inters</name>
<version> 0.0.0</version>
<description> TODO: Package description</description>
<maintainer email= "190045431@qq.com" > fotianmoyin</maintainer>
<license> TODO: License declaration</license>
<buildtool_depend> ament_cmake</buildtool_depend>
<test_depend> ament_lint_auto</test_depend>
<test_depend> ament_lint_common</test_depend>
<buildtool_depend> rosidl_default_generators</buildtool_depend>
<depend> action_msgs</depend>
<member_of_group> rosidl_interface_packages</member_of_group>
<export>
<build_type> ament_cmake</build_type>
</export>
</package>
在终端中执行命令colcon build --package-select inters
执行示例如下:
fotianmoyin@fotianmoyin-moon:~/Projects/vscode/kitt_ws$ colcon build --packages-select inters
Starting >>> inters
Finished <<< inters [ 5 .73s]
Summary: 1 package finished [ 6 .87s]
在终端中执行. install/local_setup.sh
,然后再执行ros2 interface show inters/action/Turn
,一切正常就可以看到消息定义内容。
执行示例如下:
fotianmoyin@fotianmoyin-moon:~/Projects/vscode/kitt_ws$ . install/local_setup.sh
fotianmoyin@fotianmoyin-moon:~/Projects/vscode/kitt_ws$ ros2 interface show inters/action/Turn
int32 direction # 目标方向(0:左;1:右)
float32 angle # 目标角度
---
float32 angle # 目标最终角度
float32 world_angle # 目标最终世界角度
---
float32 angle # 目标当前角度
float32 world_angle # 目标当前世界角度
四、在car.py
中添加转向服务支持
修改car.py
,添加如下代码,看标亮部分:
car.py import pygame
import sys
import os
import getopt
from ament_index_python.packages import get_package_share_directory
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
import math
import time
from inters.action import Turn
from threading import Thread
首先,我们添加了一些导入,其中以rclpy开头的是ROS节点相关的。rclpy
是ROS核心包,ActionServer
是创建动作服务使用的,Node
是创建ROS节点基类。math
是数学计算包,time
是时间相关的包,我们使用了其中的休眠方法。from inters.action import Turn
这个就是引用的我们定义的转向消息。
car.py class Car :
def __init__ ( self ) -> None :
"""初始化窗口并加载显示资源"""
# 将启动ROS服务节点的任务放到单独的线程中,这样可以避免pygame和rclpy会分别阻塞的问题
self . thd_srv = Thread ( target = self . start_srv )
self . thd_srv . start ()
pygame . init ()
if run_as_debug ():
self . share_path = os . path . join ( os . path . dirname ( __file__ ), "../resource" )
else :
self . share_path = get_package_share_directory ( "kitt" )
# 设置窗口大小
self . win = pygame . display . set_mode (( 800 , 600 ), pygame . RESIZABLE )
# 设置窗口标题
pygame . display . set_caption ( "Car" )
self . world_angle = float ( 0 ) # 车世界角度
self . world_x = float ( 0 ) # 车世界坐标x
self . world_y = float ( 0 ) # 车世界坐标y
self . model = Model ( self )
self . map = Map ( self )
pass
def start_srv ( self ):
"""启动ROS节点服务"""
rclpy . init ()
self . car_srv = CarSrv ( "car" , self )
rclpy . spin ( self . car_srv )
self . car_srv . destroy_node ()
rclpy . try_shutdown ()
def run ( self ):
"""运行模拟器"""
while True :
self . _check_events ()
self . _update_win ()
def _check_events ( self ):
"""监视键盘、鼠标事件"""
for event in pygame . event . get ():
if event . type == pygame . QUIT :
rclpy . try_shutdown ()
sys . exit ()
def _update_win ( self ):
"""更新窗口显示"""
color = ( 150 , 150 , 150 )
# 给窗口填充背景色
self . win . fill ( color = color )
# 更新地图
self . map . update ()
# 更新汽车模型
self . model . update ()
# 让最近绘制的窗口可见
pygame . display . flip ()
def set_angle ( self , angle ):
self . world_angle = angle
def turn ( self , direction , angle ):
"""
转向\n
参数:
direction:方向(0:向左;1:向右)\n
angle:角度
返回:
车当前的角度
"""
old_angle = self . world_angle
if direction == 0 :
old_angle -= angle
if direction == 1 :
old_angle += angle
self . set_angle ( old_angle % 360 )
return self . world_angle
Car
的__init__
初始化函数中,我们添加了启动节点服务的代码。这里,我们将启动代码放到一个单独的线程中。绘制界面的pygame在监听事件时是个无限循环。ROS节点启动后,也会持续监听请求事件,也是个循环。如果这两部分在一个线程中运行,就会导致其中一方无法正确执行。另外,我们还添加了world_angel
变量来指示模拟器旋转角度。
在_check_events()
函数中,我们在退出模拟器时,先调用rclpy.try_shutdown()
方法,来关闭节点服务,然后再退出程序。
set_angle()
方法和turn()
方法,用于在接收到转向请求时,用来调整模拟器角度。我们这里定义了旋转方向和角度两个参数。最后,设置模拟器角度时,我们让旋转后的角度对360求模,为的是限制模拟器的角度范围在360之间。
car.py class CarSrv ( Node ):
"""模拟器服务节点"""
def __init__ ( self , node_name , car : Car ):
super () . __init__ ( node_name )
self . car = car
# 转向动作服务
self . _turn_action_server = ActionServer (
self , Turn , "/car/turn" , self . turn_callback
)
def turn_callback ( self , goal_handle ):
self . get_logger () . info ( "Executing turn goal..." )
feedback_msg = Turn . Feedback ()
angle = goal_handle . request . angle
iangle = math . floor ( angle ) # 整数部分
for i in range ( 1 , iangle + 1 ):
feedback_msg . world_angle = self . car . turn ( goal_handle . request . direction , 1 )
feedback_msg . angle = float ( i )
self . get_logger () . info (
f "/car/turn feedback: ( { feedback_msg . angle : .2f } , { feedback_msg . world_angle : .2f } )"
)
goal_handle . publish_feedback ( feedback_msg )
time . sleep ( 0.01 )
dangle = angle % 1 # 小数部分
if dangle != 0 :
feedback_msg . world_angle = self . car . turn (
goal_handle . request . direction , dangle
)
feedback_msg . angle = dangle
self . get_logger () . info (
f "/car/turn feedback: ( { feedback_msg . angle : .2f } , { feedback_msg . world_angle : .2f } )"
)
goal_handle . publish_feedback ( feedback_msg )
goal_handle . succeed ()
result = Turn . Result ()
result . angle = feedback_msg . angle
result . world_angle = feedback_msg . world_angle
return result
CarSrv
类,是我们的节点服务类。我们首先定义了动作服务,转向服务名为/car/turn
。动作类消息请求过程是这样的:首先,服务端会接收到来自客户端的请求参数,然后服务端开始执行动作。执行的过程中,不断的发送反馈信息到客户端。当动作执行完后,我们通过调用goal_handle.succeed()
方法,告知客户端动作执行完毕。最后,我们再返回本次请求的最终结果参数。
为了实现类似真实场景下的转向,我们每间隔100毫秒旋转一度。最后,不足一度的部分,我们再单独旋转一下。
源码下载