#!/usr/bin/env python | |
# Import the ROS libraries, and load the manifest file which through <depend package=... /> will give us access to the project dependencies | |
from aifc import Aifc_read | |
from operator import contains | |
import roslib | |
import rospy | |
import math | |
import serial | |
import time | |
from geometry_msgs.msg import Twist # for sending commands to the drone | |
from std_msgs.msg import Empty # for land/takeoff/emergency | |
from std_msgs.msg import Header | |
from std_msgs.msg import String | |
from nav_msgs.msg import Odometry | |
from geometry_msgs.msg import PoseStamped | |
from geometry_msgs.msg import Point | |
ROBOT_MAX_LIN_VEL = 0.3 | |
ROBOT_MAX_ANG_VEL = 1.82 | |
FLAG = False | |
FLAGZ = False | |
ser = serial.Serial() | |
def com_vel_callback(msg): | |
ser.flushInput() | |
vx = msg.linear.x | |
vy = msg.linear.y | |
vz = msg.angular.z | |
# print(vx, vy, vz) | |
data_tem = bytearray(100) | |
counter = 0 | |
data_tem[counter] = 0x57 | |
data_tem[counter + 1] = int(vx * 10) % 10 | |
data_tem[counter + 2] = int(vx * 100) % 10 | |
data_tem[counter + 3] = int(vx * 1000) % 10 | |
if vx >= 0: | |
data_tem[counter + 4] = 1 | |
else: | |
data_tem[counter + 4] = 0 | |
data_tem[counter + 5] = int(vy * 10) % 10 | |
data_tem[counter + 6] = int(vy * 100) % 10 | |
data_tem[counter + 7] = int(vy * 1000) % 10 | |
if vy >= 0: | |
data_tem[counter + 8] = 1 | |
else: | |
data_tem[counter + 8] = 0 | |
data_tem[counter + 9] = int(abs(vz) * 10) % 10 | |
data_tem[counter + 10] = int(abs(vz) * 100) % 10 | |
data_tem[counter + 11] = int(abs(vz) * 1000) % 10 | |
if vz >= 0: | |
data_tem[counter + 12] = 1 | |
else: | |
data_tem[counter + 12] = 0 | |
data_tem[counter + 13] = 0xFF | |
ser.write(data_tem[:counter + 14]) | |
ser.flushInput() | |
# rospy.sleep(0.01) | |
def rosSerial_init(name, baud): | |
try: | |
ser.port = name | |
ser.baudrate = baud | |
ser.timeout = 1 | |
ser.open() | |
except serial.SerialException: | |
rospy.logerr("Unable to open port") | |
return -1 | |
if ser.is_open: | |
rospy.loginfo("Serial Port initialized") | |
else: | |
return -1 | |
def constrain(input, low, high): | |
if input < low: | |
input = low | |
elif input > high: | |
input = high | |
else: | |
input = input | |
return input | |
class RobotInterface: | |
def __init__(self, navdata_name, cmd_topic_name): | |
self.k_p_xy = 1.0 | |
self.k_d_xy = 0.0 | |
self.k_yaw = 0.3 | |
self.limit_cmd_xy = 10 | |
self.target_point = Twist() | |
self.state = Odometry() | |
self.move_cmd = Twist() | |
self.get_target = False | |
self.sub_pose = rospy.Subscriber(navdata_name, Odometry, self.callback_pose) | |
self.pub_move_cmd = rospy.Publisher(cmd_topic_name, Twist, queue_size=1) | |
rospy.on_shutdown(self.Reset) | |
def Reset(self): | |
print("shutting down the ros...") | |
def callback_pose(self, msg): | |
self.state = msg | |
def move_cmd_send(self, move_cmd): | |
self.pub_move_cmd.publish(move_cmd) | |
def set_target_point(self, point): | |
self.target_point = point | |
self.get_target = True | |
def quat2eul(self, orientation): | |
x = orientation.x | |
y = orientation.y | |
z = orientation.z | |
w = orientation.w | |
t0 = -2.0 * (y * y + z * z) + 1.0 | |
t1 = +2.0 * (x * y + w * z) | |
t2 = -2.0 * (x * z - w * y) | |
t3 = +2.0 * (y * z + w * x) | |
t4 = -2.0 * (x * x + y * y) + 1.0 | |
euler_angle = Point() | |
euler_angle.x = -math.asin(t2) | |
euler_angle.y = -math.atan2(t3, t4) | |
euler_angle.z = -math.atan2(t1, t0) | |
return euler_angle | |
def run(self): | |
global FLAG | |
global FLAGZ | |
cmd = Twist() | |
if (self.get_target): | |
err_x = (self.target_point.linear.x - self.state.pose.pose.position.x-0.1) | |
err_y = (self.target_point.linear.y - self.state.pose.pose.position.y) | |
euler_angle = self.quat2eul(self.state.pose.pose.orientation) | |
# print(self.state.pose.pose.position.x, self.state.pose.pose.position.y, euler_angle.z) | |
yaw = euler_angle.z | |
# print(yaw) | |
cmd.linear.x = constrain(self.k_p_xy * (err_x * math.cos(yaw) + err_y * math.sin(yaw)), -ROBOT_MAX_LIN_VEL, ROBOT_MAX_LIN_VEL) | |
cmd.linear.y = constrain(self.k_p_xy * (- err_x * math.sin(yaw) + err_y * math.cos(yaw)), -ROBOT_MAX_LIN_VEL, ROBOT_MAX_LIN_VEL) | |
cmd.angular.z = constrain(-self.k_yaw * (self.target_point.angular.z - yaw),-ROBOT_MAX_ANG_VEL/2, ROBOT_MAX_ANG_VEL/2) | |
if abs(cmd.linear.x) < 0.003 : | |
cmd.linear.x = 0 | |
if abs(cmd.linear.y) < 0.003 : | |
cmd.linear.y = 0 | |
if abs(cmd.angular.z) < 0.003 : | |
cmd.angular.z = 0 | |
if FLAGZ: | |
cmd.linear.x = 0 | |
cmd.linear.y = 0 | |
com_vel_callback(cmd) | |
self.move_cmd_send(cmd) | |
if cmd.linear.x == 0 and cmd.linear.y == 0 : | |
FLAGZ = True | |
# print(cmd.linear.x,cmd.linear.y,cmd.angular.z) | |
if cmd.angular.z == 0 and cmd.linear.x == 0 and cmd.linear.y == 0 : | |
FLAG = True | |
if __name__ == "__main__": | |
rospy.init_node('CarMecanum') | |
# rospy.init_node("serial_example_node") | |
car_mecanum = RobotInterface('/camera/odom/sample', '/cmd_vel') | |
# write_sub = rospy.Subscriber("/cmd_vel", Twist, com_vel_callback) | |
if rosSerial_init("/dev/ttyUSB0", 115200) == -1: | |
exit(-1) | |
rate = rospy.Rate(100) | |
t_start = time.time() | |
target = Twist() | |
# 90 --> 1.583 | |
goal = [[0, 0, 1.52, 1.87, 1.86,1.86, 1.86], [0, 0.2, 0.2, 0.2, 1, 1, 1 ],[0, 0, 0, 0, 0, -1.583 , 0]] | |
# goal = [[0, 0, 0.2], [0, 0, 0],[0, -1.583, -1.583]] | |
i = 0 | |
goalx = goal[0][i] | |
goaly = goal[1][i] | |
goalz = 0 | |
target.linear.x = 0 | |
target.linear.y = 0 | |
target.angular.z = 0 | |
w = 0.05 | |
while not rospy.is_shutdown(): | |
# print(FLAG) | |
if FLAG: | |
time.sleep(2) | |
FLAG = False | |
FLAGZ = False | |
i += 1 | |
print(i) | |
goalx = goal[0][i] | |
goaly = goal[1][i] | |
goalz = goal[2][i] | |
# if FLAGZ: | |
# goalz = goal[2][i] | |
target.linear.x = goalx | |
target.linear.y = goaly | |
target.angular.z = goalz | |
car_mecanum.set_target_point(target) | |
car_mecanum.run() | |
rate.sleep() |
# 解释
#!/usr/bin/env python |
这是一个常见的行,指示操作系统使用 Python 来执行这个脚本。
from aifc import Aifc_read | |
from operator import contains |
这里导入了一些模块,但是在代码中并没有使用它们,可能是不必要的导入。
import roslib | |
import rospy | |
import math | |
import serial | |
import time |
这些是导入了一些重要的 Python 库和模块,包括 ROS 的库( roslib 和 rospy )、数学库( math )、串口通信库( serial )以及时间库( time )。
from geometry_msgs.msg import Twist # 用于发送控制机器人的命令 | |
from std_msgs.msg import Empty # 用于控制机器人的起飞、降落、紧急停止等 | |
from std_msgs.msg import Header | |
from std_msgs.msg import String | |
from nav_msgs.msg import Odometry | |
from geometry_msgs.msg import PoseStamped | |
from geometry_msgs.msg import Point |
这些是 ROS 消息类型,用于通过 ROS 话题进行数据传输。每个消息类型都有特定的结构和字段,这些字段可以包含有关机器人状态、命令和传感器数据的信息。
ROBOT_MAX_LIN_VEL = 0.3 | |
ROBOT_MAX_ANG_VEL = 1.82 | |
FLAG = False | |
FLAGZ = False |
这里定义了一些全局变量,包括机器人的最大线性速度和最大角速度,以及两个布尔变量 FLAG 和 FLAGZ 。
ser = serial.Serial() |
这是一个串行通信对象,稍后将用于与机器人的串行端口通信。
def com_vel_callback(msg): |
这是一个回调函数,它用于处理来自 ROS 话题 /cmd_vel 的消息。这个消息通常包含控制机器人的线性速度和角速度。
ser.flushInput() |
这里清空了串口输入缓冲区。
vx = msg.linear.x | |
vy = msg.linear.y | |
vz = msg.angular.z |
这里从接收到的消息中提取了线性速度(vx、vy)和角速度(vz)。
data_tem = bytearray(100) | |
counter = 0 | |
data_tem[counter] = 0x57 |
这里创建了一个字节数组 data_tem 和一个计数器 counter ,并将字节 0x57 存储在 data_tem 的第一个位置。
data_tem[counter + 1] = int(vx * 10) % 10 | |
data_tem[counter + 2] = int(vx * 100) % 10 | |
data_tem[counter + 3] = int(vx * 1000) % 10 |
这里将线性速度 vx 的各个位数存储在 data_tem 中,通过将速度乘以不同的因子来实现。
if vx >= 0: | |
data_tem[counter + 4] = 1 | |
else: | |
data_tem[counter + 4] = 0 |
这里将一个表示速度方向的位存储在 data_tem 中,如果 vx 大于等于 0,表示正方向,否则是负方向。
data_tem[counter + 5] = int(vy * 10) % 10 | |
data_tem[counter + 6] = int(vy * 100) % 10 | |
data_tem[counter + 7] = int(vy * 1000) % 10 |
这里将线性速度 vy 的各个位数以及速度方向存储在 data_tem 中。
if vy >= 0: | |
data_tem[counter + 8] = 1 | |
else: | |
data_tem[counter + 8] = 0 |
这里存储 vy 的方向信息,与 vx 类似。
data_tem[counter + 9] = int(abs(vz) * 10) % 10 | |
data_tem[counter + 10] = int(abs(vz) * 100) % 10 | |
data_tem[counter + 11] = int(abs(vz) * 1000) % 10 |
这里将角速度 vz 的各个位数存储在 data_tem 中,同时也存储了方向信息。
if vz >= 0: | |
data_tem[counter + 12] = 1 | |
else: | |
data_tem[counter + 12] = 0 |
这里存储 vz 的方向信息。
data_tem[counter + 13] = 0xFF | |
ser.write(data_tem[:counter + 14]) | |
ser.flushInput() |
这里将一个结束标志( 0xFF )存储在 data_tem 中,并将整个数据包写入串口。然后清空了串口输入缓冲区。
def rosSerial_init(name, baud): |
这是初始化串口通信的函数,接受串口名称和波特率作为参数。
try: | |
ser.port = name | |
ser.baudrate = baud | |
ser.timeout = 1 | |
ser.open() | |
except serial.SerialException: | |
rospy.logerr("Unable to open port") | |
return -1 |
在这里,尝试将串口对象的属性设置为传入的串口名称和波特率,并打开串口。如果出现串口打开失败的异常,将记录错误消息并返回 - 1。
if ser.is_open: | |
rospy.loginfo("Serial Port initialized") | |
else: | |
return -1 |
如果串口成功打开,将记录初始化成功的信息。
继续解释代码:
def constrain(input, low, high): | |
if input < low: | |
input = low | |
elif input > high: | |
input = high | |
else: | |
input = input | |
return input |
这是一个限制输入值在指定范围内的函数。如果输入值小于最小值 low ,则将输入值设置为 low ;如果输入值大于最大值 high ,则将输入值设置为 high ;否则,保持输入值不变,然后返回最终结果。
class RobotInterface: |
这是一个自定义的类 RobotInterface 的定义,它包含了一些机器人控制相关的方法和属性。
def __init__(self, navdata_name, cmd_topic_name): |
这是 RobotInterface 类的构造函数,它在创建类的实例时被调用。它接受两个参数 navdata_name 和 cmd_topic_name ,这些参数可能是用于指定 ROS 话题名称的。
self.k_p_xy = 1.0 | |
self.k_d_xy = 0.0 | |
self.k_yaw = 0.3 | |
self.limit_cmd_xy = 10 | |
self.target_point = Twist() | |
self.state = Odometry() | |
self.move_cmd = Twist() | |
self.get_target = False |
在构造函数中,初始化了一些属性,包括控制器的一些参数(如 k_p_xy 、 k_d_xy 、 k_yaw 、 limit_cmd_xy )、目标点 target_point 、机器人状态 state 、移动命令 move_cmd 和一个标志 get_target 。
self.sub_pose = rospy.Subscriber(navdata_name, Odometry, self.callback_pose) | |
self.pub_move_cmd = rospy.Publisher(cmd_topic_name, Twist, queue_size=1) | |
rospy.on_shutdown(self.Reset) |
在构造函数中还创建了 ROS 的订阅者和发布者对象,用于接收机器人状态信息和发送移动命令。同时,在 ROS 节点关闭时,会执行 self.Reset 方法。
def Reset(self): | |
print("shutting down the ros...") |
这是一个类方法 Reset ,在 ROS 节点关闭时被调用。它简单地打印一条消息。
def callback_pose(self, msg): | |
self.state = msg |
这是一个回调函数 callback_pose ,它用于处理来自 ROS 话题的机器人状态信息,并将其存储在类属性 self.state 中。
def move_cmd_send(self, move_cmd): | |
self.pub_move_cmd.publish(move_cmd) |
这是一个方法 move_cmd_send ,它用于发布机器人的移动命令到 ROS 话题,以实现机器人的运动控制。
def set_target_point(self, point): | |
self.target_point = point | |
self.get_target = True |
这是一个方法 set_target_point ,它接受一个目标点 point ,并将其存储在类属性 self.target_point 中,同时将标志 self.get_target 设置为 True ,表示已经设置了目标点。
def quat2eul(self, orientation): |
这是一个方法 quat2eul ,用于将四元数表示的姿态信息转换为欧拉角(Euler angles)表示的姿态信息。
def run(self): |
这是一个方法 run ,它实现了机器人的运动控制逻辑。
global FLAG | |
global FLAGZ | |
cmd = Twist() | |
if (self.get_target): |
在 run 方法中,首先声明了全局变量 FLAG 和 FLAGZ ,然后创建了一个空的 Twist 类型的消息对象 cmd 。接着检查 self.get_target 标志,如果为 True ,表示已经设置了目标点,就执行下面的运动控制逻辑。
err_x = (self.target_point.linear.x - self.state.pose.pose.position.x-0.1) | |
err_y = (self.target_point.linear.y - self.state.pose.pose.position.y) | |
euler_angle = self.quat2eul(self.state.pose.pose.orientation) |
在运动控制逻辑中,计算了当前机器人位置与目标点之间的误差,包括线性坐标 err_x 和 err_y ,以及姿态角度 euler_angle 。
yaw = euler_angle.z | |
cmd.linear.x = constrain(self.k_p_xy * (err_x * math.cos(yaw) + err_y * math.sin(yaw)), -ROBOT_MAX_LIN_VEL, ROBOT_MAX_LIN_VEL) | |
cmd.linear.y = constrain(self.k_p_xy * (- err_x * math.sin(yaw) + err_y * math.cos(yaw)), -ROBOT_MAX_LIN_VEL, ROBOT_MAX_LIN_VEL) | |
cmd.angular.z = constrain(-self.k_yaw * (self.target_point.angular.z - yaw),-ROBOT_MAX_ANG_VEL/2, ROBOT_MAX_ANG_VEL/2) |
根据误差和一些控制参数,计算出新的线性速度和角速度命令 cmd 。这里使用了 constrain 函数来确保速度命令不超过最大值。
if abs(cmd.linear.x) < 0.003 : | |
cmd.linear.x = 0 | |
if abs(cmd.linear.y) < 0.003 : | |
cmd.linear.y = 0 | |
if abs(cmd.angular.z) < 0.003 : | |
cmd.angular.z = 0 |
这里进一步处理线性速度和角速度命令,将非常小的值(小于 0.003)设为 0,以避免微小的速度命令。
if FLAGZ: | |
cmd.linear.x = 0 | |
cmd.linear.y = 0 |
如果 FLAGZ 标志为 True ,表示需要停止机器人的线性运动。
com_vel_callback(cmd) | |
self.move_cmd_send(cmd) |
调用 com_vel_callback 函数将命令 cmd 发送给机器人,然后使用 self.move_cmd_send(cmd) 发布命令到 ROS 话题。
if cmd.linear.x == 0 and cmd.linear.y == 0 : | |
FLAGZ = True |
这里检查如果机器人的线性速度 cmd.linear.x 和 cmd.linear.y 都为零,就将 FLAGZ 设置为 True ,表示机器人的线性运动已经停止。
if cmd.angular.z == 0 and cmd.linear.x == 0 and cmd.linear.y == 0 : | |
FLAG = True |
这里检查如果机器人的角速度 cmd.angular.z 和线性速度 cmd.linear.x 以及 cmd.linear.y 都为零,就将 FLAG 设置为 True ,表示机器人的运动已经停止。
if __name__ == "__main__": |
这是 Python 中的一个惯用用法,表示如果这个脚本被直接运行而不是被导入为模块时,才执行以下代码块。
rospy.init_node('CarMecanum') |
这里初始化了一个 ROS 节点,节点名称为 'CarMecanum' 。
car_mecanum = RobotInterface('/camera/odom/sample', '/cmd_vel') |
创建了一个 RobotInterface 类的实例 car_mecanum ,并传入了两个参数,分别是机器人状态话题名称和移动命令话题名称。
if rosSerial_init("/dev/ttyUSB0", 115200) == -1: | |
exit(-1) |
调用 rosSerial_init 函数,初始化串行通信,如果初始化失败则退出脚本。
rate = rospy.Rate(100) |
创建一个 ROS 的速率控制对象,设置为 100Hz,即每秒循环 100 次。
t_start = time.time() | |
target = Twist() |
获取当前时间,并创建一个空的 Twist 类型的消息对象 target 。
goal = [[0, 0, 1.52, 1.87, 1.86, 1.86, 1.86], [0, 0.2, 0.2, 0.2, 1, 1, 1], [0, 0, 0, 0, 0, -1.583, 0]] |
定义了一个目标点列表 goal ,它包含了三个子列表,每个子列表表示一个目标点的线性速度和角速度信息。
i = 0 | |
goalx = goal[0][i] | |
goaly = goal[1][i] | |
goalz = 0 | |
target.linear.x = 0 | |
target.linear.y = 0 | |
target.angular.z = 0 | |
w = 0.05 |
初始化了一些变量,包括索引 i ,以及目标点的初始值。
while not rospy.is_shutdown(): |
这是一个 ROS 节点的主循环,只要 ROS 节点没有被关闭,就会一直执行以下代码块。
if FLAG: | |
time.sleep(2) | |
FLAG = False | |
FLAGZ = False | |
i += 1 | |
print(i) | |
goalx = goal[0][i] | |
goaly = goal[1][i] | |
goalz = goal[2][i] |
如果 FLAG 为 True ,表示当前目标点已经达到,程序会休眠 2 秒,然后将 FLAG 和 FLAGZ 重置为 False ,并切换到下一个目标点。
target.linear.x = goalx | |
target.linear.y = goaly | |
target.angular.z = goalz |
将计算好的目标点的线性速度和角速度信息存储在 target 中。
car_mecanum.set_target_point(target) | |
car_mecanum.run() | |
rate.sleep() |
通过 car_mecanum 对象的方法设置目标点,然后调用 run 方法执行机器人的运动控制,最后使用速率控制对象 rate 控制循环的频率。
这就是代码的主要逻辑和功能。它是一个 ROS 控制机器人运动的程序,根据一系列预定义的目标点来控制机器人的运动。当达到一个目标点后,会等待一段时间然后切换到下一个目标点。同时,通过串口通信将机器人的速度命令发送给机器人。