-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
111 lines (82 loc) · 3.07 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import argparse
import asyncio
import logging
import os
from mavsdk import System
# Root logger configuration: emit everything from DEBUG upwards, prefixing
# each record with its timestamp, logger name and level.
logFormatter = " %(asctime)s %(name)s - %(levelname)s: %(message)s"
logging.basicConfig(level=logging.DEBUG, format=logFormatter)

# Command-line interface. The only option is an opt-in flag that selects the
# takeoff test; without it the script does nothing.
parser = argparse.ArgumentParser(description="Control the drone")
parser.add_argument(
    "--takeoff",
    dest="takeoff",
    action="store_true",
    # The flag triggers the arm / takeoff / land sequence, not a mission
    # upload, so the help text describes that.
    help="Run the takeoff-and-land test procedure",
)
# Parsed at import time; run() reads args.takeoff.
args = parser.parse_args()
async def test_takeoff():
    """Connect to the drone, then arm, take off, wait five seconds and land.

    The connection address is read from the ``ARDU_SERIAL_CONN`` environment
    variable and falls back to ``udp://:14540``. Altitude and flight-mode
    printers run as background tasks during the flight. The coroutine
    returns once the ``observe_is_in_air`` task has finished.
    """
    # Create the vehicle handle and open the link.
    vehicle = System()
    await vehicle.connect(
        system_address=os.getenv("ARDU_SERIAL_CONN", "udp://:14540")
    )

    print("Waiting for drone to connect...")
    async for link_state in vehicle.core.connection_state():
        if not link_state.is_connected:
            continue
        print("Drone discovered!")
        break

    # Background telemetry printers, plus the watcher that cancels them
    # once the drone has been in the air and is no longer.
    telemetry_printers = [
        asyncio.ensure_future(print_altitude(vehicle)),
        asyncio.ensure_future(print_flight_mode(vehicle)),
    ]
    landing_watcher = asyncio.ensure_future(
        observe_is_in_air(vehicle, telemetry_printers)
    )

    print("Waiting for drone to have a global position estimate...")
    async for health_report in vehicle.telemetry.health():
        if not health_report.is_global_position_ok:
            continue
        print("Global position estimate ok")
        break

    # Flight sequence.
    print("-- Arming")
    await vehicle.action.arm()

    print("-- Taking off")
    await vehicle.action.takeoff()

    await asyncio.sleep(5)

    print("-- Landing")
    await vehicle.action.land()

    # Sending 'land' returns immediately; block here until the watcher
    # finishes so the script does not exit mid-descent.
    await landing_watcher
async def print_altitude(drone):
    """Print the relative altitude, rounded to an integer, whenever it changes.

    Consumes ``drone.telemetry.position()`` and prints one line per change
    of the rounded ``relative_altitude_m`` value. Runs until the stream
    ends or the task is cancelled.
    """
    last_reported = None
    async for sample in drone.telemetry.position():
        current = round(sample.relative_altitude_m)
        if current == last_reported:
            # Same rounded value as last time: nothing new to report.
            continue
        last_reported = current
        print(f"Altitude: {current}")
async def print_flight_mode(drone):
    """Print the flight mode each time it differs from the previous one.

    Consumes ``drone.telemetry.flight_mode()`` and prints one line per
    change. Runs until the stream ends or the task is cancelled.
    """
    last_mode = None
    async for mode in drone.telemetry.flight_mode():
        if mode == last_mode:
            # Unchanged since the previous update: stay quiet.
            continue
        last_mode = mode
        print(f"Flight mode: {mode}")
async def observe_is_in_air(drone, running_tasks):
    """Watch the in-air state and return once the drone has landed.

    Consumes ``drone.telemetry.in_air()``. After the drone has been seen
    in the air at least once, the first update saying it no longer is
    causes every task in ``running_tasks`` to be cancelled and awaited, the
    event loop's async generators to be shut down, and the coroutine to
    return.
    """
    has_flown = False

    async for airborne in drone.telemetry.in_air():
        if airborne:
            # Remember that a flight actually started; still flying.
            has_flown = True
            continue

        if not has_flown:
            # Still on the ground before takeoff: keep waiting.
            continue

        # Was flying and now is not: stop the background tasks one by one.
        for pending in running_tasks:
            pending.cancel()
            try:
                await pending
            except asyncio.CancelledError:
                # Expected outcome of the cancel() above.
                pass
        await asyncio.get_event_loop().shutdown_asyncgens()
        return
async def run():
    """Entry coroutine: run the takeoff test only if ``--takeoff`` was given."""
    if not args.takeoff:
        # No action requested on the command line.
        return
    await test_takeoff()
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, drives run() to completion
    # and closes the loop afterwards. It replaces get_event_loop() +
    # run_until_complete(), which is deprecated when no loop is running and
    # fails on recent Python versions.
    asyncio.run(run())