From 050959c51dac1c2e3d9a3668eae24f9ead6ff5ba Mon Sep 17 00:00:00 2001
From: Merel Theisen
Date: Tue, 2 May 2023 10:31:16 +0100
Subject: [PATCH 01/14] Prototype approach with settings

Signed-off-by: Merel Theisen
---
 kedro/framework/project/__init__.py |  2 ++
 kedro/io/data_catalog.py            | 47 +++++++++++++++++++++++++++++
 kedro/runner/runner.py              |  7 +++++
 3 files changed, 56 insertions(+)

diff --git a/kedro/framework/project/__init__.py b/kedro/framework/project/__init__.py
index 8a4922d0ba..d7dad68667 100644
--- a/kedro/framework/project/__init__.py
+++ b/kedro/framework/project/__init__.py
@@ -110,6 +110,7 @@ class _ProjectSettings(LazySettings):
     _DATA_CATALOG_CLASS = _IsSubclassValidator(
         "DATA_CATALOG_CLASS", default=_get_default_class("kedro.io.DataCatalog")
     )
+    _DATA_CATALOG_ARGS = Validator("DATA_CATALOG_ARGS", default={})

     def __init__(self, *args, **kwargs):
@@ -124,6 +125,7 @@ def __init__(self, *args, **kwargs):
                 self._CONFIG_LOADER_CLASS,
                 self._CONFIG_LOADER_ARGS,
                 self._DATA_CATALOG_CLASS,
+                self._DATA_CATALOG_ARGS,
             ]
         )
         super().__init__(*args, **kwargs)

diff --git a/kedro/io/data_catalog.py b/kedro/io/data_catalog.py
index 8014a02edd..c8087f6d2f 100644
--- a/kedro/io/data_catalog.py
+++ b/kedro/io/data_catalog.py
@@ -11,6 +11,10 @@
 from collections import defaultdict
 from typing import Any, Dict, List, Optional, Set, Type, Union

+from kedro_datasets.pandas import CSVDataSet
+from parse import parse
+
+from kedro.framework.project import settings
 from kedro.io.core import (
     AbstractDataSet,
     AbstractVersionedDataSet,
@@ -520,6 +524,49 @@ def add_feed_dict(self, feed_dict: Dict[str, Any], replace: bool = False) -> Non

         self.add(data_set_name, data_set, replace)

+    def match_name_against_dataset_factories(self, dataset_input_name: str):
+        """
+        def create_spark_dataset(dataset_name: str, *chunks):
+            # e.g. here chunks=["root_namespace", "something-instead-the-*", "spark"]
+            return SparkDataSet(filepath=f"data/{chunks[0]}/{chunks[1]}.parquet", file_format="parquet")
+
+        "{root_namespace}.{dataset_name}@spark":
+            type: spark.SparkDataSet
+            filepath: data/{root_namespace}/{dataset_name}.parquet
+            file_format: parquet
+        """
+
+        def create_pandas_csvdataset(chunks: dict):
+            root_namespace = chunks.get("root_namespace")
+            dataset_name_chunk = chunks.get("dataset_name")
+            return CSVDataSet(
+                filepath=f"data/{root_namespace}/{dataset_name_chunk}.csv",
+                load_args={"sep": ","},
+                version=Version(None, generate_timestamp()),
+            )
+
+        catalog_args = settings.DATA_CATALOG_ARGS
+        print(f"Catalog ARGS: {catalog_args}")
+        dataset = None
+        datasets_in_catalog = self._data_sets
+        for dataset_name, dataset_config in datasets_in_catalog.items():
+            # Let's assume that any name with {} in it is a pattern to be matched.
+            if "{" and "}" in dataset_name:
+                # Match the pattern name against the input name
+                result = parse(dataset_name, dataset_input_name)
+                if result:
+                    # Call dataset factory function to create dataset.
+                    # ❓ How????
+                    dataset_factories = catalog_args.get("datasets_factories")
+                    print(f"Dataset factories = {dataset_factories}")
+                    matching_factory = dataset_factories.get(dataset_name, None)
+                    print(f"Matching factory = {matching_factory}")
+                    if matching_factory:
+                        dataset = matching_factory(result.named)
+                    else:
+                        dataset = create_pandas_csvdataset(result.named)
+        return dataset
+
     def list(self, regex_search: Optional[str] = None) -> List[str]:
         """
         List of all ``DataSet`` names registered in the catalog.

diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py
index 7a2444cc6d..170490040e 100644
--- a/kedro/runner/runner.py
+++ b/kedro/runner/runner.py
@@ -72,6 +72,13 @@ def run(
         hook_manager = hook_manager or _NullPluginManager()
         catalog = catalog.shallow_copy()

+        # Resolve dataset factories
+        # For all pipeline datasets, try to match them against the catalog patterns
+        for dataset in pipeline.data_sets():
+            matched_dataset = catalog.match_name_against_dataset_factories(dataset)
+            # and add the dataset if it's a match.
+            if matched_dataset:
+                catalog.add(dataset, matched_dataset)

         unsatisfied = pipeline.inputs() - set(catalog.list())
         if unsatisfied:
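The matching above is driven by the ``parse`` library, which inverts ``str.format()``: named fields in a catalog entry name capture the corresponding pieces of a concrete dataset name. A minimal sketch, using the pattern from the docstring above (the concrete names are illustrative):

    from parse import parse

    # parse() inverts str.format(): each named field in the pattern captures
    # the matching piece of the input string.
    result = parse("{root_namespace}.{dataset_name}@spark", "germany.companies@spark")
    print(result.named)  # {'root_namespace': 'germany', 'dataset_name': 'companies'}

    # A name that does not fit the pattern yields None, which is why a plain
    # truthiness check (``if result:``) works as the match test.
    print(parse("{root_namespace}.{dataset_name}@spark", "shuttles"))  # None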
From 9bcd31ecfdf669559f2ac6e0e940302905cb1bb9 Mon Sep 17 00:00:00 2001
From: Merel Theisen
Date: Wed, 3 May 2023 15:37:26 +0100
Subject: [PATCH 02/14] Match dataset against pattern and resolve from there

Signed-off-by: Merel Theisen
---
 kedro/io/data_catalog.py | 113 ++++++++++++++++++---------------------
 kedro/runner/runner.py   |  15 ++----
 2 files changed, 57 insertions(+), 71 deletions(-)

diff --git a/kedro/io/data_catalog.py b/kedro/io/data_catalog.py
index c8087f6d2f..888f42c3c8 100644
--- a/kedro/io/data_catalog.py
+++ b/kedro/io/data_catalog.py
@@ -143,6 +143,7 @@ def __init__(
         data_sets: Dict[str, AbstractDataSet] = None,
         feed_dict: Dict[str, Any] = None,
         layers: Dict[str, Set[str]] = None,
+        dataset_patterns: Dict[str, Any] = None,
     ) -> None:
         """``DataCatalog`` stores instances of ``AbstractDataSet``
         implementations to provide ``load`` and ``save`` capabilities from
@@ -172,6 +173,7 @@ def __init__(
         self._data_sets = dict(data_sets or {})
         self.datasets = _FrozenDatasets(self._data_sets)
         self.layers = layers
+        self.dataset_patterns = dict(dataset_patterns or {})

         # import the feed dict
         if feed_dict:
@@ -259,6 +261,7 @@ class to be loaded is specified with the key ``type`` and their
             >>> catalog.save("boats", df)
         """
         data_sets = {}
+        dataset_patterns = {}
         catalog = copy.deepcopy(catalog) or {}
         credentials = copy.deepcopy(credentials) or {}
         save_version = save_version or generate_timestamp()
@@ -273,35 +276,43 @@ class to be loaded is specified with the key ``type`` and their
         layers: Dict[str, Set[str]] = defaultdict(set)

         for ds_name, ds_config in catalog.items():
-            ds_layer = ds_config.pop("layer", None)
-            if ds_layer is not None:
-                layers[ds_layer].add(ds_name)
-
-            ds_config = _resolve_credentials(ds_config, credentials)
-            data_sets[ds_name] = AbstractDataSet.from_config(
-                ds_name, ds_config, load_versions.get(ds_name), save_version
-            )
+            # Let's assume that any name with {} in it is a dataset pattern to be matched.
+            if "{" and "}" in ds_name:
+                dataset_patterns[ds_name] = ds_config
+            else:
+                ds_layer = ds_config.pop("layer", None)
+                if ds_layer is not None:
+                    layers[ds_layer].add(ds_name)
+                ds_config = _resolve_credentials(ds_config, credentials)
+                data_sets[ds_name] = AbstractDataSet.from_config(
+                    ds_name, ds_config, load_versions.get(ds_name), save_version
+                )
         dataset_layers = layers or None
-        return cls(data_sets=data_sets, layers=dataset_layers)
+        return cls(data_sets=data_sets, layers=dataset_layers, dataset_patterns=dataset_patterns)

     def _get_dataset(
         self, data_set_name: str, version: Version = None, suggest: bool = True
     ) -> AbstractDataSet:
         if data_set_name not in self._data_sets:
-            error_msg = f"DataSet '{data_set_name}' not found in the catalog"
+            # Try to match against pattern
+            matched_dataset = self.match_name_against_dataset_factories(data_set_name)
+            if matched_dataset:
+                self.add(data_set_name, matched_dataset)
+            else:
+                error_msg = f"DataSet '{data_set_name}' not found in the catalog"

-            # Flag to turn on/off fuzzy-matching which can be time consuming and
-            # slow down plugins like `kedro-viz`
-            if suggest:
-                matches = difflib.get_close_matches(
-                    data_set_name, self._data_sets.keys()
-                )
-                if matches:
-                    suggestions = ", ".join(matches)
-                    error_msg += f" - did you mean one of these instead: {suggestions}"
+                # Flag to turn on/off fuzzy-matching which can be time consuming and
+                # slow down plugins like `kedro-viz`
+                if suggest:
+                    matches = difflib.get_close_matches(
+                        data_set_name, self._data_sets.keys()
+                    )
+                    if matches:
+                        suggestions = ", ".join(matches)
+                        error_msg += f" - did you mean one of these instead: {suggestions}"

-            raise DataSetNotFoundError(error_msg)
+                raise DataSetNotFoundError(error_msg)

         data_set = self._data_sets[data_set_name]
         if version and isinstance(data_set, AbstractVersionedDataSet):
@@ -525,46 +536,18 @@ def add_feed_dict(self, feed_dict: Dict[str, Any], replace: bool = False) -> Non
         self.add(data_set_name, data_set, replace)

     def match_name_against_dataset_factories(self, dataset_input_name: str):
-        """
-        def create_spark_dataset(dataset_name: str, *chunks):
-            # e.g. here chunks=["root_namespace", "something-instead-the-*", "spark"]
-            return SparkDataSet(filepath=f"data/{chunks[0]}/{chunks[1]}.parquet", file_format="parquet")
-
-        "{root_namespace}.{dataset_name}@spark":
-            type: spark.SparkDataSet
-            filepath: data/{root_namespace}/{dataset_name}.parquet
-            file_format: parquet
-        """
-
-        def create_pandas_csvdataset(chunks: dict):
-            root_namespace = chunks.get("root_namespace")
-            dataset_name_chunk = chunks.get("dataset_name")
-            return CSVDataSet(
-                filepath=f"data/{root_namespace}/{dataset_name_chunk}.csv",
-                load_args={"sep": ","},
-                version=Version(None, generate_timestamp()),
-            )
-
-        catalog_args = settings.DATA_CATALOG_ARGS
-        print(f"Catalog ARGS: {catalog_args}")
         dataset = None
-        datasets_in_catalog = self._data_sets
-        for dataset_name, dataset_config in datasets_in_catalog.items():
-            # Let's assume that any name with {} in it is a pattern to be matched.
-            if "{" and "}" in dataset_name:
-                # Match the pattern name against the input name
-                result = parse(dataset_name, dataset_input_name)
-                if result:
-                    # Call dataset factory function to create dataset.
-                    # ❓ How????
-                    dataset_factories = catalog_args.get("datasets_factories")
-                    print(f"Dataset factories = {dataset_factories}")
-                    matching_factory = dataset_factories.get(dataset_name, None)
-                    print(f"Matching factory = {matching_factory}")
-                    if matching_factory:
-                        dataset = matching_factory(result.named)
-                    else:
-                        dataset = create_pandas_csvdataset(result.named)
+        for dataset_name, dataset_config in self.dataset_patterns.items():
+            result = parse(dataset_name, dataset_input_name)
+            if result:
+                config_copy = copy.deepcopy(dataset_config)
+                # Match results to patterns in catalog entry
+                for key, value in config_copy.items():
+                    if '}' in value:
+                        string_value = str(value)
+                        config_copy[key] = string_value.format_map(result.named)
+                # Create dataset from catalog config.
+                dataset = AbstractDataSet.from_config(dataset_name, config_copy)
         return dataset

     def list(self, regex_search: Optional[str] = None) -> List[str]:
@@ -612,13 +595,23 @@ def list(self, regex_search: Optional[str] = None) -> List[str]:
             ) from exc
         return [dset_name for dset_name in self._data_sets if pattern.search(dset_name)]

+    def remove_pattern_matches(self, dataset_list: Set[str]):
+        dataset_list_minus_matched = []
+        for dataset in dataset_list:
+            # If dataset matches a pattern, remove it from the list.
+            for dataset_name, dataset_config in self.dataset_patterns.items():
+                result = parse(dataset_name, dataset)
+                if not result:
+                    dataset_list_minus_matched.append(dataset)
+        return set(dataset_list_minus_matched)
+
     def shallow_copy(self) -> "DataCatalog":
         """Returns a shallow copy of the current object.

         Returns:
             Copy of the current object.
         """
-        return DataCatalog(data_sets=self._data_sets, layers=self.layers)
+        return DataCatalog(data_sets=self._data_sets, layers=self.layers, dataset_patterns=self.dataset_patterns)

     def __eq__(self, other):
         return (self._data_sets, self.layers) == (other._data_sets, other.layers)

diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py
index 170490040e..9b3d21bbcf 100644
--- a/kedro/runner/runner.py
+++ b/kedro/runner/runner.py
@@ -72,22 +72,15 @@ def run(
         hook_manager = hook_manager or _NullPluginManager()
         catalog = catalog.shallow_copy()

-        # Resolve dataset factories
-        # For all pipeline datasets, try to match them against the catalog patterns
-        for dataset in pipeline.data_sets():
-            matched_dataset = catalog.match_name_against_dataset_factories(dataset)
-            # and add the dataset if it's a match.
-            if matched_dataset:
-                catalog.add(dataset, matched_dataset)
-
-        unsatisfied = pipeline.inputs() - set(catalog.list())
+
+        unsatisfied = catalog.remove_pattern_matches(pipeline.inputs() - set(catalog.list()))
         if unsatisfied:
             raise ValueError(
                 f"Pipeline input(s) {unsatisfied} not found in the DataCatalog"
             )

-        free_outputs = pipeline.outputs() - set(catalog.list())
-        unregistered_ds = pipeline.data_sets() - set(catalog.list())
+        free_outputs = catalog.remove_pattern_matches(pipeline.outputs() - set(catalog.list()))
+        unregistered_ds = catalog.remove_pattern_matches(pipeline.data_sets() - set(catalog.list()))
         for ds_name in unregistered_ds:
             catalog.add(ds_name, self.create_default_data_set(ds_name))
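The resolution step introduced here turns a matched pattern entry into a concrete dataset config with ``format_map``. A self-contained sketch of just that step, using a hypothetical pattern entry modelled on the docstring from the first commit:

    import copy

    pattern_config = {
        "type": "spark.SparkDataSet",
        "filepath": "data/{root_namespace}/{dataset_name}.parquet",
        "file_format": "parquet",
    }
    named = {"root_namespace": "germany", "dataset_name": "companies"}  # from parse()

    config_copy = copy.deepcopy(pattern_config)
    for key, value in config_copy.items():
        if "}" in value:
            # format_map() fills each {placeholder} from the dict, matching
            # placeholder names to dict keys; values without "}" are kept as-is.
            config_copy[key] = str(value).format_map(named)

    print(config_copy["filepath"])  # data/germany/companies.parquet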
+ if "{" and "}" in ds_name: + dataset_patterns[ds_name] = ds_config + else: + ds_layer = ds_config.pop("layer", None) + if ds_layer is not None: + layers[ds_layer].add(ds_name) + ds_config = _resolve_credentials(ds_config, credentials) + data_sets[ds_name] = AbstractDataSet.from_config( + ds_name, ds_config, load_versions.get(ds_name), save_version + ) dataset_layers = layers or None - return cls(data_sets=data_sets, layers=dataset_layers) + return cls(data_sets=data_sets, layers=dataset_layers, dataset_patterns=dataset_patterns) def _get_dataset( self, data_set_name: str, version: Version = None, suggest: bool = True ) -> AbstractDataSet: if data_set_name not in self._data_sets: - error_msg = f"DataSet '{data_set_name}' not found in the catalog" + # Try to match against pattern + matched_dataset = self.match_name_against_dataset_factories(data_set_name) + if matched_dataset: + self.add(data_set_name, matched_dataset) + else: + error_msg = f"DataSet '{data_set_name}' not found in the catalog" - # Flag to turn on/off fuzzy-matching which can be time consuming and - # slow down plugins like `kedro-viz` - if suggest: - matches = difflib.get_close_matches( - data_set_name, self._data_sets.keys() - ) - if matches: - suggestions = ", ".join(matches) - error_msg += f" - did you mean one of these instead: {suggestions}" + # Flag to turn on/off fuzzy-matching which can be time consuming and + # slow down plugins like `kedro-viz` + if suggest: + matches = difflib.get_close_matches( + data_set_name, self._data_sets.keys() + ) + if matches: + suggestions = ", ".join(matches) + error_msg += f" - did you mean one of these instead: {suggestions}" - raise DataSetNotFoundError(error_msg) + raise DataSetNotFoundError(error_msg) data_set = self._data_sets[data_set_name] if version and isinstance(data_set, AbstractVersionedDataSet): @@ -525,46 +536,18 @@ def add_feed_dict(self, feed_dict: Dict[str, Any], replace: bool = False) -> Non self.add(data_set_name, data_set, replace) def match_name_against_dataset_factories(self, dataset_input_name: str): - """ - def create_spark_dataset(dataset_name: str, *chunks): - # e.g. here chunks=["root_namespace", "something-instead-the-*", "spark"] - return SparkDataSet(filepath=f"data/{chunks[0]}/{chunks[1]}.parquet", file_format="parquet") - - "{root_namespace}.{dataset_name}@spark": - type: spark.SparkDataSet - filepath: data/{root_namespace}/{dataset_name}.parquet - file_format: parquet - """ - - def create_pandas_csvdataset(chunks: dict): - root_namespace = chunks.get("root_namespace") - dataset_name_chunk = chunks.get("dataset_name") - return CSVDataSet( - filepath=f"data/{root_namespace}/{dataset_name_chunk}.csv", - load_args={"sep": ","}, - version=Version(None, generate_timestamp()), - ) - - catalog_args = settings.DATA_CATALOG_ARGS - print(f"Catalog ARGS: {catalog_args}") dataset = None - datasets_in_catalog = self._data_sets - for dataset_name, dataset_config in datasets_in_catalog.items(): - # Let's assume that any name with {} in it is a pattern to be matched. - if "{" and "}" in dataset_name: - # Match the pattern name against the input name - result = parse(dataset_name, dataset_input_name) - if result: - # Call dataset factory function to create dataset. - # ❓ How???? 
From 76b05d6afc6c65527d123cdaadc4b1399a7e1137 Mon Sep 17 00:00:00 2001
From: Merel Theisen
Date: Thu, 4 May 2023 14:32:27 +0100
Subject: [PATCH 04/14] Small cleanups

Signed-off-by: Merel Theisen
---
 kedro/framework/cli/catalog.py      | 13 +++++++------
 kedro/framework/project/__init__.py |  4 +---
 kedro/io/data_catalog.py            | 26 +++++++++++++++++---------
 kedro/runner/runner.py              | 14 ++++++++++----
 4 files changed, 35 insertions(+), 22 deletions(-)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index a049140e1d..a8773d0c39 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -5,12 +5,13 @@
 import click
 import yaml
 from click import secho
+from parse import parse

 from kedro.framework.cli.utils import KedroCliError, env_option, split_string
 from kedro.framework.project import pipelines, settings
 from kedro.framework.session import KedroSession
 from kedro.framework.startup import ProjectMetadata
-from parse import parse
+

 def _create_session(package_name: str, **kwargs):
     kwargs.setdefault("save_on_close", False)
@@ -180,12 +181,13 @@ def _add_missing_datasets_to_catalog(missing_ds, catalog_path):
 @catalog.command("show")
 @env_option
 @click.pass_obj
-def list_datasets(metadata: ProjectMetadata, env):
+def show_catalog_datasets(metadata: ProjectMetadata, env):
     session = _create_session(metadata.package_name, env=env)
     context = session.load_context()
     catalog = context.config_loader["catalog"]
     secho(yaml.dump(catalog))
+
 @catalog.command("resolve")
 @env_option
 @click.option(
@@ -198,7 +200,7 @@ def list_datasets(metadata: ProjectMetadata, env):
     callback=split_string,
 )
 @click.pass_obj
-def list_datasets(metadata: ProjectMetadata, pipeline, env):
+def resolve_catalog_datasets(metadata: ProjectMetadata, pipeline, env):
     session = _create_session(metadata.package_name, env=env)
     context = session.load_context()
     catalog = context.config_loader["catalog"]
@@ -218,17 +220,16 @@ def list_datasets(metadata: ProjectMetadata, pipeline, env):
                 f"'{pipe}' pipeline not found! Existing pipelines: {existing_pls}"
             )

-
     catalog_copy = copy.deepcopy(catalog)
     for ds_name, ds_config in catalog.items():
-        if "{" and "}" in ds_name:
+        if "}" in ds_name:
             for pipeline_dataset in set(pipeline_datasets):
                 result = parse(ds_name, pipeline_dataset)
                 if result:
                     config_copy = copy.deepcopy(ds_config)
                     # Match results to patterns in catalog entry
                     for key, value in config_copy.items():
-                        if '}' in value:
+                        if "}" in value:
                             string_value = str(value)
                             config_copy[key] = string_value.format_map(result.named)
                     catalog_copy[pipeline_dataset] = config_copy

diff --git a/kedro/framework/project/__init__.py b/kedro/framework/project/__init__.py
index d7dad68667..b380ecfbca 100644
--- a/kedro/framework/project/__init__.py
+++ b/kedro/framework/project/__init__.py
@@ -110,10 +110,8 @@ class _ProjectSettings(LazySettings):
     _DATA_CATALOG_CLASS = _IsSubclassValidator(
         "DATA_CATALOG_CLASS", default=_get_default_class("kedro.io.DataCatalog")
     )
-    _DATA_CATALOG_ARGS = Validator("DATA_CATALOG_ARGS", default={})

     def __init__(self, *args, **kwargs):
-
         kwargs.update(
             validators=[
                 self._CONF_SOURCE,
@@ -125,7 +123,6 @@ def __init__(self, *args, **kwargs):
             self._CONFIG_LOADER_CLASS,
             self._CONFIG_LOADER_ARGS,
             self._DATA_CATALOG_CLASS,
-            self._DATA_CATALOG_ARGS,
         ]
     )
     super().__init__(*args, **kwargs)
@@ -135,6 +132,7 @@ def _load_data_wrapper(func):
     """Wrap a method in _ProjectPipelines so that data is loaded on first access.
     Taking inspiration from dynaconf.utils.functional.new_method_proxy
     """
+
     # pylint: disable=protected-access
     def inner(self, *args, **kwargs):
         self._load_data()

diff --git a/kedro/io/data_catalog.py b/kedro/io/data_catalog.py
index 888f42c3c8..f0788e5cd5 100644
--- a/kedro/io/data_catalog.py
+++ b/kedro/io/data_catalog.py
@@ -11,10 +11,8 @@
 from collections import defaultdict
 from typing import Any, Dict, List, Optional, Set, Type, Union

-from kedro_datasets.pandas import CSVDataSet
 from parse import parse

-from kedro.framework.project import settings
 from kedro.io.core import (
     AbstractDataSet,
     AbstractVersionedDataSet,
@@ -276,8 +274,8 @@ class to be loaded is specified with the key ``type`` and their
         layers: Dict[str, Set[str]] = defaultdict(set)

         for ds_name, ds_config in catalog.items():
-            # Let's assume that any name with {} in it is a dataset pattern to be matched.
-            if "{" and "}" in ds_name:
+            # Let's assume that any name with } in it is a dataset pattern to be matched.
+            if "}" in ds_name:
                 dataset_patterns[ds_name] = ds_config
             else:
                 ds_layer = ds_config.pop("layer", None)
@@ -289,7 +287,11 @@ class to be loaded is specified with the key ``type`` and their
                     ds_name, ds_config, load_versions.get(ds_name), save_version
                 )
         dataset_layers = layers or None
-        return cls(data_sets=data_sets, layers=dataset_layers, dataset_patterns=dataset_patterns)
+        return cls(
+            data_sets=data_sets,
+            layers=dataset_layers,
+            dataset_patterns=dataset_patterns,
+        )

     def _get_dataset(
         self, data_set_name: str, version: Version = None, suggest: bool = True
@@ -310,7 +312,9 @@ def _get_dataset(
                 )
                 if matches:
                     suggestions = ", ".join(matches)
-                    error_msg += f" - did you mean one of these instead: {suggestions}"
+                    error_msg += (
+                        f" - did you mean one of these instead: {suggestions}"
+                    )

             raise DataSetNotFoundError(error_msg)

@@ -543,7 +547,7 @@ def match_name_against_dataset_factories(self, dataset_input_name: str):
             config_copy = copy.deepcopy(dataset_config)
             # Match results to patterns in catalog entry
             for key, value in config_copy.items():
-                if '}' in value:
+                if "}" in value:
                     string_value = str(value)
                     config_copy[key] = string_value.format_map(result.named)
             # Create dataset from catalog config.
@@ -599,7 +603,7 @@ def remove_pattern_matches(self, dataset_list: Set[str]):
         dataset_list_minus_matched = []
         for dataset in dataset_list:
             # If dataset matches a pattern, remove it from the list.
-            for dataset_name, dataset_config in self.dataset_patterns.items():
+            for dataset_name in self.dataset_patterns.keys():
                 result = parse(dataset_name, dataset)
                 if not result:
                     dataset_list_minus_matched.append(dataset)
@@ -611,7 +615,11 @@ def shallow_copy(self) -> "DataCatalog":
         Returns:
             Copy of the current object.
         """
-        return DataCatalog(data_sets=self._data_sets, layers=self.layers, dataset_patterns=self.dataset_patterns)
+        return DataCatalog(
+            data_sets=self._data_sets,
+            layers=self.layers,
+            dataset_patterns=self.dataset_patterns,
+        )

     def __eq__(self, other):
         return (self._data_sets, self.layers) == (other._data_sets, other.layers)

diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py
index 9b3d21bbcf..4351d43017 100644
--- a/kedro/runner/runner.py
+++ b/kedro/runner/runner.py
@@ -72,15 +72,21 @@ def run(
         hook_manager = hook_manager or _NullPluginManager()
         catalog = catalog.shallow_copy()
-
-        unsatisfied = catalog.remove_pattern_matches(pipeline.inputs() - set(catalog.list()))
+
+        unsatisfied = catalog.remove_pattern_matches(
+            pipeline.inputs() - set(catalog.list())
+        )
         if unsatisfied:
             raise ValueError(
                 f"Pipeline input(s) {unsatisfied} not found in the DataCatalog"
             )

-        free_outputs = catalog.remove_pattern_matches(pipeline.outputs() - set(catalog.list()))
-        unregistered_ds = catalog.remove_pattern_matches(pipeline.data_sets() - set(catalog.list()))
+        free_outputs = catalog.remove_pattern_matches(
+            pipeline.outputs() - set(catalog.list())
+        )
+        unregistered_ds = catalog.remove_pattern_matches(
+            pipeline.data_sets() - set(catalog.list())
+        )
         for ds_name in unregistered_ds:
             catalog.add(ds_name, self.create_default_data_set(ds_name))
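One cleanup above warrants a note: the old check ``"{" and "}" in ds_name`` never tested for ``"{"`` at all, because it parses as ``"{" and ("}" in ds_name)`` and a non-empty string literal is always truthy:

    name = "namespace.companies}"
    print("{" and "}" in name)  # True, even though the name contains no "{"
    # The cleanup therefore keeps only the part of the check that ever ran:
    print("}" in name)  # True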
From f563d4490295108b379070570088d6b2c25d959f Mon Sep 17 00:00:00 2001
From: Merel Theisen
Date: Thu, 4 May 2023 16:43:16 +0100
Subject: [PATCH 05/14] Add comments to explain code

Signed-off-by: Merel Theisen
---
 kedro/framework/cli/catalog.py | 28 +++++++++++-----------------
 kedro/io/data_catalog.py       | 25 +++++++++++++++++++++++--
 kedro/runner/runner.py         |  7 ++++++-
 3 files changed, 40 insertions(+), 20 deletions(-)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index a8773d0c39..946eee1a88 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -184,31 +184,21 @@ def _add_missing_datasets_to_catalog(missing_ds, catalog_path):
 def show_catalog_datasets(metadata: ProjectMetadata, env):
     session = _create_session(metadata.package_name, env=env)
     context = session.load_context()
-    catalog = context.config_loader["catalog"]
-    secho(yaml.dump(catalog))
+    catalog_conf = context.config_loader["catalog"]
+    secho(yaml.dump(catalog_conf))

 @catalog.command("resolve")
 @env_option
-@click.option(
-    "--pipeline",
-    "-p",
-    type=str,
-    default="",
-    help="Name of the modular pipeline to run. If not set, "
-    "the project pipeline is run by default.",
-    callback=split_string,
-)
 @click.pass_obj
-def resolve_catalog_datasets(metadata: ProjectMetadata, pipeline, env):
+def resolve_catalog_datasets(metadata: ProjectMetadata, env):
     session = _create_session(metadata.package_name, env=env)
     context = session.load_context()
-    catalog = context.config_loader["catalog"]
-
-    target_pipelines = pipeline or pipelines.keys()
+    catalog_conf = context.config_loader["catalog"]

+    # Create a list of all datasets used in the project pipelines.
     pipeline_datasets = []
-    for pipe in target_pipelines:
+    for pipe in pipelines.keys():
         pl_obj = pipelines.get(pipe)
         if pl_obj:
             pipeline_ds = pl_obj.data_sets()
             for ds in pipeline_ds:
                 pipeline_datasets.append(ds)
         else:
             existing_pls = ", ".join(sorted(pipelines.keys()))
             raise KedroCliError(
                 f"'{pipe}' pipeline not found! Existing pipelines: {existing_pls}"
             )

-    catalog_copy = copy.deepcopy(catalog)
+    # Create a copy of the catalog config to not modify the original.
+    catalog_copy = copy.deepcopy(catalog_conf)
+    # Loop over all entries in the catalog, find the ones that contain a pattern to be matched,
+    # loop over all datasets in the pipeline and match these against the patterns.
+    # Then expand the matches and add them to the catalog copy to display on the CLI.
     for ds_name, ds_config in catalog.items():

diff --git a/kedro/io/data_catalog.py b/kedro/io/data_catalog.py
index f0788e5cd5..718dc6562f 100644
--- a/kedro/io/data_catalog.py
+++ b/kedro/io/data_catalog.py
@@ -171,6 +171,8 @@ def __init__(
         self._data_sets = dict(data_sets or {})
         self.datasets = _FrozenDatasets(self._data_sets)
         self.layers = layers
+        # Keep a record of all patterns in the catalog.
+        # {dataset pattern name : dataset pattern body}
         self.dataset_patterns = dict(dataset_patterns or {})

         # import the feed dict
         if feed_dict:
@@ -276,6 +278,7 @@ class to be loaded is specified with the key ``type`` and their
         for ds_name, ds_config in catalog.items():
             # Let's assume that any name with } in it is a dataset pattern to be matched.
             if "}" in ds_name:
+                # Add each pattern to the dataset_patterns dict.
                 dataset_patterns[ds_name] = ds_config
             else:
                 ds_layer = ds_config.pop("layer", None)
@@ -297,7 +300,10 @@ def _get_dataset(
         self, data_set_name: str, version: Version = None, suggest: bool = True
     ) -> AbstractDataSet:
         if data_set_name not in self._data_sets:
-            # Try to match against pattern
+            # When a dataset is "used" in the pipeline that's not in the recorded catalog datasets,
+            # try to match it against the patterns in the catalog. If it's a match, resolve it to
+            # a dataset instance and add it to the catalog, so it only needs to be matched once
+            # and not every time the dataset is used in the pipeline.
             matched_dataset = self.match_name_against_dataset_factories(data_set_name)
             if matched_dataset:
                 self.add(data_set_name, matched_dataset)
@@ -539,16 +545,29 @@ def add_feed_dict(self, feed_dict: Dict[str, Any], replace: bool = False) -> Non

         self.add(data_set_name, data_set, replace)

-    def match_name_against_dataset_factories(self, dataset_input_name: str):
+    def match_name_against_dataset_factories(self, dataset_input_name: str) -> Optional[AbstractDataSet]:
+        """
+        For a given dataset name, try to match it against the dataset patterns in the catalog.
+        If it's a match, return the dataset instance.
+        """
         dataset = None
+        # Loop through all dataset patterns and check if the given dataset name has a match.
         for dataset_name, dataset_config in self.dataset_patterns.items():
             result = parse(dataset_name, dataset_input_name)
+            # If there's a match, resolve the rest of the pattern to create a dataset instance.
+            # A result can be None or something like:
+            # <Result () {'root_namespace': 'germany', 'dataset_name': 'companies'}>
             if result:
                 config_copy = copy.deepcopy(dataset_config)
                 # Match results to patterns in catalog entry
                 for key, value in config_copy.items():
+                    # Find all dataset fields that need to be resolved with
+                    # the values that were matched.
                     if "}" in value:
                         string_value = str(value)
+                        # result.named: {'root_namespace': 'germany', 'dataset_name': 'companies'}
+                        # format_map fills in dict values into a string with {...} placeholders
+                        # of the same key name.
                         config_copy[key] = string_value.format_map(result.named)
                 # Create dataset from catalog config.
                 dataset = AbstractDataSet.from_config(dataset_name, config_copy)
@@ -600,6 +619,8 @@ def list(self, regex_search: Optional[str] = None) -> List[str]:
             ) from exc
         return [dset_name for dset_name in self._data_sets if pattern.search(dset_name)]

     def remove_pattern_matches(self, dataset_list: Set[str]):
+        """Helper method that checks which dataset names match a pattern in the catalog.
+        It returns a copy of the original list minus all those matched dataset names."""
         dataset_list_minus_matched = []
         for dataset in dataset_list:
             # If dataset matches a pattern, remove it from the list.

diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py
index 4351d43017..ee4e478727 100644
--- a/kedro/runner/runner.py
+++ b/kedro/runner/runner.py
@@ -73,6 +73,8 @@ def run(
         hook_manager = hook_manager or _NullPluginManager()
         catalog = catalog.shallow_copy()

+        # Check if there are any input datasets that aren't in the catalog and
+        # don't match a pattern in the catalog.
         unsatisfied = catalog.remove_pattern_matches(
             pipeline.inputs() - set(catalog.list())
         )
         if unsatisfied:
             raise ValueError(
                 f"Pipeline input(s) {unsatisfied} not found in the DataCatalog"
             )
-
+        # Check if there are any output datasets that aren't in the catalog and don't match a pattern
+        # in the catalog.
         free_outputs = catalog.remove_pattern_matches(
             pipeline.outputs() - set(catalog.list())
         )
+        # Check which datasets used in the pipeline aren't in the catalog and don't match
+        # a pattern in the catalog and create a default dataset for those datasets.
         unregistered_ds = catalog.remove_pattern_matches(
             pipeline.data_sets() - set(catalog.list())
         )
         for ds_name in unregistered_ds:
             catalog.add(ds_name, self.create_default_data_set(ds_name))
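The comments added above describe a match-once behaviour: the first time an unknown name is requested it is resolved against the patterns and registered, so later accesses skip the matching. A sketch under the assumption that the series so far is applied and kedro-datasets' ``pandas.CSVDataSet`` is installed (the catalog entry is hypothetical, and ``_get_dataset`` is the internal entry point the runner exercises):

    from kedro.io import DataCatalog

    catalog = DataCatalog.from_config(
        {
            "{root_namespace}.companies": {
                "type": "pandas.CSVDataSet",
                "filepath": "data/{root_namespace}/companies.csv",
            }
        }
    )
    # First access resolves the pattern and adds the concrete dataset...
    catalog._get_dataset("germany.companies")
    # ...so it is now registered and needs no further matching.
    print("germany.companies" in catalog.list())  # True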
Existing pipelines: {existing_pls}" + ) + + + catalog_copy = copy.deepcopy(catalog) + for ds_name, ds_config in catalog.items(): + if "{" and "}" in ds_name: + for pipeline_dataset in set(pipeline_datasets): + result = parse(ds_name, pipeline_dataset) + if result: + config_copy = copy.deepcopy(ds_config) + # Match results to patterns in catalog entry + for key, value in config_copy.items(): + if '}' in value: + string_value = str(value) + config_copy[key] = string_value.format_map(result.named) + catalog_copy[pipeline_dataset] = config_copy + + secho(yaml.dump(catalog_copy)) From 76b05d6afc6c65527d123cdaadc4b1399a7e1137 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Thu, 4 May 2023 14:32:27 +0100 Subject: [PATCH 04/14] Small cleanups Signed-off-by: Merel Theisen --- kedro/framework/cli/catalog.py | 13 +++++++------ kedro/framework/project/__init__.py | 4 +--- kedro/io/data_catalog.py | 26 +++++++++++++++++--------- kedro/runner/runner.py | 14 ++++++++++---- 4 files changed, 35 insertions(+), 22 deletions(-) diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py index a049140e1d..a8773d0c39 100644 --- a/kedro/framework/cli/catalog.py +++ b/kedro/framework/cli/catalog.py @@ -5,12 +5,13 @@ import click import yaml from click import secho +from parse import parse from kedro.framework.cli.utils import KedroCliError, env_option, split_string from kedro.framework.project import pipelines, settings from kedro.framework.session import KedroSession from kedro.framework.startup import ProjectMetadata -from parse import parse + def _create_session(package_name: str, **kwargs): kwargs.setdefault("save_on_close", False) @@ -180,12 +181,13 @@ def _add_missing_datasets_to_catalog(missing_ds, catalog_path): @catalog.command("show") @env_option @click.pass_obj -def list_datasets(metadata: ProjectMetadata, env): +def show_catalog_datasets(metadata: ProjectMetadata, env): session = _create_session(metadata.package_name, env=env) context = session.load_context() catalog = context.config_loader["catalog"] secho(yaml.dump(catalog)) + @catalog.command("resolve") @env_option @click.option( @@ -198,7 +200,7 @@ def list_datasets(metadata: ProjectMetadata, env): callback=split_string, ) @click.pass_obj -def list_datasets(metadata: ProjectMetadata, pipeline, env): +def resolve_catalog_datasets(metadata: ProjectMetadata, pipeline, env): session = _create_session(metadata.package_name, env=env) context = session.load_context() catalog = context.config_loader["catalog"] @@ -218,17 +220,16 @@ def list_datasets(metadata: ProjectMetadata, pipeline, env): f"'{pipe}' pipeline not found! 
Existing pipelines: {existing_pls}" ) - catalog_copy = copy.deepcopy(catalog) for ds_name, ds_config in catalog.items(): - if "{" and "}" in ds_name: + if "}" in ds_name: for pipeline_dataset in set(pipeline_datasets): result = parse(ds_name, pipeline_dataset) if result: config_copy = copy.deepcopy(ds_config) # Match results to patterns in catalog entry for key, value in config_copy.items(): - if '}' in value: + if "}" in value: string_value = str(value) config_copy[key] = string_value.format_map(result.named) catalog_copy[pipeline_dataset] = config_copy diff --git a/kedro/framework/project/__init__.py b/kedro/framework/project/__init__.py index d7dad68667..b380ecfbca 100644 --- a/kedro/framework/project/__init__.py +++ b/kedro/framework/project/__init__.py @@ -110,10 +110,8 @@ class _ProjectSettings(LazySettings): _DATA_CATALOG_CLASS = _IsSubclassValidator( "DATA_CATALOG_CLASS", default=_get_default_class("kedro.io.DataCatalog") ) - _DATA_CATALOG_ARGS = Validator("DATA_CATALOG_ARGS", default={}) def __init__(self, *args, **kwargs): - kwargs.update( validators=[ self._CONF_SOURCE, @@ -125,7 +123,6 @@ def __init__(self, *args, **kwargs): self._CONFIG_LOADER_CLASS, self._CONFIG_LOADER_ARGS, self._DATA_CATALOG_CLASS, - self._DATA_CATALOG_ARGS, ] ) super().__init__(*args, **kwargs) @@ -135,6 +132,7 @@ def _load_data_wrapper(func): """Wrap a method in _ProjectPipelines so that data is loaded on first access. Taking inspiration from dynaconf.utils.functional.new_method_proxy """ + # pylint: disable=protected-access def inner(self, *args, **kwargs): self._load_data() diff --git a/kedro/io/data_catalog.py b/kedro/io/data_catalog.py index 888f42c3c8..f0788e5cd5 100644 --- a/kedro/io/data_catalog.py +++ b/kedro/io/data_catalog.py @@ -11,10 +11,8 @@ from collections import defaultdict from typing import Any, Dict, List, Optional, Set, Type, Union -from kedro_datasets.pandas import CSVDataSet from parse import parse -from kedro.framework.project import settings from kedro.io.core import ( AbstractDataSet, AbstractVersionedDataSet, @@ -276,8 +274,8 @@ class to be loaded is specified with the key ``type`` and their layers: Dict[str, Set[str]] = defaultdict(set) for ds_name, ds_config in catalog.items(): - # Let's assume that any name with {} in it is a dataset pattern to be matched. - if "{" and "}" in ds_name: + # Let's assume that any name with } in it is a dataset pattern to be matched. 
+ if "}" in ds_name: dataset_patterns[ds_name] = ds_config else: ds_layer = ds_config.pop("layer", None) @@ -289,7 +287,11 @@ class to be loaded is specified with the key ``type`` and their ds_name, ds_config, load_versions.get(ds_name), save_version ) dataset_layers = layers or None - return cls(data_sets=data_sets, layers=dataset_layers, dataset_patterns=dataset_patterns) + return cls( + data_sets=data_sets, + layers=dataset_layers, + dataset_patterns=dataset_patterns, + ) def _get_dataset( self, data_set_name: str, version: Version = None, suggest: bool = True @@ -310,7 +312,9 @@ def _get_dataset( ) if matches: suggestions = ", ".join(matches) - error_msg += f" - did you mean one of these instead: {suggestions}" + error_msg += ( + f" - did you mean one of these instead: {suggestions}" + ) raise DataSetNotFoundError(error_msg) @@ -543,7 +547,7 @@ def match_name_against_dataset_factories(self, dataset_input_name: str): config_copy = copy.deepcopy(dataset_config) # Match results to patterns in catalog entry for key, value in config_copy.items(): - if '}' in value: + if "}" in value: string_value = str(value) config_copy[key] = string_value.format_map(result.named) # Create dataset from catalog config. @@ -599,7 +603,7 @@ def remove_pattern_matches(self, dataset_list: Set[str]): dataset_list_minus_matched = [] for dataset in dataset_list: # If dataset matches a pattern, remove it from the list. - for dataset_name, dataset_config in self.dataset_patterns.items(): + for dataset_name in self.dataset_patterns.keys(): result = parse(dataset_name, dataset) if not result: dataset_list_minus_matched.append(dataset) @@ -611,7 +615,11 @@ def shallow_copy(self) -> "DataCatalog": Returns: Copy of the current object. """ - return DataCatalog(data_sets=self._data_sets, layers=self.layers, dataset_patterns=self.dataset_patterns) + return DataCatalog( + data_sets=self._data_sets, + layers=self.layers, + dataset_patterns=self.dataset_patterns, + ) def __eq__(self, other): return (self._data_sets, self.layers) == (other._data_sets, other.layers) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 9b3d21bbcf..4351d43017 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -72,15 +72,21 @@ def run( hook_manager = hook_manager or _NullPluginManager() catalog = catalog.shallow_copy() - - unsatisfied = catalog.remove_pattern_matches(pipeline.inputs() - set(catalog.list())) + + unsatisfied = catalog.remove_pattern_matches( + pipeline.inputs() - set(catalog.list()) + ) if unsatisfied: raise ValueError( f"Pipeline input(s) {unsatisfied} not found in the DataCatalog" ) - free_outputs = catalog.remove_pattern_matches(pipeline.outputs() - set(catalog.list())) - unregistered_ds = catalog.remove_pattern_matches(pipeline.data_sets() - set(catalog.list())) + free_outputs = catalog.remove_pattern_matches( + pipeline.outputs() - set(catalog.list()) + ) + unregistered_ds = catalog.remove_pattern_matches( + pipeline.data_sets() - set(catalog.list()) + ) for ds_name in unregistered_ds: catalog.add(ds_name, self.create_default_data_set(ds_name)) From f563d4490295108b379070570088d6b2c25d959f Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Thu, 4 May 2023 16:43:16 +0100 Subject: [PATCH 05/14] Add comments to explain code Signed-off-by: Merel Theisen --- kedro/framework/cli/catalog.py | 28 +++++++++++----------------- kedro/io/data_catalog.py | 25 +++++++++++++++++++++++-- kedro/runner/runner.py | 7 ++++++- 3 files changed, 40 insertions(+), 20 deletions(-) diff --git 
From 6403465925a799d4e9496f259e951dc034ca9fd3 Mon Sep 17 00:00:00 2001
From: Merel Theisen
Date: Wed, 10 May 2023 12:38:49 +0100
Subject: [PATCH 09/14] Fix bug with removing datasets that match when
 multiple patterns present

Signed-off-by: Merel Theisen
---
 kedro/io/data_catalog.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/kedro/io/data_catalog.py b/kedro/io/data_catalog.py
index b98e7ca410..fee6bac9e7 100644
--- a/kedro/io/data_catalog.py
+++ b/kedro/io/data_catalog.py
@@ -625,7 +625,9 @@ def remove_pattern_matches(self, dataset_list: Set[str]):
                 # If dataset matches a pattern, remove it from the list.
                 for dataset_name in self.dataset_patterns.keys():
                     result = parse(dataset_name, dataset)
-                    if not result:
+                    if result:
+                        break
+                else:
                         dataset_list_minus_matched.append(dataset)
             return set(dataset_list_minus_matched)
         return dataset_list
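The fix uses Python's ``for``/``else``: the ``else`` suite runs only when the loop finishes without ``break``. Previously a name was appended whenever any single pattern failed to match, so with several patterns a dataset that matched one of them was still kept. A sketch of the corrected control flow (pattern and dataset names illustrative):

    from parse import parse

    patterns = ["{root_namespace}.companies", "{root_namespace}.reviews"]
    dataset = "germany.shuttles"

    for pattern in patterns:
        if parse(pattern, dataset):
            break  # matched: drop the name from the unmatched list
    else:
        # no pattern matched: the name really is unmatched
        print(f"{dataset} matched no pattern")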
From 50b540e4789ff173bd26f8a0a324ae8f63b8de60 Mon Sep 17 00:00:00 2001
From: Merel Theisen
Date: Tue, 23 May 2023 17:14:42 +0100
Subject: [PATCH 10/14] Fix import

Signed-off-by: Merel Theisen
---
 kedro/io/data_catalog.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/kedro/io/data_catalog.py b/kedro/io/data_catalog.py
index fee6bac9e7..09cd075c2e 100644
--- a/kedro/io/data_catalog.py
+++ b/kedro/io/data_catalog.py
@@ -8,8 +8,8 @@
 import difflib
 import logging
 import re
-from collections import defaultdict, Iterable
-from typing import Any, Dict, List, Optional, Set, Type, Union
+from collections import defaultdict
+from typing import Any, Dict, List, Optional, Set, Type, Union, Iterable

 from parse import parse
From 45229d6abffd27e913eaef65fe8976612fbb445f Mon Sep 17 00:00:00 2001
From: Merel Theisen
Date: Tue, 23 May 2023 17:15:54 +0100
Subject: [PATCH 11/14] Fix import

Signed-off-by: Merel Theisen
---
 kedro/framework/cli/catalog.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index 1daae01da9..3c006c2057 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -1,6 +1,7 @@
 """A collection of CLI commands for working with Kedro catalog."""
 import copy
-from collections import defaultdict, Iterable
+from collections import defaultdict
+from typing import Iterable

 import click
 import yaml
From d3176a0bbe574ea921881a5e1104a35e889de856 Mon Sep 17 00:00:00 2001
From: Merel Theisen
Date: Tue, 23 May 2023 17:21:35 +0100
Subject: [PATCH 12/14] Remove patterns from resolved catalog

Signed-off-by: Merel Theisen
---
 kedro/framework/cli/catalog.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py
index 3c006c2057..871384af4f 100644
--- a/kedro/framework/cli/catalog.py
+++ b/kedro/framework/cli/catalog.py
@@ -229,4 +229,9 @@ def resolve_catalog_datasets(metadata: ProjectMetadata, env):
                             config_copy[key] = string_value.format_map(result.named)
                     catalog_copy[pipeline_dataset] = config_copy

+    # Remove all patterns from the resolved catalog
+    for ds_name, ds_config in catalog_copy.items():
+        if "}" in ds_name:
+            del catalog_copy[ds_name]
+
     secho(yaml.dump(catalog_copy))
- if "}" in value: + if isinstance(value, Iterable) and "}" in value: string_value = str(value) # result.named: {'root_namespace': 'germany', 'dataset_name': 'companies'} # format_map fills in dict values into a string with {...} placeholders From 6403465925a799d4e9496f259e951dc034ca9fd3 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Wed, 10 May 2023 12:38:49 +0100 Subject: [PATCH 09/14] Fix bug with removing datasets that match when multiple patterns present Signed-off-by: Merel Theisen --- kedro/io/data_catalog.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kedro/io/data_catalog.py b/kedro/io/data_catalog.py index b98e7ca410..fee6bac9e7 100644 --- a/kedro/io/data_catalog.py +++ b/kedro/io/data_catalog.py @@ -625,7 +625,9 @@ def remove_pattern_matches(self, dataset_list: Set[str]): # If dataset matches a pattern, remove it from the list. for dataset_name in self.dataset_patterns.keys(): result = parse(dataset_name, dataset) - if not result: + if result: + break + else: dataset_list_minus_matched.append(dataset) return set(dataset_list_minus_matched) return dataset_list From 50b540e4789ff173bd26f8a0a324ae8f63b8de60 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 23 May 2023 17:14:42 +0100 Subject: [PATCH 10/14] Fix import Signed-off-by: Merel Theisen --- kedro/io/data_catalog.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kedro/io/data_catalog.py b/kedro/io/data_catalog.py index fee6bac9e7..09cd075c2e 100644 --- a/kedro/io/data_catalog.py +++ b/kedro/io/data_catalog.py @@ -8,8 +8,8 @@ import difflib import logging import re -from collections import defaultdict, Iterable -from typing import Any, Dict, List, Optional, Set, Type, Union +from collections import defaultdict +from typing import Any, Dict, List, Optional, Set, Type, Union, Iterable from parse import parse From 45229d6abffd27e913eaef65fe8976612fbb445f Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 23 May 2023 17:15:54 +0100 Subject: [PATCH 11/14] Fix import Signed-off-by: Merel Theisen --- kedro/framework/cli/catalog.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py index 1daae01da9..3c006c2057 100644 --- a/kedro/framework/cli/catalog.py +++ b/kedro/framework/cli/catalog.py @@ -1,6 +1,7 @@ """A collection of CLI commands for working with Kedro catalog.""" import copy -from collections import defaultdict, Iterable +from collections import defaultdict +from typing import Iterable import click import yaml From d3176a0bbe574ea921881a5e1104a35e889de856 Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 23 May 2023 17:21:35 +0100 Subject: [PATCH 12/14] Remove patterns from resolved catalog Signed-off-by: Merel Theisen --- kedro/framework/cli/catalog.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py index 3c006c2057..871384af4f 100644 --- a/kedro/framework/cli/catalog.py +++ b/kedro/framework/cli/catalog.py @@ -229,4 +229,9 @@ def resolve_catalog_datasets(metadata: ProjectMetadata, env): config_copy[key] = string_value.format_map(result.named) catalog_copy[pipeline_dataset] = config_copy + # Remove all patterns from the resolved catalog + for ds_name, ds_config in catalog_copy.items(): + if "}" in ds_name: + del catalog_copy[ds_name] + secho(yaml.dump(catalog_copy)) From 9a418de30d3729e660a1f2b8b409e7c994ca965b Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Tue, 23 May 2023 17:40:46 +0100 Subject: [PATCH 
From 61f4b9e6a99c45e1d64ea9ba3ab78fb4dfe1d6bc Mon Sep 17 00:00:00 2001
From: Merel Theisen
Date: Tue, 23 May 2023 18:11:52 +0100
Subject: [PATCH 14/14] Fix check for existing datasets in catalog/pattern
 match

Signed-off-by: Merel Theisen
---
 kedro/io/data_catalog.py | 13 +++++++++++++
 kedro/runner/runner.py   | 24 +++++++++++++++---------
 2 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/kedro/io/data_catalog.py b/kedro/io/data_catalog.py
index 09cd075c2e..d978c066dd 100644
--- a/kedro/io/data_catalog.py
+++ b/kedro/io/data_catalog.py
@@ -299,6 +299,7 @@ class to be loaded is specified with the key ``type`` and their
     def _get_dataset(
         self, data_set_name: str, version: Version = None, suggest: bool = True
     ) -> AbstractDataSet:
+        logging.warning(f"Getting data for {data_set_name}")
         if data_set_name not in self._data_sets:
             # When a dataset is "used" in the pipeline that's not in the recorded catalog datasets,
             # try to match it against the patterns in the catalog. If it's a match, resolve it to
@@ -548,6 +549,7 @@ def match_name_against_dataset_factories(self, dataset_input_name: str) -> Optio
         For a given dataset name, try to match it against the dataset patterns in the catalog.
         If it's a match, return the dataset instance.
         """
+        logging.warning(f"Matching dataset {dataset_input_name}")
         dataset = None
         # Loop through all dataset patterns and check if the given dataset name has a match.
         for dataset_name, dataset_config in self.dataset_patterns.items():
@@ -616,6 +618,17 @@ def list(self, regex_search: Optional[str] = None) -> List[str]:
             ) from exc
         return [dset_name for dset_name in self._data_sets if pattern.search(dset_name)]

+    def exists_in_catalog(self, dataset_name: str) -> bool:
+        """Check if a dataset exists in the catalog as an exact match or if it matches a pattern."""
+        if dataset_name in self._data_sets:
+            return True
+
+        if self.dataset_patterns and any(
+            parse(pattern, dataset_name) for pattern in self.dataset_patterns
+        ):
+            return True
+        return False
+
     def remove_pattern_matches(self, dataset_list: Set[str]):
         """Helper method that checks which dataset names match a pattern in the catalog.
         It returns a copy of the original list minus all those matched dataset names."""
diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py
index ee4e478727..0899e2df11 100644
--- a/kedro/runner/runner.py
+++ b/kedro/runner/runner.py
@@ -75,23 +75,29 @@ def run(

         # Check if there are any input datasets that aren't in the catalog and
         # don't match a pattern in the catalog.
-        unsatisfied = catalog.remove_pattern_matches(
-            pipeline.inputs() - set(catalog.list())
-        )
+        unsatisfied = [
+            input_name
+            for input_name in pipeline.inputs()
+            if not catalog.exists_in_catalog(input_name)
+        ]
         if unsatisfied:
             raise ValueError(
                 f"Pipeline input(s) {unsatisfied} not found in the DataCatalog"
             )
         # Check if there are any output datasets that aren't in the catalog and don't match a pattern
         # in the catalog.
-        free_outputs = catalog.remove_pattern_matches(
-            pipeline.outputs() - set(catalog.list())
-        )
+        free_outputs = [
+            output_name
+            for output_name in pipeline.outputs()
+            if not catalog.exists_in_catalog(output_name)
+        ]
+
         # Check which datasets used in the pipeline aren't in the catalog and don't match
         # a pattern in the catalog and create a default dataset for those datasets.
-        unregistered_ds = catalog.remove_pattern_matches(
-            pipeline.data_sets() - set(catalog.list())
-        )
+        unregistered_ds = [
+            ds for ds in pipeline.data_sets() if not catalog.exists_in_catalog(ds)
+        ]
+        logging.warning(f"UNREGISTERED DS: {unregistered_ds}")
         for ds_name in unregistered_ds:
             catalog.add(ds_name, self.create_default_data_set(ds_name))
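Membership is now decided by ``exists_in_catalog``, which short-circuits over the patterns with ``any``. A sketch of that logic as a standalone function (the pattern comes from the patch 01 docstring; dataset names are illustrative):

    from parse import parse

    dataset_patterns = {"{root_namespace}.{dataset_name}@spark": {"type": "spark.SparkDataSet"}}

    def exists(name, registered=()):
        # Known if registered explicitly, or if any pattern matches the name;
        # any() with a generator stops at the first successful parse().
        if name in registered:
            return True
        return bool(dataset_patterns) and any(
            parse(pattern, name) for pattern in dataset_patterns
        )

    print(exists("germany.companies@spark"))  # True, via the pattern
    print(exists("shuttles"))                 # False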