Skip to content

Commit

Permalink
Add multi-replicate ABFE workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
fjclark committed Apr 24, 2024
1 parent 034a245 commit 39d520b
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 34 deletions.
72 changes: 51 additions & 21 deletions maize/graphs/exs/biosimspace/abfe.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,19 @@

from .system_preparation import SystemPreparationBound, SystemPreparationFree

__all__ = ["AbsoluteBindingFreeEnergy", "abfe_no_prep_workflow", "abfe_with_prep_workflow"]
__all__ = [
"AbsoluteBindingFreeEnergySingle",
"AbsoluteBindingFreeEnergyMulti",
"abfe_no_prep_workflow",
"abfe_with_prep_workflow",
]


class AbsoluteBindingFreeEnergy(Graph):
class AbsoluteBindingFreeEnergySingle(Graph):
"""
A class for running a single absolute binding free energy calculation
using SOMD through BioSimSpace. This requires unparameterised
input structures, and performs setup and execution of the
using SOMD through BioSimSpace. This requires parameterised and equilibrated
input structures, and performs short production simulations followed by
ABFE calculations.
"""

Expand Down Expand Up @@ -236,20 +241,45 @@ def build(self) -> None:
self.out = self.map_port(collect_results.out)


########################
class AbsoluteBindingFreeEnergyMulti(Graph):
"""
A class for running multiple repeat absolute binding free energy calculations
using SOMD through BioSimSpace. This requires unparameterised
input structures and performs short production simulations followed by
ABFE calculations.
"""

# flow = Workflow(name="balance")
# load = flow.add(LoadData, parameters={"data": ["a", "b", "c"]})
n_repeats: Parameter[int] = Parameter(default=5)
"""The number of repeat calculations to perform."""

# # Decomposes our list into items and sends them separately
# scatter = flow.add(Scatter[str])
def build(self) -> None:
# We need a node to accumulate the results
accumulate_results = self.add(
Accumulate[AFEResult],
name="AccumulateAFEResults",
parameters={"n_packets": self.n_repeats.value},
)

# Add all the repeat ABFE nodes
abfe_subgraph = self.add(
parallel(
AbsoluteBindingFreeEnergySingle,
n_branches=self.n_repeats.value,
inputs=[],
constant_inputs=["inp_bound", "inp_free"],
outputs=["out"],
)
)
# Connect the outputs to the accumulator
self.connect(abfe_subgraph.out, accumulate_results.inp)

# # Apply our macro
# worker_subgraph = flow.add(parallel(Delay[str], n_branches=3))
# Map the parameters from the abfe subgraph
self.map(*abfe_subgraph.parameters.values())

# # Accumulate multiple items into one list
# accu = flow.add(Accumulate[str], parameters={"n_packets": 3})
###################################
# Map all the inputs to graph inputs
self.inp_free = self.map_port(abfe_subgraph.inp_free, name="inp_free")
self.inp_bound = self.map_port(abfe_subgraph.inp_bound, name="inp_bound")
self.out = self.map_port(accumulate_results.out, name="out")


def get_abfe_no_prep_workflow() -> Workflow:
Expand All @@ -258,16 +288,16 @@ def get_abfe_no_prep_workflow() -> Workflow:
parameterised and equilibrated input systems.
"""

flow = Workflow(name="absolute_binding_free_energy_no_prep", cleanup_temp=False, level="debug")
flow = Workflow(name="absolute_binding_free_energy_no_prep")

# TODO: Figure out how to loop this
abfe_calc = flow.add(AbsoluteBindingFreeEnergy, name="AbsoluteBindingFreeEnergy")
abfe_calc = flow.add(AbsoluteBindingFreeEnergyMulti, name="AbsoluteBindingFreeEnergy")
save_results = flow.add(SaveAFEResults, name="SaveAFEResults")

# Connect the nodes/ subgraphs
flow.connect(abfe_calc.out, save_results.inp)

# Map the inputs/ parameters
flow.map(abfe_calc.n_repeats)
flow.combine_parameters(abfe_calc.inp_bound, name="inp_bound")
flow.combine_parameters(abfe_calc.inp_free, name="inp_free")
flow.map(*abfe_calc.parameters.values())
Expand All @@ -278,23 +308,23 @@ def get_abfe_no_prep_workflow() -> Workflow:
return flow


abfe_no_prep_exposed = expose(get_abfe_no_prep_workflow)
# abfe_no_prep_exposed = expose(get_abfe_no_prep_workflow)
abfe_no_prep_exposed = get_abfe_no_prep_workflow


def get_abfe_with_prep_workflow() -> Workflow:
"""
A workflow which takes prepared but unparameterised input structures and
runs 1) system preparation, 2) ABFE calculations.
"""
flow = Workflow(name="absolute_binding_free_energy", level="debug")
flow = Workflow(name="absolute_binding_free_energy")

# Run system preparation for each leg
sys_prep_free = flow.add(SystemPreparationFree, name="SystemPreparationFree")
sys_prep_bound = flow.add(SystemPreparationBound, name="SystemPreparationBound")

# Run repeats of ABFE calculations
# TODO: Figure out how to do this
abfe_calc = flow.add(AbsoluteBindingFreeEnergy, name="AbsoluteBindingFreeEnergy")
abfe_calc = flow.add(AbsoluteBindingFreeEnergyMulti, name="AbsoluteBindingFreeEnergy")

# Save the ABFE results
save_results = flow.add(SaveAFEResults, name="SaveAFEResults")
Expand Down
4 changes: 3 additions & 1 deletion maize/steps/exs/biosimspace/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,13 @@ def _save_output(
)

for out in self.out:
# Get unique format to avoid files being overwritten
file_base = f"bss_system_{time.strftime('%Y-%m-%d_%H-%M-%S')}"
out.send(
[
Path(f)
for f in BSS.IO.saveMolecules(
"bss_system",
file_base,
system,
fileformat=["prm7", "rst7"],
# Throw away velocity information to avoid
Expand Down
27 changes: 15 additions & 12 deletions maize/steps/exs/biosimspace/afe.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,27 +299,30 @@ def __radd__(self, other: "AFEResult") -> "AFEResult":
class SaveAFEResults(Node):
"""Save an AFE result object to a CSV. The PMF and overlap information is discarded."""

inp: Input[AFEResult] = Input()
"""An alchemical free energy result."""
inp: Input[AFEResult | list[AFEResult]] = Input()
"""An alchemical free energy result, or a list of results."""

file: FileParameter[Annotated[Path, Suffix("csv")]] = FileParameter(
exist_required=False, default=Path("afe_results.csv")
)
"""Output CSV location"""

def run(self) -> None:
result = self.inp.receive()
results = self.inp.receive()
# Convert non-lists into lists
results = [results] if not isinstance(results, list) else results
with open(self.file.filepath, "a") as out:
writer = csv.writer(out, delimiter=",", quoting=csv.QUOTE_MINIMAL)
# Only write header if it's empty
if not self.file.filepath.exists() or self.file.filepath.stat().st_size == 0:
writer.writerow(["smiles", "repeat_no", "dg", "error"])
# Get the repeat number by checking if there are already lines with the current smiles
# present
with open(self.file.filepath, "r") as f:
lines = f.readlines()
repeat_no = len([line for line in lines if result.smiles in line]) + 1
writer.writerow([result.smiles, repeat_no, result.dg, result.error])
for result in results:
# Only write header if it's empty
if not self.file.filepath.exists() or self.file.filepath.stat().st_size == 0:
writer.writerow(["smiles", "repeat_no", "dg", "error"])
# Get the repeat number by checking if there are already lines with the current smiles
# present
with open(self.file.filepath, "r") as f:
lines = f.readlines()
repeat_no = len([line for line in lines if result.smiles in line]) + 1
writer.writerow([result.smiles, repeat_no, result.dg, result.error])


class CollectAFEResults(Node):
Expand Down

0 comments on commit 39d520b

Please sign in to comment.