From 9127a52e031da6db320a9b6448d17a09c1307285 Mon Sep 17 00:00:00 2001 From: danielgafni Date: Wed, 25 Dec 2024 16:40:08 +0100 Subject: [PATCH] [pipes] refactor Pipes tests interface --- .../src/dagster_pipes_tests/suite.py | 113 +++++++----------- 1 file changed, 42 insertions(+), 71 deletions(-) diff --git a/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py b/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py index acb7113..3bd420c 100644 --- a/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py +++ b/libraries/pipes/tests/dagster-pipes-tests/src/dagster_pipes_tests/suite.py @@ -57,6 +57,10 @@ METADATA = json.loads(METADATA_PATH.read_text()) + +def _get_current_test_name(request): + return request.node.name.split("[")[0] + def _resolve_metadata_value( value: Any, metadata_type: PipesMetadataType ) -> MetadataValue: @@ -107,15 +111,21 @@ class PipesTestSuite: # to run all the tests BASE_ARGS = ["change", "me"] - @parametrize("metadata", METADATA_CASES) - def test_context_reconstruction( + def test_components( self, + request, metadata: Dict[str, Any], + context_injector: PipesContextInjector, + message_reader: PipesMessageReader, tmpdir_factory, capsys, ): - """This test doesn't test anything in Dagster. Instead, it provides parameters to the external process, which should check if they are loaded correctly.""" + """This test checks if the external process can access the context and the message writer correctly. + It sets an additional `--job-name` argument which can be used to check if the context was loaded correctly. + + It sets an additional `--extras` argument which points to a json file with Pipes extras which should be loaded by the context loader. The test can use this path to check if the extras were loaded correctly. + """ work_dir = tmpdir_factory.mktemp("work_dir") extras_path = work_dir / "extras.json" @@ -123,48 +133,18 @@ def test_context_reconstruction( with open(str(extras_path), "w") as f: json.dump(metadata, f) - @asset - def my_asset( - context: AssetExecutionContext, - pipes_subprocess_client: PipesSubprocessClient, - ) -> MaterializeResult: - job_name = context.run.job_name - - args = self.BASE_ARGS + [ - "--env", - f"--extras={str(extras_path)}", - f"--job-name={job_name}", - ] - - return pipes_subprocess_client.run( - context=context, - command=args, - extras=metadata, - ).get_materialize_result() - - result = materialize( - [my_asset], - resources={"pipes_subprocess_client": PipesSubprocessClient()}, - raise_on_error=False, - ) - - assert result.success - - def test_components( - self, - context_injector: PipesContextInjector, - message_reader: PipesMessageReader, - tmpdir_factory, - capsys, - ): @asset def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: args = self.BASE_ARGS + [ - "--env", - "--full", + "--extras", + str(extras_path), + "--job-name", + context.run.job_name, + "--test-name", + _get_current_test_name(request), ] if isinstance(context_injector, PipesS3ContextInjector): @@ -203,13 +183,13 @@ def my_asset( ) def test_extras( self, + request, context_injector: PipesContextInjector, metadata: Dict[str, Any], tmpdir_factory, capsys, ): """This test doesn't test anything in Dagster. Instead, it provides extras to the external process, which should check if they are loaded correctly.""" - work_dir = tmpdir_factory.mktemp("work_dir") metadata_path = work_dir / "metadata.json" @@ -222,13 +202,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--extras={metadata_path}", - f"--job-name={job_name}", + "--extras", + metadata_path, + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -262,11 +240,11 @@ def my_asset( def test_error_reporting( self, + request, tmpdir_factory, capsys, ): """This test checks if the external process sends an exception message correctly.""" - if not PIPES_CONFIG.general.error_reporting: pytest.skip("general.error_reporting is not enabled in pipes.toml") @@ -282,8 +260,8 @@ def my_asset( pipes_subprocess_client: PipesSubprocessClient, ): args = self.BASE_ARGS + [ - "--full", - "--throw-error", + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -328,6 +306,7 @@ def my_asset( def test_message_log( self, + request, tmpdir_factory, capsys, ): @@ -346,8 +325,8 @@ def my_asset( pipes_subprocess_client: PipesSubprocessClient, ): args = self.BASE_ARGS + [ - "--full", - "--logging", + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -384,7 +363,7 @@ def my_asset( if f"{level.lower().capitalize()} message" in line: assert level in line logged_levels.add(level) - + assert logged_levels == expected_levels assert ( "[pipes] did not receive any messages from external process" @@ -394,6 +373,7 @@ def my_asset( @parametrize("custom_message_payload", CUSTOM_MESSAGE_CASES) def test_message_report_custom_message( self, + request, custom_message_payload: Any, tmpdir_factory, capsys, @@ -415,14 +395,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--job-name={job_name}", - "--custom-payload-path", + "--custom-payload", str(custom_payload_path), + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -455,6 +432,7 @@ def my_asset( @parametrize("asset_key", [None, ["my_asset"]]) def test_message_report_asset_materialization( self, + request, data_version: Optional[str], asset_key: Optional[List[str]], tmpdir_factory, @@ -491,14 +469,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ) -> MaterializeResult: - job_name = context.run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--job-name={job_name}", "--report-asset-materialization", str(asset_materialization_path), + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run( @@ -547,6 +522,7 @@ def my_asset( @parametrize("asset_key", [None, ["my_asset"]]) def test_message_report_asset_check( self, + request, passed: bool, asset_key: Optional[List[str]], severity: PipesAssetCheckSeverity, @@ -556,9 +532,7 @@ def test_message_report_asset_check( """This test checks if the external process sends asset checks correctly.""" if not PIPES_CONFIG.messages.report_asset_check: - pytest.skip( - "messages.report_asset_check is not enabled in pipes.toml" - ) + pytest.skip("messages.report_asset_check is not enabled in pipes.toml") work_dir = tmpdir_factory.mktemp("work_dir") @@ -588,14 +562,11 @@ def my_asset( context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient, ): - job_name = context.run.job_name - args = self.BASE_ARGS + [ - "--full", - "--env", - f"--job-name={job_name}", "--report-asset-check", str(report_asset_check_path), + "--test-name", + _get_current_test_name(request), ] invocation_result = pipes_subprocess_client.run(