Generic interface #5

Merged · 9 commits · Aug 31, 2020

Changes from 2 commits
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
@@ -6,7 +6,6 @@ repos:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-docstring-first
- id: check-yaml
- id: double-quote-string-fixer

- repo: https://github.com/ambv/black
40 changes: 30 additions & 10 deletions ecgtools/core.py
@@ -50,8 +50,9 @@ def __init__(
FileNotFoundError
When `root_path` does not exist.
"""
self.root_path = Path(root_path)
if not self.root_path.is_dir():
if root_path is not None:
self.root_path = Path(root_path)
if root_path is not None and not self.root_path.is_dir():
raise FileNotFoundError(f'{root_path} directory does not exist')
if parser is not None and not callable(parser):
raise TypeError('parser must be callable.')
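For context, a minimal sketch (not part of this diff) of what the relaxed root_path check permits: a Builder can now be constructed without a root directory and given its file list directly, which is how the new YAML_Parser below uses it. The parser body and paths here are hypothetical.

from pathlib import Path

from ecgtools import Builder

def my_parser(filepath):
    # hypothetical one-argument parser: derive a single attribute from the file name
    return {'variable': Path(filepath).stem}

b = Builder(None, parser=my_parser, lazy=False)  # root_path may now be None
b.filelist = ['/hypothetical/ocn/temp.nc']       # assign files directly instead of walking a directory tree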
@@ -200,6 +201,7 @@ def build(
cat_id: str = None,
description: str = None,
attributes: List[dict] = None,
local_attrs: dict = None,
) -> 'Builder':
"""
Harvest attributes for a list of files. This method produces a pandas dataframe
Expand Down Expand Up @@ -259,8 +261,15 @@ def build(
'aggregations': aggregations,
}

filelist = self.filelist or self._get_filelist_from_dirs()
df = parse_files_attributes(filelist, self.parser, self.lazy, self.nbatches)
if len(self.filelist) == 0:
filelist = self.filelist or self._get_filelist_from_dirs()
else:
filelist = self.filelist

if local_attrs is None:
local_attrs = {}

df = parse_files_attributes(filelist, local_attrs, self.parser, self.lazy, self.nbatches)

if attributes is None:
attributes = []
@@ -272,7 +281,7 @@ def build(
self.df = df
return self
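To illustrate the new local_attrs keyword (a sketch, not part of this diff): the dictionary is keyed by file path, matching how YAML_Parser.parser() assembles it further down, and when it is non-empty the parser is invoked as parser(filepath, local_attrs) (see _parse_file_attributes below). Paths, attribute names, and the parser body are hypothetical.

from pathlib import Path

from ecgtools import Builder

def my_parser(filepath, local_attrs):
    # hypothetical two-argument parser: merge the per-file attributes into the harvested ones
    return {'variable': Path(filepath).stem, **local_attrs.get(filepath, {})}

attrs = {'/hypothetical/ocn/temp.nc': {'experiment': 'ctrl', 'frequency': 'month_1'}}
b = Builder(None, parser=my_parser, lazy=False)
b.filelist = list(attrs)
b = b.build('path', 'variable', local_attrs=attrs)  # b.df then holds one row per file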

def update(self, catalog_file: str, path_column: str) -> 'Builder':
def update(self, catalog_file: str, path_column: str, local_attrs: dict = None,) -> 'Builder':
"""
Update a previously built catalog.

@@ -288,12 +297,16 @@ def update(self, catalog_file: str, path_column: str) -> 'Builder':
self.old_df = pd.read_csv(catalog_file)
filelist_from_prev_cat = self.old_df[path_column].tolist()
filelist = self._get_filelist_from_dirs()

if local_attrs is None:
local_attrs = {}

# Case 1: The new filelist has files that are not included in the
# Previously built catalog
files_to_parse = list(set(filelist) - set(filelist_from_prev_cat))
if files_to_parse:
self.new_df = parse_files_attributes(
files_to_parse, self.parser, self.lazy, self.nbatches
files_to_parse, local_attrs, self.parser, self.lazy, self.nbatches
)
else:
self.new_df = pd.DataFrame()
@@ -342,7 +355,11 @@ def save(self, catalog_file: str, **kwargs,) -> 'Builder':


def parse_files_attributes(
filepaths: list, parser: callable = None, lazy: bool = True, nbatches: int = 25,
filepaths: list,
local_attrs: dict,
parser: callable = None,
lazy: bool = True,
nbatches: int = 25,
) -> pd.DataFrame:
"""
Harvest attributes for a list of files.
@@ -376,15 +393,15 @@ def batch(seq):
result_batch = dask.delayed(batch)(filepaths[i : i + nbatches])
results.append(result_batch)
else:
results = [_parse_file_attributes(filepath, parser) for filepath in filepaths]
results = [_parse_file_attributes(filepath, parser, local_attrs) for filepath in filepaths]

if dask.is_dask_collection(results[0]):
results = dask.compute(*results)
results = list(itertools.chain(*results))
return pd.DataFrame(results)


def _parse_file_attributes(filepath: str, parser: callable = None):
def _parse_file_attributes(filepath: str, parser: callable = None, local_attrs: dict = {}):
"""
Single file attributes harvesting

@@ -404,7 +421,10 @@ def _parse_file_attributes(filepath: str, parser: callable = None):

results = {'path': filepath}
if parser is not None:
x = parser(filepath)
if len(local_attrs.keys()) == 0:
x = parser(filepath)
else:
x = parser(filepath, local_attrs)
# Merge x and results dictionaries
results = {**x, **results}
return results
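The dispatch above gives the parser two call conventions: parser(filepath) when local_attrs is empty and parser(filepath, local_attrs) otherwise. A custom parser that wants to support both could therefore take an optional second argument; a sketch with hypothetical names:

from pathlib import Path

def flexible_parser(filepath, local_attrs=None):
    # hypothetical parser: harvest a minimal attribute set and merge in any
    # per-file attributes supplied through local_attrs
    attrs = {'variable': Path(filepath).stem}
    if local_attrs:
        attrs.update(local_attrs.get(filepath, {}))
    return attrs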
257 changes: 257 additions & 0 deletions ecgtools/parsers.py
@@ -1,7 +1,15 @@
import glob
import os
import re
from collections import defaultdict
from pathlib import Path

import netCDF4 as nc
import pandas as pd
import xarray as xr

from ecgtools import Builder


def extract_attr_with_regex(
input_str: str, regex: str, strip_chars: str = None, ignore_case: bool = True
@@ -92,3 +100,252 @@ def cmip6_default_parser(
data = {'exception': str(e), 'file': filepath}
print(data)
return {}


class YAML_Parser:
"""
Parser that reads a YAML input file in order to create a catalog file
"""

def __init__(
self, yaml_path: str, csv_path: str = None, validater: str = 'yamale',
) -> 'YAML_Parser':
"""
Create a YAML_Parser: read the input YAML file and validate it against the schema.

Parameters
----------
yaml_path : str
Path to the yaml file to be parsed
csv_path : str, optional
Full path to the output csv file
validater : str, optional
Choice of YAML validator. Valid options: 'yamale' or 'internal'; default: 'yamale'
"""

import yaml

self.yaml_path = yaml_path
self.csv_path = csv_path
self.builder = Builder(None, parser=self._parser_netcdf, lazy=False)
self.validater = validater

# Read in the yaml file and validate
with open(self.yaml_path, 'r') as f:
self.input_yaml = yaml.safe_load(f)
self.valid_yaml = self._validate_yaml()

def _internal_validation(self):
"""
Validates the generic yaml input against the schema if the user does not have yamale in
their environment.

Parameters
----------
None

Returns
-------
boolean
True - passes the validation, False - fails the validation
"""

# verify that we're working with a dictionary
if not isinstance(self.input_yaml, dict):
print(
'ERROR: The experiment/dataset top level is not a dictionary. Make sure you follow the correct format.'
)
return False
# verify that the first line is 'catalog:' and it only appears once in the yaml file
if len(self.input_yaml.keys()) != 1 or 'catalog' not in self.input_yaml.keys():
print(
"ERROR: The first line in the yaml file must be 'catalog:' and it should only appear once."
)
return False
if not isinstance(self.input_yaml['catalog'], list):
print(
'ERROR: The catalog entries are not in a list. Make sure you follow the correct format.'
)
return False
for dataset in self.input_yaml['catalog']:
# check to see if there is a data_sources key for each dataset
if 'data_sources' not in dataset.keys():
print("ERROR: Each experiment/dataset must have the key 'data_sources'.")
return False
# verify that we're working with a list at this level
if not isinstance(dataset['data_sources'], list):
print(
'ERROR: The data_source entries are not in a list. Make sure you follow the correct format.'
)
return False
for stream_info in dataset['data_sources']:
# check to make sure that there's a 'glob_string' key for each data_source
if 'glob_string' not in stream_info.keys():
print("ERROR: Each data_source must contain a 'glob_string' key.")
return False
# ensemble is an option, but we still need to verify that it meets the rules if it is added
if 'ensemble' in dataset.keys():
# verify that we're working with a list at this level
if not isinstance(dataset['ensemble'], list):
print(
'ERROR: The ensemble entries are not in a list. Make sure you follow the correct format.'
)
return False
for stream_info in dataset['ensemble']:
# check to make sure that there's a 'glob_string' key for each ensemble entry
if 'glob_string' not in stream_info.keys():
print("ERROR: Each ensemble must contain a 'glob_string' key.")
return False
return True
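For reference, a minimal input file that satisfies the checks above could look like the following (a sketch; glob patterns and attribute names are hypothetical):

import yaml

example = """
catalog:
  - project: my_project    # dataset-level keys other than data_sources/ensemble are treated as common attributes
    data_sources:
      - glob_string: /hypothetical/run1/ocn/*.nc
        component: ocn
    ensemble:               # optional
      - glob_string: /hypothetical/run1/*/*.nc
        member_id: '001'
"""
data = yaml.safe_load(example)
assert list(data.keys()) == ['catalog'] and isinstance(data['catalog'], list)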

def _validate_yaml(self):
"""
Validates the generic yaml input against the schema, using either yamale or the internal validator.

Parameters
----------
None

Returns
-------
boolean
True - passes the validation, False - fails the validation
"""

# verify the format is correct
if self.validater == 'yamale':
try:
import yamale

print('Validating yaml file with yamale.')
cwd = Path(os.path.dirname(__file__))
schema_path = str(cwd.parent / 'schema') + '/generic_schema.yaml'
schema = yamale.make_schema(schema_path)
data = yamale.make_data(self.yaml_path)
try:
yamale.validate(schema, data)
print('Validation success! 👍')
return True
except ValueError as e:
print(
'Yamale found that your file, '
+ self.yaml_path
+ ' is not formatted correctly.'
)
print(e)
return False
except ImportError:
print('Validating yaml file internally.')
return self._internal_validation()
else:
print('Validating yaml file internally.')
return self._internal_validation()

def _parser_netcdf(self, filepath, local_attrs):
"""
Opens a netcdf file in order to gather time and requested attribute information.
Also attaches assigned attributes gathered from the yaml file.

Parameters
----------
filepath : str
The full path to the netCDF file to attach attributes to.
local_attrs : dict
Holds attributes that need to be attached to the file path.

Returns
-------
dict
Returns all of the attributes that need to be assigned to the netCDF file.
"""

fileparts = {}
fileparts['path'] = filepath

try:
fileparts['variable'] = []
# open file
d = nc.Dataset(filepath, 'r')
# collect the dimension names; variables matching a dimension are skipped below
dims = list(dict(d.dimensions).keys())
if 'time' in d.variables.keys():
times = d['time']
start = str(times[0])
end = str(times[-1])
fileparts['time_range'] = start + '-' + end
# loop through all variables
for v in d.variables.keys():
# add all variables that are not coordinates to the catalog
if v not in dims:
fileparts['variable'].append(v)

# add the keys that are common just to the particular glob string
# fileparts.update(local_attrs[filepath])
for lv in local_attrs[filepath].keys():
if '<<' in local_attrs[filepath][lv]:
for v in fileparts['variable']:
if lv not in fileparts.keys():
fileparts[lv] = []
if hasattr(d.variables[v], lv):
fileparts[lv].append(getattr(d.variables[v], lv))
else:
fileparts[lv].append('None')
elif '<' in local_attrs[filepath][lv]:
k = local_attrs[filepath][lv].replace('<', '').replace('>', '')
if hasattr(d, k):
fileparts[lv] = getattr(d, k)
else:
fileparts[lv] = 'None'
else:
fileparts[lv] = local_attrs[filepath][lv]
# close netcdf file
d.close()
except Exception:
pass
return fileparts
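The per-file values recognized above follow three conventions (hypothetical names below): a value containing '<<' tells the parser to read the attribute named by the key from every variable in the file, a value wrapped in a single '<...>' names a global attribute of the file, and any other value is stored literally.

# sketch of a local_attrs entry for one file
local_attrs = {
    '/hypothetical/ocn/temp.nc': {
        'long_name': '<<long_name>>',  # per-variable: collect each variable's 'long_name' attribute
        'case': '<case_id>',           # global: copy the file's 'case_id' global attribute into the 'case' column
        'experiment': 'ctrl',          # literal: stored as-is
    }
}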

def parser(self) -> 'Builder':
"""
Method used to start the parsing process.

Parameters
----------
None

Returns
-------
Builder
Returns a Builder object.
"""

# loop over datasets
df_parts = []
entries = defaultdict(dict)
# for dataset in input_yaml.keys():
for dataset in self.input_yaml['catalog']:
# get a list of keys that are common to all files in the dataset
for g in dataset.keys():
if 'data_sources' not in g and 'ensemble' not in g:
entries['global'] = dataset[g]
# loop over ensemble members, if they exist
if 'ensemble' in dataset.keys():
for member in dataset['ensemble']:
glob_string = member.pop('glob_string')
self.builder.filelist = glob.glob(glob_string)
for f in self.builder.filelist:
entries[f].update(member)
# loop over all of the data_sources for the dataset, create a dataframe
# for each data_source, append that dataframe to a list that will contain
# the full dataframe (or catalog) based on everything in the yaml file.
for stream_info in dataset['data_sources']:
self.builder.filelist = glob.glob(stream_info['glob_string'])
stream_info.pop('glob_string')
for f in self.builder.filelist:
entries[f].update(stream_info)
df_parts.append(self.builder.build('path', 'variable', local_attrs=entries).df)
# create the combined dataframe from all of the data_sources and datasets from
# the yaml file
df = pd.concat(df_parts, sort=False)

self.builder.df = df.sort_values(by=['path'])
return self.builder
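End to end, the new interface might be exercised like this (a sketch; the YAML and CSV paths are hypothetical):

from ecgtools.parsers import YAML_Parser

p = YAML_Parser('/hypothetical/collection.yaml')
if p.valid_yaml:
    builder = p.parser()                          # returns a Builder whose .df holds the combined catalog
    builder.save('/hypothetical/collection.csv')  # Builder.save writes the catalog out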