diff --git a/docs/docs/guides/implementing-pipeline-testing.md b/docs/docs/guides/implementing-pipeline-testing.md
new file mode 100644
index 000000000..afb8797b2
--- /dev/null
+++ b/docs/docs/guides/implementing-pipeline-testing.md
@@ -0,0 +1,46 @@
+# Implementing Pipeline Testing
+
+## Overview
+Pipelines are the core of Nodestream.
+They are the building blocks that allow you to define your data processing logic.
+Since so many parts of the application depend on pipelines, it is important to test them thoroughly to ensure that the integration between the different components works as expected.
+
+Nodestream has some built-in tools to help you test your pipelines. See the [Project#get_snapshot_for](https://nodestream-proj.github.io/nodestream/python_reference/project/project/#nodestream.project.project.Project.get_snapshot_for) method for more details on running pipelines.
+
+## Examples
+
+### `pytest`
+
+```python
+# tests/test_pipelines.py
+
+import asyncio
+import dataclasses
+import json
+
+from freezegun import freeze_time
+
+from nodestream.project import Project
+
+# Step 1: Define a custom JSON encoder that converts dataclasses to dicts.
+class EnhancedJSONEncoder(json.JSONEncoder):
+    def default(self, o):
+        if dataclasses.is_dataclass(o):
+            return dataclasses.asdict(o)
+        return super().default(o)
+
+# Step 2: Freeze time so that the snapshot is deterministic.
+@freeze_time("2020-01-01")
+def test_pipeline_snapshot(snapshot):
+    # Step 3: Load your application project.
+    project = Project.read_from_file("nodestream.yaml")
+
+    # Step 4: Run the (async) pipeline and gather its output as a list of results.
+    results = asyncio.run(project.get_snapshot_for("pipeline"))
+    results_str = json.dumps(results, cls=EnhancedJSONEncoder)
+
+    # Step 5: Assert that the results match the stored snapshot.
+    # Note: This will fail the first time you run it, before a snapshot file exists.
+    snapshot.snapshot_dir = "snapshots"
+    snapshot.assert_match(results_str, "test_name_snapshot.json")
+```
\ No newline at end of file
diff --git a/nodestream/pipeline/pipeline_file_loader.py b/nodestream/pipeline/pipeline_file_loader.py
index 00f65a752..e1ffa59e1 100644
--- a/nodestream/pipeline/pipeline_file_loader.py
+++ b/nodestream/pipeline/pipeline_file_loader.py
@@ -53,6 +53,10 @@ class PipelineInitializationArguments:
     def for_introspection(cls):
         return cls(annotations=["introspection"])
 
+    @classmethod
+    def for_testing(cls):
+        return cls(annotations=["test"])
+
     def initialize_from_file_data(self, file_data: List[dict]):
         return Pipeline(self.load_steps(ClassLoader(), file_data))
 
diff --git a/nodestream/project/pipeline_progress_reporter.py b/nodestream/project/pipeline_progress_reporter.py
index 7e8ba6c19..c6a87933f 100644
--- a/nodestream/project/pipeline_progress_reporter.py
+++ b/nodestream/project/pipeline_progress_reporter.py
@@ -53,6 +53,27 @@ class PipelineProgressReporter:
     on_start_callback: Callable[[], None] = field(default=no_op)
     on_finish_callback: Callable[[PipelineContext], None] = field(default=no_op)
 
+    @classmethod
+    def for_testing(cls, results_list: list) -> "PipelineProgressReporter":
+        """Create a `PipelineProgressReporter` for testing.
+
+        This method is intended to be used for testing purposes only. It will create a
+        `PipelineProgressReporter` with the default values for testing.
+
+        Args:
+            results_list: The list to append results to.
+
+        Returns:
+            PipelineProgressReporter: A `PipelineProgressReporter` for testing.
+ """ + return cls( + reporting_frequency=1, + logger=getLogger("test"), + callback=lambda _, record: results_list.append(record), + on_start_callback=no_op, + on_finish_callback=no_op, + ) + async def execute_with_reporting(self, pipeline: Pipeline): """Execute the given pipeline with progress reporting. diff --git a/nodestream/project/project.py b/nodestream/project/project.py index eda0ee70b..5b4bb46fd 100644 --- a/nodestream/project/project.py +++ b/nodestream/project/project.py @@ -95,6 +95,22 @@ async def run(self, request: RunRequest): for scope in self.scopes_by_name.values(): await scope.run_request(request) + async def get_snapshot_for(self, pipeline_name: str) -> list: + """Returns the output of the pipeline with the given name to be used as a snapshot. + + This method is intended for testing purposes only. It will run the pipeline and return the results. + The pipeline is run with the `test` annotation, so components that you want to run must be annotated with `test` or not annotated at all. + + Args: + pipeline_name (str): The name of the pipeline to get a snapshot for. + + Returns: + str: The snapshot of the pipeline. + """ + run_request = RunRequest.for_testing(pipeline_name, results := []) + await self.run(run_request) + return results + def add_scope(self, scope: PipelineScope): """Adds a scope to the project. diff --git a/nodestream/project/run_request.py b/nodestream/project/run_request.py index 32211b3d1..fb5c00cb4 100644 --- a/nodestream/project/run_request.py +++ b/nodestream/project/run_request.py @@ -19,6 +19,27 @@ class RunRequest: initialization_arguments: PipelineInitializationArguments progress_reporter: PipelineProgressReporter + @classmethod + def for_testing(cls, pipeline_name: str, results_list: list) -> "RunRequest": + """Create a `RunRequest` for testing. + + This method is intended to be used for testing purposes only. It will create a + run request with the given pipeline name and `PipelineInitializationArguments` + for testing. + + Args: + pipeline_name: The name of the pipeline to run. + results_list: The list to append results to. + + Returns: + RunRequest: A `RunRequest` for testing. + """ + return cls( + pipeline_name, + PipelineInitializationArguments.for_testing(), + PipelineProgressReporter.for_testing(results_list), + ) + async def execute_with_definition(self, definition: PipelineDefinition): """Execute this run request with the given pipeline definition. 
diff --git a/tests/unit/pipeline/test_pipeline_file_loader.py b/tests/unit/pipeline/test_pipeline_file_loader.py
index 0c25e2b13..19f83b95c 100644
--- a/tests/unit/pipeline/test_pipeline_file_loader.py
+++ b/tests/unit/pipeline/test_pipeline_file_loader.py
@@ -24,3 +24,9 @@ def test_basic_file_load_with_annotations():
     assert_that(result.steps, has_length(2))
     assert_that(result.steps[0], instance_of(PassStep))
     assert_that(result.steps[1], instance_of(PassStep))
+
+
+def test_init_args_for_testing():
+    init_args = PipelineInitializationArguments.for_testing()
+    assert_that(init_args.annotations, has_length(1))
+    assert_that(init_args.annotations[0], equal_to("test"))
diff --git a/tests/unit/project/test_pipeline_progress_reporter.py b/tests/unit/project/test_pipeline_progress_reporter.py
index 16b9be09f..08734b3bd 100644
--- a/tests/unit/project/test_pipeline_progress_reporter.py
+++ b/tests/unit/project/test_pipeline_progress_reporter.py
@@ -11,3 +11,10 @@ async def test_pipeline_progress_reporter_calls_with_reporting_frequency(mocker)
     reporter = PipelineProgressReporter(reporting_frequency=10, callback=mocker.Mock())
     await reporter.execute_with_reporting(pipeline)
     assert_that(reporter.callback.call_count, equal_to(10))
+
+
+@pytest.mark.asyncio
+async def test_pipeline_progress_reporter_for_testing():
+    result = PipelineProgressReporter.for_testing([])
+    assert_that(result.reporting_frequency, equal_to(1))
+    assert_that(result.logger.name, equal_to("test"))
diff --git a/tests/unit/project/test_project.py b/tests/unit/project/test_project.py
index a73a910b6..6350bda3c 100644
--- a/tests/unit/project/test_project.py
+++ b/tests/unit/project/test_project.py
@@ -92,3 +92,10 @@ def test_get_schema_with_overrides(project, mocker):
     project.generate_graph_schema.return_value.apply_type_overrides_from_file.assert_called_once_with(
         "some/path"
     )
+
+
+@pytest.mark.asyncio
+async def test_get_snapshot_for(project, mocker):
+    project.run = mocker.AsyncMock()
+    await project.get_snapshot_for("pipeline")
+    project.run.assert_awaited_once()
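For orientation, the helpers added in this diff compose as follows: `Project.get_snapshot_for` builds a `RunRequest` via `RunRequest.for_testing`, which pairs `PipelineInitializationArguments.for_testing()` (the `test` annotation) with `PipelineProgressReporter.for_testing(results)` (a reporter that appends every record to the supplied list). The sketch below drives that lower-level path directly, which can be handy when you want to inspect or filter records before asserting on them. It is a minimal illustration, not part of the patch: it assumes a project file named `nodestream.yaml`, a pipeline named `pipeline`, and that `Project` and `RunRequest` are importable from `nodestream.project`.

```python
# Minimal sketch: collect a pipeline's records without the snapshot helper.
# Assumes nodestream.yaml exists, a pipeline named "pipeline" is defined, and
# that Project and RunRequest are exposed by the nodestream.project package.
import asyncio

from nodestream.project import Project, RunRequest


def test_pipeline_emits_records():
    project = Project.read_from_file("nodestream.yaml")

    # Every record the pipeline emits is appended to this list by the testing
    # progress reporter that RunRequest.for_testing wires up.
    results = []
    run_request = RunRequest.for_testing("pipeline", results)

    # Project.run is a coroutine, so drive it with an event loop.
    asyncio.run(project.run(run_request))

    # Assert on the gathered records however the test requires.
    assert results, "expected the pipeline to emit at least one record"
```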