Add support for the Dask workflow engine (#1323)
Closes #1321.
Andrew-S-Rosen authored Dec 9, 2023
1 parent fe3d917 commit e0351ba
Showing 19 changed files with 688 additions and 92 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
@@ -97,7 +97,7 @@ jobs:
fail-fast: true
matrix:
python-version: ["3.11"]
wflow_engine: [parsl, redun, jobflow]
wflow_engine: [dask, parsl, redun, jobflow]

defaults:
run:
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [0.4.3]

### Added

- Added preliminary support for the `Dask` workflow engine via Dask Delayed and Dask Distributed.

### Changed

- Renamed `CREATE_UNIQUE_WORKDIR` to `CREATE_UNIQUE_DIR` to better reflect its utility.
17 changes: 17 additions & 0 deletions docs/user/basics/wflow_overview.md
@@ -46,6 +46,23 @@ Everyone's computing needs are different, so we ensured that quacc is interoperable
- It requires a centralized server to be running continuously in order to manage the workflows
- High-security HPC environments may be difficult to access via SSH with the centralized server approach

=== "Dask"

[Dask](https://www.dask.org/) is a popular parallel computing library for Python. We use [Dask Delayed](https://docs.dask.org/en/stable/delayed.html) for lazy function execution, [Dask Distributed](https://distributed.dask.org/en/stable/) for distributed compute, and [Dask-Jobqueue](https://jobqueue.dask.org/en/latest/) for orchestrating the execution on HPC machines.

Pros:

- Extremely popular, with a large user community
- Significant support for running on HPC resources
- No centralized server or continuous network connection required
- Supports the pilot job model and advanced queuing schemes via Dask-Jobqueue

Cons:

- Retrieving workflow results is less straightforward than with other solutions
- Monitoring job progress is more challenging and less detailed than with other solutions
- The documentation, while comprehensive, can be difficult to follow given the many Dask components
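
To make these trade-offs concrete, here is a minimal sketch of lazy execution with plain Dask Delayed (no quacc involved); the task graph is only executed when `.compute()` is called:

```python
from dask import delayed


@delayed
def add(a, b):
    return a + b


# Building the task graph is lazy; no work runs until .compute() is called
total = add(add(1, 2), 3)
print(total.compute())  # 6
```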

=== "Redun"

[Redun](https://insitro.github.io/redun/) is a flexible workflow management program developed by [Insitro](https://insitro.com/).
63 changes: 63 additions & 0 deletions docs/user/basics/wflow_syntax.md
@@ -36,6 +36,20 @@ To help enable interoperability between workflow engines, quacc offers a unified

</center>

=== "Dask"

Take a moment to read the Dask Delayed documentation [overview page](https://docs.dask.org/en/stable/delayed.html) to get a sense of how the Dask decorators work and the Dask Distributed [quickstart page](https://distributed.dask.org/en/stable/quickstart.html) to understand how to submit tasks to a Dask cluster. Namely, you should understand the `@delayed` decorator and how to interface with the `Client`.

<center>

| Quacc | Dask |
| ------------------- | --------------------------------- |
| `#!Python @job` | `#!Python @delayed` |
| `#!Python @flow` | No effect |
| `#!Python @subflow` | `#!Python delayed(...).compute()` |

</center>

=== "Redun"

Take a moment to read the Redun documentation's [Design Overview page](https://insitro.github.io/redun/design.html) to get a sense of how Redun works. Namely, you should understand the `Task` decorator and how to interface with the `Scheduler`.
@@ -170,6 +184,51 @@ graph LR

3. This command will dispatch the workflow to the Covalent server.

=== "Dask"

!!! Important

If you haven't done so yet, make sure you update the quacc `WORKFLOW_ENGINE` [configuration variable](../settings/settings.md) and load the default Dask client:

```bash
quacc set WORKFLOW_ENGINE dask
```

```python title="python"
from dask.distributed import Client

client = Client() # (1)!
```

1. It is necessary to instantiate a Dask client before running Dask workflows. This command loads the default (local) client and only needs to be done once.

```python
from quacc import job


@job # (1)!
def add(a, b):
return a + b


@job
def mult(a, b):
return a * b


def workflow(a, b, c): # (2)!
return mult(add(a, b), c)


delayed = workflow(1, 2, 3)
result = client.compute(delayed).result() # 9
print(result)
```

1. The `#!Python @job` decorator will be transformed into `#!Python @delayed`.

2. The `#!Python @flow` decorator has no effect when using Dask, so we chose not to include it here for brevity.
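
For reference, the same toy workflow written directly against Dask Delayed (without the quacc decorators) might look like the following sketch; only `dask` itself is assumed:

```python
from dask import delayed


@delayed
def add(a, b):
    return a + b


@delayed
def mult(a, b):
    return a * b


def workflow(a, b, c):
    return mult(add(a, b), c)


# .compute() resolves the task graph via the active client (or local scheduler)
print(workflow(1, 2, 3).compute())  # 9
```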

=== "Redun"

!!! Important
@@ -274,6 +333,10 @@ add.__wrapped__(1, 2)

If you want to learn more about Covalent, you can read the [Covalent Documentation](https://docs.covalent.xyz/docs/). Please refer to the Covalent [Discussion Board](https://github.com/AgnostiqHQ/covalent/discussions) for any Covalent-specific questions.

=== "Dask"

If you want to learn more about Dask, you can read the [Dask Delayed documentation](https://docs.dask.org/en/stable/delayed.html) for more on the decorators and the [Dask Distributed documentation](https://distributed.dask.org/en/stable/) for more on running a distributed Dask cluster. Please refer to the [Dask Discourse page](https://discourse.dask.org/) for Dask-specific questions.

=== "Redun"

If you want to learn more about Redun, you can read the [Redun documentation](https://insitro.github.io/redun/index.html).
4 changes: 4 additions & 0 deletions docs/user/wflow_engine/executors1.md
@@ -236,6 +236,10 @@ In the previous examples, we have been running calculations on our local machine

2. The `SlurmExecutor` must have `use_srun=False` in order for ASE-based calculators to be launched appropriately.

=== "Dask"

A Dask cluster can be set up to work with the queueing systems found on most HPC machines. This is done via [Dask-Jobqueue](https://jobqueue.dask.org/en/latest/index.html). Example configurations for various queueing systems can be found in the ["Example Deployments"](https://jobqueue.dask.org/en/latest/examples.html) section of the documentation.

=== "Redun"

Out-of-the-box, Redun will run on your local machine. However, in practice, you will probably want to specify a dedicated executor.
54 changes: 54 additions & 0 deletions docs/user/wflow_engine/wflow_engines1.md
@@ -94,6 +94,41 @@ graph LR

3. The `ct.get_result` function is used to fetch the workflow status and results from the server. You don't need to set `wait=True` in practice. Once you dispatch the workflow, it will begin running (if the resources are available).

=== "Dask"

!!! Important

If you haven't done so yet, make sure you update the quacc `WORKFLOW_ENGINE` [configuration variable](../settings/settings.md) and load the default Dask cluster:

```bash title="terminal"
quacc set WORKFLOW_ENGINE dask
```

```python title="python"
from dask.distributed import Client

client = Client()
```

```python
from ase.build import bulk
from quacc.recipes.emt.core import relax_job

# Make an Atoms object of a bulk Cu structure
atoms = bulk("Cu")

# Define the Delayed object
delayed = relax_job(atoms) # (1)!

# Print result
result = client.compute(delayed).result()
print(result) # (2)!
```

1. The `relax_job` function was pre-defined in quacc with a `#!Python @job` decorator, which is why we did not need to include it here. We also did not need to use a `#!Python @flow` decorator because it has no effect when using Dask.

2. Calling `client.compute()` dispatches the job to the active Dask cluster, and `.result()` blocks further execution until the job resolves. Calling `.result()` also returns the function output rather than the `Delayed` object.

=== "Redun"

!!! Important
@@ -204,6 +239,25 @@ graph LR

1. We didn't need to wrap `bulk_to_slabs_flow` with a decorator because it is already pre-decorated with a `#!Python @flow` decorator.

=== "Dask"

```python
from ase.build import bulk
from quacc.recipes.emt.slabs import bulk_to_slabs_flow

# Define the Atoms object
atoms = bulk("Cu")

# Define the workflow
delayed = bulk_to_slabs_flow(atoms)

# Compute and print the results
result = client.gather(client.compute(delayed))  # (1)!
print(result)
```

1. Calling `client.compute()` on the list of `Delayed` objects returns a list of futures, and `client.gather()` collects the corresponding results.

=== "Redun"

```python
110 changes: 110 additions & 0 deletions docs/user/wflow_engine/wflow_engines2.md
@@ -112,6 +112,57 @@ graph LR

4. You don't need to set `wait=True` in practice. Once you dispatch the workflow, it will begin running (if the resources are available). The `ct.get_result` function is used to fetch the workflow status and results from the server.

=== "Dask"

!!! Important

If you haven't done so yet, make sure you update the quacc `WORKFLOW_ENGINE` [configuration variable](../settings/settings.md) and load the default Dask cluster:

```bash title="terminal"
quacc set WORKFLOW_ENGINE dask
```

```python title="python"
from dask.distributed import Client

client = Client()
```

```python
from ase.build import bulk
from quacc.recipes.emt.core import relax_job, static_job


# Define the workflow
def workflow(atoms):
# Define Job 1
delayed1 = relax_job(atoms) # (1)!

# Define Job 2, which takes the output of Job 1 as input
delayed2 = static_job(delayed1["atoms"])

return delayed2


# Make an Atoms object of a bulk Cu structure
atoms = bulk("Cu")

# Dispatch the workflow
delayed = workflow(atoms)

# Fetch the result
result = client.compute(delayed).result() # (2)!
print(result)
```

1. The `relax_job` function was pre-defined in quacc with a `#!Python @job` decorator, which is why we did not need to include it here.

2. The use of `client.compute()` submits the job to the cluster, and `.result()` serves to block any further calculations from running until it is resolved. Calling `.result()` also returns the function output as opposed to the `Delayed` object.

!!! Note

Dask will implicitly resolve any `Delayed` object it receives as an input, and it is good practice to rely on this behavior to avoid unnecessary blocking.
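
This implicit resolution can be seen with plain Dask Delayed: an unresolved `Delayed` can be passed directly into the next task, and Dask resolves it inside the task graph. A minimal sketch, independent of quacc:

```python
from dask import delayed


@delayed
def add(a, b):
    return a + b


# Pass the unresolved Delayed straight into the next task; Dask resolves
# it within the task graph, so no intermediate blocking call is needed
intermediate = add(1, 2)
final = add(intermediate, 10)
print(final.compute())  # 13
```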

=== "Redun"

!!! Important
@@ -268,6 +319,36 @@ graph LR
print(result)
```

=== "Dask"

```python
from ase.build import bulk, molecule
from quacc.recipes.emt.core import relax_job


# Define workflow
def workflow(atoms1, atoms2):
# Define two independent relaxation jobs
result1 = relax_job(atoms1)
result2 = relax_job(atoms2)

return {"result1": result1, "result2": result2}


# Define two Atoms objects
atoms1 = bulk("Cu")
atoms2 = molecule("N2")

# Define two independent relaxation jobs
delayed = workflow(atoms1, atoms2)

# Fetch the results
results = client.gather(client.compute(delayed))
result1 = results["result1"]
result2 = results["result2"]
print(result1, result2)
```
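
Because the two relaxations are independent, the scheduler is free to run them concurrently. The same pattern with plain Dask Delayed (a minimal sketch, no quacc) is:

```python
import dask
from dask import delayed


@delayed
def square(x):
    return x * x


# Independent Delayed objects computed in one call can run in parallel
a, b = dask.compute(square(3), square(4))
print(a, b)  # 9 16
```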

=== "Redun"

```python
@@ -397,6 +478,35 @@ graph LR

1. We didn't need to wrap `bulk_to_slabs_flow` with a decorator because it is already pre-decorated with a `#!Python @flow` decorator. We also chose to set `#!Python run_static=False` here to disable the static calculation that is normally carried out in this workflow.

=== "Dask"

```python
from ase.build import bulk
from quacc.recipes.emt.core import relax_job
from quacc.recipes.emt.slabs import bulk_to_slabs_flow


# Define the workflow
def workflow(atoms):
relaxed_bulk = relax_job(atoms)
relaxed_slabs = bulk_to_slabs_flow(relaxed_bulk["atoms"], run_static=False) # (1)!

return relaxed_slabs


# Define the Atoms object
atoms = bulk("Cu")

# Dispatch the workflow
delayed = workflow(atoms)

# Fetch the results
result = client.gather(client.compute(delayed))
print(result)
```

1. We chose to set `#!Python run_static=False` here to disable the static calculation that is normally carried out in this workflow.

=== "Redun"

```python
1 change: 1 addition & 0 deletions pyproject.toml
@@ -43,6 +43,7 @@ dependencies = [

[project.optional-dependencies]
covalent = ["covalent>=0.226.0rc0", "covalent-hpc-plugin>=0.0.3"]
dask = ["dask[distributed]>=2023.12.0", "dask-jobqueue>=0.8.2"]
defects = ["pymatgen-analysis-defects>=2023.8.22", "shakenbreak>=3.2.0"]
jobflow = ["jobflow>=0.1.14", "fireworks>=2.0.3"]
newtonnet = ["newtonnet>=1.1"]
4 changes: 2 additions & 2 deletions src/quacc/settings.py
@@ -18,7 +18,7 @@
installed_engine = next(
(
wflow_engine
for wflow_engine in ["parsl", "covalent", "prefect", "redun", "jobflow"]
for wflow_engine in ["parsl", "covalent", "dask", "prefect", "redun", "jobflow"]
if util.find_spec(wflow_engine)
),
"local",
@@ -54,7 +54,7 @@ class QuaccSettings(BaseSettings):
# ---------------------------

WORKFLOW_ENGINE: Literal[
"covalent", "jobflow", "parsl", "prefect", "redun", "local"
"covalent", "dask", "parsl", "redun", "jobflow", "prefect", "local"
] = Field(
installed_engine,
description=(