Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Project#get_snapshot_for API #60

Merged
merged 2 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions docs/docs/guides/implementing-pipeline-testing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Implementing Pipeline Testing

## Overview
Pipelines are the core of NodeStream.
They are the building blocks that allow you to define your data processing logic.
Since so many parts of the application depend on pipelines, it is important to test them thoroughly to ensure that the integration between the different components is working as expected.

NodeStream has some built-in tools to help you test your pipelines. See the [Project#get_snapshot_for](https://nodestream-proj.github.io/nodestream/python_reference/project/project/#nodestream.project.project.Project.get_snapshot_for) method for more details on running pipelines.

## Examples

### `pytest`

```python
# tests/test_pipelines.py

import dataclasses
import json

import pytest
from freezegun import freeze_time

from nodestream.project import Project

# Step 1: Define a custom JSON encoder that will convert dataclasses to dicts.
class EnhancedJSONEncoder(json.JSONEncoder):
def default(self, o):
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
return super().default(o)

# Step 2: Freeze time so that the snapshot is deterministic.
@pytest.mark.asyncio
@freeze_time("2020-01-01")
async def test_pipeline_snapshot(snapshot):
    # Step 3: Load your application project
    project = Project.read_from_file("nodestream.yaml")

    # Step 4: Run the pipeline and get the results gathered as a list.
    # Note: `get_snapshot_for` is a coroutine, so it must be awaited, and the
    # result must not be assigned to `snapshot` (that would shadow the fixture).
    results = await project.get_snapshot_for("pipeline")
    snapshot_str = json.dumps(results, cls=EnhancedJSONEncoder)

    # Step 5: Assert that the results match the snapshot
    # Note: This will fail the first time you run it.
    snapshot.snapshot_dir = "snapshots"
    snapshot.assert_match(snapshot_str, "test_name_snapshot.json")
```
4 changes: 4 additions & 0 deletions nodestream/pipeline/pipeline_file_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ class PipelineInitializationArguments:
def for_introspection(cls):
return cls(annotations=["introspection"])

@classmethod
def for_testing(cls):
    """Create initialization arguments for test pipeline runs.

    Mirrors `for_introspection`, but selects components annotated with
    the "test" annotation instead.
    """
    test_annotations = ["test"]
    return cls(annotations=test_annotations)

def initialize_from_file_data(self, file_data: List[dict]):
    """Build a `Pipeline` from parsed pipeline-file data.

    Loads each step definition via a fresh `ClassLoader` and wraps the
    resulting steps in a `Pipeline`.
    """
    steps = self.load_steps(ClassLoader(), file_data)
    return Pipeline(steps)

Expand Down
21 changes: 21 additions & 0 deletions nodestream/project/pipeline_progress_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,27 @@ class PipelineProgressReporter:
on_start_callback: Callable[[], None] = field(default=no_op)
on_finish_callback: Callable[[PipelineContext], None] = field(default=no_op)

@classmethod
def for_testing(cls, results_list: list) -> "PipelineProgressReporter":
    """Create a `PipelineProgressReporter` for testing.

    Intended for testing purposes only: the reporter fires on every record
    (reporting_frequency=1) and appends each reported record to the
    supplied list so tests can inspect pipeline output.

    Args:
        results_list: The list to append results to.

    Returns:
        PipelineProgressReporter: A `PipelineProgressReporter` for testing.
    """
    # Closure used instead of a lambda; the first argument (the record
    # index) is deliberately ignored.
    def collect_record(_, record):
        results_list.append(record)

    return cls(
        reporting_frequency=1,
        logger=getLogger("test"),
        callback=collect_record,
        on_start_callback=no_op,
        on_finish_callback=no_op,
    )

async def execute_with_reporting(self, pipeline: Pipeline):
"""Execute the given pipeline with progress reporting.

Expand Down
16 changes: 16 additions & 0 deletions nodestream/project/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,22 @@ async def run(self, request: RunRequest):
for scope in self.scopes_by_name.values():
await scope.run_request(request)

async def get_snapshot_for(self, pipeline_name: str) -> list:
    """Run the named pipeline and return its output as a snapshot.

    This method is intended for testing purposes only. The pipeline is run
    with the `test` annotation, so components that you want to run must be
    annotated with `test` or not annotated at all.

    Args:
        pipeline_name (str): The name of the pipeline to get a snapshot for.

    Returns:
        list: The records produced by the pipeline run.
    """
    # The testing progress reporter appends every record into this list.
    results: list = []
    await self.run(RunRequest.for_testing(pipeline_name, results))
    return results

def add_scope(self, scope: PipelineScope):
"""Adds a scope to the project.

Expand Down
21 changes: 21 additions & 0 deletions nodestream/project/run_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,27 @@ class RunRequest:
initialization_arguments: PipelineInitializationArguments
progress_reporter: PipelineProgressReporter

@classmethod
def for_testing(cls, pipeline_name: str, results_list: list) -> "RunRequest":
    """Create a `RunRequest` for testing.

    Intended for testing purposes only: pairs the named pipeline with the
    testing initialization arguments (the "test" annotation) and a progress
    reporter that collects every record into `results_list`.

    Args:
        pipeline_name: The name of the pipeline to run.
        results_list: The list to append results to.

    Returns:
        RunRequest: A `RunRequest` for testing.
    """
    init_args = PipelineInitializationArguments.for_testing()
    reporter = PipelineProgressReporter.for_testing(results_list)
    return cls(pipeline_name, init_args, reporter)

async def execute_with_definition(self, definition: PipelineDefinition):
"""Execute this run request with the given pipeline definition.

Expand Down
6 changes: 6 additions & 0 deletions tests/unit/pipeline/test_pipeline_file_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,9 @@ def test_basic_file_load_with_annotations():
assert_that(result.steps, has_length(2))
assert_that(result.steps[0], instance_of(PassStep))
assert_that(result.steps[1], instance_of(PassStep))


def test_init_args_for_testing():
    """`for_testing` should produce exactly one annotation: "test"."""
    init_args = PipelineInitializationArguments.for_testing()
    assert_that(init_args.annotations, has_length(1))
    # Fixed: `assert_that(x, "test")` treats the bare string as the failure
    # *reason*, not a matcher, so the original check was vacuous.
    assert init_args.annotations[0] == "test"
7 changes: 7 additions & 0 deletions tests/unit/project/test_pipeline_progress_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ async def test_pipeline_progress_reporter_calls_with_reporting_frequency(mocker)
reporter = PipelineProgressReporter(reporting_frequency=10, callback=mocker.Mock())
await reporter.execute_with_reporting(pipeline)
assert_that(reporter.callback.call_count, equal_to(10))


@pytest.mark.asyncio
async def test_pipeline_progress_reporter_for_testing():
    """`for_testing` reports on every record and logs to a "test" logger."""
    # Fixed: dropped the unused `mocker` fixture parameter.
    result = PipelineProgressReporter.for_testing([])
    assert_that(result.reporting_frequency, equal_to(1))
    assert_that(result.logger.name, equal_to("test"))
7 changes: 7 additions & 0 deletions tests/unit/project/test_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,10 @@ def test_get_schema_with_overrides(project, mocker):
project.generate_graph_schema.return_value.apply_type_overrides_from_file.assert_called_once_with(
"some/path"
)


@pytest.mark.asyncio
async def test_get_snapshot_for(project, mocker):
    """`get_snapshot_for` delegates to `run` once and returns the results list."""
    # Replace the real run so no pipeline actually executes.
    project.run = mocker.AsyncMock()
    result = await project.get_snapshot_for("pipeline")
    project.run.assert_awaited_once()
    # With `run` mocked, nothing is appended, so the returned list is empty.
    # This also verifies the method returns a list rather than a coroutine.
    assert result == []