Skip to content

Commit

Permalink
add workflow execution handlers and endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
andrii-i committed Oct 7, 2024
1 parent 6466c48 commit 9994767
Show file tree
Hide file tree
Showing 5 changed files with 206 additions and 27 deletions.
34 changes: 32 additions & 2 deletions jupyter_scheduler/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
from prefect.futures import as_completed

from jupyter_scheduler.models import CreateJob, DescribeJob, JobFeature, Status
from jupyter_scheduler.orm import Job, Workflow, create_session
from jupyter_scheduler.orm import Job, Workflow, WorkflowDefinition, create_session
from jupyter_scheduler.parameterize import add_parameters
from jupyter_scheduler.scheduler import Scheduler
from jupyter_scheduler.utils import get_utc_timestamp
from jupyter_scheduler.workflows import DescribeWorkflow
from jupyter_scheduler.workflows import DescribeWorkflow, DescribeWorkflowDefinition


class ExecutionManager(ABC):
Expand All @@ -40,11 +40,13 @@ def __init__(
db_url: str,
job_id: str = None,
workflow_id: str = None,
workflow_definition_id: str = None,
root_dir: str = None,
staging_paths: Dict[str, str] = None,
):
self.job_id = job_id
self.workflow_id = workflow_id
self.workflow_definition_id = workflow_definition_id
self.staging_paths = staging_paths
self.root_dir = root_dir
self.db_url = db_url
Expand All @@ -58,6 +60,17 @@ def model(self):
)
self._model = DescribeWorkflow.from_orm(workflow)
return self._model
if self.workflow_definition_id:
with self.db_session() as session:
workflow_definition = (
session.query(WorkflowDefinition)
.filter(
WorkflowDefinition.workflow_definition_id == self.workflow_definition_id
)
.first()
)
self._model = DescribeWorkflowDefinition.from_orm(workflow_definition)
return self._model
if self._model is None:
with self.db_session() as session:
job = session.query(Job).filter(Job.job_id == self.job_id).first()
Expand Down Expand Up @@ -187,6 +200,23 @@ def on_complete_workflow(self):
class DefaultExecutionManager(ExecutionManager):
"""Default execution manager that executes notebooks"""

def activate_workflow_definition(self):
workflow_definition = self.model
with self.db_session() as session:
session.query(WorkflowDefinition).filter(
WorkflowDefinition.workflow_definition_id
== workflow_definition.workflow_definition_id
).update({"active": True})
session.commit()
workflow_definition = (
session.query(WorkflowDefinition)
.filter(
WorkflowDefinition.workflow_definition_id
== workflow_definition.workflow_definition_id
)
.first()
)

@task(name="Execute workflow task")
def execute_task(self, job: Job):
with self.db_session() as session:
Expand Down
8 changes: 5 additions & 3 deletions jupyter_scheduler/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from jupyter_scheduler.orm import create_tables
from jupyter_scheduler.workflows import (
WorkflowDefinitionsActivationHandler,
WorkflowDefinitionsHandler,
WorkflowDefinitionsTasksHandler,
WorkflowsHandler,
Expand Down Expand Up @@ -54,16 +55,17 @@ class SchedulerApp(ExtensionApp):
rf"scheduler/worklows/{WORKFLOW_ID_REGEX}/tasks",
WorkflowsTasksHandler,
),
(r"scheduler/worklow_definitions", WorkflowDefinitionsHandler),
(
rf"scheduler/worklow_definitions/{WORKFLOW_DEFINITION_ID_REGEX}",
WorkflowDefinitionsHandler,
),
(
rf"scheduler/worklows/{WORKFLOW_DEFINITION_ID_REGEX}/run",
WorkflowDefinitionsHandler,
rf"scheduler/worklow_definitions/{WORKFLOW_DEFINITION_ID_REGEX}/activate",
WorkflowDefinitionsActivationHandler,
),
(
rf"scheduler/worklows/{WORKFLOW_ID_REGEX}/tasks",
rf"scheduler/worklow_definitions/{WORKFLOW_DEFINITION_ID_REGEX}/tasks",
WorkflowDefinitionsTasksHandler,
),
]
Expand Down
2 changes: 1 addition & 1 deletion jupyter_scheduler/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class Workflow(Base):
class WorkflowDefinition(Base):
__tablename__ = "workflow_definitions"
__table_args__ = {"extend_existing": True}
workflow_id = Column(String(36), primary_key=True, default=generate_uuid)
workflow_definition_id = Column(String(36), primary_key=True, default=generate_uuid)
tasks = Column(JsonType)
status = Column(String(64), default=Status.CREATED)
active = Column(Boolean, default=False)
Expand Down
87 changes: 83 additions & 4 deletions jupyter_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,20 @@
UpdateJob,
UpdateJobDefinition,
)
from jupyter_scheduler.orm import Job, JobDefinition, Workflow, create_session
from jupyter_scheduler.orm import Job, JobDefinition, Workflow, WorkflowDefinition, create_session
from jupyter_scheduler.utils import (
copy_directory,
create_output_directory,
create_output_filename,
)
from jupyter_scheduler.workflows import CreateWorkflow, DescribeWorkflow, UpdateWorkflow
from jupyter_scheduler.workflows import (
CreateWorkflow,
CreateWorkflowDefinition,
DescribeWorkflow,
DescribeWorkflowDefinition,
UpdateWorkflow,
UpdateWorkflowDefinition,
)


class BaseScheduler(LoggingConfigurable):
Expand Down Expand Up @@ -117,6 +124,10 @@ def run_workflow(self, workflow_id: str) -> str:
"""Triggers execution of the workflow."""
raise NotImplementedError("must be implemented by subclass")

def activate_workflow_definition(self, workflow_definition_id: str) -> str:
"""Activates workflow marking it as ready for execution."""
raise NotImplementedError("must be implemented by subclass")

def get_workflow(self, workflow_id: str) -> DescribeWorkflow:
"""Returns workflow record for a single workflow."""
raise NotImplementedError("must be implemented by subclass")
Expand All @@ -125,6 +136,12 @@ def create_workflow_task(self, workflow_id: str, model: CreateJob) -> str:
"""Adds a task to a workflow."""
raise NotImplementedError("must be implemented by subclass")

def create_workflow_definition_task(
self, workflow_definition_id: str, model: CreateJobDefinition
) -> str:
"""Adds a task to a workflow definition."""
raise NotImplementedError("must be implemented by subclass")

def update_job(self, job_id: str, model: UpdateJob):
"""Updates job metadata in the persistence store,
for example name, status etc. In case of status
Expand Down Expand Up @@ -176,6 +193,13 @@ def create_job_definition(self, model: CreateJobDefinition) -> str:
"""
raise NotImplementedError("must be implemented by subclass")

def create_workflow_definition(self, model: CreateWorkflowDefinition) -> str:
"""Creates a new workflow definition record,
consider this as the template for creating
recurring/scheduled workflows.
"""
raise NotImplementedError("must be implemented by subclass")

def update_job_definition(self, job_definition_id: str, model: UpdateJobDefinition):
"""Updates job definition metadata in the persistence store,
should only impact all future jobs.
Expand All @@ -192,6 +216,10 @@ def get_job_definition(self, job_definition_id: str) -> DescribeJobDefinition:
"""Returns job definition record for a single job definition"""
raise NotImplementedError("must be implemented by subclass")

def get_workflow_definition(self, workflow_definition_id: str) -> DescribeWorkflowDefinition:
"""Returns workflow definition record for a single workflow definition"""
raise NotImplementedError("must be implemented by subclass")

def list_job_definitions(self, query: ListJobDefinitionsQuery) -> ListJobDefinitionsResponse:
"""Returns list of all job definitions filtered by query"""
raise NotImplementedError("must be implemented by subclass")
Expand Down Expand Up @@ -524,6 +552,13 @@ def create_workflow(self, model: CreateWorkflow) -> str:
session.commit()
return workflow.workflow_id

def create_workflow_definition(self, model: CreateWorkflowDefinition) -> str:
with self.db_session() as session:
workflow_definition = WorkflowDefinition(**model.dict(exclude_none=True))
session.add(workflow_definition)
session.commit()
return workflow_definition.workflow_definition_id

def run_workflow(self, workflow_id: str) -> str:
execution_manager = self.execution_manager_class(
workflow_id=workflow_id,
Expand All @@ -533,6 +568,15 @@ def run_workflow(self, workflow_id: str) -> str:
execution_manager.process_workflow()
return workflow_id

def activate_workflow_definition(self, workflow_definition_id: str) -> str:
execution_manager = self.execution_manager_class(
workflow_definition_id=workflow_definition_id,
root_dir=self.root_dir,
db_url=self.db_url,
)
execution_manager.activate_workflow_definition()
return workflow_definition_id

def get_workflow(self, workflow_id: str) -> DescribeWorkflow:
with self.db_session() as session:
workflow_record = (
Expand All @@ -541,6 +585,16 @@ def get_workflow(self, workflow_id: str) -> DescribeWorkflow:
model = DescribeWorkflow.from_orm(workflow_record)
return model

def get_workflow_definition(self, workflow_definition_id: str) -> DescribeWorkflowDefinition:
with self.db_session() as session:
workflow_definition_record = (
session.query(WorkflowDefinition)
.filter(WorkflowDefinition.workflow_definition_id == workflow_definition_id)
.one()
)
model = DescribeWorkflowDefinition.from_orm(workflow_definition_record)
return model

def create_workflow_task(self, workflow_id: str, model: CreateJob) -> str:
job_id = self.create_job(model, run=False)
workflow: DescribeWorkflow = self.get_workflow(workflow_id)
Expand All @@ -549,13 +603,36 @@ def create_workflow_task(self, workflow_id: str, model: CreateJob) -> str:
self.update_workflow(workflow_id, UpdateWorkflow(tasks=updated_tasks))
return job_id

def create_workflow_definition_task(
self, workflow_definition_id: str, model: CreateJobDefinition
) -> str:
job_definition_id = self.create_job_definition(model, add_to_task_runner=False)
workflow_definition: DescribeWorkflowDefinition = self.get_workflow_definition(
workflow_definition_id
)
updated_tasks = (workflow_definition.tasks or [])[:]
updated_tasks.append(job_definition_id)
self.update_workflow_definition(
workflow_definition_id, UpdateWorkflowDefinition(tasks=updated_tasks)
)
return job_definition_id

def update_workflow(self, workflow_id: str, model: UpdateWorkflow):
with self.db_session() as session:
session.query(Workflow).filter(Workflow.workflow_id == workflow_id).update(
model.dict(exclude_none=True)
)
session.commit()

def update_workflow_definition(
self, workflow_definition_id: str, model: UpdateWorkflowDefinition
):
with self.db_session() as session:
session.query(WorkflowDefinition).filter(
WorkflowDefinition.workflow_definition_id == workflow_definition_id
).update(model.dict(exclude_none=True))
session.commit()

def update_job(self, job_id: str, model: UpdateJob):
with self.db_session() as session:
session.query(Job).filter(Job.job_id == job_id).update(model.dict(exclude_none=True))
Expand Down Expand Up @@ -657,7 +734,9 @@ def stop_job(self, job_id):
session.commit()
break

def create_job_definition(self, model: CreateJobDefinition) -> str:
def create_job_definition(
self, model: CreateJobDefinition, add_to_task_runner: bool = True
) -> str:
with self.db_session() as session:
if not self.file_exists(model.input_uri):
raise InputUriError(model.input_uri)
Expand All @@ -681,7 +760,7 @@ def create_job_definition(self, model: CreateJobDefinition) -> str:
else:
self.copy_input_file(model.input_uri, staging_paths["input"])

if self.task_runner and job_definition_schedule:
if add_to_task_runner and self.task_runner and job_definition_schedule:
self.task_runner.add_job_definition(job_definition_id)

return job_definition_id
Expand Down
Loading

0 comments on commit 9994767

Please sign in to comment.