Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api): Ensure stack of labware on Staging Area Slot properly resolves ancestor slot #16681

Merged
merged 4 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions api/src/opentrons/protocol_api/core/engine/labware.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
LabwareOffsetCreate,
LabwareOffsetVector,
)
from opentrons.types import DeckSlotName, Point
from opentrons.types import DeckSlotName, Point, StagingSlotName
from opentrons.hardware_control.nozzle_manager import NozzleMap


Expand Down Expand Up @@ -190,9 +190,13 @@ def get_well_core(self, well_name: str) -> WellCore:
def get_deck_slot(self) -> Optional[DeckSlotName]:
"""Get the deck slot the labware is in, if on deck."""
try:
return self._engine_client.state.geometry.get_ancestor_slot_name(
ancestor = self._engine_client.state.geometry.get_ancestor_slot_name(
self.labware_id
)
if isinstance(ancestor, StagingSlotName):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like the wrong thing to return with, since the deck slot it's on is not actually on the third column. But pulling back further, this function is only used by ProtocolContext.loaded_labwares which is a function that is now wildly out of date with how we load labware (it uses OT-2 style deck slot ints, it ignores anything off-deck). So I would return to this returning None for staging slots for now, then relook into this if we decide to introduce a new, similar function.

# The only use case for get_deck_slot is with a legacy OT-2 function which resolves to a numerical deck slot, so we can ignore staging area slots for now
return None
return ancestor
except (
LabwareNotOnDeckError,
ModuleNotOnDeckError,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
)

from opentrons_shared_data.errors.exceptions import MotionPlanningFailureError
from opentrons.protocol_engine.errors import LocationIsStagingSlotError
from opentrons_shared_data.module import FLEX_TC_LID_COLLISION_ZONE

from opentrons.hardware_control import CriticalPoint
Expand Down Expand Up @@ -63,7 +64,7 @@ def __init__(self, message: str) -> None:
)


def check_safe_for_pipette_movement(
def check_safe_for_pipette_movement( # noqa: C901
engine_state: StateView,
pipette_id: str,
labware_id: str,
Expand Down Expand Up @@ -121,8 +122,12 @@ def check_safe_for_pipette_movement(
f"Requested motion with the {primary_nozzle} nozzle partial configuration"
f" is outside of robot bounds for the pipette."
)

labware_slot = engine_state.geometry.get_ancestor_slot_name(labware_id)
ancestor = engine_state.geometry.get_ancestor_slot_name(labware_id)
if isinstance(ancestor, StagingSlotName):
raise LocationIsStagingSlotError(
"Cannot perform pipette actions on labware in Staging Area Slot."
)
labware_slot = ancestor

surrounding_slots = adjacent_slots_getters.get_surrounding_slots(
slot=labware_slot.as_int(), robot_type=engine_state.config.robot_type
Expand Down
13 changes: 9 additions & 4 deletions api/src/opentrons/protocol_engine/execution/movement.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import logging
from typing import Optional, List, Union

from opentrons.types import Point, MountType
from opentrons.types import Point, MountType, StagingSlotName
from opentrons.hardware_control import HardwareControlAPI
from opentrons_shared_data.errors.exceptions import PositionUnknownError
from opentrons.protocol_engine.errors import LocationIsStagingSlotError

from ..types import (
WellLocation,
Expand Down Expand Up @@ -93,9 +94,13 @@ async def move_to_well(
self._state_store.modules.get_heater_shaker_movement_restrictors()
)

dest_slot_int = self._state_store.geometry.get_ancestor_slot_name(
labware_id
).as_int()
ancestor = self._state_store.geometry.get_ancestor_slot_name(labware_id)
if isinstance(ancestor, StagingSlotName):
raise LocationIsStagingSlotError(
"Cannot move to well on labware in Staging Area Slot."
)

dest_slot_int = ancestor.as_int()

self._hs_movement_flagger.raise_if_movement_restricted(
hs_movement_restrictors=hs_movement_restrictors,
Expand Down
43 changes: 29 additions & 14 deletions api/src/opentrons/protocol_engine/state/geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,12 @@ def _get_lid_dock_slot_name(self, labware_id: str) -> str:
assert isinstance(labware_location, AddressableAreaLocation)
return labware_location.addressableAreaName

def get_ancestor_slot_name(self, labware_id: str) -> DeckSlotName:
def get_ancestor_slot_name(
self, labware_id: str
) -> Union[DeckSlotName, StagingSlotName]:
"""Get the slot name of the labware or the module that the labware is on."""
labware = self._labware.get(labware_id)
slot_name: DeckSlotName
slot_name: Union[DeckSlotName, StagingSlotName]

if isinstance(labware.location, DeckSlotLocation):
slot_name = labware.location.slotName
Expand All @@ -724,18 +726,14 @@ def get_ancestor_slot_name(self, labware_id: str) -> DeckSlotName:
slot_name = self.get_ancestor_slot_name(below_labware_id)
elif isinstance(labware.location, AddressableAreaLocation):
area_name = labware.location.addressableAreaName
# TODO we might want to eventually return some sort of staging slot name when we're ready to work through
# the linting nightmare it will create
if self._labware.is_absorbance_reader_lid(labware_id):
raise errors.LocationIsLidDockSlotError(
"Cannot get ancestor slot name for labware on lid dock slot."
)
if fixture_validation.is_staging_slot(area_name):
raise errors.LocationIsStagingSlotError(
"Cannot get ancestor slot name for labware on staging slot."
)
raise errors.LocationIs
slot_name = DeckSlotName.from_primitive(area_name)
elif fixture_validation.is_staging_slot(area_name):
slot_name = StagingSlotName.from_primitive(area_name)
else:
slot_name = DeckSlotName.from_primitive(area_name)
elif labware.location == OFF_DECK_LOCATION:
raise errors.LabwareNotOnDeckError(
f"Labware {labware_id} does not have a slot associated with it"
Expand Down Expand Up @@ -829,7 +827,9 @@ def get_labware_grip_point(
)

def get_extra_waypoints(
self, location: Optional[CurrentPipetteLocation], to_slot: DeckSlotName
self,
location: Optional[CurrentPipetteLocation],
to_slot: Union[DeckSlotName, StagingSlotName],
) -> List[Tuple[float, float]]:
"""Get extra waypoints for movement if thermocycler needs to be dodged."""
if location is not None:
Expand Down Expand Up @@ -888,8 +888,10 @@ def get_slot_item(
return maybe_labware or maybe_module or maybe_fixture or None

@staticmethod
def get_slot_column(slot_name: DeckSlotName) -> int:
def get_slot_column(slot_name: Union[DeckSlotName, StagingSlotName]) -> int:
"""Get the column number for the specified slot."""
if isinstance(slot_name, StagingSlotName):
return 4
row_col_name = slot_name.to_ot3_equivalent()
slot_name_match = WELL_NAME_PATTERN.match(row_col_name.value)
assert (
Expand Down Expand Up @@ -1170,7 +1172,13 @@ def get_total_nominal_gripper_offset_for_move_type(
)

assert isinstance(
ancestor, (DeckSlotLocation, ModuleLocation, OnLabwareLocation)
ancestor,
(
DeckSlotLocation,
ModuleLocation,
OnLabwareLocation,
AddressableAreaLocation,
),
), "No gripper offsets for off-deck labware"
return (
direct_parent_offset.pickUpOffset
Expand Down Expand Up @@ -1217,7 +1225,13 @@ def get_total_nominal_gripper_offset_for_move_type(
)

assert isinstance(
ancestor, (DeckSlotLocation, ModuleLocation, OnLabwareLocation)
ancestor,
(
DeckSlotLocation,
ModuleLocation,
OnLabwareLocation,
AddressableAreaLocation,
),
), "No gripper offsets for off-deck labware"
return (
direct_parent_offset.dropOffset
Expand Down Expand Up @@ -1293,6 +1307,7 @@ def _labware_gripper_offsets(
DeckSlotLocation,
ModuleLocation,
AddressableAreaLocation,
OnLabwareLocation,
),
), "No gripper offsets for off-deck labware"

Expand Down
6 changes: 3 additions & 3 deletions api/src/opentrons/protocol_engine/state/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from opentrons.protocol_engine.state.module_substates.absorbance_reader_substate import (
AbsorbanceReaderMeasureMode,
)
from opentrons.types import DeckSlotName, MountType
from opentrons.types import DeckSlotName, MountType, StagingSlotName
from ..errors import ModuleNotConnectedError

from ..types import (
Expand Down Expand Up @@ -1124,8 +1124,8 @@ def calculate_magnet_height(

def should_dodge_thermocycler(
self,
from_slot: DeckSlotName,
to_slot: DeckSlotName,
from_slot: Union[DeckSlotName, StagingSlotName],
to_slot: Union[DeckSlotName, StagingSlotName],
) -> bool:
"""Decide if the requested path would cross the thermocycler, if installed.

Expand Down
22 changes: 17 additions & 5 deletions api/src/opentrons/protocol_engine/state/motion.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dataclasses import dataclass
from typing import List, Optional, Union

from opentrons.types import MountType, Point
from opentrons.types import MountType, Point, StagingSlotName
from opentrons.hardware_control.types import CriticalPoint
from opentrons.motion_planning.adjacent_slots_getters import (
get_east_west_slots,
Expand Down Expand Up @@ -277,9 +277,13 @@ def check_pipette_blocking_hs_latch(
current_location = self._pipettes.get_current_location()
if current_location is not None:
if isinstance(current_location, CurrentWell):
pipette_deck_slot = self._geometry.get_ancestor_slot_name(
ancestor = self._geometry.get_ancestor_slot_name(
current_location.labware_id
).as_int()
)
if isinstance(ancestor, StagingSlotName):
# Staging Area Slots cannot intersect with the h/s
return False
pipette_deck_slot = ancestor.as_int()
else:
pipette_deck_slot = (
self._addressable_areas.get_addressable_area_base_slot(
Expand All @@ -299,9 +303,13 @@ def check_pipette_blocking_hs_shaker(
current_location = self._pipettes.get_current_location()
if current_location is not None:
if isinstance(current_location, CurrentWell):
pipette_deck_slot = self._geometry.get_ancestor_slot_name(
ancestor = self._geometry.get_ancestor_slot_name(
current_location.labware_id
).as_int()
)
if isinstance(ancestor, StagingSlotName):
# Staging Area Slots cannot intersect with the h/s
return False
pipette_deck_slot = ancestor.as_int()
else:
pipette_deck_slot = (
self._addressable_areas.get_addressable_area_base_slot(
Expand All @@ -324,6 +332,10 @@ def get_touch_tip_waypoints(
"""Get a list of touch points for a touch tip operation."""
mount = self._pipettes.get_mount(pipette_id)
labware_slot = self._geometry.get_ancestor_slot_name(labware_id)
if isinstance(labware_slot, StagingSlotName):
raise errors.LocationIsStagingSlotError(
"Cannot perform Touch Tip on labware in Staging Area Slot."
)
next_to_module = self._modules.is_edge_move_unsafe(mount, labware_slot)
edge_path_type = self._labware.get_edge_path_type(
labware_id, well_name, mount, labware_slot, next_to_module
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from opentrons_shared_data.pipette import pipette_definition
from opentrons.calibration_storage.helpers import uri_from_details
from opentrons.protocols.models import LabwareDefinition
from opentrons.types import Point, DeckSlotName, MountType
from opentrons.types import Point, DeckSlotName, MountType, StagingSlotName
from opentrons_shared_data.pipette.types import PipetteNameType
from opentrons_shared_data.labware.labware_definition import (
Dimensions as LabwareDimensions,
Expand Down Expand Up @@ -2189,6 +2189,33 @@ def test_get_ancestor_slot_name(
assert subject.get_ancestor_slot_name("labware-2") == DeckSlotName.SLOT_1


def test_get_ancestor_slot_for_labware_stack_in_staging_area_slot(
decoy: Decoy,
mock_labware_view: LabwareView,
subject: GeometryView,
) -> None:
"""It should get name of ancestor slot of a stack of labware in a staging area slot."""
decoy.when(mock_labware_view.get("labware-1")).then_return(
LoadedLabware(
id="labware-1",
loadName="load-name",
definitionUri="1234",
location=AddressableAreaLocation(
addressableAreaName=StagingSlotName.SLOT_D4.id
),
)
)
decoy.when(mock_labware_view.get("labware-2")).then_return(
LoadedLabware(
id="labware-2",
loadName="load-name",
definitionUri="1234",
location=OnLabwareLocation(labwareId="labware-1"),
)
)
assert subject.get_ancestor_slot_name("labware-2") == StagingSlotName.SLOT_D4


def test_ensure_location_not_occupied_raises(
decoy: Decoy,
mock_labware_view: LabwareView,
Expand Down
Loading