[PROTOTYPE NOT TO BE MERGED] Dataset factories prototype #2560
Conversation
```python
pipeline_datasets = []
for pipe in pipelines.keys():
    pl_obj = pipelines.get(pipe)
    if pl_obj:
        pipeline_ds = pl_obj.data_sets()
        for ds in pipeline_ds:
            pipeline_datasets.append(ds)
    else:
        existing_pls = ", ".join(sorted(pipelines.keys()))
        raise KedroCliError(
            f"'{pipe}' pipeline not found! Existing pipelines: {existing_pls}"
        )
```
Arguably shorter and more readable than the original snippet:
```python
ds_groups = [pipe.data_sets() for pipe in pipelines.values() if pipe]
pipeline_datasets = [ds for group in ds_groups for ds in group]
```
Not sure why we need the error?
```python
for ds_name, ds_config in catalog_conf.items():
    if "}" in ds_name:
        for pipeline_dataset in set(pipeline_datasets):
            result = parse(ds_name, pipeline_dataset)
            if result:
                config_copy = copy.deepcopy(ds_config)
                # Match results to patterns in catalog entry
                for key, value in config_copy.items():
                    if isinstance(value, Iterable) and "}" in value:
                        string_value = str(value)
                        config_copy[key] = string_value.format_map(result.named)
                catalog_copy[pipeline_dataset] = config_copy
```
I find this extremely hard to read without the comments. I think it would be much better if the code actually did what the comment says, step by step, rather than nesting several levels deep.
Moreover, this complexity seems to have hidden a logical incorrectness in the code: datasets that appear exactly as they are in the catalog shouldn't be matched against the patterns (which the original code doesn't do).
```python
skip = [ds_name for ds_name in catalog_conf.keys() if "}" not in ds_name]
datasets = set(pipeline_datasets) - set(skip)
patterns = [ds_name for ds_name in catalog_conf.keys() if "}" in ds_name]
matches = [(ds, pattern, parse(pattern, ds)) for ds in datasets for pattern in patterns]
matches = [(ds, pattern, result) for ds, pattern, result in matches if result]
for ds, pattern, result in matches:
    cfg = copy.deepcopy(catalog_conf[pattern])
    # Match results to patterns in catalog entry
    for key, value in cfg.items():
        if isinstance(value, Iterable) and "}" in value:
            string_value = str(value)
            cfg[key] = string_value.format_map(result.named)
    catalog_copy[ds] = cfg
```
The code for populating the template should probably be a separate function for readability and also to ensure that we can call it recursively (as each value can be a dictionary with other values, etc).
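A rough sketch of what such a recursive helper could look like; `_resolve_config` is a made-up name, not the PR's implementation:

```python
from parse import parse


def _resolve_config(config, named_values):
    """Recursively fill ``{...}`` placeholders in a config template with
    the values captured by ``parse()``."""
    if isinstance(config, dict):
        return {key: _resolve_config(value, named_values) for key, value in config.items()}
    if isinstance(config, list):
        return [_resolve_config(item, named_values) for item in config]
    if isinstance(config, str) and "}" in config:
        return config.format_map(named_values)
    return config


result = parse("{namespace}.{dataset_name}", "germany.companies")
template = {
    "type": "pandas.CSVDataSet",
    "filepath": "data/01_raw/{namespace}/{dataset_name}.csv",
    "load_args": {"sep": ","},  # nested dictionaries are resolved recursively
}
print(_resolve_config(template, result.named))
# {'type': 'pandas.CSVDataSet', 'filepath': 'data/01_raw/germany/companies.csv', 'load_args': {'sep': ','}}
```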
The original code does all the matching and doesn't stop at the first match; maybe that's intended so we can see all possible matches?
Oh yeah, you can completely ignore this code. As I wrote in the description, this is just a bonus for the prototype to show what's possible, but in no way meant to be merged. I just went for a quick implementation, because this needs proper parsing logic and will change anyway.
```python
        self.add(data_set_name, data_set, replace)

    def match_name_against_dataset_factories(self, dataset_input_name: str) -> Optional[AbstractDataSet]:
```
Crazily long name 😅
""" | ||
dataset = None | ||
# Loop through all dataset patterns and check if the given dataset name has a match. | ||
for dataset_name, dataset_config in self.dataset_patterns.items(): |
The `dataset_name` here is a pattern and the `dataset_config` is a template/factory, right? We could use that terminology for ease and readability.
```python
            config_copy = copy.deepcopy(dataset_config)
            # Match results to patterns in catalog entry
            for key, value in config_copy.items():
                # Find all dataset fields that need to be resolved with
                # the values that were matched.
                if isinstance(value, Iterable) and "}" in value:
                    string_value = str(value)
                    # result.named: {'root_namespace': 'germany', 'dataset_name': 'companies'}
                    # format_map fills in dict values into a string with {...} placeholders
                    # of the same key name.
                    config_copy[key] = string_value.format_map(result.named)
```
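For illustration, the two building blocks used above behave like this (the pattern and dataset name are made up):

```python
from parse import parse

result = parse("{root_namespace}.{dataset_name}", "germany.companies")
print(result.named)  # {'root_namespace': 'germany', 'dataset_name': 'companies'}

# format_map substitutes the captured values into {...} placeholders of the same name.
print("data/01_raw/{root_namespace}/{dataset_name}.csv".format_map(result.named))
# data/01_raw/germany/companies.csv
```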
As above, will make this a function for readability and call it recursively down the line, returning the materialised config dictionary.
```python
                    config_copy[key] = string_value.format_map(result.named)
            # Create dataset from catalog config.
            dataset = AbstractDataSet.from_config(dataset_name, config_copy)
        return dataset
```
Probably worth caching the match here before returning and adding it to `self._data_sets`.
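A rough sketch of that caching, assuming the existing `DataCatalog.add()` API:

```python
# Sketch: register the materialised dataset on the catalog so the pattern
# only has to be resolved once per dataset name.
dataset = AbstractDataSet.from_config(dataset_name, config_copy)
self.add(dataset_input_name, dataset)  # caches it in self._data_sets
return dataset
```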
```python
if self.dataset_patterns:
    dataset_list_minus_matched = []
    for dataset in dataset_list:
        # If dataset matches a pattern, remove it from the list.
        for dataset_name in self.dataset_patterns.keys():
            result = parse(dataset_name, dataset)
            if result:
                break
        else:
            dataset_list_minus_matched.append(dataset)
    return set(dataset_list_minus_matched)
return dataset_list
```
This is much nicer:
```python
if not self.dataset_patterns:
    return dataset_list
dataset_list_minus_matched = set(dataset_list)
for dataset in dataset_list:
    matches = (parse(pattern, dataset) for pattern in self.dataset_patterns.keys())
    # If dataset matches any pattern, remove it from the list.
    if any(matches):
        dataset_list_minus_matched -= {dataset}
return dataset_list_minus_matched
```
As a general rule:
- The shorter case in an `if` statement should always go first
- If there's a `return`, there's no need for an `else` (like in the original code)
- There's usually a nicer solution than using a `break` in a loop (not always though)
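A made-up example of the last point, contrasting `for`/`break`/`else` with `any()`:

```python
from parse import parse

patterns = ["{namespace}.companies", "{namespace}.reviews"]

def matches_any_loop(name):
    # for/break/else: the else clause only runs if the loop never breaks.
    for pattern in patterns:
        if parse(pattern, name):
            break
    else:
        return False
    return True

def matches_any(name):
    # Same behaviour, no break needed.
    return any(parse(pattern, name) for pattern in patterns)

print(matches_any_loop("germany.companies"), matches_any("germany.companies"))  # True True
print(matches_any_loop("boats"), matches_any("boats"))  # False False
```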
```python
    >>> catalog.save("boats", df)
    """
    data_sets = {}
    dataset_patterns = {}
```
What is the difference between these two?
Closing this prototype PR, because the proper implementation is now open: #2635 |
Description
#2510
Prototype for the dataset factories proposal in #2423
This doesn't include any sophisticated parsing rules, but just uses the basic `parse()` method from https://github.com/r1chardj0n3s/parse#readme.
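For context, the basic behaviour of that method, with made-up names:

```python
from parse import parse

# A catalog entry containing "}" acts as a pattern...
pattern = "{namespace}.{dataset_name}"

# ...and pipeline dataset names are matched against it.
print(parse(pattern, "germany.companies").named)
# {'namespace': 'germany', 'dataset_name': 'companies'}

# Names that don't fit the pattern return None.
print(parse(pattern, "boats"))  # None
```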
Development notes

- When the `DataCatalog` class instance gets instantiated from the config in https://github.com/kedro-org/kedro/blob/807c4e64fb/kedro/framework/context/context.py#L287, it now checks for entries that are patterns (they contain "}") and skips those from dataset creation, instead adding them to a new "dataset_patterns" dictionary that's kept on the catalog.
- `remove_pattern_matches()` is a helper method to exclude any datasets that match a pattern from the runner checks for missing inputs, free outputs and unregistered datasets that need a default dataset.
- `kedro catalog show` prints the raw catalog config files to the CLI.
- `kedro catalog resolve` resolves any patterns in the catalog config file against datasets used in the pipeline and prints this to the CLI.

Test project that I've used to test the prototype: https://github.com/merelcht/dataset-factories
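To make the idea concrete, an illustrative catalog entry (not taken from this PR) and its resolution could look like this:

```yaml
# catalog.yml: one pattern entry serves as a factory for many datasets.
"{namespace}.companies":
  type: pandas.CSVDataSet
  filepath: data/01_raw/{namespace}/companies.csv

# For a pipeline dataset named `germany.companies`, `kedro catalog resolve`
# would print the materialised entry:
# germany.companies:
#   type: pandas.CSVDataSet
#   filepath: data/01_raw/germany/companies.csv
```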