跳转至

2.4 手动控制小车转向(上)

约 1187 个字 266 行代码 预计阅读时间 7 分钟

功能概述

在模拟器中宿主一个ROS节点服务,该服务可以接收转向消息。当收到转向消息时,重新绘制旋转后的汽车模型图片,从而实现转向效果。利用pyside6开发手动控制界面,并在该界面中寄宿ROS节点服务。当点击界面上的转向按钮时,发送转向消息到模拟器转向服务,从而实现转向控制。

一、对模型添加转向支持

修改model.py,修改代码见标亮行

model.py
import pygame
import os
from importlib.resources import files


class Model:
    """
    汽车模型
    """
    def __init__(self, car) -> None:
        self.car = car
        self.win = car.win
        self.share_path = car.share_path
        self._load_model_image()

    def _load_model_image(self):
        model_path = os.path.join(self.share_path, "images/model.png")
        print(model_path)
        # 加载汽车图片
        model_image = pygame.image.load(model_path)
        model_rect = model_image.get_rect()
        self.model_image = pygame.transform.smoothscale(
            model_image, (model_rect.width / 6, model_rect.height / 6)
        )

    def update(self):
        model_image = pygame.transform.rotate(self.model_image, -self.car.world_angle)
        # 获取图片尺寸
        model_rect = model_image.get_rect()
        # 获取窗口尺寸,让图片居中显示
        win_rect = self.win.get_rect()
        model_rect.x = (win_rect.width - model_rect.width) / 2
        model_rect.y = (win_rect.height - model_rect.height) / 2
        # 在窗口的指定位置上绘制图片
        self.win.blit(model_image, model_rect)
我们通过调用!#python pygame.transform.rotate()方法,来对模型图像进行旋转支持。这里,我们调用了car实例的world_angle变量,来根据模拟器中旋转角度,进行图片旋转操作。

二、ROS消息简述

在ros中,消息类型分为话题、服务、动作三种。

  • 第一种:话题

这类消息主要用于发布订阅模型,可以是一对一也可以是一对多模式,如下图:

Topic-SinglePublisherandSingleSubscriber

Topic-MultiplePublisherandMultipleSubscriber

话题消息就是传递一些字段到订阅者,所以,话题消息的定义文件中就是一些字段的声明,定义文件以.msg作为后缀。示例如下:

string name
int8 age

  • 第二种:服务

这类消息主要用于客户端服务器模式,可以是单客户端也可以是多客户端形式,如下图:

Service-SingleServiceClient

Service-MultipleServiceClient

服务消息定义中,包含请求字段和应答字段两部分组成,这两部分内容以---分割。定义文件以.srv作为后缀。示例如下:

float32 add_num_1
float32 add_num_2
---
float32 sum_num

  • 第三种:动作

这类消息主要用于需要耗费一定时间的调用服务,这类消息的特点时,可以定期给调用端以反馈,如下图:

Action-SingleActionClient

动作消息定义中,包含请求字段、应答字段和反馈字段组成,这三部分内容间以---分割。定义文件以.action作为后缀。示例如下:

int32 run_command_id
---
int32 run_result
---
int run_process

三、添加转向消息

inters包中的,includesrc目录都删除掉。在inters包中,新建目录action目录,在action目录中,新建Turn.action文件,内容如下:

int32 direction # 目标方向(0:左;1:右)
float32 angle # 目标角度
---
float32 angle # 目标最终角度
float32 world_angle # 目标最终世界角度
---
float32 angle # 目标当前角度
float32 world_angle # 目标当前世界角度
修改CMakeLists.txt,添加如下标亮部分代码。
CMakeLists.txt
cmake_minimum_required(VERSION 3.8)
project(inters)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)
# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)

find_package(rosidl_default_generators REQUIRED)

set(action_files
  "action/Turn.action"
  # etc
)

rosidl_generate_interfaces(${PROJECT_NAME}
  ${action_files}
)

if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

ament_package()
修改package.xml,添加如下标亮行:
package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>inters</name>
  <version>0.0.0</version>
  <description>TODO: Package description</description>
  <maintainer email="190045431@qq.com">fotianmoyin</maintainer>
  <license>TODO: License declaration</license>

  <buildtool_depend>ament_cmake</buildtool_depend>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <buildtool_depend>rosidl_default_generators</buildtool_depend>
  <depend>action_msgs</depend>
  <member_of_group>rosidl_interface_packages</member_of_group>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>

在终端中执行命令colcon build --package-select inters 执行示例如下:

fotianmoyin@fotianmoyin-moon:~/Projects/vscode/kitt_ws$ colcon build --packages-select inters
Starting >>> inters  
Finished <<< inters [5.73s]                     

Summary: 1 package finished [6.87s]
在终端中执行. install/local_setup.sh,然后再执行ros2 interface show inters/action/Turn,一切正常就可以看到消息定义内容。 执行示例如下:
fotianmoyin@fotianmoyin-moon:~/Projects/vscode/kitt_ws$ . install/local_setup.sh
fotianmoyin@fotianmoyin-moon:~/Projects/vscode/kitt_ws$ ros2 interface show inters/action/Turn
int32 direction # 目标方向(0:左;1:右)
float32 angle # 目标角度
---
float32 angle # 目标最终角度
float32 world_angle # 目标最终世界角度
---
float32 angle # 目标当前角度
float32 world_angle # 目标当前世界角度

四、在car.py中添加转向服务支持

修改car.py,添加如下代码,看标亮部分:

car.py
import pygame
import sys
import os
import getopt
from ament_index_python.packages import get_package_share_directory
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
import math
import time
from inters.action import Turn
from threading import Thread
首先,我们添加了一些导入,其中以rclpy开头的是ROS节点相关的。rclpy是ROS核心包,ActionServer是创建动作服务使用的,Node是创建ROS节点基类。math是数学计算包,time是时间相关的包,我们使用了其中的休眠方法。from inters.action import Turn这个就是引用的我们定义的转向消息。
car.py
class Car:
    def __init__(self) -> None:
        """初始化窗口并加载显示资源"""

        # 将启动ROS服务节点的任务放到单独的线程中,这样可以避免pygame和rclpy会分别阻塞的问题
        self.thd_srv = Thread(target=self.start_srv)
        self.thd_srv.start()

        pygame.init()

        if run_as_debug():
            self.share_path = os.path.join(os.path.dirname(__file__), "../resource")
        else:
            self.share_path = get_package_share_directory("kitt")

        # 设置窗口大小
        self.win = pygame.display.set_mode((800, 600), pygame.RESIZABLE)
        # 设置窗口标题
        pygame.display.set_caption("Car")
        self.world_angle = float(0)  # 车世界角度
        self.world_x = float(0)  # 车世界坐标x
        self.world_y = float(0)  # 车世界坐标y
        self.model = Model(self)
        self.map = Map(self)
        pass

    def start_srv(self):
        """启动ROS节点服务"""
        rclpy.init()
        self.car_srv = CarSrv("car", self)
        rclpy.spin(self.car_srv)
        self.car_srv.destroy_node()
        rclpy.try_shutdown()

    def run(self):
        """运行模拟器"""
        while True:
            self._check_events()
            self._update_win()

    def _check_events(self):
        """监视键盘、鼠标事件"""
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                rclpy.try_shutdown()
                sys.exit()

    def _update_win(self):
        """更新窗口显示"""
        color = (150, 150, 150)
        # 给窗口填充背景色
        self.win.fill(color=color)
        # 更新地图
        self.map.update()
        # 更新汽车模型
        self.model.update()
        # 让最近绘制的窗口可见
        pygame.display.flip()

    def set_angle(self, angle):
        self.world_angle = angle

    def turn(self, direction, angle):
        """
        转向\n
        参数:
            direction:方向(0:向左;1:向右)\n
            angle:角度
        返回:
            车当前的角度
        """
        old_angle = self.world_angle
        if direction == 0:
            old_angle -= angle
        if direction == 1:
            old_angle += angle

        self.set_angle(old_angle % 360)

        return self.world_angle
Car__init__初始化函数中,我们添加了启动节点服务的代码。这里,我们将启动代码放到一个单独的线程中。绘制界面的pygame在监听事件时是个无限循环。ROS节点启动后,也会持续监听请求事件,也是个循环。如果这两部分在一个线程中运行,就会导致其中一方无法正确执行。另外,我们还添加了world_angel变量来指示模拟器旋转角度。

_check_events()函数中,我们在退出模拟器时,先调用rclpy.try_shutdown()方法,来关闭节点服务,然后再退出程序。

set_angle()方法和turn()方法,用于在接收到转向请求时,用来调整模拟器角度。我们这里定义了旋转方向和角度两个参数。最后,设置模拟器角度时,我们让旋转后的角度对360求模,为的是限制模拟器的角度范围在360之间。

car.py
class CarSrv(Node):
    """模拟器服务节点"""
    def __init__(self, node_name, car: Car):
        super().__init__(node_name)
        self.car = car
        # 转向动作服务
        self._turn_action_server = ActionServer(
            self, Turn, "/car/turn", self.turn_callback
        )


    def turn_callback(self, goal_handle):
        self.get_logger().info("Executing turn goal...")

        feedback_msg = Turn.Feedback()

        angle = goal_handle.request.angle
        iangle = math.floor(angle)  # 整数部分
        for i in range(1, iangle + 1):
            feedback_msg.world_angle = self.car.turn(goal_handle.request.direction, 1)
            feedback_msg.angle = float(i)
            self.get_logger().info(
                f"/car/turn feedback: ({feedback_msg.angle:.2f},{feedback_msg.world_angle:.2f})"
            )
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(0.01)

        dangle = angle % 1  # 小数部分
        if dangle != 0:
            feedback_msg.world_angle = self.car.turn(
                goal_handle.request.direction, dangle
            )
            feedback_msg.angle = dangle
            self.get_logger().info(
                f"/car/turn feedback: ({feedback_msg.angle:.2f},{feedback_msg.world_angle:.2f})"
            )
            goal_handle.publish_feedback(feedback_msg)

        goal_handle.succeed()
        result = Turn.Result()
        result.angle = feedback_msg.angle
        result.world_angle = feedback_msg.world_angle
        return result
CarSrv类,是我们的节点服务类。我们首先定义了动作服务,转向服务名为/car/turn。动作类消息请求过程是这样的:首先,服务端会接收到来自客户端的请求参数,然后服务端开始执行动作。执行的过程中,不断的发送反馈信息到客户端。当动作执行完后,我们通过调用goal_handle.succeed()方法,告知客户端动作执行完毕。最后,我们再返回本次请求的最终结果参数。

为了实现类似真实场景下的转向,我们每间隔100毫秒旋转一度。最后,不足一度的部分,我们再单独旋转一下。

源码下载