🍌 add default executor and isolate prefect #213

Merged
merged 6 commits on Aug 9, 2022
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -25,11 +25,13 @@ There is one key exception to the rules above -- and that is with `MAJOR`=0 rele
- add NPT and MatProj molecular dynamics workflows
- add SCAN workflows for static energy and relaxation
- test files can be provided within zip files, fixing excessive line counts on git commits
- add simmate worker that can run "out-of-box" and requires no setup

**Refactors**
- to simplify the creation of new workflows, `S3Task` is now `S3Workflow` and database tables are dynamically determined using the workflow name
- workflows of a given type (e.g. relaxation or static-energy) now share a database table in order to simplify overall database architecture
- migrate from `os.path` to `pathlib.Path` throughout package
- isolate prefect use to separate executors

**Fixes**
- None
37 changes: 22 additions & 15 deletions README.md
@@ -124,7 +124,7 @@ command: mpirun -n 8 vasp_std > vasp.out
from simmate.workflows.relaxation import Relaxation__Vasp__Matproj as workflow

state = workflow.run(structure="NaCl.cif")
result = workflow.result()
result = state.result()
```


@@ -180,21 +180,28 @@ structure.add_oxidation_state_by_guess()
```


4. _**Ease of Scalability.**_ At the beginning of a project, you may want to write and run code on a single computer and single core. But as you run into some intense calculations, you may want to use all of your CPU and GPU to run calculations. At the extreme, some projects require thousands of computers across numerous locations, including university clusters (using SLURM or PBS) and cloud computing (using Kubernetes and Docker). Simmate can meet all of these needs thanks to integration with [Dask](https://github.com/dask/dask) and [Prefect](https://github.com/PrefectHQ/prefect):
4. _**Ease of Scalability.**_ At the beginning of a project, you may want to write and run code on a single computer and single core. But as you run into more intense calculations, you may want to use all of your CPUs and GPUs to run calculations. At the extreme, some projects require thousands of computers across numerous locations, including university clusters (using SLURM or PBS) and cloud computing (using Kubernetes and Docker). Simmate can meet all of these needs thanks to integration with a custom `SimmateExecutor` (the default), [Dask](https://github.com/dask/dask), and/or [Prefect](https://github.com/PrefectHQ/prefect):
```python
# To run the tasks of a single workflow in parallel, use Dask.
from prefect.task_runners import DaskTaskRunner
workflow.task_runner = DaskTaskRunner()
state = workflow.run(...)

# To run many workflows in parallel, use Prefect.
# Once you configure Prefect, you simply switch
# from using "run" to "run_cloud"
prefect_flow_run_id = workflow.run_cloud(...)

# You can use different combinations of these two parallelization strategies as well!
# Using Prefect and Dask, we can scale out across various computer resources
# with a few lines of code.
# on your local computer, schedule your workflow run
state = workflow.run_cloud(...)

# Note, your workflow won't run locally.
# While you wait for it to run, you can check its status
state.is_done()
state.is_running()

# or cancel the job before it runs
state.cancel()

# or wait until the job completes and grab the result!
# The job won't run until you start a worker (see command below)
result = state.result()
```

``` bash
# In a separate terminal or even on a remote HPC cluster, you
# can start a worker that will start running any scheduled jobs
simmate workflow-engine start-worker
```
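The schedule-then-poll pattern above can be wrapped in a small helper. The sketch below is generic and illustrative: `DummyState` is a stand-in for the state object returned by `run_cloud`, and `wait_for_result` is a hypothetical helper, not part of the simmate API.

```python
import time

class DummyState:
    """Stand-in for a scheduled workflow state (illustrative only)."""

    def __init__(self, result, checks_until_done=2):
        self._result = result
        self._checks_remaining = checks_until_done

    def is_done(self):
        # pretend a worker finishes the job after a few status checks
        self._checks_remaining -= 1
        return self._checks_remaining <= 0

    def result(self):
        return self._result

def wait_for_result(state, poll_interval=0.01, timeout=5.0):
    """Poll a state until it finishes, then return its result."""
    deadline = time.monotonic() + timeout
    while not state.is_done():
        if time.monotonic() > deadline:
            raise TimeoutError("workflow did not finish before the timeout")
        time.sleep(poll_interval)
    return state.result()

final = wait_for_result(DummyState({"energy_per_atom": -3.2}))
```

With the real executor, the loop would simply keep polling until a worker picks up and completes the scheduled job.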


@@ -5,7 +5,7 @@
from pymatgen.analysis.structure_matcher import StructureMatcher

from simmate.toolkit import Structure
from simmate.workflow_engine import task, Workflow
from simmate.workflow_engine import Workflow
from simmate.database.third_parties import MatprojStructure
from simmate.calculators.vasp.workflows.static_energy.matproj import (
StaticEnergy__Vasp__Matproj,
@@ -52,7 +52,7 @@ def run_config(
directory=prebadelf_result["directory"],
).result()

save_badelf_results(badelf_result, prebadelf_result["prefect_flow_run_id"])
save_badelf_results(badelf_result, prebadelf_result["run_id"])


# -----------------------------------------------------------------------------
@@ -83,7 +83,6 @@ class PopulationAnalysis__Vasp__PrebadelfMatproj(StaticEnergy__Vasp__Matproj):
)


@task
def get_structure_w_empties(
structure,
empty_ion_template,
@@ -155,16 +154,15 @@ def get_structure_w_empties(


# THIS IS A COPY/PASTE FROM THE BADER WORKFLOW -- I need to condense these
@task
def save_badelf_results(bader_result, prefect_flow_run_id):
def save_badelf_results(bader_result, run_id):
# load the results. We are particularly after the first result, which
# is a pandas dataframe of oxidation states.
oxidation_data, extra_data = bader_result["result"]

# load the calculation entry for this workflow run. This should already
# exist thanks to the load_input_and_register task of the prebader workflow
calculation = PopulationAnalysis__Bader__Badelf.database_table.from_prefect_context(
prefect_flow_run_id,
calculation = PopulationAnalysis__Bader__Badelf.database_table.from_run_context(
run_id,
PopulationAnalysis__Vasp__PrebadelfMatproj.name_full,
)
# BUG: can't use context to grab the id because workflow tasks generate a
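The switch from `prefect_flow_run_id` to a generic `run_id` means the calculation row is fetched by an explicitly passed identifier rather than by Prefect context. Here is a minimal, generic sketch of that lookup pattern; the dict-backed "table" and these function bodies are illustrative stand-ins, not the real `from_run_context` implementation.

```python
# Stand-in "table" keyed by (run_id, workflow_name), mimicking how a
# calculation row can be fetched via an explicitly passed run id.
_calculations = {}

def register_calculation(run_id, workflow_name):
    """Create the record when a workflow run is first registered."""
    record = {"run_id": run_id, "workflow_name": workflow_name}
    _calculations[(run_id, workflow_name)] = record
    return record

def from_run_context(run_id, workflow_name):
    """Fetch the record registered for this workflow run."""
    try:
        return _calculations[(run_id, workflow_name)]
    except KeyError:
        raise LookupError(f"no calculation registered for run {run_id!r}")

register_calculation("run-123", "population-analysis.vasp.badelf")
calculation = from_run_context("run-123", "population-analysis.vasp.badelf")
```

Passing the id explicitly also sidesteps the bug noted in the diff, where tasks and the main workflow see different context-generated ids.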
@@ -50,14 +50,17 @@ def run_config(
return bader_result

@classmethod
def _save_to_database(cls, bader_result):
def _save_to_database(cls, bader_result, run_id):
# load the results. We are particularly after the first result, which
# is a pandas dataframe of oxidation states.
oxidation_data, extra_data = bader_result["result"]

# load the calculation entry for this workflow run. This should already
# exist thanks to the load_input_and_register task of the prebader workflow
calculation = cls.database_table.from_prefect_context()
calculation = cls.database_table.from_run_context(
run_id=run_id,
workflow_name=cls.name_full,
)
# BUG: can't use context to grab the id because workflow tasks generate a
# different id than the main workflow

Original file line number Diff line number Diff line change
@@ -9,9 +9,8 @@
Diffusion__Vasp__NebAllPathsMit,
)


# @pytest.mark.prefect_db
@pytest.mark.slow
@pytest.mark.prefect_db
@pytest.mark.django_db
def test_neb(sample_structures, tmp_path, mocker):

19 changes: 19 additions & 0 deletions src/simmate/command_line/test/test_worker_cli.py
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-

import pytest

from simmate.command_line.workflow_engine import workflow_engine


@pytest.mark.django_db
def test_worker_start(command_line_runner):

# start a worker that runs a single workflow and then exits
result = command_line_runner.invoke(workflow_engine, ["start-singleflow-worker"])
assert result.exit_code == 0

# start a worker that is limited to a single job
result = command_line_runner.invoke(
workflow_engine, ["start-worker", "-n", "1", "-e", "true"]
)
assert result.exit_code == 0
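The `command_line_runner` fixture behaves like Click's `CliRunner`, so the test above can be mimicked with the standard Click testing tools. This sketch is illustrative only: the command group is a stand-in, and the meanings of the `-n` and `-e` options are assumptions, not the real simmate CLI.

```python
import click
from click.testing import CliRunner

@click.group()
def workflow_engine():
    """Stand-in for the simmate workflow-engine command group."""

@workflow_engine.command("start-worker")
@click.option("-n", "--nitems-max", type=int, default=None)  # assumed: max jobs to run
@click.option("-e", "--close-on-empty", type=bool, default=False)  # assumed meaning
def start_worker(nitems_max, close_on_empty):
    # echo the parsed options instead of actually starting a worker
    click.echo(f"worker: nitems_max={nitems_max} close_on_empty={close_on_empty}")

runner = CliRunner()
result = runner.invoke(workflow_engine, ["start-worker", "-n", "1", "-e", "true"])
```

`CliRunner.invoke` captures output and the exit code in-process, which is why these CLI tests need no subprocess machinery.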
18 changes: 13 additions & 5 deletions src/simmate/command_line/test/test_workflows_cli.py
@@ -2,10 +2,9 @@

import yaml

from prefect.states import Completed

from simmate.calculators.vasp.inputs import Potcar
from simmate.workflow_engine import Workflow
from simmate.workflow_engine.workflow import DummyState

from simmate.command_line.workflows import workflows
from simmate.conftest import make_dummy_files
@@ -107,8 +106,11 @@ def test_workflows_run(command_line_runner, structure, mocker, tmp_path):
mocker.patch.object(
Workflow,
"run",
return_value=Completed(),
return_value=DummyState(None),
)
# the patch above can be modified for a Prefect executor:
# from prefect.states import Completed
# return_value=Completed(),

# now try writing input files to the tmp_path
result = command_line_runner.invoke(
@@ -149,8 +151,11 @@ def test_workflows_run_cloud(command_line_runner, structure, mocker, tmp_path):
mocker.patch.object(
Workflow,
"run_cloud",
return_value=Completed(),
return_value=DummyState(None),
)
# the patch above can be modified for a Prefect executor:
# from prefect.states import Completed
# return_value=Completed(),

# now try writing input files to the tmp_path
result = command_line_runner.invoke(
@@ -194,8 +199,11 @@ def test_workflows_run_yaml(command_line_runner, structure, mocker, tmp_path):
mocker.patch.object(
Workflow,
"run",
return_value=Completed(),
return_value=DummyState(None),
)
# the patch above can be modified for a Prefect executor:
# from prefect.states import Completed
# return_value=Completed(),

# now try writing input files to the tmp_path
result = command_line_runner.invoke(
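The `DummyState(None)` patching used throughout these tests can be reproduced with the standard library alone. Below is a minimal sketch with illustrative stand-in classes (not the real simmate `Workflow` or `DummyState`):

```python
from unittest.mock import patch

class Workflow:
    """Stand-in workflow whose run() should never execute in a unit test."""

    @classmethod
    def run(cls, **kwargs):
        raise RuntimeError("the real run() must not be called during tests")

class DummyState:
    """Minimal state object exposing the interface the CLI relies on."""

    def __init__(self, result):
        self._result = result

    def result(self):
        return self._result

# patch.object swaps in a mock whose return value is our dummy state, so
# calling code can invoke run(...) and then .result() without side effects
with patch.object(Workflow, "run", return_value=DummyState(None)):
    state = Workflow.run(structure="NaCl.cif")
    outcome = state.result()
```

Because the mock carries no Prefect types, the tests stay importable even when Prefect is not installed, which matches the isolation goal of this PR.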