2

阿木币

0

精华

7 小时

在线时间

应届白菜

Rank: 1

发表于 2025-1-2 11:13:56 319 浏览 0 回复

[控制算法] 代码实现问题

目前想用代码实现无人机的功能,脱离遥控器。再观察遥控器的输入是通道值后,以代码重新定义。目前更改飞行模式,解锁,都可以实现,就是油门,偏航这些功能通道值改变了,但是真机上没反应,想问问各位大佬,是我代码的问题,还是px4的配置问题,要怎么修改呢

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import rospy
from mavros_msgs.msg import State, RCIn, PositionTarget
from mavros_msgs.srv import CommandBool, SetMode
from std_msgs.msg import Header
from geometry_msgs.msg import Point, PoseStamped
import threading
import math

# 定义全局变量
uav_state = State()
current_position = Point()
initial_channels = [1495, 1495, 1495, 1495, 1910, 1495, 1045, 1045, 1045, 1045, 1495, 1495, 1495, 1495, 1495, 1495]
current_channels = initial_channels[:]

# RC 通道定义
RC_CHANNELS = {
    "ROLL": 0,       # 横滚通道
    "PITCH": 1,      # 俯仰通道
    "THROTTLE": 2,   # 油门通道
    "YAW": 3,        # 偏航通道
    "MODE": 4,       # 模式通道
}

# 更新无人机状态
def uav_state_cb(msg):
    global uav_state
    uav_state = msg
    rospy.loginfo_throttle(5, "无人机状态更新: connected=%s, armed=%s, mode=%s",
                           uav_state.connected, uav_state.armed, uav_state.mode)

# 更新无人机当前位置
def odom_cb(msg):
    global current_position
    current_position = msg.pose.position

# 持续发布 RC 输入的线程
def rc_publisher_thread(rc_pub):
    global current_channels
    rate = rospy.Rate(50)  # 50Hz
    while not rospy.is_shutdown():
        rc_input = RCIn()
        rc_input.header = Header()
        rc_input.header.stamp = rospy.Time.now()
        rc_input.channels = current_channels[:]
        rc_pub.publish(rc_input)
        rate.sleep()

# 切换飞控模式
def switch_to_mode(set_mode_service, mode, mode_value, custom_mode_value=None):
    global current_channels
    rospy.loginfo(f"正在尝试切换到 {mode} 模式...")
    try:
        # 更新模式通道值
        current_channels[RC_CHANNELS["MODE"]] = mode_value

        # 调用模式切换服务
        res = set_mode_service(custom_mode=mode)
        if res.mode_sent:
            rospy.loginfo(f"已切换到 {mode} 模式!")
            return True
        else:
            rospy.logerr(f"切换到 {mode} 模式失败!")
            return False
    except rospy.ServiceException as e:
        rospy.logerr(f"调用模式切换服务失败: {e}")
        return False

# 等待进入目标模式
def wait_for_mode(target_mode, timeout=10):
    rospy.loginfo(f"等待进入 {target_mode} 模式...")
    start_time = rospy.Time.now()
    while uav_state.mode != target_mode and not rospy.is_shutdown():
        rospy.sleep(0.1)
        if (rospy.Time.now() - start_time).to_sec() > timeout:
            rospy.logerr(f"等待 {target_mode} 模式超时!")
            return False
    rospy.loginfo(f"已进入 {target_mode} 模式")
    return True

# 发布目标位置
def send_target_position(position_pub, x, y, z):
    target = PositionTarget()
    target.header.stamp = rospy.Time.now()
    target.coordinate_frame = PositionTarget.FRAME_LOCAL_NED
    target.type_mask = PositionTarget.IGNORE_VX + PositionTarget.IGNORE_VY + PositionTarget.IGNORE_VZ
    target.position = Point(x, y, z)
    position_pub.publish(target)

# 增大油门控制起飞
def increase_throttle_for_takeoff(target_altitude, step=100, interval=0.5):
    global current_channels
    rospy.loginfo("逐步增加油门进行起飞...")
    while current_position.z < target_altitude and not rospy.is_shutdown():
        current_channels[RC_CHANNELS["THROTTLE"]] += step
        if current_channels[RC_CHANNELS["THROTTLE"]] > 2000:
            current_channels[RC_CHANNELS["THROTTLE"]] = 2000
        rospy.sleep(interval)
    rospy.loginfo("已达到目标高度!")

# 控制无人机绕圈飞行
def rotate_in_circle(position_pub, center_x, center_y, radius=2, speed=0.2, circles=2):
    steps_per_circle = int(2 * math.pi * radius / speed)
    total_steps = steps_per_circle * circles
    for step in range(total_steps):
        angle = 2 * math.pi * (step % steps_per_circle) / steps_per_circle
        x = center_x + radius * math.cos(angle)
        y = center_y + radius * math.sin(angle)
        z = current_position.z
        send_target_position(position_pub, x, y, z)
        rospy.sleep(speed)

# 主函数
def main():
    rospy.init_node("uav_mode_control", anonymous=True)

    # 订阅无人机状态和位置信息
    rospy.Subscriber("/mavros/state", State, uav_state_cb)
    rospy.Subscriber("/mavros/local_position/pose", PoseStamped, odom_cb)

    # 初始化服务和发布器
    rospy.wait_for_service("/mavros/cmd/arming")
    rospy.wait_for_service("/mavros/set_mode")
    arming_service = rospy.ServiceProxy("/mavros/cmd/arming", CommandBool)
    set_mode_service = rospy.ServiceProxy("/mavros/set_mode", SetMode)
    rc_pub = rospy.Publisher('/mavros/rc/in', RCIn, queue_size=10)
    position_pub = rospy.Publisher('/mavros/setpoint_position/local', PositionTarget, queue_size=10)

    # 启动 RC 输入发布线程
    rc_thread = threading.Thread(target=rc_publisher_thread, args=(rc_pub,))
    rc_thread.daemon = True  # 主线程退出时自动关闭
    rc_thread.start()

    # 切换到 POSHOLD 模式
    if not switch_to_mode(set_mode_service, "POSHOLD", 1495):
        return

    # 切换到 ALTHOLD 模式
    if not switch_to_mode(set_mode_service, "ALT_HOLD", 1045):
        return

    # 解锁无人机
    try:
        res = arming_service(value=True)
        if not res.success:
            rospy.logerr("解锁失败,程序退出")
            return
        rospy.loginfo("解锁成功")
    except rospy.ServiceException as e:
        rospy.logerr(f"解锁服务调用失败: {e}")
        return

    # 起飞到 2 米高度
    increase_throttle_for_takeoff(target_altitude=2)

    # 切换到 AUTO 模式
    if not switch_to_mode(set_mode_service, "AUTO", 1495, custom_mode_value=1945):
        return

    # 绕圈飞行
    rotate_in_circle(position_pub, current_position.x, current_position.y)

    # 着陆并上锁
    send_target_position(position_pub, current_position.x, current_position.y, 0)
    rospy.sleep(10)
    try:
        res = arming_service(value=False)
        if res.success:
            rospy.loginfo("无人机已成功上锁")
        else:
            rospy.logerr("无人机上锁失败")
    except rospy.ServiceException as e:
        rospy.logerr(f"上锁服务调用失败: {e}")

if __name__ == "__main__":
    try:
        main()
    except rospy.ROSInterruptException:
        pass


我知道答案 回答被采纳将会获得2 阿木币 已有0人回答

扫一扫浏览分享
回复

使用道具 举报

返回列表
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

快速回复 返回顶部 返回列表