From eb32d56bb987db111f9cd6dc629c875a173de023 Mon Sep 17 00:00:00 2001 From: Sashank Thupukari Date: Tue, 2 Jun 2020 05:04:07 -0400 Subject: [PATCH] (dagit-out-of-process-4) Implement get_external_pipeline for OutOfProcessRepositoryLocation Summary: Here we implement the `get_external_pipeline` method on the `OutOfProcessRepositoryLocation`. This lets us enable the test matrix on several sets of tests that query for `pipelineOrError` or `runConfigSchemaOrError`. This also adds a new method to the `DagsterGraphQLContext`: `validate_external_pipeline`. Given a selector, this method returns whether a give pipeline and a subset are valid. This is used in the `get_external_pipeline` method for proper error handling over the host/user process boundary. Test Plan: unit Reviewers: alangenfeld, schrockn Reviewed By: alangenfeld, schrockn Differential Revision: https://dagster.phacility.com/D3223 --- .../dagster_graphql/implementation/context.py | 30 +++- .../implementation/external.py | 20 +-- .../dagster_graphql/schema/pipelines.py | 4 +- .../snapshots/snap_test_environment_schema.py | 130 +++++++++++++- .../graphql/snapshots/snap_test_presets.py | 125 +++++++++++++- .../graphql/test_environment_schema.py | 162 +++++++++--------- .../graphql/test_mode_definitions.py | 79 ++++----- .../graphql/test_presets.py | 38 ++-- .../graphql/test_subset.py | 58 ++++--- .../dagster/api/snapshot_repository.py | 8 +- python_modules/dagster/dagster/cli/api.py | 34 ++-- .../repository_location.py | 35 ++-- 12 files changed, 504 insertions(+), 219 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/context.py b/python_modules/dagster-graphql/dagster_graphql/implementation/context.py index a699dc70210ce..47f04a0a980b0 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/context.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/context.py @@ -1,9 +1,14 @@ +from dagster_graphql.implementation.utils import UserFacingGraphQLError +from dagster_graphql.schema.errors import DauphinInvalidSubsetError +from dagster_graphql.schema.pipelines import DauphinPipeline + from dagster import check from dagster.core.host_representation import ( InProcessRepositoryLocation, PipelineSelector, RepositoryLocation, ) +from dagster.core.host_representation.external import ExternalPipeline from dagster.core.instance import DagsterInstance @@ -35,11 +40,32 @@ def get_repository_location(self, name): def has_repository_location(self, name): return name in self._repository_locations - def get_external_pipeline(self, selector): + def get_subset_external_pipeline(self, selector): check.inst_param(selector, 'selector', PipelineSelector) # We have to grab the pipeline from the location instead of the repository directly # since we may have to request a subset we don't have in memory yet - return self._repository_locations[selector.location_name].get_external_pipeline(selector) + + repository_location = self._repository_locations[selector.location_name] + external_repository = repository_location.get_repository(selector.repository_name) + + subset_result = repository_location.get_subset_external_pipeline_result(selector) + if not subset_result.success: + error_info = subset_result.error + raise UserFacingGraphQLError( + DauphinInvalidSubsetError( + message="{message}{cause_message}".format( + message=error_info.message, + cause_message="\n{}".format(error_info.cause.message) + if error_info.cause + else "", + ), + pipeline=DauphinPipeline(self.get_full_external_pipeline(selector)), + ) + ) + + return ExternalPipeline( + subset_result.external_pipeline_data, repository_handle=external_repository.handle, + ) def has_external_pipeline(self, selector): check.inst_param(selector, 'selector', PipelineSelector) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/external.py b/python_modules/dagster-graphql/dagster_graphql/implementation/external.py index a295577289434..431c2b28d4a3c 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/external.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/external.py @@ -1,16 +1,12 @@ -import sys - from graphql.execution.base import ResolveInfo from dagster import check from dagster.config.validate import validate_config_from_snap -from dagster.core.errors import DagsterInvalidSubsetError from dagster.core.host_representation import ( ExternalExecutionPlan, ExternalPipeline, PipelineSelector, ) -from dagster.utils.error import serializable_error_info_from_exc_info from .utils import UserFacingGraphQLError, capture_dauphin_error, legacy_pipeline_selector @@ -32,7 +28,7 @@ def legacy_get_external_pipeline_or_raise(graphene_info, pipeline_name, solid_su check.str_param(pipeline_name, 'pipeline_name') check.opt_list_param(solid_subset, 'solid_subset', of_type=str) - return graphene_info.context.get_external_pipeline( + return graphene_info.context.get_subset_external_pipeline( legacy_pipeline_selector(graphene_info.context, pipeline_name, solid_subset) ) @@ -59,19 +55,7 @@ def get_external_pipeline_or_raise(graphene_info, selector): ) ) - try: - return graphene_info.context.get_external_pipeline(selector) - except DagsterInvalidSubsetError: - error_info = serializable_error_info_from_exc_info(sys.exc_info()) - raise UserFacingGraphQLError( - DauphinInvalidSubsetError( - message="{message}\n{cause_message}".format( - message=error_info.message, - cause_message=error_info.cause.message if error_info.cause else "", - ), - pipeline=graphene_info.schema.type_named('Pipeline')(full_pipeline), - ) - ) + return graphene_info.context.get_subset_external_pipeline(selector) def ensure_valid_config(external_pipeline, mode, environment_dict): diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines.py b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines.py index fd6429c228692..81a88964e2bb7 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines.py @@ -351,7 +351,9 @@ def resolve_solidSubset(self, _graphene_info): return self._active_preset_data.solid_subset def resolve_runConfigYaml(self, _graphene_info): - yaml_str = yaml.dump(self._active_preset_data.environment_dict, default_flow_style=False) + yaml_str = yaml.safe_dump( + self._active_preset_data.environment_dict, default_flow_style=False + ) return yaml_str if yaml_str else '' def resolve_mode(self, _graphene_info): diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_environment_schema.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_environment_schema.py index c10194518a553..1595ea7654491 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_environment_schema.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_environment_schema.py @@ -6,7 +6,7 @@ snapshots = Snapshot() -snapshots['test_basic_valid_config_on_run_config_schema 1'] = { +snapshots['TestEnvironmentSchema.test_basic_valid_config_on_run_config_schema[readonly_in_memory_instance_in_process_env] 1'] = { 'runConfigSchemaOrError': { 'isRunConfigValid': { '__typename': 'PipelineConfigValidationValid', @@ -15,7 +15,133 @@ } } -snapshots['test_basic_invalid_config_on_run_config_schema 1'] = { +snapshots['TestEnvironmentSchema.test_basic_valid_config_on_run_config_schema[readonly_in_memory_instance_out_of_process_env] 1'] = { + 'runConfigSchemaOrError': { + 'isRunConfigValid': { + '__typename': 'PipelineConfigValidationValid', + 'pipelineName': 'csv_hello_world' + } + } +} + +snapshots['TestEnvironmentSchema.test_basic_valid_config_on_run_config_schema[readonly_sqlite_instance_in_process_env] 1'] = { + 'runConfigSchemaOrError': { + 'isRunConfigValid': { + '__typename': 'PipelineConfigValidationValid', + 'pipelineName': 'csv_hello_world' + } + } +} + +snapshots['TestEnvironmentSchema.test_basic_valid_config_on_run_config_schema[readonly_sqlite_instance_out_of_process_env] 1'] = { + 'runConfigSchemaOrError': { + 'isRunConfigValid': { + '__typename': 'PipelineConfigValidationValid', + 'pipelineName': 'csv_hello_world' + } + } +} + +snapshots['TestEnvironmentSchema.test_basic_invalid_config_on_run_config_schema[readonly_in_memory_instance_in_process_env] 1'] = { + 'runConfigSchemaOrError': { + 'isRunConfigValid': { + '__typename': 'PipelineConfigValidationInvalid', + 'errors': [ + { + '__typename': 'FieldNotDefinedConfigError', + 'fieldName': 'nope', + 'message': 'Undefined field "nope" at the root. Expected: "{ execution?: { in_process?: { config?: { marker_to_close?: String retries?: { disabled?: { } enabled?: { } } } } multiprocess?: { config?: { max_concurrent?: Int retries?: { disabled?: { } enabled?: { } } } } } loggers?: { console?: { config?: { log_level?: String name?: String } } } resources?: { } solids: { sum_solid: { inputs: { num: String } outputs?: [{ result?: String }] } sum_sq_solid?: { outputs?: [{ result?: String }] } } storage?: { filesystem?: { config?: { base_dir?: String } } in_memory?: { } } }".', + 'reason': 'FIELD_NOT_DEFINED', + 'stack': { + 'entries': [ + ] + } + }, + { + '__typename': 'MissingFieldConfigError', + 'field': { + 'name': 'solids' + }, + 'message': 'Missing required field "solids" at the root. Available Fields: "[\'execution\', \'loggers\', \'resources\', \'solids\', \'storage\']".', + 'reason': 'MISSING_REQUIRED_FIELD', + 'stack': { + 'entries': [ + ] + } + } + ], + 'pipelineName': 'csv_hello_world' + } + } +} + +snapshots['TestEnvironmentSchema.test_basic_invalid_config_on_run_config_schema[readonly_in_memory_instance_out_of_process_env] 1'] = { + 'runConfigSchemaOrError': { + 'isRunConfigValid': { + '__typename': 'PipelineConfigValidationInvalid', + 'errors': [ + { + '__typename': 'FieldNotDefinedConfigError', + 'fieldName': 'nope', + 'message': 'Undefined field "nope" at the root. Expected: "{ execution?: { in_process?: { config?: { marker_to_close?: String retries?: { disabled?: { } enabled?: { } } } } multiprocess?: { config?: { max_concurrent?: Int retries?: { disabled?: { } enabled?: { } } } } } loggers?: { console?: { config?: { log_level?: String name?: String } } } resources?: { } solids: { sum_solid: { inputs: { num: String } outputs?: [{ result?: String }] } sum_sq_solid?: { outputs?: [{ result?: String }] } } storage?: { filesystem?: { config?: { base_dir?: String } } in_memory?: { } } }".', + 'reason': 'FIELD_NOT_DEFINED', + 'stack': { + 'entries': [ + ] + } + }, + { + '__typename': 'MissingFieldConfigError', + 'field': { + 'name': 'solids' + }, + 'message': 'Missing required field "solids" at the root. Available Fields: "[\'execution\', \'loggers\', \'resources\', \'solids\', \'storage\']".', + 'reason': 'MISSING_REQUIRED_FIELD', + 'stack': { + 'entries': [ + ] + } + } + ], + 'pipelineName': 'csv_hello_world' + } + } +} + +snapshots['TestEnvironmentSchema.test_basic_invalid_config_on_run_config_schema[readonly_sqlite_instance_in_process_env] 1'] = { + 'runConfigSchemaOrError': { + 'isRunConfigValid': { + '__typename': 'PipelineConfigValidationInvalid', + 'errors': [ + { + '__typename': 'FieldNotDefinedConfigError', + 'fieldName': 'nope', + 'message': 'Undefined field "nope" at the root. Expected: "{ execution?: { in_process?: { config?: { marker_to_close?: String retries?: { disabled?: { } enabled?: { } } } } multiprocess?: { config?: { max_concurrent?: Int retries?: { disabled?: { } enabled?: { } } } } } loggers?: { console?: { config?: { log_level?: String name?: String } } } resources?: { } solids: { sum_solid: { inputs: { num: String } outputs?: [{ result?: String }] } sum_sq_solid?: { outputs?: [{ result?: String }] } } storage?: { filesystem?: { config?: { base_dir?: String } } in_memory?: { } } }".', + 'reason': 'FIELD_NOT_DEFINED', + 'stack': { + 'entries': [ + ] + } + }, + { + '__typename': 'MissingFieldConfigError', + 'field': { + 'name': 'solids' + }, + 'message': 'Missing required field "solids" at the root. Available Fields: "[\'execution\', \'loggers\', \'resources\', \'solids\', \'storage\']".', + 'reason': 'MISSING_REQUIRED_FIELD', + 'stack': { + 'entries': [ + ] + } + } + ], + 'pipelineName': 'csv_hello_world' + } + } +} + +snapshots['TestEnvironmentSchema.test_basic_invalid_config_on_run_config_schema[readonly_sqlite_instance_out_of_process_env] 1'] = { 'runConfigSchemaOrError': { 'isRunConfigValid': { '__typename': 'PipelineConfigValidationInvalid', diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_presets.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_presets.py index 2865f4059bcb2..320085e337fca 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_presets.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/snapshots/snap_test_presets.py @@ -6,7 +6,130 @@ snapshots = Snapshot() -snapshots['test_basic_preset_query_with_presets 1'] = { +snapshots['TestPresets.test_basic_preset_query_with_presets[readonly_in_memory_instance_in_process_env] 1'] = { + 'pipelineOrError': { + 'name': 'csv_hello_world', + 'presets': [ + { + '__typename': 'PipelinePreset', + 'mode': 'default', + 'name': 'prod', + 'runConfigYaml': '''solids: + sum_solid: + inputs: + num: data/num_prod.csv +''', + 'solidSubset': None + }, + { + '__typename': 'PipelinePreset', + 'mode': 'default', + 'name': 'test', + 'runConfigYaml': '''solids: + sum_solid: + inputs: + num: data/num.csv +''', + 'solidSubset': None + }, + { + '__typename': 'PipelinePreset', + 'mode': 'default', + 'name': 'test_inline', + 'runConfigYaml': '''solids: + sum_solid: + inputs: + num: /data/num.csv +''', + 'solidSubset': None + } + ] + } +} + +snapshots['TestPresets.test_basic_preset_query_with_presets[readonly_sqlite_instance_in_process_env] 1'] = { + 'pipelineOrError': { + 'name': 'csv_hello_world', + 'presets': [ + { + '__typename': 'PipelinePreset', + 'mode': 'default', + 'name': 'prod', + 'runConfigYaml': '''solids: + sum_solid: + inputs: + num: data/num_prod.csv +''', + 'solidSubset': None + }, + { + '__typename': 'PipelinePreset', + 'mode': 'default', + 'name': 'test', + 'runConfigYaml': '''solids: + sum_solid: + inputs: + num: data/num.csv +''', + 'solidSubset': None + }, + { + '__typename': 'PipelinePreset', + 'mode': 'default', + 'name': 'test_inline', + 'runConfigYaml': '''solids: + sum_solid: + inputs: + num: /data/num.csv +''', + 'solidSubset': None + } + ] + } +} + +snapshots['TestPresets.test_basic_preset_query_with_presets[readonly_in_memory_instance_out_of_process_env] 1'] = { + 'pipelineOrError': { + 'name': 'csv_hello_world', + 'presets': [ + { + '__typename': 'PipelinePreset', + 'mode': 'default', + 'name': 'prod', + 'runConfigYaml': '''solids: + sum_solid: + inputs: + num: data/num_prod.csv +''', + 'solidSubset': None + }, + { + '__typename': 'PipelinePreset', + 'mode': 'default', + 'name': 'test', + 'runConfigYaml': '''solids: + sum_solid: + inputs: + num: data/num.csv +''', + 'solidSubset': None + }, + { + '__typename': 'PipelinePreset', + 'mode': 'default', + 'name': 'test_inline', + 'runConfigYaml': '''solids: + sum_solid: + inputs: + num: /data/num.csv +''', + 'solidSubset': None + } + ] + } +} + +snapshots['TestPresets.test_basic_preset_query_with_presets[readonly_sqlite_instance_out_of_process_env] 1'] = { 'pipelineOrError': { 'name': 'csv_hello_world', 'presets': [ diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_environment_schema.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_environment_schema.py index 80b5635342236..c639132aea891 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_environment_schema.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_environment_schema.py @@ -1,5 +1,9 @@ +import sys + +import pytest from dagster_graphql.test.utils import execute_dagster_graphql, get_legacy_pipeline_selector +from .graphql_context_test_suite import ReadonlyGraphQLContextTestMatrix from .setup import csv_hello_world_solids_config RUN_CONFIG_SCHEMA_QUERY = ''' @@ -19,49 +23,6 @@ } ''' - -def test_successful_run_config_schema(graphql_context): - selector = get_legacy_pipeline_selector(graphql_context, 'multi_mode_with_resources') - result = execute_dagster_graphql( - graphql_context, - RUN_CONFIG_SCHEMA_QUERY, - variables={'selector': selector, 'mode': 'add_mode',}, - ) - assert result.data['runConfigSchemaOrError']['__typename'] == 'RunConfigSchema' - - -def test_run_config_schema_pipeline_not_found(graphql_context): - selector = get_legacy_pipeline_selector(graphql_context, 'jkdjfkdjfd') - result = execute_dagster_graphql( - graphql_context, - RUN_CONFIG_SCHEMA_QUERY, - variables={'selector': selector, 'mode': 'add_mode'}, - ) - assert result.data['runConfigSchemaOrError']['__typename'] == 'PipelineNotFoundError' - - -def test_run_config_schema_solid_not_found(graphql_context): - selector = get_legacy_pipeline_selector( - graphql_context, 'multi_mode_with_resources', ['kdjfkdj'] - ) - result = execute_dagster_graphql( - graphql_context, - RUN_CONFIG_SCHEMA_QUERY, - variables={'selector': selector, 'mode': 'add_mode',}, - ) - assert result.data['runConfigSchemaOrError']['__typename'] == 'InvalidSubsetError' - - -def test_run_config_schema_mode_not_found(graphql_context): - selector = get_legacy_pipeline_selector(graphql_context, 'multi_mode_with_resources') - result = execute_dagster_graphql( - graphql_context, - RUN_CONFIG_SCHEMA_QUERY, - variables={'selector': selector, 'mode': 'kdjfdk'}, - ) - assert result.data['runConfigSchemaOrError']['__typename'] == 'ModeNotFoundError' - - RUN_CONFIG_SCHEMA_CONFIG_TYPE_QUERY = ''' query($selector: PipelineSelector! $mode: String! $configTypeName: String!) { @@ -146,39 +107,82 @@ def test_run_config_schema_mode_not_found(graphql_context): ''' -def test_basic_valid_config_on_run_config_schema(graphql_context, snapshot): - selector = get_legacy_pipeline_selector(graphql_context, 'csv_hello_world') - result = execute_dagster_graphql( - graphql_context, - RUN_CONFIG_SCHEMA_CONFIG_VALIDATION_QUERY, - variables={ - 'selector': selector, - 'mode': 'default', - 'runConfigData': csv_hello_world_solids_config(), - }, - ) - - assert not result.errors - assert result.data - assert ( - result.data['runConfigSchemaOrError']['isRunConfigValid']['__typename'] - == 'PipelineConfigValidationValid' - ) - snapshot.assert_match(result.data) - - -def test_basic_invalid_config_on_run_config_schema(graphql_context, snapshot): - selector = get_legacy_pipeline_selector(graphql_context, 'csv_hello_world') - result = execute_dagster_graphql( - graphql_context, - RUN_CONFIG_SCHEMA_CONFIG_VALIDATION_QUERY, - variables={'selector': selector, 'mode': 'default', 'runConfigData': {'nope': 'kdjfd'},}, - ) - - assert not result.errors - assert result.data - assert ( - result.data['runConfigSchemaOrError']['isRunConfigValid']['__typename'] - == 'PipelineConfigValidationInvalid' - ) - snapshot.assert_match(result.data) +class TestEnvironmentSchema(ReadonlyGraphQLContextTestMatrix): + def test_successful_run_config_schema(self, graphql_context): + selector = get_legacy_pipeline_selector(graphql_context, 'multi_mode_with_resources') + result = execute_dagster_graphql( + graphql_context, + RUN_CONFIG_SCHEMA_QUERY, + variables={'selector': selector, 'mode': 'add_mode',}, + ) + assert result.data['runConfigSchemaOrError']['__typename'] == 'RunConfigSchema' + + def test_run_config_schema_pipeline_not_found(self, graphql_context): + selector = get_legacy_pipeline_selector(graphql_context, 'jkdjfkdjfd') + result = execute_dagster_graphql( + graphql_context, + RUN_CONFIG_SCHEMA_QUERY, + variables={'selector': selector, 'mode': 'add_mode'}, + ) + assert result.data['runConfigSchemaOrError']['__typename'] == 'PipelineNotFoundError' + + def test_run_config_schema_solid_not_found(self, graphql_context): + selector = get_legacy_pipeline_selector( + graphql_context, 'multi_mode_with_resources', ['kdjfkdj'] + ) + result = execute_dagster_graphql( + graphql_context, + RUN_CONFIG_SCHEMA_QUERY, + variables={'selector': selector, 'mode': 'add_mode',}, + ) + assert result.data['runConfigSchemaOrError']['__typename'] == 'InvalidSubsetError' + + def test_run_config_schema_mode_not_found(self, graphql_context): + selector = get_legacy_pipeline_selector(graphql_context, 'multi_mode_with_resources') + result = execute_dagster_graphql( + graphql_context, + RUN_CONFIG_SCHEMA_QUERY, + variables={'selector': selector, 'mode': 'kdjfdk'}, + ) + assert result.data['runConfigSchemaOrError']['__typename'] == 'ModeNotFoundError' + + def test_basic_valid_config_on_run_config_schema(self, graphql_context, snapshot): + selector = get_legacy_pipeline_selector(graphql_context, 'csv_hello_world') + result = execute_dagster_graphql( + graphql_context, + RUN_CONFIG_SCHEMA_CONFIG_VALIDATION_QUERY, + variables={ + 'selector': selector, + 'mode': 'default', + 'runConfigData': csv_hello_world_solids_config(), + }, + ) + + assert not result.errors + assert result.data + assert ( + result.data['runConfigSchemaOrError']['isRunConfigValid']['__typename'] + == 'PipelineConfigValidationValid' + ) + snapshot.assert_match(result.data) + + @pytest.mark.skipif(sys.version_info < (3,), reason="Snapshot not working correctly on py2") + def test_basic_invalid_config_on_run_config_schema(self, graphql_context, snapshot): + selector = get_legacy_pipeline_selector(graphql_context, 'csv_hello_world') + result = execute_dagster_graphql( + graphql_context, + RUN_CONFIG_SCHEMA_CONFIG_VALIDATION_QUERY, + variables={ + 'selector': selector, + 'mode': 'default', + 'runConfigData': {'nope': 'kdjfd'}, + }, + ) + + assert not result.errors + assert result.data + assert ( + result.data['runConfigSchemaOrError']['isRunConfigValid']['__typename'] + == 'PipelineConfigValidationInvalid' + ) + snapshot.assert_match(result.data) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_mode_definitions.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_mode_definitions.py index 975e22791992b..69abaa701341c 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_mode_definitions.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_mode_definitions.py @@ -2,8 +2,33 @@ import pytest from dagster_graphql.test.utils import execute_dagster_graphql, get_legacy_pipeline_selector +from .graphql_context_test_suite import ReadonlyGraphQLContextTestMatrix from .utils import sync_execute_get_events +MODE_QUERY = ''' +query ModesQuery($selector: PipelineSelector!, $mode: String!) +{ + runConfigSchemaOrError(selector: $selector, mode: $mode ) { + __typename + ... on RunConfigSchema { + rootConfigType { + key + ... on CompositeConfigType { + fields { + configType { + key + } + } + } + } + allConfigTypes { + key + } + } + } +} +''' + def get_step_output(logs, step_key): for log in logs: @@ -11,6 +36,13 @@ def get_step_output(logs, step_key): return log +def execute_modes_query(context, pipeline_name, mode): + selector = get_legacy_pipeline_selector(context, pipeline_name) + return execute_dagster_graphql( + context, MODE_QUERY, variables={'selector': selector, 'mode': mode,}, + ) + + def test_multi_mode_successful(graphql_context): selector = get_legacy_pipeline_selector(graphql_context, 'multi_mode_with_resources') add_mode_logs = sync_execute_get_events( @@ -50,43 +82,12 @@ def test_multi_mode_successful(graphql_context): get_step_output(double_adder_mode_logs, 'apply_to_three.compute') -MODE_QUERY = ''' -query ModesQuery($selector: PipelineSelector!, $mode: String!) -{ - runConfigSchemaOrError(selector: $selector, mode: $mode ) { - __typename - ... on RunConfigSchema { - rootConfigType { - key - ... on CompositeConfigType { - fields { - configType { - key - } - } - } - } - allConfigTypes { - key - } - } - } -} -''' - - -def execute_modes_query(context, pipeline_name, mode): - selector = get_legacy_pipeline_selector(context, pipeline_name) - return execute_dagster_graphql( - context, MODE_QUERY, variables={'selector': selector, 'mode': mode,}, - ) - +class TestModeDefinitions(ReadonlyGraphQLContextTestMatrix): + def test_query_multi_mode(self, graphql_context): + with pytest.raises(graphql.error.base.GraphQLError): + execute_modes_query(graphql_context, 'multi_mode_with_resources', mode=None) -def test_query_multi_mode(graphql_context): - with pytest.raises(graphql.error.base.GraphQLError): - execute_modes_query(graphql_context, 'multi_mode_with_resources', mode=None) - - modeful_result = execute_modes_query( - graphql_context, 'multi_mode_with_resources', mode='add_mode' - ) - assert modeful_result.data['runConfigSchemaOrError']['__typename'] == 'RunConfigSchema' + modeful_result = execute_modes_query( + graphql_context, 'multi_mode_with_resources', mode='add_mode' + ) + assert modeful_result.data['runConfigSchemaOrError']['__typename'] == 'RunConfigSchema' diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_presets.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_presets.py index 1c8690af33ddc..9a3b0554e7198 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_presets.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_presets.py @@ -3,27 +3,27 @@ from dagster_graphql.test.preset_query import execute_preset_query +from .graphql_context_test_suite import ReadonlyGraphQLContextTestMatrix -def test_basic_preset_query_no_presets(graphql_context): - result = execute_preset_query('csv_hello_world_two', graphql_context) - assert result.data == OrderedDict( - [('pipelineOrError', OrderedDict([('name', 'csv_hello_world_two'), ('presets', [])]))] - ) +class TestPresets(ReadonlyGraphQLContextTestMatrix): + def test_basic_preset_query_no_presets(self, graphql_context): + result = execute_preset_query('csv_hello_world_two', graphql_context) + assert result.data == OrderedDict( + [('pipelineOrError', OrderedDict([('name', 'csv_hello_world_two'), ('presets', [])]))] + ) -def test_basic_preset_query_with_presets(graphql_context, snapshot): - result = execute_preset_query('csv_hello_world', graphql_context) + def test_basic_preset_query_with_presets(self, graphql_context, snapshot): + result = execute_preset_query('csv_hello_world', graphql_context) - assert [preset_data['name'] for preset_data in result.data['pipelineOrError']['presets']] == [ - 'prod', - 'test', - 'test_inline', - ] + assert [ + preset_data['name'] for preset_data in result.data['pipelineOrError']['presets'] + ] == ['prod', 'test', 'test_inline',] - # Remove local filepath from snapshot - result.data['pipelineOrError']['presets'][2]['runConfigYaml'] = re.sub( - r'num: .*/data/num.csv', - 'num: /data/num.csv', - result.data['pipelineOrError']['presets'][2]['runConfigYaml'], - ) - snapshot.assert_match(result.data) + # Remove local filepath from snapshot + result.data['pipelineOrError']['presets'][2]['runConfigYaml'] = re.sub( + r'num: .*/data/num.csv', + 'num: /data/num.csv', + result.data['pipelineOrError']['presets'][2]['runConfigYaml'], + ) + snapshot.assert_match(result.data) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_subset.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_subset.py index cc3bc65b335c0..8efc910fe8dba 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_subset.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_subset.py @@ -4,6 +4,8 @@ import pytest from dagster_graphql.test.utils import execute_dagster_graphql, get_legacy_pipeline_selector +from .graphql_context_test_suite import ReadonlyGraphQLContextTestMatrix + SCHEMA_OR_ERROR_SUBSET_QUERY = ''' query EnvironmentQuery($selector: PipelineSelector!){ runConfigSchemaOrError(selector: $selector) { @@ -47,34 +49,34 @@ def types_dict_of_result(subset_result, top_key): } -def test_csv_hello_world_pipeline_or_error_subset_wrong_solid_name(graphql_context): - selector = get_legacy_pipeline_selector(graphql_context, 'csv_hello_world', ['nope']) - result = execute_dagster_graphql( - graphql_context, SCHEMA_OR_ERROR_SUBSET_QUERY, {'selector': selector} - ) - - assert not result.errors - assert result.data - assert result.data['runConfigSchemaOrError']['__typename'] == 'InvalidSubsetError' - assert '"nope" does not exist' in result.data['runConfigSchemaOrError']['message'] +class TestSolidSubsets(ReadonlyGraphQLContextTestMatrix): + def test_csv_hello_world_pipeline_or_error_subset_wrong_solid_name(self, graphql_context): + selector = get_legacy_pipeline_selector(graphql_context, 'csv_hello_world', ['nope']) + result = execute_dagster_graphql( + graphql_context, SCHEMA_OR_ERROR_SUBSET_QUERY, {'selector': selector} + ) + assert not result.errors + assert result.data + assert result.data['runConfigSchemaOrError']['__typename'] == 'InvalidSubsetError' + assert '"nope" does not exist' in result.data['runConfigSchemaOrError']['message'] -@pytest.mark.skipif(sys.version_info.major < 3, reason='Exception cause only available on py3+') -def test_pipeline_with_invalid_definition_error(graphql_context): - selector = get_legacy_pipeline_selector( - graphql_context, 'pipeline_with_invalid_definition_error', ['fail_subset'] - ) - result = execute_dagster_graphql( - graphql_context, SCHEMA_OR_ERROR_SUBSET_QUERY, {'selector': selector} - ) - assert not result.errors - assert result.data - assert result.data['runConfigSchemaOrError']['__typename'] == 'InvalidSubsetError' + @pytest.mark.skipif(sys.version_info.major < 3, reason='Exception cause only available on py3+') + def test_pipeline_with_invalid_definition_error(self, graphql_context): + selector = get_legacy_pipeline_selector( + graphql_context, 'pipeline_with_invalid_definition_error', ['fail_subset'] + ) + result = execute_dagster_graphql( + graphql_context, SCHEMA_OR_ERROR_SUBSET_QUERY, {'selector': selector} + ) + assert not result.errors + assert result.data + assert result.data['runConfigSchemaOrError']['__typename'] == 'InvalidSubsetError' - assert re.match( - ( - r'.*DagsterInvalidSubsetError[\s\S]*' - r'add a input_hydration_config for the type "InputTypeWithoutHydration"' - ), - result.data['runConfigSchemaOrError']['message'], - ) + assert re.match( + ( + r'.*DagsterInvalidSubsetError[\s\S]*' + r'add a input_hydration_config for the type "InputTypeWithoutHydration"' + ), + result.data['runConfigSchemaOrError']['message'], + ) diff --git a/python_modules/dagster/dagster/api/snapshot_repository.py b/python_modules/dagster/dagster/api/snapshot_repository.py index e2d3a7ff5ced9..28b69a36b47e7 100644 --- a/python_modules/dagster/dagster/api/snapshot_repository.py +++ b/python_modules/dagster/dagster/api/snapshot_repository.py @@ -1,5 +1,3 @@ -import subprocess - from dagster import check from dagster.core.host_representation import ( ExternalRepository, @@ -11,6 +9,8 @@ from dagster.seven import xplat_shlex_split from dagster.utils.temp_file import get_temp_file_name +from .utils import execute_command_in_subprocess + def sync_get_external_repository(location_handle): check.inst_param(location_handle, 'location_handle', LocationHandle) @@ -20,8 +20,8 @@ def sync_get_external_repository(location_handle): parts = ['dagster', 'api', 'snapshot', 'repository', output_file] + xplat_shlex_split( location_handle.pointer.get_cli_args() ) - returncode = subprocess.check_call(parts) - check.invariant(returncode == 0, 'dagster api cli invocation did not complete successfully') + + execute_command_in_subprocess(parts) external_repository_data = read_unary_response(output_file) check.inst(external_repository_data, ExternalRepositoryData) diff --git a/python_modules/dagster/dagster/cli/api.py b/python_modules/dagster/dagster/cli/api.py index d39ed3c54eb5e..0c5f3893da96c 100644 --- a/python_modules/dagster/dagster/cli/api.py +++ b/python_modules/dagster/dagster/cli/api.py @@ -23,6 +23,25 @@ from dagster.serdes.ipc import ipc_write_stream, ipc_write_unary_response, setup_interrupt_support from dagster.utils.error import serializable_error_info_from_exc_info +# Helpers + + +def get_external_pipeline_subset_result(recon_pipeline, solid_subset): + check.inst_param(recon_pipeline, 'recon_pipeline', ReconstructablePipeline) + + definition = recon_pipeline.get_definition() + if solid_subset: + try: + definition = definition.subset_for_execution(solid_subset) + except DagsterInvalidSubsetError: + return ExternalPipelineSubsetResult( + success=False, error=serializable_error_info_from_exc_info(sys.exc_info()) + ) + + external_pipeline_data = external_pipeline_data_from_def(definition) + return ExternalPipelineSubsetResult(success=True, external_pipeline_data=external_pipeline_data) + + # Snapshot CLI @@ -44,22 +63,11 @@ def repository_snapshot_command(output_file, **kwargs): @click.option('--solid-subset', '-s', help="JSON encoded list of solids") def pipeline_subset_snapshot_command(output_file, solid_subset, **kwargs): recon_pipeline = recon_pipeline_for_cli_args(kwargs) - definition = recon_pipeline.get_definition() if solid_subset: - try: - definition = definition.subset_for_execution(json.loads(solid_subset)) - except DagsterInvalidSubsetError: - return ipc_write_unary_response( - output_file, - ExternalPipelineSubsetResult( - success=False, error=serializable_error_info_from_exc_info(sys.exc_info()) - ), - ) + solid_subset = json.loads(solid_subset) - external_pipeline_data = external_pipeline_data_from_def(definition) ipc_write_unary_response( - output_file, - ExternalPipelineSubsetResult(success=True, external_pipeline_data=external_pipeline_data), + output_file, get_external_pipeline_subset_result(recon_pipeline, solid_subset) ) diff --git a/python_modules/dagster/dagster/core/host_representation/repository_location.py b/python_modules/dagster/dagster/core/host_representation/repository_location.py index 6f0e8635e961f..de8bfbd8bad0a 100644 --- a/python_modules/dagster/dagster/core/host_representation/repository_location.py +++ b/python_modules/dagster/dagster/core/host_representation/repository_location.py @@ -12,13 +12,11 @@ LocationHandle, RepositoryHandle, ) +from dagster.core.host_representation.handle import PipelineHandle from dagster.core.instance import DagsterInstance -from dagster.core.snap import snapshot_from_execution_plan +from dagster.core.snap.execution_plan_snapshot import snapshot_from_execution_plan from dagster.core.storage.pipeline_run import PipelineRun -from dagster.utils.hosted_user_process import ( - external_pipeline_from_recon_pipeline, - external_repo_from_def, -) +from dagster.utils.hosted_user_process import external_repo_from_def from .selector import PipelineSelector @@ -76,7 +74,7 @@ def execute_plan( pass @abstractmethod - def get_external_pipeline(self, selector): + def get_subset_external_pipeline_result(self, selector): pass @@ -116,7 +114,7 @@ def has_repository(self, name): def get_repositories(self): return self._repositories - def get_external_pipeline(self, selector): + def get_subset_external_pipeline_result(self, selector): check.inst_param(selector, 'selector', PipelineSelector) check.invariant( selector.location_name == self.name, @@ -125,10 +123,10 @@ def get_external_pipeline(self, selector): ), ) - return external_pipeline_from_recon_pipeline( - recon_pipeline=self.get_reconstructable_pipeline(selector.pipeline_name), - solid_subset=selector.solid_subset, - repository_handle=self._external_repo.handle, + from dagster.cli.api import get_external_pipeline_subset_result + + return get_external_pipeline_subset_result( + self.get_reconstructable_pipeline(selector.pipeline_name), selector.solid_subset ) def get_external_execution_plan( @@ -219,5 +217,16 @@ def execute_plan( def execute_pipeline(self, instance, external_pipeline, pipeline_run): raise NotImplementedError() - def get_external_pipeline(self, selector): - raise NotImplementedError() + def get_subset_external_pipeline_result(self, selector): + from dagster.api.snapshot_pipeline import sync_get_external_pipeline_subset + + check.inst_param(selector, 'selector', PipelineSelector) + check.invariant( + selector.location_name == self.name, + 'PipelineSelector location_name mismatch, got {selector.location_name} expected {self.name}'.format( + self=self, selector=selector + ), + ) + + pipeline_handle = PipelineHandle(selector.pipeline_name, self.external_repository.handle) + return sync_get_external_pipeline_subset(pipeline_handle, selector.solid_subset)