Skip to content

Commit

Permalink
(dagit-out-of-process-4) Implement get_external_pipeline for OutOfPro…
Browse files Browse the repository at this point in the history
…cessRepositoryLocation

Summary:
Here we implement the `get_external_pipeline` method on the `OutOfProcessRepositoryLocation`. This lets us enable the test matrix on several sets of tests that query for `pipelineOrError` or `runConfigSchemaOrError`.

This also adds a new method to the `DagsterGraphQLContext`: `validate_external_pipeline`. Given a selector, this method returns whether a give pipeline and a subset are valid. This is used in the `get_external_pipeline` method for proper error handling over the host/user process boundary.

Test Plan: unit

Reviewers: alangenfeld, schrockn

Reviewed By: alangenfeld, schrockn

Differential Revision: https://dagster.phacility.com/D3223
  • Loading branch information
helloworld committed Jun 3, 2020
1 parent c06d58f commit eb32d56
Show file tree
Hide file tree
Showing 12 changed files with 504 additions and 219 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
from dagster_graphql.implementation.utils import UserFacingGraphQLError
from dagster_graphql.schema.errors import DauphinInvalidSubsetError
from dagster_graphql.schema.pipelines import DauphinPipeline

from dagster import check
from dagster.core.host_representation import (
InProcessRepositoryLocation,
PipelineSelector,
RepositoryLocation,
)
from dagster.core.host_representation.external import ExternalPipeline
from dagster.core.instance import DagsterInstance


Expand Down Expand Up @@ -35,11 +40,32 @@ def get_repository_location(self, name):
def has_repository_location(self, name):
return name in self._repository_locations

def get_external_pipeline(self, selector):
def get_subset_external_pipeline(self, selector):
check.inst_param(selector, 'selector', PipelineSelector)
# We have to grab the pipeline from the location instead of the repository directly
# since we may have to request a subset we don't have in memory yet
return self._repository_locations[selector.location_name].get_external_pipeline(selector)

repository_location = self._repository_locations[selector.location_name]
external_repository = repository_location.get_repository(selector.repository_name)

subset_result = repository_location.get_subset_external_pipeline_result(selector)
if not subset_result.success:
error_info = subset_result.error
raise UserFacingGraphQLError(
DauphinInvalidSubsetError(
message="{message}{cause_message}".format(
message=error_info.message,
cause_message="\n{}".format(error_info.cause.message)
if error_info.cause
else "",
),
pipeline=DauphinPipeline(self.get_full_external_pipeline(selector)),
)
)

return ExternalPipeline(
subset_result.external_pipeline_data, repository_handle=external_repository.handle,
)

def has_external_pipeline(self, selector):
check.inst_param(selector, 'selector', PipelineSelector)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import sys

from graphql.execution.base import ResolveInfo

from dagster import check
from dagster.config.validate import validate_config_from_snap
from dagster.core.errors import DagsterInvalidSubsetError
from dagster.core.host_representation import (
ExternalExecutionPlan,
ExternalPipeline,
PipelineSelector,
)
from dagster.utils.error import serializable_error_info_from_exc_info

from .utils import UserFacingGraphQLError, capture_dauphin_error, legacy_pipeline_selector

Expand All @@ -32,7 +28,7 @@ def legacy_get_external_pipeline_or_raise(graphene_info, pipeline_name, solid_su
check.str_param(pipeline_name, 'pipeline_name')
check.opt_list_param(solid_subset, 'solid_subset', of_type=str)

return graphene_info.context.get_external_pipeline(
return graphene_info.context.get_subset_external_pipeline(
legacy_pipeline_selector(graphene_info.context, pipeline_name, solid_subset)
)

Expand All @@ -59,19 +55,7 @@ def get_external_pipeline_or_raise(graphene_info, selector):
)
)

try:
return graphene_info.context.get_external_pipeline(selector)
except DagsterInvalidSubsetError:
error_info = serializable_error_info_from_exc_info(sys.exc_info())
raise UserFacingGraphQLError(
DauphinInvalidSubsetError(
message="{message}\n{cause_message}".format(
message=error_info.message,
cause_message=error_info.cause.message if error_info.cause else "",
),
pipeline=graphene_info.schema.type_named('Pipeline')(full_pipeline),
)
)
return graphene_info.context.get_subset_external_pipeline(selector)


def ensure_valid_config(external_pipeline, mode, environment_dict):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,9 @@ def resolve_solidSubset(self, _graphene_info):
return self._active_preset_data.solid_subset

def resolve_runConfigYaml(self, _graphene_info):
yaml_str = yaml.dump(self._active_preset_data.environment_dict, default_flow_style=False)
yaml_str = yaml.safe_dump(
self._active_preset_data.environment_dict, default_flow_style=False
)
return yaml_str if yaml_str else ''

def resolve_mode(self, _graphene_info):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

snapshots = Snapshot()

snapshots['test_basic_valid_config_on_run_config_schema 1'] = {
snapshots['TestEnvironmentSchema.test_basic_valid_config_on_run_config_schema[readonly_in_memory_instance_in_process_env] 1'] = {
'runConfigSchemaOrError': {
'isRunConfigValid': {
'__typename': 'PipelineConfigValidationValid',
Expand All @@ -15,7 +15,133 @@
}
}

snapshots['test_basic_invalid_config_on_run_config_schema 1'] = {
snapshots['TestEnvironmentSchema.test_basic_valid_config_on_run_config_schema[readonly_in_memory_instance_out_of_process_env] 1'] = {
'runConfigSchemaOrError': {
'isRunConfigValid': {
'__typename': 'PipelineConfigValidationValid',
'pipelineName': 'csv_hello_world'
}
}
}

snapshots['TestEnvironmentSchema.test_basic_valid_config_on_run_config_schema[readonly_sqlite_instance_in_process_env] 1'] = {
'runConfigSchemaOrError': {
'isRunConfigValid': {
'__typename': 'PipelineConfigValidationValid',
'pipelineName': 'csv_hello_world'
}
}
}

snapshots['TestEnvironmentSchema.test_basic_valid_config_on_run_config_schema[readonly_sqlite_instance_out_of_process_env] 1'] = {
'runConfigSchemaOrError': {
'isRunConfigValid': {
'__typename': 'PipelineConfigValidationValid',
'pipelineName': 'csv_hello_world'
}
}
}

snapshots['TestEnvironmentSchema.test_basic_invalid_config_on_run_config_schema[readonly_in_memory_instance_in_process_env] 1'] = {
'runConfigSchemaOrError': {
'isRunConfigValid': {
'__typename': 'PipelineConfigValidationInvalid',
'errors': [
{
'__typename': 'FieldNotDefinedConfigError',
'fieldName': 'nope',
'message': 'Undefined field "nope" at the root. Expected: "{ execution?: { in_process?: { config?: { marker_to_close?: String retries?: { disabled?: { } enabled?: { } } } } multiprocess?: { config?: { max_concurrent?: Int retries?: { disabled?: { } enabled?: { } } } } } loggers?: { console?: { config?: { log_level?: String name?: String } } } resources?: { } solids: { sum_solid: { inputs: { num: String } outputs?: [{ result?: String }] } sum_sq_solid?: { outputs?: [{ result?: String }] } } storage?: { filesystem?: { config?: { base_dir?: String } } in_memory?: { } } }".',
'reason': 'FIELD_NOT_DEFINED',
'stack': {
'entries': [
]
}
},
{
'__typename': 'MissingFieldConfigError',
'field': {
'name': 'solids'
},
'message': 'Missing required field "solids" at the root. Available Fields: "[\'execution\', \'loggers\', \'resources\', \'solids\', \'storage\']".',
'reason': 'MISSING_REQUIRED_FIELD',
'stack': {
'entries': [
]
}
}
],
'pipelineName': 'csv_hello_world'
}
}
}

snapshots['TestEnvironmentSchema.test_basic_invalid_config_on_run_config_schema[readonly_in_memory_instance_out_of_process_env] 1'] = {
'runConfigSchemaOrError': {
'isRunConfigValid': {
'__typename': 'PipelineConfigValidationInvalid',
'errors': [
{
'__typename': 'FieldNotDefinedConfigError',
'fieldName': 'nope',
'message': 'Undefined field "nope" at the root. Expected: "{ execution?: { in_process?: { config?: { marker_to_close?: String retries?: { disabled?: { } enabled?: { } } } } multiprocess?: { config?: { max_concurrent?: Int retries?: { disabled?: { } enabled?: { } } } } } loggers?: { console?: { config?: { log_level?: String name?: String } } } resources?: { } solids: { sum_solid: { inputs: { num: String } outputs?: [{ result?: String }] } sum_sq_solid?: { outputs?: [{ result?: String }] } } storage?: { filesystem?: { config?: { base_dir?: String } } in_memory?: { } } }".',
'reason': 'FIELD_NOT_DEFINED',
'stack': {
'entries': [
]
}
},
{
'__typename': 'MissingFieldConfigError',
'field': {
'name': 'solids'
},
'message': 'Missing required field "solids" at the root. Available Fields: "[\'execution\', \'loggers\', \'resources\', \'solids\', \'storage\']".',
'reason': 'MISSING_REQUIRED_FIELD',
'stack': {
'entries': [
]
}
}
],
'pipelineName': 'csv_hello_world'
}
}
}

snapshots['TestEnvironmentSchema.test_basic_invalid_config_on_run_config_schema[readonly_sqlite_instance_in_process_env] 1'] = {
'runConfigSchemaOrError': {
'isRunConfigValid': {
'__typename': 'PipelineConfigValidationInvalid',
'errors': [
{
'__typename': 'FieldNotDefinedConfigError',
'fieldName': 'nope',
'message': 'Undefined field "nope" at the root. Expected: "{ execution?: { in_process?: { config?: { marker_to_close?: String retries?: { disabled?: { } enabled?: { } } } } multiprocess?: { config?: { max_concurrent?: Int retries?: { disabled?: { } enabled?: { } } } } } loggers?: { console?: { config?: { log_level?: String name?: String } } } resources?: { } solids: { sum_solid: { inputs: { num: String } outputs?: [{ result?: String }] } sum_sq_solid?: { outputs?: [{ result?: String }] } } storage?: { filesystem?: { config?: { base_dir?: String } } in_memory?: { } } }".',
'reason': 'FIELD_NOT_DEFINED',
'stack': {
'entries': [
]
}
},
{
'__typename': 'MissingFieldConfigError',
'field': {
'name': 'solids'
},
'message': 'Missing required field "solids" at the root. Available Fields: "[\'execution\', \'loggers\', \'resources\', \'solids\', \'storage\']".',
'reason': 'MISSING_REQUIRED_FIELD',
'stack': {
'entries': [
]
}
}
],
'pipelineName': 'csv_hello_world'
}
}
}

snapshots['TestEnvironmentSchema.test_basic_invalid_config_on_run_config_schema[readonly_sqlite_instance_out_of_process_env] 1'] = {
'runConfigSchemaOrError': {
'isRunConfigValid': {
'__typename': 'PipelineConfigValidationInvalid',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,130 @@

snapshots = Snapshot()

snapshots['test_basic_preset_query_with_presets 1'] = {
snapshots['TestPresets.test_basic_preset_query_with_presets[readonly_in_memory_instance_in_process_env] 1'] = {
'pipelineOrError': {
'name': 'csv_hello_world',
'presets': [
{
'__typename': 'PipelinePreset',
'mode': 'default',
'name': 'prod',
'runConfigYaml': '''solids:
sum_solid:
inputs:
num: data/num_prod.csv
''',
'solidSubset': None
},
{
'__typename': 'PipelinePreset',
'mode': 'default',
'name': 'test',
'runConfigYaml': '''solids:
sum_solid:
inputs:
num: data/num.csv
''',
'solidSubset': None
},
{
'__typename': 'PipelinePreset',
'mode': 'default',
'name': 'test_inline',
'runConfigYaml': '''solids:
sum_solid:
inputs:
num: /data/num.csv
''',
'solidSubset': None
}
]
}
}

snapshots['TestPresets.test_basic_preset_query_with_presets[readonly_sqlite_instance_in_process_env] 1'] = {
'pipelineOrError': {
'name': 'csv_hello_world',
'presets': [
{
'__typename': 'PipelinePreset',
'mode': 'default',
'name': 'prod',
'runConfigYaml': '''solids:
sum_solid:
inputs:
num: data/num_prod.csv
''',
'solidSubset': None
},
{
'__typename': 'PipelinePreset',
'mode': 'default',
'name': 'test',
'runConfigYaml': '''solids:
sum_solid:
inputs:
num: data/num.csv
''',
'solidSubset': None
},
{
'__typename': 'PipelinePreset',
'mode': 'default',
'name': 'test_inline',
'runConfigYaml': '''solids:
sum_solid:
inputs:
num: /data/num.csv
''',
'solidSubset': None
}
]
}
}

snapshots['TestPresets.test_basic_preset_query_with_presets[readonly_in_memory_instance_out_of_process_env] 1'] = {
'pipelineOrError': {
'name': 'csv_hello_world',
'presets': [
{
'__typename': 'PipelinePreset',
'mode': 'default',
'name': 'prod',
'runConfigYaml': '''solids:
sum_solid:
inputs:
num: data/num_prod.csv
''',
'solidSubset': None
},
{
'__typename': 'PipelinePreset',
'mode': 'default',
'name': 'test',
'runConfigYaml': '''solids:
sum_solid:
inputs:
num: data/num.csv
''',
'solidSubset': None
},
{
'__typename': 'PipelinePreset',
'mode': 'default',
'name': 'test_inline',
'runConfigYaml': '''solids:
sum_solid:
inputs:
num: /data/num.csv
''',
'solidSubset': None
}
]
}
}

snapshots['TestPresets.test_basic_preset_query_with_presets[readonly_sqlite_instance_out_of_process_env] 1'] = {
'pipelineOrError': {
'name': 'csv_hello_world',
'presets': [
Expand Down
Loading

0 comments on commit eb32d56

Please sign in to comment.