Перейти к содержанию

Материалы для мастер-класса по дронам

Установка vbox ubuntu

  1. Vbox 7.2.6 Downloads – Oracle VirtualBox
  2. Ubuntu 24.04.4 LTS, официальный Download Ubuntu Desktop | Ubuntu
  3. Установка по инструкции How to run an Ubuntu Desktop virtual machine using VirtualBox 7 | Ubuntu
  4. Идёт в автоматическом режиме, указал root/root имя пользователя и пароль. Увеличил оперативку до 4096, процессоров до 4, размер жесткого диска 30 гб. Включил установку дополнений гостевой ОС
  5. После установки пользователь и пароль drone/root
  6. Установка пакетов
    sudo apt install python3 python3-pip
    pip3 --version
    
  7. pycharm

    sudo apt install libfuse2
    

    Download: https://github.com/JetBrains/intellij-community/releases/tag/pycharm%2F2025.3.2.1
    tar.gz

  8. qgc 5.08.64 https://docs.qgroundcontrol.com/master/en/qgc-user-guide/getting_started/download_and_install.html#ubuntu

    sudo apt install gstreamer1.0-plugins-bad gstreamer1.0-libav gstreamer1.0-gl -y
    sudo apt install libfuse2 -y
    sudo apt install libxcb-xinerama0 libxkbcommon-x11-0 libxcb-cursor-dev -y
    

    Скачал AppImage, chmod+x

  9. SITL. Скачать, потом

    https://github.com/ArduPilot/ardupilot.git
    Tools/environment_install/install-prereqs-ubuntu.sh -y
    

    Уже всё ставит и мавлинк и мавпрокси

So the commands I tried were,  
# 1. Activate virtualenv

source ~/venv-ardupilot/bin/activate

# 2. Navigate to ArduPilot repo

cd ~/ardupilot

# 3. Get the correct version

git fetch --all

git checkout Copter-4.6.3

# 4. Pull in all submodules

git submodule update --init --recursive

# 5. Install Python deps (optional but typical)

# pip install -r requirements.txt

# 6. Rebuild

./waf configure --board sitl

./waf copter

# 7. Launch

cd ArduCopter

sim_vehicle.py -v ArduCopter -f quad --console --map

Установка под Windows

Установка Mission Planner

PC Win11.
Release Mission Planner 1.3.83 · ArduPilot/MissionPlanner

Скачиваем, извлекаем так, чтобы было:
C:\drone\MissionPlanner\version.txt
Проверяем номер версии: 1.3.9384.38258
Создаём ярлык, для удобства:

Запускаем, проверяем, что работает и номер версии в шапке

Ручная установка SITL

  1. Копируем каталог по ссылке ... TODO
  2. Распаковываем в C:\drone\sitl\
  3. Проверяем запуск:
    C:\drone\sitl>ArduCopter.exe --help
    Options:
            --help|-h                display this help information
            --wipe|-w                wipe eeprom
            --unhide-groups|-u       parameter enumeration ignores AP_PARAM_FLAG_ENABLE
            --speedup|-s SPEEDUP     set simulation speedup
            --rate|-r RATE           set SITL framerate
            --console|-C             use console instead of TCP ports
            --instance|-I N          set instance of SITL (adds 10*instance to all port numbers)
            --synthetic-clock|-S     set synthetic clock mode
            --home|-O HOME           set start location (lat,lng,alt,yaw) or location name
    ...
    

    Из исходников берем ardupilot-Copter-4.6.3\Tools\autotest\default_params\

Запуск:
C:\drone\sitl>ArduCopter.exe --model copter --speedup 1 --slave 0 --defaults default_params/copter.parm --sim-address=127.0.0.1 -I0
Результат:

Setting SIM_SPEEDUP=1.000000
Suggested EK3_DRAG_BCOEF_* = 17.209, EK3_DRAG_MCOEF = 0.209
Starting sketch 'ArduCopter'
Starting SITL input
Using Irlock at port : 9005
bind port 5760 for SERIAL0
SERIAL0 on TCP port 5760
Waiting for connection ....

Проверка соединения SITL-MissionPlanner

В MissionPlanner подключаемся. Указываем порт TCP, номер по-умолчанию 5760.

В MissionPlanner результат подключения:

Проверяем команды.
Ждём готовности:

Выдаём ARM:

После появления ARMED надо быстро выбрать take off в контекстном меню.

Может, надо дополнительно поставить Guided-режим.
Проверяем, что взлетели:

Правой кнопкой в месте экрана и выбираем "Лететь сюда":

Садимся домой. Для этого выбираем режим RTL. Он прилетает на место, садится и переходит в режим DISARMED.

Установка пакета: pip install pymavlink
Запускаем стандартный батник sitl.

Тестовый скрипт, можно построчно, подключаемся так же, как и GCS

from pymavlink import mavutil

mav = mavutil.mavlink_connection('tcp:127.0.0.1:5760')
mav.wait_heartbeat()
print(f"Heartbeat from system (system {mav.target_system} component {mav.target_component})")

Результат подключения:

mavproxy

pip install mavproxy
Запуск с возможностью подключения наземной станции Mission Planner:

mavproxy.py --master=tcp:127.0.0.1:5760 --out=udp:127.0.0.1:14550 --out=udp:127.0.0.1:14551

Порядок запуска:
1. SITL
2. Proxy
3. MissionPlanner (автоматом подключается)
4. Python скрипт - udp:127.0.0.1:14550
Прослеживаем команды и состояния

from pymavlink import mavutil

mav = mavutil.mavlink_connection('udp:127.0.0.1:14550')
mav.wait_heartbeat()
print(f"Heartbeat from system (system {mav.target_system} component {mav.target_component})")

Считываем сообщения в канале:

>python scripts\print_all_messages.py
Heartbeat from system (system 1 component 0)
{'mavpackettype': 'AHRS', 'omegaIx': -0.0018503174651414156, 'omegaIy': -0.0018894965760409832, 'omegaIz': -0.002428414998576045, 'accel_weight': 0.0, 'renorm_val': 0.0, 'error_rp': 0.0023577536921948195, 'error_yaw': 0.001460870960727334}
{'mavpackettype': 'AHRS2', 'roll': -0.0015929507790133357, 'pitch': -0.0018164949724450707, 'yaw': -0.11898527294397354, 'altitude': 584.0899658203125, 'lat': -353632621, 'lng': 1491652374}
{'mavpackettype': 'ATTITUDE', 'time_boot_ms': 486149, 'roll': -0.0010556308552622795, 'pitch': -0.001273558591492474, 'yaw': -0.14075033366680145, 'rollspeed': -0.0002452039625495672, 'pitchspeed': -0.00028527574613690376, 'yawspeed': -0.0008685493376106024}
{'mavpackettype': 'GLOBAL_POSITION_INT', 'time_boot_ms': 486149, 'lat': -353632620, 'lon': 1491652373, 'alt': 584080, 'relative_alt': -5, 'vx': 1, 'vy': -1, 'vz': 0, 'hdg': 35194}
{'mavpackettype': 'VFR_HUD', 'airspeed': 0.02374367043375969, 'groundspeed': 0.02374294400215149, 'heading': 351, 'throttle': 0, 'alt': 584.0800170898438, 'climb': 0.00018553633708506823}
...

Описания команд:
Pymavlink · GitBook

Запуск виртуальной машины

На примере windows

Downloads – Oracle VirtualBox

Установка всего по-дефолту. Потом запускаем

Выбор ova, потом:

После импорта запускаем

Пароль root
Проверяем запуск SITL+MavProxy, иконка AP слева:

Разработка

Pycharm и виртуальное окружение

Community-версия:
Releases · JetBrains/intellij-community

Настройка виртуального окружения

from pymavlink import mavutil  

mav = mavutil.mavlink_connection('udp:localhost:14550')  
mav.wait_heartbeat()  
print(f"Heartbeat from system (system {mav.target_system} component {mav.target_component})")

Заглушка для тестов

Скрипт stubmavlink.py, который имитирует поведение реального pymavlink для тестирования согласно ТЗ:

#!/usr/bin/env python3  
# -*- coding: utf-8 -*-  
"""  
Stub‑модуль для pymavlink, имитирующий работу с дроном.  
Полностью совместим с API реального pymavlink (в том числе  
через атрибут ``master.mav``).  
"""  
from __future__ import annotations  

import random  
import threading  
import time  
from typing import Any, List, Optional  


# ----------------------------------------------------------------------  
#  Эмуляция пакета ``pymavlink`` → ``mavutil`` и вложенного ``mavlink``  
# ----------------------------------------------------------------------  
class mavutil:                              # pragma: no cover  
    """Набор констант и вспомогательных функций, аналогичный настоящему  
    пакету ``pymavlink.mavutil``.    """    # --------------------------- Константы ---------------------------  
    MAV_MODE_FLAG_SAFETY_ARMED = 128  
    MAV_MODE_FLAG_CUSTOM_MODE_ENABLED = 1  

    MAV_CMD_NAV_TAKEOFF = 22  
    MAV_CMD_NAV_LAND = 21  

    MAV_DATA_STREAM_EXTENDED_STATUS = 2  
    MAV_DATA_STREAM_POSITION = 6  

    # --------------------------- Вложенный пакет --------------------  
    class mavlink:                         # pragma: no cover  
        """Подмодуль, в котором находятся типы сообщений и  
        дополнительные константы.        """        # (повторяем константы для удобства – так же как в реальном пакете)  
        MAV_MODE_FLAG_SAFETY_ARMED = 128  
        MAV_MODE_FLAG_CUSTOM_MODE_ENABLED = 1  

        # ---------- Команды ----------  
        MAV_CMD_NAV_TAKEOFF = 22  
        MAV_CMD_NAV_LAND = 21  

        # ---------- Потоки данных ----------  
        # <-- **ОТВЕТСТВЕННО ДОБАВЛЕНО** -->        # В оригинальном pymavlink они находятся здесь, поэтому именно здесь        # они и должны быть объявлены.        MAV_DATA_STREAM_EXTENDED_STATUS = 2  
        MAV_DATA_STREAM_POSITION = 6  



        class MAV_MODE:                    # pragma: no cover  
            MAV_MODE_PREFLIGHT = 0  
            MAV_MODE_STABILIZE_DISARMED = 80  
            MAV_MODE_STABILIZE_ARMED = 208  
            MAV_MODE_GUIDED_DISARMED = 88  
            MAV_MODE_GUIDED_ARMED = 216  
            MAV_MODE_AUTO_DISARMED = 92  
            MAV_MODE_AUTO_ARMED = 220  
            MAV_MODE_TEST_DISARMED = 66  
            MAV_MODE_TEST_ARMED = 194  

        # ----------------------- Сообщения -----------------------  
        class MAVLink_heartbeat_message:  # pragma: no cover  
            """Имитация MAVLink‑сообщения HEARTBEAT."""  
            def __init__(self, system_id: int = 1, component_id: int = 1):  
                self.type = 2                     # MAV_TYPE_QUADROTOR  
                self.autopilot = 3                # MAV_AUTOPILOT_ARDUPILOT  
                self.base_mode = 81               # базовый режим (пример)  
                self.custom_mode = 0  
                self.system_status = 3            # MAV_STATE_ACTIVE  
                self.mavlink_version = 3  
                self._type = "HEARTBEAT"  

            def __repr__(self) -> str:  
                return f"<HEARTBEAT base_mode={self.base_mode} custom_mode={self.custom_mode}>"  

        class MAVLink_sys_status_message:   # pragma: no cover  
            """Имитация MAVLink‑сообщения SYS_STATUS."""  
            def __init__(self):  
                self.voltage_battery = 12500     # мВ  
                self.current_battery = 1000      # мА  
                self.battery_remaining = 85      # %  
                self._type = "SYS_STATUS"  

            def __repr__(self) -> str:  
                return (f"<SYS_STATUS voltage={self.voltage_battery}mV "  
                        f"current={self.current_battery}mA remaining={self.battery_remaining}%>")  

        class MAVLink_global_position_int_message:   # pragma: no cover  
            """Имитация MAVLink‑сообщения GLOBAL_POSITION_INT."""  
            def __init__(self):  
                self.time_boot_ms = int(time.time() * 1000)  
                self.lat = 554500000      # 55.45° * 1e7  
                self.lon = 375000000      # 37.5°  * 1e7  
                self.alt = 150000         # над уровнем моря, мм  
                self.relative_alt = 0  
                self.vx = self.vy = self.vz = 0  
                self.hdg = 0  
                self._type = "GLOBAL_POSITION_INT"  

            def __repr__(self) -> str:  
                return (f"<GLOBAL_POSITION_INT lat={self.lat} lon={self.lon} "  
                        f"rel_alt={self.relative_alt}mm>")  

    # ------------------------------------------------------------------  
    #  Вспомогательная функция – перевод custom_mode → строка режима    # ------------------------------------------------------------------    @staticmethod  
    def mode_string_v10(msg: Any) -> str:  
        """Возвращает «читаемое» название режима по полю custom_mode."""  
        modes = {  
            0: "STABILIZE",  
            1: "ACRO",  
            2: "ALT_HOLD",  
            3: "AUTO",  
            4: "GUIDED",  
            5: "LOITER",  
            6: "RTL",  
            7: "CIRCLE",  
            8: "LAND",  
            9: "DRIFT",  
            10: "SPORT",  
            11: "FLIP",  
            12: "AUTOTUNE",  
            13: "POSHOLD",  
            14: "BRAKE",  
            15: "THROW",  
            16: "AVOID_ADSB",  
            17: "GUIDED_NOGPS",  
            18: "SMART_RTL",  
            19: "FLOWHOLD",  
            20: "FOLLOW",  
            21: "ZIGZAG",  
            22: "SYSTEMID",  
            23: "AUTOROTATE",  
            24: "AUTO_RTL",  
        }  
        return modes.get(getattr(msg, "custom_mode", None), "UNKNOWN")  


# ----------------------------------------------------------------------  
#  Универсальное «сообщение» (не используется в текущей версии,  
#  но оставлено для совместимости)  
# ----------------------------------------------------------------------  
class StubMessage:  
    """Простой контейнер, имитирующий объект MAVLink‑сообщения."""  
    def __init__(self, msg_type: str, **kwargs: Any):  
        self._type = msg_type  
        for key, value in kwargs.items():  
            setattr(self, key, value)  

    def get_type(self) -> str:  # pragma: no cover  
        return self._type  

    def __repr__(self) -> str:  # pragma: no cover  
        return f"<StubMessage type={self._type}>"  


# ----------------------------------------------------------------------  
#  Прокси‑объект, который будет доступен как ``master.mav``.  
#  Он просто «перенаправляет» вызовы к самому соединению.  
# ----------------------------------------------------------------------  
class _MavProxy:  
    """Объект‑прокси, имитирующий ``master.mav`` из pymavlink."""  
    def __init__(self, conn: "StubMavlinkConnection"):  
        self._conn = conn  

    # Все ниже‑приведённые методы просто вызывают одноимённые методы  
    # у ``StubMavlinkConnection``.  Это упрощает поддержку совместимости.    def request_data_stream_send(self,  
                                 target_system: int,  
                                 target_component: int,  
                                 stream_id: int,  
                                 rate: int,  
                                 start: int) -> None:  
        self._conn.request_data_stream_send(  
            target_system, target_component, stream_id, rate, start  
        )  

    def set_mode_send(self,  
                      target_system: int,  
                      base_mode: int,  
                      custom_mode: int) -> None:  
        self._conn.set_mode_send(target_system, base_mode, custom_mode)  

    def command_long_send(self,  
                          target_system: int,  
                          target_component: int,  
                          command: int,  
                          confirmation: int,  
                          param1: float = 0,  
                          param2: float = 0,  
                          param3: float = 0,  
                          param4: float = 0,  
                          param5: float = 0,  
                          param6: float = 0,  
                          param7: float = 0) -> None:  
        self._conn.command_long_send(  
            target_system, target_component, command, confirmation,  
            param1, param2, param3, param4, param5, param6, param7  
        )  

    # Для отладки удобно иметь строковое представление.  
    def __repr__(self) -> str:  # pragma: no cover  
        return f"<MAVProxy for {self._conn.connection_string}>"  


# ----------------------------------------------------------------------  
#  Основной класс соединения – полностью имитирует API pymavlink  
# ----------------------------------------------------------------------  
class StubMavlinkConnection:  
    """Эмуляция соединения MAVLink (как в pymavlink)."""  
    # --------------------------------------------------------------  
    #  Инициализация    # --------------------------------------------------------------    def __init__(self, connection_string: str):  
        """  
        Args:            connection_string: строка подключения (не используется,                но сохраняется для совместимости с реальной библиотекой).        """        self.connection_string = connection_string  
        self.target_system = 1  
        self.target_component = 1  

        # ----- Параметры «самолёта» -----  
        self._armed: bool = False  
        self._mode: int = 4                      # 4 → GUIDED  
        self._altitude: float = 0.0              # метры над землёй  
        self._target_altitude: float = 0.0  
        self._battery_voltage: int = 12500       # мВ  
        self._battery_current: int = 1000        # мА  
        self._battery_remaining: int = 85        # %  

        self._flying: bool = False  
        self._landing: bool = False  
        self._connected: bool = True  

        # ----- Внутренняя очередь сообщений -----  
        self._messages: List[Any] = []  
        self._heartbeat_count: int = 0  

        # ----- Прокси‑объект, доступный как ``master.mav`` -----  
        self.mav = _MavProxy(self)  

        # ----- Фоновый поток телеметрии -----  
        self._running = True  
        self._thread = threading.Thread(target=self._telemetry_loop, daemon=True)  
        self._thread.start()  

        print(f"[STUB] Подключение к {connection_string} (эмуляция)")  

    # --------------------------------------------------------------  
    #  Фоновый генератор телеметрии    # --------------------------------------------------------------    def _telemetry_loop(self) -> None:  
        """Генерирует сообщения в фоновом потоке (примерно 10 Гц)."""  
        while self._running:  
            # ---- Управление высотой ----  
            if self._flying:  
                if self._landing:  
                    # Спускаемся при посадке  
                    self._altitude = max(0.0, self._altitude - 0.5)  
                    if self._altitude == 0.0:  
                        self._flying = False  
                        self._landing = False  
                        self._armed = False  
                else:  
                    # Подъём/спуск к целевой высоте  
                    if self._altitude < self._target_altitude:  
                        self._altitude = min(self._target_altitude,  
                                             self._altitude + 0.5)  
                    elif self._altitude > self._target_altitude:  
                        self._altitude = max(self._target_altitude,  
                                             self._altitude - 0.5)  

            # ---- Формируем сообщения ----  
            self._generate_heartbeat()  
            self._generate_sys_status()  
            self._generate_global_position()  

            time.sleep(0.1)          # ≈10 Гц  

    # --------------------------------------------------------------    #  Сборка отдельных сообщений    # --------------------------------------------------------------    def _generate_heartbeat(self) -> None:  
        """Создаёт сообщение HEARTBEAT и помещает его в очередь."""  
        self._heartbeat_count += 1  
        hb = mavutil.mavlink.MAVLink_heartbeat_message()  
        # Выбор режима в зависимости от текущего состояния  
        if self._flying:  
            hb.custom_mode = 8 if self._landing else 4   # LAND / GUIDED  
        else:  
            hb.custom_mode = 0                           # PREFLIGHT  
        if self._armed:  
            hb.base_mode |= mavutil.MAV_MODE_FLAG_SAFETY_ARMED  
        self._messages.append(hb)  

    def _generate_sys_status(self) -> None:  
        """Создаёт сообщение SYS_STATUS."""  
        ss = mavutil.mavlink.MAVLink_sys_status_message()  
        # Разряжаем батарею каждые 5 сек (≈50 heartbeat)  
        if self._flying and self._heartbeat_count % 50 == 0:  
            self._battery_remaining = max(0, self._battery_remaining - 1)  
            self._battery_voltage = 12500 - (85 - self._battery_remaining) * 50  
        ss.voltage_battery = self._battery_voltage  
        ss.battery_remaining = self._battery_remaining  
        self._messages.append(ss)  

    def _generate_global_position(self) -> None:  
        """Создаёт сообщение GLOBAL_POSITION_INT."""  
        pos = mavutil.mavlink.MAVLink_global_position_int_message()  
        pos.relative_alt = int(self._altitude * 1000)   # в миллиметрах  
        if self._flying:  
            # Добавляем небольшую случайную «дрифту» GPS‑координат  
            pos.lat += random.randint(-10, 10)  
            pos.lon += random.randint(-10, 10)  
        self._messages.append(pos)  

    # --------------------------------------------------------------  
    #  API, которое используют скрипты‑клиенты  
    # --------------------------------------------------------------    def wait_heartbeat(self, timeout: int = 10) -> bool:  
        """  
        Ожидает появления любого HEARTBEAT‑сообщения.  

        Returns:            ``True`` – если сообщение получено, иначе бросает ``Exception``.        """        start = time.time()  
        while time.time() - start < timeout:  
            for msg in self._messages:  
                if getattr(msg, "_type", None) == "HEARTBEAT":  
                    return True  
            time.sleep(0.1)  
        raise Exception("[STUB] Таймаут ожидания heartbeat")  

    def recv_match(self,  
                   type: Optional[str] = None,  
                   blocking: bool = False,  
                   timeout: Optional[float] = None) -> Optional[Any]:  
        """  
        Получает сообщение из внутренней очереди.  
        Args:            type:        Если указано, возвращается только сообщение данного типа.            blocking:    Если ``True`` – ожидать появления сообщения.            timeout:     Максимальное время ожидания в секундах (только при ``blocking=True``).  
        Returns:            Сообщение или ``None``.        """        if blocking:  
            deadline = None if timeout is None else time.time() + timeout  
            while True:  
                for i, msg in enumerate(self._messages):  
                    if type is None or getattr(msg, "_type", None) == type:  
                        return self._messages.pop(i)  
                if deadline is not None and time.time() > deadline:  
                    return None  
                time.sleep(0.05)  
        else:  
            for i, msg in enumerate(self._messages):  
                if type is None or getattr(msg, "_type", None) == type:  
                    return self._messages.pop(i)  
            return None  

    # --------------------------------------------------------------  
    #  Управление «мотором» и режимами    # --------------------------------------------------------------    def motors_armed_wait(self) -> None:  
        """Блокирует выполнение, пока не будет взведено состояние ``_armed``."""  
        while not self._armed:  
            time.sleep(0.05)  
        print("[STUB] Моторы взведены")  

    def motors_disarmed_wait(self) -> None:  
        """Блокирует выполнение, пока не будет снято состояние ``_armed``."""  
        while self._armed:  
            time.sleep(0.05)  
        print("[STUB] Моторы разведены")  

    def arducopter_arm(self) -> None:  
        """Имитация команды ARDUCOPTER_ARM."""  
        print("[STUB] Взведение моторов…")  
        time.sleep(1)  
        self._armed = True  

    def arducopter_disarm(self) -> None:  
        """Имитация команды ARDUCOPTER_DISARM."""  
        print("[STUB] Разведение моторов…")  
        time.sleep(1)  
        self._armed = False  

    # --------------------------------------------------------------  
    #  Протоколные запросы (вызываются через ``master.mav``)    # --------------------------------------------------------------    def request_data_stream_send(self,  
                                 target_system: int,  
                                 target_component: int,  
                                 stream_id: int,  
                                 rate: int,  
                                 start: int) -> None:  
        """  
        Запрашивает поток телеметрии (в заглушке – только вывод в консоль).        """        print(f"[STUB] Запрос потока {stream_id} с частотой {rate} Гц (start={start})")  

    def set_mode_send(self,  
                      target_system: int,  
                      base_mode: int,  
                      custom_mode: int) -> None:  
        """  
        Устанавливает режим полёта.        В реальном MAVLink это посылает COMMAND_LONG с командой MAV_CMD_DO_SET_MODE.        Здесь просто меняем внутреннее поле ``_mode`` и выводим читаемый текст.        """        self._mode = custom_mode  
        # Формируем «фиктивное» сообщение только для вывода имени режима.  
        fake_msg = type("FakeMsg", (), {"custom_mode": custom_mode})()  
        mode_name = mavutil.mode_string_v10(fake_msg)  
        print(f"[STUB] Установка режима → {mode_name} (custom_mode={custom_mode})")  

    def command_long_send(self,  
                          target_system: int,  
                          target_component: int,  
                          command: int,  
                          confirmation: int,  
                          param1: float = 0,  
                          param2: float = 0,  
                          param3: float = 0,  
                          param4: float = 0,  
                          param5: float = 0,  
                          param6: float = 0,  
                          param7: float = 0) -> None:  
        """  
        Обрабатывает команды TAKEOFF и LAND (другие игнорируются).  
        Для TAKEOFF значение высоты передаётся в ``param7`` – как в оригинальном        протоколе MAVLink.        """        if command == mavutil.MAV_CMD_NAV_TAKEOFF:  
            self._target_altitude = float(param7)  
            self._flying = True  
            print(f"[STUB] Команда TAKEOFF → цель {self._target_altitude} м")  
        elif command == mavutil.MAV_CMD_NAV_LAND:  
            self._landing = True  
            print("[STUB] Команда LAND")  
        else:  
            print(f"[STUB] Не реализована команда {command}")  

    # --------------------------------------------------------------  
    #  Завершение работы    # --------------------------------------------------------------    def close(self) -> None:  
        """Останавливает фон и помечает соединение как закрытое."""  
        self._running = False  
        self._connected = False  
        print("[STUB] Соединение закрыто")  


# ----------------------------------------------------------------------  
#  Фабричная функция – как в настоящем pymavlink  
# ----------------------------------------------------------------------  
def mavlink_connection(connection_string: str) -> StubMavlinkConnection:  
    """  
    Создаёт «соединение» с эмулятором.  
    Args:        connection_string: произвольная строка (например ``'udp:127.0.0.1:14550'``).  
    Returns:        Объект :class:`StubMavlinkConnection`.    """    return StubMavlinkConnection(connection_string)  


# Добавляем фабрику в пространство имён ``mavutil`` – так же, как в реальном пакете.  
mavutil.mavlink_connection = mavlink_connection  

# ----------------------------------------------------------------------  
#  Публичный API модуля  
# ----------------------------------------------------------------------  
__all__ = ["mavutil", "mavlink_connection"]  


# ----------------------------------------------------------------------  
#  Тестовый «демо», который можно запустить напрямую:  
# ----------------------------------------------------------------------  
if __name__ == "__main__":  
    # Пример типичного использования (аналогичный коду, написанному для pymavlink)  
    master = mavlink_connection("udp:127.0.0.1:14550")  
    master.wait_heartbeat()  
    master.arducopter_arm()  
    master.motors_armed_wait()  

    # Запрашиваем телеметрию (в заглушке просто лог)  
    master.mav.request_data_stream_send(  
        master.target_system,  
        master.target_component,  
        mavutil.MAV_DATA_STREAM_POSITION,  
        rate=10,  
        start=1,  
    )  

    # Устанавливаем режим GUIDED  
    master.mav.set_mode_send(  
        master.target_system,  
        base_mode=mavutil.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,  
        custom_mode=4,                 # GUIDED  
    )  

    # TAKEOFF → 5 м  
    master.mav.command_long_send(  
        master.target_system,  
        master.target_component,  
        mavutil.MAV_CMD_NAV_TAKEOFF,  
        confirmation=0,  
        param7=5.0,  
    )  

    # Пара секунд «в полёте», чтобы увидеть изменение высоты  
    time.sleep(3)  

    # LAND  
    master.mav.command_long_send(  
        master.target_system,  
        master.target_component,  
        mavutil.MAV_CMD_NAV_LAND,  
        confirmation=0,  
    )  

    # Ждём завершения посадки  
    while master._flying:  
        time.sleep(0.5)  

    master.arducopter_disarm()  
    master.motors_disarmed_wait()  
    master.close()