Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run Command Improvements #100

Merged
merged 7 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nodestream/cli/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

from ..operations import InitializeLogger, InitializeProject, RunPipeline
from .nodestream_command import NodestreamCommand
from .shared_options import JSON_OPTION, PIPELINE_ARGUMENT, PROJECT_FILE_OPTION
from .shared_options import JSON_OPTION, MANY_PIPELINES_ARGUMENT, PROJECT_FILE_OPTION


class Run(NodestreamCommand):
name = "run"
description = "run a pipeline in the current project"
arguments = [PIPELINE_ARGUMENT]
arguments = [MANY_PIPELINES_ARGUMENT]
options = [
PROJECT_FILE_OPTION,
JSON_OPTION,
Expand Down
3 changes: 3 additions & 0 deletions nodestream/cli/commands/shared_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@


PIPELINE_ARGUMENT = argument("pipeline", "the name of the pipeline")
MANY_PIPELINES_ARGUMENT = argument(
"pipelines", "the names of the pipelines", multiple=True, optional=True
)
49 changes: 35 additions & 14 deletions nodestream/cli/operations/run_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
from typing import Iterable

from cleo.io.outputs.output import Verbosity
from yaml import safe_dump

from ...pipeline import PipelineInitializationArguments, PipelineProgressReporter
from ...pipeline.meta import PipelineContext
from ...project import Project, RunRequest
Expand All @@ -11,29 +16,48 @@
def __init__(self, project: Project) -> None:
self.project = project

def get_pipelines_to_run(self, command: NodestreamCommand) -> Iterable[str]:
supplied_commands = command.argument("pipelines")
return supplied_commands or self.project.get_all_pipeline_names()

async def perform(self, command: NodestreamCommand):
await self.project.run(self.make_run_request(command))
for pipeline_name in self.get_pipelines_to_run(command):
await self.project.run(self.make_run_request(command, pipeline_name))

def make_run_request(
self, command: NodestreamCommand, pipeline_name: str
) -> RunRequest:
def print_effective_config(config):
command.line(

Check warning on line 31 in nodestream/cli/operations/run_pipeline.py

View check run for this annotation

Codecov / codecov/patch

nodestream/cli/operations/run_pipeline.py#L31

Added line #L31 was not covered by tests
"<info>Effective configuration:</info>",
verbosity=Verbosity.VERY_VERBOSE,
)
command.line(

Check warning on line 35 in nodestream/cli/operations/run_pipeline.py

View check run for this annotation

Codecov / codecov/patch

nodestream/cli/operations/run_pipeline.py#L35

Added line #L35 was not covered by tests
f"<info>{safe_dump(config)}</info>", verbosity=Verbosity.VERY_VERBOSE
)

def make_run_request(self, command: NodestreamCommand) -> RunRequest:
return RunRequest(
pipeline_name=command.argument("pipeline"),
pipeline_name=pipeline_name,
initialization_arguments=PipelineInitializationArguments(
annotations=command.option("annotations"),
step_outbox_size=int(command.option("step-outbox-size")),
on_effective_configuration_resolved=print_effective_config,
),
progress_reporter=self.create_progress_reporter(command),
progress_reporter=self.create_progress_reporter(command, pipeline_name),
)

def get_progress_indicator(self, command: NodestreamCommand) -> "ProgressIndicator":
def get_progress_indicator(
self, command: NodestreamCommand, pipeline_name: str
) -> "ProgressIndicator":
if command.has_json_logging_set:
return ProgressIndicator(command)
return ProgressIndicator(command, pipeline_name)

return SpinnerProgressIndicator(command)
return SpinnerProgressIndicator(command, pipeline_name)

Check warning on line 55 in nodestream/cli/operations/run_pipeline.py

View check run for this annotation

Codecov / codecov/patch

nodestream/cli/operations/run_pipeline.py#L55

Added line #L55 was not covered by tests

def create_progress_reporter(
self, command: NodestreamCommand
self, command: NodestreamCommand, pipeline_name: str
) -> PipelineProgressReporter:
indicator = self.get_progress_indicator(command)
indicator = self.get_progress_indicator(command, pipeline_name)
return PipelineProgressReporter(
reporting_frequency=int(command.option("reporting-frequency")),
callback=indicator.progress_callback,
Expand All @@ -43,8 +67,9 @@


class ProgressIndicator:
def __init__(self, command: NodestreamCommand) -> None:
def __init__(self, command: NodestreamCommand, pipeline_name: str) -> None:
self.command = command
self.pipeline_name = pipeline_name

def on_start(self):
pass
Expand All @@ -55,10 +80,6 @@
def on_finish(self, context: PipelineContext):
pass

@property
def pipeline_name(self) -> str:
return self.command.argument("pipeline")


class SpinnerProgressIndicator(ProgressIndicator):
def on_start(self):
Expand Down
13 changes: 9 additions & 4 deletions nodestream/pipeline/pipeline_file_loader.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional
from typing import Callable, Dict, List, Optional

from yaml import SafeLoader, load

Expand Down Expand Up @@ -58,6 +58,7 @@

step_outbox_size: int = 1000
annotations: Optional[List[str]] = None
on_effective_configuration_resolved: Optional[Callable[[List[Dict]], None]] = None

@classmethod
def for_introspection(cls):
Expand All @@ -74,10 +75,14 @@
)

def load_steps(self, class_loader, file_data):
effective = self.get_effective_configuration(file_data)
if self.on_effective_configuration_resolved:
self.on_effective_configuration_resolved(file_data)

Check warning on line 80 in nodestream/pipeline/pipeline_file_loader.py

View check run for this annotation

Codecov / codecov/patch

nodestream/pipeline/pipeline_file_loader.py#L80

Added line #L80 was not covered by tests
return [class_loader.load_class(**step_data) for step_data in effective]

def get_effective_configuration(self, file_data):
return [
class_loader.load_class(**step_data)
for step_data in file_data
if self.should_load_step(step_data)
step_data for step_data in file_data if self.should_load_step(step_data)
]

def should_load_step(self, step):
Expand Down
6 changes: 6 additions & 0 deletions nodestream/project/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ def get_scopes_by_name(self, scope_name: Optional[str]) -> Iterable[PipelineScop

return [self.scopes_by_name[scope_name]]

def get_all_pipeline_names(self) -> Iterable[str]:
"""Returns all pipeline names in the project."""
for scope in self.scopes_by_name.values():
for pipeline in scope.pipelines_by_name.values():
yield pipeline.name

def delete_pipeline(
self,
scope_name: Optional[str],
Expand Down
31 changes: 14 additions & 17 deletions tests/unit/cli/operations/test_run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from hamcrest import assert_that, equal_to

from nodestream.cli.operations.run_pipeline import RunPipeline, SpinnerProgressIndicator
from nodestream.pipeline import PipelineInitializationArguments
from nodestream.pipeline.meta import PipelineContext
from nodestream.project import Project

Expand All @@ -15,44 +14,42 @@ def run_pipeline_operation(mocker):
@pytest.mark.asyncio
async def test_run_pipeline_operation_perform(run_pipeline_operation, mocker):
run_req = "run"
cmd = mocker.Mock()
cmd.argument.return_value = ["pipeline_name"]
run_pipeline_operation.make_run_request = mocker.Mock(return_value=run_req)
await run_pipeline_operation.perform(mocker.Mock())
await run_pipeline_operation.perform(cmd)
run_pipeline_operation.project.run.assert_awaited_once_with(run_req)


def test_make_run_request(run_pipeline_operation, mocker):
annotations = ["annotation1", "annotation2"]
pipeline_name = "my_pipeline"
command = mocker.Mock()
command.option.side_effect = [["annotation1", "annotation2"], "10001", "10000"]
command.argument.return_value = "my_pipeline"
result = run_pipeline_operation.make_run_request(command)
assert_that(result.pipeline_name, equal_to("my_pipeline"))
assert_that(
result.initialization_arguments,
equal_to(
PipelineInitializationArguments(
annotations=["annotation1", "annotation2"], step_outbox_size=10001
)
),
)
command.option.side_effect = [annotations, "10001", "10000"]
command.argument.return_value = [pipeline_name]
result = run_pipeline_operation.make_run_request(command, pipeline_name)
assert_that(result.pipeline_name, equal_to(pipeline_name))
assert_that(result.initialization_arguments.annotations, equal_to(annotations))
assert_that(result.initialization_arguments.step_outbox_size, equal_to(10001))
assert_that(result.progress_reporter.reporting_frequency, equal_to(10000))


def test_spinner_on_start(mocker):
spinner = SpinnerProgressIndicator(mocker.Mock())
spinner = SpinnerProgressIndicator(mocker.Mock(), "pipeline_name")
spinner.on_start()
spinner.command.progress_indicator.assert_called_once()
spinner.progress.start.assert_called_once()


def test_spinner_on_finish(mocker):
spinner = SpinnerProgressIndicator(mocker.Mock())
spinner = SpinnerProgressIndicator(mocker.Mock(), "pipeline_name")
spinner.on_start()
spinner.on_finish(PipelineContext())
spinner.progress.finish.assert_called_once()


def test_spinner_progress_callback(mocker):
spinner = SpinnerProgressIndicator(mocker.Mock())
spinner = SpinnerProgressIndicator(mocker.Mock(), "pipeline_name")
spinner.on_start()
spinner.progress_callback(1000, None)
spinner.progress.set_message.assert_called_once()
8 changes: 7 additions & 1 deletion tests/unit/project/test_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
def scopes():
return [
PipelineScope("scope1", [PipelineDefinition("test", Path("path/to/pipeline"))]),
PipelineScope("scope2", []),
PipelineScope(
"scope2", [PipelineDefinition("test2", Path("path/to/pipeline"))]
),
]


Expand Down Expand Up @@ -145,3 +147,7 @@ async def test_get_snapshot_for(project, mocker):
project.run = mocker.AsyncMock()
await project.get_snapshot_for("pipeline")
project.run.assert_awaited_once()


def test_all_pipeline_names(project):
assert_that(list(project.get_all_pipeline_names()), equal_to(["test", "test2"]))