From 81448a95dc20edef8eccf5151921cceabe45eee1 Mon Sep 17 00:00:00 2001 From: Justin Bousquin Date: Wed, 10 Apr 2024 17:15:28 -0500 Subject: [PATCH] Py open sci review (#56) * Update README.md Fix typo (readme.md): l7 on package name * Update README.md Fix typo (readme.md): double spaces * Update README.md Edit (general): new line at each full stop in a markdown paragraph. * Update contributing.rst fix typo (contributing.rst): double spaces * Update contributing.rst Edit (general): adding a new line at each full stop in a rst paragraph. * Update example workflow.rst Edit (general): add a new line at each full stop in a markdown or rst paragraph. * Update index.rst Edit (general): add a new line at each full stop in a markdown or rst paragraph. * wet_dry_drop() has become outside the normal workflow, what was being done in the try/except was a bit proactive and gets problematic with keeping the functions in a linear workflow. * from harmonize -> clean: df_checks(). add_qa_flag() * convert_unit_series() moved harmonize -> convert * Import specific functions instead of module * Fix docs examples * Module needs to be imported for example * 'Filter/sieve residue' & 'Yield' now included in this domain. This is meant to get the updated list, but will need to keep an eye out for additions for the example in docs like this. Especially right now as there may be updates with WQX 2.0 -> 3.0. 
--- README.md | 13 +- contributing.rst | 21 +-- docs/source/example workflow.rst | 5 +- docs/source/index.rst | 9 +- harmonize_wq/basis.py | 5 +- harmonize_wq/clean.py | 143 ++++++++++++++--- harmonize_wq/convert.py | 104 ++++++++++++- harmonize_wq/domains.py | 17 +- harmonize_wq/harmonize.py | 260 +------------------------------ harmonize_wq/location.py | 21 ++- harmonize_wq/visualize.py | 6 +- harmonize_wq/wq_data.py | 78 ++++++++-- harmonize_wq/wrangle.py | 9 +- 13 files changed, 355 insertions(+), 336 deletions(-) diff --git a/README.md b/README.md index bc181c9..e07acc0 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,10 @@ # harmonize-wq Standardize, clean, and wrangle Water Quality Portal data into more analytic-ready formats -US EPA’s [Water Quality Portal (WQP)](https://www.waterqualitydata.us/) aggregates water quality, biological, and physical data provided by many organizations and has become an essential resource with tools to query and retrieval data using [python](https://github.com/USGS-python/dataretrieval) or [R](https://github.com/USGS-R/dataRetrieval). Given the variety of data and variety of data originators, using the data in analysis often requires data cleaning to ensure it meets the required quality standards and data wrangling to get it in a more analytic-ready format. Recognizing the definition of analysis-ready varies depending on the analysis, the harmonixe_wq package is intended to be a flexible water quality specific framework to help: +US EPA’s [Water Quality Portal (WQP)](https://www.waterqualitydata.us/) aggregates water quality, biological, and physical data provided by many organizations and has become an essential resource with tools to query and retrieval data using [python](https://github.com/USGS-python/dataretrieval) or [R](https://github.com/USGS-R/dataRetrieval). 
+Given the variety of data and variety of data originators, using the data in analysis often requires data cleaning to ensure it meets the required quality standards and data wrangling to get it in a more analytic-ready format. +Recognizing the definition of analysis-ready varies depending on the analysis, the harmonize_wq package is intended to be a flexible water quality specific framework to help: + - Identify differences in data units (including speciation and basis) - Identify differences in sampling or analytic methods - Resolve data errors using transparent assumptions @@ -73,7 +76,8 @@ df_cleaned ``` ### Transform results from long to wide format -There are many columns in the dataframe that are characteristic specific, that is they have different values for the same sample depending on the characteristic. To ensure one result for each sample after the transformation of the data these columns must either be split, generating a new column for each characteristic with values, or moved out from the table if not being used. +There are many columns in the dataframe that are characteristic specific, that is they have different values for the same sample depending on the characteristic. +To ensure one result for each sample after the transformation of the data these columns must either be split, generating a new column for each characteristic with values, or moved out from the table if not being used. ```python from harmonize_wq import wrangle @@ -108,8 +112,11 @@ QA_Temperature | QA | NA | harmonization processing quality issues ## Issue Tracker harmonize_wq is under development. Please report any bugs and enhancement ideas using the issue track: + https://github.com/USEPA/harmonize-wq/issues ## Disclaimer -The United States Environmental Protection Agency (EPA) GitHub project code is provided on an "as is" basis and the user assumes responsibility for its use. 
EPA has relinquished control of the information and no longer has responsibility to protect the integrity , confidentiality, or availability of the information. Any reference to specific commercial products, processes, or services by service mark, trademark, manufacturer, or otherwise, does not constitute or imply their endorsement, recommendation or favoring by EPA. The EPA seal and logo shall not be used in any manner to imply endorsement of any commercial product or activity by EPA or the United States Government. +The United States Environmental Protection Agency (EPA) GitHub project code is provided on an "as is" basis and the user assumes responsibility for its use. +EPA has relinquished control of the information and no longer has responsibility to protect the integrity, confidentiality, or availability of the information. Any reference to specific commercial products, processes, or services by service mark, trademark, manufacturer, or otherwise, does not constitute or imply their endorsement, recommendation or favoring by EPA. +The EPA seal and logo shall not be used in any manner to imply endorsement of any commercial product or activity by EPA or the United States Government. diff --git a/contributing.rst b/contributing.rst index b7b5845..6e8c647 100644 --- a/contributing.rst +++ b/contributing.rst @@ -3,13 +3,16 @@ Contributing to harmonize_wq ============================ -We’re so glad you’re thinking about contributing to an EPA open source project! If you’re unsure about anything, just ask — or submit your issue or pull request anyway. The worst that can happen is we’ll politely ask you to change something. We appreciate all friendly contributions. +We’re so glad you’re thinking about contributing to an EPA open source project! +If you’re unsure about anything, just ask — or submit your issue or pull request anyway. +The worst that can happen is we’ll politely ask you to change something. We appreciate all friendly contributions. 
We encourage you to read this project’s CONTRIBUTING policy (you are here), its `LICENSE `_, and its `README `_. -All contributions to this project will be released under the MIT dedication. By submitting a pull request or issue, you are agreeing to comply with this waiver of copyright interest. +All contributions to this project will be released under the MIT dedication. +By submitting a pull request or issue, you are agreeing to comply with this waiver of copyright interest. harmonize_wq uses: @@ -34,20 +37,18 @@ To contribute fixes, code, tests, or documentation, fork harmonize_wq in GitHub_ and submit the changes using a pull request against the **main** branch. - If you are submitting new code, add tests (see below) and documentation. -- Write "Closes #" in the PR description or a comment, as described in the - `GitHub docs`_. +- Write "Closes #" in the PR description or a comment, as described in the `GitHub docs`_. - Check tests and resolve any issues. In any case, feel free to use the `issue tracker`_ to discuss ideas for new features or improvements. -Notice that we will not merge a PR if tests are failing. In certain cases tests pass in your -machine but not in GitHub actions. There might be multiple reasons for this but these are some of -the most common: +Notice that we will not merge a PR if tests are failing. +In certain cases tests pass in your machine but not in GitHub actions. +There might be multiple reasons for this but these are some of the most common: - Your new code does not work for other operating systems or Python versions. -- The documentation is not being built properly or the examples in the docs are - not working. +- The documentation is not being built properly or the examples in the docs are not working. .. _`issue tracker`: https://github.com/USEPA/harmonize-wq/issues -.. _`GitHub docs`: https://help.github.com/articles/closing-issues-via-commit-messages/ \ No newline at end of file +.. 
_`GitHub docs`: https://help.github.com/articles/closing-issues-via-commit-messages/ diff --git a/docs/source/example workflow.rst b/docs/source/example workflow.rst index 850038b..3e8f5bd 100644 --- a/docs/source/example workflow.rst +++ b/docs/source/example workflow.rst @@ -55,7 +55,8 @@ Clean results Transform results from long to wide format ****************************************** -There are many columns in the :class:`pandas.DataFrame` that are characteristic specific, that is they have different values for the same sample depending on the characteristic. To ensure one result for each sample after the transformation of the data these columns must either be split, generating a new column for each characteristic with values, or moved out from the table if not being used. +There are many columns in the :class:`pandas.DataFrame` that are characteristic specific, that is they have different values for the same sample depending on the characteristic. +To ensure one result for each sample after the transformation of the data these columns must either be split, generating a new column for each characteristic with values, or moved out from the table if not being used. .. 
code-block:: python3 @@ -105,4 +106,4 @@ The number of columns in the resulting table is greatly reduced: |QA_Temperature | QA |NA |Harmonization quality issues | +----------------------------+-------------+----------------------------------------+-------------------------------+ -For more complete tutorial information, see: `demos `_ \ No newline at end of file +For more complete tutorial information, see: `demos `_ diff --git a/docs/source/index.rst b/docs/source/index.rst index bf71ae0..c307ea8 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -16,7 +16,9 @@ Standardize, clean, and wrangle Water Quality Portal data into more analytic-rea Overview ======== -US EPA’s `Water Quality Portal (WQP) `_ aggregates water quality, biological, and physical data provided by many organizations and has become an essential resource with tools to query and retrieve data using `python `_ or `R `_. Given the variety of data and data originators, using the data in analysis often requires cleaning to ensure it meets required quality standards and wrangling to get it in a more analytic-ready format. Recognizing the definition of analysis-ready varies depending on the analysis, the harmonize_wq package is intended to be a flexible water quality specific framework to help: +US EPA’s `Water Quality Portal (WQP) `_ aggregates water quality, biological, and physical data provided by many organizations and has become an essential resource with tools to query and retrieve data using `python `_ or `R `_. +Given the variety of data and data originators, using the data in analysis often requires cleaning to ensure it meets required quality standards and wrangling to get it in a more analytic-ready format. 
+Recognizing the definition of analysis-ready varies depending on the analysis, the harmonize_wq package is intended to be a flexible water quality specific framework to help: * Identify differences in data units (including speciation and basis) * Identify differences in sampling or analytic methods @@ -70,4 +72,7 @@ Indices and tables Disclaimer ========== -The United States Environmental Protection Agency (EPA) GitHub project code is provided on an “as is” basis and the user assumes responsibility for its use. EPA has relinquished control of the information and no longer has responsibility to protect the integrity , confidentiality, or availability of the information. Any reference to specific commercial products, processes, or services by service mark, trademark, manufacturer, or otherwise, does not constitute or imply their endorsement, recommendation or favoring by EPA. The EPA seal and logo shall not be used in any manner to imply endorsement of any commercial product or activity by EPA or the United States Government. \ No newline at end of file +The United States Environmental Protection Agency (EPA) GitHub project code is provided on an “as is” basis and the user assumes responsibility for its use. +EPA has relinquished control of the information and no longer has responsibility to protect the integrity , confidentiality, or availability of the information. +Any reference to specific commercial products, processes, or services by service mark, trademark, manufacturer, or otherwise, does not constitute or imply their endorsement, recommendation or favoring by EPA. +The EPA seal and logo shall not be used in any manner to imply endorsement of any commercial product or activity by EPA or the United States Government. 
diff --git a/harmonize_wq/basis.py b/harmonize_wq/basis.py index 1b1b85c..a7074fd 100644 --- a/harmonize_wq/basis.py +++ b/harmonize_wq/basis.py @@ -2,8 +2,7 @@ """Functions to process characteristic basis or return basis dictionary.""" from warnings import warn from numpy import nan -from harmonize_wq import harmonize - +from harmonize_wq.clean import add_qa_flag def unit_basis_dict(out_col): """Characteristic specific basis dictionary to define basis from units. @@ -169,7 +168,7 @@ def basis_from_unit(df_in, basis_dict, unit_col='Units', basis_col='Speciation') if old_basis != base: qa_mask = mask & (df[basis_col] == old_basis) warn(f'Mismatched {flag}', UserWarning) - df = harmonize.add_qa_flag(df, qa_mask, flag) + df = add_qa_flag(df, qa_mask, flag) # Add/update basis from unit df = set_basis(df, mask, base, basis_col) df[unit_col] = [new_unit if x == old_unit else x diff --git a/harmonize_wq/clean.py b/harmonize_wq/clean.py index 8fa549c..52ac28e 100644 --- a/harmonize_wq/clean.py +++ b/harmonize_wq/clean.py @@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- """Functions to clean/correct additional columns in subset/entire dataset.""" -from warnings import warn +#from warnings import warn +from numpy import nan import dataretrieval.utils -from harmonize_wq import harmonize -from harmonize_wq import domains -from harmonize_wq import wrangle +from harmonize_wq.convert import convert_unit_series +from harmonize_wq.domains import accepted_methods +#from harmonize_wq.wrangle import add_activities_to_df def datetime(df_in): @@ -108,19 +109,62 @@ def harmonize_depth(df_in, units='meter'): unit_col = 'ResultDepthHeightMeasure/MeasureUnitCode' # Note: there are also 'Activity' cols for both of these & top/bottom depth - harmonize.df_checks(df_out, [meas_col, unit_col]) # Confirm columns in df + df_checks(df_out, [meas_col, unit_col]) # Confirm columns in df na_mask = df_out[meas_col].notna() # Mask NA to speed up processing # TODO: if units missing? 
params = {'quantity_series': df_out.loc[na_mask, meas_col], 'unit_series': df_out.loc[na_mask, unit_col], 'units': units, } - df_out.loc[na_mask, "Depth"] = harmonize.convert_unit_series(**params) + df_out.loc[na_mask, "Depth"] = convert_unit_series(**params) # TODO: where result depth is missing use activity depth? return df_out +def df_checks(df_in, columns=None): + """Check :class:`pandas.DataFrame` for columns. + + Parameters + ---------- + df_in : pandas.DataFrame + DataFrame that will be checked. + columns : list, optional + List of strings for column names. Default None, uses: + 'ResultMeasure/MeasureUnitCode','ResultMeasureValue','CharacteristicName'. + + Examples + -------- + Build pandas DataFrame for example: + + >>> from pandas import DataFrame + >>> df = DataFrame({'CharacteristicName': ['Phosphorus'],}) + >>> df + CharacteristicName + 0 Phosphorus + + Check for existing column: + + >>> from harmonize_wq import clean + >>> clean.df_checks(df, columns=['CharacteristicName']) + + If column is not in DataFrame it throws an AssertionError: + + >>> clean.df_checks(df, columns=['ResultMeasureValue']) + Traceback (most recent call last): + ... + AssertionError: ResultMeasureValue not in DataFrame + + """ + if columns is None: + # Assign defaults + columns = ('ResultMeasure/MeasureUnitCode', + 'ResultMeasureValue', + 'CharacteristicName') + for col in columns: + assert col in df_in.columns, f'{col} not in DataFrame' + + def check_precision(df_in, col, limit=3): """Add QA_flag if value in column has precision lower than limit. 
@@ -147,7 +191,7 @@ def check_precision(df_in, col, limit=3): # Create T/F mask based on len of everything after the decimal c_mask = [len(str(x).split('.')[1]) < limit for x in df_out[col]] flag = f'{col}: Imprecise: lessthan{limit}decimaldigits' - df_out = harmonize.add_qa_flag(df_out, c_mask, flag) # Assign flags + df_out = add_qa_flag(df_out, c_mask, flag) # Assign flags return df_out @@ -177,7 +221,7 @@ def methods_check(df_in, char_val, methods=None): """ if methods is None: - methods = domains.accepted_methods() + methods = accepted_methods() method_col = 'ResultAnalyticalMethod/MethodIdentifier' df2 = df_in.copy() # TODO: check df for method_col @@ -217,9 +261,9 @@ def wet_dry_checks(df_in, mask=None): df_out = df_in.copy() media_col = 'ActivityMediaName' # Check columns are in df - harmonize.df_checks(df_out, [media_col, - 'ResultSampleFractionText', - 'ResultWeightBasisText']) + df_checks(df_out, [media_col, + 'ResultSampleFractionText', + 'ResultWeightBasisText']) # QA - Sample Media, fix assigned 'Water' that are actually 'Sediment' qa_flag = f'{media_col}: Water changed to Sediment' # Create mask for bad data @@ -230,13 +274,70 @@ def wet_dry_checks(df_in, mask=None): if mask: media_mask = mask & (media_mask) # Assign QA flag where data was bad - df_out = harmonize.add_qa_flag(df_out, media_mask, qa_flag) + df_out = add_qa_flag(df_out, media_mask, qa_flag) # Fix the data df_out.loc[media_mask, 'ActivityMediaName'] = 'Sediment' return df_out +def add_qa_flag(df_in, mask, flag): + """Add flag to 'QA_flag' column in df_in. + + Parameters + ---------- + df_in : pandas.DataFrame + DataFrame that will be updated. + mask : pandas.Series + Row conditional mask to limit rows. + flag : str + Text to populate the new flag with. + + Returns + ------- + df_out : pandas.DataFrame + Updated copy of df_in. 
+ + Examples + -------- + Build pandas DataFrame to use as input: + + >>> from pandas import DataFrame + >>> df = DataFrame({'CharacteristicName': ['Carbon', 'Phosphorus', 'Carbon',], + ... 'ResultMeasureValue': ['1.0', '0.265', '2.1'],}) + >>> df + CharacteristicName ResultMeasureValue + 0 Carbon 1.0 + 1 Phosphorus 0.265 + 2 Carbon 2.1 + + Assign simple flag string and mask to assign flag only to Carbon: + + >>> flag = 'words' + >>> mask = df['CharacteristicName']=='Carbon' + + >>> from harmonize_wq import clean + >>> clean.add_qa_flag(df, mask, flag) + CharacteristicName ResultMeasureValue QA_flag + 0 Carbon 1.0 words + 1 Phosphorus 0.265 NaN + 2 Carbon 2.1 words + """ + df_out = df_in.copy() + if 'QA_flag' not in list(df_out.columns): + df_out['QA_flag'] = nan + + # Append flag where QA_flag is not nan + cond_notna = mask & (df_out['QA_flag'].notna()) # Mask cond and not NA + existing_flags = df_out.loc[cond_notna, 'QA_flag'] # Current QA flags + df_out.loc[cond_notna, 'QA_flag'] = [f'{txt}; {flag}' for + txt in existing_flags] + # Equals flag where QA_flag is nan + df_out.loc[mask & (df_out['QA_flag'].isna()), 'QA_flag'] = flag + + return df_out + + def wet_dry_drop(df_in, wet_dry='wet', char_val=None): """Restrict to only water or only sediment samples. 
@@ -264,16 +365,16 @@ def wet_dry_drop(df_in, wet_dry='wet', char_val=None): # Set variables for columns and check they're in df media_col = 'ActivityMediaName' - try: - harmonize.df_checks(df2, media_col) - except AssertionError: - warn(f'Warning: {media_col} missing, querying from activities...') +# try: + df_checks(df2, media_col) +# except AssertionError: +# warn(f'Warning: {media_col} missing, querying from activities...') # Try query/join - if char_val: - df2 = wrangle.add_activities_to_df(df2, c_mask) - else: - df2 = wrangle.add_activities_to_df(df2) # no mask, runs on all - harmonize.df_checks(df2, [media_col]) # Check it's been added +# if char_val: +# df2 = add_activities_to_df(df2, c_mask) +# else: +# df2 = add_activities_to_df(df2) # no mask, runs on all +# df_checks(df2, [media_col]) # Check it's been added # if ERROR? # print('Query and join activities first') diff --git a/harmonize_wq/convert.py b/harmonize_wq/convert.py index 784e9c9..c5ea6c3 100644 --- a/harmonize_wq/convert.py +++ b/harmonize_wq/convert.py @@ -3,9 +3,12 @@ Contains several unit conversion functions not in :mod:`pint`. """ +from warnings import warn import math +import pandas import pint -from harmonize_wq import domains +from numpy import nan +from harmonize_wq.domains import registry_adds_list # TODO: does this constant belong here or in domains? 
@@ -27,12 +30,107 @@ u_reg = pint.UnitRegistry() # For use in wrappers # TODO: find more elegant way to do this with all definitions -for definition in domains.registry_adds_list('Turbidity'): +for definition in registry_adds_list('Turbidity'): u_reg.define(definition) -for definition in domains.registry_adds_list('Salinity'): +for definition in registry_adds_list('Salinity'): u_reg.define(definition) +#timeit: 159.17 +# def convert_unit_series(quantity_series, unit_series, units, ureg=None): +# # Convert quantities to float if they aren't already (should be) +# if quantity_series.dtype=='O': +# quantity_series = pandas.to_numeric(quantity_series) +# # Initialize classes from pint +# if ureg is None: +# ureg = pint.UnitRegistry() +# Q_ = ureg.Quantity +# # Create list of Quantity objects +# val_list = [Q_(q, ureg(unit)) for q, unit in zip(quantity_series, +# unit_series)] +# # Convert Quantity objects to new unit +# out_list = [val.to(ureg(units)) for val in val_list] +# # Re-index to return series +# return pandas.Series(out_list, index=quantity_series.index) +#timeit: 27.08 +def convert_unit_series(quantity_series, unit_series, units, ureg=None, errors='raise'): + """Convert quantities to consistent units. + + Convert list of quantities (quantity_list), each with a specified old unit, + to a quantity in units using :mod:`pint` constructor method. + + Parameters + ---------- + quantity_series : pandas.Series + List of quantities. Values should be numeric, must not include NaN. + unit_series : pandas.Series + List of units for each quantity in quantity_series. Values should be + string, must not include NaN. + units : str + Desired units. + ureg : pint.UnitRegistry, optional + Unit Registry Object with any custom units defined. The default is None. + errors : str, optional + Values of ‘ignore’, ‘raise’, or ‘skip’. The default is ‘raise’. + If ‘raise’, invalid dimension conversions will raise an exception. 
+ If ‘skip’, invalid dimension conversions will not be converted. + If ‘ignore’, invalid dimension conversions will return the NaN. + + Returns + ------- + pandas.Series + Converted values from quantity_series in units with original index. + + Examples + -------- + Build series to use as input: + + >>> from pandas import Series + >>> quantity_series = Series([1, 10]) + >>> unit_series = Series(['mg/l', 'mg/ml',]) + + Convert series to series of pint Quantity objects in 'mg/l': + + >>> from harmonize_wq import convert + >>> convert.convert_unit_series(quantity_series, unit_series, units = 'mg/l') + 0 1.0 milligram / liter + 1 10000.000000000002 milligram / liter + dtype: object + """ + if quantity_series.dtype=='O': + quantity_series = pandas.to_numeric(quantity_series) + # Initialize classes from pint + if ureg is None: + ureg = pint.UnitRegistry() + Q_ = ureg.Quantity + + lst_series = [pandas.Series(dtype='object')] + # Note: set of series does not preservce order and must be sorted at end + for unit in list(set(unit_series)): + # Filter quantity_series by unit_series where == unit + f_quant_series = quantity_series.where(unit_series==unit).dropna() + unit_ = ureg(unit) # Set unit once per unit + result_list = [Q_(q, unit_) for q in f_quant_series] + if unit != units: + # Convert (units are all same so if one fails all will fail) + try: + result_list = [val.to(ureg(units)) for val in result_list] + except pint.DimensionalityError as exception: + if errors=='skip': + # do nothing, leave result_list unconverted + warn(f"WARNING: '{unit}' not converted") + elif errors=='ignore': + # convert to NaN + result_list = [nan for val in result_list] + warn(f"WARNING: '{unit}' converted to NaN") + else: + # errors=='raise', or anything else just in case + raise exception + # Re-index and add series to list + lst_series.append(pandas.Series(result_list, index=f_quant_series.index)) + return pandas.concat(lst_series).sort_index() + + def mass_to_moles(ureg, char_val, Q_): 
"""Convert a mass to moles substance. diff --git a/harmonize_wq/domains.py b/harmonize_wq/domains.py index bf75b90..0415e0c 100644 --- a/harmonize_wq/domains.py +++ b/harmonize_wq/domains.py @@ -91,17 +91,20 @@ def get_domain_dict(table, cols=None): Return dictionary for domain from WQP table (e.g., 'ResultSampleFraction'), The default keys ('Name') are shown as values ('Description') are long: + >>> from harmonize_wq import domains >>> domains.get_domain_dict('ResultSampleFraction').keys() # doctest: +NORMALIZE_WHITESPACE dict_keys(['Acid Soluble', 'Bed Sediment', 'Bedload', 'Bioavailable', 'Comb Available', 'Dissolved', 'Extractable', 'Extractable, CaCO3-bound', 'Extractable, exchangeable', 'Extractable, organic-bnd', 'Extractable, other', 'Extractable, oxide-bound', - 'Extractable, residual', 'Field', 'Filterable', 'Filtered field and/or lab', - 'Filtered, field', 'Filtered, lab', 'Fixed', 'Free Available', 'Inorganic', - 'Leachable', 'Non-Filterable (Particle)', 'Non-settleable', 'Non-volatile', 'None', - 'Organic', 'Pot. Dissolved', 'Semivolatile', 'Settleable', 'Sieved', - 'Strong Acid Diss', 'Supernate', 'Suspended', 'Total', 'Total Recoverable', - 'Total Residual', 'Total Soluble', 'Unfiltered', 'Unfiltered, field', 'Vapor', - 'Volatile', 'Weak Acid Diss', 'non-linear function']) + 'Extractable, residual', 'Field', 'Filter/sieve residue', 'Filterable', + 'Filtered field and/or lab', 'Filtered, field', 'Filtered, lab', + 'Fixed', 'Free Available', 'Inorganic', 'Leachable', + 'Non-Filterable (Particle)', 'Non-settleable', 'Non-volatile', + 'None', 'Organic', 'Pot. 
Dissolved', 'Semivolatile', 'Settleable', + 'Sieved', 'Strong Acid Diss', 'Supernate', 'Suspended', 'Total', + 'Total Recoverable', 'Total Residual', 'Total Soluble', + 'Unfiltered', 'Unfiltered, field', 'Vapor', 'Volatile', + 'Weak Acid Diss', 'Yield', 'non-linear function']) """ if cols is None: diff --git a/harmonize_wq/harmonize.py b/harmonize_wq/harmonize.py index 68f3285..cb82af3 100644 --- a/harmonize_wq/harmonize.py +++ b/harmonize_wq/harmonize.py @@ -1,256 +1,11 @@ # -*- coding: utf-8 -*- """Functions to harmonize data retrieved from EPA's Water Quality Portal.""" from warnings import warn -import pandas -import pint from numpy import nan from harmonize_wq.wq_data import WQCharData -from harmonize_wq import domains from harmonize_wq import convert -from harmonize_wq import visualize as viz - - -def df_checks(df_in, columns=None): - """Check :class:`pandas.DataFrame` for columns. - - Parameters - ---------- - df_in : pandas.DataFrame - DataFrame that will be checked. - columns : list, optional - List of strings for column names. Default None, uses: - 'ResultMeasure/MeasureUnitCode','ResultMeasureValue','CharacteristicName'. - - Examples - -------- - Build pandas DataFrame for example: - - >>> from pandas import DataFrame - >>> df = DataFrame({'CharacteristicName': ['Phosphorus'],}) - >>> df - CharacteristicName - 0 Phosphorus - - Check for existing column: - - >>> from harmonize_wq import harmonize - >>> harmonize.df_checks(df, columns=['CharacteristicName']) - - If column is not in DataFrame it throws an AssertionError: - - >>> harmonize.df_checks(df, columns=['ResultMeasureValue']) - Traceback (most recent call last): - ... 
- AssertionError: ResultMeasureValue not in DataFrame - - """ - if columns is None: - # Assign defaults - columns = ('ResultMeasure/MeasureUnitCode', - 'ResultMeasureValue', - 'CharacteristicName') - for col in columns: - assert col in df_in.columns, f'{col} not in DataFrame' - - -#timeit: 159.17 -# def convert_unit_series(quantity_series, unit_series, units, ureg=None): -# # Convert quantities to float if they aren't already (should be) -# if quantity_series.dtype=='O': -# quantity_series = pandas.to_numeric(quantity_series) -# # Initialize classes from pint -# if ureg is None: -# ureg = pint.UnitRegistry() -# Q_ = ureg.Quantity -# # Create list of Quantity objects -# val_list = [Q_(q, ureg(unit)) for q, unit in zip(quantity_series, -# unit_series)] -# # Convert Quantity objects to new unit -# out_list = [val.to(ureg(units)) for val in val_list] -# # Re-index to return series -# return pandas.Series(out_list, index=quantity_series.index) -#timeit: 27.08 -def convert_unit_series(quantity_series, unit_series, units, ureg=None, errors='raise'): - """Convert quantities to consistent units. - - Convert list of quantities (quantity_list), each with a specified old unit, - to a quantity in units using :mod:`pint` constructor method. - - Parameters - ---------- - quantity_series : pandas.Series - List of quantities. Values should be numeric, must not include NaN. - unit_series : pandas.Series - List of units for each quantity in quantity_series. Values should be - string, must not include NaN. - units : str - Desired units. - ureg : pint.UnitRegistry, optional - Unit Registry Object with any custom units defined. The default is None. - errors : str, optional - Values of ‘ignore’, ‘raise’, or ‘skip’. The default is ‘raise’. - If ‘raise’, invalid dimension conversions will raise an exception. - If ‘skip’, invalid dimension conversions will not be converted. - If ‘ignore’, invalid dimension conversions will return the NaN. 
- - Returns - ------- - pandas.Series - Converted values from quantity_series in units with original index. - - Examples - -------- - Build series to use as input: - - >>> from pandas import Series - >>> quantity_series = Series([1, 10]) - >>> unit_series = Series(['mg/l', 'mg/ml',]) - - Convert series to series of pint Quantity objects in 'mg/l': - - >>> from harmonize_wq import harmonize - >>> harmonize.convert_unit_series(quantity_series, unit_series, units = 'mg/l') - 0 1.0 milligram / liter - 1 10000.000000000002 milligram / liter - dtype: object - """ - if quantity_series.dtype=='O': - quantity_series = pandas.to_numeric(quantity_series) - # Initialize classes from pint - if ureg is None: - ureg = pint.UnitRegistry() - Q_ = ureg.Quantity - - lst_series = [pandas.Series(dtype='object')] - # Note: set of series does not preservce order and must be sorted at end - for unit in list(set(unit_series)): - # Filter quantity_series by unit_series where == unit - f_quant_series = quantity_series.where(unit_series==unit).dropna() - unit_ = ureg(unit) # Set unit once per unit - result_list = [Q_(q, unit_) for q in f_quant_series] - if unit != units: - # Convert (units are all same so if one fails all will fail) - try: - result_list = [val.to(ureg(units)) for val in result_list] - except pint.DimensionalityError as exception: - if errors=='skip': - # do nothing, leave result_list unconverted - warn(f"WARNING: '{unit}' not converted") - elif errors=='ignore': - # convert to NaN - result_list = [nan for val in result_list] - warn(f"WARNING: '{unit}' converted to NaN") - else: - # errors=='raise', or anything else just in case - raise exception - # Re-index and add series to list - lst_series.append(pandas.Series(result_list, index=f_quant_series.index)) - return pandas.concat(lst_series).sort_index() - - -def add_qa_flag(df_in, mask, flag): - """Add flag to 'QA_flag' column in df_in. - - Parameters - ---------- - df_in : pandas.DataFrame - DataFrame that will be updated. 
- mask : pandas.Series - Row conditional mask to limit rows. - flag : str - Text to populate the new flag with. - - Returns - ------- - df_out : pandas.DataFrame - Updated copy of df_in. - - Examples - -------- - Build pandas DataFrame to use as input: - - >>> from pandas import DataFrame - >>> df = DataFrame({'CharacteristicName': ['Carbon', 'Phosphorus', 'Carbon',], - ... 'ResultMeasureValue': ['1.0', '0.265', '2.1'],}) - >>> df - CharacteristicName ResultMeasureValue - 0 Carbon 1.0 - 1 Phosphorus 0.265 - 2 Carbon 2.1 - - Assign simple flag string and mask to assign flag only to Carbon: - - >>> flag = 'words' - >>> mask = df['CharacteristicName']=='Carbon' - - >>> from harmonize_wq import harmonize - >>> harmonize.add_qa_flag(df, mask, flag) - CharacteristicName ResultMeasureValue QA_flag - 0 Carbon 1.0 words - 1 Phosphorus 0.265 NaN - 2 Carbon 2.1 words - """ - df_out = df_in.copy() - if 'QA_flag' not in list(df_out.columns): - df_out['QA_flag'] = nan - - # Append flag where QA_flag is not nan - cond_notna = mask & (df_out['QA_flag'].notna()) # Mask cond and not NA - existing_flags = df_out.loc[cond_notna, 'QA_flag'] # Current QA flags - df_out.loc[cond_notna, 'QA_flag'] = [f'{txt}; {flag}' for - txt in existing_flags] - # Equals flag where QA_flag is nan - df_out.loc[mask & (df_out['QA_flag'].isna()), 'QA_flag'] = flag - - return df_out - - -def units_dimension(series_in, units, ureg=None): - """List unique units not in desired units dimension. - - Parameters - ---------- - series_in : pandas.Series - Series of units. - units : str - Desired units. - ureg : pint.UnitRegistry, optional - Unit Registry Object with any custom units defined. - The default is None. - - Returns - ------- - dim_list : list - List of units with mismatched dimensions. 
- - Examples - -------- - Build series to use as input: - - >>> from pandas import Series - >>> unit_series = Series(['mg/l', 'mg/ml', 'g/kg']) - >>> unit_series - 0 mg/l - 1 mg/ml - 2 g/kg - dtype: object - - Get list of unique units not in desired units dimension 'mg/l': - - >>> from harmonize_wq import harmonize - >>> harmonize.units_dimension(unit_series, units='mg/l') - ['g/kg'] - """ - if ureg is None: - ureg = pint.UnitRegistry() - dim_list = [] # List for units with mismatched dimensions - dimension = ureg(units).dimensionality # units dimension - # Loop over list of unique units - for unit in list(set(series_in)): - q_ = ureg(unit) - if not q_.check(dimension): - dim_list.append(unit) - return dim_list +from harmonize_wq.domains import OUT_UNITS, UNITS_REPLACE +from harmonize_wq.visualize import print_report def dissolved_oxygen(wqp): @@ -597,7 +352,7 @@ def harmonize(df_in, char_val, units_out=None, errors='raise', if units_out: wqp.update_units(units_out) else: - units_out = domains.OUT_UNITS[out_col] + units_out = OUT_UNITS[out_col] # Update local units registry to define characteristic specific units wqp.update_ureg() # This is done based on out_col/char_val @@ -615,7 +370,7 @@ def harmonize(df_in, char_val, units_out=None, errors='raise', # Replace known special character in unit ('#' count assumed as CFU) wqp.replace_unit_str('#', 'CFU') # Replace known unit problems (e.g., assume CFU/MPN is /100ml) - wqp.replace_unit_by_dict(domains.UNITS_REPLACE[out_col]) + wqp.replace_unit_by_dict(UNITS_REPLACE[out_col]) #TODO: figure out why the above must be done before replace_unit_str # Replace all instances in results column wqp.replace_unit_str('/100ml', '/(100ml)') @@ -666,20 +421,21 @@ def harmonize(df_in, char_val, units_out=None, errors='raise', else: frac_dict = 'TADA' frac_dict = wqp.fraction(frac_dict) # Run sample fraction on WQP - + df_out = wqp.df # TODO: add activities/detection limits and filter on quality? 
e.g., cols: # 'ResultStatusIdentifier' = ['Historical', 'Accepted', 'Final'] # 'ResultValueTypeName' = ['Actual', 'Estimated', 'Calculated'] - # 'ResultDetectionConditionText' = ['*Non-detect', '*Present >> from pandas import Series + >>> unit_series = Series(['mg/l', 'mg/ml', 'g/kg']) + >>> unit_series + 0 mg/l + 1 mg/ml + 2 g/kg + dtype: object + + Get list of unique units not in desired units dimension 'mg/l': + + >>> from harmonize_wq import wq_data + >>> wq_data.units_dimension(unit_series, units='mg/l') + ['g/kg'] + """ + #TODO: this should be a method + if ureg is None: + ureg = pint.UnitRegistry() + dim_list = [] # List for units with mismatched dimensions + dimension = ureg(units).dimensionality # units dimension + # Loop over list of unique units + for unit in list(set(series_in)): + q_ = ureg(unit) + if not q_.check(dimension): + dim_list.append(unit) + return dim_list + class WQCharData(): """Class for specific characteristic in Water Quality Portal results. @@ -71,7 +121,7 @@ class WQCharData(): def __init__(self, df_in, char_val): df_out = df_in.copy() # self.check_df(df) - harmonize.df_checks(df_out) + df_checks(df_out) c_mask = df_out['CharacteristicName'] == char_val self.c_mask = c_mask # Deal with units: set out = in @@ -112,7 +162,7 @@ def _coerce_measure(self): flag = f'{meas_col}: "{bad_meas}" result cannot be used' cond = c_mask & (df_out[meas_col] == bad_meas) # Flag bad measures - df_out = harmonize.add_qa_flag(df_out, cond, flag) + df_out = add_qa_flag(df_out, cond, flag) df_out[self.out_col] = meas_s # Return coerced results self.df = df_out @@ -138,7 +188,7 @@ def _infer_units(self, flag_col=None): flag = self._unit_qa_flag('MISSING', flag_col) # Update mask for missing units units_mask = self.c_mask & self.df[self.col.unit_out].isna() - self.df = harmonize.add_qa_flag(self.df, units_mask, flag) # Assign flag + self.df = add_qa_flag(self.df, units_mask, flag) # Assign flag # Update with infered unit self.df.loc[units_mask, 
self.col.unit_out] = self.units # Note: .fillna(self.units) is slightly faster but hits datatype issues @@ -331,7 +381,7 @@ def check_units(self, flag_col=None): # New mask for bad units u_mask = self._unit_mask(unit) # Assign flag to bad units - df_out = harmonize.add_qa_flag(df_out, u_mask, flag) + df_out = add_qa_flag(df_out, u_mask, flag) df_out.loc[u_mask, self.col.unit_out] = self.units # Replace w/ default self.df = df_out @@ -399,7 +449,7 @@ def check_basis(self, basis_col='MethodSpecificationName'): c_mask = self.c_mask # Check for Method Specification column - harmonize.df_checks(self.df, [basis_col]) + df_checks(self.df, [basis_col]) # Basis from MethodSpecificationName if basis_col == 'MethodSpecificationName': @@ -578,7 +628,7 @@ def convert_units(self, default_unit=None, errors='raise'): 'units': self.units, 'ureg': self.ureg, 'errors': errors} - df_out.loc[m_mask, self.out_col] = harmonize.convert_unit_series(**params) + df_out.loc[m_mask, self.out_col] = convert_unit_series(**params) self.df = df_out def apply_conversion(self, convert_fun, unit, u_mask=None): @@ -684,10 +734,10 @@ def dimensions_list(self, m_mask=None): """ if m_mask is None: m_mask = self.measure_mask() - return harmonize.units_dimension(self.df.loc[m_mask, - self.col.unit_out], - self.units, - self.ureg) + return units_dimension(self.df.loc[m_mask, + self.col.unit_out], + self.units, + self.ureg) def replace_unit_str(self, old, new, mask=None): """Replace ALL instances of old with in WQCharData.col.unit_out column. 
@@ -887,7 +937,7 @@ def fraction(self, frac_dict=None, catch_all=None, suffix=None, 1 NaN 10.000000000000002 milligram / liter """ # Check for sample fraction column - harmonize.df_checks(self.df, [fract_col]) + df_checks(self.df, [fract_col]) c_mask = self.c_mask @@ -1026,7 +1076,7 @@ def dimension_fixes(self): basis_lst = list(set(self.df.loc[self.c_mask, self.col.basis])) for speciation in basis_lst: mol_params['basis'] = speciation - quant = str(convert.moles_to_mass(**mol_params)) + quant = str(moles_to_mass(**mol_params)) dim_tup = self._dimension_handling(unit, quant, self.ureg) diff --git a/harmonize_wq/wrangle.py b/harmonize_wq/wrangle.py index bead6bf..b84dced 100644 --- a/harmonize_wq/wrangle.py +++ b/harmonize_wq/wrangle.py @@ -3,8 +3,7 @@ import pandas import geopandas from harmonize_wq import domains -from harmonize_wq import harmonize -from harmonize_wq.clean import datetime, harmonize_depth +from harmonize_wq.clean import datetime, harmonize_depth, df_checks from dataretrieval import wqp @@ -16,7 +15,7 @@ def split_table(df_in): Notes ----- - Runs :func:`clean.datetime` and :func:`cleanharmonize_depth` if expected + Runs :func:`clean.datetime` and :func:`clean.harmonize_depth` if expected columns ('Activity_datetime' and 'Depth') are missing. 
Parameters @@ -365,7 +364,7 @@ def add_activities_to_df(df_in, mask=None): df_out = df_in.copy() # Check df for loc_field loc_col = 'MonitoringLocationIdentifier' - harmonize.df_checks(df_out, [loc_col]) + df_checks(df_out, [loc_col]) # List of unique sites and characteristicNames if mask: loc_list = list(set(df_out.loc[mask, loc_col].dropna())) @@ -435,7 +434,7 @@ def add_detection(df_in, char_val): # Check df for loc_field loc_col = 'MonitoringLocationIdentifier' res_id = 'ResultIdentifier' - harmonize.df_checks(df_out, [loc_col, res_id]) + df_checks(df_out, [loc_col, res_id]) c_mask = df_out['CharacteristicName'] == char_val # Mask to limit rows loc_series = df_out.loc[c_mask, loc_col] # Location Series res_series = df_out.loc[c_mask, res_id] # Location Series