diff --git a/abr-testing/.flake8 b/abr-testing/.flake8 index cc618b04ba2..ec1d7b91184 100644 --- a/abr-testing/.flake8 +++ b/abr-testing/.flake8 @@ -21,4 +21,5 @@ docstring-convention = google noqa-require-code = true -# per-file-ignores = +per-file-ignores = + abr_testing/protocols/*: C901 diff --git a/abr-testing/abr_testing/protocols/api 2.20/10_ZymoBIOMICS_Magbead_DNA_Cells_Flex_220api.py b/abr-testing/abr_testing/protocols/api 2.20/10_ZymoBIOMICS_Magbead_DNA_Cells_Flex.py similarity index 65% rename from abr-testing/abr_testing/protocols/api 2.20/10_ZymoBIOMICS_Magbead_DNA_Cells_Flex_220api.py rename to abr-testing/abr_testing/protocols/api 2.20/10_ZymoBIOMICS_Magbead_DNA_Cells_Flex.py index 3ef8d3bbae0..75c3af5b665 100644 --- a/abr-testing/abr_testing/protocols/api 2.20/10_ZymoBIOMICS_Magbead_DNA_Cells_Flex_220api.py +++ b/abr-testing/abr_testing/protocols/api 2.20/10_ZymoBIOMICS_Magbead_DNA_Cells_Flex.py @@ -3,12 +3,14 @@ from opentrons import types from typing import List, Union from opentrons import protocol_api -from opentrons.protocol_api import Well +from opentrons.protocol_api import Well, InstrumentContext import numpy as np from opentrons.protocol_api.module_contexts import ( HeaterShakerContext, TemperatureModuleContext, + MagneticBlockContext, ) +from abr_testing.protocols import helpers metadata = { "author": "Zach Galluzzo ", @@ -48,60 +50,21 @@ def add_parameters(parameters: protocol_api.ParameterContext) -> None: """Define parameters.""" - parameters.add_int( - variable_name="heater_shaker_speed", - display_name="Heater Shaker Shake Speed", - description="Speed to set the heater shaker to", - default=2000, - minimum=200, - maximum=3000, - unit="rpm", - ) - parameters.add_str( - variable_name="mount_pos", - display_name="Mount Position", - description="What mount to use", - choices=[ - {"display_name": "Left Mount", "value": "left"}, - {"display_name": "Right Mount", "value": "right"}, - ], - default="left", - ) - parameters.add_float( - variable_name="dot_bottom", - display_name=".bottom", - description="Lowest value pipette will go to.", - default=0.5, - choices=[ - {"display_name": "0.0", "value": 0.0}, - {"display_name": "0.1", "value": 0.1}, - {"display_name": "0.2", "value": 0.2}, - {"display_name": "0.3", "value": 0.3}, - {"display_name": "0.4", "value": 0.4}, - {"display_name": "0.5", "value": 0.5}, - {"display_name": "0.6", "value": 0.6}, - {"display_name": "0.7", "value": 0.7}, - {"display_name": "0.8", "value": 0.8}, - {"display_name": "0.9", "value": 0.9}, - {"display_name": "1.0", "value": 1.0}, - ], - ) + helpers.create_hs_speed_parameter(parameters) + helpers.create_pipette_mount_parameter(parameters) + helpers.create_dot_bottom_parameter(parameters) def run(ctx: protocol_api.ProtocolContext) -> None: - """Protoco Set up.""" + """Protocol Set Up.""" heater_shaker_speed = ctx.params.heater_shaker_speed # type: ignore[attr-defined] - mount_pos = ctx.params.mount_pos # type: ignore[attr-defined] + mount = ctx.params.pipette_mount # type: ignore[attr-defined] dot_bottom = ctx.params.dot_bottom # type: ignore[attr-defined] - trash_chute = False # If this is true, trash chute is loaded in D3, otherwise trash bin is loaded there - USE_GRIPPER = True dry_run = False TIP_TRASH = ( False # True = Used tips go in Trash, False = Used tips go back into rack ) - mount = mount_pos res_type = "nest_12_reservoir_15ml" - temp_mod = True num_samples = 8 wash1_vol = 500 @@ -118,37 +81,39 @@ def run(ctx: protocol_api.ProtocolContext) -> None: if not dry_run: settling_time = 2.0 lysis_incubation = 30.0 + bind_time_1 = 10.0 + bind_time_2 = 1.0 + wash_time = 5.0 + drybeads = 9.0 + lysis_rep_1 = 3 + lysis_rep_2 = 5 + bead_reps_2 = 8 else: settling_time = 0.25 lysis_incubation = 0.25 + bind_time_1 = bind_time_2 = wash_time = 0.25 + drybeads = 0.5 + lysis_rep_1 = lysis_rep_2 = bead_reps_2 = 1 PK_vol = 20.0 bead_vol = 25.0 starting_vol = lysis_vol + sample_vol binding_buffer_vol = bind_vol + bead_vol - if trash_chute: - ctx.load_waste_chute() - else: - ctx.load_trash_bin("A3") - hs_string = "heaterShakerModuleV1" - h_s: HeaterShakerContext = ctx.load_module(hs_string, "D1") # type: ignore[assignment] + ctx.load_trash_bin("A3") + h_s: HeaterShakerContext = ctx.load_module(helpers.hs_str, "D1") # type: ignore[assignment] h_s_adapter = h_s.load_adapter("opentrons_96_deep_well_adapter") sample_plate = h_s_adapter.load_labware(deepwell_type, "Samples") h_s.close_labware_latch() - if temp_mod: - temp: TemperatureModuleContext = ctx.load_module( - "temperature module gen2", "D3" - ) # type: ignore[assignment] - temp_block = temp.load_adapter("opentrons_96_well_aluminum_block") - elutionplate = temp_block.load_labware( - "opentrons_96_wellplate_200ul_pcr_full_skirt", "Elution Plate" - ) - else: - elutionplate = ctx.load_labware( - "opentrons_96_wellplate_200ul_pcr_full_skirt", "A3", "Elution Plate" - ) - - magblock = ctx.load_module("magneticBlockV1", "C1") + temp: TemperatureModuleContext = ctx.load_module( + helpers.temp_str, "D3" + ) # type: ignore[assignment] + temp_block = temp.load_adapter("opentrons_96_well_aluminum_block") + elutionplate = temp_block.load_labware( + "opentrons_96_wellplate_200ul_pcr_full_skirt", "Elution Plate" + ) + magblock: MagneticBlockContext = ctx.load_module( + helpers.mag_str, "C1" + ) # type: ignore[assignment] waste = ( ctx.load_labware("nest_1_reservoir_195ml", "B3", "Liquid Waste") .wells()[0] @@ -183,25 +148,7 @@ def run(ctx: protocol_api.ProtocolContext) -> None: # Redefine per well for liquid definitions samps = sample_plate.wells()[: (8 * num_cols)] - colors = [ - "#008000", - "#008000", - "#A52A2A", - "#A52A2A", - "#00FFFF", - "#0000FF", - "#800080", - "#ADD8E6", - "#FF0000", - "#FFFF00", - "#FF00FF", - "#00008B", - "#7FFFD4", - "#FFC0CB", - "#FFA500", - "#00FF00", - "#C0C0C0", - ] + colors = helpers.liquid_colors locations: List[Union[List[Well], Well]] = [ lysis_, @@ -250,69 +197,58 @@ def run(ctx: protocol_api.ProtocolContext) -> None: for i_del in range(delete): colors.pop(-1) - # Defining liquids per reservoir well - def liquids_(liq_type: str, location: Union[Well, List[Well]], color, vol: float): - sampnum = 8 * (math.ceil(num_samples / 8)) - """ - Takes an individual liquid at a time and adds the color to the well - in the description. - """ - # Volume Calculation - if liq_type == "PK": - extra_samples = math.ceil(1500 / lysis_vol) - - elif liq_type == "Beads": - extra_samples = math.ceil(1500 / bind_vol) + def add_liquid( + liq_type: str, wells: Union[Well, List[Well]], color: str, vol: float + ) -> None: + """Assigns colored liquid to wells based on type and location.""" + total_samples = math.ceil(num_samples / 8) * 8 + + # Calculate extra sample volume based on liquid type + extra_samples = math.ceil( + 1500 + / ( + lysis_vol + if liq_type == "PK" + else bind_vol + if liq_type == "Beads" + else vol + ) + ) - else: - extra_samples = math.ceil(1500 / vol) - - # Defining and assigning liquids to wells - if isinstance(location, list): - limit = sample_max / len(location) # Calculates samples/ res well - iterations = math.ceil(sampnum / limit) - left = sampnum - limit - while left > limit: - left = left - limit - if left > 0: - last_iteration_samp_num = left - elif left < 0: - last_iteration_samp_num = sampnum - else: - last_iteration_samp_num = limit + # Define liquid + liquid = ctx.define_liquid( + name=liq_type, description=liq_type, display_color=color + ) - samples_per_well = [] + # Assign liquid to each well + if isinstance(wells, list): + samples_per_well = [sample_max // len(wells)] * ( + total_samples // (sample_max // len(wells)) + ) + remainder = total_samples % (sample_max // len(wells)) - for i in range(iterations): - # append the left over on the last iteration - if i == (iterations - 1): - samples_per_well.append(last_iteration_samp_num) - else: - samples_per_well.append(limit) + if remainder: + samples_per_well.append(remainder) - liq = ctx.define_liquid( - name=str(liq_type), description=str(liq_type), display_color=color - ) - for sample, well in zip( - samples_per_well, location[: len(samples_per_well)] - ): - v = vol * (sample + extra_samples) - well.load_liquid(liquid=liq, volume=v) + for sample_count, well in zip(samples_per_well, wells): + well.load_liquid( + liquid=liquid, volume=vol * (sample_count + extra_samples) + ) else: - v = vol * (sampnum + extra_samples) - liq = ctx.define_liquid( - name=str(liq_type), description=str(liq_type), display_color=color + wells.load_liquid( + liquid=liquid, volume=vol * (total_samples + extra_samples) ) - location.load_liquid(liquid=liq, volume=v) - for x, (ll, l, c, v) in enumerate(zip(liquids, locations, colors, vols)): - liquids_(ll, l, c, v) + # Apply function for each liquid configuration + for liq, well, color, vol in zip(liquids, locations, colors, vols): + add_liquid(liq, well, color, vol) m1000.flow_rate.aspirate = 300 m1000.flow_rate.dispense = 300 m1000.flow_rate.blow_out = 300 - def tiptrack(pip, tipbox): + def tiptrack(tipbox: List[Well]) -> None: + """Track Tips.""" global tip1k global drop_count if tipbox == tips: @@ -324,28 +260,13 @@ def tiptrack(pip, tipbox): drop_count = 0 ctx.pause("Please empty the waste bin of all the tips before continuing.") - def blink(): - for i in range(3): - ctx.set_rail_lights(True) - ctx.delay(minutes=0.01666667) - ctx.set_rail_lights(False) - ctx.delay(minutes=0.01666667) - - def remove_supernatant(vol): + def remove_supernatant(vol: float) -> None: + """Remove supernatant.""" ctx.comment("-----Removing Supernatant-----") m1000.flow_rate.aspirate = 30 num_trans = math.ceil(vol / 980) vol_per_trans = vol / num_trans - def _waste_track(vol): - global waste_vol - waste_vol = waste_vol + (vol * 8) - if waste_vol >= 185000: - m1000.home() - blink() - ctx.pause("Please empty liquid waste before resuming.") - waste_vol = 0 - for i, m in enumerate(samples_m): m1000.pick_up_tip(tips_sn[8 * i]) loc = m.bottom(dot_bottom) @@ -357,16 +278,17 @@ def _waste_track(vol): m1000.transfer(vol_per_trans, loc, waste, new_tip="never", air_gap=20) m1000.blow_out(waste) m1000.air_gap(20) - m1000.drop_tip(tips_sn[8 * i]) if TIP_TRASH == True else m1000.return_tip() + m1000.drop_tip(tips_sn[8 * i]) if TIP_TRASH else m1000.return_tip() m1000.flow_rate.aspirate = 300 # Transfer from Magdeck plate to H-S - h_s.open_labware_latch() - ctx.move_labware(sample_plate, h_s_adapter, use_gripper=USE_GRIPPER) - h_s.close_labware_latch() + helpers.move_labware_to_hs(ctx, sample_plate, h_s, h_s_adapter) + + def bead_mixing( + well: Well, pip: InstrumentContext, mvol: float, reps: int = 8 + ) -> None: + """Mixing. - def bead_mixing(well, pip, mvol, reps=8): - """ 'mixing' will mix liquid that contains beads. This will be done by aspirating from the bottom of the well and dispensing from the top as to mix the beads with the other liquids as much as possible. Aspiration and @@ -408,8 +330,9 @@ def bead_mixing(well, pip, mvol, reps=8): pip.flow_rate.aspirate = 300 pip.flow_rate.dispense = 300 - def mixing(well, pip, mvol, reps=8): - """ + def mixing(well: Well, pip: InstrumentContext, mvol: float, reps: int = 8) -> None: + """Mixing. + 'mixing' will mix liquid that contains beads. This will be done by aspirating from the bottom of the well and dispensing from the top as to mix the beads with the other liquids as much as possible. Aspiration and @@ -449,17 +372,18 @@ def mixing(well, pip, mvol, reps=8): pip.flow_rate.aspirate = 300 pip.flow_rate.dispense = 300 - def lysis(vol, source): + def lysis(vol: float, source: Well) -> None: + """Lysis.""" ctx.comment("-----Beginning Lysis Steps-----") ctx.pause(msg="\n Hello \n - step 1 \n - step 2") num_transfers = math.ceil(vol / 980) - tiptrack(m1000, tips) + tiptrack(tips) for i in range(num_cols): src = source tvol = vol / num_transfers # Mix Shield and PK before transferring first time if i == 0: - for x in range(3 if not dry_run else 1): + for x in range(lysis_rep_1): m1000.aspirate(vol, src.bottom(1)) m1000.dispense(vol, src.bottom(8)) # Transfer Shield and PK @@ -472,20 +396,15 @@ def lysis(vol, source): # Mix shield and pk with samples for i in range(num_cols): if i != 0: - tiptrack(m1000, tips) - mixing(samples_m[i], m1000, tvol, reps=5 if not dry_run else 1) - m1000.drop_tip() if TIP_TRASH == True else m1000.return_tip() + tiptrack(tips) + mixing(samples_m[i], m1000, tvol, reps=lysis_rep_2) + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() - h_s.set_and_wait_for_shake_speed(heater_shaker_speed) - speed_val = heater_shaker_speed - ctx.delay( - minutes=lysis_incubation if not dry_run else 0.25, - msg="Shake at " + str(speed_val) + " rpm for 30 minutes.", - ) - h_s.deactivate_shaker() + helpers.set_hs_speed(ctx, h_s, heater_shaker_speed, lysis_incubation, True) + + def bind(vol1: float, vol2: float) -> None: + """Binding. - def bind(vol1, vol2): - """ `bind` will perform magnetic bead binding on each sample in the deepwell plate. Each channel of binding beads will be mixed before transfer, and the samples will be mixed with the binding beads after @@ -501,7 +420,7 @@ def bind(vol1, vol2): """ ctx.comment("-----Beginning Binding Steps-----") for i, well in enumerate(samples_m): - tiptrack(m1000, tips) + tiptrack(tips) num_trans = math.ceil(vol1 / 980) vol_per_trans = vol1 / num_trans source = binding_buffer[i // 2] @@ -520,23 +439,15 @@ def bind(vol1, vol2): vol_per_trans, source, well.top(), air_gap=20, new_tip="never" ) m1000.air_gap(20) - bead_mixing(well, m1000, vol_per_trans, reps=8 if not dry_run else 1) + bead_mixing(well, m1000, vol_per_trans, reps=bead_reps_2) m1000.blow_out() m1000.air_gap(10) - m1000.drop_tip() if TIP_TRASH == True else m1000.return_tip() + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() - h_s.set_and_wait_for_shake_speed(heater_shaker_speed * 0.9) - speed_val = heater_shaker_speed * 0.9 - ctx.delay( - minutes=10 if not dry_run else 0.25, - msg="Shake at " + str(speed_val) + " rpm for 10 minutes.", - ) - h_s.deactivate_shaker() + helpers.set_hs_speed(ctx, h_s, heater_shaker_speed * 0.9, bind_time_1, True) # Transfer from H-S plate to Magdeck plate - h_s.open_labware_latch() - ctx.move_labware(sample_plate, magblock, use_gripper=USE_GRIPPER) - h_s.close_labware_latch() + helpers.move_labware_from_hs_to_mag_block(ctx, sample_plate, h_s, magblock) for bindi in np.arange( settling_time + 1, 0, -0.5 @@ -550,7 +461,7 @@ def bind(vol1, vol2): remove_supernatant(vol1 + starting_vol) ctx.comment("-----Beginning Bind #2 Steps-----") - tiptrack(m1000, tips) + tiptrack(tips) for i, well in enumerate(samples_m): num_trans = math.ceil(vol2 / 980) vol_per_trans = vol2 / num_trans @@ -575,24 +486,15 @@ def bind(vol1, vol2): for i in range(num_cols): if i != 0: - tiptrack(m1000, tips) + tiptrack(tips) bead_mixing( samples_m[i], m1000, vol_per_trans, reps=3 if not dry_run else 1 ) - m1000.drop_tip() if TIP_TRASH == True else m1000.return_tip() - - h_s.set_and_wait_for_shake_speed(heater_shaker_speed) - speed_val = heater_shaker_speed - ctx.delay( - minutes=1 if not dry_run else 0.25, - msg="Shake at " + str(speed_val) + " rpm for 1 minutes.", - ) - h_s.deactivate_shaker() + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() + helpers.set_hs_speed(ctx, h_s, heater_shaker_speed, bind_time_2, True) # Transfer from H-S plate to Magdeck plate - h_s.open_labware_latch() - ctx.move_labware(sample_plate, magblock, use_gripper=USE_GRIPPER) - h_s.close_labware_latch() + helpers.move_labware_from_hs_to_mag_block(ctx, sample_plate, h_s, magblock) for bindi in np.arange( settling_time + 1, 0, -0.5 @@ -605,8 +507,8 @@ def bind(vol1, vol2): # remove initial supernatant remove_supernatant(vol2 + 25) - def wash(vol, source): - + def wash(vol: float, source: List[Well]) -> None: + """Wash Steps.""" global whichwash # Defines which wash the protocol is on to log on the app if source == wash1: @@ -626,7 +528,7 @@ def wash(vol, source): num_trans = math.ceil(vol / 980) vol_per_trans = vol / num_trans - tiptrack(m1000, tips) + tiptrack(tips) for i, m in enumerate(samples_m): if source == wash1: if i == 0 or i == 3: @@ -645,15 +547,10 @@ def wash(vol, source): air_gap=20, new_tip="never", ) - m1000.drop_tip() if TIP_TRASH == True else m1000.return_tip() - - h_s.set_and_wait_for_shake_speed(heater_shaker_speed * 0.9) - ctx.delay(minutes=5 if not dry_run else 0.25) - h_s.deactivate_shaker() + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() + helpers.set_hs_speed(ctx, h_s, heater_shaker_speed * 0.9, wash_time, True) - h_s.open_labware_latch() - ctx.move_labware(sample_plate, magblock, use_gripper=USE_GRIPPER) - h_s.close_labware_latch() + helpers.move_labware_from_hs_to_mag_block(ctx, sample_plate, h_s, magblock) for washi in np.arange( settling_time, 0, -0.5 @@ -669,27 +566,19 @@ def wash(vol, source): remove_supernatant(vol) - def elute(vol): - tiptrack(m1000, tips) + def elute(vol: float) -> None: + tiptrack(tips) for i, m in enumerate(samples_m): m1000.require_liquid_presence(elution_solution) m1000.aspirate(vol, elution_solution) m1000.air_gap(20) m1000.dispense(m1000.current_volume, m.top(-3)) - m1000.drop_tip() if TIP_TRASH == True else m1000.return_tip() + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() - h_s.set_and_wait_for_shake_speed(heater_shaker_speed) - speed_val = heater_shaker_speed - ctx.delay( - minutes=5 if not dry_run else 0.25, - msg="Shake at " + str(speed_val) + " rpm for 5 minutes.", - ) - h_s.deactivate_shaker() + helpers.set_hs_speed(ctx, h_s, heater_shaker_speed * 0.9, wash_time, True) # Transfer back to magnet - h_s.open_labware_latch() - ctx.move_labware(sample_plate, magblock, use_gripper=USE_GRIPPER) - h_s.close_labware_latch() + helpers.move_labware_from_hs_to_mag_block(ctx, sample_plate, h_s, magblock) for elutei in np.arange(settling_time, 0, -0.5): ctx.delay( @@ -698,7 +587,7 @@ def elute(vol): ) for i, (m, e) in enumerate(zip(samples_m, elution_samples_m)): - tiptrack(m1000, tips) + tiptrack(tips) m1000.flow_rate.dispense = 100 m1000.flow_rate.aspirate = 25 m1000.transfer( @@ -706,7 +595,7 @@ def elute(vol): ) m1000.blow_out(e.top(-2)) m1000.air_gap(20) - m1000.drop_tip() if TIP_TRASH == True else m1000.return_tip() + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() m1000.flow_rate.aspirate = 150 @@ -720,10 +609,6 @@ def elute(vol): wash(wash2_vol, wash2) wash(wash3_vol, wash3) h_s.set_and_wait_for_temperature(55) - if not dry_run: - drybeads = 9.0 # Number of minutes you want to dry for - else: - drybeads = 0.5 for beaddry in np.arange(drybeads, 0, -0.5): ctx.delay( minutes=0.5, diff --git a/abr-testing/abr_testing/protocols/api 2.20/11_Dynabeads_IP_Flex_96well_RIT.py b/abr-testing/abr_testing/protocols/api 2.20/11_Dynabeads_IP_Flex_96well_RIT.py new file mode 100644 index 00000000000..5762c0e3f1f --- /dev/null +++ b/abr-testing/abr_testing/protocols/api 2.20/11_Dynabeads_IP_Flex_96well_RIT.py @@ -0,0 +1,252 @@ +"""Immunoprecipitation by Dynabeads.""" +from opentrons.protocol_api import ProtocolContext, ParameterContext, Well +from opentrons.protocol_api.module_contexts import ( + HeaterShakerContext, + TemperatureModuleContext, + MagneticBlockContext, +) +from abr_testing.protocols import helpers +from typing import List, Union + +metadata = { + "protocolName": "Immunoprecipitation by Dynabeads - (Reagents in 15 mL tubes)", + "author": "Boren Lin, Opentrons", + "description": "Isolates protein from liquid samples using protein A /G coupled magnetic beads", +} + +requirements = { + "robotType": "OT-3", + "apiLevel": "2.20", +} + + +def add_parameters(parameters: ParameterContext) -> None: + """Define parameters.""" + helpers.create_hs_speed_parameter(parameters) + helpers.create_pipette_mount_parameter(parameters) + helpers.create_dot_bottom_parameter(parameters) + + +NUM_COL = 12 + +MAG_DELAY_MIN = 1 + +BEADS_VOL = 50 +AB_VOL = 50 +SAMPLE_VOL = 200 +WASH_TIMES = 3 +WASH_VOL = 200 +ELUTION_VOL = 50 + +WASTE_VOL_MAX = 275000 + +READY_FOR_SDSPAGE = 0 + +waste_vol_chk = 0.0 +waste_vol = 0.0 + +TIP_TRASH = False + + +def run(ctx: ProtocolContext) -> None: + """Protocol.""" + # defining variables inside def run + heater_shaker_speed = ctx.params.heater_shaker_speed # type: ignore[attr-defined] + ASP_HEIGHT = ctx.params.dot_bottom # type: ignore[attr-defined] + MIX_SPEED = heater_shaker_speed + MIX_SEC = 10 + + # if on deck: + INCUBATION_SPEEND = heater_shaker_speed * 0.5 + INCUBATION_MIN = 60 + # load labware + + sample_plate = ctx.load_labware("nest_96_wellplate_2ml_deep", "B2", "samples") + wash_res = ctx.load_labware("nest_12_reservoir_15ml", "B1", "wash") + reagent_res = ctx.load_labware( + "opentrons_15_tuberack_nest_15ml_conical", "C3", "reagents" + ) + waste_res = ctx.load_labware("nest_1_reservoir_290ml", "D2", "waste") + + tips = ctx.load_labware("opentrons_flex_96_tiprack_1000ul", "B3") + tips_sample = ctx.load_labware( + "opentrons_flex_96_tiprack_1000ul", "A2", "sample tips" + ) + tips_sample_loc = tips_sample.wells()[:95] + if READY_FOR_SDSPAGE == 0: + tips_elu = ctx.load_labware( + "opentrons_flex_96_tiprack_1000ul", "A1", "elution tips" + ) + tips_elu_loc = tips_elu.wells()[:95] + tips_reused = ctx.load_labware( + "opentrons_flex_96_tiprack_1000ul", "C2", "reused tips" + ) + tips_reused_loc = tips_reused.wells()[:95] + p1000 = ctx.load_instrument("flex_8channel_1000", "right", tip_racks=[tips]) + p1000_single = ctx.load_instrument("flex_1channel_1000", "left", tip_racks=[tips]) + h_s: HeaterShakerContext = ctx.load_module(helpers.hs_str, "D1") # type: ignore[assignment] + h_s_adapter = h_s.load_adapter("opentrons_96_deep_well_adapter") + working_plate = h_s_adapter.load_labware( + "nest_96_wellplate_2ml_deep", "working plate" + ) + + if READY_FOR_SDSPAGE == 0: + temp: TemperatureModuleContext = ctx.load_module( + helpers.temp_str, "D3" + ) # type: ignore[assignment] + final_plate = temp.load_labware( + "opentrons_96_deep_well_adapter_nest_wellplate_2ml_deep", "final plate" + ) + mag: MagneticBlockContext = ctx.load_module(helpers.mag_str, "C1") # type: ignore[assignment] + + # liquids + samples = sample_plate.rows()[0][:NUM_COL] # 1 + beads = reagent_res.wells()[0] # 2 + ab = reagent_res.wells()[1] # 3 + elu = reagent_res.wells()[2] # 4 + wash = wash_res.rows()[0][:NUM_COL] # 5 + waste = waste_res.wells()[0] + working_cols = working_plate.rows()[0][:NUM_COL] # 6 + working_wells = working_plate.wells()[: NUM_COL * 8] # 6 + if READY_FOR_SDSPAGE == 0: + final_cols = final_plate.rows()[0][:NUM_COL] + + def transfer_plate_to_plate( + vol1: float, start: List[Well], end: List[Well], liquid: int + ) -> None: + """Transfer from plate to plate.""" + for i in range(NUM_COL): + if liquid == 1: + p1000.pick_up_tip(tips_sample_loc[i * 8]) + else: + p1000.pick_up_tip(tips_elu_loc[i * 8]) + start_loc = start[i] + end_loc = end[i] + p1000.aspirate(vol1, start_loc.bottom(z=ASP_HEIGHT), rate=2) + p1000.air_gap(10) + p1000.dispense(vol1 + 10, end_loc.bottom(z=15), rate=2) + p1000.blow_out() + p1000.touch_tip() + p1000.return_tip() if not TIP_TRASH else p1000.drop_tip() + + def transfer_well_to_plate( + vol2: float, + start: Union[List[Well], Well], + end: List[Well], + liquid: int, + drop_height: int = -20, + ) -> None: + """Transfer from well to plate.""" + if liquid == 5 and type(start) == List: + p1000.pick_up_tip() + for j in range(NUM_COL): + start_loc = start[j] + p1000.require_liquid_presence(start_loc) + end_loc = end[j] + p1000.aspirate(vol2, start_loc.bottom(z=ASP_HEIGHT), rate=2) + p1000.air_gap(10) + p1000.dispense(vol2 + 10, end_loc.top(z=drop_height), rate=2) + p1000.blow_out() + p1000.return_tip() if not TIP_TRASH else p1000.drop_tip() + + elif type(start) == Well: + p1000_single.pick_up_tip() + vol = vol2 * 8 + p1000_single.mix(5, vol * 0.75, start.bottom(z=ASP_HEIGHT * 5), rate=2) + p1000_single.mix(5, vol * 0.75, start.bottom(z=ASP_HEIGHT * 20), rate=2) + for j in range(NUM_COL): + end_loc_gap = end[j * 8] + if liquid == 2: + p1000_single.mix( + 2, vol * 0.75, start.bottom(z=ASP_HEIGHT * 5), rate=2 + ) + p1000_single.require_liquid_presence(start) + p1000_single.aspirate(vol, start.bottom(z=ASP_HEIGHT * 5), rate=2) + p1000_single.air_gap(10) + p1000_single.dispense(10, end_loc_gap.top(z=-5)) + for jj in range(8): + end_loc = end[j * 8 + jj] + p1000_single.dispense(vol2, end_loc.bottom(z=10), rate=0.75) + p1000_single.touch_tip() + p1000_single.blow_out() + p1000_single.return_tip() if not TIP_TRASH else p1000.drop_tip() + + def discard(vol3: float, start: List[Well]) -> None: + """Discard function.""" + global waste_vol + global waste_vol_chk + if waste_vol_chk >= WASTE_VOL_MAX: + ctx.pause("Empty Liquid Waste") + waste_vol_chk = 0 + waste_vol = 0.0 + for k in range(NUM_COL): + p1000.pick_up_tip(tips_reused_loc[k * 8]) + start_loc = start[k] + end_loc = waste + p1000.aspirate(vol3, start_loc.bottom(z=ASP_HEIGHT), rate=0.3) + p1000.air_gap(10) + p1000.dispense(vol3 + 10, end_loc.top(z=-5), rate=2) + p1000.blow_out() + p1000.return_tip() + waste_vol = vol3 * NUM_COL * 8.0 + waste_vol_chk = waste_vol_chk + waste_vol + + # protocol + + # Add beads, samples and antibody solution + h_s.close_labware_latch() + transfer_well_to_plate(BEADS_VOL, beads, working_wells, 2) + + helpers.move_labware_from_hs_to_mag_block(ctx, working_plate, h_s, mag) + + ctx.delay(minutes=MAG_DELAY_MIN) + discard(BEADS_VOL * 1.1, working_cols) + + helpers.move_labware_to_hs(ctx, working_plate, h_s, h_s_adapter) + + transfer_plate_to_plate(SAMPLE_VOL, samples, working_cols, 1) + transfer_well_to_plate(AB_VOL, ab, working_wells, 3) + + h_s.set_and_wait_for_shake_speed(rpm=MIX_SPEED) + ctx.delay(seconds=MIX_SEC) + + h_s.set_and_wait_for_shake_speed(rpm=INCUBATION_SPEEND) + ctx.delay(seconds=INCUBATION_MIN * 60) + h_s.deactivate_shaker() + + helpers.move_labware_from_hs_to_mag_block(ctx, working_plate, h_s, mag) + + ctx.delay(minutes=MAG_DELAY_MIN) + vol_total = SAMPLE_VOL + AB_VOL + discard(vol_total * 1.1, working_cols) + + # Wash + for _ in range(WASH_TIMES): + helpers.move_labware_to_hs(ctx, working_plate, h_s, h_s_adapter) + + transfer_well_to_plate(WASH_VOL, wash, working_cols, 5) + helpers.set_hs_speed(ctx, h_s, MIX_SPEED, MIX_SEC / 60, True) + helpers.move_labware_from_hs_to_mag_block(ctx, working_plate, h_s, mag) + ctx.delay(minutes=MAG_DELAY_MIN) + discard(WASH_VOL * 1.1, working_cols) + + # Elution + helpers.move_labware_to_hs(ctx, working_plate, h_s, h_s_adapter) + + transfer_well_to_plate(ELUTION_VOL, elu, working_wells, 4) + if READY_FOR_SDSPAGE == 1: + ctx.pause("Seal the Working Plate") + h_s.set_and_wait_for_temperature(70) + helpers.set_hs_speed(ctx, h_s, MIX_SPEED, (MIX_SEC / 60) + 10, True) + h_s.deactivate_heater() + h_s.open_labware_latch() + ctx.pause("Protocol Complete") + + elif READY_FOR_SDSPAGE == 0: + helpers.set_hs_speed(ctx, h_s, MIX_SPEED, (MIX_SEC / 60) + 2, True) + + temp.set_temperature(4) + helpers.move_labware_from_hs_to_mag_block(ctx, working_plate, h_s, mag) + ctx.delay(minutes=MAG_DELAY_MIN) + transfer_plate_to_plate(ELUTION_VOL * 1.1, working_cols, final_cols, 6) + temp.deactivate() diff --git a/abr-testing/abr_testing/protocols/api 2.20/12_KAPA HyperPlus Library Prep.py b/abr-testing/abr_testing/protocols/api 2.20/12_KAPA HyperPlus Library Prep.py new file mode 100644 index 00000000000..d60cfe405fd --- /dev/null +++ b/abr-testing/abr_testing/protocols/api 2.20/12_KAPA HyperPlus Library Prep.py @@ -0,0 +1,1254 @@ +"""KAPA HyperPlus Library Preparation.""" +from opentrons.protocol_api import ( + ProtocolContext, + ParameterContext, + Labware, + Well, + InstrumentContext, +) +from opentrons import types +import math +from abr_testing.protocols import helpers +from opentrons.protocol_api.module_contexts import ( + TemperatureModuleContext, + MagneticBlockContext, + ThermocyclerContext, +) +from opentrons.hardware_control.modules.types import ThermocyclerStep +from typing import List, Tuple, Optional + +metadata = { + "protocolName": "KAPA HyperPlus Library Preparation", + "author": "Your Name ", +} + +requirements = {"robotType": "Flex", "apiLevel": "2.20"} + +tt_50 = 0 +tt_200 = 0 +p50_rack_count = 0 +p200_rack_count = 0 +tip50 = 50 +tip200 = 200 +p50_racks_ondeck = [] +p200_racks_ondeck = [] +p50_racks_offdeck = [] +p200_racks_offdeck = [] + + +def add_parameters(parameters: ParameterContext) -> None: + """Parameters.""" + parameters.add_bool( + variable_name="dry_run", + display_name="Dry Run", + description="Skip incubation delays and shorten mix steps.", + default=False, + ) + parameters.add_bool( + variable_name="trash_tips", + display_name="Trash tip", + description="tip trashes after every use", + default=False, + ) + helpers.create_disposable_lid_parameter(parameters) + parameters.add_int( + variable_name="num_samples", + display_name="number of samples", + description="How many samples to be perform for library prep", + default=8, + minimum=8, + maximum=48, + ) + parameters.add_int( + variable_name="PCR_CYCLES", + display_name="number of PCR Cycles", + description="How many pcr cycles to be perform for library prep", + default=2, + minimum=2, + maximum=16, + ) + + parameters.add_int( + variable_name="Fragmentation_time", + display_name="time on thermocycler", + description="Fragmentation time in thermocycler", + default=10, + minimum=10, + maximum=30, + ) + + +def run(ctx: ProtocolContext) -> None: + """Protocol.""" + USE_GRIPPER = True + trash_tips = ctx.params.trash_tips # type: ignore[attr-defined] + dry_run = ctx.params.dry_run # type: ignore[attr-defined] + REUSE_ETOH_TIPS = False + REUSE_RSB_TIPS = ( + False # Reuse tips for RSB buffer (adding RSB, mixing, and transferring) + ) + REUSE_REMOVE_TIPS = False # Reuse tips for supernatant removal + num_samples = ctx.params.num_samples # type: ignore[attr-defined] + PCRCYCLES = ctx.params.PCR_CYCLES # type: ignore[attr-defined] + disposable_lid = ctx.params.disposable_lid # type: ignore[attr-defined] + Fragmentation_time = 10 + ligation_tc_time = 15 + used_lids: List[Labware] = [] + if dry_run: + trash_tips = False + + num_cols = math.ceil(num_samples / 8) + + # Pre-set parameters + sample_vol = 35 + frag_vol = 15 + end_repair_vol = 10 + adapter_vol = 5 + ligation_vol = 45 + amplification_vol = 30 + bead_vol_1 = 88 + bead_vol_2 = 50 + bead_vol = bead_vol_1 + bead_vol_2 + bead_inc = 2 + rsb_vol_1 = 25 + rsb_vol_2 = 20 + rsb_vol = rsb_vol_1 + rsb_vol_2 + elution_vol = 20 + elution_vol_2 = 17 + etoh_vol = 400 + + # Importing Labware, Modules and Instruments + magblock: MagneticBlockContext = ctx.load_module( + helpers.mag_str, "D2" + ) # type: ignore[assignment] + temp_mod: TemperatureModuleContext = ctx.load_module( + helpers.temp_str, "B3" + ) # type: ignore[assignment] + temp_adapter = temp_mod.load_adapter("opentrons_96_well_aluminum_block") + temp_plate = temp_adapter.load_labware( + "opentrons_96_wellplate_200ul_pcr_full_skirt", "Temp Module Reservoir Plate" + ) + + if not dry_run: + temp_mod.set_temperature(4) + tc_mod: ThermocyclerContext = ctx.load_module(helpers.tc_str) # type: ignore[assignment] + # Just in case + tc_mod.open_lid() + + FLP_plate = magblock.load_labware( + "opentrons_96_wellplate_200ul_pcr_full_skirt", "FLP Plate" + ) + samples_flp = FLP_plate.rows()[0][:num_cols] + + sample_plate = ctx.load_labware( + "opentrons_96_wellplate_200ul_pcr_full_skirt", "D1", "Sample Pate" + ) + + sample_plate_2 = ctx.load_labware( + "opentrons_96_wellplate_200ul_pcr_full_skirt", "B2", "Sample Pate" + ) + samples_2 = sample_plate_2.rows()[0][:num_cols] + samples = sample_plate.rows()[0][:num_cols] + reservoir = ctx.load_labware("nest_96_wellplate_2ml_deep", "C2") + + trash = ctx.load_waste_chute() + # Load TC Lids + unused_lids = helpers.load_disposable_lids(ctx, 5, "C3") + # Import Global Variables + + global tip50 + global tip200 + global p50_rack_count + global p200_rack_count + global tt_50 + global tt_200 + + p200 = ctx.load_instrument("flex_8channel_1000", "left") + p50 = ctx.load_instrument("flex_8channel_50", "right") + + Available_on_deck_slots = ["A2", "A3", "B3"] + Available_off_deck_slots = ["A4", "B4"] + p50_racks_to_dump = [] + p200_racks_to_dump = [] + + if REUSE_RSB_TIPS: + Available_on_deck_slots.remove("A3") + tip50_reuse = ctx.load_labware("opentrons_flex_96_tiprack_50ul", "A3") + RSB_tip = [] + p50_rack_count += 1 + tt_50 += 12 + p50.tip_racks.append(tip50_reuse) + ctx.comment(f"Adding 50 ul tip rack #{p50_rack_count}") + for x in range(num_cols): + RSB_tip.append(tip50_reuse.wells()[8 * x]) + tt_50 -= 1 + p50.starting_tip = tip50_reuse.wells()[(len(RSB_tip)) * 8] + + if REUSE_REMOVE_TIPS: + Available_on_deck_slots.remove("A2") + tip200_reuse = ctx.load_labware("opentrons_flex_96_tiprack_200ul", "A2") + RemoveSup_tip = [] + p200_rack_count += 1 + tt_200 += 12 + p200.tip_racks.append(tip200_reuse) + ctx.comment(f"Adding 200 ul tip rack #{p200_rack_count}") + for x in range(num_cols): + RemoveSup_tip.append(tip200_reuse.wells()[8 * x]) + tt_200 -= 1 + p200.starting_tip = tip200_reuse.wells()[(len(RemoveSup_tip)) * 8] + + # Load Reagent Locations in Reservoirs + + # Sample Plate + sample_liq = ctx.define_liquid( + name="Samples", + description="DNA sample of known quantity", + display_color="#C0C0C0", + ) + for well in sample_plate.wells()[: 8 * num_cols]: + well.load_liquid(liquid=sample_liq, volume=sample_vol) + + Final_liq = ctx.define_liquid( + name="Final Library", description="Final Library", display_color="#FFA500" + ) + for well in sample_plate_2.wells()[: 8 * num_cols]: + well.load_liquid(liquid=Final_liq, volume=elution_vol_2) + + # Cold Res + + adapters = temp_plate.rows()[0][:num_cols] # used for filling liquids + adapter_liq = ctx.define_liquid( + name="Adapters", + description="Adapters to ligate onto DNA insert.", + display_color="#A52A2A", + ) + for well in temp_plate.wells()[: 8 * num_cols]: + well.load_liquid(liquid=adapter_liq, volume=adapter_vol * 2) + + end_repair_cols: List[Well] = temp_plate.columns()[ + num_cols + ] # used for filling liquids + er_res = end_repair_cols[0] + er_liq = ctx.define_liquid( + name="End Repair", description="End Repair mix", display_color="#FF00FF" + ) + for well in end_repair_cols: + well.load_liquid( + liquid=er_liq, volume=(end_repair_vol * num_cols) + (0.1 * end_repair_vol) + ) + + frag: List[Well] = temp_plate.columns()[num_cols + 1] + frag_res = frag[0] + frag_liq = ctx.define_liquid( + name="Fragmentation", description="Fragmentation mix", display_color="#00FFFF" + ) + for well in frag: + well.load_liquid( + liquid=frag_liq, volume=(frag_vol * num_cols) + (0.1 * frag_vol) + ) + + ligation: List[Well] = temp_plate.columns()[num_cols + 2] + ligation_res = ligation[0] + ligation_liq = ctx.define_liquid( + name="Ligation", description="Ligation Mix", display_color="#008000" + ) + for well in ligation: + well.load_liquid( + liquid=ligation_liq, volume=(ligation_vol * num_cols) + (0.1 * ligation_vol) + ) + + lib_amplification_wells: List[Well] = temp_plate.columns()[num_cols + 3] + amplification_res = lib_amplification_wells[0] + amp_liq = ctx.define_liquid( + name="Amplification", description="Amplification Mix", display_color="#0000FF" + ) + for well in lib_amplification_wells: + well.load_liquid( + liquid=amp_liq, + volume=(amplification_vol * num_cols) + (0.1 * amplification_vol), + ) + + # Room Temp Res (deepwell) + bead = reservoir.columns()[0] + bead_res = bead[0] + bead_liq = ctx.define_liquid( + name="Ampure Beads", description="Ampure Beads", display_color="#800080" + ) + for well in bead: + well.load_liquid( + liquid=bead_liq, volume=(bead_vol * num_cols) + (0.1 * bead_vol * num_cols) + ) + + rsb = reservoir.columns()[3] + rsb_res = rsb[0] + rsb_liq = ctx.define_liquid( + name="RSB", description="Resuspension buffer", display_color="#FFFF00" + ) + for well in rsb: + well.load_liquid( + liquid=rsb_liq, volume=(rsb_vol * num_cols) + (0.1 * rsb_vol * num_cols) + ) + + etoh1 = reservoir.columns()[4] + etoh1_res = etoh1[0] + etoh_liq = ctx.define_liquid( + name="Ethanol 80%", description="Fresh 80% Ethanol", display_color="#FF00FF" + ) + for well in etoh1: + well.load_liquid( + liquid=etoh_liq, volume=(etoh_vol * num_cols) + (0.1 * etoh_vol * num_cols) + ) + + etoh2 = reservoir.columns()[5] + etoh2_res = etoh2[0] + for well in etoh2: + well.load_liquid( + liquid=etoh_liq, volume=(etoh_vol * num_cols) + (0.1 * etoh_vol * num_cols) + ) + + waste1 = reservoir.columns()[6] + waste1_res = waste1[0] + + waste2 = reservoir.columns()[7] + waste2_res = waste2[0] + + def tiptrack(rack: int, reuse_col: Optional[int], reuse: bool = False) -> None: + """Tip Track.""" + global tt_50 + global tt_200 + global p50_racks_ondeck + global p200_racks_ondeck + global p50_racks_offdeck + global p200_racks_offdeck + global p50_rack_count + global p200_rack_count + + if rack == tip50: + if ( + tt_50 == 0 and not reuse + ): # If this is the first column of tip box and these aren't reused tips + ctx.comment("Troubleshoot") + if len(Available_on_deck_slots) > 0: + avail_slot = Available_on_deck_slots[0] + p50_rack_count += 1 + tt_50 += 12 + addtiprack = ctx.load_labware( + "opentrons_flex_96_tiprack_50ul", + avail_slot, + f"50 ul Tip Rack #{p50_rack_count}", + ) + ctx.comment( + f"Add 50 ul tip rack #{p50_rack_count} to slot {avail_slot}." + ) + Available_on_deck_slots.pop(0) + p50_racks_ondeck.append(addtiprack) + p50_racks_to_dump.append(addtiprack) + p50.tip_racks.append(addtiprack) + elif ( + len(Available_on_deck_slots) == 0 + and len(Available_off_deck_slots) > 0 + ): + p50_rack_count += 1 + tt_50 += 12 + addtiprack = ctx.load_labware( + "opentrons_flex_96_tiprack_50ul", + Available_off_deck_slots[0], + f"50 ul Tip Rack #{p50_rack_count}", + ) + Available_off_deck_slots.pop( + 0 + ) # Load rack into staging area slot to be moved on deck + ctx.comment(f"Adding 50 ul tip rack #{p50_rack_count}") + p50_racks_offdeck.append( + addtiprack + ) # used in TipSwap then deleted once it is moved + p50.tip_racks.append( + addtiprack + ) # lets pipette know it can use this rack now + TipSwap( + 50 + ) # Throw first tip box out and replace with a box from staging area + elif ( + len(Available_on_deck_slots) == 0 + and len(Available_off_deck_slots) == 0 + ): # If there are no tip racks on deck or in staging area to use + ctx.pause("Please place a new 50ul Tip Rack in slot A4") + p50_rack_count += 1 + tt_50 += 12 + addtiprack = ctx.load_labware( + "opentrons_flex_96_tiprack_50ul", + "A4", + f"50 ul Tip Rack #{p50_rack_count}", + ) + ctx.comment(f"Adding 50 ul tip rack #{p50_rack_count}") + p50_racks_offdeck.append( + addtiprack + ) # used in TipSwap, then deleted once it is moved + p50.tip_racks.append( + addtiprack + ) # lets pipette know it can use this rack now + TipSwap( + 50 + ) # Throw first tip box out and replace with a box from staging area + # Call where tips will actually be picked up + if reuse and REUSE_RSB_TIPS and reuse_col: + p50.pick_up_tip(tip50_reuse.wells()[8 * reuse_col]) + else: + tt_50 -= 1 + ctx.comment("Column " + str(12 - tt_50)) + ctx.comment( + "Available On Deck Slots:" + str(len(Available_on_deck_slots)) + ) + ctx.comment( + "Available Off Deck Slots:" + str(len(Available_off_deck_slots)) + ) + p50.pick_up_tip() + + if rack == tip200: + if ( + tt_200 == 0 and not reuse + ): # If this is the first column of tip box and these aren't reused tips + if len(Available_on_deck_slots) > 0: + avail_slot = Available_on_deck_slots[0] + p200_rack_count += 1 + tt_200 += 12 + addtiprack = ctx.load_labware( + "opentrons_flex_96_tiprack_200ul", + avail_slot, + f"200 ul Tip Rack #{p200_rack_count}", + ) + ctx.comment( + f"Adding 200 ul tip rack #{p200_rack_count} to slot {avail_slot}" + ) + Available_on_deck_slots.pop(0) + p200_racks_ondeck.append(addtiprack) + p200_racks_to_dump.append(addtiprack) + p200.tip_racks.append(addtiprack) + elif ( + len(Available_on_deck_slots) == 0 + and len(Available_off_deck_slots) > 0 + ): + p200_rack_count += 1 + tt_200 += 12 + addtiprack = ctx.load_labware( + "opentrons_flex_96_tiprack_200ul", + Available_off_deck_slots[0], + f"200 ul Tip Rack #{p200_rack_count}", + ) + Available_off_deck_slots.pop( + 0 + ) # Load rack into staging area slot to be moved on deck + ctx.comment(f"Adding 200 ul tip rack #{p200_rack_count}") + p200_racks_offdeck.append( + addtiprack + ) # used in TipSwap then deleted once it is moved + p200.tip_racks.append( + addtiprack + ) # lets pipette know it can use this rack now + TipSwap( + 200 + ) # Throw first tip box out and replace with a box from staging area + elif ( + len(Available_on_deck_slots) == 0 + and len(Available_off_deck_slots) == 0 + ): # If there are no tip racks on deck or in staging area to use + ctx.pause("Please place a new 200ul Tip Rack in slot B4") + p200_rack_count += 1 + tt_200 += 12 + addtiprack = ctx.load_labware( + "opentrons_flex_96_tiprack_200ul", + "B4", + f"200 ul Tip Rack #{p200_rack_count}", + ) + ctx.comment(f"Adding 200 ul tip rack #{p200_rack_count}") + p200_racks_offdeck.append( + addtiprack + ) # used in TipSwap, then deleted once it is moved + p200.tip_racks.append( + addtiprack + ) # lets pipette know it can use this rack now + TipSwap( + 200 + ) # Throw first tip box out and replace with a box from staging area + # Call where tips will actually be picked up + if reuse and REUSE_REMOVE_TIPS and reuse_col: + p200.pick_up_tip(tip200_reuse.wells()[8 * reuse_col]) + else: + tt_200 -= 1 + ctx.comment("Column " + str(12 - tt_200)) + ctx.comment( + "Available On Deck Slots:" + str(len(Available_on_deck_slots)) + ) + ctx.comment( + "Available Off Deck Slots:" + str(len(Available_off_deck_slots)) + ) + p200.pick_up_tip() + + def TipSwap(tipvol: int) -> None: + """Tip swap.""" + if tipvol == 50: + rack_to_dispose = p50_racks_to_dump[0] + rack_to_add = p50_racks_offdeck[0] + deck_slot = p50_racks_to_dump[0].parent + p50_racks_ondeck.append(rack_to_add) + p50_racks_to_dump.pop(0) + p50_racks_to_dump.append(rack_to_add) + p50_racks_ondeck.pop(0) + p50_racks_offdeck.pop(0) + + if tipvol == 200: + rack_to_dispose = p200_racks_to_dump[0] + rack_to_add = p200_racks_offdeck[0] + deck_slot = p200_racks_to_dump[0].parent + p200_racks_ondeck.append(rack_to_add) + p200_racks_to_dump.pop(0) + p200_racks_to_dump.append(rack_to_add) + p200_racks_ondeck.pop(0) + p200_racks_offdeck.pop(0) + + ctx.move_labware( + labware=rack_to_dispose, new_location=trash, use_gripper=USE_GRIPPER + ) + ctx.move_labware( + labware=rack_to_add, new_location=deck_slot, use_gripper=USE_GRIPPER + ) + ctx.comment( + f"Threw out: {rack_to_dispose} and placed {rack_to_add} to {deck_slot}" + ) + + def run_tag_profile( + unused_lids: List[Labware], used_lids: List[Labware] + ) -> Tuple[List[Labware], List[Labware]]: + """Run Tag Profile.""" + # Presetting Thermocycler Temps + ctx.comment( + "****Starting Fragmentation Profile (37C for 10 minutes with 100C lid)****" + ) + tc_mod.set_lid_temperature(100) + tc_mod.set_block_temperature(37) + + # Move Plate to TC + ctx.comment("****Moving Plate to Pre-Warmed TC Module Block****") + ctx.move_labware(sample_plate, tc_mod, use_gripper=USE_GRIPPER) + + if disposable_lid: + lid_on_plate, unused_lids, used_lids = helpers.use_disposable_lid_with_tc( + ctx, unused_lids, used_lids, sample_plate, tc_mod + ) + else: + tc_mod.close_lid() + tc_mod.set_block_temperature( + temperature=37, hold_time_minutes=Fragmentation_time, block_max_volume=50 + ) + tc_mod.open_lid() + + if disposable_lid: + if len(used_lids) <= 1: + ctx.move_labware(lid_on_plate, "C4", use_gripper=True) + else: + ctx.move_labware(lid_on_plate, used_lids[-2], use_gripper=True) + # #Move Plate to H-S + ctx.comment("****Moving Plate off of TC****") + + ctx.move_labware(sample_plate, "D1", use_gripper=USE_GRIPPER) + return unused_lids, used_lids + + def run_er_profile( + unused_lids: List[Labware], used_lids: List[Labware] + ) -> Tuple[List[Labware], List[Labware]]: + """End Repair Profile.""" + # Presetting Thermocycler Temps + ctx.comment( + "****Starting End Repair Profile (65C for 30 minutes with 100C lid)****" + ) + tc_mod.set_lid_temperature(100) + tc_mod.set_block_temperature(65) + + # Move Plate to TC + ctx.comment("****Moving Plate to Pre-Warmed TC Module Block****") + ctx.move_labware(sample_plate, tc_mod, use_gripper=USE_GRIPPER) + + if disposable_lid: + lid_on_plate, unused_lids, used_lids = helpers.use_disposable_lid_with_tc( + ctx, unused_lids, used_lids, sample_plate, tc_mod + ) + else: + tc_mod.close_lid() + tc_mod.set_block_temperature( + temperature=65, hold_time_minutes=30, block_max_volume=50 + ) + + tc_mod.deactivate_block() + tc_mod.open_lid() + + if disposable_lid: + # move lid + if len(used_lids) <= 1: + ctx.move_labware(lid_on_plate, "C4", use_gripper=True) + else: + ctx.move_labware(lid_on_plate, used_lids[-2], use_gripper=True) + # #Move Plate to H-S + ctx.comment("****Moving Plate off of TC****") + + ctx.move_labware(sample_plate, "D1", use_gripper=USE_GRIPPER) + return unused_lids, used_lids + + def run_ligation_profile( + unused_lids: List[Labware], used_lids: List[Labware] + ) -> Tuple[List[Labware], List[Labware]]: + """Run Ligation Profile.""" + # Presetting Thermocycler Temps + ctx.comment( + "****Starting Ligation Profile (20C for 15 minutes with 100C lid)****" + ) + tc_mod.set_lid_temperature(100) + tc_mod.set_block_temperature(20) + + # Move Plate to TC + ctx.comment("****Moving Plate to Pre-Warmed TC Module Block****") + + ctx.move_labware(sample_plate, tc_mod, use_gripper=USE_GRIPPER) + + if disposable_lid: + lid_on_plate, unused_lids, used_lids = helpers.use_disposable_lid_with_tc( + ctx, unused_lids, used_lids, sample_plate, tc_mod + ) + else: + tc_mod.close_lid() + tc_mod.set_block_temperature( + temperature=20, hold_time_minutes=ligation_tc_time, block_max_volume=50 + ) + + tc_mod.deactivate_block() + + tc_mod.open_lid() + # Move lid + tc_mod.open_lid() + if disposable_lid: + if len(used_lids) <= 1: + ctx.move_labware(lid_on_plate, "C4", use_gripper=True) + else: + ctx.move_labware(lid_on_plate, used_lids[-2], use_gripper=True) + + # #Move Plate to H-S + ctx.comment("****Moving Plate off of TC****") + + ctx.move_labware(sample_plate, "D1", use_gripper=USE_GRIPPER) + return unused_lids, used_lids + + def run_amplification_profile( + unused_lids: List[Labware], used_lids: List[Labware] + ) -> Tuple[List[Labware], List[Labware]]: + """Run Amplification Profile.""" + # Presetting Thermocycler Temps + ctx.comment( + "Amplification Profile (37C for 5 min, 50C for 5 min with 100C lid)" + ) + tc_mod.set_lid_temperature(100) + tc_mod.set_block_temperature(98) + + # Move Plate to TC + ctx.comment("****Moving Sample Plate onto TC****") + ctx.move_labware(sample_plate_2, tc_mod, use_gripper=USE_GRIPPER) + + if not dry_run: + tc_mod.set_lid_temperature(105) + if disposable_lid: + lid_on_plate, unused_lids, used_lids = helpers.use_disposable_lid_with_tc( + ctx, unused_lids, used_lids, sample_plate_2, tc_mod + ) + else: + tc_mod.close_lid() + if not dry_run: + profile_PCR_1: List[ThermocyclerStep] = [ + {"temperature": 98, "hold_time_seconds": 45} + ] + tc_mod.execute_profile( + steps=profile_PCR_1, repetitions=1, block_max_volume=50 + ) + profile_PCR_2: List[ThermocyclerStep] = [ + {"temperature": 98, "hold_time_seconds": 15}, + {"temperature": 60, "hold_time_seconds": 30}, + {"temperature": 72, "hold_time_seconds": 30}, + ] + tc_mod.execute_profile( + steps=profile_PCR_2, repetitions=PCRCYCLES, block_max_volume=50 + ) + profile_PCR_3: List[ThermocyclerStep] = [ + {"temperature": 72, "hold_time_minutes": 1} + ] + tc_mod.execute_profile( + steps=profile_PCR_3, repetitions=1, block_max_volume=50 + ) + tc_mod.set_block_temperature(4) + tc_mod.open_lid() + if disposable_lid: + if len(used_lids) <= 1: + ctx.move_labware(lid_on_plate, "C4", use_gripper=True) + else: + ctx.move_labware(lid_on_plate, used_lids[-2], use_gripper=True) + + # Move Sample Plate to H-S + ctx.comment("****Moving Sample Plate back to H-S****") + ctx.move_labware(sample_plate_2, "D1", use_gripper=USE_GRIPPER) + # get FLP plate out of the way + ctx.comment("****Moving FLP Plate back to TC****") + ctx.move_labware(FLP_plate, tc_mod, use_gripper=USE_GRIPPER) + return unused_lids, used_lids + + def mix_beads( + pip: InstrumentContext, res: Well, vol: int, reps: int, col: int + ) -> None: + """Mix beads function.""" + # Multiplier tells + mix_vol = (num_cols - col) * vol + if pip == p50: + if mix_vol > 50: + mix_vol = 50 + if pip == p200: + if mix_vol > 200: + mix_vol = 200 + + if res == bead_res: + width = res.width + else: + width = res.diameter + if width: + move = (width / 2) - 1 + + loc_center_a = res.bottom().move(types.Point(x=0, y=0, z=0.5)) + loc_center_d = res.bottom().move(types.Point(x=0, y=0, z=0.5)) + loc1 = res.bottom().move(types.Point(x=move, y=0, z=5)) + loc2 = res.bottom().move(types.Point(x=0, y=move, z=5)) + loc3 = res.bottom().move(types.Point(x=-move, y=0, z=5)) + loc4 = res.bottom().move(types.Point(x=0, y=-move, z=5)) + loc5 = res.bottom().move(types.Point(x=move / 2, y=move / 2, z=5)) + loc6 = res.bottom().move(types.Point(x=-move / 2, y=move / 2, z=5)) + loc7 = res.bottom().move(types.Point(x=-move / 2, y=-move / 2, z=5)) + loc8 = res.bottom().move(types.Point(x=move / 2, y=-move / 2, z=5)) + + loc = [loc_center_d, loc1, loc5, loc2, loc6, loc3, loc7, loc4, loc8] + + pip.aspirate( + mix_vol, res.bottom().move(types.Point(x=0, y=0, z=10)) + ) # Blow bubbles to start + pip.dispense(mix_vol, loc_center_d) + for x in range(reps): + pip.aspirate(mix_vol, loc_center_a) + pip.dispense(mix_vol, loc[x]) + pip.flow_rate.aspirate = 10 + pip.flow_rate.dispense = 10 + pip.aspirate(mix_vol, loc_center_a) + pip.dispense(mix_vol, loc_center_d) + pip.flow_rate.aspirate = 150 + pip.flow_rate.dispense = 150 + + def remove_supernatant(well: Well, vol: float, waste_: Well, column: int) -> None: + """Remove supernatant.""" + ctx.comment("-------Removing " + str(vol) + "ul of Supernatant-------") + p200.flow_rate.aspirate = 15 + num_trans = math.ceil(vol / 190) + vol_per_trans = vol / num_trans + for x in range(num_trans): + tiptrack(tip200, column, reuse=True if REUSE_REMOVE_TIPS else False) + p200.aspirate(vol_per_trans / 2, well.bottom(0.2)) + ctx.delay(seconds=1) + p200.aspirate(vol_per_trans / 2, well.bottom(0.2)) + p200.air_gap(10) + p200.dispense(p200.current_volume, waste_) + p200.air_gap(10) + if REUSE_REMOVE_TIPS: + p200.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + else: + if trash_tips: + p200.drop_tip() + ctx.comment("****Dropping Tip in Waste shoot****") + else: + p200.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + p200.flow_rate.aspirate = 150 + + def Fragmentation( + unused_lids: List[Labware], used_lids: List[Labware] + ) -> Tuple[List[Labware], List[Labware]]: + """Fragmentation Function.""" + ctx.comment("-------Starting Fragmentation-------") + + for i in range(num_cols): + + ctx.comment("Mixing and Transfering beads to column " + str(i + 1)) + + tiptrack(tip50, None, reuse=False) + p50.flow_rate.dispense = 15 + p50.aspirate(frag_vol, frag_res) + p50.dispense(p50.current_volume, samples[i]) + p50.flow_rate.dispense = 150 + for x in range(10 if not dry_run else 1): + if x == 9: + p50.flow_rate.aspirate = 15 + p50.flow_rate.dispense = 15 + p50.aspirate(frag_vol, samples[i].bottom(1)) + p50.dispense(p50.current_volume, samples[i].bottom(5)) + p50.flow_rate.aspirate = 150 + p50.flow_rate.dispense = 150 + if trash_tips: + p50.drop_tip() + ctx.comment("****Dropping Tip in Waste shoot****") + else: + p50.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + + unused_lids, used_lids = run_tag_profile( + unused_lids, used_lids + ) # Heats TC --> moves plate to TC --> TAG Profile --> removes plate from TC + return unused_lids, used_lids + + def end_repair( + unused_lids: List[Labware], used_lids: List[Labware] + ) -> Tuple[List[Labware], List[Labware]]: + """End Repair Function.""" + ctx.comment("-------Starting end_repair-------") + + for i in range(num_cols): + + ctx.comment( + "**** Mixing and Transfering beads to column " + str(i + 1) + " ****" + ) + + tiptrack(tip50, None, reuse=False) + p50.flow_rate.dispense = 15 + p50.aspirate(end_repair_vol, er_res) + p50.dispense(p50.current_volume, samples[i]) + p50.flow_rate.dispense = 150 + for x in range(10 if not dry_run else 1): + if x == 9: + p50.flow_rate.aspirate = 15 + p50.flow_rate.dispense = 15 + p50.aspirate(end_repair_vol, samples[i].bottom(1)) + p50.dispense(p50.current_volume, samples[i].bottom(5)) + p50.flow_rate.aspirate = 150 + p50.flow_rate.dispense = 150 + if trash_tips: + p50.drop_tip() + ctx.comment("****Dropping Tip in Waste shoot****") + else: + p50.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + + unused_lids, used_lids = run_er_profile( + unused_lids, used_lids + ) # Heats TC --> moves plate to TC --> TAG Profile --> removes plate from TC + return unused_lids, used_lids + + # Index Ligation + + def index_ligation( + unused_lids: List[Labware], used_lids: List[Labware] + ) -> Tuple[List[Labware], List[Labware]]: + """Index Ligation.""" + ctx.comment("-------Ligating Indexes-------") + ctx.comment("-------Adding and Mixing ELM-------") + for i in samples: + tiptrack(tip50, None, reuse=False) + p50.aspirate(ligation_vol, ligation_res) + p50.dispense(p50.current_volume, i) + for x in range(10 if not dry_run else 1): + if x == 9: + p50.flow_rate.aspirate = 75 + p50.flow_rate.dispense = 75 + p50.aspirate(ligation_vol - 10, i) + p50.dispense(p50.current_volume, i.bottom(8)) + p50.flow_rate.aspirate = 150 + p50.flow_rate.dispense = 150 + if trash_tips: + p50.drop_tip() + ctx.comment("****Dropping Tip in Waste shoot****") + else: + p50.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + + # Add and mix adapters + ctx.comment("-------Adding and Mixing Adapters-------") + for i_well, x_well in zip(samples, adapters): + tiptrack(tip50, None, reuse=False) + p50.aspirate(adapter_vol, x_well) + p50.dispense(p50.current_volume, i_well) + for y in range(10 if not dry_run else 1): + if y == 9: + p50.flow_rate.aspirate = 75 + p50.flow_rate.dispense = 75 + p50.aspirate(40, i_well) + p50.dispense(40, i_well.bottom(8)) + if trash_tips: + p50.drop_tip() + ctx.comment("****Dropping Tip in Waste shoot****") + else: + p50.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + + p50.flow_rate.aspirate = 150 + p50.flow_rate.dispense = 150 + + unused_lids, used_lids = run_ligation_profile(unused_lids, used_lids) + return unused_lids, used_lids + + def lib_cleanup() -> None: + """Litigation Clean up.""" + ctx.comment("-------Starting Cleanup-------") + ctx.comment("-------Adding and Mixing Cleanup Beads-------") + + # Move FLP plate off magnetic module if it's there + if FLP_plate.parent == magblock: + ctx.comment("****Moving FLP Plate off Magnetic Module****") + ctx.move_labware(FLP_plate, tc_mod, use_gripper=USE_GRIPPER) + + for x, i in enumerate(samples): + tiptrack(tip200, None, reuse=False) + mix_beads(p200, bead_res, bead_vol_1, 7 if x == 0 else 2, x) + p200.aspirate(bead_vol_1, bead_res) + p200.dispense(bead_vol_1, i) + mix_beads(p200, i, bead_vol_1, 7 if not dry_run else 1, num_cols - 1) + for x in range(10 if not dry_run else 1): + if x == 9: + p200.flow_rate.aspirate = 75 + p200.flow_rate.dispense = 75 + p200.aspirate(bead_vol_1, i) + p200.dispense(bead_vol_1, i.bottom(8)) + p200.flow_rate.aspirate = 150 + p200.flow_rate.dispense = 150 + if trash_tips: + p200.drop_tip() + ctx.comment("****Dropping Tip in Waste shoot****") + else: + p200.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + + ctx.delay( + minutes=bead_inc, + msg="Please wait " + + str(bead_inc) + + " minutes while samples incubate at RT.", + ) + + ctx.comment("****Moving Labware to Magnet for Pelleting****") + ctx.move_labware(sample_plate, magblock, use_gripper=USE_GRIPPER) + + ctx.delay(minutes=4.5, msg="Time for Pelleting") + + for col, i in enumerate(samples): + remove_supernatant(i, 130, waste1_res, col) + samp_list = samples + + # Wash 2 x with 80% Ethanol + p200.flow_rate.aspirate = 75 + p200.flow_rate.dispense = 75 + for y in range(2 if not dry_run else 1): + ctx.comment(f"-------Wash # {y+1} with Ethanol-------") + if y == 0: # First wash + this_res = etoh1_res + this_waste_res = waste1_res + else: # Second Wash + this_res = etoh2_res + this_waste_res = waste2_res + if REUSE_ETOH_TIPS: + tiptrack(tip200, None, reuse=False) + for i in samp_list: + if not REUSE_ETOH_TIPS: + tiptrack(tip200, None, reuse=False) + p200.aspirate(150, this_res) + p200.air_gap(10) + p200.dispense(p200.current_volume, i.top()) + ctx.delay(seconds=1) + p200.air_gap(10) + if not REUSE_ETOH_TIPS: + p200.drop_tip() if trash_tips else p200.return_tip() + + ctx.delay(seconds=10) + # Remove the ethanol wash + for x, i in enumerate(samp_list): + if REUSE_ETOH_TIPS: + if x != 0: + tiptrack(tip200, None, reuse=False) + if not REUSE_ETOH_TIPS: + tiptrack(tip200, None, reuse=False) + p200.aspirate(155, i) + p200.air_gap(10) + p200.dispense(p200.current_volume, this_waste_res) + ctx.delay(seconds=1) + p200.air_gap(10) + if trash_tips: + p200.drop_tip() + ctx.comment("****Dropping Tip in Waste shoot****") + else: + p200.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + + p200.flow_rate.aspirate = 150 + p200.flow_rate.dispense = 150 + + # Wash complete, move on to drying steps. + ctx.delay(minutes=2, msg="Allow 3 minutes for residual ethanol to dry") + + # Return Plate to H-S from Magnet + + ctx.comment("****Moving sample plate off of Magnet****") + ctx.move_labware(sample_plate, "D1", use_gripper=USE_GRIPPER) + + # Adding RSB and Mixing + + for col, i in enumerate(samp_list): + ctx.comment(f"****Adding RSB to Columns: {col+1}****") + tiptrack(tip50, col, reuse=True if REUSE_RSB_TIPS else False) + p50.aspirate(rsb_vol_1, rsb_res) + p50.air_gap(5) + p50.dispense(p50.current_volume, i) + for x in range(10 if not dry_run else 1): + if x == 9: + p50.flow_rate.aspirate = 15 + p50.flow_rate.dispense = 15 + p50.aspirate(15, i.bottom(1)) + p50.dispense(15, i.bottom(4)) + p50.flow_rate.aspirate = 100 + p50.flow_rate.dispense = 100 + p50.air_gap(5) + if REUSE_RSB_TIPS: + p50.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + else: + if trash_tips: + p50.drop_tip() + ctx.comment("****Dropping Tip in Waste shoot****") + else: + p50.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + + ctx.delay( + minutes=3, msg="Allow 3 minutes for incubation and liquid aggregation." + ) + + ctx.comment("****Move Samples to Magnet for Pelleting****") + ctx.move_labware(sample_plate, magblock, use_gripper=USE_GRIPPER) + + ctx.delay(minutes=2, msg="Please allow 2 minutes for beads to pellet.") + + p200.flow_rate.aspirate = 10 + for i_int, (s, e) in enumerate(zip(samp_list, samples_2)): + tiptrack(tip50, i_int, reuse=True if REUSE_RSB_TIPS else False) + p50.aspirate(elution_vol, s) + p50.air_gap(5) + p50.dispense(p50.current_volume, e.bottom(1), push_out=3) + p50.air_gap(5) + if trash_tips: + p50.drop_tip() + ctx.comment("****Dropping Tip in Waste shoot****") + else: + p50.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + + # move new sample plate to D1 or heatershaker + ctx.comment("****Moving sample plate off of Magnet****") + ctx.move_labware(sample_plate_2, "D1", use_gripper=USE_GRIPPER) + + # Keep Sample PLate 1 to B2 + ctx.comment("****Moving Sample_plate_1 Plate off magnet to B2****") + ctx.move_labware(sample_plate, "B2", use_gripper=USE_GRIPPER) + + ctx.comment("****Moving FLP Plate off TC****") + ctx.move_labware(FLP_plate, magblock, use_gripper=USE_GRIPPER) + + def lib_amplification( + unused_lids: List[Labware], used_lids: List[Labware] + ) -> Tuple[List[Labware], List[Labware]]: + """Library Amplification.""" + ctx.comment("-------Starting lib_amplification-------") + + for i in range(num_cols): + + ctx.comment( + "**** Mixing and Transfering beads to column " + str(i + 1) + " ****" + ) + + tiptrack(tip50, None, reuse=False) + mix_beads( + p50, amplification_res, amplification_vol, 7 if i == 0 else 2, i + ) # 5 reps for first mix in reservoir + p50.flow_rate.dispense = 15 + p50.aspirate(amplification_vol, amplification_res) + p50.dispense(p50.current_volume, samples_2[i]) + p50.flow_rate.dispense = 150 + for x in range(10 if not dry_run else 1): + if x == 9: + p50.flow_rate.aspirate = 15 + p50.flow_rate.dispense = 15 + p50.aspirate(amplification_vol, samples_2[i].bottom(1)) + p50.dispense(p50.current_volume, samples_2[i].bottom(5)) + p50.flow_rate.aspirate = 150 + p50.flow_rate.dispense = 150 + if trash_tips: + p50.drop_tip() + ctx.comment("****Dropping Tip in Waste shoot****") + else: + p50.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + + unused_lids, used_lids = run_amplification_profile( + unused_lids, used_lids + ) # moves plate to TC --> TAG Profile --> removes plate from TC + return unused_lids, used_lids + + def lib_cleanup_2() -> None: + """Final Library Clean up.""" + ctx.comment("-------Starting Cleanup-------") + ctx.comment("-------Adding and Mixing Cleanup Beads-------") + for x, i in enumerate(samples_2): + tiptrack(tip200, None, reuse=False) + mix_beads(p200, bead_res, bead_vol_2, 7 if x == 0 else 2, x) + p200.aspirate(bead_vol_2, bead_res) + p200.dispense(bead_vol_2, i) + mix_beads(p200, i, bead_vol_2, 7 if not dry_run else 1, num_cols - 1) + for x in range(10 if not dry_run else 1): + if x == 9: + p200.flow_rate.aspirate = 75 + p200.flow_rate.dispense = 75 + p200.aspirate(bead_vol_2, i) + p200.dispense(bead_vol_2, i.bottom(8)) + p200.flow_rate.aspirate = 150 + p200.flow_rate.dispense = 150 + if trash_tips: + p200.drop_tip() + ctx.comment("****Dropping Tip in Waste shoot****") + else: + p200.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + + ctx.delay( + minutes=bead_inc, + msg="Please wait " + + str(bead_inc) + + " minutes while samples incubate at RT.", + ) + + ctx.comment("****Moving Labware to Magnet for Pelleting****") + ctx.move_labware(sample_plate_2, magblock, use_gripper=USE_GRIPPER) + + ctx.delay(minutes=4.5, msg="Time for Pelleting") + + for col, i in enumerate(samples_2): + remove_supernatant(i, 130, waste1_res, col) + samp_list_2 = samples_2 + # Wash 2 x with 80% Ethanol + + p200.flow_rate.aspirate = 75 + p200.flow_rate.dispense = 75 + for y in range(2 if not dry_run else 1): + ctx.comment(f"-------Wash # {y+1} with Ethanol-------") + if y == 0: # First wash + this_res = etoh1_res + this_waste_res = waste1_res + else: # Second Wash + this_res = etoh2_res + this_waste_res = waste2_res + if REUSE_ETOH_TIPS: + tiptrack(tip200, None, reuse=False) + for i in samp_list_2: + if not REUSE_ETOH_TIPS: + tiptrack(tip200, None, reuse=False) + p200.aspirate(150, this_res) + p200.air_gap(10) + p200.dispense(p200.current_volume, i.top()) + ctx.delay(seconds=1) + p200.air_gap(10) + if not REUSE_ETOH_TIPS: + p200.drop_tip() if trash_tips else p200.return_tip() + + ctx.delay(seconds=10) + # Remove the ethanol wash + for x, i in enumerate(samp_list_2): + if REUSE_ETOH_TIPS: + if x != 0: + tiptrack(tip200, None, reuse=False) + if not REUSE_ETOH_TIPS: + tiptrack(tip200, None, reuse=False) + p200.aspirate(155, i) + p200.air_gap(10) + p200.dispense(p200.current_volume, this_waste_res) + ctx.delay(seconds=1) + p200.air_gap(10) + if trash_tips: + p200.drop_tip() + ctx.comment("****Dropping Tip in Waste shoot****") + else: + p200.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + + p200.flow_rate.aspirate = 150 + p200.flow_rate.dispense = 150 + + # Washes Complete, Move on to Drying Steps + + ctx.delay(minutes=3, msg="Allow 3 minutes for residual ethanol to dry") + + ctx.comment("****Moving sample plate off of Magnet****") + ctx.move_labware(sample_plate_2, "D1", use_gripper=USE_GRIPPER) + + # Adding RSB and Mixing + + for col, i in enumerate(samp_list_2): + ctx.comment(f"****Adding RSB to Columns: {col+1}****") + tiptrack(tip50, col, reuse=True if REUSE_RSB_TIPS else False) + p50.aspirate(rsb_vol_2, rsb_res) + p50.air_gap(5) + p50.dispense(p50.current_volume, i) + for x in range(10 if not dry_run else 1): + if x == 9: + p50.flow_rate.aspirate = 15 + p50.flow_rate.dispense = 15 + p50.aspirate(15, i.bottom(1)) + p50.dispense(15, i.bottom(4)) + p50.flow_rate.aspirate = 100 + p50.flow_rate.dispense = 100 + p50.air_gap(5) + if REUSE_RSB_TIPS: + p50.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + else: + if trash_tips: + p50.drop_tip() + ctx.comment("****Dropping Tip in Waste shoot****") + else: + p50.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + + ctx.delay( + minutes=3, msg="Allow 3 minutes for incubation and liquid aggregation." + ) + + ctx.comment("****Move Samples to Magnet for Pelleting****") + ctx.move_labware(sample_plate_2, magblock, use_gripper=USE_GRIPPER) + + ctx.delay(minutes=2, msg="Please allow 2 minutes for beads to pellet.") + + p200.flow_rate.aspirate = 10 + for i_int, (s, e) in enumerate(zip(samp_list_2, samples_flp)): + tiptrack(tip50, i_int, reuse=True if REUSE_RSB_TIPS else False) + p50.aspirate(elution_vol_2, s) + p50.air_gap(5) + p50.dispense(p50.current_volume, e.bottom(1), push_out=3) + p50.air_gap(5) + if trash_tips: + p50.drop_tip() + ctx.comment("****Dropping Tip in Waste shoot****") + else: + p50.return_tip() + ctx.comment("****Dropping Tip Back in Tip Box****") + + # Set Block Temp for Final Plate + tc_mod.set_block_temperature(4) + + unused_lids, used_lids = Fragmentation(unused_lids, used_lids) + unused_lids, used_lids = end_repair(unused_lids, used_lids) + unused_lids, used_lids = index_ligation(unused_lids, used_lids) + lib_cleanup() + unused_lids, used_lids = lib_amplification(unused_lids, used_lids) + lib_cleanup_2() diff --git a/abr-testing/abr_testing/protocols/api 2.20/5_96ch complex protocol with single tip Pick Up.py b/abr-testing/abr_testing/protocols/api 2.20/5_96ch complex protocol with single tip Pick Up.py new file mode 100644 index 00000000000..53b68503202 --- /dev/null +++ b/abr-testing/abr_testing/protocols/api 2.20/5_96ch complex protocol with single tip Pick Up.py @@ -0,0 +1,424 @@ +"""96 ch Test Single Tip and Gripper Moves.""" +from opentrons.protocol_api import ( + ALL, + SINGLE, + ParameterContext, + ProtocolContext, + Labware, +) +from opentrons.protocol_api.module_contexts import ( + HeaterShakerContext, + MagneticBlockContext, + ThermocyclerContext, + TemperatureModuleContext, +) +from abr_testing.protocols import helpers +from typing import List +from opentrons.hardware_control.modules.types import ThermocyclerStep + +metadata = { + "protocolName": "96ch protocol with modules gripper moves and SINGLE tip pickup", + "author": "Derek Maggio ", +} + +requirements = { + "robotType": "OT-3", + "apiLevel": "2.20", +} + + +# prefer to move off deck, instead of waste chute disposal, if possible +PREFER_MOVE_OFF_DECK = False + + +PCR_PLATE_96_NAME = "armadillo_96_wellplate_200ul_pcr_full_skirt" +RESERVOIR_NAME = "nest_96_wellplate_2ml_deep" +TIPRACK_96_ADAPTER_NAME = "opentrons_flex_96_tiprack_adapter" +PIPETTE_96_CHANNEL_NAME = "flex_96channel_1000" + +USING_GRIPPER = True +RESET_AFTER_EACH_MOVE = True + + +def add_parameters(parameters: ParameterContext) -> None: + """Parameters.""" + helpers.create_tip_size_parameter(parameters) + helpers.create_dot_bottom_parameter(parameters) + helpers.create_disposable_lid_parameter(parameters) + + +def run(ctx: ProtocolContext) -> None: + """Protocol.""" + b = ctx.params.dot_bottom # type: ignore[attr-defined] + TIPRACK_96_NAME = ctx.params.tip_size # type: ignore[attr-defined] + disposable_lid = ctx.params.disposable_lid # type: ignore[attr-defined] + + waste_chute = ctx.load_waste_chute() + + thermocycler: ThermocyclerContext = ctx.load_module(helpers.tc_str) # type: ignore[assignment] + mag: MagneticBlockContext = ctx.load_module(helpers.mag_str, "A3") # type: ignore[assignment] + h_s: HeaterShakerContext = ctx.load_module(helpers.hs_str, "D1") # type: ignore[assignment] + temperature_module: TemperatureModuleContext = ctx.load_module( + helpers.temp_str, "C1" + ) # type: ignore[assignment] + if disposable_lid: + unused_lids = helpers.load_disposable_lids(ctx, 3, "A4") + used_lids: List[Labware] = [] + thermocycler.open_lid() + h_s.open_labware_latch() + + temperature_module_adapter = temperature_module.load_adapter( + helpers.temp_adapter_str + ) + h_s_adapter = h_s.load_adapter(helpers.hs_adapter_str) + + adapters = [temperature_module_adapter, h_s_adapter] + + source_reservoir = ctx.load_labware(RESERVOIR_NAME, "D2") + dest_pcr_plate = ctx.load_labware(PCR_PLATE_96_NAME, "C2") + + tip_rack_1 = ctx.load_labware( + TIPRACK_96_NAME, "A2", adapter=TIPRACK_96_ADAPTER_NAME + ) + tip_rack_adapter = tip_rack_1.parent + + tip_rack_2 = ctx.load_labware(TIPRACK_96_NAME, "C3") + tip_rack_3 = ctx.load_labware(TIPRACK_96_NAME, "C4") + + tip_racks = [ + tip_rack_1, + tip_rack_2, + tip_rack_3, + ] + + pipette_96_channel = ctx.load_instrument( + PIPETTE_96_CHANNEL_NAME, + mount="left", + tip_racks=tip_racks, + liquid_presence_detection=True, + ) + + water = ctx.define_liquid(name="water", description="H₂O", display_color="#42AB2D") + source_reservoir.wells_by_name()["A1"].load_liquid(liquid=water, volume=29000) + + def run_moves( + labware: Labware, move_sequences: List, reset_location: str, use_gripper: bool + ) -> None: + """Perform a series of moves for a given labware using specified move sequences. + + Will perform 2 versions of the moves: + 1.Moves to each location, resetting to the reset location after each move. + 2.Moves to each location, resetting to the reset location after all moves. + """ + + def move_to_locations( + labware_to_move: Labware, + move_locations: List, + reset_after_each_move: bool, + use_gripper: bool, + reset_location: str, + ) -> None: + """Move labware to specific destinations.""" + + def reset_labware() -> None: + """Reset the labware to the reset location.""" + ctx.move_labware( + labware_to_move, reset_location, use_gripper=use_gripper + ) + + if len(move_locations) == 0: + return + + for location in move_locations: + ctx.move_labware(labware_to_move, location, use_gripper=use_gripper) + + if reset_after_each_move: + reset_labware() + + if not reset_after_each_move: + reset_labware() + + for move_sequence in move_sequences: + move_to_locations( + labware, + move_sequence, + RESET_AFTER_EACH_MOVE, + use_gripper, + reset_location, + ) + move_to_locations( + labware, + move_sequence, + not RESET_AFTER_EACH_MOVE, + use_gripper, + reset_location, + ) + + def test_gripper_moves() -> None: + """Function to test the movement of the gripper in various locations.""" + + def deck_moves(labware: Labware, reset_location: str) -> None: + """Function to perform the movement of labware.""" + deck_move_sequence = [ + ["B2"], # Deck Moves + ["C3"], # Staging Area Slot 3 Moves + ["C4", "D4"], # Staging Area Slot 4 Moves + [ + thermocycler, + temperature_module_adapter, + h_s_adapter, + mag, + ], # Module Moves + ] + + run_moves(labware, deck_move_sequence, reset_location, USING_GRIPPER) + + def staging_area_slot_3_moves(labware: Labware, reset_location: str) -> None: + """Function to perform the movement of labware, starting w/ staging area slot 3.""" + staging_area_slot_3_move_sequence = [ + ["B2", "C2"], # Deck Moves + [], # Don't have Staging Area Slot 3 open + ["C4", "D4"], # Staging Area Slot 4 Moves + [ + thermocycler, + temperature_module_adapter, + h_s_adapter, + mag, + ], # Module Moves + ] + + run_moves( + labware, + staging_area_slot_3_move_sequence, + reset_location, + USING_GRIPPER, + ) + + def staging_area_slot_4_moves(labware: Labware, reset_location: str) -> None: + """Function to perform the movement of labware, starting with staging area slot 4.""" + staging_area_slot_4_move_sequence = [ + ["C2", "B2"], # Deck Moves + ["C3"], # Staging Area Slot 3 Moves + ["C4"], # Staging Area Slot 4 Moves + [ + thermocycler, + temperature_module_adapter, + h_s_adapter, + mag, + ], # Module Moves + ] + + run_moves( + labware, + staging_area_slot_4_move_sequence, + reset_location, + USING_GRIPPER, + ) + + def module_moves(labware: Labware, module_locations: List) -> None: + """Function to perform the movement of labware, starting on a module.""" + module_move_sequence = [ + ["C2", "B2"], # Deck Moves + ["C3"], # Staging Area Slot 3 Moves + ["C4", "D4"], # Staging Area Slot 4 Moves + ] + + for module_starting_location in module_locations: + labware_move_to_locations = module_locations.copy() + labware_move_to_locations.remove(module_starting_location) + all_sequences = module_move_sequence.copy() + all_sequences.append(labware_move_to_locations) + ctx.move_labware( + labware, module_starting_location, use_gripper=USING_GRIPPER + ) + run_moves( + labware, all_sequences, module_starting_location, USING_GRIPPER + ) + + DECK_MOVE_RESET_LOCATION = "C2" + STAGING_AREA_SLOT_3_RESET_LOCATION = "C3" + STAGING_AREA_SLOT_4_RESET_LOCATION = "D4" + + deck_moves(dest_pcr_plate, DECK_MOVE_RESET_LOCATION) + + ctx.move_labware( + dest_pcr_plate, + STAGING_AREA_SLOT_3_RESET_LOCATION, + use_gripper=USING_GRIPPER, + ) + staging_area_slot_3_moves(dest_pcr_plate, STAGING_AREA_SLOT_3_RESET_LOCATION) + + ctx.move_labware( + dest_pcr_plate, + STAGING_AREA_SLOT_4_RESET_LOCATION, + use_gripper=USING_GRIPPER, + ) + staging_area_slot_4_moves(dest_pcr_plate, STAGING_AREA_SLOT_4_RESET_LOCATION) + + module_locations = [thermocycler, mag] + adapters + module_moves(dest_pcr_plate, module_locations) + ctx.move_labware(dest_pcr_plate, thermocycler, use_gripper=USING_GRIPPER) + + def test_manual_moves() -> None: + """Test manual moves.""" + ctx.move_labware(source_reservoir, "D4", use_gripper=USING_GRIPPER) + + def test_pipetting() -> None: + """Test pipetting.""" + + def test_single_tip_pickup_usage() -> None: + """Test Single Tip Pick Up.""" + pipette_96_channel.configure_nozzle_layout(style=SINGLE, start="H12") + pipette_96_channel.liquid_presence_detection = True + tip_count = 0 # Tip counter to ensure proper tip usage + rows = ["A", "B", "C", "D", "E", "F", "G", "H"] # 8 rows + columns = range(1, 13) # 12 columns + for row in rows: + for col in columns: + well_position = f"{row}{col}" + pipette_96_channel.pick_up_tip(tip_rack_2) + + pipette_96_channel.aspirate(5, source_reservoir[well_position]) + pipette_96_channel.touch_tip() + + pipette_96_channel.dispense( + 5, dest_pcr_plate[well_position].bottom(b) + ) + pipette_96_channel.drop_tip() + tip_count += 1 + # leave this dropping in waste chute, do not use get_disposal_preference + # want to test partial drop + ctx.move_labware(tip_rack_2, waste_chute, use_gripper=USING_GRIPPER) + + def test_full_tip_rack_usage() -> None: + """Full Tip Pick Up.""" + pipette_96_channel.configure_nozzle_layout(style=ALL, start="A1") + pipette_96_channel.liquid_presence_detection = True + pipette_96_channel.pick_up_tip(tip_rack_1["A1"]) + + pipette_96_channel.aspirate(5, source_reservoir["A1"]) + pipette_96_channel.touch_tip() + + pipette_96_channel.liquid_presence_detection = False + pipette_96_channel.air_gap(height=30) + pipette_96_channel.blow_out(waste_chute) + + pipette_96_channel.aspirate(5, source_reservoir["A1"]) + pipette_96_channel.touch_tip() + + pipette_96_channel.air_gap(height=30) + pipette_96_channel.blow_out() + + pipette_96_channel.aspirate(10, source_reservoir["A1"]) + pipette_96_channel.touch_tip() + + pipette_96_channel.dispense(10, dest_pcr_plate["A1"].bottom(b)) + pipette_96_channel.mix(repetitions=5, volume=15) + pipette_96_channel.return_tip() + + ctx.move_labware(tip_rack_1, waste_chute, use_gripper=USING_GRIPPER) + ctx.move_labware(tip_rack_3, tip_rack_adapter, use_gripper=USING_GRIPPER) + + pipette_96_channel.pick_up_tip(tip_rack_3["A1"]) + pipette_96_channel.transfer( + volume=10, + source=source_reservoir["A1"], + dest=dest_pcr_plate["A1"], + new_tip="never", + touch_tip=True, + blow_out=True, + blowout_location="trash", + mix_before=(3, 5), + mix_after=(1, 5), + ) + pipette_96_channel.return_tip() + + ctx.move_labware(tip_rack_3, waste_chute, use_gripper=USING_GRIPPER) + + test_single_tip_pickup_usage() + test_full_tip_rack_usage() + + def test_module_usage(unused_lids: List[Labware], used_lids: List[Labware]) -> None: + """Test Module Use.""" + + def test_thermocycler( + unused_lids: List[Labware], used_lids: List[Labware] + ) -> None: + if disposable_lid: + ( + lid_on_plate, + unused_lids, + used_lids, + ) = helpers.use_disposable_lid_with_tc( + ctx, unused_lids, used_lids, dest_pcr_plate, thermocycler + ) + thermocycler.set_block_temperature(4) + thermocycler.set_lid_temperature(105) + # Close lid + thermocycler.close_lid() + # hold at 95° for 3 minutes + profile_TAG: List[ThermocyclerStep] = [ + {"temperature": 95, "hold_time_minutes": 3} + ] + thermocycler.execute_profile( + steps=profile_TAG, repetitions=1, block_max_volume=50 + ) + # 30x cycles of: 70° for 30s 72° for 30s 95° for 10s + profile_TAG2: List[ThermocyclerStep] = [ + {"temperature": 70, "hold_time_seconds": 30}, + {"temperature": 72, "hold_time_seconds": 30}, + {"temperature": 95, "hold_time_seconds": 10}, + ] + thermocycler.execute_profile( + steps=profile_TAG2, repetitions=30, block_max_volume=50 + ) + # hold at 72° for 5min + profile_TAG3: List[ThermocyclerStep] = [ + {"temperature": 72, "hold_time_minutes": 5} + ] + thermocycler.execute_profile( + steps=profile_TAG3, repetitions=1, block_max_volume=50 + ) + # # Cool to 4° + thermocycler.set_block_temperature(4) + thermocycler.set_lid_temperature(105) + # Open lid + thermocycler.open_lid() + if disposable_lid: + if len(used_lids) <= 1: + ctx.move_labware(lid_on_plate, "B3", use_gripper=True) + else: + ctx.move_labware(lid_on_plate, used_lids[-2], use_gripper=True) + thermocycler.deactivate() + + def test_h_s() -> None: + """Tests heatershaker.""" + h_s.open_labware_latch() + h_s.close_labware_latch() + + h_s.set_target_temperature(75.0) + h_s.set_and_wait_for_shake_speed(1000) + h_s.wait_for_temperature() + + h_s.deactivate_heater() + h_s.deactivate_shaker() + + def test_temperature_module() -> None: + """Tests temperature module.""" + temperature_module.set_temperature(80) + temperature_module.set_temperature(10) + temperature_module.deactivate() + + def test_mag() -> None: + """Tests magnetic block.""" + pass + + test_thermocycler(unused_lids, used_lids) + test_h_s() + test_temperature_module() + test_mag() + + test_pipetting() + test_gripper_moves() + test_module_usage(unused_lids, used_lids) + test_manual_moves() diff --git a/abr-testing/abr_testing/protocols/api 2.20/7_HDQ_DNA_Bacteria_Flex.py b/abr-testing/abr_testing/protocols/api 2.20/7_HDQ_DNA_Bacteria_Flex.py new file mode 100644 index 00000000000..7475eb11a34 --- /dev/null +++ b/abr-testing/abr_testing/protocols/api 2.20/7_HDQ_DNA_Bacteria_Flex.py @@ -0,0 +1,561 @@ +"""Omega HDQ DNA Extraction: Bacteria - Tissue Protocol.""" +from abr_testing.protocols import helpers +import math +from opentrons import types +from opentrons.protocol_api import ( + ProtocolContext, + Well, + ParameterContext, + InstrumentContext, +) +import numpy as np +from opentrons.protocol_api.module_contexts import ( + HeaterShakerContext, + TemperatureModuleContext, + MagneticBlockContext, +) +from typing import List, Union + +metadata = { + "author": "Zach Galluzzo ", + "protocolName": "Omega HDQ DNA Extraction: Bacteria- Tissue Protocol", +} + +requirements = { + "robotType": "OT-3", + "apiLevel": "2.20", +} +""" +Slot A1: Tips 1000 +Slot A2: Tips 1000 +Slot A3: Temperature module (gen2) with 96 well PCR block and Armadillo 96 well PCR Plate +Slot B1: Tips 1000 +Slot B2: +Slot B3: Nest 1 Well Reservoir +Slot C1: Magblock +Slot C2: +Slot C3: +Slot D1: H-S with Nest 96 Well Deep well and DW Adapter +Slot D2: Nest 12 well 15 ml Reservoir +Slot D3: Trash + +Reservoir 1: +Wells 1-2 - 9,900 ul +Well 3 - 14,310 ul +Wells 4-12 - 11,400 ul +""" + +whichwash = 1 +sample_max = 48 +tip1k = 0 +drop_count = 0 +waste_vol = 0 + + +def add_parameters(parameters: ParameterContext) -> None: + """Define Parameters.""" + helpers.create_pipette_mount_parameter(parameters) + helpers.create_hs_speed_parameter(parameters) + helpers.create_dot_bottom_parameter(parameters) + + +def run(ctx: ProtocolContext) -> None: + """Protocol.""" + heater_shaker_speed = ctx.params.heater_shaker_speed # type: ignore[attr-defined] + mount = ctx.params.pipette_mount # type: ignore[attr-defined] + dot_bottom = ctx.params.dot_bottom # type: ignore[attr-defined] + dry_run = False + TIP_TRASH = False + res_type = "nest_12_reservoir_22ml" + + num_samples = 8 + wash1_vol = 600 + wash2_vol = 600 + wash3_vol = 600 + AL_vol = 230 + sample_vol = 180 + bind_vol = 320 + elution_vol = 100 + + # Protocol Parameters + deepwell_type = "nest_96_wellplate_2ml_deep" + res_type = "nest_12_reservoir_15ml" + if not dry_run: + settling_time = 2.0 + A_lysis_time_1 = 15.0 + A_lysis_time_2 = 10.0 + bind_time = 10.0 + elute_wash_time = 5.0 + else: + settling_time = ( + elute_wash_time + ) = A_lysis_time_1 = A_lysis_time_2 = bind_time = 0.25 + PK_vol = bead_vol = 20 + AL_total_vol = AL_vol + PK_vol + starting_vol = AL_vol + sample_vol + binding_buffer_vol = bind_vol + bead_vol + + ctx.load_trash_bin("A3") + h_s: HeaterShakerContext = ctx.load_module(helpers.hs_str, "D1") # type: ignore[assignment] + h_s_adapter = h_s.load_adapter("opentrons_96_deep_well_adapter") + sample_plate = h_s_adapter.load_labware(deepwell_type, "Sample Plate") + h_s.close_labware_latch() + temp: TemperatureModuleContext = ctx.load_module( + helpers.temp_str, "D3" + ) # type: ignore[assignment] + temp_block = temp.load_adapter("opentrons_96_well_aluminum_block") + elutionplate = temp_block.load_labware( + "armadillo_96_wellplate_200ul_pcr_full_skirt", "Elution Plate" + ) + magnetic_block: MagneticBlockContext = ctx.load_module( + helpers.mag_str, "C1" + ) # type: ignore[assignment] + waste = ( + ctx.load_labware("nest_1_reservoir_195ml", "B3", "Liquid Waste") + .wells()[0] + .top() + ) + res1 = ctx.load_labware(res_type, "D2", "reagent reservoir 1") + num_cols = math.ceil(num_samples / 8) + + # Load tips and combine all similar boxes + tips1000 = ctx.load_labware("opentrons_flex_96_tiprack_1000ul", "A1", "Tips 1") + tips1001 = ctx.load_labware("opentrons_flex_96_tiprack_1000ul", "A2", "Tips 2") + tips1002 = ctx.load_labware("opentrons_flex_96_tiprack_1000ul", "B1", "Tips 3") + tips = [*tips1000.wells()[num_samples:96], *tips1001.wells(), *tips1002.wells()] + tips_sn = tips1000.wells()[:num_samples] + + # load instruments + m1000 = ctx.load_instrument("flex_8channel_1000", mount) + + """ + Here is where you can define the locations of your reagents. + """ + binding_buffer = res1.wells()[:2] + AL = res1.wells()[2] + wash1 = res1.wells()[3:6] + wash2 = res1.wells()[6:9] + wash3 = res1.wells()[9:] + + samples_m = sample_plate.rows()[0][:num_cols] + elution_samples_m = elutionplate.rows()[0][:num_cols] + + colors = helpers.liquid_colors + + # Begin with assigning plate wells before reservoir wells + samps = ctx.define_liquid( + name="Samples", description="Samples", display_color="#00FF00" + ) + elution_samps = ctx.define_liquid( + name="Elution Buffer", description="Elution Buffer", display_color="#FFA500" + ) + + for well_s in sample_plate.wells()[:num_samples]: + well_s.load_liquid(liquid=samps, volume=sample_vol) + + for well_e in elutionplate.wells()[:num_samples]: + well_e.load_liquid(liquid=elution_samps, volume=elution_vol) + + # Start defining reservoir wells + locations: List[Union[List[Well], Well]] = [ + AL, + AL, + binding_buffer, + binding_buffer, + wash1, + wash2, + wash3, + ] + vols = [AL_vol, PK_vol, bead_vol, bind_vol, wash1_vol, wash2_vol, wash3_vol] + liquids = ["AL Lysis", "PK", "Beads", "Binding", "Wash 1", "Wash 2", "Wash 3"] + + delete = len(colors) - len(liquids) + + if delete >= 1: + for i in range(delete): + colors.pop(-1) + + def add_liquid( + liq_type: str, wells: Union[Well, List[Well]], color: str, vol: float + ) -> None: + """Assigns colored liquid to wells based on type and location.""" + total_samples = math.ceil(num_samples / 8) * 8 + + # Calculate extra sample volume based on liquid type + extra_samples = math.ceil( + 1500 + / (AL_vol if liq_type == "PK" else bind_vol if liq_type == "Beads" else vol) + ) + + # Define liquid + liquid = ctx.define_liquid( + name=liq_type, description=liq_type, display_color=color + ) + + # Assign liquid to each well + if isinstance(wells, list): + samples_per_well = [sample_max // len(wells)] * ( + total_samples // (sample_max // len(wells)) + ) + remainder = total_samples % (sample_max // len(wells)) + + if remainder: + samples_per_well.append(remainder) + + for sample_count, well in zip(samples_per_well, wells): + well.load_liquid( + liquid=liquid, volume=vol * (sample_count + extra_samples) + ) + else: + wells.load_liquid( + liquid=liquid, volume=vol * (total_samples + extra_samples) + ) + + # Apply function for each liquid configuration + for liq, well, color, vol in zip(liquids, locations, colors, vols): + add_liquid(liq, well, color, vol) + + m1000.flow_rate.aspirate = 300 + m1000.flow_rate.dispense = 300 + m1000.flow_rate.blow_out = 300 + + def tiptrack(tipbox: List[Well]) -> None: + """Track Tips.""" + global tip1k + global drop_count + if tipbox == tips: + m1000.pick_up_tip(tipbox[int(tip1k)]) + tip1k = tip1k + 8 + drop_count = drop_count + 8 + if drop_count >= 150: + drop_count = 0 + ctx.pause("Empty Waste Bin.") + + def remove_supernatant(vol: float) -> None: + """Remove supernatants.""" + ctx.comment("-----Removing Supernatant-----") + m1000.flow_rate.aspirate = 150 + num_trans = math.ceil(vol / 980) + vol_per_trans = vol / num_trans + + for i, m in enumerate(samples_m): + m1000.pick_up_tip(tips_sn[8 * i]) + loc = m.bottom(dot_bottom) + for _ in range(num_trans): + if m1000.current_volume > 0: + # void air gap if necessary + m1000.dispense(m1000.current_volume, m.top()) + m1000.move_to(m.center()) + m1000.transfer(vol_per_trans, loc, waste, new_tip="never", air_gap=20) + m1000.blow_out(waste) + m1000.air_gap(20) + m1000.drop_tip(tips_sn[8 * i]) if TIP_TRASH else m1000.return_tip() + m1000.flow_rate.aspirate = 300 + helpers.move_labware_to_hs(ctx, sample_plate, h_s, h_s_adapter) + + def bead_mixing( + well: Well, pip: InstrumentContext, mvol: float, reps: int = 8 + ) -> None: + """Bead Mixing. + + 'mixing' will mix liquid that contains beads. This will be done by + aspirating from the bottom of the well and dispensing from the top as to + mix the beads with the other liquids as much as possible. Aspiration and + dispensing will also be reversed for a short to to ensure maximal mixing. + param well: The current well that the mixing will occur in. + param pip: The pipet that is currently attached/ being used. + param mvol: The volume that is transferred before the mixing steps. + param reps: The number of mix repetitions that should occur. Note~ + During each mix rep, there are 2 cycles of aspirating from bottom, + dispensing at the top and 2 cycles of aspirating from middle, + dispensing at the bottom + """ + center = well.top().move(types.Point(x=0, y=0, z=5)) + aspbot = well.bottom().move(types.Point(x=0, y=2, z=1)) + asptop = well.bottom().move(types.Point(x=0, y=-2, z=2.5)) + disbot = well.bottom().move(types.Point(x=0, y=1.5, z=3)) + distop = well.top().move(types.Point(x=0, y=1.5, z=0)) + + if mvol > 1000: + mvol = 1000 + + vol = mvol * 0.9 + + pip.flow_rate.aspirate = 500 + pip.flow_rate.dispense = 500 + + pip.move_to(center) + for _ in range(reps): + pip.aspirate(vol, aspbot) + pip.dispense(vol, distop) + pip.aspirate(vol, asptop) + pip.dispense(vol, disbot) + if _ == reps - 1: + pip.flow_rate.aspirate = 150 + pip.flow_rate.dispense = 100 + pip.aspirate(vol, aspbot) + pip.dispense(vol, aspbot) + + pip.flow_rate.aspirate = 300 + pip.flow_rate.dispense = 300 + + def mixing(well: Well, pip: InstrumentContext, mvol: float, reps: int = 8) -> None: + """Mixing. + + 'mixing' will mix liquid that contains beads. This will be done by + aspirating from the bottom of the well and dispensing from the top as to + mix the beads with the other liquids as much as possible. Aspiration and + dispensing will also be reversed for a short to to ensure maximal mixing. + param well: The current well that the mixing will occur in. + param pip: The pipet that is currently attached/ being used. + param mvol: The volume that is transferred before the mixing steps. + param reps: The number of mix repetitions that should occur. Note~ + During each mix rep, there are 2 cycles of aspirating from bottom, + dispensing at the top and 2 cycles of aspirating from middle, + dispensing at the bottom + """ + center = well.top(5) + asp = well.bottom(1) + disp = well.top(-8) + + if mvol > 1000: + mvol = 1000 + + vol = mvol * 0.9 + + pip.flow_rate.aspirate = 500 + pip.flow_rate.dispense = 500 + + pip.move_to(center) + for _ in range(reps): + pip.aspirate(vol, asp) + pip.dispense(vol, disp) + pip.aspirate(vol, asp) + pip.dispense(vol, disp) + if _ == reps - 1: + pip.flow_rate.aspirate = 150 + pip.flow_rate.dispense = 100 + pip.aspirate(vol, asp) + pip.dispense(vol, asp) + + pip.flow_rate.aspirate = 300 + pip.flow_rate.dispense = 300 + + def A_lysis(vol: float, source: Well) -> None: + """A Lysis.""" + ctx.comment("-----Mixing then transferring AL buffer-----") + num_transfers = math.ceil(vol / 980) + tiptrack(tips) + for i in range(num_cols): + if num_cols >= 5: + if i == 0: + height = 10 + else: + height = 1 + else: + height = 1 + src = source + tvol = vol / num_transfers + for t in range(num_transfers): + if i == 0 and t == 0: + for _ in range(3): + m1000.require_liquid_presence(src) + m1000.aspirate(tvol, src.bottom(1)) + m1000.dispense(tvol, src.bottom(4)) + m1000.require_liquid_presence(src) + m1000.aspirate(tvol, src.bottom(height)) + m1000.air_gap(10) + m1000.dispense(m1000.current_volume, samples_m[i].top()) + m1000.air_gap(20) + + for i in range(num_cols): + if i != 0: + tiptrack(tips) + mixing( + samples_m[i], m1000, tvol - 40, reps=10 if not dry_run else 1 + ) # vol is 250 AL + 180 sample + m1000.air_gap(20) + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() + + ctx.comment("-----Mixing then Heating AL and Sample-----") + + helpers.set_hs_speed(ctx, h_s, heater_shaker_speed, A_lysis_time_1, False) + if not dry_run: + h_s.set_and_wait_for_temperature(55) + ctx.delay( + minutes=A_lysis_time_2, + msg="Incubating at 55C " + + str(heater_shaker_speed) + + " rpm for 10 minutes.", + ) + h_s.deactivate_shaker() + + def bind(vol: float) -> None: + """Bind. + + `bind` will perform magnetic bead binding on each sample in the + deepwell plate. Each channel of binding beads will be mixed before + transfer, and the samples will be mixed with the binding beads after + the transfer. The magnetic deck activates after the addition to all + samples, and the supernatant is removed after bead bining. + :param vol (float): The amount of volume to aspirate from the elution + buffer source and dispense to each well containing + beads. + :param park (boolean): Whether to save sample-corresponding tips + between adding elution buffer and transferring + supernatant to the final clean elutions PCR + plate. + """ + ctx.comment("-----Beginning Bind Steps-----") + tiptrack(tips) + for i, well in enumerate(samples_m): + num_trans = math.ceil(vol / 980) + vol_per_trans = vol / num_trans + source = binding_buffer[i // 3] + if i == 0: + reps = 6 if not dry_run else 1 + else: + reps = 1 + ctx.comment("-----Mixing Beads in Reservoir-----") + bead_mixing(source, m1000, vol_per_trans, reps=reps if not dry_run else 1) + # Transfer beads and binding from source to H-S plate + for t in range(num_trans): + if m1000.current_volume > 0: + # void air gap if necessary + m1000.dispense(m1000.current_volume, source.top()) + m1000.transfer( + vol_per_trans, source, well.top(), air_gap=20, new_tip="never" + ) + if t < num_trans - 1: + m1000.air_gap(20) + + ctx.comment("-----Mixing Beads in Plate-----") + for i in range(num_cols): + if i != 0: + tiptrack(tips) + mixing( + samples_m[i], m1000, vol + starting_vol, reps=10 if not dry_run else 1 + ) + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() + + ctx.comment("-----Incubating Beads and Bind on H-S-----") + + speed_val = heater_shaker_speed * 0.9 + helpers.set_hs_speed(ctx, h_s, speed_val, bind_time, True) + + # Transfer from H-S plate to Magdeck plate + helpers.move_labware_from_hs_to_mag_block( + ctx, sample_plate, h_s, magnetic_block + ) + for bindi in np.arange( + settling_time + 1, 0, -0.5 + ): # Settling time delay with countdown timer + ctx.delay( + minutes=0.5, + msg="There are " + str(bindi) + " minutes left in the incubation.", + ) + + # remove initial supernatant + remove_supernatant(vol + starting_vol) + + def wash(vol: float, source: List[Well]) -> None: + """Wash function.""" + global whichwash # Defines which wash the protocol is on to log on the app + + if source == wash1: + whichwash = 1 + if source == wash2: + whichwash = 2 + if source == wash3: + whichwash = 3 + + ctx.comment("-----Beginning Wash #" + str(whichwash) + "-----") + + num_trans = math.ceil(vol / 980) + vol_per_trans = vol / num_trans + tiptrack(tips) + for i, m in enumerate(samples_m): + src = source[i // 2] + for n in range(num_trans): + if m1000.current_volume > 0: + m1000.dispense(m1000.current_volume, src.top()) + m1000.transfer(vol_per_trans, src, m.top(), air_gap=20, new_tip="never") + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() + + helpers.set_hs_speed(ctx, h_s, heater_shaker_speed, elute_wash_time, True) + + helpers.move_labware_from_hs_to_mag_block( + ctx, sample_plate, h_s, magnetic_block + ) + + for washi in np.arange( + settling_time, 0, -0.5 + ): # settling time timer for washes + ctx.delay( + minutes=0.5, + msg="There are " + + str(washi) + + " minutes left in wash " + + str(whichwash) + + " incubation.", + ) + + remove_supernatant(vol) + + def elute(vol: float) -> None: + """Elution Function.""" + ctx.comment("-----Beginning Elution Steps-----") + tiptrack(tips) + for i, (m, e) in enumerate(zip(samples_m, elution_samples_m)): + m1000.flow_rate.aspirate = 25 + m1000.aspirate(vol, e.bottom(dot_bottom)) + m1000.air_gap(20) + m1000.dispense(m1000.current_volume, m.top()) + m1000.flow_rate.aspirate = 150 + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() + + h_s.set_and_wait_for_shake_speed(heater_shaker_speed * 1.1) + speed_val = heater_shaker_speed * 1.1 + helpers.set_hs_speed(ctx, h_s, speed_val, elute_wash_time, True) + + # Transfer back to magnet + helpers.move_labware_from_hs_to_mag_block( + ctx, sample_plate, h_s, magnetic_block + ) + + for elutei in np.arange(settling_time, 0, -0.5): + ctx.delay( + minutes=0.5, + msg="Incubating on MagDeck for " + str(elutei) + " more minutes.", + ) + + for i, (m, e) in enumerate(zip(samples_m, elution_samples_m)): + tiptrack(tips) + m1000.flow_rate.dispense = 100 + m1000.flow_rate.aspirate = 150 + m1000.transfer( + vol, m.bottom(dot_bottom), e.bottom(5), air_gap=20, new_tip="never" + ) + m1000.blow_out(e.top(-2)) + m1000.air_gap(20) + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() + + """ + Here is where you can call the methods defined above to fit your specific + protocol. The normal sequence is: + """ + A_lysis(AL_total_vol, AL) + bind(binding_buffer_vol) + wash(wash1_vol, wash1) + wash(wash2_vol, wash2) + wash(wash3_vol, wash3) + if not dry_run: + drybeads = 10.0 # Number of minutes you want to dry for + else: + drybeads = 0.5 + for beaddry in np.arange(drybeads, 0, -0.5): + ctx.delay( + minutes=0.5, + msg="There are " + str(beaddry) + " minutes left in the drying step.", + ) + elute(elution_vol) diff --git a/abr-testing/abr_testing/protocols/api 2.20/9_Magmax_RNA_Cells_Flex_csv.py b/abr-testing/abr_testing/protocols/api 2.20/9_Magmax_RNA_Cells_Flex_csv.py new file mode 100644 index 00000000000..f4c94305bea --- /dev/null +++ b/abr-testing/abr_testing/protocols/api 2.20/9_Magmax_RNA_Cells_Flex_csv.py @@ -0,0 +1,587 @@ +"""Thermo MagMax RNA Extraction: Cells Multi-Channel.""" +import math +from opentrons import types +from opentrons.protocol_api import ( + ProtocolContext, + ParameterContext, + Well, + InstrumentContext, +) +from typing import List +from opentrons.protocol_api.module_contexts import ( + HeaterShakerContext, + MagneticBlockContext, + TemperatureModuleContext, +) + +import numpy as np +from abr_testing.protocols import helpers + +metadata = { + "author": "Zach Galluzzo ", + "protocolName": "Thermo MagMax RNA Extraction: Cells Multi-Channel", +} + +requirements = { + "robotType": "OT-3", + "apiLevel": "2.20", +} +""" +Slot A1: Tips 200 +Slot A2: Tips 200 +Slot A3: Temperature module (gen2) with 96 well PCR block and Armadillo 96 well PCR Plate +** Plate gets 55 ul per well in each well of the entire plate +Slot B1: Tips 200 +Slot B2: Tips 200 +Slot B3: Nest 1 Well Reservoir +Slot C1: Magblock +Slot C2: +Slot C3: +Slot D1: H-S with Nest 96 Well Deepwell and DW Adapter +Slot D2: Nest 12 well 15 ml Reservoir +Slot D3: Trash + +Reservoir 1: +Well 1 - 8120 ul +Well 2 - 6400 ul +Well 3-7 - 8550 ul +""" + +whichwash = 1 +sample_max = 48 +tip = 0 +drop_count = 0 +waste_vol = 0 + + +# Start protocol +def add_parameters(parameters: ParameterContext) -> None: + """Parameters.""" + parameters.add_csv_file( + variable_name="parameters_csv", + display_name="Parameters CSV File", + description="CSV file containing parameters for this protocol", + ) + + +def run(ctx: ProtocolContext) -> None: + """Protocol.""" + dry_run = False + inc_lysis = True + mount = "left" + res_type = "nest_12_reservoir_15ml" + TIP_TRASH = False + num_samples = 48 + wash_vol = 150 + lysis_vol = 140 + stop_vol = 100 + elution_vol = dnase_vol = 50 + # default="\protocols\csv_parameters\9_parameters.csv", + csv_params = ctx.params.parameters_csv.parse_as_csv() # type: ignore[attr-defined] + heater_shaker_speed = int(csv_params[1][0]) + dot_bottom = csv_params[1][1] + + # Protocol Parameters + deepwell_type = "nest_96_wellplate_2ml_deep" + if not dry_run: + settling_time = 2.0 + lysis_time = 1.0 + drybeads = 2.0 # Number of minutes you want to dry for + bind_time = wash_time = 5.0 + dnase_time = 10.0 + stop_time = elute_time = 3.0 + else: + settling_time = 0.25 + lysis_time = 0.25 + drybeads = elute_time = 0.25 + bind_time = wash_time = dnase_time = stop_time = 0.25 + bead_vol = 20 + ctx.load_trash_bin("A3") + h_s: HeaterShakerContext = ctx.load_module(helpers.hs_str, "D1") # type: ignore[assignment] + h_s_adapter = h_s.load_adapter("opentrons_96_deep_well_adapter") + sample_plate = h_s_adapter.load_labware(deepwell_type, "Sample Plate") + h_s.close_labware_latch() + temp: TemperatureModuleContext = ctx.load_module( + helpers.temp_str, "D3" + ) # type: ignore[assignment] + temp_block = temp.load_adapter("opentrons_96_well_aluminum_block") + elutionplate = temp_block.load_labware( + "armadilo_96_wellplate_200ul_pcr_full_skirt", "Elution Plate" + ) + temp.set_temperature(4) + magblock: MagneticBlockContext = ctx.load_module( + helpers.mag_str, "C1" + ) # type: ignore[assignment] + waste = ( + ctx.load_labware("nest_1_reservoir_195ml", "B3", "Liquid Waste") + .wells()[0] + .top() + ) + res1 = ctx.load_labware(res_type, "D2", "reagent reservoir 1") + num_cols = math.ceil(num_samples / 8) + + # Load tips and combine all similar boxes + tips200 = ctx.load_labware("opentrons_flex_96_tiprack_200ul", "A1", "Tips 1") + tips201 = ctx.load_labware("opentrons_flex_96_tiprack_200ul", "A2", "Tips 2") + tips202 = ctx.load_labware("opentrons_flex_96_tiprack_200ul", "B1", "Tips 3") + tips203 = ctx.load_labware("opentrons_flex_96_tiprack_200ul", "B2", "Tips 4") + tips = [ + *tips200.wells()[num_samples:96], + *tips201.wells(), + *tips202.wells(), + *tips203.wells(), + ] + tips_sn = tips200.wells()[:num_samples] + + # load P1000M pipette + m1000 = ctx.load_instrument("flex_8channel_1000", mount) + + # Load Liquid Locations in Reservoir + elution_solution = elutionplate.rows()[0][:num_cols] + dnase1 = elutionplate.rows()[0][num_cols : 2 * num_cols] + lysis_ = res1.wells()[0] + stopreaction = res1.wells()[1] + wash1 = res1.wells()[2] + wash2 = res1.wells()[3] + wash3 = res1.wells()[4] + wash4 = res1.wells()[5] + wash5 = res1.wells()[6] + + """ + Here is where you can define the locations of your reagents. + """ + samples_m = sample_plate.rows()[0][:num_cols] # 20ul beads each well + cells_m = sample_plate.rows()[0][num_cols : 2 * num_cols] + elution_samples_m = elutionplate.rows()[0][:num_cols] + # Do the same for color mapping + beads_ = sample_plate.wells()[: (8 * num_cols)] + cells_ = sample_plate.wells()[(8 * num_cols) : (16 * num_cols)] + elution_samps = elutionplate.wells()[: (8 * num_cols)] + dnase1_ = elutionplate.wells()[(8 * num_cols) : (16 * num_cols)] + + colors = helpers.liquid_colors + + locations = [lysis_, wash1, wash2, wash3, wash4, wash5, stopreaction] + vols = [lysis_vol, wash_vol, wash_vol, wash_vol, wash_vol, wash_vol, stop_vol] + liquids = ["Lysis", "Wash 1", "Wash 2", "Wash 3", "Wash 4", "Wash 5", "Stop"] + + dnase_liq = ctx.define_liquid( + name="DNAse", description="DNAse", display_color="#C0C0C0" + ) + eluate = ctx.define_liquid( + name="Elution Buffer", description="Elution Buffer", display_color="#00FF00" + ) + bead = ctx.define_liquid(name="Beads", description="Beads", display_color="#FFA500") + sample = ctx.define_liquid( + name="Sample", description="Cell Pellet", display_color="#FFC0CB" + ) + + # Add liquids to non-reservoir labware + for i in beads_: + i.load_liquid(liquid=bead, volume=bead_vol) + for i in cells_: + i.load_liquid(liquid=sample, volume=0) + for i in dnase1_: + i.load_liquid(liquid=dnase_liq, volume=dnase_vol) + for i in elution_samps: + i.load_liquid(liquid=eluate, volume=elution_vol) + + delete = len(colors) - len(liquids) + + if delete >= 1: + for color_del in range(delete): + colors.pop(-1) + + def liquids_(liq: str, location: Well, color: str, vol: float) -> None: + """Define Liquids.""" + sampnum = 8 * (math.ceil(num_samples / 8)) + # Volume Calculation + extra_samples = math.ceil(1500 / vol) + + v = vol * (sampnum + extra_samples) + loaded_liq = ctx.define_liquid(name=liq, description=liq, display_color=color) + location.load_liquid(liquid=loaded_liq, volume=v) + + for x, (ll, l, c, v) in enumerate(zip(liquids, locations, colors, vols)): + liquids_(ll, l, c, v) + + m1000.flow_rate.aspirate = 50 + m1000.flow_rate.dispense = 150 + m1000.flow_rate.blow_out = 300 + + def tiptrack(pip: InstrumentContext, tipbox: List[Well]) -> None: + """Tip Track.""" + global tip + global drop_count + pip.pick_up_tip(tipbox[int(tip)]) + tip = tip + 8 + drop_count = drop_count + 8 + if drop_count >= 250: + drop_count = 0 + if TIP_TRASH: + ctx.pause("Empty Trash bin.") + + def remove_supernatant(vol: float) -> None: + """Remove Supernatant.""" + ctx.comment("-----Removing Supernatant-----") + m1000.flow_rate.aspirate = 30 + num_trans = math.ceil(vol / 180) + vol_per_trans = vol / num_trans + + for i, m in enumerate(samples_m): + m1000.pick_up_tip(tips_sn[8 * i]) + loc = m.bottom(dot_bottom) + for _ in range(num_trans): + if m1000.current_volume > 0: + # void air gap if necessary + m1000.dispense(m1000.current_volume, m.top()) + m1000.move_to(m.center()) + m1000.transfer(vol_per_trans, loc, waste, new_tip="never", air_gap=20) + m1000.blow_out(waste) + m1000.air_gap(20) + m1000.drop_tip(tips_sn[8 * i]) if TIP_TRASH else m1000.return_tip() + m1000.flow_rate.aspirate = 300 + # Move Plate From Magnet to H-S + helpers.move_labware_to_hs(ctx, sample_plate, h_s, h_s_adapter) + + def bead_mixing( + well: Well, pip: InstrumentContext, mvol: float, reps: int = 8 + ) -> None: + """Bead Mixing. + + 'mixing' will mix liquid that contains beads. This will be done by + aspirating from the bottom of the well and dispensing from the top as to + mix the beads with the other liquids as much as possible. Aspiration and + dispensing will also be reversed for a short to to ensure maximal mixing. + param well: The current well that the mixing will occur in. + param pip: The pipet that is currently attached/ being used. + param mvol: The volume that is transferred before the mixing steps. + param reps: The number of mix repetitions that should occur. Note~ + During each mix rep, there are 2 cycles of aspirating from bottom, + dispensing at the top and 2 cycles of aspirating from middle, + dispensing at the bottom + """ + center = well.top().move(types.Point(x=0, y=0, z=5)) + aspbot = well.bottom().move(types.Point(x=0, y=0, z=1)) + asptop = well.bottom().move(types.Point(x=2, y=-2, z=1)) + disbot = well.bottom().move(types.Point(x=-2, y=1.5, z=2)) + distop = well.bottom().move(types.Point(x=0, y=0, z=6)) + + if mvol > 1000: + mvol = 1000 + + vol = mvol * 0.9 + + pip.flow_rate.aspirate = 500 + pip.flow_rate.dispense = 500 + + pip.move_to(center) + for _ in range(reps): + pip.aspirate(vol, aspbot) + pip.dispense(vol, distop) + pip.aspirate(vol, asptop) + pip.dispense(vol, disbot) + if _ == reps - 1: + pip.flow_rate.aspirate = 100 + pip.flow_rate.dispense = 75 + pip.aspirate(vol, aspbot) + pip.dispense(vol, aspbot) + + pip.flow_rate.aspirate = 300 + pip.flow_rate.dispense = 300 + + def mixing(well: Well, pip: InstrumentContext, mvol: float, reps: int = 8) -> None: + """Mixing. + + 'mixing' will mix liquid that contains beads. This will be done by + aspirating from the bottom of the well and dispensing from the top as to + mix the beads with the other liquids as much as possible. Aspiration and + dispensing will also be reversed for a short to to ensure maximal mixing. + param well: The current well that the mixing will occur in. + param pip: The pipet that is currently attached/ being used. + param mvol: The volume that is transferred before the mixing steps. + param reps: The number of mix repetitions that should occur. Note~ + During each mix rep, there are 2 cycles of aspirating from bottom, + dispensing at the top and 2 cycles of aspirating from middle, + dispensing at the bottom + """ + center = well.top(5) + asp = well.bottom(dot_bottom) + disp = well.top(-8) + + if mvol > 1000: + mvol = 1000 + + vol = mvol * 0.9 + + pip.flow_rate.aspirate = 500 + pip.flow_rate.dispense = 500 + + pip.move_to(center) + for _ in range(reps): + pip.aspirate(vol, asp) + pip.dispense(vol, disp) + pip.aspirate(vol, asp) + pip.dispense(vol, disp) + if _ == reps - 1: + pip.flow_rate.aspirate = 100 + pip.flow_rate.dispense = 75 + pip.aspirate(vol, asp) + pip.dispense(vol, asp) + + pip.flow_rate.aspirate = 300 + pip.flow_rate.dispense = 300 + + def lysis(vol: float, source: Well) -> None: + """Lysis Steps.""" + ctx.comment("-----Beginning lysis steps-----") + num_transfers = math.ceil(vol / 180) + tiptrack(m1000, tips) + for i in range(num_cols): + src = source + tvol = vol / num_transfers + for t in range(num_transfers): + m1000.require_liquid_presence(src) + m1000.aspirate(tvol, src.bottom(1)) + m1000.dispense(m1000.current_volume, cells_m[i].top(-3)) + + # mix after adding all reagent to wells with cells + for i in range(num_cols): + if i != 0: + tiptrack(m1000, tips) + for x in range(8 if not dry_run else 1): + m1000.aspirate(tvol * 0.75, cells_m[i].bottom(dot_bottom)) + m1000.dispense(tvol * 0.75, cells_m[i].bottom(8)) + if x == 3: + ctx.delay(minutes=0.0167) + m1000.blow_out(cells_m[i].bottom(1)) + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() + + helpers.set_hs_speed(ctx, h_s, heater_shaker_speed, lysis_time, True) + + def bind() -> None: + """Bind. + + `bind` will perform magnetic bead binding on each sample in the + deepwell plate. Each channel of binding beads will be mixed before + transfer, and the samples will be mixed with the binding beads after + the transfer. The magnetic deck activates after the addition to all + samples, and the supernatant is removed after bead binding. + :param vol (float): The amount of volume to aspirate from the elution + buffer source and dispense to each well containing + beads. + :param park (boolean): Whether to save sample-corresponding tips + between adding elution buffer and transferring + supernatant to the final clean elutions PCR + plate. + """ + ctx.comment("-----Beginning bind steps-----") + for i, well in enumerate(samples_m): + # Transfer cells+lysis/bind to wells with beads + tiptrack(m1000, tips) + m1000.aspirate(185, cells_m[i].bottom(dot_bottom)) + m1000.air_gap(10) + m1000.dispense(m1000.current_volume, well.bottom(8)) + # Mix after transfer + bead_mixing(well, m1000, 130, reps=5 if not dry_run else 1) + m1000.air_gap(10) + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() + helpers.set_hs_speed(ctx, h_s, heater_shaker_speed, bind_time, True) + + # Transfer from H-S plate to Magdeck plate + helpers.move_labware_from_hs_to_mag_block(ctx, sample_plate, h_s, magblock) + + for bindi in np.arange( + settling_time, 0, -0.5 + ): # Settling time delay with countdown timer + ctx.delay( + minutes=0.5, + msg="There are " + str(bindi) + " minutes left in the incubation.", + ) + + # remove initial supernatant + remove_supernatant(180) + + def wash(vol: float, source: Well) -> None: + """Wash Function.""" + global whichwash # Defines which wash the protocol is on to log on the app + + if source == wash1: + whichwash = 1 + if source == wash2: + whichwash = 2 + if source == wash3: + whichwash = 3 + if source == wash4: + whichwash = 4 + + ctx.comment("-----Now starting Wash #" + str(whichwash) + "-----") + + tiptrack(m1000, tips) + num_trans = math.ceil(vol / 180) + vol_per_trans = vol / num_trans + for i, m in enumerate(samples_m): + src = source + m1000.require_liquid_presence(src) + for n in range(num_trans): + m1000.aspirate(vol_per_trans, src) + m1000.air_gap(10) + m1000.dispense(m1000.current_volume, m.top(-2)) + ctx.delay(seconds=2) + m1000.blow_out(m.top(-2)) + m1000.air_gap(10) + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() + + # Shake for 5 minutes to mix wash with beads + helpers.set_hs_speed(ctx, h_s, heater_shaker_speed, wash_time, True) + + # Transfer from H-S plate to Magdeck plate + helpers.move_labware_from_hs_to_mag_block(ctx, sample_plate, h_s, magblock) + + for washi in np.arange( + settling_time, 0, -0.5 + ): # settling time timer for washes + ctx.delay( + minutes=0.5, + msg="There are " + + str(washi) + + " minutes left in wash " + + str(whichwash) + + " incubation.", + ) + + remove_supernatant(vol) + + def dnase(vol: float, source: List[Well]) -> None: + """Steps for DNAseI.""" + ctx.comment("-----DNAseI Steps Beginning-----") + num_trans = math.ceil(vol / 180) + vol_per_trans = vol / num_trans + tiptrack(m1000, tips) + for i, m in enumerate(samples_m): + src = source[i] + m1000.flow_rate.aspirate = 10 + for n in range(num_trans): + if m1000.current_volume > 0: + m1000.dispense(m1000.current_volume, src.top()) + m1000.aspirate(vol_per_trans, src.bottom(dot_bottom)) + m1000.dispense(vol_per_trans, m.top(-3)) + m1000.blow_out(m.top(-3)) + m1000.air_gap(20) + + m1000.flow_rate.aspirate = 300 + + # Is this mixing needed? \/\/\/ + for i in range(num_cols): + if i != 0: + tiptrack(m1000, tips) + mixing(samples_m[i], m1000, 45, reps=5 if not dry_run else 1) + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() + + # Shake for 10 minutes to mix DNAseI + helpers.set_hs_speed(ctx, h_s, heater_shaker_speed, dnase_time, True) + + def stop_reaction(vol: float, source: Well) -> None: + """Adding stop solution.""" + ctx.comment("-----Adding Stop Solution-----") + tiptrack(m1000, tips) + num_trans = math.ceil(vol / 180) + vol_per_trans = vol / num_trans + for i, m in enumerate(samples_m): + src = source + for n in range(num_trans): + if m1000.current_volume > 0: + m1000.dispense(m1000.current_volume, src.top()) + m1000.transfer(vol_per_trans, src, m.top(), air_gap=20, new_tip="never") + m1000.blow_out(m.top(-3)) + m1000.air_gap(20) + + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() + + # Shake for 3 minutes to mix wash with beads + helpers.set_hs_speed(ctx, h_s, heater_shaker_speed, stop_time, True) + + # Transfer from H-S plate to Magdeck plate + helpers.move_labware_from_hs_to_mag_block(ctx, sample_plate, h_s, magblock) + + for stop in np.arange(settling_time, 0, -0.5): + ctx.delay( + minutes=0.5, + msg="There are " + str(stop) + " minutes left in this incubation.", + ) + + remove_supernatant(vol + 50) + + def elute(vol: float) -> None: + """Elution.""" + ctx.comment("-----Elution Beginning-----") + tiptrack(m1000, tips) + m1000.flow_rate.aspirate = 10 + for i, m in enumerate(samples_m): + loc = m.top(-2) + m1000.aspirate(vol, elution_solution[i]) + m1000.air_gap(10) + m1000.dispense(m1000.current_volume, loc) + m1000.blow_out(m.top(-3)) + m1000.air_gap(10) + + m1000.flow_rate.aspirate = 300 + + # Is this mixing needed? \/\/\/ + for i in range(num_cols): + if i != 0: + tiptrack(m1000, tips) + for mixes in range(10): + m1000.aspirate(elution_vol - 10, samples_m[i]) + m1000.dispense(elution_vol - 10, samples_m[i].bottom(10)) + if mixes == 9: + m1000.flow_rate.dispense = 20 + m1000.aspirate(elution_vol - 10, samples_m[i]) + m1000.dispense(elution_vol - 10, samples_m[i].bottom(10)) + m1000.flow_rate.dispense = 300 + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() + + # Shake for 3 minutes to mix wash with beads + helpers.set_hs_speed(ctx, h_s, heater_shaker_speed, elute_time, True) + + # Transfer from H-S plate to Magdeck plate + helpers.move_labware_from_hs_to_mag_block(ctx, sample_plate, h_s, magblock) + + for elutei in np.arange(settling_time, 0, -0.5): + ctx.delay( + minutes=0.5, + msg="Incubating on MagDeck for " + str(elutei) + " more minutes.", + ) + + ctx.comment("-----Trasnferring Sample to Elution Plate-----") + for i, (m, e) in enumerate(zip(samples_m, elution_samples_m)): + tiptrack(m1000, tips) + loc = m.bottom(dot_bottom) + m1000.transfer(vol, loc, e.bottom(5), air_gap=20, new_tip="never") + m1000.blow_out(e.top(-2)) + m1000.air_gap(20) + m1000.drop_tip() if TIP_TRASH else m1000.return_tip() + + """ + Here is where you can call the methods defined above to fit your specific + protocol. The normal sequence is: + """ + if inc_lysis: + lysis(lysis_vol, lysis_) + bind() + wash(wash_vol, wash1) + wash(wash_vol, wash2) + # dnase1 treatment + dnase(dnase_vol, dnase1) + stop_reaction(stop_vol, stopreaction) + # Resume washes + wash(wash_vol, wash3) + wash(wash_vol, wash4) + wash(wash_vol, wash5) + + for beaddry in np.arange(drybeads, 0, -0.5): + ctx.delay( + minutes=0.5, + msg="There are " + str(beaddry) + " minutes left in the drying step.", + ) + elute(elution_vol) diff --git a/abr-testing/abr_testing/protocols/csv_parameters/9_parameters.csv b/abr-testing/abr_testing/protocols/csv_parameters/9_parameters.csv new file mode 100644 index 00000000000..a5f947d97d5 --- /dev/null +++ b/abr-testing/abr_testing/protocols/csv_parameters/9_parameters.csv @@ -0,0 +1,2 @@ +heater_shaker_speed, dot_bottom +2000, 0.1 \ No newline at end of file diff --git a/abr-testing/abr_testing/protocols/helpers.py b/abr-testing/abr_testing/protocols/helpers.py index d7392a8c67e..5032cda33ac 100644 --- a/abr-testing/abr_testing/protocols/helpers.py +++ b/abr-testing/abr_testing/protocols/helpers.py @@ -1,7 +1,19 @@ """Helper functions commonly used in protocols.""" -from opentrons.protocol_api import ProtocolContext, Labware, InstrumentContext +from opentrons.protocol_api import ( + ProtocolContext, + Labware, + InstrumentContext, + ParameterContext, +) from typing import Tuple +from opentrons.protocol_api.module_contexts import ( + HeaterShakerContext, + MagneticBlockContext, + ThermocyclerContext, +) + +from typing import List def load_common_liquid_setup_labware_and_instruments( @@ -17,3 +29,182 @@ def load_common_liquid_setup_labware_and_instruments( # Source_reservoir source_reservoir = protocol.load_labware("axygen_1_reservoir_90ml", "C2") return source_reservoir, p1000 + + +def create_pipette_mount_parameter(parameters: ParameterContext) -> None: + """Create parameter to specify pipette mount.""" + parameters.add_str( + variable_name="pipette_mount", + display_name="Pipette Mount", + choices=[ + {"display_name": "Left", "value": "left"}, + {"display_name": "Right", "value": "right"}, + ], + default="left", + ) + + +def create_disposable_lid_parameter(parameters: ParameterContext) -> None: + """Create parameter to use/not use disposable lid.""" + parameters.add_bool( + variable_name="disposable_lid", + display_name="Disposable Lid", + description="True means use lid.", + default=True, + ) + + +def create_tip_size_parameter(parameters: ParameterContext) -> None: + """Create parameter for tip size.""" + parameters.add_str( + variable_name="tip_size", + display_name="Tip Size", + description="Set Tip Size", + choices=[ + {"display_name": "50 uL", "value": "opentrons_flex_96_tiprack_50ul"}, + {"display_name": "200 µL", "value": "opentrons_flex_96_tiprack_200ul"}, + {"display_name": "1000 µL", "value": "opentrons_flex_96_tiprack_1000ul"}, + ], + default="opentrons_flex_96_tiprack_1000ul", + ) + + +def create_dot_bottom_parameter(parameters: ParameterContext) -> None: + """Create parameter for dot bottom value.""" + parameters.add_float( + variable_name="dot_bottom", + display_name=".bottom", + description="Lowest value pipette will go to.", + default=0.5, + choices=[ + {"display_name": "0.0", "value": 0.0}, + {"display_name": "0.1", "value": 0.1}, + {"display_name": "0.2", "value": 0.2}, + {"display_name": "0.3", "value": 0.3}, + {"display_name": "0.4", "value": 0.4}, + {"display_name": "0.5", "value": 0.5}, + {"display_name": "0.6", "value": 0.6}, + {"display_name": "0.7", "value": 0.7}, + {"display_name": "0.8", "value": 0.8}, + {"display_name": "0.9", "value": 0.9}, + {"display_name": "1.0", "value": 1.0}, + ], + ) + + +def create_hs_speed_parameter(parameters: ParameterContext) -> None: + """Create parameter for max heatershaker speed.""" + parameters.add_int( + variable_name="heater_shaker_speed", + display_name="Heater Shaker Shake Speed", + description="Speed to set the heater shaker to", + default=2000, + minimum=200, + maximum=3000, + unit="rpm", + ) + + +def move_labware_from_hs_to_mag_block( + protocol: ProtocolContext, + labware_to_move: Labware, + hs: HeaterShakerContext, + mag_block: MagneticBlockContext, +) -> None: + """Move labware from heatershaker to magnetic block.""" + hs.open_labware_latch() + protocol.move_labware(labware_to_move, mag_block, use_gripper=True) + hs.close_labware_latch() + + +def move_labware_to_hs( + protocol: ProtocolContext, + labware_to_move: Labware, + hs: HeaterShakerContext, + hs_adapter: Labware, +) -> None: + """Move labware to heatershaker.""" + hs.open_labware_latch() + protocol.move_labware(labware_to_move, hs_adapter, use_gripper=True) + hs.close_labware_latch() + + +def set_hs_speed( + protocol: ProtocolContext, + hs: HeaterShakerContext, + hs_speed: int, + time_min: float, + deactivate: bool, +) -> None: + """Set heatershaker for a speed and duration.""" + hs.set_and_wait_for_shake_speed(hs_speed) + protocol.delay( + minutes=time_min, + msg=f"Shake at {hs_speed} rpm for {time_min} minutes.", + ) + if deactivate: + hs.deactivate_shaker() + + +def load_disposable_lids( + protocol: ProtocolContext, num_of_lids: int, deck_slot: str +) -> List[Labware]: + """Load Stack of Disposable lids.""" + unused_lids = [ + protocol.load_labware("opentrons_tough_pcr_auto_sealing_lid", deck_slot) + ] + for i in range(num_of_lids - 1): + unused_lids.append( + unused_lids[-1].load_labware("opentrons_tough_pcr_auto_sealing_lid") + ) + unused_lids.reverse() + return unused_lids + + +def use_disposable_lid_with_tc( + protocol: ProtocolContext, + unused_lids: List[Labware], + used_lids: List[Labware], + plate_in_thermocycler: Labware, + thermocycler: ThermocyclerContext, +) -> Tuple[Labware, List[Labware], List[Labware]]: + """Use disposable lid with thermocycler.""" + lid_on_plate = unused_lids[0] + protocol.move_labware(lid_on_plate, plate_in_thermocycler, use_gripper=True) + # Remove lid from the list + unused_lids.pop(0) + used_lids.append(lid_on_plate) + thermocycler.close_lid() + return lid_on_plate, unused_lids, used_lids + + +# CONSTANTS + +liquid_colors = [ + "#008000", + "#008000", + "#A52A2A", + "#A52A2A", + "#00FFFF", + "#0000FF", + "#800080", + "#ADD8E6", + "#FF0000", + "#FFFF00", + "#FF00FF", + "#00008B", + "#7FFFD4", + "#FFC0CB", + "#FFA500", + "#00FF00", + "#C0C0C0", +] + +hs_adapter_str = "opentrons_96_pcr_adapter" +hs_str = "heaterShakerModuleV1" +mag_str = "magneticBlockV1" +temp_adapter_str = "opentrons_96_well_aluminum_block" +temp_str = "temperature module gen2" +tc_str = "thermocycler module gen2" + +# TODO: Create dictionary of labware, module, and adapter.