vscode
ros2
pygame
2.4 手动控制小车转向(上)
约 1187 个字 266 行代码 预计阅读时间 7 分钟
功能概述
在模拟器中宿主一个ROS节点服务,该服务可以接收转向消息。当收到转向消息时,重新绘制旋转后的汽车模型图片,从而实现转向效果。利用pyside6开发手动控制界面,并在该界面中寄宿ROS节点服务。当点击界面上的转向按钮时,发送转向消息到模拟器转向服务,从而实现转向控制。
一、对模型添加转向支持
修改model.py,修改代码见标亮行
model.py import pygame
import os
from importlib.resources import files
class Model :
"""
汽车模型
"""
def __init__ ( self , car ) -> None :
self . car = car
self . win = car . win
self . share_path = car . share_path
self . _load_model_image ()
def _load_model_image ( self ):
model_path = os . path . join ( self . share_path , "images/model.png" )
print ( model_path )
# 加载汽车图片
model_image = pygame . image . load ( model_path )
model_rect = model_image . get_rect ()
self . model_image = pygame . transform . smoothscale (
model_image , ( model_rect . width / 6 , model_rect . height / 6 )
)
def update ( self ):
model_image = pygame . transform . rotate ( self . model_image , - self . car . world_angle )
# 获取图片尺寸
model_rect = model_image . get_rect ()
# 获取窗口尺寸,让图片居中显示
win_rect = self . win . get_rect ()
model_rect . x = ( win_rect . width - model_rect . width ) / 2
model_rect . y = ( win_rect . height - model_rect . height ) / 2
# 在窗口的指定位置上绘制图片
self . win . blit ( model_image , model_rect )
我们通过调用!#python pygame.transform.rotate()方法,来对模型图像进行旋转支持。这里,我们调用了car实例的world_angle变量,来根据模拟器中旋转角度,进行图片旋转操作。
二、ROS消息简述
在ros中,消息类型分为话题、服务、动作三种。
这类消息主要用于发布订阅模型,可以是一对一也可以是一对多模式,如下图:
话题消息就是传递一些字段到订阅者,所以,话题消息的定义文件中就是一些字段的声明,定义文件以.msg作为后缀。示例如下:
这类消息主要用于客户端服务器模式,可以是单客户端也可以是多客户端形式,如下图:
服务消息定义中,包含请求字段和应答字段两部分组成,这两部分内容以---分割。定义文件以.srv作为后缀。示例如下:
float32 add_num_1
float32 add_num_2
---
float32 sum_num
这类消息主要用于需要耗费一定时间的调用服务,这类消息的特点时,可以定期给调用端以反馈,如下图:
动作消息定义中,包含请求字段、应答字段和反馈字段组成,这三部分内容间以---分割。定义文件以.action作为后缀。示例如下:
int32 run_command_id
---
int32 run_result
---
int run_process
三、添加转向消息
将inters包中的,include和src目录都删除掉。在inters包中,新建目录action目录,在action目录中,新建Turn.action文件,内容如下:
int32 direction # 目标方向(0:左;1:右)
float32 angle # 目标角度
---
float32 angle # 目标最终角度
float32 world_angle # 目标最终世界角度
---
float32 angle # 目标当前角度
float32 world_angle # 目标当前世界角度
修改CMakeLists.txt,添加如下标亮部分代码。
CMakeLists.txt cmake_minimum_required ( VERSION 3.8 )
project ( inters )
if ( CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang" )
add_compile_options ( - Wall - Wextra - Wpedantic )
endif ()
# find dependencies
find_package ( ament_cmake REQUIRED )
# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)
find_package ( rosidl_default_generators REQUIRED )
set ( action_files
"action/Turn.action"
# etc
)
rosidl_generate_interfaces ( $ { PROJECT_NAME }
$ { action_files }
)
if ( BUILD_TESTING )
find_package ( ament_lint_auto REQUIRED )
# the following line skips the linter which checks for copyrights
# comment the line when a copyright and license is added to all source files
set ( ament_cmake_copyright_FOUND TRUE )
# the following line skips cpplint (only works in a git repo)
# comment the line when this package is in a git repo and when
# a copyright and license is added to all source files
set ( ament_cmake_cpplint_FOUND TRUE )
ament_lint_auto_find_test_dependencies ()
endif ()
ament_package ()
修改package.xml,添加如下标亮行:
package.xml <?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format= "3" >
<name> inters</name>
<version> 0.0.0</version>
<description> TODO: Package description</description>
<maintainer email= "190045431@qq.com" > fotianmoyin</maintainer>
<license> TODO: License declaration</license>
<buildtool_depend> ament_cmake</buildtool_depend>
<test_depend> ament_lint_auto</test_depend>
<test_depend> ament_lint_common</test_depend>
<buildtool_depend> rosidl_default_generators</buildtool_depend>
<depend> action_msgs</depend>
<member_of_group> rosidl_interface_packages</member_of_group>
<export>
<build_type> ament_cmake</build_type>
</export>
</package>
在终端中执行命令colcon build --package-select inters
执行示例如下:
fotianmoyin@fotianmoyin-moon:~/Projects/vscode/kitt_ws$ colcon build --packages-select inters
Starting >>> inters
Finished <<< inters [ 5 .73s]
Summary: 1 package finished [ 6 .87s]
在终端中执行. install/local_setup.sh,然后再执行ros2 interface show inters/action/Turn,一切正常就可以看到消息定义内容。
执行示例如下:
fotianmoyin@fotianmoyin-moon:~/Projects/vscode/kitt_ws$ . install/local_setup.sh
fotianmoyin@fotianmoyin-moon:~/Projects/vscode/kitt_ws$ ros2 interface show inters/action/Turn
int32 direction # 目标方向(0:左;1:右)
float32 angle # 目标角度
---
float32 angle # 目标最终角度
float32 world_angle # 目标最终世界角度
---
float32 angle # 目标当前角度
float32 world_angle # 目标当前世界角度
四、在car.py中添加转向服务支持
修改car.py,添加如下代码,看标亮部分:
car.py import pygame
import sys
import os
import getopt
from ament_index_python.packages import get_package_share_directory
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
import math
import time
from inters.action import Turn
from threading import Thread
首先,我们添加了一些导入,其中以rclpy开头的是ROS节点相关的。rclpy是ROS核心包,ActionServer是创建动作服务使用的,Node是创建ROS节点基类。math是数学计算包,time是时间相关的包,我们使用了其中的休眠方法。from inters.action import Turn 这个就是引用的我们定义的转向消息。
car.py class Car :
def __init__ ( self ) -> None :
"""初始化窗口并加载显示资源"""
# 将启动ROS服务节点的任务放到单独的线程中,这样可以避免pygame和rclpy会分别阻塞的问题
self . thd_srv = Thread ( target = self . start_srv )
self . thd_srv . start ()
pygame . init ()
if run_as_debug ():
self . share_path = os . path . join ( os . path . dirname ( __file__ ), "../resource" )
else :
self . share_path = get_package_share_directory ( "kitt" )
# 设置窗口大小
self . win = pygame . display . set_mode (( 800 , 600 ), pygame . RESIZABLE )
# 设置窗口标题
pygame . display . set_caption ( "Car" )
self . world_angle = float ( 0 ) # 车世界角度
self . world_x = float ( 0 ) # 车世界坐标x
self . world_y = float ( 0 ) # 车世界坐标y
self . model = Model ( self )
self . map = Map ( self )
pass
def start_srv ( self ):
"""启动ROS节点服务"""
rclpy . init ()
self . car_srv = CarSrv ( "car" , self )
rclpy . spin ( self . car_srv )
self . car_srv . destroy_node ()
rclpy . try_shutdown ()
def run ( self ):
"""运行模拟器"""
while True :
self . _check_events ()
self . _update_win ()
def _check_events ( self ):
"""监视键盘、鼠标事件"""
for event in pygame . event . get ():
if event . type == pygame . QUIT :
rclpy . try_shutdown ()
sys . exit ()
def _update_win ( self ):
"""更新窗口显示"""
color = ( 150 , 150 , 150 )
# 给窗口填充背景色
self . win . fill ( color = color )
# 更新地图
self . map . update ()
# 更新汽车模型
self . model . update ()
# 让最近绘制的窗口可见
pygame . display . flip ()
def set_angle ( self , angle ):
self . world_angle = angle
def turn ( self , direction , angle ):
"""
转向\n
参数:
direction:方向(0:向左;1:向右)\n
angle:角度
返回:
车当前的角度
"""
old_angle = self . world_angle
if direction == 0 :
old_angle -= angle
if direction == 1 :
old_angle += angle
self . set_angle ( old_angle % 360 )
return self . world_angle
Car的__init__初始化函数中,我们添加了启动节点服务的代码。这里,我们将启动代码放到一个单独的线程中。绘制界面的pygame在监听事件时是个无限循环。ROS节点启动后,也会持续监听请求事件,也是个循环。如果这两部分在一个线程中运行,就会导致其中一方无法正确执行。另外,我们还添加了world_angel变量来指示模拟器旋转角度。
在_check_events()函数中,我们在退出模拟器时,先调用rclpy.try_shutdown()方法,来关闭节点服务,然后再退出程序。
set_angle()方法和turn()方法,用于在接收到转向请求时,用来调整模拟器角度。我们这里定义了旋转方向和角度两个参数。最后,设置模拟器角度时,我们让旋转后的角度对360求模,为的是限制模拟器的角度范围在360之间。
car.py class CarSrv ( Node ):
"""模拟器服务节点"""
def __init__ ( self , node_name , car : Car ):
super () . __init__ ( node_name )
self . car = car
# 转向动作服务
self . _turn_action_server = ActionServer (
self , Turn , "/car/turn" , self . turn_callback
)
def turn_callback ( self , goal_handle ):
self . get_logger () . info ( "Executing turn goal..." )
feedback_msg = Turn . Feedback ()
angle = goal_handle . request . angle
iangle = math . floor ( angle ) # 整数部分
for i in range ( 1 , iangle + 1 ):
feedback_msg . world_angle = self . car . turn ( goal_handle . request . direction , 1 )
feedback_msg . angle = float ( i )
self . get_logger () . info (
f "/car/turn feedback: ( { feedback_msg . angle : .2f } , { feedback_msg . world_angle : .2f } )"
)
goal_handle . publish_feedback ( feedback_msg )
time . sleep ( 0.01 )
dangle = angle % 1 # 小数部分
if dangle != 0 :
feedback_msg . world_angle = self . car . turn (
goal_handle . request . direction , dangle
)
feedback_msg . angle = dangle
self . get_logger () . info (
f "/car/turn feedback: ( { feedback_msg . angle : .2f } , { feedback_msg . world_angle : .2f } )"
)
goal_handle . publish_feedback ( feedback_msg )
goal_handle . succeed ()
result = Turn . Result ()
result . angle = feedback_msg . angle
result . world_angle = feedback_msg . world_angle
return result
CarSrv类,是我们的节点服务类。我们首先定义了动作服务,转向服务名为/car/turn。动作类消息请求过程是这样的:首先,服务端会接收到来自客户端的请求参数,然后服务端开始执行动作。执行的过程中,不断的发送反馈信息到客户端。当动作执行完后,我们通过调用goal_handle.succeed()方法,告知客户端动作执行完毕。最后,我们再返回本次请求的最终结果参数。
为了实现类似真实场景下的转向,我们每间隔100毫秒旋转一度。最后,不足一度的部分,我们再单独旋转一下。
源码下载