Generic interface #5

Merged
merged 9 commits on Aug 31, 2020
Changes from 6 commits
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
@@ -6,7 +6,6 @@ repos:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-docstring-first
- id: check-yaml
- id: double-quote-string-fixer

- repo: https://github.com/ambv/black
1 change: 1 addition & 0 deletions ci/environment.yml
@@ -12,3 +12,4 @@ dependencies:
- pytest
- pytest-cov
- xarray
- yamale
25 changes: 16 additions & 9 deletions ecgtools/core.py
@@ -50,8 +50,9 @@ def __init__(
FileNotFoundError
When `root_path` does not exist.
"""
self.root_path = Path(root_path)
if not self.root_path.is_dir():
if root_path is not None:
self.root_path = Path(root_path)
if root_path is not None and not self.root_path.is_dir():
raise FileNotFoundError(f'{root_path} directory does not exist')
if parser is not None and not callable(parser):
raise TypeError('parser must be callable.')
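A note on the relaxed root_path check above: it lets a Builder be created without a root directory when the file list is supplied directly, which is how YAMLParser.parser constructs its Builder later in this diff. A minimal sketch, assuming an illustrative parser and file paths:

from ecgtools import Builder

def my_parser(filepath):
    # Illustrative parser: return the attributes to record for this file.
    return {'variable': 'tas'}

# root_path may now be None: skip directory walking and assign the file list directly.
builder = Builder(None, parser=my_parser, lazy=False)
builder.filelist = ['/data/case1/tas.nc', '/data/case1/pr.nc']  # illustrative paths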
@@ -200,6 +201,7 @@ def build(
cat_id: str = None,
description: str = None,
attributes: List[dict] = None,
local_attrs: dict = {},
) -> 'Builder':
"""
Harvest attributes for a list of files. This method produces a pandas dataframe
@@ -260,7 +262,8 @@ def build(
}

filelist = self.filelist or self._get_filelist_from_dirs()
df = parse_files_attributes(filelist, self.parser, self.lazy, self.nbatches)

df = parse_files_attributes(filelist, local_attrs, self.parser, self.lazy, self.nbatches)

if attributes is None:
attributes = []
@@ -272,7 +275,7 @@ def build(
self.df = df
return self

def update(self, catalog_file: str, path_column: str) -> 'Builder':
def update(self, catalog_file: str, path_column: str, local_attrs: dict = {},) -> 'Builder':
"""
Update a previously built catalog.

@@ -288,12 +291,13 @@ def update(self, catalog_file: str, path_column: str) -> 'Builder':
self.old_df = pd.read_csv(catalog_file)
filelist_from_prev_cat = self.old_df[path_column].tolist()
filelist = self._get_filelist_from_dirs()

# Case 1: The new filelist has files that are not included in the
# previously built catalog
files_to_parse = list(set(filelist) - set(filelist_from_prev_cat))
if files_to_parse:
self.new_df = parse_files_attributes(
files_to_parse, self.parser, self.lazy, self.nbatches
files_to_parse, local_attrs, self.parser, self.lazy, self.nbatches
)
else:
self.new_df = pd.DataFrame()
@@ -342,7 +346,11 @@ def save(self, catalog_file: str, **kwargs,) -> 'Builder':


def parse_files_attributes(
filepaths: list, parser: callable = None, lazy: bool = True, nbatches: int = 25,
filepaths: list,
local_attrs: dict,
parser: callable = None,
lazy: bool = True,
nbatches: int = 25,
) -> pd.DataFrame:
"""
Harvest attributes for a list of files.
@@ -376,15 +384,15 @@ def batch(seq):
result_batch = dask.delayed(batch)(filepaths[i : i + nbatches])
results.append(result_batch)
else:
results = [_parse_file_attributes(filepath, parser) for filepath in filepaths]
results = [_parse_file_attributes(filepath, parser, local_attrs) for filepath in filepaths]

if dask.is_dask_collection(results[0]):
results = dask.compute(*results)
results = list(itertools.chain(*results))
return pd.DataFrame(results)


def _parse_file_attributes(filepath: str, parser: callable = None):
def _parse_file_attributes(filepath: str, parser: callable = None, local_attrs: dict = {}):
"""
Single file attributes harvesting

@@ -405,7 +413,6 @@ def _parse_file_attributes(filepath: str, parser: callable = None):
results = {'path': filepath}
if parser is not None:
x = parser(filepath)
# Merge x and results dictionaries
results = {**x, **results}
return results
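A side note on the dictionary merge just above: because results appears last in the unpacking, the Builder-supplied 'path' entry wins if the parser also returns a 'path' key. A minimal illustration with made-up values:

x = {'variable': 'tas', 'path': 'parser-supplied'}  # parser output (made up)
results = {'path': '/data/case1/tas.nc'}            # Builder-supplied entry
results = {**x, **results}                          # later keys override earlier ones
assert results == {'variable': 'tas', 'path': '/data/case1/tas.nc'}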

217 changes: 216 additions & 1 deletion ecgtools/parsers.py
@@ -1,7 +1,16 @@
import functools
import glob
import os
import re
from collections import defaultdict
from pathlib import Path

import netCDF4 as nc
import pandas as pd
import xarray as xr

from ecgtools import Builder


def extract_attr_with_regex(
input_str: str, regex: str, strip_chars: str = None, ignore_case: bool = True
@@ -25,7 +34,7 @@

def cmip6_default_parser(
filepath: str,
global_attrs: list,
global_attrs: list = None,
variable_attrs: list = None,
attrs_mapping: dict = None,
add_dim: bool = True,
@@ -54,6 +63,7 @@ def cmip6_default_parser(
dict
A dictionary of attributes harvested from the input CMIP6 netCDF file.
"""

try:
results = {'path': filepath}
ds = xr.open_dataset(filepath, decode_times=True, use_cftime=True, chunks={})
@@ -92,3 +102,208 @@ def cmip6_default_parser(
data = {'exception': str(e), 'file': filepath}
print(data)
return {}
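Since global_attrs becomes optional above, here is a hedged usage sketch; the root path, the attribute names, and the positional arguments to build are assumptions (the latter mirror the call made in YAMLParser.parser below):

import functools

from ecgtools import Builder
from ecgtools.parsers import cmip6_default_parser

# Illustrative only: harvest two variable attributes, leaving global_attrs at its new default.
parser = functools.partial(cmip6_default_parser, variable_attrs=['standard_name', 'units'])
builder = Builder('/data/cmip6', parser=parser)   # placeholder root path
builder = builder.build('path', 'variable')       # column arguments assumed, as in YAMLParser.parser
builder.save('cmip6_catalog.csv')                 # write the catalog with Builder.save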


class YAMLParser:
"""
Creates a parser that parses a yaml file in order to create a catalog file
"""

def __init__(
self, yaml_path: str, csv_path: str = None, validater: str = 'yamale',
) -> 'YAMLParser':
"""
Read and validate the yaml file that describes the catalog to be built.

Parameters
----------
yaml_path : str
Path to the yaml file to be parsed
csv_path : str, optional
Full path to the output csv file
validater : str, optional
Choice of yaml validater. Valid options: 'yamale' or 'internal'; Default: yamale
"""

import yaml

self.yaml_path = yaml_path
self.csv_path = csv_path
self.builder = None
self.validater = validater

# Read in the yaml file and validate
with open(self.yaml_path, 'r') as f:
self.input_yaml = yaml.safe_load(f)
self.valid_yaml = self._validate_yaml()

def _validate_yaml(self):
"""
Validates the generic yaml input against the schema. It uses either yamale or the internal validater.

Parameters
----------
None

Returns
-------
boolean
True - passes the validation, False - fails the validation
"""

# verify the format is correct
if self.validater == 'yamale':
try:
import yamale

print('Validating yaml file with yamale.')
cwd = Path(os.path.dirname(__file__))
schema_path = str(cwd.parent / 'schema') + '/generic_schema.yaml'
schema = yamale.make_schema(schema_path)
data = yamale.make_data(self.yaml_path)
try:
yamale.validate(schema, data, strict=False)
print('Validation success! 👍')
return True
except ValueError as e:
print(
'Yamale found that your file, '
+ self.yaml_path
+ ' is not formatted correctly.'
)
print(e)
return False
except ImportError:
print('Did not validate yaml because yamale not found.')
print('If unexpected results occur, try installing yamale and rerun.')
return True
else:
print('Did not validate yaml.')
print('If unexpected results occur, try installing yamale and rerun.')
return True

def _parser_netcdf(self, filepath, local_attrs):
"""
Opens a netcdf file in order to gather time and requested attribute information.
Also attaches assigned attributes gathered from the yaml file.

Parameters
----------
filepath : str
The full path to the netcdf file to attach attributes to.
local_attrs : dict
Holds attributes that need to be attached to the file path.

Returns
-------
dict
Returns all of the attributes that need to be assigned to the netcdf.
"""

fileparts = {}

try:
fileparts['variable'] = []
fileparts['start_time'] = []
fileparts['end_time'] = []
fileparts['path'] = []

# open file
d = nc.Dataset(filepath, 'r')

# get the list of dimension names (used below to skip coordinate variables)
dims = list(dict(d.dimensions).keys())

# loop through all variables
for v in d.variables:
# add all variables that are not coordinates to the catalog
if v not in dims:
fileparts['variable'].append(v)
fileparts['path'].append(filepath)

if 'time' in d.variables.keys():
times = d['time']
fileparts['start_time'].append(times[0])
fileparts['end_time'].append(times[-1])

# add the keys that are common just to the particular glob string
# fileparts.update(local_attrs[filepath])
for lv in local_attrs[filepath].keys():
if '<<' in local_attrs[filepath][lv]:
if lv not in fileparts.keys():
fileparts[lv] = []
if hasattr(d.variables[v], lv):
fileparts[lv].append(getattr(d.variables[v], lv))
else:
fileparts[lv].append('NaN')
elif '<' in local_attrs[filepath][lv]:
k = local_attrs[filepath][lv].replace('<', '').replace('>', '')
if hasattr(d, k):
fileparts[lv] = getattr(d, k)
else:
fileparts[lv] = 'NaN'
else:
if lv not in fileparts.keys():
fileparts[lv] = []
fileparts[lv].append(local_attrs[filepath][lv])
# close netcdf file
d.close()
except Exception:
pass
return fileparts
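To summarize the value conventions handled above: a value containing '<<name>>' is looked up as an attribute on each netCDF variable, a value wrapped in single angle brackets such as '<name>' is looked up as a global file attribute, and any other value is copied into the catalog verbatim; attributes that cannot be found fall back to 'NaN'. An illustrative local_attrs entry for one file (the path and attribute names are hypothetical):

local_attrs = {
    '/data/case1/tas.nc': {
        'units': '<<units>>',  # per-variable attribute lookup
        'case': '<case_id>',   # global file attribute lookup
        'component': 'atm',    # literal value, copied as-is
    },
}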

def parser(self) -> 'Builder':
"""
Method used to start the parsing process.

Parameters
----------
None

Returns
-------
Builder
Returns a Builder object.
"""

# loop over datasets
df_parts = []
entries = defaultdict(dict)
# for dataset in input_yaml.keys():
for dataset in self.input_yaml['catalog']:
# get a list of keys that are common to all files in the dataset
for g in dataset.keys():
if 'data_sources' not in g and 'ensemble' not in g:
entries['global'] = dataset[g]
# loop over ensemble members, if they exist
if 'ensemble' in dataset.keys():
for member in dataset['ensemble']:
glob_string = member.pop('glob_string')
filelist = glob.glob(glob_string)
for f in filelist:
entries[f].update(member)
# loop over all of the data_sources for the dataset, create a dataframe
# for each data_source, append that dataframe to a list that will contain
# the full dataframe (or catalog) based on everything in the yaml file.
for stream_info in dataset['data_sources']:
filelist = glob.glob(stream_info['glob_string'])
stream_info.pop('glob_string')
for f in filelist:
entries[f].update(stream_info)

partial_parser_netcdf = functools.partial(self._parser_netcdf, local_attrs=entries)
self.builder = Builder(None, parser=partial_parser_netcdf, lazy=False)
self.builder.filelist = [x for x in entries.keys() if x != 'global']
df_parts.append(
self.builder.build('path', 'variable')
.df.set_index('path')
.apply(lambda x: x.apply(pd.Series).stack())
.reset_index()
.drop('level_1', 1)
)
# create the combined dataframe from all of the data_sources and datasets from
# the yaml file
df = pd.concat(df_parts, sort=False)

self.builder.df = df.sort_values(by=['path'])
return self.builder
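Finally, a minimal end-to-end sketch of driving this class; the file paths are illustrative placeholders, and it assumes the YAML file follows schema/generic_schema.yaml:

from ecgtools.parsers import YAMLParser

yp = YAMLParser('collection.yaml', csv_path='my_catalog.csv')
builder = yp.parser()           # glob the files, harvest attributes, populate builder.df
builder.save('my_catalog.csv')  # persist the catalog via Builder.save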