From d8fbeddbc9d69e39f55f60e496d82c2dc2e5ea5e Mon Sep 17 00:00:00 2001 From: Jack Sundberg Date: Tue, 9 Aug 2022 10:04:11 -0400 Subject: [PATCH 1/6] add back custom executor --- README.md | 2 +- src/simmate/configuration/django/settings.py | 1 + src/simmate/database/utilities.py | 3 +- .../website/workflow_engine/__init__.py | 0 src/simmate/website/workflow_engine/apps.py | 7 + src/simmate/website/workflow_engine/models.py | 7 + .../workflow_engine/execution/README.md | 36 ++++ .../workflow_engine/execution/__init__.py | 6 + .../workflow_engine/execution/database.py | 81 +++++++++ .../workflow_engine/execution/executor.py | 148 +++++++++++++++ .../workflow_engine/execution/future.py | 135 ++++++++++++++ .../workflow_engine/execution/worker.py | 172 ++++++++++++++++++ 12 files changed, 596 insertions(+), 2 deletions(-) create mode 100644 src/simmate/website/workflow_engine/__init__.py create mode 100644 src/simmate/website/workflow_engine/apps.py create mode 100644 src/simmate/website/workflow_engine/models.py create mode 100644 src/simmate/workflow_engine/execution/README.md create mode 100644 src/simmate/workflow_engine/execution/__init__.py create mode 100644 src/simmate/workflow_engine/execution/database.py create mode 100644 src/simmate/workflow_engine/execution/executor.py create mode 100644 src/simmate/workflow_engine/execution/future.py create mode 100644 src/simmate/workflow_engine/execution/worker.py diff --git a/README.md b/README.md index 51a73ceb3..f95d4acfb 100644 --- a/README.md +++ b/README.md @@ -124,7 +124,7 @@ command: mpirun -n 8 vasp_std > vasp.out from simmate.workflows.relaxation import Relaxation__Vasp__Matproj as workflow state = workflow.run(structure="NaCl.cif") -result = workflow.result() +result = state.result() ``` diff --git a/src/simmate/configuration/django/settings.py b/src/simmate/configuration/django/settings.py index 23145d5e8..c8afbb42e 100644 --- a/src/simmate/configuration/django/settings.py +++ b/src/simmate/configuration/django/settings.py @@ -161,6 +161,7 @@ "simmate.website.core_components.apps.CoreComponentsConfig", "simmate.website.third_parties.apps.ThirdPartyConfig", "simmate.website.workflows.apps.WorkflowsConfig", + "simmate.website.workflow_engine.apps.WorkflowEngineConfig", # # These are built-in django apps that we use for extra features "django.contrib.admin", diff --git a/src/simmate/database/utilities.py b/src/simmate/database/utilities.py index 5dbd8d653..c2691acf2 100644 --- a/src/simmate/database/utilities.py +++ b/src/simmate/database/utilities.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import shutil +from pathlib import Path from django.apps import apps from django.core.management import call_command @@ -53,7 +54,7 @@ def reset_database(apps_to_migrate=APPS_TO_MIGRATE, use_prebuilt=False): if app_config.label not in apps_to_migrate: continue - migration_dir = app_config.path / "migrations" + migration_dir = Path(app_config.path) / "migrations" if migration_dir.exists(): shutil.rmtree(migration_dir) continue diff --git a/src/simmate/website/workflow_engine/__init__.py b/src/simmate/website/workflow_engine/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/simmate/website/workflow_engine/apps.py b/src/simmate/website/workflow_engine/apps.py new file mode 100644 index 000000000..59b605654 --- /dev/null +++ b/src/simmate/website/workflow_engine/apps.py @@ -0,0 +1,7 @@ +from django.apps import AppConfig + + +class WorkflowEngineConfig(AppConfig): + + # use the full import path for this app b/c it's within a 
package + name = "simmate.website.workflow_engine" diff --git a/src/simmate/website/workflow_engine/models.py b/src/simmate/website/workflow_engine/models.py new file mode 100644 index 000000000..3ec4cf6d4 --- /dev/null +++ b/src/simmate/website/workflow_engine/models.py @@ -0,0 +1,7 @@ +# -*- coding: utf-8 -*- + +# I store all of my models elsewhere, so this file simply exists to show django where +# they are located at. I do this based on the directions given by: +# https://docs.djangoproject.com/en/3.1/topics/db/models/#organizing-models-in-a-package + +from simmate.workflow_engine.execution.database import WorkItem diff --git a/src/simmate/workflow_engine/execution/README.md b/src/simmate/workflow_engine/execution/README.md new file mode 100644 index 000000000..ec1fdcdfa --- /dev/null +++ b/src/simmate/workflow_engine/execution/README.md @@ -0,0 +1,36 @@ + +> :warning: This module represents an alternative to Prefect. It is meant to be a stable quick-start alternative, but lacks the scaling and numerous features that Prefect offers. + +This is an SQL executor that intends to be a stripped down version of FireWorks and Prefect. The scheduler is directly built into the django database, which makes it so you don't have to deal with firewalls or complex setups -- any worker that can connect to the database will work just fine. The downside is that the executor is slower (bc each task requires multiple database calls and also writing to the database). It's a trade off of speed for stability, but this is okay because many workflows in Simmate are >1min and the speed penality is well below 1 second. + +Example usage: + +```python +from simmate.workflow_engine.execution.executor import SimmateExecutor + +executor = SimmateExecutor() + +# EXAMPLE 1 +future = executor.submit(sum, [4, 3, 2, 1]) +assert future.result() == 10 + +# EXAMPLE 2 +import time + + +def test(): + futures = [executor.submit(time.sleep, 5) for n in range(10)] + return executor.wait(futures) + + +test() + +# ---------------------------------------------------------------------------- + +from simmate.workflow_engine.execution.worker import SimmateWorker + +worker = SimmateWorker(waittime_on_empty_queue=1) # nitems_max=1 +worker.start() + +# ---------------------------------------------------------------------------- +``` diff --git a/src/simmate/workflow_engine/execution/__init__.py b/src/simmate/workflow_engine/execution/__init__.py new file mode 100644 index 000000000..c5d2af81d --- /dev/null +++ b/src/simmate/workflow_engine/execution/__init__.py @@ -0,0 +1,6 @@ +# -*- coding: utf-8 -*- + +from .database import WorkItem +from .future import SimmateFuture +from .executor import SimmateExecutor +from .worker import SimmateWorker diff --git a/src/simmate/workflow_engine/execution/database.py b/src/simmate/workflow_engine/execution/database.py new file mode 100644 index 000000000..dc22b2a02 --- /dev/null +++ b/src/simmate/workflow_engine/execution/database.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- + +# import pickle +import cloudpickle # needed to serialize Prefect workflow runs and tasks + +from simmate.database.base_data_types import DatabaseTable, table_column + +# BUG: I have this database table within a module that calls "database.connect" +# at a higher level... Will this cause circular import issues? 
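As a quick illustration of the serialization strategy described above, a callable and its inputs can be round-tripped through `cloudpickle` byte strings of the kind stored in these `BinaryField` columns. A minimal sketch, independent of Django:

```python
import cloudpickle


def add(a, b):
    return a + b


# serialize the callable and its inputs to byte strings, as would be stored
# in a WorkItem's fxn/args columns
fxn_bytes = cloudpickle.dumps(add)
args_bytes = cloudpickle.dumps((4, 3))

# a worker later recovers and runs them
fxn = cloudpickle.loads(fxn_bytes)
args = cloudpickle.loads(args_bytes)
assert fxn(*args) == 7
```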
+ + +class WorkItem(DatabaseTable): + # The name WorkItem comes from the naming convention use here: + # https://github.com/python/cpython/blob/master/Lib/concurrent/futures/thread.py + + # For serialization, I just use the pickle module, but in the future, I may + # want to do a priority of MsgPk >> JSON >> Pickle. I could check if the given + # object(s) has a serialize() method, check if has a to_json() method, and then + # as a last resort just pickle the object. + # https://docs.python.org/3/library/pickle.html + + # Pickled objects are just written as byte strings, so I stored them in django + # as a BinaryField which accepts bytes. + # https://docs.djangoproject.com/en/3.1/ref/models/fields/#binaryfield + + # There is a repo that goes one step further and makes a PickleField for django + # but this looks over the top for what I need -- even though django-picklefield + # is only one file! Still, I can use it as a reference if I ever run into bugs + # https://github.com/gintas/django-picklefield + + class Meta: + app_label = "workflow_engine" + + labels = table_column.JSONField(default=[]) + """ + List of labels to submit the task with, which helps with submitting workers + for a specific type of task/workflow. + """ + + fxn = table_column.BinaryField() + """ + The function to be called, which is serialized into a binary format. + """ + + args = table_column.BinaryField(default=cloudpickle.dumps([])) + """ + positional arguments to be passed into fxn + """ + + kwargs = table_column.BinaryField(default=cloudpickle.dumps({})) + """ + keyword arguments to be passed into fxn + """ + + result = table_column.BinaryField(blank=True, null=True) + """ + the output of fxn(*args, **kwargs) + """ + + # These states are based on the python queue module + class StatusOptions(table_column.TextChoices): + PENDING = "P" + RUNNING = "R" + CANCELLED = "C" + # CANCELLED_AND_NOTIFIED = "N" # !!! when should I use this? + FINISHED = "F" + + # TODO -- I should consider indexing this column for speed because it's + # the most queried column by far. + status = table_column.CharField( + max_length=1, + choices=StatusOptions.choices, + default=StatusOptions.PENDING, + ) + """ + the status/state of the workitem + """ + + # TODO: Consider creating a separate table for Worker and linking it to this + # table via a foreign key. + # worker_id = table_column.CharField(max_length=50, blank=True, null=True) diff --git a/src/simmate/workflow_engine/execution/executor.py b/src/simmate/workflow_engine/execution/executor.py new file mode 100644 index 000000000..f345c6809 --- /dev/null +++ b/src/simmate/workflow_engine/execution/executor.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- + +# import pickle +import cloudpickle # needed to serialize Prefect workflow runs and tasks + +from simmate.workflow_engine.execution.database import WorkItem +from simmate.workflow_engine.execution.future import SimmateFuture + + +class SimmateExecutor: + """ + Sets up a connection to the queue database. Unlike normal executors, + this does not set up any workers -- you must launch Worker instances + elsewhere. It's primary role is to connect to the queue database + and generate futures for workers. Therefore, think of the Executor + as how you SUBMIT tasks and then the Worker is how you RUN jobs. You + need both classes to have the setup working properly. + + Only use this Executor when Dask can't solve your problem! 
It's main + use it to bypass university HPC cluster's firewalls because here worker + signals are one-directional -- that is they query a database and there + is never a signal sent to the worker like other executors do. Thus + we can have workers anywhere we'd like as long as they have access + to internet - so even multiple HPC clusters will work. At the moment, + the executor has no idea how many workers exist and their state. I may + add a "worker heartbeat" table to the queue database for the executor + to read and run managerial tasks based off though. + """ + + # This class is modeled after the following... + # https://github.com/python/cpython/blob/master/Lib/concurrent/futures/thread.py + # https://docs.python.org/3/library/concurrent.futures.html + # from concurrent.futures import Executor # No need to inherit at the moment + + def submit( + self, + fxn, + *args, + labels=[], + **kwargs, + ): + + # The *args and **kwargs input separates args into a tuple and kwargs into + # a dictionary for me, which makes their storage very easy! + + # make the WorkItem where all of the provided inputs are pickled and + # save the workitem to the database. + # Pickling is just converting them to a byte string format + # No lock is needed to do this because adding a new row is handled + # by the database with ease, even if some different Executor is + # adding another WorkItem at the same time. + # TODO - should I put pickling in a "try" in case it fails? + workitem = WorkItem.objects.create( + fxn=cloudpickle.dumps(fxn), + args=cloudpickle.dumps(args), + kwargs=cloudpickle.dumps(kwargs), + # TODO: labels + ) + + # create the future object + future = SimmateFuture(pk=workitem.pk) + + # and return the future for use + return future + + def map(self, fxn, iterables, timeout=None, chunksize=100): # TODO + # chunksize indicates how many to add at one + # iterables is a list of (*args, **kwargs) + # add many fn(*args, **kwargs) to queue + + # TODO -- This is not supported at the moment. I should use the + # .bulk_create method to do this in the future: + # https://docs.djangoproject.com/en/3.1/ref/models/querysets/#bulk-create + + # raise an error to ensure user sees this isn't supported yet. + raise Exception("This method is not supported yet") + + def shutdown(self, wait=True, cancel_futures=False): # TODO + # whether to wait until the queue is empty + # whether to cancel futures and clear database + pass + + def wait(self, futures): + """ + Waits for all futures to complete before returning a list of their results + """ + # If a dictionary of {key1: future1, key2: future2, ...} is given, + # then we return a dictionary of which futures replaced by results. + # NOTE: this is really for compatibility with Prefect's FlowRunner. 
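For context, a typical submit-then-wait pattern looks like the sketch below. It assumes a configured Simmate database and a worker running elsewhere to actually execute the items:

```python
from simmate.workflow_engine.execution.executor import SimmateExecutor

executor = SimmateExecutor()

# each submit() pickles the callable + inputs into a new WorkItem row and
# returns a SimmateFuture pointing at that row
futures = [executor.submit(pow, 2, n) for n in range(5)]

# wait() blocks until every future resolves -- a separate worker must be
# running for these results to ever appear
results = executor.wait(futures)  # [1, 2, 4, 8, 16]
```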
+ if isinstance(futures, dict): + print("\n\nWAIT") + import time + + time.sleep(10) + for key, future in futures.items(): + print((key, future, future.pk, future.done())) + # print(future.result()) + return {key: future.result() for key, future in futures.items()} + # otherwise this is a list of futures, so return a list of results + else: + return [future.result() for future in futures] + + # ------------------------------------------------------------------------ + # ------------------------------------------------------------------------ + # ------------------------------------------------------------------------ + + # These methods are for managing and monitoring the queue + # I attach these directly to the Executor rather than having a separate + # DjangoQueue class that inherits from python's Queue module. + # If there is a good reason to make a separate class in the future, + # I can start from these methods here and the following link: + # https://docs.python.org/3/library/queue.html + + def queue_size(self): + """ + Return the approximate size of the queue. + """ + # Count the number of WorkItem(s) that have a status of PENDING + # !!! Should I include RUNNING in the count? If so I do that with... + # from django.db.models import Q + # ...filter(Q(status="P") | Q(status="R")) + queue_size = WorkItem.objects.filter(status="P").count() + return queue_size + + def clear_queue(self, are_you_sure=False): + """ + Empties the WorkItem database table and delete everything. This will + not stop the workers if they are in the middle of a job though. + """ + # Make sure the user ment to do this, otherwise raise an exception + if not are_you_sure: + raise Exception( + "Are you sure you want to do this? it deletes all of your queue" + "data and you can't get it back. If so, set are_you_sure=True." + ) + else: + WorkItem.objects.all().delete() + + def clear_finished(self, are_you_sure=False): + """ + Empties the WorkItem database table and delete everything. This will + not stop the workers if they are in the middle of a job though. + """ + # Make sure the user ment to do this, otherwise raise an exception + if not are_you_sure: + raise Exception + else: + WorkItem.objects.filter(status="F").delete() diff --git a/src/simmate/workflow_engine/execution/future.py b/src/simmate/workflow_engine/execution/future.py new file mode 100644 index 000000000..845f02103 --- /dev/null +++ b/src/simmate/workflow_engine/execution/future.py @@ -0,0 +1,135 @@ +# -*- coding: utf-8 -*- + +import time + +# import pickle +import cloudpickle # needed to serialize Prefect workflow runs and tasks + +from simmate.workflow_engine.execution.database import WorkItem + + +class SimmateFuture: + # class based on... + # https://docs.python.org/3/library/concurrent.futures.html + # Some methods still need to be added, but I have no need for them yet. + # from concurrent.futures import Future # No need to inherit at the moment + + # locking rows is done by... + # https://docs.djangoproject.com/en/3.1/ref/models/querysets/#select-for-update + + def __init__(self, pk): + + # This is the WorkItem personal key which tells us which row in the table + # we should loook at. + self.pk = pk + + def cancel(self): + """ + Attempt to cancel the call. If the call is currently being executed or + finished running and cannot be cancelled then the method will return + False, otherwise the call will be cancelled and the method will return + True. + """ + # Query the WorkItem, lock it for editting, and check the status. 
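The lock referenced here relies on Django's `select_for_update`, which only takes effect inside a transaction. A standalone sketch of the same check-and-update pattern wrapped in `transaction.atomic()` (the helper function is hypothetical, not part of this patch):

```python
from django.db import transaction

from simmate.workflow_engine.execution.database import WorkItem


def cancel_workitem(pk: int) -> bool:
    # the row lock from select_for_update() is only held inside a transaction
    with transaction.atomic():
        workitem = WorkItem.objects.select_for_update().get(pk=pk)
        if workitem.status != "P":
            return False  # already running or finished
        workitem.status = "C"
        workitem.save()
        return True
```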
+ workitem = WorkItem.objects.select_for_update().get(pk=self.pk) + + # check if the status is *not* PENDING + if workitem.status != "P": + # if so, the job is already running or finished, in which case + # we can't cancel it. + return False + + else: + # If it is still pending, we can go ahead and cancel it. + # This does not delete the task from the queue database though + workitem.status = "C" + workitem.save() + return True + + def cancelled(self): + """ + Return True if the call was successfully cancelled. + """ + # I don't use a lock to check the status here + workitem = WorkItem.objects.only("status").get(pk=self.pk) + + # check the status and indicate whether it is CANCELED or not + if workitem.status == "C": + return True + else: + return False + + def running(self): + """ + Return True if the call is currently being executed and cannot be cancelled. + """ + # I don't use a lock to check the status here + workitem = WorkItem.objects.only("status").get(pk=self.pk) + + # check the status and indicate whether it is RUNNING or not + if workitem.status == "R": + return True + else: + return False + + def done(self): + """ + Return True if the call was successfully cancelled or finished running. + """ + # I don't use a lock to check the status here + workitem = WorkItem.objects.only("status").get(pk=self.pk) + + # check the status and indicate whether it is FINISHED or CANCELED + if workitem.status == "F" or workitem.status == "C": + return True + else: + return False + + def result(self, timeout=None, sleep_step=0.1): + """ + Return the value returned by the call. If the call hasn’t yet completed + then this method will wait up to timeout seconds. If the call hasn’t + completed in timeout seconds, then a concurrent.futures.TimeoutError + will be raised. timeout can be an int or float. If timeout is not + specified or None, there is no limit to the wait time. + + If the future is cancelled before completing then CancelledError + will be raised. + + If the call raised, this method will raise the same exception. + """ + # if no timeout was set, use infinity so we wait forever. 
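From the caller's side, this polling behavior translates into usage like the sketch below (it assumes a configured database, and a worker must complete the item for a result to ever appear):

```python
from simmate.workflow_engine.execution.executor import SimmateExecutor
from simmate.workflow_engine.execution.future import CancelledError

executor = SimmateExecutor()
future = executor.submit(sum, [1, 2, 3])

try:
    # poll the database every 0.5 s and give up after 30 s
    result = future.result(timeout=30, sleep_step=0.5)
except TimeoutError:
    print("No worker finished this WorkItem in time.")
except CancelledError:
    print("The WorkItem was cancelled before it ran.")
```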
+ if not timeout: + timeout = float("inf") + + # Loop endlessly until the job completes or we timeout + time_start = time.time() + + while (time.time() - time_start) < timeout: + # I don't use a lock to check the status here + workitem = WorkItem.objects.only("status", "result").get(pk=self.pk) + status = workitem.status + + if status == "F": # FINISHED + # grab the result, unpickle it, and return it + result = cloudpickle.loads(workitem.result) + # if the result is an Error or Exception, raise it + if isinstance(result, Exception): + raise result + # otherwise return the result as-s + else: + return result + + elif status == "C": # CANCELED + raise CancelledError("This item was cancelled and has no result") + + elif status == "P" or status == "R": # PENDING or RUNNING + # sleep the set amount before restarting the while loop + time.sleep(sleep_step) + + # if the loop exits and we reached this line, then we've hit the timeout + raise TimeoutError("The time-limit to wait for this result has been exceeded") + + +class CancelledError(Exception): + pass diff --git a/src/simmate/workflow_engine/execution/worker.py b/src/simmate/workflow_engine/execution/worker.py new file mode 100644 index 000000000..92691f229 --- /dev/null +++ b/src/simmate/workflow_engine/execution/worker.py @@ -0,0 +1,172 @@ +# -*- coding: utf-8 -*- + +import time + +# import pickle +import cloudpickle # needed to serialize Prefect workflow runs and tasks + +from django.db import transaction + +from simmate.workflow_engine.execution.database import WorkItem + +# This string is just something fancy to display in the console when a worker +# starts up. +# This uses "Small Slant" from https://patorjk.com/software/taag/ +HEADER_ART = r""" + _____ __ _ __ __ + / __(_)_ _ __ _ ___ _/ /____ | | /| / /__ ____/ /_____ ____ + _\ \/ / ' \/ ' \/ _ `/ __/ -_) | |/ |/ / _ \/ __/ '_/ -_) __/ +/___/_/_/_/_/_/_/_/\_,_/\__/\__/ |__/|__/\___/_/ /_/\_\\__/_/ + +""" + + +class SimmateWorker: + + # Ideally, this worker would involve multiple threads threads going. One + # thread would update the queue database with a "heartbeat" to let it know + # that it is still working on tasks. The other thread will run the given + # workitems in serial. I could even allow for more threads so that this + # worker can run multiple workitems at once and in parallel. + # However, if this level of implementation is needed, we should instead + # switch to using Prefect, which has it built in. + + def __init__( + self, + # limit of tasks and lifetime of the worker + nitems_max: int = None, + timeout: float = None, + # wait_on_timeout=False, # TODO + # settings on what to do when the queue is empty + close_on_empty_queue: bool = False, + waittime_on_empty_queue: float = 60, + labels: list[str] = [], + ): + + # the maximum number of workitems to run before closing down + # if no limit was set, we can go to infinity! + self.nitems_max = nitems_max or float("inf") + + # Don't start a new workitem after this time. The worker will be shut down. + # if no timeout was set, use infinity so we wait forever. + self.timeout = timeout or float("inf") + + # whether to wait on the running workitems to finish before shutting down + # the timedout worker. + # self.wait_on_timeout = wait_on_timeout # # TODO + + # whether to close if the queue is empty + self.close_on_empty_queue = close_on_empty_queue + # if the queue is found to be empty, we will give it one last chance + # to fill. Check the queue again after this time sleeping and if it is + # still empty, close the worker. 
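Taken together, these settings allow short-lived workers such as the sketch below (the values shown are illustrative):

```python
from simmate.workflow_engine.execution.worker import SimmateWorker

# run at most 10 WorkItems, shut down after an hour, and exit early if the
# queue stays empty for 30 seconds
worker = SimmateWorker(
    nitems_max=10,
    timeout=3600,
    close_on_empty_queue=True,
    waittime_on_empty_queue=30,
)
worker.start()  # blocks until one of the shutdown conditions is met
```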
+ self.waittime_on_empty_queue = waittime_on_empty_queue + + def start(self): + + # print the header in the console to let the user know the worker started + print(HEADER_ART) + + # establish starting point for the worker + time_start = time.time() + ntasks_finished = 0 + + # Loop endlessly until one of the following happens... + # the timeout limit is hit + # the queue is empty + # the nitems limit is hit + while True: + + # check for timeout before starting a new workitem and exit + # if we've hit the limit. + if (time.time() - time_start) > self.timeout: + # TODO - check wait_on_timeout if running in parallel. + print("The time-limit for this worker has been hit.") + return + + # check the number of jobs completed so far, and exit if we hit + # the limit + if ntasks_finished >= self.nitems_max: + print("Maxium number of WorkItems hit for this worker.") + return + + # check the length of the queue and while it is empty, we want to + # loop. The exception of looping endlessly is if we want the worker + # to shutdown instead. + while self.queue_size() == 0: + # if it is empty, we want to sleep for a little and check again + time.sleep(self.waittime_on_empty_queue) + + # This is a special condition where we may want to close the + # worker if the queue stays empty + if self.close_on_empty_queue: + # after we just waited, let's check the queue size again + if self.queue_size() == 0: + # if it's still empty, we should close the worker + print("The queue is empty so the worker has been closed.") + return + + # If we've made it this far, we're ready to grab a new WorkItem + # and run it! + # Query for PENDING WorkItems, lock it for editting, and update + # the status to RUNNING + workitem = WorkItem.objects.select_for_update().filter(status="P").first() + + # our lock exists only within this transation + with transaction.atomic(): + # update the status to running before starting it so no other + # worker tries to grab the same WorkItem + workitem.status = "R" + # TODO -- indicate that the WorkItem is with this Worker (relationship) + workitem.save() + + # Print out the job ID that is being ran for the user to see + print(f"Running WorkItem with id {workitem.id}.") + + # now let's unpickle the WorkItem components + fxn = cloudpickle.loads(workitem.fxn) + args = cloudpickle.loads(workitem.args) + kwargs = cloudpickle.loads(workitem.kwargs) + + # Try running the WorkItem + try: + result = fxn(*args, **kwargs) + # if it fails, we want to "capture" the error and return it + # rather than have the Worker fail itself. + except Exception as exception: + result = exception + + # whatever the result, we need to try to pickle it now + try: + result_pickled = cloudpickle.dumps(result) + # if this fails, we even want to pickle the error and return it + except Exception as exception: + result_pickled = cloudpickle.dumps(exception) + + # requery the WorkItem to restart our lock + workitem = WorkItem.objects.select_for_update().get(pk=workitem.pk) + + # our lock exists only within this transation + with transaction.atomic(): + # pickle the result and update the workitem's result and status + # !!! should I have the pickle inside of a Try? + workitem.result = result_pickled + workitem.status = "F" + workitem.save() + + # mark down that we've completed one WorkItem + ntasks_finished += 1 + + # Print out the job ID that was just finished for the user to see. + print("Completed WorkItem.") + + def queue_size(self): + """ + Return the approximate size of the queue. 
+ """ + # Count the number of WorkItem(s) that have a status of PENDING + # !!! Should I include RUNNING in the count? If so I do that with... + # from django.db.models import Q + # ...filter(Q(status="P") | Q(status="R")) + queue_size = WorkItem.objects.filter(status="P").count() + return queue_size From 1665fe37c11863f26d5f7ebe268e6e3b00a6214a Mon Sep 17 00:00:00 2001 From: Jack Sundberg Date: Tue, 9 Aug 2022 12:11:50 -0400 Subject: [PATCH 2/6] checkpoint in new executor --- .../database/base_data_types/calculation.py | 24 +- src/simmate/workflow_engine/utilities.py | 23 +- src/simmate/workflow_engine/workflow.py | 494 ++++++++++-------- 3 files changed, 315 insertions(+), 226 deletions(-) diff --git a/src/simmate/database/base_data_types/calculation.py b/src/simmate/database/base_data_types/calculation.py index 097ae4f0c..fffbb5215 100644 --- a/src/simmate/database/base_data_types/calculation.py +++ b/src/simmate/database/base_data_types/calculation.py @@ -58,7 +58,7 @@ class Meta: like... "/path/to/simmate-task-abc123" """ - prefect_flow_run_id = table_column.CharField( + run_id = table_column.CharField( max_length=50, blank=True, null=True, @@ -137,8 +137,11 @@ def prefect_state(self) -> str: return flowrunview.state.__class__.__name__ if flowrunview else None @classmethod - def from_prefect_context( - cls, prefect_flow_run_id: str = None, workflow_name: str = None, **kwargs + def from_run_context( + cls, + run_id: str = None, + workflow_name: str = None, + **kwargs, ): """ Given a prefect id, this method will do one of the following... @@ -149,16 +152,19 @@ def from_prefect_context( It will then return the corresponding Calculation instance. """ - if not prefect_flow_run_id or not workflow_name: + if not run_id or not workflow_name: # Grab the database_table that we want to save the results in run_context = FlowRunContext.get() if run_context: - prefect_flow_run_id = str(run_context.flow_run.id) workflow = run_context.flow.simmate_workflow workflow_name = workflow.name_full + run_id = str(run_context.flow_run.id) assert workflow.database_table == cls else: - raise Exception("Please provide a flow_id and workflow name.") + raise Exception( + "No Prefect FlowRunContext was detected, so you must provide " + "flow_id and workflow_name to the from_run_context method." + ) # Depending on how a workflow was submitted, there may be a calculation # extry existing already -- which we need to grab and then update. If it's @@ -166,14 +172,14 @@ def from_prefect_context( # check if the calculation already exists in our database, and if so, # grab it and return it. - if cls.objects.filter(prefect_flow_run_id=prefect_flow_run_id).exists(): - return cls.objects.get(prefect_flow_run_id=prefect_flow_run_id) + if cls.objects.filter(run_id=run_id).exists(): + return cls.objects.get(run_id=run_id) # Otherwise we need to create a new one and return that. # To handle the initialization of other Simmate mix-ins, we pass all # information to the from_toolkit method rather than directly to cls. 
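In practice this get-or-create behavior means repeated calls with the same `run_id` resolve to a single row. A sketch using a hypothetical `ExampleCalculation` subclass and an illustrative workflow name:

```python
import uuid

# hypothetical import -- any table inheriting from Calculation behaves the same way
from my_project.models import ExampleCalculation

run_id = str(uuid.uuid4())

# the first call creates the row because nothing with this run_id exists yet
calc = ExampleCalculation.from_run_context(
    run_id=run_id,
    workflow_name="example.workflow.name",  # illustrative name
)

# a second call with the same run_id returns that same row instead of
# creating a duplicate entry
calc_again = ExampleCalculation.from_run_context(
    run_id=run_id,
    workflow_name="example.workflow.name",
)
```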
calculation = cls.from_toolkit( - prefect_flow_run_id=prefect_flow_run_id, + run_id=run_id, location=platform.node(), workflow_name=workflow_name, **kwargs, diff --git a/src/simmate/workflow_engine/utilities.py b/src/simmate/workflow_engine/utilities.py index 16a9b9337..9ba5e1aa4 100644 --- a/src/simmate/workflow_engine/utilities.py +++ b/src/simmate/workflow_engine/utilities.py @@ -1,14 +1,27 @@ # -*- coding: utf-8 -*- -from prefect import task +from prefect.context import FlowRunContext -from typing import List -# OPTIMIZE: I need to return a dictionary because Prefect struggles to handle +def get_prefect_id(raise_if_no_context: bool = True): + """ + grabs the prefect flow run id from context (if there is one) + """ + # Grab the flow run id for reference. + run_context = FlowRunContext.get() + if run_context: + run_id = str(run_context.flow_run.id) + return run_id + elif not run_context and not raise_if_no_context: + return None # no error is raised + else: + raise Exception("Cannot detetect a Prefect FlowRunContext") + + +# OPTIMIZE: I needed to return a dictionary because Prefect struggles to handle # a list or tuple return in their workflow context. Maybe this will change in # Prefect Orion though. -@task -def parse_multi_command(command: str, commands_out: List[int]) -> dict: +def parse_multi_command(command: str, commands_out: list[int]) -> dict: """ Given a list of commands separated by semicolons (;), this simply separates the commands into individual ones. diff --git a/src/simmate/workflow_engine/workflow.py b/src/simmate/workflow_engine/workflow.py index 84a045aed..6959f78bb 100644 --- a/src/simmate/workflow_engine/workflow.py +++ b/src/simmate/workflow_engine/workflow.py @@ -270,6 +270,7 @@ class Example__Python__MyFavoriteSettings(Workflow): import re import inspect from typing import List, Any +import uuid from functools import cache # cached_property doesnt work with classmethod from prefect.tasks import task # present only for convience imports elsewhere @@ -284,6 +285,11 @@ class Example__Python__MyFavoriteSettings(Workflow): import simmate from simmate.database.base_data_types import Calculation from simmate.utilities import async_to_sync, get_directory, copy_directory +from simmate.workflow_engine.execution import SimmateExecutor +from simmate.workflow_engine.utilities import get_prefect_id + +# I disable prefect until I have a working deployment system +USE_PREFECT = False class Workflow: @@ -397,58 +403,73 @@ def run_config(cls): ) @classmethod - def _run_full(cls, **kwargs): + def _run_full(cls, run_id=None, **kwargs): """ This method should not be called directly. Use the `run` method instead. """ - kwargs_cleaned = cls._load_input_and_register(**kwargs) + # This method is isolated only because we want to wrap it as a prefect + # workflow in some cases. + run_id = run_id or cls._get_run_id() + kwargs_cleaned = cls._load_input_and_register(run_id=run_id, **kwargs) result = cls.run_config(**kwargs_cleaned) if cls.use_database: - result["calculation_id"] = cls._save_to_database(result) + result["calculation_id"] = cls._save_to_database(result, run_id=run_id) return result - @classmethod - @cache - def _to_prefect_flow(cls) -> Flow: - """ - Converts this workflow into a Prefect flow - """ - - # Instead of the @flow decorator, we build the flow instance directly - flow = Flow( - fn=cls._run_full, - name=cls.name_full, - version=cls.version, - # Skip type checking because I don't have robust typing yet - # e.g. 
Structure type inputs also accept inputs like a filename - validate_parameters=False, - ) - - # as an extra, we set this attribute to the prefect flow instance, which - # allows us to access the source Simmate Workflow easily with Prefect's - # context managers. - flow.simmate_workflow = cls - - return flow + @staticmethod + def _get_run_id(): + if USE_PREFECT: + run_id = get_prefect_id() + else: + run_id = str(uuid.uuid4()) + return run_id @classmethod - def run(cls, **kwargs) -> State: + def run(cls, **kwargs): """ - A convience method to run a workflow as a subflow in a prefect context. + runs the workflow locally """ + if USE_PREFECT: + state = cls._run_prefect(**kwargs) + else: + print(f"Starting new run of {cls.name_full}") + result = cls._run_full(**kwargs) # no run_id as a new one will be made + print(f"Completed run of {cls.name_full}") + state = DummyState(result) + return state - subflow = cls._to_prefect_flow() - state = subflow(return_state=True, **kwargs) + @classmethod + def run_cloud(cls, **kwargs) -> str: + # Because we often want to save some info to our database even before + # the calculation starts/finishes, we do that with. An example is + # storing the structure and run id that we just submitted. + if USE_PREFECT: + run_id = cls._run_prefect_cloud(**kwargs) + cls._register_calculation(run_id=run_id, **kwargs) + # BUG: Will there be a race condition here? What if the workflow finishes + # and tries writing to the databse before this is done? + # BUG: if parameters are improperly set, this line will fail, while the + # job submission (above) will suceed. Should I cancel the flow run if + # this occurs? + return run_id + else: + print(f"Submitting new run of {cls.name_full}") + run_id = cls._get_run_id() + cls._register_calculation(run_id=run_id, **kwargs) + executor = SimmateExecutor() + future = executor.submit(cls._run_full, run_id=run_id, **kwargs) + # Would it be better to return the future...? + return future - # We don't want to block and wait because this might disable parallel - # features of subflows. We therefore return the state and let the - # user decide if/when to block. - # result = state.result() + return run_id - return state + @classmethod + @property + async def nflows_submitted(cls) -> int: + raise NotImplementedError() @classmethod - def _save_to_database(cls, result): + def _save_to_database(cls, result, run_id): # split our results and corrections (which are given as a dict) into # separate variables @@ -458,7 +479,10 @@ def _save_to_database(cls, result): # load the calculation entry for this workflow run. This should already # exist thanks to the load_input_and_register task. - calculation = cls.database_table.from_prefect_context() + calculation = cls.database_table.from_run_context( + run_id=run_id, + workflow_name=cls.name_full, + ) # now update the calculation entry with our results calculation.update_from_vasp_run(vasprun, corrections, directory) @@ -606,7 +630,7 @@ def show_config(cls): # ------------------------------------------------------------------------- @classmethod - def _load_input_and_register(cls, **parameters: Any) -> dict: + def _load_input_and_register(cls, run_id: str, **parameters: Any) -> dict: """ How the input was submitted as a parameter depends on if we are submitting to Prefect Cloud, running the flow locally, or even continuing from a @@ -757,7 +781,7 @@ def _load_input_and_register(cls, **parameters: Any) -> dict: # and also see which structures/runs have been submitted aready. 
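From the user's perspective, the local and cloud entry points now look like the sketch below, mirroring the README example (it assumes an `NaCl.cif` file, VASP, and a configured database):

```python
from simmate.workflows.relaxation import Relaxation__Vasp__Matproj as workflow

# local run: blocks until the workflow finishes and returns a state-like
# object whose .result() gives the final output
state = workflow.run(structure="NaCl.cif")
result = state.result()

# cloud run: registers the calculation, submits a WorkItem to the
# SimmateExecutor queue, and returns a future that a separate worker fulfills
future = workflow.run_cloud(structure="NaCl.cif")
result = future.result()  # blocks until a worker completes the run
```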
if cls.use_database: - cls._register_calculation(**parameters_cleaned) + cls._register_calculation(run_id=run_id, **parameters_cleaned) # --------------------------------------------------------------------- @@ -773,17 +797,13 @@ def _load_input_and_register(cls, **parameters: Any) -> dict: else parameters ) - # Grab the flow run id for reference. - run_context = FlowRunContext.get() - prefect_flow_run_id = str(run_context.flow_run.id) - # We want to write a file summarizing the inputs used for this # workflow run. This allows future users to reproduce the results if # desired -- and it also allows us to load old results into a database. input_summary = dict( workflow_name=cls.name_full, # this ID is ingored as an input but needed for loading past data - prefect_flow_run_id=prefect_flow_run_id, + run_id=run_id, **parameters_serialized, ) @@ -805,9 +825,11 @@ def _parameters_to_register(cls) -> List[str]: """ A list of input parameters that should be used to register the calculation. """ - parameters_to_register = [ - "prefect_flow_run_id" - ] # run is always used to register but is never an input parameter + + # run is always used to register but is never an input parameter + parameters_to_register = [] + # run_id and workflow_name are used to register but these are + # implemented with the _register_calculation elsewhere table_columns = cls.database_table.get_column_names() @@ -825,7 +847,7 @@ def _parameters_to_register(cls) -> List[str]: return parameters_to_register @classmethod - def _register_calculation(cls, **kwargs) -> Calculation: + def _register_calculation(cls, run_id=None, **kwargs) -> Calculation: """ If the workflow is linked to a calculation table in the Simmate database, this adds the flow run to the database. @@ -833,7 +855,7 @@ def _register_calculation(cls, **kwargs) -> Calculation: Parameters passed should be deserialized and cleaned. This method should not be called directly as it is used within the - `run_cloud` method and `load_input_and_register` task. + `run_prefect_cloud` method and `load_input_and_register` task. """ # We first need to grab the database table where we want to register @@ -846,16 +868,11 @@ def _register_calculation(cls, **kwargs) -> Calculation: # context -- where the context has information such as the workflow # we are using (and the database table linked to that workflow). if cls == Workflow: - + raise Exception("Checking if this method is ever used") run_context = FlowRunContext.get() workflow = run_context.flow.simmate_workflow database_table = workflow.database_table - # as an extra, add the prefect_flow_run_id the kwargs in case it - # wasn't set already. - if "prefect_flow_run_id" not in kwargs: - kwargs["prefect_flow_run_id"] = str(run_context.flow_run.id) - # Otherwise we should be using the subclass Workflow that has the # database_table property set. else: @@ -882,14 +899,18 @@ def _register_calculation(cls, **kwargs) -> Calculation: # back to json before saving to the database. 
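The summary file described above is essentially a YAML dump of the serialized inputs plus the run identifiers. A rough sketch of the idea, where the filename and values are illustrative:

```python
import yaml
from pathlib import Path

# illustrative version of the summary dictionary built above
input_summary = dict(
    workflow_name="relaxation.vasp.matproj",
    run_id="illustrative-run-id",
    structure="NaCl.cif",
)

summary_file = Path("simmate_metadata.yaml")  # filename assumed for illustration
with summary_file.open("w") as file:
    yaml.dump(input_summary, file)

# the exact inputs can then be recovered later, e.g. to reload old results
reloaded = yaml.safe_load(summary_file.read_text())
```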
if "workflow_base" in register_kwargs_cleaned: parameters_serialized = cls._serialize_parameters(**register_kwargs_cleaned) - prefect_flow_run_id = parameters_serialized.pop("prefect_flow_run_id", None) - calculation = database_table.from_prefect_context( - prefect_flow_run_id=prefect_flow_run_id, + calculation = database_table.from_run_context( + run_id=run_id, + workflow_name=cls.name_full, **parameters_serialized, ) else: # load/create the calculation for this workflow run - calculation = database_table.from_prefect_context(**register_kwargs_cleaned) + calculation = database_table.from_run_context( + run_id=run_id, + workflow_name=cls.name_full, + **register_kwargs_cleaned, + ) return calculation @@ -900,7 +921,7 @@ def _serialize_parameters(**parameters) -> dict: use. This method should not be called directly as it is used within the - run_cloud() method. + run_prefect_cloud() method. """ # TODO: consider moving this into prefect's core code as a contribution. @@ -1028,191 +1049,240 @@ def _deserialize_parameters( # the submission of workflows to cloud # ------------------------------------------------------------------------- - @classmethod - def run_cloud(cls, **kwargs) -> str: - """ - This schedules the workflow to run remotely on Prefect Cloud. + if USE_PREFECT: + + @classmethod + @cache + def _to_prefect_flow(cls) -> Flow: + """ + Converts this workflow into a Prefect flow + """ + + # Instead of the @flow decorator, we build the flow instance directly + flow = Flow( + fn=cls._run_full, + name=cls.name_full, + version=cls.version, + # Skip type checking because I don't have robust typing yet + # e.g. Structure type inputs also accept inputs like a filename + validate_parameters=False, + ) - #### Parameters + # as an extra, we set this attribute to the prefect flow instance, which + # allows us to access the source Simmate Workflow easily with Prefect's + # context managers. + flow.simmate_workflow = cls - - `labels`: - a list of labels to schedule the workflow with + return flow - - `wait_for_run`: - whether to wait for the workflow to finish. If False, the workflow - will simply be submitted and then exit. The default is True. + @classmethod + def _run_prefect(cls, **kwargs) -> State: + """ + A convience method to run a workflow as a subflow in a prefect context. + """ - - `**kwargs`: - all options that are normally passed to the workflow.run() method + subflow = cls._to_prefect_flow() + state = subflow(return_state=True, **kwargs) - #### Returns + # We don't want to block and wait because this might disable parallel + # features of subflows. We therefore return the state and let the + # user decide if/when to block. + # result = state.result() - - The flow run id that was used in prefect cloud. + return state + @classmethod + def _run_prefect_cloud(cls, **kwargs) -> str: + """ + This schedules the workflow to run remotely on Prefect Cloud. - #### Usage + #### Parameters - Make sure you have Prefect properly configured and have registered your - workflow with the backend. + - `labels`: + a list of labels to schedule the workflow with - Note that this method can be viewed as a fork of: - - from prefect.tasks.prefect.flow_run import create_flow_run - It can also be viewed as a more convenient way to call to client.create_flow_run. - I don't accept any other client.create_flow_run() inputs besides 'labels'. - This may change in the future if I need to set flow run names or schedules. - """ + - `wait_for_run`: + whether to wait for the workflow to finish. 
If False, the workflow + will simply be submitted and then exit. The default is True. - # Prefect does not properly deserialize objects that have - # as as_dict or to_dict method, so we use a custom method to do that here - parameters_serialized = cls._serialize_parameters(**kwargs) - # BUG: What if we are submitting using a filename? We don't want to - # submit to a cluster and have the job fail because it doesn't have - # access to the file. One solution could be to deserialize right before - # serializing in the next line in order to ensure parameters that - # accept file names are submitted with all necessary data. + - `**kwargs`: + all options that are normally passed to the workflow.run() method - # Now we submit the workflow. - flow_run_id = cls._submit_to_prefect(parameters=parameters_serialized) + #### Returns - # Because we often want to save some info to our database even before - # the calculation starts/finishes, we do that here. An example is - # storing the structure and prefect id that we just submitted. - cls._register_calculation(prefect_flow_run_id=flow_run_id, **kwargs) - # BUG: Will there be a race condition here? What if the workflow finishes - # and tries writing to the databse before this is done? - # BUG: if parameters are improperly set, this line will fail, while the - # job submission (above) will suceed. Should I cancel the flow run if - # this occurs? - - # return the flow_run_id for the user - return flow_run_id + - The flow run id that was used in prefect cloud. - @classmethod - @property - @cache - @async_to_sync - async def deployment_id(cls) -> str: - """ - Grabs the deployment id from the prefect database if it exists, and - if not, creates the depolyment and then returns the id. - This is a synchronous and cached version of `_get_deployment_id` and - this is the preferred method to use for beginners. - """ - return await cls._get_deployment_id() + #### Usage - @classmethod - async def _get_deployment_id(cls) -> str: - """ - Grabs the deployment id from the prefect database if it exists, and - if not, creates the depolyment and then returns the id. + Make sure you have Prefect properly configured and have registered your + workflow with the backend. - This is an asynchronous method and should only be used when within - other async methods. Beginners should instead use the `deployment_id` - property. - """ + Note that this method can be viewed as a fork of: + - from prefect.tasks.prefect.flow_run import create_flow_run + It can also be viewed as a more convenient way to call to client.create_flow_run. + I don't accept any other client.create_flow_run() inputs besides 'labels'. + This may change in the future if I need to set flow run names or schedules. + """ - async with get_client() as client: - response = await client.read_deployments( - flow_filter=FlowFilter( - name={"any_": [cls.name_full]}, - ), - ) + # Prefect does not properly deserialize objects that have + # as as_dict or to_dict method, so we use a custom method to do that here + parameters_serialized = cls._serialize_parameters(**kwargs) + # BUG: What if we are submitting using a filename? We don't want to + # submit to a cluster and have the job fail because it doesn't have + # access to the file. One solution could be to deserialize right before + # serializing in the next line in order to ensure parameters that + # accept file names are submitted with all necessary data. 
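Until that is addressed, one workaround consistent with the note above is to expand file-based inputs locally before submitting, for example:

```python
from simmate.toolkit import Structure
from simmate.workflows.relaxation import Relaxation__Vasp__Matproj as workflow

# load the file locally and submit the full Structure object, so remote
# workers never need access to "NaCl.cif" on this machine
structure = Structure.from_file("NaCl.cif")
workflow.run_cloud(structure=structure)
```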
- # If this is the first time accessing the deployment id, we will need - # to create the deployment - if not response: - deployment_id = await cls._create_deployment() + # Now we submit the workflow. + flow_run_id = cls._submit_to_prefect(parameters=parameters_serialized) - # there should only be one deployment associated with this workflow - # if it's been deployed already. - elif len(response) == 1: - deployment_id = str(response[0].id) + # return the flow_run_id for the user + return flow_run_id - else: - raise Exception("There are duplicate deployments for this workflow!") + @classmethod + @property + @cache + @async_to_sync + async def depolyment_id_prefect(cls) -> str: + """ + Grabs the deployment id from the prefect database if it exists, and + if not, creates the depolyment and then returns the id. - return deployment_id + This is a synchronous and cached version of `_get_depolyment_id_prefect` and + this is the preferred method to use for beginners. + """ + return await cls._get_depolyment_id_prefect() - @classmethod - async def _create_deployment(cls) -> str: - """ - Registers this workflow to the prefect database as a deployment. + @classmethod + async def _get_depolyment_id_prefect(cls) -> str: + """ + Grabs the deployment id from the prefect database if it exists, and + if not, creates the depolyment and then returns the id. - This method should not be called directly. It will be called by - other methods when appropriate - """ + This is an asynchronous method and should only be used when within + other async methods. Beginners should instead use the `depolyment_id_prefect` + property. + """ - # raise error until python-deployments are supported again - raise Exception( - "Prefect 2.0 has removed the ability to create deployments in " - "python, so this feature is currently disabled." - ) - # When this is removed, be sure to re-add the test_workflow_cloud unittest - - from prefect.deployments import Deployment - - # NOTE: we do not use the client.create_deployment method because it - # is called within the Deployment.create() method for us. - deployment = Deployment( - name=cls.name_full, - flow=cls._to_prefect_flow(), - packager=OrionPackager(serializer=PickleSerializer()), - # OPTIMIZE: it would be better if I could figure out the ImportSerializer - # here. Only issue is that prefect would need to know to import AND - # call a method. - tags=[ - "simmate", - cls.name_type, - cls.name_calculator, - ], - ) + async with get_client() as client: + response = await client.read_deployments( + flow_filter=FlowFilter( + name={"any_": [cls.name_full]}, + ), + ) - deployment_id = await deployment.create() + # If this is the first time accessing the deployment id, we will need + # to create the deployment + if not response: + deployment_id = await cls._create_deployment_prefect() - return str(deployment_id) # convert from UUID to str first + # there should only be one deployment associated with this workflow + # if it's been deployed already. + elif len(response) == 1: + deployment_id = str(response[0].id) - @classmethod - @async_to_sync - async def _submit_to_prefect(cls, **kwargs) -> str: - """ - Submits a flow run to prefect cloud. + else: + raise Exception("There are duplicate deployments for this workflow!") - This method should not be used directly. Instead use `run_cloud`. - """ + return deployment_id + + @classmethod + async def _create_deployment_prefect(cls) -> str: + """ + Registers this workflow to the prefect database as a deployment. 
- # The reason we have this code as a separate method is because we want - # to isolate Prefect's async calls from Django's sync-restricted calls - # (i.e. django raises errors if called within an async context). - # Therefore, methods like `run_cloud` can't have both this async code - # AND methods like _register_calculation that make sync database calls. + This method should not be called directly. It will be called by + other methods when appropriate + """ - async with get_client() as client: - response = await client.create_flow_run_from_deployment( - deployment_id=await cls._get_deployment_id(), - **kwargs, + # raise error until python-deployments are supported again + raise Exception( + "Prefect 2.0 has removed the ability to create deployments in " + "python, so this feature is currently disabled." + ) + # When this is removed, be sure to re-add the test_workflow_cloud unittest + + from prefect.deployments import Deployment + + # NOTE: we do not use the client.create_deployment method because it + # is called within the Deployment.create() method for us. + deployment = Deployment( + name=cls.name_full, + flow=cls._to_prefect_flow(), + packager=OrionPackager(serializer=PickleSerializer()), + # OPTIMIZE: it would be better if I could figure out the ImportSerializer + # here. Only issue is that prefect would need to know to import AND + # call a method. + tags=[ + "simmate", + cls.name_type, + cls.name_calculator, + ], ) - flow_run_id = str(response.id) - return flow_run_id + deployment_id = await deployment.create() - @classmethod - @property - @async_to_sync - async def nflows_submitted(cls) -> int: - """ - Queries Prefect to see how many workflows are in a scheduled, running, - or pending state. - """ + return str(deployment_id) # convert from UUID to str first - async with get_client() as client: - response = await client.read_flow_runs( - flow_filter=FlowFilter( - name={"any_": [cls.name_full]}, - ), - flow_run_filter=FlowRunFilter( - state={"type": {"any_": ["SCHEDULED", "PENDING", "RUNNING"]}} - ), - ) + @classmethod + @async_to_sync + async def _submit_to_prefect(cls, **kwargs) -> str: + """ + Submits a flow run to prefect cloud. + + This method should not be used directly. Instead use `_run_prefect_cloud`. + """ + + # The reason we have this code as a separate method is because we want + # to isolate Prefect's async calls from Django's sync-restricted calls + # (i.e. django raises errors if called within an async context). + # Therefore, methods like `_run_prefect_cloud` can't have both this async code + # AND methods like _register_calculation that make sync database calls. + + async with get_client() as client: + response = await client.create_flow_run_from_deployment( + deployment_id=await cls._get_depolyment_id_prefect(), + **kwargs, + ) + + flow_run_id = str(response.id) + return flow_run_id + + @classmethod + @property + @async_to_sync + async def nflows_submitted_prefect(cls) -> int: + """ + Queries Prefect to see how many workflows are in a scheduled, running, + or pending state. + """ + + async with get_client() as client: + response = await client.read_flow_runs( + flow_filter=FlowFilter( + name={"any_": [cls.name_full]}, + ), + flow_run_filter=FlowRunFilter( + state={"type": {"any_": ["SCHEDULED", "PENDING", "RUNNING"]}} + ), + ) + + return len(response) + + +class DummyState: + """ + This class is meant to emulate Prefect States. By wrapping a result in + this, we enable higher-level features that depend on a call to + `state.result()`. 
+ + This class should not be used directly as it is automatically applied with + the `Workflow.run` method + """ + + def __init__(self, result): + self._result = result - return len(response) + def result(self): + return self._result From 6074dbe53858e54ba4cf52d6508cc68f97f4b9b0 Mon Sep 17 00:00:00 2001 From: Jack Sundberg Date: Tue, 9 Aug 2022 13:11:58 -0400 Subject: [PATCH 3/6] establish working executor --- .../workflow_engine/execution/database.py | 11 +++-- .../workflow_engine/execution/executor.py | 4 +- .../workflow_engine/execution/worker.py | 23 ++++++--- src/simmate/workflow_engine/workflow.py | 48 +++++++++++++++---- 4 files changed, 67 insertions(+), 19 deletions(-) diff --git a/src/simmate/workflow_engine/execution/database.py b/src/simmate/workflow_engine/execution/database.py index dc22b2a02..496daa05d 100644 --- a/src/simmate/workflow_engine/execution/database.py +++ b/src/simmate/workflow_engine/execution/database.py @@ -31,10 +31,10 @@ class WorkItem(DatabaseTable): class Meta: app_label = "workflow_engine" - labels = table_column.JSONField(default=[]) + tags = table_column.JSONField(default=[]) """ - List of labels to submit the task with, which helps with submitting workers - for a specific type of task/workflow. + List of tags to submit the task with, which helps with submitting workers + for a specific type of task/workflow. (e.g. ["simmate", "custom"]) """ fxn = table_column.BinaryField() @@ -57,6 +57,11 @@ class Meta: the output of fxn(*args, **kwargs) """ + source = None + """ + Source column is not needed so setting this to None disables the column + """ + # These states are based on the python queue module class StatusOptions(table_column.TextChoices): PENDING = "P" diff --git a/src/simmate/workflow_engine/execution/executor.py b/src/simmate/workflow_engine/execution/executor.py index f345c6809..9229d4a1c 100644 --- a/src/simmate/workflow_engine/execution/executor.py +++ b/src/simmate/workflow_engine/execution/executor.py @@ -36,7 +36,7 @@ def submit( self, fxn, *args, - labels=[], + tags=[], **kwargs, ): @@ -54,7 +54,7 @@ def submit( fxn=cloudpickle.dumps(fxn), args=cloudpickle.dumps(args), kwargs=cloudpickle.dumps(kwargs), - # TODO: labels + tags=tags, # should be json serializable already ) # create the future object diff --git a/src/simmate/workflow_engine/execution/worker.py b/src/simmate/workflow_engine/execution/worker.py index 92691f229..0e26b6f25 100644 --- a/src/simmate/workflow_engine/execution/worker.py +++ b/src/simmate/workflow_engine/execution/worker.py @@ -40,8 +40,11 @@ def __init__( # settings on what to do when the queue is empty close_on_empty_queue: bool = False, waittime_on_empty_queue: float = 60, - labels: list[str] = [], + tags: list[str] = ["simmate"], # should default be empty...? ): + # the tags to query tasks for. If no tags were given, the worker will + # query for tasks that have NO tags + self.tags = tags # the maximum number of workitems to run before closing down # if no limit was set, we can go to infinity! @@ -81,13 +84,13 @@ def start(self): # if we've hit the limit. if (time.time() - time_start) > self.timeout: # TODO - check wait_on_timeout if running in parallel. - print("The time-limit for this worker has been hit.") + print("The time-limit for this worker has been hit. Shutting down.") return # check the number of jobs completed so far, and exit if we hit # the limit if ntasks_finished >= self.nitems_max: - print("Maxium number of WorkItems hit for this worker.") + print("Maxium number of WorkItems hit for this worker. 
Shutting down.") return # check the length of the queue and while it is empty, we want to @@ -103,14 +106,22 @@ def start(self): # after we just waited, let's check the queue size again if self.queue_size() == 0: # if it's still empty, we should close the worker - print("The queue is empty so the worker has been closed.") + print("The task queue is empty. Shutting down.") return # If we've made it this far, we're ready to grab a new WorkItem # and run it! # Query for PENDING WorkItems, lock it for editting, and update # the status to RUNNING - workitem = WorkItem.objects.select_for_update().filter(status="P").first() + query_results = WorkItem.objects.select_for_update().filter(status="P") + # filter down by tags + if self.tags: + for tag in self.tags: + query_results = query_results.filter(tags__icontains=tag) + else: + query_results = query_results.filter(tags=self.tags) + # and grab the first result + workitem = query_results.first() # our lock exists only within this transation with transaction.atomic(): @@ -121,7 +132,7 @@ def start(self): workitem.save() # Print out the job ID that is being ran for the user to see - print(f"Running WorkItem with id {workitem.id}.") + print(f"Running WorkItem with id {workitem.id}") # now let's unpickle the WorkItem components fxn = cloudpickle.loads(workitem.fxn) diff --git a/src/simmate/workflow_engine/workflow.py b/src/simmate/workflow_engine/workflow.py index 6959f78bb..408a3d0c7 100644 --- a/src/simmate/workflow_engine/workflow.py +++ b/src/simmate/workflow_engine/workflow.py @@ -272,6 +272,7 @@ class Example__Python__MyFavoriteSettings(Workflow): from typing import List, Any import uuid from functools import cache # cached_property doesnt work with classmethod +from pathlib import Path from prefect.tasks import task # present only for convience imports elsewhere from prefect.flows import Flow @@ -439,11 +440,12 @@ def run(cls, **kwargs): return state @classmethod - def run_cloud(cls, **kwargs) -> str: + def run_cloud(cls, return_future: bool = True, **kwargs) -> str: # Because we often want to save some info to our database even before # the calculation starts/finishes, we do that with. An example is # storing the structure and run id that we just submitted. if USE_PREFECT: + run_id = cls._run_prefect_cloud(**kwargs) cls._register_calculation(run_id=run_id, **kwargs) # BUG: Will there be a race condition here? What if the workflow finishes @@ -451,15 +453,27 @@ def run_cloud(cls, **kwargs) -> str: # BUG: if parameters are improperly set, this line will fail, while the # job submission (above) will suceed. Should I cancel the flow run if # this occurs? + + if return_future: + raise Exception("Prefect cannot return futures from submisson.") + return run_id else: print(f"Submitting new run of {cls.name_full}") + run_id = cls._get_run_id() cls._register_calculation(run_id=run_id, **kwargs) + executor = SimmateExecutor() - future = executor.submit(cls._run_full, run_id=run_id, **kwargs) + future = executor.submit( + cls._run_full, + run_id=run_id, + tags=cls.tags, + **kwargs, + ) # Would it be better to return the future...? - return future + if return_future: + return future return run_id @@ -539,6 +553,23 @@ def name_preset(cls) -> str: """ return cls.name_full.split(".")[2] + @classmethod + @property + def tags(cls) -> list[str]: + """ + Lists of tags to submit a the workflow with when using run_cloud. 
+ """ + tags = [ + "simmate", + cls.name_type, + cls.name_calculator, + ] + + if USE_PREFECT: + return tags + else: + return tags + [cls.name_full] + # ------------------------------------------------------------------------- # Properties/method that set website UI documentation and help users # explore input options and settings used. @@ -949,6 +980,8 @@ def _serialize_parameters(**parameters) -> dict: parameter_value = parameter_value.as_dict() elif hasattr(parameter_value, "to_dict"): parameter_value = parameter_value.to_dict() + elif parameter_key == "directory": + parameter_value = str(parameter_value) # convert Path to str # workflow_base and input_parameters are special cases that # may require a refactor (for customized workflows) @@ -1042,6 +1075,9 @@ def _deserialize_parameters( parameters["supercell_end"] ) + if "directory" in parameters.keys(): + parameters_cleaned["directory"] = Path(parameters_cleaned["directory"]) + return parameters_cleaned # ------------------------------------------------------------------------- @@ -1214,11 +1250,7 @@ async def _create_deployment_prefect(cls) -> str: # OPTIMIZE: it would be better if I could figure out the ImportSerializer # here. Only issue is that prefect would need to know to import AND # call a method. - tags=[ - "simmate", - cls.name_type, - cls.name_calculator, - ], + tags=cls.tags, ) deployment_id = await deployment.create() From 74e4b7e9ebccb947088452d3096278dd00bae8e7 Mon Sep 17 00:00:00 2001 From: Jack Sundberg Date: Tue, 9 Aug 2022 14:18:46 -0400 Subject: [PATCH 4/6] checkpoint in refactor --- README.md | 35 +- .../command_line/test/test_workflows_cli.py | 11 +- .../configuration/prefect/connect_to_dask.py | 1 + src/simmate/conftest.py | 17 +- .../database/base_data_types/calculation.py | 4 +- .../evolution/database.py | 6 +- src/simmate/workflow_engine/__init__.py | 4 +- .../workflow_engine/execution/README.md | 4 +- .../workflow_engine/execution/executor.py | 56 +-- .../workflow_engine/execution/future.py | 6 +- .../execution_prefect/__init__.py | 1 + .../{ => execution_prefect}/worker.py | 2 +- .../execution_prefect/workflow.py | 268 ++++++++++ .../workflow_engine/test/test_s3_workflow.py | 44 +- src/simmate/workflow_engine/utilities.py | 18 - src/simmate/workflow_engine/workflow.py | 475 +++++------------- 16 files changed, 492 insertions(+), 460 deletions(-) create mode 100644 src/simmate/workflow_engine/execution_prefect/__init__.py rename src/simmate/workflow_engine/{ => execution_prefect}/worker.py (99%) create mode 100644 src/simmate/workflow_engine/execution_prefect/workflow.py diff --git a/README.md b/README.md index f95d4acfb..da3b8371d 100644 --- a/README.md +++ b/README.md @@ -180,21 +180,28 @@ structure.add_oxidation_state_by_guess() ``` -4. _**Ease of Scalability.**_ At the beginning of a project, you may want to write and run code on a single computer and single core. But as you run into some intense calculations, you may want to use all of your CPU and GPU to run calculations. At the extreme, some projects require thousands of computers across numerous locations, including university clusters (using SLURM or PBS) and cloud computing (using Kubernetes and Docker). Simmate can meet all of these needs thanks to integration with [Dask](https://github.com/dask/dask) and [Prefect](https://github.com/PrefectHQ/prefect): +4. _**Ease of Scalability.**_ At the beginning of a project, you may want to write and run code on a single computer and single core. 
But as you run into some intense calculations, you may want to use all of your CPU and GPU to run calculations. At the extreme, some projects require thousands of computers across numerous locations, including university clusters (using SLURM or PBS) and cloud computing (using Kubernetes and Docker). Simmate can meet all of these needs thanks to integration with a custom `SimmateExecutor` (the default), [Dask](https://github.com/dask/dask), and/or [Prefect](https://github.com/PrefectHQ/prefect): ```python -# To run the tasks of a single workflow in parallel, use Dask. -from prefect.task_runners import DaskTaskRunner -workflow.task_runner = DaskTaskRunner() -state = workflow.run(...) - -# To run many workflows in parallel, use Prefect. -# Once you configure Prefect, you simply switch -# from using "run" to "run_cloud" -prefect_flow_run_id = workflow.run_cloud(...) - -# You can use different combinations of these two parallelization strategies as well! -# Using Prefect and Dask, we can scale out accross various computer resources -# with a few lines of code. +# on your local computer, schedule your workflow run +state = workflow.run_cloud(...) + +# Note, your workflow won't run locally. +# While you wait for it run, you can check it's status +state.is_done() +state.is_running() + +# or cancel the job before it runs +state.cancel() + +# or wait until the job completes and grab the result! +# The job won't run until you start a worker (see command below) +result = state.result() +``` + +``` bash +# In a separate terminal or even on a remote HPC cluster, you +# can start a worker that will start running any scheduled jobs +simmate workflow-engine start-worker ``` diff --git a/src/simmate/command_line/test/test_workflows_cli.py b/src/simmate/command_line/test/test_workflows_cli.py index 2085af578..be7e51faf 100644 --- a/src/simmate/command_line/test/test_workflows_cli.py +++ b/src/simmate/command_line/test/test_workflows_cli.py @@ -2,8 +2,6 @@ import yaml -from prefect.states import Completed - from simmate.calculators.vasp.inputs import Potcar from simmate.workflow_engine import Workflow @@ -109,6 +107,9 @@ def test_workflows_run(command_line_runner, structure, mocker, tmp_path): "run", return_value=Completed(), ) + # the code above can be modified for a prefect executor + # from prefect.states import Completed + # return_value=Completed(), # now try writing input files to the tmp_path result = command_line_runner.invoke( @@ -151,6 +152,9 @@ def test_workflows_run_cloud(command_line_runner, structure, mocker, tmp_path): "run_cloud", return_value=Completed(), ) + # the code above can be modified for a prefect executor + # from prefect.states import Completed + # return_value=Completed(), # now try writing input files to the tmp_path result = command_line_runner.invoke( @@ -196,6 +200,9 @@ def test_workflows_run_yaml(command_line_runner, structure, mocker, tmp_path): "run", return_value=Completed(), ) + # the code above can be modified for a prefect executor + # from prefect.states import Completed + # return_value=Completed(), # now try writing input files to the tmp_path result = command_line_runner.invoke( diff --git a/src/simmate/configuration/prefect/connect_to_dask.py b/src/simmate/configuration/prefect/connect_to_dask.py index 5ea5a1847..cee237100 100644 --- a/src/simmate/configuration/prefect/connect_to_dask.py +++ b/src/simmate/configuration/prefect/connect_to_dask.py @@ -20,6 +20,7 @@ def set_default_executor(dask_scheduler_address): agent = LocalAgent(name="ExampleAgent") agent.start() 
""" + raise NotImplementedError("This method has not been ported to prefect 2.0") # All workflows should be pointed to the Dask cluster as the default Executor. # We can grab the Dask scheduler's address using the cluster object from above. diff --git a/src/simmate/conftest.py b/src/simmate/conftest.py index 86fdee2e9..656b201d2 100644 --- a/src/simmate/conftest.py +++ b/src/simmate/conftest.py @@ -20,7 +20,6 @@ import pytest from click.testing import CliRunner from django.contrib.auth.models import User -from prefect.testing.utilities import prefect_test_harness from simmate.utilities import get_directory from simmate.toolkit import Structure, Composition, base_data_types @@ -259,10 +258,12 @@ def command_line_runner(): return CliRunner() -@pytest.fixture(autouse=True, scope="session") -def prefect_test_fixture(): - """ - For all prefect flows and tasks, this will automatically use a dummy-database - """ - with prefect_test_harness(): - yield +# !!! Disable harness until prefect is reimplemented +# from prefect.testing.utilities import prefect_test_harness +# @pytest.fixture(autouse=True, scope="session") +# def prefect_test_fixture(): +# """ +# For all prefect flows and tasks, this will automatically use a dummy-database +# """ +# with prefect_test_harness(): +# yield diff --git a/src/simmate/database/base_data_types/calculation.py b/src/simmate/database/base_data_types/calculation.py index fffbb5215..f513daf90 100644 --- a/src/simmate/database/base_data_types/calculation.py +++ b/src/simmate/database/base_data_types/calculation.py @@ -2,8 +2,6 @@ import platform -from prefect.context import FlowRunContext - from simmate.database.base_data_types import DatabaseTable, table_column @@ -154,6 +152,8 @@ def from_run_context( if not run_id or not workflow_name: # Grab the database_table that we want to save the results in + from prefect.context import FlowRunContext + run_context = FlowRunContext.get() if run_context: workflow = run_context.flow.simmate_workflow diff --git a/src/simmate/toolkit/structure_prediction/evolution/database.py b/src/simmate/toolkit/structure_prediction/evolution/database.py index 2a4458012..f1e5c85a2 100644 --- a/src/simmate/toolkit/structure_prediction/evolution/database.py +++ b/src/simmate/toolkit/structure_prediction/evolution/database.py @@ -2,8 +2,6 @@ import plotly.graph_objects as plotly_go from django.apps import apps as django_apps -from prefect.client import get_client -from prefect.orion.schemas.filters import FlowRunFilter from simmate.database.base_data_types import table_column, DatabaseTable from simmate.utilities import async_to_sync @@ -281,6 +279,10 @@ async def _check_still_running_ids(prefect_flow_run_ids): This is normally used within `update_flow_run_ids` and shouldn't be called directly. """ + raise NotImplementedError("porting to general executor") + + from prefect.client import get_client + from prefect.orion.schemas.filters import FlowRunFilter # The reason we have this code as a separate method is because we want # to isolate Prefect's async calls from Django's sync-restricted calls diff --git a/src/simmate/workflow_engine/__init__.py b/src/simmate/workflow_engine/__init__.py index 68efb9d7f..453a0065f 100644 --- a/src/simmate/workflow_engine/__init__.py +++ b/src/simmate/workflow_engine/__init__.py @@ -8,12 +8,12 @@ # django in the code below this. These two imports don't depend on the database # module, so it may be worth isolating them for faster imports. 
from .error_handler import ErrorHandler -from .worker import Worker # All imports below this point depend on the simmate.database module and therefore # need a database connection. This line sets up the database connection so # that models can be imported. from simmate.database import connect -from .workflow import Workflow, task +from .workflow import Workflow from .s3_workflow import S3Workflow +from .execution import SimmateWorker as Worker diff --git a/src/simmate/workflow_engine/execution/README.md b/src/simmate/workflow_engine/execution/README.md index ec1fdcdfa..13b8731f3 100644 --- a/src/simmate/workflow_engine/execution/README.md +++ b/src/simmate/workflow_engine/execution/README.md @@ -8,10 +8,8 @@ Example usage: ```python from simmate.workflow_engine.execution.executor import SimmateExecutor -executor = SimmateExecutor() - # EXAMPLE 1 -future = executor.submit(sum, [4, 3, 2, 1]) +future = SimmateExecutor.submit(sum, [4, 3, 2, 1]) assert future.result() == 10 # EXAMPLE 2 diff --git a/src/simmate/workflow_engine/execution/executor.py b/src/simmate/workflow_engine/execution/executor.py index 9229d4a1c..b2740063f 100644 --- a/src/simmate/workflow_engine/execution/executor.py +++ b/src/simmate/workflow_engine/execution/executor.py @@ -32,13 +32,8 @@ class SimmateExecutor: # https://docs.python.org/3/library/concurrent.futures.html # from concurrent.futures import Executor # No need to inherit at the moment - def submit( - self, - fxn, - *args, - tags=[], - **kwargs, - ): + @staticmethod + def submit(fxn, *args, tags=[], **kwargs): # The *args and **kwargs input separates args into a tuple and kwargs into # a dictionary for me, which makes their storage very easy! @@ -63,23 +58,7 @@ def submit( # and return the future for use return future - def map(self, fxn, iterables, timeout=None, chunksize=100): # TODO - # chunksize indicates how many to add at one - # iterables is a list of (*args, **kwargs) - # add many fn(*args, **kwargs) to queue - - # TODO -- This is not supported at the moment. I should use the - # .bulk_create method to do this in the future: - # https://docs.djangoproject.com/en/3.1/ref/models/querysets/#bulk-create - - # raise an error to ensure user sees this isn't supported yet. - raise Exception("This method is not supported yet") - - def shutdown(self, wait=True, cancel_futures=False): # TODO - # whether to wait until the queue is empty - # whether to cancel futures and clear database - pass - + @staticmethod def wait(self, futures): """ Waits for all futures to complete before returning a list of their results @@ -100,16 +79,14 @@ def wait(self, futures): else: return [future.result() for future in futures] - # ------------------------------------------------------------------------ - # ------------------------------------------------------------------------ - # ------------------------------------------------------------------------ - + # ------------------------------------------------------------------------- # These methods are for managing and monitoring the queue # I attach these directly to the Executor rather than having a separate # DjangoQueue class that inherits from python's Queue module. 
# If there is a good reason to make a separate class in the future, # I can start from these methods here and the following link: # https://docs.python.org/3/library/queue.html + # ------------------------------------------------------------------------- def queue_size(self): """ @@ -146,3 +123,26 @@ def clear_finished(self, are_you_sure=False): raise Exception else: WorkItem.objects.filter(status="F").delete() + + # ------------------------------------------------------------------------- + # Extra methods to add if I want to be consistent with other Executor classes + # ------------------------------------------------------------------------- + + # @staticmethod + # def map(fxn, iterables, timeout=None, chunksize=100): # TODO + # # chunksize indicates how many to add at one + # # iterables is a list of (*args, **kwargs) + # # add many fn(*args, **kwargs) to queue + + # # TODO -- This is not supported at the moment. I should use the + # # .bulk_create method to do this in the future: + # # https://docs.djangoproject.com/en/3.1/ref/models/querysets/#bulk-create + + # # raise an error to ensure user sees this isn't supported yet. + # raise Exception("This method is not supported yet") + + # @staticmethod + # def shutdown(wait=True, cancel_futures=False): # TODO + # # whether to wait until the queue is empty + # # whether to cancel futures and clear database + # pass diff --git a/src/simmate/workflow_engine/execution/future.py b/src/simmate/workflow_engine/execution/future.py index 845f02103..742447f08 100644 --- a/src/simmate/workflow_engine/execution/future.py +++ b/src/simmate/workflow_engine/execution/future.py @@ -46,7 +46,7 @@ def cancel(self): workitem.save() return True - def cancelled(self): + def is_cancelled(self): """ Return True if the call was successfully cancelled. """ @@ -59,7 +59,7 @@ def cancelled(self): else: return False - def running(self): + def is_running(self): """ Return True if the call is currently being executed and cannot be cancelled. """ @@ -72,7 +72,7 @@ def running(self): else: return False - def done(self): + def is_done(self): """ Return True if the call was successfully cancelled or finished running. """ diff --git a/src/simmate/workflow_engine/execution_prefect/__init__.py b/src/simmate/workflow_engine/execution_prefect/__init__.py new file mode 100644 index 000000000..40a96afc6 --- /dev/null +++ b/src/simmate/workflow_engine/execution_prefect/__init__.py @@ -0,0 +1 @@ +# -*- coding: utf-8 -*- diff --git a/src/simmate/workflow_engine/worker.py b/src/simmate/workflow_engine/execution_prefect/worker.py similarity index 99% rename from src/simmate/workflow_engine/worker.py rename to src/simmate/workflow_engine/execution_prefect/worker.py index a06c3317e..d00ea4e1c 100644 --- a/src/simmate/workflow_engine/worker.py +++ b/src/simmate/workflow_engine/execution_prefect/worker.py @@ -25,7 +25,7 @@ """ -class Worker: +class PrefectWorker: """ A worker is a process that checks the Prefect database for scheduled workflow runs and then submits them. 
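Note: the staticmethod form of `SimmateExecutor.submit` together with the renamed `SimmateFuture` checks (`is_cancelled`, `is_running`, `is_done`) gives a simple polling pattern for cloud-submitted tasks. Below is a minimal sketch of that pattern, assuming a configured Simmate database and a worker running elsewhere (e.g. started with `simmate workflow-engine start-worker`); the one-second polling interval is illustrative only:

```python
import time

from simmate.workflow_engine.execution.executor import SimmateExecutor

# submit() is a staticmethod now, so no executor instance is needed.
# The task sits in the WorkItem queue until a worker picks it up.
future = SimmateExecutor.submit(sum, [4, 3, 2, 1], tags=["simmate"])

# poll the renamed status methods instead of blocking right away
while not future.is_done():
    time.sleep(1)

assert future.result() == 10
```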
diff --git a/src/simmate/workflow_engine/execution_prefect/workflow.py b/src/simmate/workflow_engine/execution_prefect/workflow.py new file mode 100644 index 000000000..bb04b718f --- /dev/null +++ b/src/simmate/workflow_engine/execution_prefect/workflow.py @@ -0,0 +1,268 @@ +# -*- coding: utf-8 -*- + +from functools import cache # cached_property doesnt work with classmethod + +from prefect.tasks import task # present only for convience imports elsewhere +from prefect.flows import Flow +from prefect.states import State +from prefect.context import FlowRunContext +from prefect.client import get_client +from prefect.orion.schemas.filters import FlowFilter, FlowRunFilter +from prefect.packaging import OrionPackager +from prefect.packaging.serializers import PickleSerializer + +from simmate.utilities import async_to_sync + + +class PrefectWorkflow: + @classmethod + def run(cls, **kwargs) -> State: + """ + runs the workflow locally in a prefect context + """ + + subflow = cls._to_prefect_flow() + state = subflow(return_state=True, **kwargs) + + # We don't want to block and wait because this might disable parallel + # features of subflows. We therefore return the state and let the + # user decide if/when to block. + # result = state.result() + + return state + + @classmethod + def run_cloud(cls, return_future: bool = True, **kwargs) -> str: + """ + This schedules the workflow to run remotely on Prefect Cloud. + + #### Parameters + + - `labels`: + a list of labels to schedule the workflow with + + - `wait_for_run`: + whether to wait for the workflow to finish. If False, the workflow + will simply be submitted and then exit. The default is True. + + - `**kwargs`: + all options that are normally passed to the workflow.run() method + + #### Returns + + - The flow run id that was used in prefect cloud. + + + #### Usage + + Make sure you have Prefect properly configured and have registered your + workflow with the backend. + + Note that this method can be viewed as a fork of: + - from prefect.tasks.prefect.flow_run import create_flow_run + It can also be viewed as a more convenient way to call to client.create_flow_run. + I don't accept any other client.create_flow_run() inputs besides 'labels'. + This may change in the future if I need to set flow run names or schedules. + """ + + # Prefect does not properly deserialize objects that have + # as as_dict or to_dict method, so we use a custom method to do that here + parameters_serialized = cls._serialize_parameters(**kwargs) + # BUG: What if we are submitting using a filename? We don't want to + # submit to a cluster and have the job fail because it doesn't have + # access to the file. One solution could be to deserialize right before + # serializing in the next line in order to ensure parameters that + # accept file names are submitted with all necessary data. + + # Now we submit the workflow. + run_id = cls._submit_to_prefect(parameters=parameters_serialized) + + cls._register_calculation(run_id=run_id, **kwargs) + # BUG: Will there be a race condition here? What if the workflow finishes + # and tries writing to the databse before this is done? + # BUG: if parameters are improperly set, this line will fail, while the + # job submission (above) will suceed. Should I cancel the flow run if + # this occurs? 
+ + if return_future: + raise Exception("Prefect cannot return futures from submisson.") + + # return the flow run_id for the user + return run_id + + @classmethod + @property + def tags(cls) -> list[str]: + """ + Lists of tags to submit a the workflow with when using run_cloud. + """ + return [ + "simmate", + cls.name_type, + cls.name_calculator, + ] + + @staticmethod + def _get_run_id(raise_if_no_context: bool = True): + """ + grabs the prefect flow run id from context (if there is one) + """ + # Grab the flow run id for reference. + run_context = FlowRunContext.get() + if run_context: + run_id = str(run_context.flow_run.id) + return run_id + elif not run_context and not raise_if_no_context: + return None # no error is raised + else: + raise Exception("Cannot detetect a Prefect FlowRunContext") + + @classmethod + @cache + def _to_prefect_flow(cls) -> Flow: + """ + Converts this workflow into a Prefect flow + """ + + # Instead of the @flow decorator, we build the flow instance directly + flow = Flow( + fn=cls._run_full, + name=cls.name_full, + version=cls.version, + # Skip type checking because I don't have robust typing yet + # e.g. Structure type inputs also accept inputs like a filename + validate_parameters=False, + ) + + # as an extra, we set this attribute to the prefect flow instance, which + # allows us to access the source Simmate Workflow easily with Prefect's + # context managers. + flow.simmate_workflow = cls + + return flow + + @classmethod + @property + @cache + @async_to_sync + async def depolyment_id_prefect(cls) -> str: + """ + Grabs the deployment id from the prefect database if it exists, and + if not, creates the depolyment and then returns the id. + + This is a synchronous and cached version of `_get_depolyment_id_prefect` and + this is the preferred method to use for beginners. + """ + return await cls._get_depolyment_id_prefect() + + @classmethod + async def _get_depolyment_id_prefect(cls) -> str: + """ + Grabs the deployment id from the prefect database if it exists, and + if not, creates the depolyment and then returns the id. + + This is an asynchronous method and should only be used when within + other async methods. Beginners should instead use the `depolyment_id_prefect` + property. + """ + + async with get_client() as client: + response = await client.read_deployments( + flow_filter=FlowFilter( + name={"any_": [cls.name_full]}, + ), + ) + + # If this is the first time accessing the deployment id, we will need + # to create the deployment + if not response: + deployment_id = await cls._create_deployment_prefect() + + # there should only be one deployment associated with this workflow + # if it's been deployed already. + elif len(response) == 1: + deployment_id = str(response[0].id) + + else: + raise Exception("There are duplicate deployments for this workflow!") + + return deployment_id + + @classmethod + async def _create_deployment_prefect(cls) -> str: + """ + Registers this workflow to the prefect database as a deployment. + + This method should not be called directly. It will be called by + other methods when appropriate + """ + + # raise error until python-deployments are supported again + raise Exception( + "Prefect 2.0 has removed the ability to create deployments in " + "python, so this feature is currently disabled." 
+ ) + # When this is removed, be sure to re-add the test_workflow_cloud unittest + + from prefect.deployments import Deployment + + # NOTE: we do not use the client.create_deployment method because it + # is called within the Deployment.create() method for us. + deployment = Deployment( + name=cls.name_full, + flow=cls._to_prefect_flow(), + packager=OrionPackager(serializer=PickleSerializer()), + # OPTIMIZE: it would be better if I could figure out the ImportSerializer + # here. Only issue is that prefect would need to know to import AND + # call a method. + tags=cls.tags, + ) + + deployment_id = await deployment.create() + + return str(deployment_id) # convert from UUID to str first + + @classmethod + @async_to_sync + async def _submit_to_prefect(cls, **kwargs) -> str: + """ + Submits a flow run to prefect cloud. + + This method should not be used directly. Instead use `_run_prefect_cloud`. + """ + + # The reason we have this code as a separate method is because we want + # to isolate Prefect's async calls from Django's sync-restricted calls + # (i.e. django raises errors if called within an async context). + # Therefore, methods like `_run_prefect_cloud` can't have both this async code + # AND methods like _register_calculation that make sync database calls. + + async with get_client() as client: + response = await client.create_flow_run_from_deployment( + deployment_id=await cls._get_depolyment_id_prefect(), + **kwargs, + ) + + flow_run_id = str(response.id) + return flow_run_id + + @classmethod + @property + @async_to_sync + async def nflows_submitted(cls) -> int: + """ + Queries Prefect to see how many workflows are in a scheduled, running, + or pending state. + """ + + async with get_client() as client: + response = await client.read_flow_runs( + flow_filter=FlowFilter( + name={"any_": [cls.name_full]}, + ), + flow_run_filter=FlowRunFilter( + state={"type": {"any_": ["SCHEDULED", "PENDING", "RUNNING"]}} + ), + ) + + return len(response) diff --git a/src/simmate/workflow_engine/test/test_s3_workflow.py b/src/simmate/workflow_engine/test/test_s3_workflow.py index 20ab252fa..52cac9acd 100644 --- a/src/simmate/workflow_engine/test/test_s3_workflow.py +++ b/src/simmate/workflow_engine/test/test_s3_workflow.py @@ -21,8 +21,6 @@ import pytest -from prefect import flow - from simmate.workflow_engine import ErrorHandler, S3Workflow from simmate.workflow_engine.s3_workflow import ( NonZeroExitError, @@ -84,7 +82,7 @@ def terminate_job(self, directory, **kwargs): @pytest.mark.prefect_db -def test_s3task_methods(): +def test_s3workflow_methods(): class Customized__Testing__DummyWorkflow(S3Workflow): command = "echo dummy" use_database = False @@ -96,8 +94,6 @@ class Customized__Testing__DummyWorkflow(S3Workflow): workflow.show_config() # a print statment w. nothing else to check - workflow._to_prefect_flow() # unused for now - # Test basic run state = workflow.run() result = state.result() @@ -105,21 +101,12 @@ class Customized__Testing__DummyWorkflow(S3Workflow): assert result["directory"].exists() shutil.rmtree(result["directory"]) - # Test as a subflow - @flow - def test(): - state = workflow.run() - return state.result() - - state = test(return_state=True) - result = state.result() - assert state.is_completed() assert result["directory"].exists() shutil.rmtree(result["directory"]) -def test_s3task_1(): +def test_s3workflow_1(): # run a basic task w.o. 
any handlers or monitoring class Customized__Testing__DummyWorkflow(S3Workflow): @@ -139,7 +126,7 @@ class Customized__Testing__DummyWorkflow(S3Workflow): output["directory"].rmdir() -def test_s3task_2(): +def test_s3workflow_2(): # test file compression class Customized__Testing__DummyWorkflow(S3Workflow): @@ -159,7 +146,7 @@ class Customized__Testing__DummyWorkflow(S3Workflow): output["directory"].with_suffix(".zip").unlink() -def test_s3task_3(tmp_path): +def test_s3workflow_3(tmp_path): # Make a task with error handlers, monitoring, and specific directory class Customized__Testing__DummyWorkflow(S3Workflow): @@ -181,7 +168,7 @@ class Customized__Testing__DummyWorkflow(S3Workflow): } -def test_s3task_4(tmp_path): +def test_s3workflow_4(tmp_path): # test nonzero returncode class Customized__Testing__DummyWorkflow(S3Workflow): @@ -198,7 +185,7 @@ class Customized__Testing__DummyWorkflow(S3Workflow): ) -def test_s3task_5(tmp_path): +def test_s3workflow_5(tmp_path): # testing handler-triggered failures class Customized__Testing__DummyWorkflow(S3Workflow): @@ -215,7 +202,7 @@ class Customized__Testing__DummyWorkflow(S3Workflow): ) -def test_s3task_6(tmp_path): +def test_s3workflow_6(tmp_path): # monitor failure class Customized__Testing__DummyWorkflow(S3Workflow): @@ -232,7 +219,7 @@ class Customized__Testing__DummyWorkflow(S3Workflow): ) -def test_s3task_7(tmp_path): +def test_s3workflow_7(tmp_path): # special-monitor failure (non-terminating monitor) class Customized__Testing__DummyWorkflow(S3Workflow): @@ -249,7 +236,7 @@ class Customized__Testing__DummyWorkflow(S3Workflow): ) -def test_s3task_8(tmp_path): +def test_s3workflow_8(tmp_path): # check that monitor exits cleanly when retries are not allowed and no # workup method raises an error @@ -264,7 +251,7 @@ class Customized__Testing__DummyWorkflow(S3Workflow): assert len(result["corrections"]) == 1 -def test_s3task_9(tmp_path): +def test_s3workflow_9(tmp_path): # make sure an error is raised when a file is missing class Customized__Testing__DummyWorkflow(S3Workflow): @@ -277,3 +264,14 @@ class Customized__Testing__DummyWorkflow(S3Workflow): Customized__Testing__DummyWorkflow.run_config, directory=tmp_path, ) + + +# !!! Unitests to use with Prefect Executor +# Test as a subflow +# from prefect import flow +# @flow +# def test(): +# state = workflow.run() +# return state.result() +# state = test(return_state=True) +# result = state.result() diff --git a/src/simmate/workflow_engine/utilities.py b/src/simmate/workflow_engine/utilities.py index 9ba5e1aa4..d0b494872 100644 --- a/src/simmate/workflow_engine/utilities.py +++ b/src/simmate/workflow_engine/utilities.py @@ -1,23 +1,5 @@ # -*- coding: utf-8 -*- -from prefect.context import FlowRunContext - - -def get_prefect_id(raise_if_no_context: bool = True): - """ - grabs the prefect flow run id from context (if there is one) - """ - # Grab the flow run id for reference. - run_context = FlowRunContext.get() - if run_context: - run_id = str(run_context.flow_run.id) - return run_id - elif not run_context and not raise_if_no_context: - return None # no error is raised - else: - raise Exception("Cannot detetect a Prefect FlowRunContext") - - # OPTIMIZE: I needed to return a dictionary because Prefect struggles to handle # a list or tuple return in their workflow context. Maybe this will change in # Prefect Orion though. 
diff --git a/src/simmate/workflow_engine/workflow.py b/src/simmate/workflow_engine/workflow.py index 408a3d0c7..4a20d6605 100644 --- a/src/simmate/workflow_engine/workflow.py +++ b/src/simmate/workflow_engine/workflow.py @@ -271,26 +271,29 @@ class Example__Python__MyFavoriteSettings(Workflow): import inspect from typing import List, Any import uuid -from functools import cache # cached_property doesnt work with classmethod from pathlib import Path -from prefect.tasks import task # present only for convience imports elsewhere -from prefect.flows import Flow -from prefect.states import State -from prefect.context import FlowRunContext -from prefect.client import get_client -from prefect.orion.schemas.filters import FlowFilter, FlowRunFilter -from prefect.packaging import OrionPackager -from prefect.packaging.serializers import PickleSerializer - import simmate from simmate.database.base_data_types import Calculation -from simmate.utilities import async_to_sync, get_directory, copy_directory -from simmate.workflow_engine.execution import SimmateExecutor -from simmate.workflow_engine.utilities import get_prefect_id +from simmate.utilities import get_directory, copy_directory +from simmate.workflow_engine.execution import SimmateExecutor, WorkItem -# I disable prefect until I have a working deployment system -USE_PREFECT = False + +class DummyState: + """ + This class is meant to emulate Prefect States. By wrapping a result into + State, we enable higher-level features that depend on a call to + `state.result()`. + + This class should not be used directly as it is automatically applied with + the `Workflow.run` method + """ + + def __init__(self, result): + self._result = result + + def result(self): + return self._result class Workflow: @@ -333,6 +336,104 @@ class Workflow: `**kwargs` are passed and let's us gather the inputs in one place. """ + # ------------------------------------------------------------------------- + # Core methods that handle how and what a workflow run does + # and how it is submitted + # ------------------------------------------------------------------------- + + @classmethod + def run_config(cls): + """ + The workflow method, which can be overwritten when inheriting from this + class. This can be either a staticmethod or classmethod. + """ + raise NotImplementedError( + "When creating a custom workflow, make sure you set a run_config method!" + ) + + @classmethod + def _run_full(cls, run_id=None, **kwargs): + """ + This method should not be called directly. Use the `run` method instead. + """ + # This method is isolated only because we want to wrap it as a prefect + # workflow in some cases. + run_id = run_id or cls._get_run_id() + kwargs_cleaned = cls._load_input_and_register(run_id=run_id, **kwargs) + result = cls.run_config(**kwargs_cleaned) + if cls.use_database: + result["calculation_id"] = cls._save_to_database(result, run_id=run_id) + return result + + @staticmethod + def _get_run_id(): + """ + generates a random id to use as a workflow run id + """ + # This is a separate method in order to allow the prefect executor to + # overwrite this method. + unique_id = str(uuid.uuid4()) + return unique_id + + @classmethod + def run(cls, **kwargs) -> DummyState: + """ + runs the workflow locally + """ + # Note: this is a separate method and wrapper around run_full because + # we want to allow Prefect executor to overwrite this method. 
+ print(f"Starting new run of {cls.name_full}") + result = cls._run_full(**kwargs) # no run_id as a new one will be made + print(f"Completed run of {cls.name_full}") + state = DummyState(result) + return state + + @classmethod + def run_cloud(cls, return_future: bool = True, **kwargs) -> str: + """ + submits the workflow run to cloud database to be ran by a worker + """ + + print(f"Submitting new run of {cls.name_full}") + + # Because we often want to save some info to our database even before + # the calculation starts/finishes, we do that by calling _register_calc + # at this higher level. An example is storing the structure and run id. + # Thus, we create and register the run_id up front + run_id = cls._get_run_id() + cls._register_calculation(run_id=run_id, **kwargs) + + future = SimmateExecutor.submit( + cls._run_full, # should this be the run method...? + run_id=run_id, + tags=cls.tags, + **kwargs, + ) + + # If the user wants the future, return that instead of the run_id + if return_future: + return future + + return run_id + + # ------------------------------------------------------------------------- + # Methods that interact with the Executor class in order to see what + # has been submitted to cloud. + # ------------------------------------------------------------------------- + + @classmethod + @property + def nflows_submitted(cls) -> int: + """ + Queries the Simmate database to see how many workflows are in a + running or pending state. + """ + return WorkItem.objects.filter(status__in=["P", "R"], tags=cls.tags).count() + + # ------------------------------------------------------------------------- + # Methods that help with accessing the database and saving results + # ------------------------------------------------------------------------- + @classmethod @property def database_table(cls) -> Calculation: @@ -388,100 +489,6 @@ def database_table(cls) -> Calculation: else: raise NotImplementedError("Unable to detect proper database table") - # ------------------------------------------------------------------------- - # Core methods that handle how and what a workflow run does. This include - # helper methods that integrate with Prefect. - # ------------------------------------------------------------------------- - - @classmethod - def run_config(cls): - """ - The workflow method, which can be overwritten when inheriting from this - class. This can be either a staticmethod or classmethod. - """ - raise NotImplementedError( - "When creating a custom workflow, make sure you set a run_config method!" - ) - - @classmethod - def _run_full(cls, run_id=None, **kwargs): - """ - This method should not be called directly. Use the `run` method instead. - """ - # This method is isolated only because we want to wrap it as a prefect - # workflow in some cases. 
- run_id = run_id or cls._get_run_id() - kwargs_cleaned = cls._load_input_and_register(run_id=run_id, **kwargs) - result = cls.run_config(**kwargs_cleaned) - if cls.use_database: - result["calculation_id"] = cls._save_to_database(result, run_id=run_id) - return result - - @staticmethod - def _get_run_id(): - if USE_PREFECT: - run_id = get_prefect_id() - else: - run_id = str(uuid.uuid4()) - return run_id - - @classmethod - def run(cls, **kwargs): - """ - runs the workflow locally - """ - if USE_PREFECT: - state = cls._run_prefect(**kwargs) - else: - print(f"Starting new run of {cls.name_full}") - result = cls._run_full(**kwargs) # no run_id as a new one will be made - print(f"Completed run of {cls.name_full}") - state = DummyState(result) - return state - - @classmethod - def run_cloud(cls, return_future: bool = True, **kwargs) -> str: - # Because we often want to save some info to our database even before - # the calculation starts/finishes, we do that with. An example is - # storing the structure and run id that we just submitted. - if USE_PREFECT: - - run_id = cls._run_prefect_cloud(**kwargs) - cls._register_calculation(run_id=run_id, **kwargs) - # BUG: Will there be a race condition here? What if the workflow finishes - # and tries writing to the databse before this is done? - # BUG: if parameters are improperly set, this line will fail, while the - # job submission (above) will suceed. Should I cancel the flow run if - # this occurs? - - if return_future: - raise Exception("Prefect cannot return futures from submisson.") - - return run_id - else: - print(f"Submitting new run of {cls.name_full}") - - run_id = cls._get_run_id() - cls._register_calculation(run_id=run_id, **kwargs) - - executor = SimmateExecutor() - future = executor.submit( - cls._run_full, - run_id=run_id, - tags=cls.tags, - **kwargs, - ) - # Would it be better to return the future...? - if return_future: - return future - - return run_id - - @classmethod - @property - async def nflows_submitted(cls) -> int: - raise NotImplementedError() - @classmethod def _save_to_database(cls, result, run_id): @@ -559,17 +566,13 @@ def tags(cls) -> list[str]: """ Lists of tags to submit a the workflow with when using run_cloud. """ - tags = [ + return [ "simmate", cls.name_type, cls.name_calculator, + cls.name_full, ] - if USE_PREFECT: - return tags - else: - return tags + [cls.name_full] - # ------------------------------------------------------------------------- # Properties/method that set website UI documentation and help users # explore input options and settings used. @@ -900,6 +903,9 @@ def _register_calculation(cls, run_id=None, **kwargs) -> Calculation: # we are using (and the database table linked to that workflow). 
if cls == Workflow: raise Exception("Checking if this method is ever used") + + from prefect.context import FlowRunContext + run_context = FlowRunContext.get() workflow = run_context.flow.simmate_workflow database_table = workflow.database_table @@ -1079,242 +1085,3 @@ def _deserialize_parameters( parameters_cleaned["directory"] = Path(parameters_cleaned["directory"]) return parameters_cleaned - - # ------------------------------------------------------------------------- - # Methods/properties that integrate with a Prefect server and enable - # the submission of workflows to cloud - # ------------------------------------------------------------------------- - - if USE_PREFECT: - - @classmethod - @cache - def _to_prefect_flow(cls) -> Flow: - """ - Converts this workflow into a Prefect flow - """ - - # Instead of the @flow decorator, we build the flow instance directly - flow = Flow( - fn=cls._run_full, - name=cls.name_full, - version=cls.version, - # Skip type checking because I don't have robust typing yet - # e.g. Structure type inputs also accept inputs like a filename - validate_parameters=False, - ) - - # as an extra, we set this attribute to the prefect flow instance, which - # allows us to access the source Simmate Workflow easily with Prefect's - # context managers. - flow.simmate_workflow = cls - - return flow - - @classmethod - def _run_prefect(cls, **kwargs) -> State: - """ - A convience method to run a workflow as a subflow in a prefect context. - """ - - subflow = cls._to_prefect_flow() - state = subflow(return_state=True, **kwargs) - - # We don't want to block and wait because this might disable parallel - # features of subflows. We therefore return the state and let the - # user decide if/when to block. - # result = state.result() - - return state - - @classmethod - def _run_prefect_cloud(cls, **kwargs) -> str: - """ - This schedules the workflow to run remotely on Prefect Cloud. - - #### Parameters - - - `labels`: - a list of labels to schedule the workflow with - - - `wait_for_run`: - whether to wait for the workflow to finish. If False, the workflow - will simply be submitted and then exit. The default is True. - - - `**kwargs`: - all options that are normally passed to the workflow.run() method - - #### Returns - - - The flow run id that was used in prefect cloud. - - - #### Usage - - Make sure you have Prefect properly configured and have registered your - workflow with the backend. - - Note that this method can be viewed as a fork of: - - from prefect.tasks.prefect.flow_run import create_flow_run - It can also be viewed as a more convenient way to call to client.create_flow_run. - I don't accept any other client.create_flow_run() inputs besides 'labels'. - This may change in the future if I need to set flow run names or schedules. - """ - - # Prefect does not properly deserialize objects that have - # as as_dict or to_dict method, so we use a custom method to do that here - parameters_serialized = cls._serialize_parameters(**kwargs) - # BUG: What if we are submitting using a filename? We don't want to - # submit to a cluster and have the job fail because it doesn't have - # access to the file. One solution could be to deserialize right before - # serializing in the next line in order to ensure parameters that - # accept file names are submitted with all necessary data. - - # Now we submit the workflow. 
- flow_run_id = cls._submit_to_prefect(parameters=parameters_serialized) - - # return the flow_run_id for the user - return flow_run_id - - @classmethod - @property - @cache - @async_to_sync - async def depolyment_id_prefect(cls) -> str: - """ - Grabs the deployment id from the prefect database if it exists, and - if not, creates the depolyment and then returns the id. - - This is a synchronous and cached version of `_get_depolyment_id_prefect` and - this is the preferred method to use for beginners. - """ - return await cls._get_depolyment_id_prefect() - - @classmethod - async def _get_depolyment_id_prefect(cls) -> str: - """ - Grabs the deployment id from the prefect database if it exists, and - if not, creates the depolyment and then returns the id. - - This is an asynchronous method and should only be used when within - other async methods. Beginners should instead use the `depolyment_id_prefect` - property. - """ - - async with get_client() as client: - response = await client.read_deployments( - flow_filter=FlowFilter( - name={"any_": [cls.name_full]}, - ), - ) - - # If this is the first time accessing the deployment id, we will need - # to create the deployment - if not response: - deployment_id = await cls._create_deployment_prefect() - - # there should only be one deployment associated with this workflow - # if it's been deployed already. - elif len(response) == 1: - deployment_id = str(response[0].id) - - else: - raise Exception("There are duplicate deployments for this workflow!") - - return deployment_id - - @classmethod - async def _create_deployment_prefect(cls) -> str: - """ - Registers this workflow to the prefect database as a deployment. - - This method should not be called directly. It will be called by - other methods when appropriate - """ - - # raise error until python-deployments are supported again - raise Exception( - "Prefect 2.0 has removed the ability to create deployments in " - "python, so this feature is currently disabled." - ) - # When this is removed, be sure to re-add the test_workflow_cloud unittest - - from prefect.deployments import Deployment - - # NOTE: we do not use the client.create_deployment method because it - # is called within the Deployment.create() method for us. - deployment = Deployment( - name=cls.name_full, - flow=cls._to_prefect_flow(), - packager=OrionPackager(serializer=PickleSerializer()), - # OPTIMIZE: it would be better if I could figure out the ImportSerializer - # here. Only issue is that prefect would need to know to import AND - # call a method. - tags=cls.tags, - ) - - deployment_id = await deployment.create() - - return str(deployment_id) # convert from UUID to str first - - @classmethod - @async_to_sync - async def _submit_to_prefect(cls, **kwargs) -> str: - """ - Submits a flow run to prefect cloud. - - This method should not be used directly. Instead use `_run_prefect_cloud`. - """ - - # The reason we have this code as a separate method is because we want - # to isolate Prefect's async calls from Django's sync-restricted calls - # (i.e. django raises errors if called within an async context). - # Therefore, methods like `_run_prefect_cloud` can't have both this async code - # AND methods like _register_calculation that make sync database calls. 
- - async with get_client() as client: - response = await client.create_flow_run_from_deployment( - deployment_id=await cls._get_depolyment_id_prefect(), - **kwargs, - ) - - flow_run_id = str(response.id) - return flow_run_id - - @classmethod - @property - @async_to_sync - async def nflows_submitted_prefect(cls) -> int: - """ - Queries Prefect to see how many workflows are in a scheduled, running, - or pending state. - """ - - async with get_client() as client: - response = await client.read_flow_runs( - flow_filter=FlowFilter( - name={"any_": [cls.name_full]}, - ), - flow_run_filter=FlowRunFilter( - state={"type": {"any_": ["SCHEDULED", "PENDING", "RUNNING"]}} - ), - ) - - return len(response) - - -class DummyState: - """ - This class is meant to emulate Prefect States. By wrapping a result in - this, we enable higher-level features that depend on a call to - `state.result()`. - - This class should not be used directly as it is automatically applied with - the `Workflow.run` method - """ - - def __init__(self, result): - self._result = result - - def result(self): - return self._result From 8caf6046f703da4ab74cae932c2248fd3891b4f0 Mon Sep 17 00:00:00 2001 From: Jack Sundberg Date: Tue, 9 Aug 2022 17:03:25 -0400 Subject: [PATCH 5/6] establish working unittests --- CHANGELOG.md | 2 + .../workflows/population_analysis/badelf.py | 12 +- .../workflows/population_analysis/bader.py | 7 +- .../vasp/workflows/test/test_all_paths.py | 3 +- .../command_line/test/test_worker_cli.py | 19 ++ .../command_line/test/test_workflows_cli.py | 7 +- src/simmate/command_line/workflow_engine.py | 158 ++++---------- .../command_line/workflow_engine_prefect.py | 193 ++++++++++++++++++ src/simmate/database/base_data_types/base.py | 13 ++ .../database/base_data_types/calculation.py | 10 +- .../database/base_data_types/relaxation.py | 2 +- .../test/test_calculation_db.py | 20 +- .../base_data_types/test/test_dynamics_db.py | 8 +- .../test/test_relaxation_db.py | 8 +- .../test/test_static_energy_db.py | 8 +- .../file_converters/structure/database.py | 12 +- .../evolution/database.py | 10 +- .../evolution/search_engine.py | 6 +- .../core_components/filters/calculation.py | 2 +- .../base_data_types/calculation.html | 2 +- .../base_filter_types/calculation.html | 2 +- .../workflow_engine/execution/README.md | 2 +- .../workflow_engine/execution/worker.py | 32 +-- .../workflow_engine/test/test_s3_workflow.py | 9 +- .../workflow_engine/test/test_workflow.py | 83 ++++---- src/simmate/workflow_engine/workflow.py | 42 +++- src/simmate/workflows/restart.py | 2 +- .../workflows/test/test_all_workflow_runs.py | 24 ++- src/simmate/workflows/utilities.py | 2 +- tutorials/05_Search_the_database.md | 2 +- 30 files changed, 440 insertions(+), 262 deletions(-) create mode 100644 src/simmate/command_line/test/test_worker_cli.py create mode 100644 src/simmate/command_line/workflow_engine_prefect.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 978ee2787..85853ce4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,11 +25,13 @@ There is one key exception to the rules above -- and that is with `MAJOR`=0 rele - add NPT and MatProj molecular dynamics workflows - add SCAN workflows for static energy and relaxation - test files can be provided within zip files, fixing excessive line counts on git commits +- add simmate worker that can run "out-of-box" and requires no set up **Refactors** - to simplify the creation of new workflows, `S3Task` is now `S3Workflow` and database tables are dynamically determined using the workflow name - workflows 
of a given type (e.g. relaxation or static-energy) now share a database tables in order to simplify overall database architecture - migrate from `os.path` to `pathlib.Path` throughout package +- isolate prefect use to separate executors **Fixes** - None diff --git a/src/simmate/calculators/vasp/workflows/population_analysis/badelf.py b/src/simmate/calculators/vasp/workflows/population_analysis/badelf.py index b14fca2a6..570221dc5 100644 --- a/src/simmate/calculators/vasp/workflows/population_analysis/badelf.py +++ b/src/simmate/calculators/vasp/workflows/population_analysis/badelf.py @@ -5,7 +5,7 @@ from pymatgen.analysis.structure_matcher import StructureMatcher from simmate.toolkit import Structure -from simmate.workflow_engine import task, Workflow +from simmate.workflow_engine import Workflow from simmate.database.third_parties import MatprojStructure from simmate.calculators.vasp.workflows.static_energy.matproj import ( StaticEnergy__Vasp__Matproj, @@ -52,7 +52,7 @@ def run_config( directory=prebadelf_result["directory"], ).result() - save_badelf_results(badelf_result, prebadelf_result["prefect_flow_run_id"]) + save_badelf_results(badelf_result, prebadelf_result["run_id"]) # ----------------------------------------------------------------------------- @@ -83,7 +83,6 @@ class PopulationAnalysis__Vasp__PrebadelfMatproj(StaticEnergy__Vasp__Matproj): ) -@task def get_structure_w_empties( structure, empty_ion_template, @@ -155,16 +154,15 @@ def get_structure_w_empties( # THIS IS A COPY/PASTE FROM THE BADER WORKFLOW -- I need to condense these -@task -def save_badelf_results(bader_result, prefect_flow_run_id): +def save_badelf_results(bader_result, run_id): # load the results. We are particullary after the first result with # is a pandas dataframe of oxidation states. oxidation_data, extra_data = bader_result["result"] # load the calculation entry for this workflow run. This should already # exist thanks to the load_input_and_register task of the prebader workflow - calculation = PopulationAnalysis__Bader__Badelf.database_table.from_prefect_context( - prefect_flow_run_id, + calculation = PopulationAnalysis__Bader__Badelf.database_table.from_run_context( + run_id, PopulationAnalysis__Vasp__PrebadelfMatproj.name_full, ) # BUG: can't use context to grab the id because workflow tasks generate a diff --git a/src/simmate/calculators/vasp/workflows/population_analysis/bader.py b/src/simmate/calculators/vasp/workflows/population_analysis/bader.py index eb9a07606..2711f5696 100644 --- a/src/simmate/calculators/vasp/workflows/population_analysis/bader.py +++ b/src/simmate/calculators/vasp/workflows/population_analysis/bader.py @@ -50,14 +50,17 @@ def run_config( return bader_result @classmethod - def _save_to_database(cls, bader_result): + def _save_to_database(cls, bader_result, run_id): # load the results. We are particullary after the first result with # is a pandas dataframe of oxidation states. oxidation_data, extra_data = bader_result["result"] # load the calculation entry for this workflow run. 
This should already # exist thanks to the load_input_and_register task of the prebader workflow - calculation = cls.database_table.from_prefect_context() + calculation = cls.database_table.from_run_context( + run_id=run_id, + workflow_name=cls.name_full, + ) # BUG: can't use context to grab the id because workflow tasks generate a # different id than the main workflow diff --git a/src/simmate/calculators/vasp/workflows/test/test_all_paths.py b/src/simmate/calculators/vasp/workflows/test/test_all_paths.py index 394dbdffe..7cc7876c2 100644 --- a/src/simmate/calculators/vasp/workflows/test/test_all_paths.py +++ b/src/simmate/calculators/vasp/workflows/test/test_all_paths.py @@ -9,9 +9,8 @@ Diffusion__Vasp__NebAllPathsMit, ) - +# @pytest.mark.prefect_db @pytest.mark.slow -@pytest.mark.prefect_db @pytest.mark.django_db def test_neb(sample_structures, tmp_path, mocker): diff --git a/src/simmate/command_line/test/test_worker_cli.py b/src/simmate/command_line/test/test_worker_cli.py new file mode 100644 index 000000000..9571e4238 --- /dev/null +++ b/src/simmate/command_line/test/test_worker_cli.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- + +import pytest + +from simmate.command_line.workflow_engine import workflow_engine + + +@pytest.mark.django_db +def test_database_dump_and_load(command_line_runner): + + # dump the database to json + result = command_line_runner.invoke(workflow_engine, ["start-singleflow-worker"]) + assert result.exit_code == 0 + + # load the database to json + result = command_line_runner.invoke( + workflow_engine, ["start-worker", "-n", "1", "-e", "true"] + ) + assert result.exit_code == 0 diff --git a/src/simmate/command_line/test/test_workflows_cli.py b/src/simmate/command_line/test/test_workflows_cli.py index be7e51faf..1fbd5836e 100644 --- a/src/simmate/command_line/test/test_workflows_cli.py +++ b/src/simmate/command_line/test/test_workflows_cli.py @@ -4,6 +4,7 @@ from simmate.calculators.vasp.inputs import Potcar from simmate.workflow_engine import Workflow +from simmate.workflow_engine.workflow import DummyState from simmate.command_line.workflows import workflows from simmate.conftest import make_dummy_files @@ -105,7 +106,7 @@ def test_workflows_run(command_line_runner, structure, mocker, tmp_path): mocker.patch.object( Workflow, "run", - return_value=Completed(), + return_value=DummyState(None), ) # the code above can be modified for a prefect executor # from prefect.states import Completed @@ -150,7 +151,7 @@ def test_workflows_run_cloud(command_line_runner, structure, mocker, tmp_path): mocker.patch.object( Workflow, "run_cloud", - return_value=Completed(), + return_value=DummyState(None), ) # the code above can be modified for a prefect executor # from prefect.states import Completed @@ -198,7 +199,7 @@ def test_workflows_run_yaml(command_line_runner, structure, mocker, tmp_path): mocker.patch.object( Workflow, "run", - return_value=Completed(), + return_value=DummyState(None), ) # the code above can be modified for a prefect executor # from prefect.states import Completed diff --git a/src/simmate/command_line/workflow_engine.py b/src/simmate/command_line/workflow_engine.py index 695fa92bd..003e66298 100644 --- a/src/simmate/command_line/workflow_engine.py +++ b/src/simmate/command_line/workflow_engine.py @@ -11,53 +11,57 @@ @click.group() def workflow_engine(): """ - A group of commands for starting up Prefect Agents and Dask Clusters. - There is also a simple "Simmate Cluster" that you can use too, but it - is only meant for quick testing. 
- - Setting up your computational resources can be tricky, so be sure to go - through our tutorial before trying this on your own: + A group of commands for starting up Simmate Workers, Prefect Agents, and + Dask Clusters. These are meant for setting up remote computational resources. """ pass @workflow_engine.command() @click.option( - "--queue_name", - "-q", - default=None, - help="the unique name to give the work queue", -) -@click.option( - "--concurrency_limit", - "-c", - default=1, - help="the max number of workflow runs to run in parallel", -) -@click.option( - "--nflows_max", + "--nitems_max", "-n", default=None, - help="the number of workflows runs to submit before shutdown", + type=int, + help="the number of task run to submit before shutdown", ) @click.option( "--timeout", "-t", default=None, + type=float, help="the time (in seconds) after which this worker will stop running jobs and shutdown", ) @click.option( "--close_on_empty_queue", "-e", default=False, + type=bool, help="whether to shutdown when the queue is empty", ) +@click.option( + "--waittime_on_empty_queue", + "-w", + default=1, + type=float, + help=( + "if the queue is empty, the time (in seconds) the worker should wait" + " before checking the queue again" + ), +) +@click.option( + "--tags", + "-t", + default=["simmate"], + help="tags to filter tasks by for submission. defaults to just 'simmate'", + multiple=True, +) def start_worker( - queue_name, - concurrency_limit, - nflows_max, + nitems_max, timeout, close_on_empty_queue, + waittime_on_empty_queue, + tags, ): """ This starts a Simmate Worker which will query the database for jobs to run. @@ -66,13 +70,13 @@ def start_worker( from simmate.workflow_engine import Worker worker = Worker( - queue_name=queue_name, - concurrency_limit=concurrency_limit, - nflows_max=nflows_max, - timeout=timeout, - close_on_empty_queue=close_on_empty_queue, + nitems_max, + timeout, + close_on_empty_queue, + waittime_on_empty_queue, + tags, ) - worker.run() + worker.start() @workflow_engine.command() @@ -85,103 +89,12 @@ def start_singleflow_worker(): this is such a common use-case, we include this command for convienence. It is the same as... - simmate workflow-engine start-worker -c 1 -n 1 -e True + simmate workflow-engine start-worker -n 1 -e True -t simmate """ from simmate.workflow_engine import Worker - worker = Worker( - concurrency_limit=1, - nflows_max=1, - close_on_empty_queue=True, - ) - worker.run() - - -@workflow_engine.command() -@click.option( - "--agent_name", - "-n", - help="the name of the agent that will be visible in prefect cloud", -) -@click.option( - "--agent_labels", - "-l", - help="labels that the agent will query for. To list multiple do... -l label1 -l label2", - multiple=True, -) -@click.option( - "--n_workers", - "-w", - help="the number of separate workers to run/submit", - type=int, -) -@click.option( - "--cpus_per_worker", - "-c", - help="the number of cpus that each worker will request", - type=int, -) -@click.option( - "--memory_per_worker", - "-m", - help="the amount of memory that each worker will request", -) -@click.option( - "--walltime_per_worker", - "-t", - help="the timelimit set for each worker", -) -def run_cluster( - agent_name, - agent_labels, - n_workers, - cpus_per_worker, - memory_per_worker, - walltime_per_worker, -): - """ - This starts up a Dask cluster and a Prefect Agent in order to run Simmate jobs. 
- - This convenience command is really only meant for basic purposes, as Dask - and Prefect teams offer their own commands with much more control. For - example, this command uses Prefect's LocalAgent, but there are other - advanced types available such as DockerAgent which may be better for your - use case. - - If you would like this cluster to run endlessly in the background, you can - submit it with something like "nohup simmate workflow-engine run-cluster &". - The "nohup" and "&" symbol together make it so this runs in the background AND - it won't shutdown if you close your terminal (or ssh). To stop this from running, - you'll now need to find the running process and kill it. Use something like - "ps -aef | grep simmate" to find the running process and grab its ID. Then - kill that process id with "kill 123" (if the id was 123). - """ - raise NotImplementedError("This method is still being ported to Prefect v2") - - # All input arguments are optional. Therefore, I go through all of them and - # remove the ones that weren't set. This prevents overwritting default settings - # at a lower level. Note, I also rename several parameters (e.g. "cpus_per_worker"). - # These different names for the CLI only exist to make things easier for beginners. - agent_kwargs = locals() # grabs all input parameters as a dict - possible_kwargs = list(agent_kwargs.keys()) # because we will be deleting keys - for key in possible_kwargs: - if not agent_kwargs[key]: - agent_kwargs.pop(key) - continue - # renaming several parameters... - if key == "cpus_per_worker": - agent_kwargs["job_cpu"] = agent_kwargs.pop(key) - elif key == "memory_per_worker": - agent_kwargs["job_mem"] = agent_kwargs.pop(key) - elif key == "walltime_per_job": - agent_kwargs["walltime"] = agent_kwargs.pop(key) - - from simmate.configuration.prefect.setup_resources import ( - run_cluster_and_agent, - ) - - run_cluster_and_agent(**agent_kwargs) + Worker.run_singleflow_worker() # explicitly list functions so that pdoc doesn't skip them @@ -189,5 +102,4 @@ def run_cluster( "workflow_engine", "start_worker", "start_singleflow_worker", - "run_cluster", ] diff --git a/src/simmate/command_line/workflow_engine_prefect.py b/src/simmate/command_line/workflow_engine_prefect.py new file mode 100644 index 000000000..695fa92bd --- /dev/null +++ b/src/simmate/command_line/workflow_engine_prefect.py @@ -0,0 +1,193 @@ +# -*- coding: utf-8 -*- + +""" +This defines commands for managing your Simmate workflow engine. All commands are +accessible through the "simmate workflow-engine" command. +""" + +import click + + +@click.group() +def workflow_engine(): + """ + A group of commands for starting up Prefect Agents and Dask Clusters. + There is also a simple "Simmate Cluster" that you can use too, but it + is only meant for quick testing. 
+ + Setting up your computational resources can be tricky, so be sure to go + through our tutorial before trying this on your own: + """ + pass + + +@workflow_engine.command() +@click.option( + "--queue_name", + "-q", + default=None, + help="the unique name to give the work queue", +) +@click.option( + "--concurrency_limit", + "-c", + default=1, + help="the max number of workflow runs to run in parallel", +) +@click.option( + "--nflows_max", + "-n", + default=None, + help="the number of workflows runs to submit before shutdown", +) +@click.option( + "--timeout", + "-t", + default=None, + help="the time (in seconds) after which this worker will stop running jobs and shutdown", +) +@click.option( + "--close_on_empty_queue", + "-e", + default=False, + help="whether to shutdown when the queue is empty", +) +def start_worker( + queue_name, + concurrency_limit, + nflows_max, + timeout, + close_on_empty_queue, +): + """ + This starts a Simmate Worker which will query the database for jobs to run. + """ + + from simmate.workflow_engine import Worker + + worker = Worker( + queue_name=queue_name, + concurrency_limit=concurrency_limit, + nflows_max=nflows_max, + timeout=timeout, + close_on_empty_queue=close_on_empty_queue, + ) + worker.run() + + +@workflow_engine.command() +def start_singleflow_worker(): + """ + This starts a Simmate Worker that only runs one job and then shuts down. Also, + if no job is available in the queue, it will shut down. + + Note: this can be acheived using the start-worker command too, but because + this is such a common use-case, we include this command for convienence. + It is the same as... + + simmate workflow-engine start-worker -c 1 -n 1 -e True + """ + + from simmate.workflow_engine import Worker + + worker = Worker( + concurrency_limit=1, + nflows_max=1, + close_on_empty_queue=True, + ) + worker.run() + + +@workflow_engine.command() +@click.option( + "--agent_name", + "-n", + help="the name of the agent that will be visible in prefect cloud", +) +@click.option( + "--agent_labels", + "-l", + help="labels that the agent will query for. To list multiple do... -l label1 -l label2", + multiple=True, +) +@click.option( + "--n_workers", + "-w", + help="the number of separate workers to run/submit", + type=int, +) +@click.option( + "--cpus_per_worker", + "-c", + help="the number of cpus that each worker will request", + type=int, +) +@click.option( + "--memory_per_worker", + "-m", + help="the amount of memory that each worker will request", +) +@click.option( + "--walltime_per_worker", + "-t", + help="the timelimit set for each worker", +) +def run_cluster( + agent_name, + agent_labels, + n_workers, + cpus_per_worker, + memory_per_worker, + walltime_per_worker, +): + """ + This starts up a Dask cluster and a Prefect Agent in order to run Simmate jobs. + + This convenience command is really only meant for basic purposes, as Dask + and Prefect teams offer their own commands with much more control. For + example, this command uses Prefect's LocalAgent, but there are other + advanced types available such as DockerAgent which may be better for your + use case. + + If you would like this cluster to run endlessly in the background, you can + submit it with something like "nohup simmate workflow-engine run-cluster &". + The "nohup" and "&" symbol together make it so this runs in the background AND + it won't shutdown if you close your terminal (or ssh). To stop this from running, + you'll now need to find the running process and kill it. 
Use something like + "ps -aef | grep simmate" to find the running process and grab its ID. Then + kill that process id with "kill 123" (if the id was 123). + """ + raise NotImplementedError("This method is still being ported to Prefect v2") + + # All input arguments are optional. Therefore, I go through all of them and + # remove the ones that weren't set. This prevents overwritting default settings + # at a lower level. Note, I also rename several parameters (e.g. "cpus_per_worker"). + # These different names for the CLI only exist to make things easier for beginners. + agent_kwargs = locals() # grabs all input parameters as a dict + possible_kwargs = list(agent_kwargs.keys()) # because we will be deleting keys + for key in possible_kwargs: + if not agent_kwargs[key]: + agent_kwargs.pop(key) + continue + # renaming several parameters... + if key == "cpus_per_worker": + agent_kwargs["job_cpu"] = agent_kwargs.pop(key) + elif key == "memory_per_worker": + agent_kwargs["job_mem"] = agent_kwargs.pop(key) + elif key == "walltime_per_job": + agent_kwargs["walltime"] = agent_kwargs.pop(key) + + from simmate.configuration.prefect.setup_resources import ( + run_cluster_and_agent, + ) + + run_cluster_and_agent(**agent_kwargs) + + +# explicitly list functions so that pdoc doesn't skip them +__all__ = [ + "workflow_engine", + "start_worker", + "start_singleflow_worker", + "run_cluster", +] diff --git a/src/simmate/database/base_data_types/base.py b/src/simmate/database/base_data_types/base.py index 4f1232dd5..6b5d99d3a 100644 --- a/src/simmate/database/base_data_types/base.py +++ b/src/simmate/database/base_data_types/base.py @@ -196,6 +196,19 @@ def to_archive(self, filename: Union[str, Path] = None): # we can now delete the csv file csv_filename.unlink() + def filter_by_tags(self, tags: list[str]): + """ + A utility filter() method that + """ + + if tags: + new_query = self + for tag in tags: + new_query = new_query.filter(tags__icontains=tag) + else: + new_query = self.filter(tags=[]) + return new_query + # Copied this line from... # https://github.com/chrisdev/django-pandas/blob/master/django_pandas/managers.py diff --git a/src/simmate/database/base_data_types/calculation.py b/src/simmate/database/base_data_types/calculation.py index f513daf90..2d42efde5 100644 --- a/src/simmate/database/base_data_types/calculation.py +++ b/src/simmate/database/base_data_types/calculation.py @@ -20,7 +20,7 @@ class Meta: base_info = [ "directory", - "prefect_flow_run_id", + "run_id", "created_at", "updated_at", "corrections", @@ -86,12 +86,12 @@ def prefect_cloud_link(self) -> str: doesn't check to confirm it's been registered. To actually confirm that, use the `flow_run_view` attribute instead. """ - return f"https://cloud.prefect.io/simmate/flow-run/{self.prefect_flow_run_id}" + return f"https://cloud.prefect.io/simmate/flow-run/{self.run_id}" @property def flow_run_view(self): # -> FlowRunView """ - Checks if the prefect_flow_run_id was registered with Prefect Cloud, and + Checks if the run_id was registered with Prefect Cloud, and if so, returns a [FlowRunView](https://docs.prefect.io/orchestration/flow-runs/inspection.html) that hold metadata such as the status. This metadata includes... @@ -119,7 +119,7 @@ def flow_run_view(self): # -> FlowRunView @property def prefect_flow_run_name(self) -> str: """ - Gives the user-friendly name of this run if the prefect_flow_run_id + Gives the user-friendly name of this run if the run_id was registered with Prefect Cloud. (an example name is "friendly-bumblebee"). 
""" flowrunview = self.flow_run_view @@ -128,7 +128,7 @@ def prefect_flow_run_name(self) -> str: @property def prefect_state(self) -> str: """ - Gives the current state of this run if the prefect_flow_run_id + Gives the current state of this run if the run_id was registered with Prefect Cloud. (ex: "Scheduled" or "Successful") """ flowrunview = self.flow_run_view diff --git a/src/simmate/database/base_data_types/relaxation.py b/src/simmate/database/base_data_types/relaxation.py index fda5424cc..93ae92014 100644 --- a/src/simmate/database/base_data_types/relaxation.py +++ b/src/simmate/database/base_data_types/relaxation.py @@ -190,7 +190,7 @@ def from_vasp_run(cls, vasprun: Vasprun): # Note, the information does not matter at this point because it will be # populated below relaxation = cls.from_toolkit(structure=vasprun.structures[-1]) - # TODO: need to pull prefect_flow_run_id from metadata file. + # TODO: need to pull run_id from metadata file. # Now we have the relaxation data all loaded and can save it to the database relaxation.save() diff --git a/src/simmate/database/base_data_types/test/test_calculation_db.py b/src/simmate/database/base_data_types/test/test_calculation_db.py index 7b5b692ae..cebd6f429 100644 --- a/src/simmate/database/base_data_types/test/test_calculation_db.py +++ b/src/simmate/database/base_data_types/test/test_calculation_db.py @@ -12,16 +12,16 @@ def test_calculation_table(): TestCalculation.show_columns() # test writing to database - calc_db = TestCalculation.from_prefect_context( - prefect_flow_run_id="example-id-123", + calc_db = TestCalculation.from_run_context( + run_id="example-id-123", workflow_name="example.test.workflow", ) calc_db.save() # try grabbing the calculation again and make sure it loaded from the # database rather than creating a new entry - calc_db2 = TestCalculation.from_prefect_context( - prefect_flow_run_id="example-id-123", + calc_db2 = TestCalculation.from_run_context( + run_id="example-id-123", workflow_name="example.test.workflow", ) assert calc_db.id == calc_db2.id @@ -34,8 +34,8 @@ def test_calculation_table(): # and test incorrect passing with pytest.raises(Exception): - calc_db2 = TestCalculation.from_prefect_context( - prefect_flow_run_id="example-id-123", + calc_db2 = TestCalculation.from_run_context( + run_id="example-id-123", # workflow_name --> missing but required ) @@ -43,13 +43,13 @@ def test_calculation_table(): @pytest.mark.django_db def test_calculation_archives(): - calc_db = TestCalculation.from_prefect_context( - prefect_flow_run_id="example-id-123", + calc_db = TestCalculation.from_run_context( + run_id="example-id-123", workflow_name="example.test.workflow", ) calc_db.save() - calc_db2 = TestCalculation.from_prefect_context( - prefect_flow_run_id="example-id-123", + calc_db2 = TestCalculation.from_run_context( + run_id="example-id-123", workflow_name="example.test.workflow", ) calc_db2.save() diff --git a/src/simmate/database/base_data_types/test/test_dynamics_db.py b/src/simmate/database/base_data_types/test/test_dynamics_db.py index 8d5a20d0a..92dbf4eff 100644 --- a/src/simmate/database/base_data_types/test/test_dynamics_db.py +++ b/src/simmate/database/base_data_types/test/test_dynamics_db.py @@ -18,8 +18,8 @@ def test_static_energy_table(structure): DynamicsIonicStep.show_columns() # test writing to database - structure_db = DynamicsRun.from_prefect_context( - prefect_flow_run_id="example-id-123", + structure_db = DynamicsRun.from_run_context( + run_id="example-id-123", workflow_name="example.test.workflow", 
structure=structure, ) @@ -27,8 +27,8 @@ def test_static_energy_table(structure): # try grabbing the calculation again and make sure it loaded from the # database rather than creating a new entry - structure_db2 = DynamicsRun.from_prefect_context( - prefect_flow_run_id="example-id-123", + structure_db2 = DynamicsRun.from_run_context( + run_id="example-id-123", workflow_name="example.test.workflow", ) assert structure_db.id == structure_db.id diff --git a/src/simmate/database/base_data_types/test/test_relaxation_db.py b/src/simmate/database/base_data_types/test/test_relaxation_db.py index 7ba9eaf3a..56cabf7c4 100644 --- a/src/simmate/database/base_data_types/test/test_relaxation_db.py +++ b/src/simmate/database/base_data_types/test/test_relaxation_db.py @@ -18,8 +18,8 @@ def test_relaxation_table(structure): IonicStep.show_columns() # test writing to database - structure_db = Relaxation.from_prefect_context( - prefect_flow_run_id="example-id-123", + structure_db = Relaxation.from_run_context( + run_id="example-id-123", workflow_name="example.test.workflow", structure=structure, ) @@ -27,8 +27,8 @@ def test_relaxation_table(structure): # try grabbing the calculation again and make sure it loaded from the # database rather than creating a new entry - structure_db2 = Relaxation.from_prefect_context( - prefect_flow_run_id="example-id-123", + structure_db2 = Relaxation.from_run_context( + run_id="example-id-123", workflow_name="example.test.workflow", ) assert structure_db.id == structure_db2.id diff --git a/src/simmate/database/base_data_types/test/test_static_energy_db.py b/src/simmate/database/base_data_types/test/test_static_energy_db.py index 0ac888b2f..2daf72981 100644 --- a/src/simmate/database/base_data_types/test/test_static_energy_db.py +++ b/src/simmate/database/base_data_types/test/test_static_energy_db.py @@ -15,8 +15,8 @@ def test_static_energy_table(structure, tmp_path): StaticEnergy.show_columns() # test writing to database - structure_db = StaticEnergy.from_prefect_context( - prefect_flow_run_id="example-id-123", + structure_db = StaticEnergy.from_run_context( + run_id="example-id-123", workflow_name="example.test.workflow", structure=structure, ) @@ -24,8 +24,8 @@ def test_static_energy_table(structure, tmp_path): # try grabbing the calculation again and make sure it loaded from the # database rather than creating a new entry - structure_db2 = StaticEnergy.from_prefect_context( - prefect_flow_run_id="example-id-123", + structure_db2 = StaticEnergy.from_run_context( + run_id="example-id-123", workflow_name="example.test.workflow", ) assert structure_db.id == structure_db2.id diff --git a/src/simmate/file_converters/structure/database.py b/src/simmate/file_converters/structure/database.py index dd211fcc9..1f7319610 100644 --- a/src/simmate/file_converters/structure/database.py +++ b/src/simmate/file_converters/structure/database.py @@ -97,14 +97,14 @@ def get_toolkit_from_database_dict(structure_dict: dict) -> ToolkitStructure: # These attributes tells us which structure to grab from our datatable. # The user should have only provided one -- if they gave more, we just # use whichever one comes first. 
- prefect_flow_run_id = structure_dict.get("prefect_flow_run_id") + run_id = structure_dict.get("run_id") database_id = structure_dict.get("database_id") directory = structure_dict.get("directory") - # we must have either a prefect_flow_run_id or database_id - if not prefect_flow_run_id and not database_id and not directory: + # we must have either a run_id or database_id + if not run_id and not database_id and not directory: raise Exception( - "You must have either a prefect_flow_run_id, database_id, " + "You must have either a run_id, database_id, " "or directory provided if you want to load a structure from " "a previous calculation." ) @@ -113,9 +113,9 @@ def get_toolkit_from_database_dict(structure_dict: dict) -> ToolkitStructure: # are unique so all three should return a single calculation. if database_id: database_object = datatable.objects.get(id=database_id) - elif prefect_flow_run_id: + elif run_id: database_object = datatable.objects.get( - prefect_flow_run_id=prefect_flow_run_id, + run_id=run_id, ) elif directory: database_object = datatable.objects.get(directory=directory) diff --git a/src/simmate/toolkit/structure_prediction/evolution/database.py b/src/simmate/toolkit/structure_prediction/evolution/database.py index f1e5c85a2..6f12d378c 100644 --- a/src/simmate/toolkit/structure_prediction/evolution/database.py +++ b/src/simmate/toolkit/structure_prediction/evolution/database.py @@ -259,7 +259,7 @@ class Meta: settings = table_column.JSONField(default=dict) # This list limits to ids that are submitted or running - prefect_flow_run_ids = table_column.JSONField(default=list) + run_ids = table_column.JSONField(default=list) search = table_column.ForeignKey( EvolutionarySearch, @@ -269,7 +269,7 @@ class Meta: @staticmethod @async_to_sync - async def _check_still_running_ids(prefect_flow_run_ids): + async def _check_still_running_ids(run_ids): """ Queries Prefect to see check on a list of flow run ids and determines which ones are still in a scheduled, running, or pending state. @@ -291,7 +291,7 @@ async def _check_still_running_ids(prefect_flow_run_ids): async with get_client() as client: response = await client.read_flow_runs( flow_run_filter=FlowRunFilter( - id={"any_": prefect_flow_run_ids}, + id={"any_": run_ids}, state={"type": {"any_": ["SCHEDULED", "PENDING", "RUNNING"]}}, ), ) @@ -308,10 +308,10 @@ def update_flow_run_ids(self): """ # make the async call to Prefect client - still_running_ids = self._check_still_running_ids(self.prefect_flow_run_ids) + still_running_ids = self._check_still_running_ids(self.run_ids) # we now have our new list of IDs! Let's update it to the database - self.prefect_flow_run_ids = still_running_ids + self.run_ids = still_running_ids self.save() return still_running_ids diff --git a/src/simmate/toolkit/structure_prediction/evolution/search_engine.py b/src/simmate/toolkit/structure_prediction/evolution/search_engine.py index 8c05a239b..6329f82e1 100644 --- a/src/simmate/toolkit/structure_prediction/evolution/search_engine.py +++ b/src/simmate/toolkit/structure_prediction/evolution/search_engine.py @@ -470,14 +470,12 @@ def _check_steadystate_workflows(self): # Attached the flow_run_id to our source so we know how many # associated jobs are running. 
- source_db.prefect_flow_run_ids.append(flow_run_id) + source_db.run_ids.append(flow_run_id) source_db.save() # update the source on the calculation # TODO: use the flow run id from above to grab the calc - calculation = self.calculation_datatable.objects.get( - prefect_flow_run_id=flow_run_id - ) + calculation = self.calculation_datatable.objects.get(run_id=flow_run_id) calculation.source = f"{source.__class__.__name__}" calculation.source_id = parent_ids calculation.save() diff --git a/src/simmate/website/core_components/filters/calculation.py b/src/simmate/website/core_components/filters/calculation.py index a2a7dd32f..4d1ebfe2d 100644 --- a/src/simmate/website/core_components/filters/calculation.py +++ b/src/simmate/website/core_components/filters/calculation.py @@ -10,7 +10,7 @@ class Meta: model = CalculationTable fields = dict( directory=["exact"], - prefect_flow_run_id=["exact"], + run_id=["exact"], created_at=["range"], updated_at=["range"], ) diff --git a/src/simmate/website/templates/core_components/base_data_types/calculation.html b/src/simmate/website/templates/core_components/base_data_types/calculation.html index 0158e5a6f..3f7a4e191 100644 --- a/src/simmate/website/templates/core_components/base_data_types/calculation.html +++ b/src/simmate/website/templates/core_components/base_data_types/calculation.html @@ -7,7 +7,7 @@

Prefect Cloud:

if calculation.flow_run_view: name = calculation.prefect_flow_run_name # and link to calculation.prefect_cloud_link - id = calculation.prefect_flow_run_id + id = calculation.run_id state = calculation.prefect_state # and then a button that links to calculation.prefect_cloud_link else: diff --git a/src/simmate/website/templates/core_components/base_filter_types/calculation.html b/src/simmate/website/templates/core_components/base_filter_types/calculation.html index 758bad4b4..ace4963fb 100644 --- a/src/simmate/website/templates/core_components/base_filter_types/calculation.html +++ b/src/simmate/website/templates/core_components/base_filter_types/calculation.html @@ -9,7 +9,7 @@

- {{ form.prefect_flow_run_id | as_crispy_field }} + {{ form.run_id | as_crispy_field }} {{ form.directory | as_crispy_field }} {{ form.created_at__range | as_crispy_field }} {{ form.updated_at__range | as_crispy_field }} diff --git a/src/simmate/workflow_engine/execution/README.md b/src/simmate/workflow_engine/execution/README.md index 13b8731f3..e1b77fcc1 100644 --- a/src/simmate/workflow_engine/execution/README.md +++ b/src/simmate/workflow_engine/execution/README.md @@ -27,7 +27,7 @@ test() from simmate.workflow_engine.execution.worker import SimmateWorker -worker = SimmateWorker(waittime_on_empty_queue=1) # nitems_max=1 +worker = SimmateWorker(waittime_on_empty_queue=1, tags=[]) # nitems_max=1 worker.start() # ---------------------------------------------------------------------------- diff --git a/src/simmate/workflow_engine/execution/worker.py b/src/simmate/workflow_engine/execution/worker.py index 0e26b6f25..df6422e1c 100644 --- a/src/simmate/workflow_engine/execution/worker.py +++ b/src/simmate/workflow_engine/execution/worker.py @@ -39,7 +39,7 @@ def __init__( # wait_on_timeout=False, # TODO # settings on what to do when the queue is empty close_on_empty_queue: bool = False, - waittime_on_empty_queue: float = 60, + waittime_on_empty_queue: float = 1, tags: list[str] = ["simmate"], # should default be empty...? ): # the tags to query tasks for. If no tags were given, the worker will @@ -112,16 +112,13 @@ def start(self): # If we've made it this far, we're ready to grab a new WorkItem # and run it! # Query for PENDING WorkItems, lock it for editting, and update - # the status to RUNNING - query_results = WorkItem.objects.select_for_update().filter(status="P") - # filter down by tags - if self.tags: - for tag in self.tags: - query_results = query_results.filter(tags__icontains=tag) - else: - query_results = query_results.filter(tags=self.tags) - # and grab the first result - workitem = query_results.first() + # the status to RUNNING. And grab the first result + workitem = ( + WorkItem.objects.select_for_update() + .filter(status="P") + .filter_by_tags(self.tags) + .first() + ) # our lock exists only within this transation with transaction.atomic(): @@ -179,5 +176,16 @@ def queue_size(self): # !!! Should I include RUNNING in the count? If so I do that with... 
# from django.db.models import Q # ...filter(Q(status="P") | Q(status="R")) - queue_size = WorkItem.objects.filter(status="P").count() + queue_size = ( + WorkItem.objects.filter(status="P").filter_by_tags(self.tags).count() + ) return queue_size + + @classmethod + def run_singleflow_worker(cls): + worker = cls( + nitems_max=1, + close_on_empty_queue=True, + tags=["simmate"], + ) + worker.start() diff --git a/src/simmate/workflow_engine/test/test_s3_workflow.py b/src/simmate/workflow_engine/test/test_s3_workflow.py index 52cac9acd..df775c478 100644 --- a/src/simmate/workflow_engine/test/test_s3_workflow.py +++ b/src/simmate/workflow_engine/test/test_s3_workflow.py @@ -81,7 +81,7 @@ def terminate_job(self, directory, **kwargs): # ---------------------------------------------------------------------------- -@pytest.mark.prefect_db +# @pytest.mark.prefect_db def test_s3workflow_methods(): class Customized__Testing__DummyWorkflow(S3Workflow): command = "echo dummy" @@ -101,10 +101,6 @@ class Customized__Testing__DummyWorkflow(S3Workflow): assert result["directory"].exists() shutil.rmtree(result["directory"]) - assert state.is_completed() - assert result["directory"].exists() - shutil.rmtree(result["directory"]) - def test_s3workflow_1(): # run a basic task w.o. any handlers or monitoring @@ -275,3 +271,6 @@ class Customized__Testing__DummyWorkflow(S3Workflow): # return state.result() # state = test(return_state=True) # result = state.result() +# assert state.is_completed() +# assert result["directory"].exists() +# shutil.rmtree(result["directory"]) diff --git a/src/simmate/workflow_engine/test/test_workflow.py b/src/simmate/workflow_engine/test/test_workflow.py index b800ff0ff..86978c2ea 100644 --- a/src/simmate/workflow_engine/test/test_workflow.py +++ b/src/simmate/workflow_engine/test/test_workflow.py @@ -2,20 +2,10 @@ import pytest -from simmate.workflow_engine import Workflow, task +from simmate.workflow_engine import Workflow from simmate.website.test_app.models import TestCalculation -@task -def dummy_task_1(a): - return 1 - - -@task -def dummy_task_2(a): - return 2 - - class DummyProject__DummyCaclulator__DummyPreset(Workflow): """ Minimal example of a workflow @@ -31,23 +21,15 @@ class DummyProject__DummyCaclulator__DummyPreset(Workflow): @staticmethod def run_config(source=None, structure=None, **kwargs): - x = dummy_task_1(source) - y = dummy_task_2(structure) - return x + y + return 1 + 2 # copy to variable for shorthand use DummyFlow = DummyProject__DummyCaclulator__DummyPreset -@pytest.mark.prefect_db @pytest.mark.django_db def test_workflow(tmp_path): - # Run the workflow just like you would for the base Prefect class - flow = DummyFlow._to_prefect_flow() - state = flow(return_state=True, directory=tmp_path) - assert state.is_completed() - assert state.result() == 3 # Same exact thing but using higher-level method state = DummyFlow.run(directory=tmp_path) @@ -64,10 +46,7 @@ def test_workflow(tmp_path): assert DummyFlow.description_doc == DummyFlow.__doc__ assert DummyFlow.description_doc.strip() == "Minimal example of a workflow" assert DummyFlow.parameter_names == ["source", "structure"] - assert DummyFlow._parameters_to_register == [ - "prefect_flow_run_id", - "source", - ] + assert DummyFlow._parameters_to_register == ["source"] DummyFlow.show_parameters() # a print statment w. nothing else to check assert isinstance(DummyFlow.get_config(), dict) @@ -75,26 +54,6 @@ def test_workflow(tmp_path): DummyFlow.show_config() # a print statment w. 
nothing else to check -# @pytest.mark.prefect_db -# @pytest.mark.django_db -# def test_workflow_cloud(mocker, sample_structures): - -# # test cloud properties -# deployment_id = DummyFlow.deployment_id -# assert isinstance(deployment_id, str) -# # we dont check the actual value bc its randomly generated - -# n = DummyFlow.nflows_submitted -# assert isinstance(n, int) - -# # to test serialization of input parameters we grab a toolkit object -# structure = sample_structures["C_mp-48_primitive"] - -# # Run the workflow through prefect cloud -# flow_id = DummyFlow.run_cloud(structure=structure) -# assert isinstance(flow_id, str) - - def test_serialize_parameters(): class TestParameter1: def to_dict(self): @@ -138,3 +97,39 @@ def test_deserialize_parameters(mocker): } Workflow._deserialize_parameters(**example_parameters) + + +# !!! These tests below are for the Prefect Executor, which is disabled at the moment + +# @task +# def dummy_task_1(a): +# return 1 +# @task +# def dummy_task_2(a): +# return 2 +# @staticmethod +# def run_config(source=None, structure=None, **kwargs): +# x = dummy_task_1(source) +# y = dummy_task_2(structure) +# return x + y + +# # Run the workflow just like you would for the base Prefect class +# flow = DummyFlow._to_prefect_flow() +# state = flow(return_state=True, directory=tmp_path) +# assert state.is_completed() +# assert state.result() == 3 + +# @pytest.mark.prefect_db +# @pytest.mark.django_db +# def test_workflow_cloud(mocker, sample_structures): +# # test cloud properties +# deployment_id = DummyFlow.deployment_id +# assert isinstance(deployment_id, str) +# # we dont check the actual value bc its randomly generated +# n = DummyFlow.nflows_submitted +# assert isinstance(n, int) +# # to test serialization of input parameters we grab a toolkit object +# structure = sample_structures["C_mp-48_primitive"] +# # Run the workflow through prefect cloud +# flow_id = DummyFlow.run_cloud(structure=structure) +# assert isinstance(flow_id, str) diff --git a/src/simmate/workflow_engine/workflow.py b/src/simmate/workflow_engine/workflow.py index 4a20d6605..d01ffab59 100644 --- a/src/simmate/workflow_engine/workflow.py +++ b/src/simmate/workflow_engine/workflow.py @@ -63,7 +63,7 @@ using the `run_cloud` method, which returns a Prefect flow run id. ``` python -prefect_flow_run_id = workflow.run_cloud( +run_id = workflow.run_cloud( structure="NaCl.cif", command="mpirun -n 4 vasp_std > vasp.out", ) @@ -84,7 +84,7 @@ df = table.obects.to_dataframe() # or grab a specific run result and convert to a toolkit object -entry = table.objects.get(prefect_flow_run_id="example-123456") +entry = table.objects.get(run_id="example-123456") structure = entry.to_toolkit() ``` @@ -295,6 +295,10 @@ def __init__(self, result): def result(self): return self._result + @staticmethod + def is_completed(): + return True + class Workflow: """ @@ -382,9 +386,9 @@ def run(cls, **kwargs) -> DummyState: """ # Note: this is a separate method and wrapper around run_full because # we want to allow Prefect executor to overwrite this method. 
- print(f"Starting new run of {cls.name_full}") + print(f"Starting {cls.name_full}") result = cls._run_full(**kwargs) # no run_id as a new one will be made - print(f"Completed run of {cls.name_full}") + print(f"Completed {cls.name_full}") state = DummyState(result) return state @@ -396,6 +400,14 @@ def run_cloud(cls, return_future: bool = True, **kwargs) -> str: print(f"Submitting new run of {cls.name_full}") + # If we are submitting using a filename, we don't want to + # submit to a cluster and have the job fail because it doesn't have + # access to the file. We therefore deserialize right before + # serializing in the next line in order to ensure parameters that + # accept file names are submitted with all necessary data. + parameters_deserialized = cls._deserialize_parameters(**kwargs) + parameters_serialized = cls._serialize_parameters(**parameters_deserialized) + # Because we often want to save some info to our database even before # the calculation starts/finishes, we do that by calling _register_calc # at this higher level. An example is storing the structure and run id. @@ -407,7 +419,7 @@ def run_cloud(cls, return_future: bool = True, **kwargs) -> str: cls._run_full, # should this be the run method...? run_id=run_id, tags=cls.tags, - **kwargs, + **parameters_serialized, ) # If the user wants the future, return that instead of the run_id @@ -438,7 +450,7 @@ def nflows_submitted(cls) -> int: @property def database_table(cls) -> Calculation: """ - The database table where calculation information (such as the prefect_flow_run_id) + The database table where calculation information (such as the run_id) is stored. The table should use `simmate.database.base_data_types.Calculation` In many cases, this table will contain all of the results you need. However, @@ -462,7 +474,9 @@ def database_table(cls) -> Calculation: return BandStructureCalc elif "density-of-states" in flow_preset: - from simmate.database.base_data_types import DensityofStatesCalc + from simmate.database.base_data_types import ( + DensityofStatesCalc, + ) return DensityofStatesCalc elif flow_type == "population-analysis": @@ -716,7 +730,11 @@ def _load_input_and_register(cls, run_id: str, **parameters: Any) -> dict: # the primary input. I go through each one at a time until I find one # that was provided -- then I exit with that parameter's value. 
primary_input = None - for primary_input_key in ["structure", "migration_hop", "supercell_start"]: + for primary_input_key in [ + "structure", + "migration_hop", + "supercell_start", + ]: primary_input = parameters.get(primary_input_key, None) primary_input_cleaned = parameters_cleaned.get(primary_input_key, None) if primary_input: @@ -752,7 +770,7 @@ def _load_input_and_register(cls, run_id: str, **parameters: Any) -> dict: ) # the past directory should be stored on the input object - previous_directory = primary_input_cleaned.database_object.directory + previous_directory = Path(primary_input_cleaned.database_object.directory) # Copy over all files except simmate ones (we have no need for the # summaries or error archives) @@ -1004,7 +1022,9 @@ def _serialize_parameters(**parameters) -> dict: @classmethod def _deserialize_parameters( - cls, add_defaults_from_attr: bool = True, **parameters + cls, + add_defaults_from_attr: bool = True, + **parameters, ) -> dict: """ converts all parameters to appropriate python objects @@ -1081,7 +1101,7 @@ def _deserialize_parameters( parameters["supercell_end"] ) - if "directory" in parameters.keys(): + if parameters.get("directory", None): parameters_cleaned["directory"] = Path(parameters_cleaned["directory"]) return parameters_cleaned diff --git a/src/simmate/workflows/restart.py b/src/simmate/workflows/restart.py index 164edfb86..e15012e8f 100644 --- a/src/simmate/workflows/restart.py +++ b/src/simmate/workflows/restart.py @@ -39,7 +39,7 @@ def run_config(cls, directory_old: Path, directory_new: Path = None): input_parameters = metadata.copy() # remove settings that will be reset elsewhere - input_parameters.pop("prefect_flow_run_id", None) + input_parameters.pop("run_id", None) input_parameters.pop("directory", None) input_parameters.pop("copy_previous_directory", None) input_parameters.pop("is_restart", None) diff --git a/src/simmate/workflows/test/test_all_workflow_runs.py b/src/simmate/workflows/test/test_all_workflow_runs.py index 8a4e81863..967bf1ca0 100644 --- a/src/simmate/workflows/test/test_all_workflow_runs.py +++ b/src/simmate/workflows/test/test_all_workflow_runs.py @@ -8,15 +8,33 @@ ) +# @pytest.mark.prefect_db @pytest.mark.vasp -@pytest.mark.prefect_db @pytest.mark.django_db def test_all_workflow_runs(tmp_path, sample_structures): # For testing, look at the NaCl rocksalt primitive structure structure = sample_structures["NaCl_mp-22862_primitive"] - with tmp_path.as_cwd(): + # ------------- + # https://stackoverflow.com/questions/41742317/ + import os + import contextlib + from pathlib import Path + + @contextlib.contextmanager + def working_directory(path): + """Changes working directory and returns to previous on exit.""" + prev_cwd = Path.cwd() + os.chdir(path) + try: + yield + finally: + os.chdir(prev_cwd) + + # ----------------- + + with working_directory(tmp_path): successful_flows = [] @@ -86,7 +104,7 @@ def test_all_workflow_runs(tmp_path, sample_structures): # TEST NEB FLOWS # For testing, look at I- diffusion in Y2CI2 (takes roughly 1 hr) structure = sample_structures["Y2CI2_mp-1206803_primitive"] - workflow_name = "diffusion.vasp.neb-all-paths" + workflow_name = "diffusion.vasp.neb-all-paths-mit" workflow = get_workflow(workflow_name) state = workflow.run( structure=structure, diff --git a/src/simmate/workflows/utilities.py b/src/simmate/workflows/utilities.py index b2567424e..24ded463e 100644 --- a/src/simmate/workflows/utilities.py +++ b/src/simmate/workflows/utilities.py @@ -252,7 +252,7 @@ def 
load_results_from_directories(base_directory: Union[str, Path] = "."): # use the metadata to update the other fields results_db.source = metadata["source"] - results_db.prefect_flow_run_id = metadata["prefect_flow_run_id"] + results_db.run_id = metadata["run_id"] # note the directory might have been moved from when this was originally # ran vs where it is now. Therefore, we update the folder location here. diff --git a/tutorials/05_Search_the_database.md b/tutorials/05_Search_the_database.md index ae17e87f7..6181a8bd1 100644 --- a/tutorials/05_Search_the_database.md +++ b/tutorials/05_Search_the_database.md @@ -164,7 +164,7 @@ table.show_columns() - formula_anonymous - spacegroup (relation to Spacegroup) - directory -- prefect_flow_run_id +- run_id - created_at - updated_at - corrections From 9a3e3909dc2eb6c9d5b02376748c243a6af0abc5 Mon Sep 17 00:00:00 2001 From: Jack Sundberg Date: Tue, 9 Aug 2022 17:14:33 -0400 Subject: [PATCH 6/6] update workflows tested --- .../workflows/test/test_all_workflow_runs.py | 49 +++++++++++-------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/src/simmate/workflows/test/test_all_workflow_runs.py b/src/simmate/workflows/test/test_all_workflow_runs.py index 967bf1ca0..b63587d13 100644 --- a/src/simmate/workflows/test/test_all_workflow_runs.py +++ b/src/simmate/workflows/test/test_all_workflow_runs.py @@ -103,22 +103,22 @@ def working_directory(path): # TEST NEB FLOWS # For testing, look at I- diffusion in Y2CI2 (takes roughly 1 hr) - structure = sample_structures["Y2CI2_mp-1206803_primitive"] - workflow_name = "diffusion.vasp.neb-all-paths-mit" - workflow = get_workflow(workflow_name) - state = workflow.run( - structure=structure, - migrating_specie="I", - command="mpirun -n 12 vasp_std > vasp.out", - directory=str(tmp_path), - nimages=1, - min_atoms=10, - max_atoms=25, - min_length=4, - ) - state.result() - if state.is_completed(): - successful_flows.append(workflow_name) + # structure = sample_structures["Y2CI2_mp-1206803_primitive"] + # workflow_name = "diffusion.vasp.neb-all-paths-mit" + # workflow = get_workflow(workflow_name) + # state = workflow.run( + # structure=structure, + # migrating_specie="I", + # command="mpirun -n 12 vasp_std > vasp.out", + # directory=str(tmp_path), + # nimages=1, + # min_atoms=10, + # max_atoms=25, + # min_length=4, + # ) + # state.result() + # if state.is_completed(): + # successful_flows.append(workflow_name) # check which flows either (1) failed or (2) weren't tested all_flows = get_list_of_all_workflows() @@ -126,9 +126,18 @@ def working_directory(path): missing_failed_flows.sort() assert missing_failed_flows == [ - "diffusion.vasp.neb-all-paths", - "diffusion.vasp.neb-from-endpoints", - "diffusion.vasp.neb-from-images", - "diffusion.vasp.neb-single-path", + "diffusion.vasp.neb-all-paths-mit", + "diffusion.vasp.neb-from-endpoints-mit", + "diffusion.vasp.neb-from-images-mit", + "diffusion.vasp.neb-single-path-mit", + "electronic-structure.vasp.matproj-hse-full" "population-analysis.vasp.badelf-matproj", + "relaxation.vasp.matproj-hse", + "relaxation.vasp.matproj-metal", + "relaxation.vasp.matproj-scan", + "relaxation.vasp.mvl-grainboundary", + "relaxation.vasp.mvl-slab", + "restart.simmate.automatic", + "static-energy.vasp.matproj-hse", + "static-energy.vasp.matproj-scan", ]
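
For reference, the single-flow worker behavior exercised above can also be started directly from Python. This is a minimal sketch mirroring the `Worker.run_singleflow_worker()` helper added in this patch series; the keyword arguments and the default `"simmate"` tag are taken from that helper, so treat anything beyond them as illustrative rather than part of the patch.

```python
from simmate.workflow_engine import Worker

# Equivalent to `simmate workflow-engine start-worker -n 1 -e true -t simmate`:
# pull at most one task from the queue, and shut down if the queue is empty.
worker = Worker(
    nitems_max=1,
    close_on_empty_queue=True,
    tags=["simmate"],
)
worker.start()
```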