From 99947670431ea6514bcdedf02479e17248baeab6 Mon Sep 17 00:00:00 2001 From: Andrii Ieroshenko Date: Mon, 7 Oct 2024 14:07:22 -0700 Subject: [PATCH] add workflow execution handlers and endpoints --- jupyter_scheduler/executors.py | 34 ++++++++++- jupyter_scheduler/extension.py | 8 ++- jupyter_scheduler/orm.py | 2 +- jupyter_scheduler/scheduler.py | 87 ++++++++++++++++++++++++++-- jupyter_scheduler/workflows.py | 102 +++++++++++++++++++++++++++------ 5 files changed, 206 insertions(+), 27 deletions(-) diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py index 7be02242..159ce4cc 100644 --- a/jupyter_scheduler/executors.py +++ b/jupyter_scheduler/executors.py @@ -16,11 +16,11 @@ from prefect.futures import as_completed from jupyter_scheduler.models import CreateJob, DescribeJob, JobFeature, Status -from jupyter_scheduler.orm import Job, Workflow, create_session +from jupyter_scheduler.orm import Job, Workflow, WorkflowDefinition, create_session from jupyter_scheduler.parameterize import add_parameters from jupyter_scheduler.scheduler import Scheduler from jupyter_scheduler.utils import get_utc_timestamp -from jupyter_scheduler.workflows import DescribeWorkflow +from jupyter_scheduler.workflows import DescribeWorkflow, DescribeWorkflowDefinition class ExecutionManager(ABC): @@ -40,11 +40,13 @@ def __init__( db_url: str, job_id: str = None, workflow_id: str = None, + workflow_definition_id: str = None, root_dir: str = None, staging_paths: Dict[str, str] = None, ): self.job_id = job_id self.workflow_id = workflow_id + self.workflow_definition_id = workflow_definition_id self.staging_paths = staging_paths self.root_dir = root_dir self.db_url = db_url @@ -58,6 +60,17 @@ def model(self): ) self._model = DescribeWorkflow.from_orm(workflow) return self._model + if self.workflow_definition_id: + with self.db_session() as session: + workflow_definition = ( + session.query(WorkflowDefinition) + .filter( + WorkflowDefinition.workflow_definition_id == 
self.workflow_definition_id + ) + .first() + ) + self._model = DescribeWorkflowDefinition.from_orm(workflow_definition) + return self._model if self._model is None: with self.db_session() as session: job = session.query(Job).filter(Job.job_id == self.job_id).first() @@ -187,6 +200,23 @@ def on_complete_workflow(self): class DefaultExecutionManager(ExecutionManager): """Default execution manager that executes notebooks""" + def activate_workflow_definition(self): + workflow_definition = self.model + with self.db_session() as session: + session.query(WorkflowDefinition).filter( + WorkflowDefinition.workflow_definition_id + == workflow_definition.workflow_definition_id + ).update({"active": True}) + session.commit() + workflow_definition = ( + session.query(WorkflowDefinition) + .filter( + WorkflowDefinition.workflow_definition_id + == workflow_definition.workflow_definition_id + ) + .first() + ) + @task(name="Execute workflow task") def execute_task(self, job: Job): with self.db_session() as session: diff --git a/jupyter_scheduler/extension.py b/jupyter_scheduler/extension.py index 57d934ad..4aea3430 100644 --- a/jupyter_scheduler/extension.py +++ b/jupyter_scheduler/extension.py @@ -7,6 +7,7 @@ from jupyter_scheduler.orm import create_tables from jupyter_scheduler.workflows import ( + WorkflowDefinitionsActivationHandler, WorkflowDefinitionsHandler, WorkflowDefinitionsTasksHandler, WorkflowsHandler, @@ -54,16 +55,17 @@ class SchedulerApp(ExtensionApp): rf"scheduler/worklows/{WORKFLOW_ID_REGEX}/tasks", WorkflowsTasksHandler, ), + (r"scheduler/worklow_definitions", WorkflowDefinitionsHandler), ( rf"scheduler/worklow_definitions/{WORKFLOW_DEFINITION_ID_REGEX}", WorkflowDefinitionsHandler, ), ( - rf"scheduler/worklows/{WORKFLOW_DEFINITION_ID_REGEX}/run", - WorkflowDefinitionsHandler, + rf"scheduler/worklow_definitions/{WORKFLOW_DEFINITION_ID_REGEX}/activate", + WorkflowDefinitionsActivationHandler, ), ( - rf"scheduler/worklows/{WORKFLOW_ID_REGEX}/tasks", + 
rf"scheduler/worklow_definitions/{WORKFLOW_DEFINITION_ID_REGEX}/tasks", WorkflowDefinitionsTasksHandler, ), ] diff --git a/jupyter_scheduler/orm.py b/jupyter_scheduler/orm.py index 3d14dede..08bfa4e7 100644 --- a/jupyter_scheduler/orm.py +++ b/jupyter_scheduler/orm.py @@ -123,7 +123,7 @@ class Workflow(Base): class WorkflowDefinition(Base): __tablename__ = "workflow_definitions" __table_args__ = {"extend_existing": True} - workflow_id = Column(String(36), primary_key=True, default=generate_uuid) + workflow_definition_id = Column(String(36), primary_key=True, default=generate_uuid) tasks = Column(JsonType) status = Column(String(64), default=Status.CREATED) active = Column(Boolean, default=False) diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index 5ee99de4..e8dc1c15 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -37,13 +37,20 @@ UpdateJob, UpdateJobDefinition, ) -from jupyter_scheduler.orm import Job, JobDefinition, Workflow, create_session +from jupyter_scheduler.orm import Job, JobDefinition, Workflow, WorkflowDefinition, create_session from jupyter_scheduler.utils import ( copy_directory, create_output_directory, create_output_filename, ) -from jupyter_scheduler.workflows import CreateWorkflow, DescribeWorkflow, UpdateWorkflow +from jupyter_scheduler.workflows import ( + CreateWorkflow, + CreateWorkflowDefinition, + DescribeWorkflow, + DescribeWorkflowDefinition, + UpdateWorkflow, + UpdateWorkflowDefinition, +) class BaseScheduler(LoggingConfigurable): @@ -117,6 +124,10 @@ def run_workflow(self, workflow_id: str) -> str: """Triggers execution of the workflow.""" raise NotImplementedError("must be implemented by subclass") + def activate_workflow_definition(self, workflow_definition_id: str) -> str: + """Activates workflow marking it as ready for execution.""" + raise NotImplementedError("must be implemented by subclass") + def get_workflow(self, workflow_id: str) -> DescribeWorkflow: """Returns 
workflow record for a single workflow.""" raise NotImplementedError("must be implemented by subclass") @@ -125,6 +136,12 @@ def create_workflow_task(self, workflow_id: str, model: CreateJob) -> str: """Adds a task to a workflow.""" raise NotImplementedError("must be implemented by subclass") + def create_workflow_definition_task( + self, workflow_definition_id: str, model: CreateJobDefinition + ) -> str: + """Adds a task to a workflow definition.""" + raise NotImplementedError("must be implemented by subclass") + def update_job(self, job_id: str, model: UpdateJob): """Updates job metadata in the persistence store, for example name, status etc. In case of status @@ -176,6 +193,13 @@ def create_job_definition(self, model: CreateJobDefinition) -> str: """ raise NotImplementedError("must be implemented by subclass") + def create_workflow_definition(self, model: CreateWorkflowDefinition) -> str: + """Creates a new workflow definition record, + consider this as the template for creating + recurring/scheduled workflows. + """ + raise NotImplementedError("must be implemented by subclass") + def update_job_definition(self, job_definition_id: str, model: UpdateJobDefinition): """Updates job definition metadata in the persistence store, should only impact all future jobs. 
@@ -192,6 +216,10 @@ def get_job_definition(self, job_definition_id: str) -> DescribeJobDefinition: """Returns job definition record for a single job definition""" raise NotImplementedError("must be implemented by subclass") + def get_workflow_definition(self, workflow_definition_id: str) -> DescribeWorkflowDefinition: + """Returns workflow definition record for a single workflow definition""" + raise NotImplementedError("must be implemented by subclass") + def list_job_definitions(self, query: ListJobDefinitionsQuery) -> ListJobDefinitionsResponse: """Returns list of all job definitions filtered by query""" raise NotImplementedError("must be implemented by subclass") @@ -524,6 +552,13 @@ def create_workflow(self, model: CreateWorkflow) -> str: session.commit() return workflow.workflow_id + def create_workflow_definition(self, model: CreateWorkflowDefinition) -> str: + with self.db_session() as session: + workflow_definition = WorkflowDefinition(**model.dict(exclude_none=True)) + session.add(workflow_definition) + session.commit() + return workflow_definition.workflow_definition_id + def run_workflow(self, workflow_id: str) -> str: execution_manager = self.execution_manager_class( workflow_id=workflow_id, @@ -533,6 +568,15 @@ def run_workflow(self, workflow_id: str) -> str: execution_manager.process_workflow() return workflow_id + def activate_workflow_definition(self, workflow_definition_id: str) -> str: + execution_manager = self.execution_manager_class( + workflow_definition_id=workflow_definition_id, + root_dir=self.root_dir, + db_url=self.db_url, + ) + execution_manager.activate_workflow_definition() + return workflow_definition_id + def get_workflow(self, workflow_id: str) -> DescribeWorkflow: with self.db_session() as session: workflow_record = ( @@ -541,6 +585,16 @@ def get_workflow(self, workflow_id: str) -> DescribeWorkflow: model = DescribeWorkflow.from_orm(workflow_record) return model + def get_workflow_definition(self, workflow_definition_id: str) -> 
DescribeWorkflowDefinition: + with self.db_session() as session: + workflow_definition_record = ( + session.query(WorkflowDefinition) + .filter(WorkflowDefinition.workflow_definition_id == workflow_definition_id) + .one() + ) + model = DescribeWorkflowDefinition.from_orm(workflow_definition_record) + return model + def create_workflow_task(self, workflow_id: str, model: CreateJob) -> str: job_id = self.create_job(model, run=False) workflow: DescribeWorkflow = self.get_workflow(workflow_id) @@ -549,6 +603,20 @@ def create_workflow_task(self, workflow_id: str, model: CreateJob) -> str: self.update_workflow(workflow_id, UpdateWorkflow(tasks=updated_tasks)) return job_id + def create_workflow_definition_task( + self, workflow_definition_id: str, model: CreateJobDefinition + ) -> str: + job_definition_id = self.create_job_definition(model, add_to_task_runner=False) + workflow_definition: DescribeWorkflowDefinition = self.get_workflow_definition( + workflow_definition_id + ) + updated_tasks = (workflow_definition.tasks or [])[:] + updated_tasks.append(job_definition_id) + self.update_workflow_definition( + workflow_definition_id, UpdateWorkflowDefinition(tasks=updated_tasks) + ) + return job_definition_id + def update_workflow(self, workflow_id: str, model: UpdateWorkflow): with self.db_session() as session: session.query(Workflow).filter(Workflow.workflow_id == workflow_id).update( @@ -556,6 +624,15 @@ def update_workflow(self, workflow_id: str, model: UpdateWorkflow): ) session.commit() + def update_workflow_definition( + self, workflow_definition_id: str, model: UpdateWorkflowDefinition + ): + with self.db_session() as session: + session.query(WorkflowDefinition).filter( + WorkflowDefinition.workflow_definition_id == workflow_definition_id + ).update(model.dict(exclude_none=True)) + session.commit() + def update_job(self, job_id: str, model: UpdateJob): with self.db_session() as session: session.query(Job).filter(Job.job_id == 
job_id).update(model.dict(exclude_none=True)) @@ -657,7 +734,9 @@ def stop_job(self, job_id): session.commit() break - def create_job_definition(self, model: CreateJobDefinition) -> str: + def create_job_definition( + self, model: CreateJobDefinition, add_to_task_runner: bool = True + ) -> str: with self.db_session() as session: if not self.file_exists(model.input_uri): raise InputUriError(model.input_uri) @@ -681,7 +760,7 @@ def create_job_definition(self, model: CreateJobDefinition) -> str: else: self.copy_input_file(model.input_uri, staging_paths["input"]) - if self.task_runner and job_definition_schedule: + if add_to_task_runner and self.task_runner and job_definition_schedule: self.task_runner.add_job_definition(job_definition_id) return job_definition_id diff --git a/jupyter_scheduler/workflows.py b/jupyter_scheduler/workflows.py index bc599ac3..5f977413 100644 --- a/jupyter_scheduler/workflows.py +++ b/jupyter_scheduler/workflows.py @@ -14,7 +14,13 @@ ExtensionHandlerMixin, JobHandlersMixin, ) -from jupyter_scheduler.models import CreateJob, Status, UpdateJob +from jupyter_scheduler.models import ( + CreateJob, + CreateJobDefinition, + Status, + UpdateJob, + UpdateJobDefinition, +) from jupyter_scheduler.pydantic_v1 import BaseModel, ValidationError @@ -99,7 +105,7 @@ async def patch(self, _: str, task_id: str): if status and status != Status.STOPPED: raise HTTPError( 500, - "Invalid value for field 'status'. Workflow job status can only be updated to status 'STOPPED' after creation.", + "Invalid value for field 'status'. 
Workflow task status can only be updated to status 'STOPPED' after creation.", ) try: if status: @@ -153,8 +159,8 @@ class WorkflowDefinitionsHandler(ExtensionHandlerMixin, JobHandlersMixin, APIHan async def post(self): payload = self.get_json_body() or {} try: - workflow_id = await ensure_async( - self.scheduler.create_workflow(CreateWorkflow(**payload)) + workflow_definition_id = await ensure_async( + self.scheduler.create_workflow_definition(CreateWorkflowDefinition(**payload)) ) except ValidationError as e: self.log.exception(e) @@ -170,34 +176,41 @@ async def post(self): raise HTTPError(500, str(e)) from e except Exception as e: self.log.exception(e) - raise HTTPError(500, "Unexpected error occurred during creation of a workflow.") from e + raise HTTPError( + 500, "Unexpected error occurred during creation of a workflow definition." + ) from e else: - self.finish(json.dumps(dict(workflow_id=workflow_id))) + self.finish(json.dumps(dict(workflow_definition_id=workflow_definition_id))) @authenticated - async def get(self, workflow_id: str = None): - if not workflow_id: + async def get(self, workflow_definition_id: str = None): + if not workflow_definition_id: raise HTTPError(400, "Missing workflow_id in the request URL.") try: - workflow = await ensure_async(self.scheduler.get_workflow(workflow_id)) + workflow_definition = await ensure_async( + self.scheduler.get_workflow_definition(workflow_definition_id) + ) except SchedulerError as e: self.log.exception(e) raise HTTPError(500, str(e)) from e except Exception as e: self.log.exception(e) - raise HTTPError(500, "Unexpected error occurred while getting workflow details.") from e + raise HTTPError( + 500, "Unexpected error occurred while getting workflow definition details." 
+ ) from e else: - self.finish(workflow.json()) + self.finish(workflow_definition.json()) class WorkflowDefinitionsTasksHandler(ExtensionHandlerMixin, JobHandlersMixin, APIHandler): @authenticated - async def post(self, workflow_id: str): + async def post(self, workflow_definition_id: str): payload = self.get_json_body() try: - task_id = await ensure_async( - self.scheduler.create_workflow_task( - workflow_id=workflow_id, model=CreateJob(**payload) + task_defintion_id = await ensure_async( + self.scheduler.create_workflow_definition_task( + workflow_definition_id=workflow_definition_id, + model=CreateJobDefinition(**payload), ) ) except ValidationError as e: @@ -215,10 +228,65 @@ async def post(self, workflow_id: str): except Exception as e: self.log.exception(e) raise HTTPError( - 500, "Unexpected error occurred during creation of workflow job." + 500, "Unexpected error occurred during creation of workflow definition task." ) from e else: - self.finish(json.dumps(dict(task_id=task_id))) + self.finish(json.dumps(dict(task_defintion_id=task_defintion_id))) + + @authenticated + async def patch(self, _: str, task_definition_id: str): + payload = self.get_json_body() + status = payload.get("status") + status = Status(status) if status else None + + try: + await ensure_async( + self.scheduler.update_job_definition( + task_definition_id, UpdateJobDefinition(**payload) + ) + ) + except ValidationError as e: + self.log.exception(e) + raise HTTPError(500, str(e)) from e + except SchedulerError as e: + self.log.exception(e) + raise HTTPError(500, str(e)) from e + except Exception as e: + self.log.exception(e) + raise HTTPError( + 500, "Unexpected error occurred while updating the workflow definition task." 
+            ) from e
+        else:
+            self.set_status(204)
+            self.finish()
+
+
+class WorkflowDefinitionsActivationHandler(ExtensionHandlerMixin, JobHandlersMixin, APIHandler):
+    @authenticated
+    async def post(self, workflow_definition_id: str):
+        try:
+            workflow_definition_id = await ensure_async(
+                self.scheduler.activate_workflow_definition(workflow_definition_id)
+            )
+        except ValidationError as e:
+            self.log.exception(e)
+            raise HTTPError(500, str(e)) from e
+        except InputUriError as e:
+            self.log.exception(e)
+            raise HTTPError(500, str(e)) from e
+        except IdempotencyTokenError as e:
+            self.log.exception(e)
+            raise HTTPError(409, str(e)) from e
+        except SchedulerError as e:
+            self.log.exception(e)
+            raise HTTPError(500, str(e)) from e
+        except Exception as e:
+            self.log.exception(e)
+            raise HTTPError(
+                500, "Unexpected error occurred during activation of the workflow definition."
+            ) from e
+        else:
+            self.finish(json.dumps(dict(workflow_definition_id=workflow_definition_id)))
 
 
 class CreateWorkflow(BaseModel):