From 9b1381d03c0d34a8001f9702a1537674338c23c2 Mon Sep 17 00:00:00 2001
From: Mike Kittridge
Date: Sun, 27 Nov 2022 18:34:21 +1300
Subject: [PATCH] added queries by mod date

---
 conda/meta.yaml                  |  4 +--
 setup.py                         |  4 +--
 tethysts/main.py                 |  4 +--
 tethysts/tests/utest_tethysts.py | 14 ++++++++---
 tethysts/utils.py                | 49 ++++++++++++++++++++++++++++--------
 5 files changed, 58 insertions(+), 17 deletions(-)

diff --git a/conda/meta.yaml b/conda/meta.yaml
index eb79659..1422e3d 100644
--- a/conda/meta.yaml
+++ b/conda/meta.yaml
@@ -1,5 +1,5 @@
 {% set name = "tethysts" %}
-{% set version = "4.5.7" %}
+{% set version = "4.5.8" %}
 # {% set sha256 = "ae2cc83fb5a75e8dc3e1b2c2137deea412c8a4c7c9acca52bf4ec59de52a80c9" %}
 
 # sha256 is the prefered checksum -- you can get it for a file with:
@@ -44,7 +44,7 @@ requirements:
     - requests
     - shapely
     - tethys-data-models >=0.4.11
-    - hdf5tools >=0.1.4
+    - hdf5tools >=0.1.10
     - s3tethys >=0.0.4
 
 test:
diff --git a/setup.py b/setup.py
index 973a4f7..65c2be9 100644
--- a/setup.py
+++ b/setup.py
@@ -10,7 +10,7 @@
 name = 'tethysts'
 main_package = 'tethysts'
 datasets = 'datasets/time_series'
-version = '4.5.7'
+version = '4.5.8'
 descrip = 'tethys time series S3 extraction'
 
 # The below code is for readthedocs. To have sphinx/readthedocs interact with
@@ -19,7 +19,7 @@ if os.environ.get('READTHEDOCS', False) == 'True':
     INSTALL_REQUIRES = []
 else:
-    INSTALL_REQUIRES = ['zstandard', 'pandas', 'xarray', 'scipy', 'orjson', 'requests', 'shapely', 'tethys-data-models>=0.4.11', 'hdf5tools>=0.1.4', 's3tethys>=0.0.4']
+    INSTALL_REQUIRES = ['zstandard', 'pandas', 'xarray', 'scipy', 'orjson', 'requests', 'shapely', 'tethys-data-models>=0.4.11', 'hdf5tools>=0.1.10', 's3tethys>=0.0.4']
 
 
 # Get the long description from the README file
 with open(os.path.join(here, 'README.rst'), encoding='utf-8') as f:
diff --git a/tethysts/main.py b/tethysts/main.py
index ea35333..0e75723 100644
--- a/tethysts/main.py
+++ b/tethysts/main.py
@@ -521,7 +521,7 @@ def get_results(self,
         ## Get results chunks
         rc_list = self._get_results_chunks(dataset_id, vd)
 
-        chunks = utils.chunk_filters(rc_list, stn_ids, time_interval, from_date, to_date, heights, bands)
+        chunks = utils.chunk_filters(rc_list, stn_ids, time_interval, from_date, to_date, heights, bands, from_mod_date, to_mod_date)
 
         if chunks:
 
@@ -550,7 +550,7 @@ def get_results(self,
             xr.backends.file_manager.FILE_CACHE.clear()
 
         ## combine results
-        xr3 = utils.results_concat(results_list, output_path=output_path, from_date=from_date, to_date=to_date, compression=compression)
+        xr3 = utils.results_concat(results_list, output_path=output_path, from_date=from_date, to_date=to_date, from_mod_date=from_mod_date, to_mod_date=to_mod_date, compression=compression)
 
         ## Convert to new version
         attrs = xr3.attrs.copy()
diff --git a/tethysts/tests/utest_tethysts.py b/tethysts/tests/utest_tethysts.py
index 62ebb34..1e86351 100644
--- a/tethysts/tests/utest_tethysts.py
+++ b/tethysts/tests/utest_tethysts.py
@@ -215,7 +215,8 @@
 bands: int = None
 squeeze_dims: bool = False
 threads: int = 30
-
+output_path = None
+compression = 'zstd'
 
 remote = {'bucket': 'nz-open-modelling-consortium', 'public_url': 'https://b2.nzrivers.xyz/file/', 'version': 4}
 remote = {'bucket': 'fire-emergency-nz', 'public_url': 'https://b2.tethys-ts.xyz/file/', 'version': 4}
@@ -235,6 +236,9 @@
 remote = {'bucket': 'noaa-nwm', 'public_url': 'https://b2.tethys-ts.xyz/file', 'version': 4}
 remote = {'bucket': 'jaxa-data', 'public_url': 'https://b2.tethys-ts.xyz/file', 'version': 4}
 remote = {'bucket': 'linz-data', 
 'public_url': 'https://b2.tethys-ts.xyz/file', 'version': 4}
+remote = {'bucket': 'mdc-env', 'public_url': 'https://b2.tethys-ts.xyz/file', 'version': 4}
+
+
 cache = '/media/nvme1/cache/tethys'
 # cache = '/home/mike/cache/tethys'
@@ -287,6 +289,9 @@
 dataset_id = '980177538186a9bb9c9a0672'
 dataset_id = '54374801c0311a98a0f8e5ef'
 
+dataset_id = 'ef79659d4175c6d70d748b5e'
+station_ids = 'b8db7f021cf6e422d6d85881'
+
 self = Tethys([remote], cache=cache)
 self = Tethys([remote])
 self = Tethys()
@@ -295,7 +300,7 @@
 stns1 = self.get_stations(dataset_id)
 stns1 = self.get_stations(dataset_id, version_date=version_date)
 
-station_ids = [s['station_id'] for s in stns1[:1]]
+station_ids = [s['station_id'] for s in stns1[:2]]
 station_ids = [s['station_id'] for s in stns1 if ref in s['ref']]
 
 results1 = self.get_results(dataset_id, station_ids, heights=None)
@@ -310,6 +315,9 @@
 results1 = self.get_results(dataset_id, station_ids, heights=None, from_date='2015-04-01')
 
+results1 = self.get_results(dataset_id, station_ids, heights=None, from_mod_date='2022-10-01')
+results1 = self.get_results(dataset_id, station_ids, heights=None, to_mod_date='2022-10-01')
+
 for d in self.datasets:
     rv1 = self.get_versions(d['dataset_id'])
 
 
@@ -371,7 +379,7 @@
 
 
 
-
+stn_ids = {s['station_id']: pd.Timestamp(s['time_range']['to_date']) - pd.Timestamp(s['time_range']['from_date']) for s in stns1}
 
 
 
diff --git a/tethysts/utils.py b/tethysts/utils.py
index 910075d..45c4bd9 100644
--- a/tethysts/utils.py
+++ b/tethysts/utils.py
@@ -433,30 +433,50 @@ def read_json_zstd(obj):
 
 #     return file_obj
 
-def chunk_filters(results_chunks, stn_ids, time_interval=None, from_date=None, to_date=None, heights=None, bands=None):
+def chunk_filters(results_chunks, stn_ids, time_interval=None, from_date=None, to_date=None, heights=None, bands=None, from_mod_date=None, to_mod_date=None):
     """
 
     """
     ## Stations filter
     rc2 = copy.deepcopy([rc for rc in results_chunks if rc['station_id'] in stn_ids])
+
+    # Return early when no chunks match the requested stations; indexing rc2[0] below would otherwise raise an IndexError
+    if len(rc2) == 0:
+        return rc2
+
+    first_one = rc2[0]
 
-    ## Temporal filter
-    if isinstance(from_date, (str, pd.Timestamp, datetime)) and ('chunk_day' in rc2[0]):
+    ## Temporal filters
+    if isinstance(from_date, (str, pd.Timestamp, datetime)) and ('chunk_day' in first_one):
         from_date1 = int(pd.Timestamp(from_date).timestamp()/60/60/24)
         rc2 = [rc for rc in rc2 if (rc['chunk_day'] + time_interval) >= from_date1]
 
         if len(rc2) == 0:
             return rc2
 
-    if isinstance(to_date, (str, pd.Timestamp, datetime)) and ('chunk_day' in rc2[0]):
+    if isinstance(to_date, (str, pd.Timestamp, datetime)) and ('chunk_day' in first_one):
         to_date1 = int(pd.Timestamp(to_date).timestamp()/60/60/24)
         rc2 = [rc for rc in rc2 if rc['chunk_day'] <= to_date1]
 
         if len(rc2) == 0:
             return rc2
 
+    if isinstance(from_mod_date, (str, pd.Timestamp, datetime)) and ('modified_date' in first_one):
+        from_mod_date1 = pd.Timestamp(from_mod_date)
+        rc2 = [rc for rc in rc2 if pd.Timestamp(rc['modified_date']) >= from_mod_date1]
+
+        if len(rc2) == 0:
+            return rc2
+
+    if isinstance(to_mod_date, (str, pd.Timestamp, datetime)) and ('modified_date' in first_one):
+        to_mod_date1 = pd.Timestamp(to_mod_date)
+        rc2 = [rc for rc in rc2 if pd.Timestamp(rc['modified_date']) <= to_mod_date1]
+
+        if len(rc2) == 0:
+            return rc2
+
     ## Heights and bands filter
-    if (heights is not None) and ('height' in rc2[0]):
+    if (heights is not None) and ('height' in first_one):
         if isinstance(heights, (int, float)):
             h1 = [int(heights*1000)]
         elif isinstance(heights, list):
@@ -468,7 +488,7 @@ def chunk_filters(results_chunks, stn_ids, time_interval=None, from_date=None, t
         if len(rc2) == 0:
             return rc2
 
-    if (bands is not None) and ('band' in rc2[0]):
+    if (bands is not None) and ('band' in first_one):
         if isinstance(bands, int):
-            b1 = [heights]
+            b1 = [bands]
         elif isinstance(bands, list):
@@ -841,7 +861,7 @@ def xr_concat(datasets: List[xr.Dataset]):
     return xr3
 
 
-def results_concat(results_list, output_path=None, from_date=None, to_date=None, compression='lzf'):
+def results_concat(results_list, output_path=None, from_date=None, to_date=None, from_mod_date=None, to_mod_date=None, compression='lzf'):
     """
 
     """
@@ -855,6 +875,19 @@ def results_concat(results_list, output_path=None, from_date=None, to_date=None,
 
         xr3 = xr.open_dataset(output_path, engine='h5netcdf', cache=False)
 
+    ## Deal with mod dates filters
+    if ((from_mod_date is not None) or (to_mod_date is not None)) and ('modified_date' in xr3):
+        mod_dates = xr3['modified_date'].copy().load()
+
+        if (from_mod_date is not None) and (to_mod_date is not None):
+            mod_bool = (mod_dates >= pd.Timestamp(from_mod_date)) & (mod_dates <= pd.Timestamp(to_mod_date))
+        elif (from_mod_date is not None):
+            mod_bool = (mod_dates >= pd.Timestamp(from_mod_date))
+        elif (to_mod_date is not None):
+            mod_bool = (mod_dates <= pd.Timestamp(to_mod_date))
+
+        xr3 = xr3.where(mod_bool, drop=True)
+
     return xr3
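
A quick usage sketch of the new from_mod_date/to_mod_date parameters, mirroring the lines added to utest_tethysts.py above. The remote, dataset_id, and station_ids values are the ones from the test script; the sketch assumes that bucket is still reachable and that its results chunks record a modified_date:

    from tethysts import Tethys

    remote = {'bucket': 'mdc-env', 'public_url': 'https://b2.tethys-ts.xyz/file', 'version': 4}
    dataset_id = 'ef79659d4175c6d70d748b5e'
    station_ids = 'b8db7f021cf6e422d6d85881'

    ts = Tethys([remote])

    # Only results chunks modified on or after 2022-10-01 are downloaded
    recent = ts.get_results(dataset_id, station_ids, heights=None, from_mod_date='2022-10-01')

    # The mirror image: chunks last modified on or before 2022-10-01
    older = ts.get_results(dataset_id, station_ids, heights=None, to_mod_date='2022-10-01')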
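
Chunk-level behaviour of chunk_filters, sketched on synthetic chunk metadata (the dicts below are illustrative; real chunk records carry more keys, such as chunk_day and version_date). Note that the modified_date window is only applied when the first matching chunk actually carries a modified_date key:

    from tethysts import utils

    # Synthetic results-chunk metadata; illustrative values only
    results_chunks = [
        {'station_id': 'stn1', 'modified_date': '2022-09-15T00:00:00'},
        {'station_id': 'stn1', 'modified_date': '2022-10-20T00:00:00'},
        {'station_id': 'stn2', 'modified_date': '2022-11-01T00:00:00'},
    ]

    # The station filter runs first, then the mod-date window drops the
    # stn1 chunk last modified before 2022-10-01
    chunks = utils.chunk_filters(results_chunks, ['stn1'], from_mod_date='2022-10-01')
    assert [c['modified_date'] for c in chunks] == ['2022-10-20T00:00:00']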
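
results_concat then applies a second, value-level pass with xarray's where/drop. The same masking pattern on a toy dataset (the variable names are illustrative, not the real tethysts schema):

    import pandas as pd
    import xarray as xr

    ds = xr.Dataset(
        {'temperature': ('time', [10.0, 11.0, 12.0]),
         'modified_date': ('time', pd.to_datetime(['2022-09-01', '2022-10-15', '2022-11-01']))},
        coords={'time': pd.date_range('2022-01-01', periods=3)})

    # Build the boolean mask the same way results_concat does, then drop
    # the time steps outside the modified_date window
    mod_bool = ds['modified_date'] >= pd.Timestamp('2022-10-01')
    ds2 = ds.where(mod_bool, drop=True)
    assert ds2['time'].size == 2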