Merge pull request #79 from tethys-ts/dev
added querying results by modified date
mullenkamp authored Nov 27, 2022
2 parents a1c1ec7 + 9b1381d commit 3aba165
Showing 5 changed files with 62 additions and 19 deletions.
4 changes: 2 additions & 2 deletions conda/meta.yaml
@@ -1,5 +1,5 @@
 {% set name = "tethysts" %}
-{% set version = "4.5.6" %}
+{% set version = "4.5.8" %}
 # {% set sha256 = "ae2cc83fb5a75e8dc3e1b2c2137deea412c8a4c7c9acca52bf4ec59de52a80c9" %}
 
 # sha256 is the prefered checksum -- you can get it for a file with:
@@ -44,7 +44,7 @@ requirements:
     - requests
     - shapely
     - tethys-data-models >=0.4.11
-    - hdf5tools >=0.1.4
+    - hdf5tools >=0.1.10
     - s3tethys >=0.0.4
 
 test:
4 changes: 2 additions & 2 deletions setup.py
@@ -10,7 +10,7 @@
 name = 'tethysts'
 main_package = 'tethysts'
 datasets = 'datasets/time_series'
-version = '4.5.6'
+version = '4.5.8'
 descrip = 'tethys time series S3 extraction'
 
 # The below code is for readthedocs. To have sphinx/readthedocs interact with
@@ -19,7 +19,7 @@
 if os.environ.get('READTHEDOCS', False) == 'True':
     INSTALL_REQUIRES = []
 else:
-    INSTALL_REQUIRES = ['zstandard', 'pandas', 'xarray', 'scipy', 'orjson', 'requests', 'shapely', 'tethys-data-models>=0.4.11', 'hdf5tools>=0.1.4', 's3tethys>=0.0.4']
+    INSTALL_REQUIRES = ['zstandard', 'pandas', 'xarray', 'scipy', 'orjson', 'requests', 'shapely', 'tethys-data-models>=0.4.11', 'hdf5tools>=0.1.10', 's3tethys>=0.0.4']
 
 # Get the long description from the README file
 with open(os.path.join(here, 'README.rst'), encoding='utf-8') as f:
6 changes: 3 additions & 3 deletions tethysts/main.py
@@ -521,7 +521,7 @@ def get_results(self,
         ## Get results chunks
         rc_list = self._get_results_chunks(dataset_id, vd)
 
-        chunks = utils.chunk_filters(rc_list, stn_ids, time_interval, from_date, to_date, heights, bands)
+        chunks = utils.chunk_filters(rc_list, stn_ids, time_interval, from_date, to_date, heights, bands, from_mod_date, to_mod_date)
 
         if chunks:
 
@@ -550,7 +550,7 @@ def get_results(self,
             xr.backends.file_manager.FILE_CACHE.clear()
 
             ## combine results
-            xr3 = utils.results_concat(results_list, output_path=output_path, from_date=from_date, to_date=to_date, compression=compression)
+            xr3 = utils.results_concat(results_list, output_path=output_path, from_date=from_date, to_date=to_date, from_mod_date=from_mod_date, to_mod_date=to_mod_date, compression=compression)
 
             ## Convert to new version
             attrs = xr3.attrs.copy()
@@ -565,7 +565,7 @@
             xr3.attrs['version_date'] = pd.Timestamp(vd).tz_localize(None).isoformat()
 
             if squeeze_dims:
-                xr3 = xr3.squeeze_dims()
+                xr3 = xr3.squeeze()
 
         else:
             xr3 = xr.Dataset()
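Taken together, the main.py changes simply thread from_mod_date and to_mod_date through get_results to the chunk filter and the final concat (they also fix the squeeze call: xarray Datasets have squeeze(), not squeeze_dims()). A minimal usage sketch — the remote dict and the from_mod_date/to_mod_date calls mirror the test script below; the get_stations call and the list-of-dicts return shapes are assumptions based on how that script indexes things:

from tethysts import Tethys

remote = {'bucket': 'fire-emergency-nz', 'public_url': 'https://b2.tethys-ts.xyz/file/', 'version': 4}

ts = Tethys([remote])

# Pick a dataset and a station to query
dataset_id = ts.datasets[0]['dataset_id']
stns = ts.get_stations(dataset_id)
station_ids = stns[0]['station_id']

# Only results chunks (re)written on or after 1 Oct 2022
results = ts.get_results(dataset_id, station_ids, from_mod_date='2022-10-01')

# Only results chunks last modified up to 1 Oct 2022
results = ts.get_results(dataset_id, station_ids, to_mod_date='2022-10-01')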
20 changes: 17 additions & 3 deletions tethysts/tests/utest_tethysts.py
@@ -1,4 +1,5 @@
 from tethysts import Tethys
+import xarray as xr
 import yaml
 import pandas as pd
 import os
@@ -214,7 +215,8 @@
 bands: int = None
 squeeze_dims: bool = False
 threads: int = 30
-
+output_path = None
+compression='zstd'
 
 remote = {'bucket': 'nz-open-modelling-consortium', 'public_url': 'https://b2.nzrivers.xyz/file/', 'version': 4}
 remote = {'bucket': 'fire-emergency-nz', 'public_url': 'https://b2.tethys-ts.xyz/file/', 'version': 4}
@@ -232,13 +234,16 @@
 remote = {'bucket': 'met-solutions', 'public_url': 'https://b2.tethys-ts.xyz/file', 'version': 4}
 remote = {'bucket': 'tasman-env', 'public_url': 'https://b2.tethys-ts.xyz/file', 'version': 4}
 remote = {'bucket': 'noaa-nwm', 'public_url': 'https://b2.tethys-ts.xyz/file', 'version': 4}
+remote = {'bucket': 'jaxa-data', 'public_url': 'https://b2.tethys-ts.xyz/file', 'version': 4}
+remote = {'bucket': 'linz-data', 'public_url': 'https://b2.tethys-ts.xyz/file', 'version': 4}
+remote = {'bucket': 'mdc-env', 'public_url': 'https://b2.tethys-ts.xyz/file', 'version': 4}
 
 cache = '/media/nvme1/cache/tethys'
 # cache = '/home/mike/cache/tethys'
 
 dataset_id = '7751c5f1bf47867fb109d7eb'
 dataset_id = '0b2bd62cc42f3096136f11e9'
 # dataset_id = '0de7cbfe05aebc2272ceba17'
 dataset_id = 'fb60aaa921d35a33727b53fe'
 dataset_id = 'f16774ea29f024a306c7fc7a'
 dataset_id = '9568f663d566aabb62a8e98e'

@@ -259,6 +264,7 @@
 dataset_id = '0b2bd62cc42f3096136f11e9'
 station_ids = 'c8db6013a9eb76705b5c80f2'
 ref = 'ashley'
+station_ids = '7eb2694917a2ece89e86e9b8'
 
 dataset_id = 'b3d852cd72ac043c701493c4'
 
@@ -280,6 +286,11 @@
 version_date = '2022-09-04T19:00:00'
 
 dataset_id = '469b6a9ef620bce70fab5760'
+dataset_id = '980177538186a9bb9c9a0672'
+dataset_id = '54374801c0311a98a0f8e5ef'
+
+dataset_id = 'ef79659d4175c6d70d748b5e'
+station_ids = 'b8db7f021cf6e422d6d85881'
 
 self = Tethys([remote], cache=cache)
 self = Tethys([remote])
@@ -304,6 +315,9 @@
 
 results1 = self.get_results(dataset_id, station_ids, heights=None, from_date='2015-04-01')
 
+results1 = self.get_results(dataset_id, station_ids, heights=None, from_mod_date='2022-10-01')
+results1 = self.get_results(dataset_id, station_ids, heights=None, to_mod_date='2022-10-01')
+
 
 for d in self.datasets:
     rv1 = self.get_versions(d['dataset_id'])
@@ -365,7 +379,7 @@
 
 
 
-
+stn_ids = {s['station_id']: pd.Timestamp(s['time_range']['to_date']) - pd.Timestamp(s['time_range']['from_date']) for s in stns1}
 
 
 
47 changes: 38 additions & 9 deletions tethysts/utils.py
@@ -433,30 +433,45 @@ def read_json_zstd(obj):
     # return file_obj
 
 
-def chunk_filters(results_chunks, stn_ids, time_interval=None, from_date=None, to_date=None, heights=None, bands=None):
+def chunk_filters(results_chunks, stn_ids, time_interval=None, from_date=None, to_date=None, heights=None, bands=None, from_mod_date=None, to_mod_date=None):
     """
     """
     ## Stations filter
     rc2 = copy.deepcopy([rc for rc in results_chunks if rc['station_id'] in stn_ids])
+    first_one = rc2[0]
 
-    ## Temporal filter
-    if isinstance(from_date, (str, pd.Timestamp, datetime)) and ('chunk_day' in rc2[0]):
+    ## Temporal filters
+    if isinstance(from_date, (str, pd.Timestamp, datetime)) and ('chunk_day' in first_one):
         from_date1 = int(pd.Timestamp(from_date).timestamp()/60/60/24)
         rc2 = [rc for rc in rc2 if (rc['chunk_day'] + time_interval) >= from_date1]
 
         if len(rc2) == 0:
             return rc2
 
-    if isinstance(to_date, (str, pd.Timestamp, datetime)) and ('chunk_day' in rc2[0]):
+    if isinstance(to_date, (str, pd.Timestamp, datetime)) and ('chunk_day' in first_one):
         to_date1 = int(pd.Timestamp(to_date).timestamp()/60/60/24)
         rc2 = [rc for rc in rc2 if rc['chunk_day'] <= to_date1]
 
         if len(rc2) == 0:
             return rc2
 
+    if isinstance(from_mod_date, (str, pd.Timestamp, datetime)) and ('modified_date' in first_one):
+        from_mod_date1 = pd.Timestamp(from_mod_date)
+        rc2 = [rc for rc in rc2 if pd.Timestamp(rc['modified_date']) >= from_mod_date1]
+
+        if len(rc2) == 0:
+            return rc2
+
+    if isinstance(to_mod_date, (str, pd.Timestamp, datetime)) and ('modified_date' in first_one):
+        to_mod_date1 = pd.Timestamp(to_mod_date)
+        rc2 = [rc for rc in rc2 if pd.Timestamp(rc['modified_date']) <= to_mod_date1]
+
+        if len(rc2) == 0:
+            return rc2
 
     ## Heights and bands filter
-    if (heights is not None) and ('height' in rc2[0]):
+    if (heights is not None) and ('height' in first_one):
         if isinstance(heights, (int, float)):
             h1 = [int(heights*1000)]
         elif isinstance(heights, list):
@@ -468,7 +483,7 @@ def chunk_filters(results_chunks, stn_ids, time_interval=None, from_date=None, to_date=None, heights=None, bands=None, from_mod_date=None, to_mod_date=None):
         if len(rc2) == 0:
             return rc2
 
-    if (bands is not None) and ('band' in rc2[0]):
+    if (bands is not None) and ('band' in first_one):
         if isinstance(bands, int):
             b1 = [heights]
         elif isinstance(bands, list):
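The two new modified-date branches in chunk_filters follow the same early-exit pattern as the existing chunk_day filters: coerce the bound with pd.Timestamp, keep only the chunks on the right side of it, and return immediately if nothing survives. The core of that logic in isolation — the chunk dicts below are hypothetical stand-ins for real results chunks:

import pandas as pd

chunks = [
    {'station_id': 'a1', 'modified_date': '2022-09-15T00:00:00'},
    {'station_id': 'a1', 'modified_date': '2022-10-20T00:00:00'},
]

from_mod_date1 = pd.Timestamp('2022-10-01')

# Same list-comprehension filter the diff adds above
rc2 = [rc for rc in chunks if pd.Timestamp(rc['modified_date']) >= from_mod_date1]

print([rc['modified_date'] for rc in rc2])  # ['2022-10-20T00:00:00']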
@@ -747,7 +762,7 @@ def download_results(chunk: dict, bucket: str, s3: botocore.client.BaseClient =
 
     if chunk['key'].endswith('.zst'):
         data = xr.load_dataset(s3tethys.decompress_stream_to_object(io.BytesIO(file_obj.read()), 'zstd'))
-        H5(data).sel(exclude_coords=['station_geometry', 'chunk_date']).to_hdf5(chunk_path)
+        H5(data).sel(exclude_coords=['station_geometry', 'chunk_date']).to_hdf5(chunk_path, compression='zstd')
         data.close()
         del data
     else:
@@ -768,7 +783,7 @@ def download_results(chunk: dict, bucket: str, s3: botocore.client.BaseClient =
     h1 = H5(data)
     data_obj = io.BytesIO()
     h1 = result_filters(h1)
-    h1.to_hdf5(data_obj)
+    h1.to_hdf5(data_obj, compression='zstd')
 
     if isinstance(data, xr.Dataset):
         data.close()
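Both to_hdf5 call sites now pass compression='zstd' explicitly instead of relying on the writer's default, so cached chunk files and in-memory result buffers are written zstd-compressed; this lines up with the hdf5tools >=0.1.10 requirement bumps above. A rough sketch of the pattern — the import path and H5 accepting an xarray Dataset are assumptions inferred from the calls in this diff:

import io

import numpy as np
import pandas as pd
import xarray as xr
from hdf5tools import H5  # assumed import path

# Hypothetical stand-in for a downloaded results chunk
data = xr.Dataset(
    {'temp': (('time',), np.arange(3.0))},
    coords={'time': pd.date_range('2022-10-01', periods=3)},
)

buf = io.BytesIO()
H5(data).to_hdf5(buf, compression='zstd')  # zstd-compressed HDF5, as in the diff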
@@ -841,19 +856,33 @@ def xr_concat(datasets: List[xr.Dataset]):
     return xr3
 
 
-def results_concat(results_list, output_path=None, from_date=None, to_date=None, compression='lzf'):
+def results_concat(results_list, output_path=None, from_date=None, to_date=None, from_mod_date=None, to_mod_date=None, compression='lzf'):
     """
     """
     if output_path is None:
         output_path = io.BytesIO()
         compression = 'zstd'
 
     h1 = H5(results_list)
     h1 = result_filters(h1, from_date, to_date)
     h1.to_hdf5(output_path, compression=compression)
 
     xr3 = xr.open_dataset(output_path, engine='h5netcdf', cache=False)
 
+    ## Deal with mod dates filters
+    if ((from_mod_date is not None) or (to_mod_date is not None)) and ('modified_date' in xr3):
+        mod_dates = xr3['modified_date'].copy().load()
+
+        if (from_mod_date is not None) and (to_mod_date is not None):
+            mod_bool = (mod_dates >= pd.Timestamp(from_mod_date)) & (mod_dates <= pd.Timestamp(to_mod_date))
+        elif (from_mod_date is not None):
+            mod_bool = (mod_dates >= pd.Timestamp(from_mod_date))
+        elif (to_mod_date is not None):
+            mod_bool = (mod_dates <= pd.Timestamp(to_mod_date))
+
+        xr3 = xr3.where(mod_bool, drop=True)
+
     return xr3

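results_concat applies the modified-date bounds only after the concatenated file has been reopened: it builds a boolean mask over the modified_date variable and drops everything outside it with xarray's where(..., drop=True). The masking step in isolation, on a hypothetical dataset with one modified_date per station:

import pandas as pd
import xarray as xr

xr3 = xr.Dataset(
    {'modified_date': (('station',), pd.to_datetime(['2022-09-15', '2022-10-20']))},
    coords={'station': ['a1', 'b2']},
)

mod_dates = xr3['modified_date'].copy().load()
mod_bool = mod_dates >= pd.Timestamp('2022-10-01')

xr3 = xr3.where(mod_bool, drop=True)
print(xr3['station'].values)  # ['b2']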