From d4c716e56e607c1017f094e7873846e47647eea8 Mon Sep 17 00:00:00 2001 From: sherimickelson Date: Thu, 16 Jul 2020 11:48:11 -0600 Subject: [PATCH 1/8] First checkin for the generic yaml parser. --- .pre-commit-config.yaml | 1 - ecgtools/core.py | 40 +++- ecgtools/parsers.py | 257 ++++++++++++++++++++++ notebooks/Generic_Interface_Example.ipynb | 81 +++++++ requirements.txt | 1 + sample_yaml/cmip6.yaml | 13 ++ sample_yaml/ensemble.yaml | 17 ++ schema/generic_schema.yaml | 9 + setup.cfg | 2 +- setup.py | 1 + tests/test_core.py | 228 ++++++++++++++++++- 11 files changed, 637 insertions(+), 13 deletions(-) create mode 100644 notebooks/Generic_Interface_Example.ipynb create mode 100644 sample_yaml/cmip6.yaml create mode 100644 sample_yaml/ensemble.yaml create mode 100644 schema/generic_schema.yaml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f85d795..f644139 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -6,7 +6,6 @@ repos: - id: trailing-whitespace - id: end-of-file-fixer - id: check-docstring-first - - id: check-yaml - id: double-quote-string-fixer - repo: https://github.com/ambv/black diff --git a/ecgtools/core.py b/ecgtools/core.py index bdec487..4d49257 100644 --- a/ecgtools/core.py +++ b/ecgtools/core.py @@ -50,8 +50,9 @@ def __init__( FileNotFoundError When `root_path` does not exist. """ - self.root_path = Path(root_path) - if not self.root_path.is_dir(): + if root_path is not None: + self.root_path = Path(root_path) + if root_path is not None and not self.root_path.is_dir(): raise FileNotFoundError(f'{root_path} directory does not exist') if parser is not None and not callable(parser): raise TypeError('parser must be callable.') @@ -200,6 +201,7 @@ def build( cat_id: str = None, description: str = None, attributes: List[dict] = None, + local_attrs: dict = None, ) -> 'Builder': """ Harvest attributes for a list of files. This method produces a pandas dataframe @@ -259,8 +261,15 @@ def build( 'aggregations': aggregations, } - filelist = self.filelist or self._get_filelist_from_dirs() - df = parse_files_attributes(filelist, self.parser, self.lazy, self.nbatches) + if len(self.filelist) == 0: + filelist = self.filelist or self._get_filelist_from_dirs() + else: + filelist = self.filelist + + if local_attrs is None: + local_attrs = {} + + df = parse_files_attributes(filelist, local_attrs, self.parser, self.lazy, self.nbatches) if attributes is None: attributes = [] @@ -272,7 +281,7 @@ def build( self.df = df return self - def update(self, catalog_file: str, path_column: str) -> 'Builder': + def update(self, catalog_file: str, path_column: str, local_attrs: dict = None,) -> 'Builder': """ Update a previously built catalog. 
@@ -288,12 +297,16 @@ def update(self, catalog_file: str, path_column: str) -> 'Builder': self.old_df = pd.read_csv(catalog_file) filelist_from_prev_cat = self.old_df[path_column].tolist() filelist = self._get_filelist_from_dirs() + + if local_attrs is None: + local_attrs = {} + # Case 1: The new filelist has files that are not included in the # Previously built catalog files_to_parse = list(set(filelist) - set(filelist_from_prev_cat)) if files_to_parse: self.new_df = parse_files_attributes( - files_to_parse, self.parser, self.lazy, self.nbatches + files_to_parse, local_attrs, self.parser, self.lazy, self.nbatches ) else: self.new_df = pd.DataFrame() @@ -342,7 +355,11 @@ def save(self, catalog_file: str, **kwargs,) -> 'Builder': def parse_files_attributes( - filepaths: list, parser: callable = None, lazy: bool = True, nbatches: int = 25, + filepaths: list, + local_attrs: dict, + parser: callable = None, + lazy: bool = True, + nbatches: int = 25, ) -> pd.DataFrame: """ Harvest attributes for a list of files. @@ -376,7 +393,7 @@ def batch(seq): result_batch = dask.delayed(batch)(filepaths[i : i + nbatches]) results.append(result_batch) else: - results = [_parse_file_attributes(filepath, parser) for filepath in filepaths] + results = [_parse_file_attributes(filepath, local_attrs, parser) for filepath in filepaths] if dask.is_dask_collection(results[0]): results = dask.compute(*results) @@ -384,7 +401,7 @@ def batch(seq): return pd.DataFrame(results) -def _parse_file_attributes(filepath: str, parser: callable = None): +def _parse_file_attributes(filepath: str, local_attrs: dict, parser: callable = None): """ Single file attributes harvesting @@ -404,7 +421,10 @@ def _parse_file_attributes(filepath: str, parser: callable = None): results = {'path': filepath} if parser is not None: - x = parser(filepath) + if not bool(local_attrs): + x = parser(filepath) + else: + x = parser(filepath, local_attrs) # Merge x and results dictionaries results = {**x, **results} return results diff --git a/ecgtools/parsers.py b/ecgtools/parsers.py index 715de9f..c39d712 100644 --- a/ecgtools/parsers.py +++ b/ecgtools/parsers.py @@ -1,7 +1,15 @@ +import glob +import os import re +from collections import defaultdict +from pathlib import Path +import netCDF4 as nc +import pandas as pd import xarray as xr +from ecgtools import Builder + def extract_attr_with_regex( input_str: str, regex: str, strip_chars: str = None, ignore_case: bool = True @@ -92,3 +100,252 @@ def cmip6_default_parser( data = {'exception': str(e), 'file': filepath} print(data) return {} + + +class YAML_Parser: + """ + Creates a parser that parses a yaml file in order to create a catalog file + """ + + def __init__( + self, yaml_path: str, csv_path: str = None, validater: str = 'yamale', + ) -> 'YAML_Parser': + """ + Get a list of files from a list of directories. + + Parameters + ---------- + yaml_path : str + Path to the yaml file to be parsed + csv_path : str, optional + Full path to the output csv file + validater : str, optional + Choice of yaml validater. 
Valid options: 'yamale' or 'internal'; Default: yamale + """ + + import yaml + + self.yaml_path = yaml_path + self.csv_path = csv_path + self.builder = Builder(None, parser=self._parser_netcdf, lazy=False) + self.validater = validater + + # Read in the yaml file and validate + with open(self.yaml_path, 'r') as f: + self.input_yaml = yaml.safe_load(f) + self.valid_yaml = self._validate_yaml() + + def _internal_validation(self): + """ + Validates the generic yaml input against the schema if the user does not have yamale in + their environment. + + Parameters + ---------- + None + + Returns + ------- + boolean + True - passes the validation, False - fails the validation + """ + + # verify that we're working with a dictionary + if not isinstance(self.input_yaml, dict): + print( + 'ERROR: The experiment/dataset top level is not a dictionary. Make sure you follow the correct format.' + ) + return False + # verify that the first line is 'catalog:' and it only appears once in the yaml file + if len(self.input_yaml.keys()) != 1 or 'catalog' not in self.input_yaml.keys(): + print( + "ERROR: The first line in the yaml file must be 'catalog:' and it should only appear once." + ) + return False + if not isinstance(self.input_yaml['catalog'], list): + print( + 'ERROR: The catalog entries are not in a list. make sure you follow the corrrect format.' + ) + return False + for dataset in self.input_yaml['catalog']: + # check to see if there is a data_sources key for each dataset + if 'data_sources' not in dataset.keys(): + print("ERROR: Each experiment/dataset must have the key 'data_sources'.") + return False + # verify that we're working with a list at this level + if not isinstance(dataset['data_sources'], list): + print( + 'ERROR: The data_source entries are not in a list. Make sure you follow the correct format.' + ) + return False + for stream_info in dataset['data_sources']: + # check to make sure that there's a 'glob_string' key for each data_source + if 'glob_string' not in stream_info.keys(): + print("ERROR: Each data_source must contain a 'glob_string' key.") + return False + # ensemble is an option, but we still need to verify that it meets the rules if it is added + if 'ensemble' in dataset.keys(): + # verify that we're working with a list at this level + if not isinstance(dataset['ensemble'], list): + print( + 'ERROR: The ensemble entries are not in a list. Make sure you follow the correct format.' + ) + return False + for stream_info in dataset['ensemble']: + # check to make sure that there's a 'glob_string' key for each ensemble entry + if 'glob_string' not in stream_info.keys(): + print("ERROR: Each ensemble must contain a 'glob_string' key.") + return False + return True + + def _validate_yaml(self): + """ + Validates the generic yaml input against the schema. It uses either yamale or the internal validater. + + Parameters + ---------- + None + + Returns + ------- + boolean + True - passes the validation, False - fails the validation + """ + + # verify the format is correct + if self.validater == 'yamale': + try: + import yamale + + print('Validating yaml file with yamale.') + cwd = Path(os.path.dirname(__file__)) + schema_path = str(cwd.parent / 'schema') + '/generic_schema.yaml' + schema = yamale.make_schema(schema_path) + data = yamale.make_data(self.yaml_path) + try: + yamale.validate(schema, data) + print('Validation success! 👍') + return True + except ValueError as e: + print( + 'Yamale found that your file, ' + + self.yaml_path + + ' is not formatted correctly.' 
+ ) + print(e) + return False + except ImportError: + print('Validating yaml file internally.') + return self._internal_validation() + else: + print('Validating yaml file internally.') + return self._internal_validation() + + def _parser_netcdf(self, filepath, local_attrs): + """ + Opens a netcdf file in order to gather time and requested attribute information. + Also attaches assigned attributes gathered from the yaml file. + + Parameters + ---------- + filepath : str + The full path to the netcdf file to attatch attributes to. + local_attrs : dict + Holds attributes that need to be attached to the filenpath. + + Returns + ------- + dict + Returns all of the attributes that need to be assigned to the netcdf. + """ + + fileparts = {} + fileparts['path'] = filepath + + try: + fileparts['variable'] = [] + # open file + d = nc.Dataset(filepath, 'r') + # find what the time (unlimited) dimension is + dims = list(dict(d.dimensions).keys()) + if 'time' in d.variables.keys(): + times = d['time'] + start = str(times[0]) + end = str(times[-1]) + fileparts['time_range'] = start + '-' + end + # loop through all variables + for v in d.variables.keys(): + # add all variables that are not coordinates to the catalog + if v not in dims: + fileparts['variable'].append(v) + + # add the keys that are common just to the particular glob string + # fileparts.update(local_attrs[filepath]) + for lv in local_attrs[filepath].keys(): + if '<<' in local_attrs[filepath][lv]: + for v in fileparts['variable']: + if lv not in fileparts.keys(): + fileparts[lv] = [] + if hasattr(d.variables[v], lv): + fileparts[lv].append(getattr(d.variables[v], lv)) + else: + fileparts[lv].append('None') + elif '<' in local_attrs[filepath][lv]: + k = local_attrs[filepath][lv].replace('<', '').replace('>', '') + if hasattr(d, k): + fileparts[lv] = getattr(d, k) + else: + fileparts[lv] = 'None' + else: + fileparts[lv] = local_attrs[filepath][lv] + # close netcdf file + d.close() + except Exception: + pass + return fileparts + + def parser(self) -> 'YAML_Parser': + """ + Method used to start the parsing process. + + Parameters + ---------- + None + + Returns + ------- + Builder + Returns a Builder object. + """ + + # loop over datasets + df_parts = [] + entries = defaultdict(dict) + # for dataset in input_yaml.keys(): + for dataset in self.input_yaml['catalog']: + # get a list of keys that are common to all files in the dataset + for g in dataset.keys(): + if 'data_sources' not in g and 'ensemble' not in g: + entries['global'] = dataset[g] + # loop over ensemble members, if they exist + if 'ensemble' in dataset.keys(): + for member in dataset['ensemble']: + glob_string = member.pop('glob_string') + self.builder.filelist = glob.glob(glob_string) + for f in self.builder.filelist: + entries[f].update(member) + # loop over all of the data_sources for the dataset, create a dataframe + # for each data_source, append that dataframe to a list that will contain + # the full dataframe (or catalog) based on everything in the yaml file. 
+ for stream_info in dataset['data_sources']: + self.builder.filelist = glob.glob(stream_info['glob_string']) + stream_info.pop('glob_string') + for f in self.builder.filelist: + entries[f].update(stream_info) + df_parts.append(self.builder.build('path', 'variable', local_attrs=entries).df) + # create the combined dataframe from all of the data_sources and datasets from + # the yaml file + df = pd.concat(df_parts, sort=False) + + self.builder.df = df.sort_values(by=['path']) + return self.builder diff --git a/notebooks/Generic_Interface_Example.ipynb b/notebooks/Generic_Interface_Example.ipynb new file mode 100644 index 0000000..94cbcef --- /dev/null +++ b/notebooks/Generic_Interface_Example.ipynb @@ -0,0 +1,81 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from IPython.core.interactiveshell import InteractiveShell\n", + "\n", + "InteractiveShell.ast_node_interactivity = \"all\"\n", + "from IPython.display import HTML\n", + "\n", + "import pprint\n", + "from ecgtools import parsers" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "yaml_path = '../sample_yaml/cmip6.yaml'\n", + "csv_path = '/path/to/put/output/file/cmip6_new.csv'\n", + "\n", + "Parser = parsers.YAML_Parser(yaml_path, csv_path=csv_path)\n", + "b = Parser.parser()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "b.df.columns" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "b.df.info()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "b.save(csv_path)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.4" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/requirements.txt b/requirements.txt index f223d98..79f142f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +netCDF4 xarray dask[delayed] dask[bag] diff --git a/sample_yaml/cmip6.yaml b/sample_yaml/cmip6.yaml new file mode 100644 index 0000000..978e05b --- /dev/null +++ b/sample_yaml/cmip6.yaml @@ -0,0 +1,13 @@ +catalog: +- experiment: CMIP6 + data_sources: + - glob_string: sample_data/cmip/CMIP6/*/*/*/*/*/*/*/*/*/*/*.nc + experiment_id: + frequency: + parent_experiment_id: + parent_variant_label: + table_id: + variant_label: + long_name: <> + units: <> + standard_name: <> diff --git a/sample_yaml/ensemble.yaml b/sample_yaml/ensemble.yaml new file mode 100644 index 0000000..b97ca33 --- /dev/null +++ b/sample_yaml/ensemble.yaml @@ -0,0 +1,17 @@ +catalog: +- experiment: b.e11.BRCP85C5CNBDRD + ctrl_experiment: piControl + ensemble: + - glob_string: sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.001*.nc + experiment_name: b.e11.BRCP85C5CNBDRD.f09_g16.001 + member_id: '001' + - glob_string: sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.002*.nc + experiment_name: b.e11.BRCP85C5CNBDRD.f09_g16.002 + member_id: '002' + data_sources: + - glob_string: sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.*pop.h.* + model_name: pop + time_freq: + long_name: <> + units: <> + cell_methods: 
<> diff --git a/schema/generic_schema.yaml b/schema/generic_schema.yaml new file mode 100644 index 0000000..0356883 --- /dev/null +++ b/schema/generic_schema.yaml @@ -0,0 +1,9 @@ +catalog: list(include('experiment')) +--- +experiment: + ensemble: list(include('ensembleL'), required=False) + data_sources: list(include('data_sourcesL')) +ensembleL: + glob_string: str() +data_sourcesL: + glob_string: str() diff --git a/setup.cfg b/setup.cfg index 22f8700..f8792eb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -7,7 +7,7 @@ select = B,C,E,F,W,T4,B9 [isort] known_first_party=ecgtools -known_third_party=dask,intake,pandas,pkg_resources,pytest,setuptools,xarray +known_third_party=dask,intake,netCDF4,pandas,pkg_resources,pytest,setuptools,xarray,yaml multi_line_output=3 include_trailing_comma=True force_grid_wrap=0 diff --git a/setup.py b/setup.py index 69c9c3a..d1d3f66 100644 --- a/setup.py +++ b/setup.py @@ -34,6 +34,7 @@ keywords='ecgtools', name='ecgtools', packages=find_packages(include=['ecgtools', 'ecgtools.*']), + data_files=[('schema', ['schema/generic_schema.yaml'])], url='https://github.com/NCAR/ecgtools', project_urls={ 'Documentation': 'https://github.com/NCAR/ecgtools', diff --git a/tests/test_core.py b/tests/test_core.py index bc00e27..a94e2bd 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -6,12 +6,14 @@ import intake import pandas as pd import pytest +import yaml from ecgtools import Builder -from ecgtools.parsers import cmip6_default_parser +from ecgtools.parsers import YAML_Parser, cmip6_default_parser here = Path(os.path.dirname(__file__)) cmip6_root_path = here.parent / 'sample_data' / 'cmip' / 'CMIP6' +yaml_root_path = here.parent / 'sample_yaml' cmip6_global_attrs = [ 'activity_id', @@ -122,3 +124,227 @@ def test_builder_update(root_path, parser, num_items, dummy_assets): builder = builder.update(catalog_file, path_column='path') assert builder.old_df.size == num_items + len(dummy_assets) assert (builder.df.size - builder.old_df.size) == builder.new_df.size - len(dummy_assets) + + +@pytest.mark.parametrize( + 'yaml_path, csv_path, validater, expected_df_shape', + [ + (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', (59, 12)), + (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', (59, 12)), + (str(yaml_root_path) + '/ensemble.yaml', None, 'yamale', (114, 10)), + (str(yaml_root_path) + '/ensemble.yaml', None, 'internal', (114, 10)), + ], +) +def test_yaml_parser(yaml_path, csv_path, validater, expected_df_shape): + p = YAML_Parser(yaml_path, csv_path=csv_path, validater=validater) + b = p.parser() + + assert p.valid_yaml + assert b.df.shape == expected_df_shape + assert isinstance(b.df, pd.DataFrame) + assert len(b.filelist) == len(b.df) + + +yinput1 = [] +yinput2 = {'foo': []} +yinput3 = { + 'catalog': [ + { + 'ctrl_experiment': 'piControl', + 'ensemble': [ + { + 'experiment_name': 'b.e11.BRCP85C5CNBDRD.f09_g16.001', + 'glob_string': 'sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.001*.nc', + 'member_id': '001', + }, + { + 'experiment_name': 'b.e11.BRCP85C5CNBDRD.f09_g16.002', + 'glob_string': 'sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.002*.nc', + 'member_id': '002', + }, + ], + 'experiment': 'b.e11.BRCP85C5CNBDRD', + } + ] +} +yinput4 = { + 'catalog': [ + {'ctrl_experiment': 'piControl', 'data_sources': {}, 'experiment': 'b.e11.BRCP85C5CNBDRD'} + ] +} +yinput5 = { + 'catalog': [ + { + 'ctrl_experiment': 'piControl', + 'data_sources': [ + { + 'cell_methods': '<>', + 'glob_foo_string': 'sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.*pop.h.*', + 
'long_name': '<>', + 'model_name': 'pop', + 'time_freq': '', + 'units': '<>', + } + ], + 'ensemble': [ + { + 'experiment_name': 'b.e11.BRCP85C5CNBDRD.f09_g16.001', + 'glob_string': 'sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.001*.nc', + 'member_id': '001', + }, + { + 'experiment_name': 'b.e11.BRCP85C5CNBDRD.f09_g16.002', + 'glob_string': 'sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.002*.nc', + 'member_id': '002', + }, + ], + 'experiment': 'b.e11.BRCP85C5CNBDRD', + } + ] +} +yinput6 = { + 'catalog': [ + { + 'ctrl_experiment': 'piControl', + 'data_sources': [ + { + 'cell_methods': '<>', + 'glob_string': 'sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.*pop.h.*', + 'long_name': '<>', + 'model_name': 'pop', + 'time_freq': '', + 'units': '<>', + } + ], + 'ensemble': {}, + 'experiment': 'b.e11.BRCP85C5CNBDRD', + } + ] +} +yinput7 = { + 'catalog': [ + { + 'ctrl_experiment': 'piControl', + 'data_sources': [ + { + 'cell_methods': '<>', + 'glob_string': 'sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.*pop.h.*', + 'long_name': '<>', + 'model_name': 'pop', + 'time_freq': '', + 'units': '<>', + } + ], + 'ensemble': [ + { + 'experiment_name': 'b.e11.BRCP85C5CNBDRD.f09_g16.001', + 'glob_foo_string': 'sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.001*.nc', + 'member_id': '001', + }, + { + 'experiment_name': 'b.e11.BRCP85C5CNBDRD.f09_g16.002', + 'glob_string': 'sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.002*.nc', + 'member_id': '002', + }, + ], + 'experiment': 'b.e11.BRCP85C5CNBDRD', + } + ] +} +yinput8 = { + 'catalog': [ + { + 'ctrl_experiment': 'piControl', + 'data_sources': [ + { + 'cell_methods': '<>', + 'glob_string': 'sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.*pop.h.*', + 'long_name': '<>', + 'model_name': 'pop', + 'time_freq': '', + 'units': '<>', + } + ], + 'ensemble': [ + { + 'experiment_name': 'b.e11.BRCP85C5CNBDRD.f09_g16.001', + 'glob_foo_string': 'sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.001*.nc', + 'member_id': '001', + }, + { + 'experiment_name': 'b.e11.BRCP85C5CNBDRD.f09_g16.002', + 'glob_string': 'sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.002*.nc', + 'member_id': '002', + }, + ], + 'experiment': 'b.e11.BRCP85C5CNBDRD', + } + ] +} +yinput9 = { + 'catalog': [ + { + 'ctrl_experiment': 'piControl', + 'data_sources': [ + { + 'cell_methods': '<>', + 'glob_string': 'sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.*pop.h.*', + 'long_name': '<>', + 'model_name': 'pop', + 'time_freq': '', + 'units': '<>', + } + ], + 'ensemble': [ + { + 'experiment_name': 'b.e11.BRCP85C5CNBDRD.f09_g16.001', + 'glob_foo_string': 'sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.001*.nc', + 'member_id': '001', + }, + { + 'experiment_name': 'b.e11.BRCP85C5CNBDRD.f09_g16.002', + 'glob_foo_string': 'sample_data/cesm-le/b.e11.BRCP85C5CNBDRD.f09_g16.002*.nc', + 'member_id': '002', + }, + ], + 'experiment': 'b.e11.BRCP85C5CNBDRD', + } + ] +} +yinput10 = {'catalog': {}} + + +@pytest.mark.parametrize( + 'yaml_path, csv_path, validater, data', + [ + (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput1), + (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput2), + (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput3), + (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput4), + (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput5), + (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput6), + (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput7), + (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput8), + 
(str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput9), + (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput10), + (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', yinput1), + (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', yinput2), + (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', yinput3), + (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', yinput4), + (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', yinput5), + (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', yinput6), + (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', yinput7), + (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', yinput8), + (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', yinput9), + (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', yinput10), + ], +) +def test_yaml_validation(yaml_path, csv_path, validater, data): + + with TemporaryDirectory() as local_dir: + temp_yaml_file = f'{local_dir}/my_yaml.yaml' + with open(temp_yaml_file, 'w') as f: + yaml.dump(data, f) + + p = YAML_Parser(temp_yaml_file, csv_path=csv_path, validater=validater) + assert bool(p.valid_yaml) is False From 61053432fe42474779ae2c3314fd69997296dcb2 Mon Sep 17 00:00:00 2001 From: sherimickelson Date: Mon, 27 Jul 2020 10:03:59 -0600 Subject: [PATCH 2/8] Fix for test that failed. --- ecgtools/core.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ecgtools/core.py b/ecgtools/core.py index 4d49257..0bc79eb 100644 --- a/ecgtools/core.py +++ b/ecgtools/core.py @@ -393,7 +393,7 @@ def batch(seq): result_batch = dask.delayed(batch)(filepaths[i : i + nbatches]) results.append(result_batch) else: - results = [_parse_file_attributes(filepath, local_attrs, parser) for filepath in filepaths] + results = [_parse_file_attributes(filepath, parser, local_attrs) for filepath in filepaths] if dask.is_dask_collection(results[0]): results = dask.compute(*results) @@ -401,7 +401,7 @@ def batch(seq): return pd.DataFrame(results) -def _parse_file_attributes(filepath: str, local_attrs: dict, parser: callable = None): +def _parse_file_attributes(filepath: str, parser: callable = None, local_attrs: dict = {}): """ Single file attributes harvesting @@ -421,7 +421,7 @@ def _parse_file_attributes(filepath: str, local_attrs: dict, parser: callable = results = {'path': filepath} if parser is not None: - if not bool(local_attrs): + if len(local_attrs.keys()) == 0: x = parser(filepath) else: x = parser(filepath, local_attrs) From e7a2090c5dead77cb92e9e0ed5e3b003eccf4019 Mon Sep 17 00:00:00 2001 From: sherimickelson Date: Tue, 4 Aug 2020 15:36:48 -0600 Subject: [PATCH 3/8] Add suggested changes. --- ci/environment.yml | 1 + ecgtools/core.py | 21 ++---- ecgtools/parsers.py | 166 +++++++++++++++++--------------------------- requirements.txt | 1 + tests/test_core.py | 18 +---- 5 files changed, 71 insertions(+), 136 deletions(-) diff --git a/ci/environment.yml b/ci/environment.yml index e98015e..643649c 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -12,3 +12,4 @@ dependencies: - pytest - pytest-cov - xarray + - yamale diff --git a/ecgtools/core.py b/ecgtools/core.py index 6c78a0a..9134fb3 100644 --- a/ecgtools/core.py +++ b/ecgtools/core.py @@ -201,7 +201,7 @@ def build( cat_id: str = None, description: str = None, attributes: List[dict] = None, - local_attrs: dict = None, + local_attrs: dict = {}, ) -> 'Builder': """ Harvest attributes for a list of files. 
This method produces a pandas dataframe @@ -261,13 +261,7 @@ def build( 'aggregations': aggregations, } - if len(self.filelist) == 0: - filelist = self.filelist or self._get_filelist_from_dirs() - else: - filelist = self.filelist - - if local_attrs is None: - local_attrs = {} + filelist = self.filelist or self._get_filelist_from_dirs() df = parse_files_attributes(filelist, local_attrs, self.parser, self.lazy, self.nbatches) @@ -281,7 +275,7 @@ def build( self.df = df return self - def update(self, catalog_file: str, path_column: str, local_attrs: dict = None,) -> 'Builder': + def update(self, catalog_file: str, path_column: str, local_attrs: dict = {},) -> 'Builder': """ Update a previously built catalog. @@ -298,9 +292,6 @@ def update(self, catalog_file: str, path_column: str, local_attrs: dict = None,) filelist_from_prev_cat = self.old_df[path_column].tolist() filelist = self._get_filelist_from_dirs() - if local_attrs is None: - local_attrs = {} - # Case 1: The new filelist has files that are not included in the # Previously built catalog files_to_parse = list(set(filelist) - set(filelist_from_prev_cat)) @@ -421,11 +412,7 @@ def _parse_file_attributes(filepath: str, parser: callable = None, local_attrs: results = {'path': filepath} if parser is not None: - if len(local_attrs.keys()) == 0: - x = parser(filepath) - else: - x = parser(filepath, local_attrs) - # Merge x and results dictionaries + x = parser(filepath) results = {**x, **results} return results diff --git a/ecgtools/parsers.py b/ecgtools/parsers.py index c39d712..cacfef1 100644 --- a/ecgtools/parsers.py +++ b/ecgtools/parsers.py @@ -1,3 +1,4 @@ +import functools import glob import os import re @@ -33,7 +34,7 @@ def extract_attr_with_regex( def cmip6_default_parser( filepath: str, - global_attrs: list, + global_attrs: list = None, variable_attrs: list = None, attrs_mapping: dict = None, add_dim: bool = True, @@ -62,6 +63,7 @@ def cmip6_default_parser( dict A dictionary of attributes harvested from the input CMIP6 netCDF file. """ + try: results = {'path': filepath} ds = xr.open_dataset(filepath, decode_times=True, use_cftime=True, chunks={}) @@ -127,7 +129,7 @@ def __init__( self.yaml_path = yaml_path self.csv_path = csv_path - self.builder = Builder(None, parser=self._parser_netcdf, lazy=False) + self.builder = None self.validater = validater # Read in the yaml file and validate @@ -135,69 +137,6 @@ def __init__( self.input_yaml = yaml.safe_load(f) self.valid_yaml = self._validate_yaml() - def _internal_validation(self): - """ - Validates the generic yaml input against the schema if the user does not have yamale in - their environment. - - Parameters - ---------- - None - - Returns - ------- - boolean - True - passes the validation, False - fails the validation - """ - - # verify that we're working with a dictionary - if not isinstance(self.input_yaml, dict): - print( - 'ERROR: The experiment/dataset top level is not a dictionary. Make sure you follow the correct format.' - ) - return False - # verify that the first line is 'catalog:' and it only appears once in the yaml file - if len(self.input_yaml.keys()) != 1 or 'catalog' not in self.input_yaml.keys(): - print( - "ERROR: The first line in the yaml file must be 'catalog:' and it should only appear once." - ) - return False - if not isinstance(self.input_yaml['catalog'], list): - print( - 'ERROR: The catalog entries are not in a list. make sure you follow the corrrect format.' 
- ) - return False - for dataset in self.input_yaml['catalog']: - # check to see if there is a data_sources key for each dataset - if 'data_sources' not in dataset.keys(): - print("ERROR: Each experiment/dataset must have the key 'data_sources'.") - return False - # verify that we're working with a list at this level - if not isinstance(dataset['data_sources'], list): - print( - 'ERROR: The data_source entries are not in a list. Make sure you follow the correct format.' - ) - return False - for stream_info in dataset['data_sources']: - # check to make sure that there's a 'glob_string' key for each data_source - if 'glob_string' not in stream_info.keys(): - print("ERROR: Each data_source must contain a 'glob_string' key.") - return False - # ensemble is an option, but we still need to verify that it meets the rules if it is added - if 'ensemble' in dataset.keys(): - # verify that we're working with a list at this level - if not isinstance(dataset['ensemble'], list): - print( - 'ERROR: The ensemble entries are not in a list. Make sure you follow the correct format.' - ) - return False - for stream_info in dataset['ensemble']: - # check to make sure that there's a 'glob_string' key for each ensemble entry - if 'glob_string' not in stream_info.keys(): - print("ERROR: Each ensemble must contain a 'glob_string' key.") - return False - return True - def _validate_yaml(self): """ Validates the generic yaml input against the schema. It uses either yamale or the internal validater. @@ -235,11 +174,13 @@ def _validate_yaml(self): print(e) return False except ImportError: - print('Validating yaml file internally.') - return self._internal_validation() + print('Did not validate yaml because yamale not found.') + print('If unexpected results occur, try installing yamale and rerun.') + return True else: - print('Validating yaml file internally.') - return self._internal_validation() + print('Did not validate yaml.') + print('If unexpected results occur, try installing yamale and rerun.') + return True def _parser_netcdf(self, filepath, local_attrs): """ @@ -260,51 +201,58 @@ def _parser_netcdf(self, filepath, local_attrs): """ fileparts = {} - fileparts['path'] = filepath try: fileparts['variable'] = [] + fileparts['start_time'] = [] + fileparts['end_time'] = [] + fileparts['path'] = [] + # open file d = nc.Dataset(filepath, 'r') + # find what the time (unlimited) dimension is dims = list(dict(d.dimensions).keys()) - if 'time' in d.variables.keys(): - times = d['time'] - start = str(times[0]) - end = str(times[-1]) - fileparts['time_range'] = start + '-' + end + # loop through all variables - for v in d.variables.keys(): + for v in d.variables: # add all variables that are not coordinates to the catalog if v not in dims: fileparts['variable'].append(v) - - # add the keys that are common just to the particular glob string - # fileparts.update(local_attrs[filepath]) - for lv in local_attrs[filepath].keys(): - if '<<' in local_attrs[filepath][lv]: - for v in fileparts['variable']: - if lv not in fileparts.keys(): - fileparts[lv] = [] - if hasattr(d.variables[v], lv): - fileparts[lv].append(getattr(d.variables[v], lv)) + fileparts['path'].append(filepath) + + if 'time' in d.variables.keys(): + times = d['time'] + fileparts['start_time'].append(times[0]) + fileparts['end_time'].append(times[-1]) + + # add the keys that are common just to the particular glob string + # fileparts.update(local_attrs[filepath]) + for lv in local_attrs[filepath].keys(): + if '<<' in local_attrs[filepath][lv]: + if lv not in 
fileparts.keys(): + fileparts[lv] = [] + if hasattr(d.variables[v], lv): + fileparts[lv].append(getattr(d.variables[v], lv)) + else: + fileparts[lv].append('NaN') + elif '<' in local_attrs[filepath][lv]: + k = local_attrs[filepath][lv].replace('<', '').replace('>', '') + if hasattr(d, k): + fileparts[lv] = getattr(d, k) + else: + fileparts[lv] = 'NaN' else: - fileparts[lv].append('None') - elif '<' in local_attrs[filepath][lv]: - k = local_attrs[filepath][lv].replace('<', '').replace('>', '') - if hasattr(d, k): - fileparts[lv] = getattr(d, k) - else: - fileparts[lv] = 'None' - else: - fileparts[lv] = local_attrs[filepath][lv] + if lv not in fileparts.keys(): + fileparts[lv] = [] + fileparts[lv].append(local_attrs[filepath][lv]) # close netcdf file d.close() except Exception: pass return fileparts - def parser(self) -> 'YAML_Parser': + def parser(self) -> 'Builder': """ Method used to start the parsing process. @@ -331,21 +279,31 @@ def parser(self) -> 'YAML_Parser': if 'ensemble' in dataset.keys(): for member in dataset['ensemble']: glob_string = member.pop('glob_string') - self.builder.filelist = glob.glob(glob_string) - for f in self.builder.filelist: + filelist = glob.glob(glob_string) + for f in filelist: entries[f].update(member) # loop over all of the data_sources for the dataset, create a dataframe # for each data_source, append that dataframe to a list that will contain # the full dataframe (or catalog) based on everything in the yaml file. for stream_info in dataset['data_sources']: - self.builder.filelist = glob.glob(stream_info['glob_string']) + filelist = glob.glob(stream_info['glob_string']) stream_info.pop('glob_string') - for f in self.builder.filelist: + for f in filelist: entries[f].update(stream_info) - df_parts.append(self.builder.build('path', 'variable', local_attrs=entries).df) - # create the combined dataframe from all of the data_sources and datasets from - # the yaml file - df = pd.concat(df_parts, sort=False) + + partial_parser_netcdf = functools.partial(self._parser_netcdf, local_attrs=entries) + self.builder = Builder(None, parser=partial_parser_netcdf, lazy=False) + self.builder.filelist = [x for x in entries.keys() if x != 'global'] + df_parts.append( + self.builder.build('path', 'variable') + .df.set_index('path') + .apply(lambda x: x.apply(pd.Series).stack()) + .reset_index() + .drop('level_1', 1) + ) + # create the combined dataframe from all of the data_sources and datasets from + # the yaml file + df = pd.concat(df_parts, sort=False) self.builder.df = df.sort_values(by=['path']) return self.builder diff --git a/requirements.txt b/requirements.txt index 79f142f..b9cada2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ netCDF4 xarray +yamale dask[delayed] dask[bag] diff --git a/tests/test_core.py b/tests/test_core.py index a94e2bd..bbae09c 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -129,10 +129,9 @@ def test_builder_update(root_path, parser, num_items, dummy_assets): @pytest.mark.parametrize( 'yaml_path, csv_path, validater, expected_df_shape', [ - (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', (59, 12)), - (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', (59, 12)), - (str(yaml_root_path) + '/ensemble.yaml', None, 'yamale', (114, 10)), - (str(yaml_root_path) + '/ensemble.yaml', None, 'internal', (114, 10)), + (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', (177, 13)), + (str(yaml_root_path) + '/cmip6.yaml', None, 'foo', (177, 13)), + (str(yaml_root_path) + '/ensemble.yaml', None, 'yamale', (6612, 
11)), ], ) def test_yaml_parser(yaml_path, csv_path, validater, expected_df_shape): @@ -142,7 +141,6 @@ def test_yaml_parser(yaml_path, csv_path, validater, expected_df_shape): assert p.valid_yaml assert b.df.shape == expected_df_shape assert isinstance(b.df, pd.DataFrame) - assert len(b.filelist) == len(b.df) yinput1 = [] @@ -317,16 +315,6 @@ def test_yaml_parser(yaml_path, csv_path, validater, expected_df_shape): @pytest.mark.parametrize( 'yaml_path, csv_path, validater, data', [ - (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput1), - (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput2), - (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput3), - (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput4), - (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput5), - (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput6), - (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput7), - (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput8), - (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput9), - (str(yaml_root_path) + '/cmip6.yaml', None, 'internal', yinput10), (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', yinput1), (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', yinput2), (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', yinput3), From 1304c265851c3e345f9af911259c3683860ca3e3 Mon Sep 17 00:00:00 2001 From: sherimickelson Date: Tue, 4 Aug 2020 16:33:22 -0600 Subject: [PATCH 4/8] Fix testing failure caused by an update to yamale. --- ecgtools/parsers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ecgtools/parsers.py b/ecgtools/parsers.py index cacfef1..7ab872d 100644 --- a/ecgtools/parsers.py +++ b/ecgtools/parsers.py @@ -162,7 +162,7 @@ def _validate_yaml(self): schema = yamale.make_schema(schema_path) data = yamale.make_data(self.yaml_path) try: - yamale.validate(schema, data) + yamale.validate(schema, data, strict=False) print('Validation success! 👍') return True except ValueError as e: From f0eb3da1a4b63798c6f49c8dff57360935d92201 Mon Sep 17 00:00:00 2001 From: sherimickelson Date: Tue, 4 Aug 2020 16:47:32 -0600 Subject: [PATCH 5/8] Change YAML_Parser to YAMLParser --- ecgtools/parsers.py | 4 ++-- notebooks/Generic_Interface_Example.ipynb | 2 +- tests/test_core.py | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/ecgtools/parsers.py b/ecgtools/parsers.py index 7ab872d..3c80b58 100644 --- a/ecgtools/parsers.py +++ b/ecgtools/parsers.py @@ -104,14 +104,14 @@ def cmip6_default_parser( return {} -class YAML_Parser: +class YAMLParser: """ Creates a parser that parses a yaml file in order to create a catalog file """ def __init__( self, yaml_path: str, csv_path: str = None, validater: str = 'yamale', - ) -> 'YAML_Parser': + ) -> 'YAMLParser': """ Get a list of files from a list of directories. 
diff --git a/notebooks/Generic_Interface_Example.ipynb b/notebooks/Generic_Interface_Example.ipynb index 94cbcef..770e530 100644 --- a/notebooks/Generic_Interface_Example.ipynb +++ b/notebooks/Generic_Interface_Example.ipynb @@ -25,7 +25,7 @@ "yaml_path = '../sample_yaml/cmip6.yaml'\n", "csv_path = '/path/to/put/output/file/cmip6_new.csv'\n", "\n", - "Parser = parsers.YAML_Parser(yaml_path, csv_path=csv_path)\n", + "Parser = parsers.YAMLParser(yaml_path, csv_path=csv_path)\n", "b = Parser.parser()" ] }, diff --git a/tests/test_core.py b/tests/test_core.py index bbae09c..963a9d0 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -9,7 +9,7 @@ import yaml from ecgtools import Builder -from ecgtools.parsers import YAML_Parser, cmip6_default_parser +from ecgtools.parsers import YAMLParser, cmip6_default_parser here = Path(os.path.dirname(__file__)) cmip6_root_path = here.parent / 'sample_data' / 'cmip' / 'CMIP6' @@ -135,7 +135,7 @@ def test_builder_update(root_path, parser, num_items, dummy_assets): ], ) def test_yaml_parser(yaml_path, csv_path, validater, expected_df_shape): - p = YAML_Parser(yaml_path, csv_path=csv_path, validater=validater) + p = YAMLParser(yaml_path, csv_path=csv_path, validater=validater) b = p.parser() assert p.valid_yaml @@ -334,5 +334,5 @@ def test_yaml_validation(yaml_path, csv_path, validater, data): with open(temp_yaml_file, 'w') as f: yaml.dump(data, f) - p = YAML_Parser(temp_yaml_file, csv_path=csv_path, validater=validater) + p = YAMLParser(temp_yaml_file, csv_path=csv_path, validater=validater) assert bool(p.valid_yaml) is False From c1cb278f4250fc72ee7ed73a66392bcb01a84bfe Mon Sep 17 00:00:00 2001 From: sherimickelson Date: Tue, 4 Aug 2020 16:55:48 -0600 Subject: [PATCH 6/8] Remove try block around yamale import --- ecgtools/parsers.py | 40 ++++++++++++++++++---------------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/ecgtools/parsers.py b/ecgtools/parsers.py index 3c80b58..d5d8284 100644 --- a/ecgtools/parsers.py +++ b/ecgtools/parsers.py @@ -153,30 +153,26 @@ def _validate_yaml(self): # verify the format is correct if self.validater == 'yamale': + + import yamale + + print('Validating yaml file with yamale.') + cwd = Path(os.path.dirname(__file__)) + schema_path = str(cwd.parent / 'schema') + '/generic_schema.yaml' + schema = yamale.make_schema(schema_path) + data = yamale.make_data(self.yaml_path) try: - import yamale - - print('Validating yaml file with yamale.') - cwd = Path(os.path.dirname(__file__)) - schema_path = str(cwd.parent / 'schema') + '/generic_schema.yaml' - schema = yamale.make_schema(schema_path) - data = yamale.make_data(self.yaml_path) - try: - yamale.validate(schema, data, strict=False) - print('Validation success! 👍') - return True - except ValueError as e: - print( - 'Yamale found that your file, ' - + self.yaml_path - + ' is not formatted correctly.' - ) - print(e) - return False - except ImportError: - print('Did not validate yaml because yamale not found.') - print('If unexpected results occur, try installing yamale and rerun.') + yamale.validate(schema, data, strict=False) + print('Validation success! 👍') return True + except ValueError as e: + print( + 'Yamale found that your file, ' + + self.yaml_path + + ' is not formatted correctly.' 
+ ) + print(e) + return False else: print('Did not validate yaml.') print('If unexpected results occur, try installing yamale and rerun.') From c0048883dc2bde5211ffbd5ae7eb88726094706a Mon Sep 17 00:00:00 2001 From: sherimickelson Date: Wed, 5 Aug 2020 07:56:04 -0600 Subject: [PATCH 7/8] Fix global attributes. --- ecgtools/parsers.py | 9 ++++++++- tests/test_core.py | 6 +++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/ecgtools/parsers.py b/ecgtools/parsers.py index d5d8284..902de37 100644 --- a/ecgtools/parsers.py +++ b/ecgtools/parsers.py @@ -222,6 +222,12 @@ def _parser_netcdf(self, filepath, local_attrs): fileparts['start_time'].append(times[0]) fileparts['end_time'].append(times[-1]) + # add global attributes + for g in local_attrs['global'].keys(): + if g not in fileparts.keys(): + fileparts[g] = [] + fileparts[g].append(local_attrs['global'][g]) + # add the keys that are common just to the particular glob string # fileparts.update(local_attrs[filepath]) for lv in local_attrs[filepath].keys(): @@ -268,9 +274,10 @@ def parser(self) -> 'Builder': # for dataset in input_yaml.keys(): for dataset in self.input_yaml['catalog']: # get a list of keys that are common to all files in the dataset + entries['global'] = {} for g in dataset.keys(): if 'data_sources' not in g and 'ensemble' not in g: - entries['global'] = dataset[g] + entries['global'][g] = dataset[g] # loop over ensemble members, if they exist if 'ensemble' in dataset.keys(): for member in dataset['ensemble']: diff --git a/tests/test_core.py b/tests/test_core.py index 963a9d0..c0bf097 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -129,9 +129,9 @@ def test_builder_update(root_path, parser, num_items, dummy_assets): @pytest.mark.parametrize( 'yaml_path, csv_path, validater, expected_df_shape', [ - (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', (177, 13)), - (str(yaml_root_path) + '/cmip6.yaml', None, 'foo', (177, 13)), - (str(yaml_root_path) + '/ensemble.yaml', None, 'yamale', (6612, 11)), + (str(yaml_root_path) + '/cmip6.yaml', None, 'yamale', (177, 14)), + (str(yaml_root_path) + '/cmip6.yaml', None, 'foo', (177, 14)), + (str(yaml_root_path) + '/ensemble.yaml', None, 'yamale', (6612, 13)), ], ) def test_yaml_parser(yaml_path, csv_path, validater, expected_df_shape): From 8121b8fdb50fce03e00e165d6a9a053afa96ea7f Mon Sep 17 00:00:00 2001 From: sherimickelson Date: Wed, 5 Aug 2020 12:03:12 -0600 Subject: [PATCH 8/8] Code cleanup. --- ecgtools/core.py | 17 ++++++----------- ecgtools/parsers.py | 10 ++++------ 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/ecgtools/core.py b/ecgtools/core.py index 9134fb3..84d06fa 100644 --- a/ecgtools/core.py +++ b/ecgtools/core.py @@ -201,7 +201,6 @@ def build( cat_id: str = None, description: str = None, attributes: List[dict] = None, - local_attrs: dict = {}, ) -> 'Builder': """ Harvest attributes for a list of files. This method produces a pandas dataframe @@ -263,7 +262,7 @@ def build( filelist = self.filelist or self._get_filelist_from_dirs() - df = parse_files_attributes(filelist, local_attrs, self.parser, self.lazy, self.nbatches) + df = parse_files_attributes(filelist, self.parser, self.lazy, self.nbatches) if attributes is None: attributes = [] @@ -275,7 +274,7 @@ def build( self.df = df return self - def update(self, catalog_file: str, path_column: str, local_attrs: dict = {},) -> 'Builder': + def update(self, catalog_file: str, path_column: str,) -> 'Builder': """ Update a previously built catalog. 
@@ -297,7 +296,7 @@ def update(self, catalog_file: str, path_column: str, local_attrs: dict = {},) - files_to_parse = list(set(filelist) - set(filelist_from_prev_cat)) if files_to_parse: self.new_df = parse_files_attributes( - files_to_parse, local_attrs, self.parser, self.lazy, self.nbatches + files_to_parse, self.parser, self.lazy, self.nbatches ) else: self.new_df = pd.DataFrame() @@ -346,11 +345,7 @@ def save(self, catalog_file: str, **kwargs,) -> 'Builder': def parse_files_attributes( - filepaths: list, - local_attrs: dict, - parser: callable = None, - lazy: bool = True, - nbatches: int = 25, + filepaths: list, parser: callable = None, lazy: bool = True, nbatches: int = 25, ) -> pd.DataFrame: """ Harvest attributes for a list of files. @@ -384,7 +379,7 @@ def batch(seq): result_batch = dask.delayed(batch)(filepaths[i : i + nbatches]) results.append(result_batch) else: - results = [_parse_file_attributes(filepath, parser, local_attrs) for filepath in filepaths] + results = [_parse_file_attributes(filepath, parser) for filepath in filepaths] if dask.is_dask_collection(results[0]): results = dask.compute(*results) @@ -392,7 +387,7 @@ def batch(seq): return pd.DataFrame(results) -def _parse_file_attributes(filepath: str, parser: callable = None, local_attrs: dict = {}): +def _parse_file_attributes(filepath: str, parser: callable = None): """ Single file attributes harvesting diff --git a/ecgtools/parsers.py b/ecgtools/parsers.py index 902de37..6bcae9a 100644 --- a/ecgtools/parsers.py +++ b/ecgtools/parsers.py @@ -231,9 +231,9 @@ def _parser_netcdf(self, filepath, local_attrs): # add the keys that are common just to the particular glob string # fileparts.update(local_attrs[filepath]) for lv in local_attrs[filepath].keys(): + if lv not in fileparts.keys(): + fileparts[lv] = [] if '<<' in local_attrs[filepath][lv]: - if lv not in fileparts.keys(): - fileparts[lv] = [] if hasattr(d.variables[v], lv): fileparts[lv].append(getattr(d.variables[v], lv)) else: @@ -241,12 +241,10 @@ def _parser_netcdf(self, filepath, local_attrs): elif '<' in local_attrs[filepath][lv]: k = local_attrs[filepath][lv].replace('<', '').replace('>', '') if hasattr(d, k): - fileparts[lv] = getattr(d, k) + fileparts[lv].append(getattr(d, k)) else: - fileparts[lv] = 'NaN' + fileparts[lv].append('NaN') else: - if lv not in fileparts.keys(): - fileparts[lv] = [] fileparts[lv].append(local_attrs[filepath][lv]) # close netcdf file d.close()
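
For reference, a minimal sketch of driving the YAMLParser introduced in this series, mirroring the included Generic_Interface_Example notebook; the yaml path points at the sample file added in PATCH 1/8, the csv path is a placeholder, and 'yamale' is the default validater:

    from ecgtools.parsers import YAMLParser

    yaml_path = 'sample_yaml/cmip6.yaml'   # sample yaml shipped with this patch series
    csv_path = 'cmip6.csv'                 # placeholder output location

    p = YAMLParser(yaml_path, csv_path=csv_path, validater='yamale')
    b = p.parser()     # returns a Builder; b.df holds the assembled catalog
    b.df.info()
    b.save(csv_path)   # write the catalog to csv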