-
Notifications
You must be signed in to change notification settings - Fork 179
/
hardware.py
593 lines (485 loc) · 21.1 KB
/
hardware.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
"""Hardware API wrapper module for initialization and management."""
import asyncio
import logging
from pathlib import Path
from fastapi import Depends, status
from typing import (
TYPE_CHECKING,
cast,
Annotated,
Awaitable,
Callable,
Iterator,
Iterable,
Optional,
Tuple,
)
from uuid import uuid4 # direct to avoid import cycles in service.dependencies
from traceback import format_exception_only, TracebackException
from contextlib import contextmanager, suppress
from opentrons_shared_data import deck
from opentrons_shared_data.robot.types import RobotType, RobotTypeEnum
from opentrons import initialize as initialize_api, should_use_ot3
from opentrons.config import (
IS_ROBOT,
ARCHITECTURE,
SystemArchitecture,
feature_flags as ff,
)
from opentrons.util.helpers import utc_now
from opentrons.hardware_control import ThreadManagedHardware, HardwareControlAPI, API
from opentrons.hardware_control.simulator_setup import load_simulator_thread_manager
from opentrons.hardware_control.types import StatusBarState
from opentrons.protocols.api_support.deck_type import (
guess_from_global_config as guess_deck_type_from_global_config,
)
from opentrons.protocol_engine import DeckType
from server_utils.fastapi_utils.app_state import (
AppState,
AppStateAccessor,
get_app_state,
)
from .errors.robot_errors import (
NotSupportedOnOT2,
NotSupportedOnFlex,
HardwareNotYetInitialized,
HardwareFailedToInitialize,
)
from .settings import get_settings, RobotServerSettings
from .subsystems.firmware_update_manager import (
FirmwareUpdateManager,
UpdateProcessHandle,
)
from .subsystems.models import SubSystem
from .service.task_runner import TaskRunner, get_task_runner
from .robot.control.estop_handler import EstopHandler
if TYPE_CHECKING:
from opentrons.hardware_control.ot3api import OT3API
PostInitCallback = Tuple[
Callable[[AppState, HardwareControlAPI], Awaitable[None]], bool
]
"""Function to be called when hardware init completes.
- First item is the callback
- Second item is True if this should be called BEFORE the postinit
function, or False if this should be called AFTER the postinit function."""
log = logging.getLogger(__name__)
_hw_api_accessor = AppStateAccessor[ThreadManagedHardware]("hardware_api")
_init_task_accessor = AppStateAccessor["asyncio.Task[None]"]("hardware_init_task")
_postinit_task_accessor = AppStateAccessor["asyncio.Task[None]"](
"hardware_postinit_task"
)
_firmware_update_manager_accessor = AppStateAccessor[FirmwareUpdateManager](
"firmware_update_manager"
)
_estop_handler_accessor = AppStateAccessor[EstopHandler]("estop_handler")
class _ExcPassthrough(BaseException):
def __init__(self, payload: TracebackException) -> None:
self.payload = payload
def start_initializing_hardware(
app_state: AppState, callbacks: Iterable[PostInitCallback]
) -> None:
"""Initialize the hardware API singleton, attaching it to global state.
Returns immediately while the hardware API initializes in the background.
"""
initialize_task = _init_task_accessor.get_from(app_state)
if initialize_task is None:
initialize_task = asyncio.create_task(
_initialize_hardware_api(app_state, callbacks)
)
_init_task_accessor.set_on(app_state, initialize_task)
async def clean_up_hardware(app_state: AppState) -> None:
"""Shutdown the HardwareAPI singleton and remove it from global state."""
initialize_task = _init_task_accessor.get_from(app_state)
thread_manager = _hw_api_accessor.get_from(app_state)
postinit_task = _postinit_task_accessor.get_from(app_state)
_init_task_accessor.set_on(app_state, None)
_postinit_task_accessor.set_on(app_state, None)
_hw_api_accessor.set_on(app_state, None)
if initialize_task is not None:
initialize_task.cancel()
# Ignore exceptions, since they've already been logged.
await asyncio.gather(initialize_task, return_exceptions=True)
if postinit_task is not None:
postinit_task.cancel()
await asyncio.gather(postinit_task, return_exceptions=True)
if thread_manager is not None:
thread_manager.clean_up()
# TODO(mm, 2024-01-30): Consider merging this with the Flex's LightController.
class FrontButtonLightBlinker:
"""Blinks the OT-2's front button light during certain stages of startup."""
def __init__(self) -> None:
self._hardware_and_task: Optional[
Tuple[HardwareControlAPI, "asyncio.Task[None]"]
] = None
self._hardware_init_complete = False
self._persistence_init_complete = False
async def start_blinking(self, hardware: HardwareControlAPI) -> None:
"""Start blinking the OT-2's front button light.
This should be called once during server startup, as soon as the hardware is
initialized enough to support the front button light.
Note that this is preceded by two other visually indistinguishable stages of
blinking:
1. A separate system process blinks the light while this process's Python
interpreter is initializing.
2. build_hardware_controller() blinks the light internally while it's doing hardware
initialization.
Blinking will continue until `mark_hardware_init_complete()` and
`mark_persistence_init_complete()` have both been called.
"""
assert self._hardware_and_task is None, "hardware should only be set once."
async def blink_forever() -> None:
while True:
# This should no-op on a Flex.
await hardware.set_lights(button=True)
await asyncio.sleep(0.5)
await hardware.set_lights(button=False)
await asyncio.sleep(0.5)
task = asyncio.create_task(blink_forever())
self._hardware_and_task = (hardware, task)
async def mark_hardware_init_complete(self) -> None:
"""See `start_blinking()`."""
self._hardware_init_complete = True
await self._maybe_stop_blinking()
async def mark_persistence_init_complete(self) -> None:
"""See `start_blinking()`."""
self._persistence_init_complete = True
await self._maybe_stop_blinking()
async def clean_up(self) -> None:
"""Free resources used by the `FrontButtonLightBlinker`."""
if self._hardware_and_task is not None:
_, task = self._hardware_and_task
task.cancel()
with suppress(asyncio.CancelledError):
await task
async def _maybe_stop_blinking(self) -> None:
if self._hardware_and_task is not None and self._all_complete():
# We're currently blinking, but we should stop.
hardware, task = self._hardware_and_task
task.cancel()
with suppress(asyncio.CancelledError):
await task
await hardware.set_lights(button=True)
def _all_complete(self) -> bool:
return self._persistence_init_complete and self._hardware_init_complete
# TODO(mm, 2022-10-18): Deduplicate this background initialization infrastructure
# with similar code used for initializing the persistence layer.
async def get_thread_manager(
app_state: Annotated[AppState, Depends(get_app_state)],
) -> ThreadManagedHardware:
"""Get the ThreadManager'd HardwareAPI as a route dependency.
Arguments:
app_state: Global app state from `app.state`, provided by
FastAPI's dependency injection system via `fastapi.Depends`
Returns:
The initialized ThreadManager containing a HardwareAPI
Raises:
ApiError: The Hardware API is still initializing or failed to initialize.
"""
initialize_task = _init_task_accessor.get_from(app_state)
postinit_task = _postinit_task_accessor.get_from(app_state)
hardware_api = _hw_api_accessor.get_from(app_state)
if initialize_task is None or hardware_api is None or not initialize_task.done():
raise HardwareNotYetInitialized().as_error(status.HTTP_503_SERVICE_UNAVAILABLE)
if initialize_task.cancelled():
raise HardwareFailedToInitialize(
detail="Hardware initialization cancelled."
).as_error(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
if initialize_task.exception():
exc = initialize_task.exception()
raise HardwareFailedToInitialize(detail=str(exc)).as_error(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
) from exc
if postinit_task and postinit_task.done() and postinit_task.exception():
with _format_exc_only("Hardware failed to initialize"):
postinit_task.result()
exc = cast(Exception, postinit_task.exception())
raise HardwareFailedToInitialize(detail=str(exc)).as_error(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
) from exc
return hardware_api
async def get_hardware(
thread_manager: Annotated[ThreadManagedHardware, Depends(get_thread_manager)],
) -> HardwareControlAPI:
"""Get the HardwareAPI as a route dependency.
Arguments:
thread_manager: The global thread manager singleton, provided by
FastAPI's dependency injection system via `fastapi.Depends`
Returns:
The same object, but this time properly typed as a hardware controller.
It is still a ThreadManager and provides the same guarantees that
everything will be run in another thread, but will be checked by mypy
as if it was the hardware controller.
Raises:
ApiError: The Hardware API is still initializing or failed to initialize.
"""
return thread_manager.wrapped()
def get_ot3_hardware(
thread_manager: ThreadManagedHardware,
) -> "OT3API":
"""Get a flex hardware controller."""
try:
from opentrons.hardware_control.ot3api import OT3API
except ImportError as exception:
raise NotSupportedOnOT2(detail=str(exception)).as_error(
status.HTTP_403_FORBIDDEN
) from exception
if not thread_manager.wraps_instance(OT3API):
raise NotSupportedOnOT2(
detail="This route is only available on a Flex."
).as_error(status.HTTP_403_FORBIDDEN)
return cast(OT3API, thread_manager.wrapped())
def get_ot2_hardware(
thread_manager: Annotated[ThreadManagedHardware, Depends(get_thread_manager)],
) -> "API":
"""Get an OT2 hardware controller."""
if not thread_manager.wraps_instance(API):
raise NotSupportedOnFlex(
detail="This route is only available on an OT-2."
).as_error(status.HTTP_403_FORBIDDEN)
return cast(API, thread_manager.wrapped())
async def get_firmware_update_manager(
app_state: Annotated[AppState, Depends(get_app_state)],
thread_manager: Annotated[ThreadManagedHardware, Depends(get_thread_manager)],
task_runner: Annotated[TaskRunner, Depends(get_task_runner)],
) -> FirmwareUpdateManager:
"""Get an update manager to track firmware update statuses."""
hardware = get_ot3_hardware(thread_manager)
update_manager = _firmware_update_manager_accessor.get_from(app_state)
if update_manager is None:
update_manager = FirmwareUpdateManager(
task_runner=task_runner, hw_handle=hardware
)
_firmware_update_manager_accessor.set_on(app_state, update_manager)
return update_manager
async def get_estop_handler(
app_state: Annotated[AppState, Depends(get_app_state)],
thread_manager: Annotated[ThreadManagedHardware, Depends(get_thread_manager)],
) -> EstopHandler:
"""Get an Estop Handler for working with the estop."""
hardware = get_ot3_hardware(thread_manager)
estop_handler = _estop_handler_accessor.get_from(app_state)
if estop_handler is None:
estop_handler = EstopHandler(hw_handle=hardware)
_estop_handler_accessor.set_on(app_state, estop_handler)
return estop_handler
async def get_robot_type() -> RobotType:
"""Return what kind of robot this server is running on."""
return "OT-3 Standard" if should_use_ot3() else "OT-2 Standard"
async def get_robot_type_enum() -> RobotTypeEnum:
"""Return what kind of robot this server is running on."""
return RobotTypeEnum.FLEX if should_use_ot3() else RobotTypeEnum.OT2
async def get_deck_type() -> DeckType:
"""Return what kind of deck the robot that this server is running on has."""
return DeckType(guess_deck_type_from_global_config())
async def get_deck_definition(
deck_type: Annotated[DeckType, Depends(get_deck_type)],
) -> deck.types.DeckDefinitionV5:
"""Return this robot's deck definition."""
return deck.load(deck_type, version=5)
async def _postinit_ot2_tasks(
hardware: ThreadManagedHardware,
app_state: AppState,
callbacks: Iterable[PostInitCallback],
) -> None:
"""Tasks to run on an initialized OT-2 before it is ready to use."""
try:
await _home_on_boot(hardware.wrapped())
finally:
for callback in callbacks:
if not callback[1]:
await callback[0](app_state, hardware.wrapped())
async def _home_on_boot(hardware: HardwareControlAPI) -> None:
if ff.disable_home_on_boot():
return
log.info("Homing Z axes")
try:
await hardware.home_z()
except Exception:
# If this is a flex, and the estop is asserted, we'll get an error
# here; make sure that it doesn't prevent things from actually
# starting.
log.error("Exception homing z on startup, ignoring to allow server to start")
async def _do_updates(
hardware: "OT3API", update_manager: FirmwareUpdateManager
) -> None:
update_handles = [
await update_manager.start_update_process(
str(uuid4()), SubSystem.from_hw(subsystem), utc_now()
)
for subsystem, subsystem_state in hardware.attached_subsystems.items()
if not subsystem_state.ok or subsystem_state.fw_update_needed
]
async def _until_update_finishes(handle: UpdateProcessHandle) -> None:
while True:
progress = await handle.get_progress()
if progress.error:
log.error(
f"Error updating {handle.process_details.subsystem}: {progress.error}"
)
return
elif progress.state == type(progress.state).done: # noqa: E721
log.info(f"Update complete for {handle.process_details.subsystem}")
return
else:
await asyncio.sleep(1)
await asyncio.gather(*(_until_update_finishes(handle) for handle in update_handles))
async def _postinit_ot3_tasks(
hardware_tm: ThreadManagedHardware,
app_state: AppState,
callbacks: Iterable[PostInitCallback],
) -> None:
"""Tasks to run on an initialized OT-3 before it is ready to use."""
update_manager = await get_firmware_update_manager(
app_state=app_state,
thread_manager=hardware_tm,
task_runner=get_task_runner(app_state),
)
hardware = cast("OT3API", hardware_tm)
try:
await _do_updates(hardware, update_manager)
await hardware.cache_instruments()
await _home_on_boot(hardware)
await hardware.set_status_bar_state(StatusBarState.ACTIVATION)
for callback in callbacks:
if not callback[1]:
await callback[0](app_state, hardware_tm.wrapped())
except Exception:
log.exception("Hardware initialization failure")
raise
async def _initialize_ot3_robot(
app_state: AppState, settings: RobotServerSettings, systemd_available: bool
) -> ThreadManagedHardware:
"""Initialize the OT-3 robot system."""
if settings.simulator_configuration_file_path:
return await _initialize_simulated_hardware(
settings.simulator_configuration_file_path
)
else:
return await initialize_api()
async def _initialize_ot2_robot(
app_state: AppState, settings: RobotServerSettings, systemd_available: bool
) -> ThreadManagedHardware:
"""Initialize the OT-2 robot system."""
if systemd_available:
# During boot, opentrons-gpio-setup.service will be blinking the
# front button light. Kill it here and wait for it to exit so it releases
# that GPIO line. Otherwise, our hardware initialization would get a
# "device already in use" error.
service_to_stop = "opentrons-gpio-setup"
command = ["systemctl", "stop", service_to_stop]
subprocess = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await subprocess.communicate()
if subprocess.returncode == 0:
log.info(f"Stopped {service_to_stop}.")
else:
raise RuntimeError(
f"Error stopping {service_to_stop}.",
{
"returncode": subprocess.returncode,
"stdout": stdout,
"stderr": stderr,
},
)
if settings.simulator_configuration_file_path:
return await _initialize_simulated_hardware(
settings.simulator_configuration_file_path
)
else:
return await initialize_api()
async def _initialize_simulated_hardware(
simulator_config: str,
) -> ThreadManagedHardware:
"""Initialize a simulated hardware API."""
simulator_config_path = Path(simulator_config)
log.info(f"Loading simulator from {simulator_config_path}")
return await load_simulator_thread_manager(path=simulator_config_path)
def _postinit_done_handler(task: "asyncio.Task[None]") -> None:
if task.exception():
with _format_exc("Postinit task failed"):
task.result()
else:
log.info("Postinit task complete")
def _systemd_notify(systemd_available: bool) -> None:
if systemd_available:
try:
import systemd.daemon # type: ignore
systemd.daemon.notify("READY=1")
except ImportError:
pass
async def _wrap_postinit(postinit: Awaitable[None]) -> None:
try:
return await postinit
except BaseException as be:
raise _ExcPassthrough(TracebackException.from_exception(be))
@contextmanager
def _format_exc_only(log_prefix: str) -> Iterator[None]:
try:
yield
except _ExcPassthrough as passthrough:
log.error(
f"{log_prefix}: {''.join(list(passthrough.payload.format_exception_only()))}"
)
except BaseException as be:
log.error(f"{log_prefix}: {format_exception_only(type(be), be)}")
@contextmanager
def _format_exc(log_prefix: str) -> Iterator[None]:
try:
yield
except _ExcPassthrough as passthrough:
log.error(
f"{log_prefix}: {''.join(list(passthrough.payload.format(chain=True)))}"
)
except BaseException as be:
log.error(f"{log_prefix}: {format_exception_only(type(be), be)}")
async def _initialize_hardware_api(
app_state: AppState, callbacks: Iterable[PostInitCallback]
) -> None:
"""Initialize the HardwareAPI and attach it to global state."""
app_settings = get_settings()
systemd_available = IS_ROBOT and ARCHITECTURE != SystemArchitecture.HOST
try:
if should_use_ot3():
hardware = await _initialize_ot3_robot(
app_state, app_settings, systemd_available
)
else:
hardware = await _initialize_ot2_robot(
app_state, app_settings, systemd_available
)
_hw_api_accessor.set_on(app_state, hardware)
for callback in callbacks:
if callback[1]:
await callback[0](app_state, hardware.wrapped())
# This ties systemd notification to hardware initialization. We might want to move
# systemd notification so it also waits for DB migration+initialization.
# If we do that, we need to be careful:
# - systemd timeouts might need to be increased to allow for DB migration time
# - There might be UI implications for loading states on the Flex's on-device display,
# because it polls for the server's systemd status.
_systemd_notify(systemd_available)
if should_use_ot3():
postinit_task = asyncio.create_task(
_wrap_postinit(_postinit_ot3_tasks(hardware, app_state, callbacks))
)
else:
postinit_task = asyncio.create_task(
_wrap_postinit(_postinit_ot2_tasks(hardware, app_state, callbacks))
)
postinit_task.add_done_callback(_postinit_done_handler)
_postinit_task_accessor.set_on(app_state, postinit_task)
log.info("Opentrons hardware API initialized")
except Exception:
# If something went wrong, log it here, in case the robot is powered off
# ungracefully before our cleanup code has a chance to run and receive
# the exception.
#
# todo(mm, 2021-10-22): Logging this exception should be the responsibility
# of calling code, but currently, nothing catches exceptions raised from
# this background initialization task. Once that's fixed, this log.error()
# should be removed,
log.exception("Exception during hardware background initialization.")
raise