diff --git a/maize/graphs/exs/biosimspace/abfe.py b/maize/graphs/exs/biosimspace/abfe.py index 04566ab..979b7c3 100644 --- a/maize/graphs/exs/biosimspace/abfe.py +++ b/maize/graphs/exs/biosimspace/abfe.py @@ -30,14 +30,19 @@ from .system_preparation import SystemPreparationBound, SystemPreparationFree -__all__ = ["AbsoluteBindingFreeEnergy", "abfe_no_prep_workflow", "abfe_with_prep_workflow"] +__all__ = [ + "AbsoluteBindingFreeEnergySingle", + "AbsoluteBindingFreeEnergyMulti", + "abfe_no_prep_workflow", + "abfe_with_prep_workflow", +] -class AbsoluteBindingFreeEnergy(Graph): +class AbsoluteBindingFreeEnergySingle(Graph): """ A class for running a single absolute binding free energy calculation - using SOMD through BioSimSpace. This requires unparameterised - input structures, and performs setup and execution of the + using SOMD through BioSimSpace. This requires parameterised and equilibrated + input structures, and performs short production simulations followed by ABFE calculations. """ @@ -236,20 +241,45 @@ def build(self) -> None: self.out = self.map_port(collect_results.out) -######################## +class AbsoluteBindingFreeEnergyMulti(Graph): + """ + A class for running multiple repeat absolute binding free energy calculations + using SOMD through BioSimSpace. This requires unparameterised + input structures and performs short production simulations followed by + ABFE calculations. + """ -# flow = Workflow(name="balance") -# load = flow.add(LoadData, parameters={"data": ["a", "b", "c"]}) + n_repeats: Parameter[int] = Parameter(default=5) + """The number of repeat calculations to perform.""" -# # Decomposes our list into items and sends them separately -# scatter = flow.add(Scatter[str]) + def build(self) -> None: + # We need a node to accumulate the results + accumulate_results = self.add( + Accumulate[AFEResult], + name="AccumulateAFEResults", + parameters={"n_packets": self.n_repeats.value}, + ) + + # Add all the repeat ABFE nodes + abfe_subgraph = self.add( + parallel( + AbsoluteBindingFreeEnergySingle, + n_branches=self.n_repeats.value, + inputs=[], + constant_inputs=["inp_bound", "inp_free"], + outputs=["out"], + ) + ) + # Connect the outputs to the accumulator + self.connect(abfe_subgraph.out, accumulate_results.inp) -# # Apply our macro -# worker_subgraph = flow.add(parallel(Delay[str], n_branches=3)) + # Map the parameters from the abfe subgraph + self.map(*abfe_subgraph.parameters.values()) -# # Accumulate multiple items into one list -# accu = flow.add(Accumulate[str], parameters={"n_packets": 3}) -################################### + # Map all the inputs to graph inputs + self.inp_free = self.map_port(abfe_subgraph.inp_free, name="inp_free") + self.inp_bound = self.map_port(abfe_subgraph.inp_bound, name="inp_bound") + self.out = self.map_port(accumulate_results.out, name="out") def get_abfe_no_prep_workflow() -> Workflow: @@ -258,16 +288,16 @@ def get_abfe_no_prep_workflow() -> Workflow: parameterised and equilibrated input systems. """ - flow = Workflow(name="absolute_binding_free_energy_no_prep", cleanup_temp=False, level="debug") + flow = Workflow(name="absolute_binding_free_energy_no_prep") - # TODO: Figure out how to loop this - abfe_calc = flow.add(AbsoluteBindingFreeEnergy, name="AbsoluteBindingFreeEnergy") + abfe_calc = flow.add(AbsoluteBindingFreeEnergyMulti, name="AbsoluteBindingFreeEnergy") save_results = flow.add(SaveAFEResults, name="SaveAFEResults") # Connect the nodes/ subgraphs flow.connect(abfe_calc.out, save_results.inp) # Map the inputs/ parameters + flow.map(abfe_calc.n_repeats) flow.combine_parameters(abfe_calc.inp_bound, name="inp_bound") flow.combine_parameters(abfe_calc.inp_free, name="inp_free") flow.map(*abfe_calc.parameters.values()) @@ -278,7 +308,8 @@ def get_abfe_no_prep_workflow() -> Workflow: return flow -abfe_no_prep_exposed = expose(get_abfe_no_prep_workflow) +# abfe_no_prep_exposed = expose(get_abfe_no_prep_workflow) +abfe_no_prep_exposed = get_abfe_no_prep_workflow def get_abfe_with_prep_workflow() -> Workflow: @@ -286,15 +317,14 @@ def get_abfe_with_prep_workflow() -> Workflow: A workflow which takes prepared but unparameterised input structures and runs 1) system preparation, 2) ABFE calculations. """ - flow = Workflow(name="absolute_binding_free_energy", level="debug") + flow = Workflow(name="absolute_binding_free_energy") # Run system preparation for each leg sys_prep_free = flow.add(SystemPreparationFree, name="SystemPreparationFree") sys_prep_bound = flow.add(SystemPreparationBound, name="SystemPreparationBound") # Run repeats of ABFE calculations - # TODO: Figure out how to do this - abfe_calc = flow.add(AbsoluteBindingFreeEnergy, name="AbsoluteBindingFreeEnergy") + abfe_calc = flow.add(AbsoluteBindingFreeEnergyMulti, name="AbsoluteBindingFreeEnergy") # Save the ABFE results save_results = flow.add(SaveAFEResults, name="SaveAFEResults") diff --git a/maize/steps/exs/biosimspace/_base.py b/maize/steps/exs/biosimspace/_base.py index 12e51ba..694915b 100644 --- a/maize/steps/exs/biosimspace/_base.py +++ b/maize/steps/exs/biosimspace/_base.py @@ -151,11 +151,13 @@ def _save_output( ) for out in self.out: + # Get unique format to avoid files being overwritten + file_base = f"bss_system_{time.strftime('%Y-%m-%d_%H-%M-%S')}" out.send( [ Path(f) for f in BSS.IO.saveMolecules( - "bss_system", + file_base, system, fileformat=["prm7", "rst7"], # Throw away velocity information to avoid diff --git a/maize/steps/exs/biosimspace/afe.py b/maize/steps/exs/biosimspace/afe.py index 61c0f57..6dbe854 100644 --- a/maize/steps/exs/biosimspace/afe.py +++ b/maize/steps/exs/biosimspace/afe.py @@ -299,8 +299,8 @@ def __radd__(self, other: "AFEResult") -> "AFEResult": class SaveAFEResults(Node): """Save an AFE result object to a CSV. The PMF and overlap information is discarded.""" - inp: Input[AFEResult] = Input() - """An alchemical free energy result.""" + inp: Input[AFEResult | list[AFEResult]] = Input() + """An alchemical free energy result, or a list of results.""" file: FileParameter[Annotated[Path, Suffix("csv")]] = FileParameter( exist_required=False, default=Path("afe_results.csv") @@ -308,18 +308,21 @@ class SaveAFEResults(Node): """Output CSV location""" def run(self) -> None: - result = self.inp.receive() + results = self.inp.receive() + # Convert non-lists into lists + results = [results] if not isinstance(results, list) else results with open(self.file.filepath, "a") as out: writer = csv.writer(out, delimiter=",", quoting=csv.QUOTE_MINIMAL) - # Only write header if it's empty - if not self.file.filepath.exists() or self.file.filepath.stat().st_size == 0: - writer.writerow(["smiles", "repeat_no", "dg", "error"]) - # Get the repeat number by checking if there are already lines with the current smiles - # present - with open(self.file.filepath, "r") as f: - lines = f.readlines() - repeat_no = len([line for line in lines if result.smiles in line]) + 1 - writer.writerow([result.smiles, repeat_no, result.dg, result.error]) + for result in results: + # Only write header if it's empty + if not self.file.filepath.exists() or self.file.filepath.stat().st_size == 0: + writer.writerow(["smiles", "repeat_no", "dg", "error"]) + # Get the repeat number by checking if there are already lines with the current smiles + # present + with open(self.file.filepath, "r") as f: + lines = f.readlines() + repeat_no = len([line for line in lines if result.smiles in line]) + 1 + writer.writerow([result.smiles, repeat_no, result.dg, result.error]) class CollectAFEResults(Node):