Commit 9b1381d

added queries by mod date

mullenkamp committed Nov 27, 2022
1 parent 9b37e9a
Showing 5 changed files with 52 additions and 16 deletions.
4 changes: 2 additions & 2 deletions conda/meta.yaml

@@ -1,5 +1,5 @@
 {% set name = "tethysts" %}
-{% set version = "4.5.7" %}
+{% set version = "4.5.8" %}
 # {% set sha256 = "ae2cc83fb5a75e8dc3e1b2c2137deea412c8a4c7c9acca52bf4ec59de52a80c9" %}

 # sha256 is the prefered checksum -- you can get it for a file with:
@@ -44,7 +44,7 @@ requirements:
     - requests
     - shapely
     - tethys-data-models >=0.4.11
-    - hdf5tools >=0.1.4
+    - hdf5tools >=0.1.10
     - s3tethys >=0.0.4

 test:
4 changes: 2 additions & 2 deletions setup.py

@@ -10,7 +10,7 @@
 name = 'tethysts'
 main_package = 'tethysts'
 datasets = 'datasets/time_series'
-version = '4.5.7'
+version = '4.5.8'
 descrip = 'tethys time series S3 extraction'

 # The below code is for readthedocs. To have sphinx/readthedocs interact with
@@ -19,7 +19,7 @@
 if os.environ.get('READTHEDOCS', False) == 'True':
     INSTALL_REQUIRES = []
 else:
-    INSTALL_REQUIRES = ['zstandard', 'pandas', 'xarray', 'scipy', 'orjson', 'requests', 'shapely', 'tethys-data-models>=0.4.11', 'hdf5tools>=0.1.4', 's3tethys>=0.0.4']
+    INSTALL_REQUIRES = ['zstandard', 'pandas', 'xarray', 'scipy', 'orjson', 'requests', 'shapely', 'tethys-data-models>=0.4.11', 'hdf5tools>=0.1.10', 's3tethys>=0.0.4']

 # Get the long description from the README file
 with open(os.path.join(here, 'README.rst'), encoding='utf-8') as f:
4 changes: 2 additions & 2 deletions tethysts/main.py

@@ -521,7 +521,7 @@ def get_results(self,
         ## Get results chunks
         rc_list = self._get_results_chunks(dataset_id, vd)

-        chunks = utils.chunk_filters(rc_list, stn_ids, time_interval, from_date, to_date, heights, bands)
+        chunks = utils.chunk_filters(rc_list, stn_ids, time_interval, from_date, to_date, heights, bands, from_mod_date, to_mod_date)

         if chunks:

@@ -550,7 +550,7 @@
             xr.backends.file_manager.FILE_CACHE.clear()

             ## combine results
-            xr3 = utils.results_concat(results_list, output_path=output_path, from_date=from_date, to_date=to_date, compression=compression)
+            xr3 = utils.results_concat(results_list, output_path=output_path, from_date=from_date, to_date=to_date, from_mod_date=from_mod_date, to_mod_date=to_mod_date, compression=compression)

         ## Convert to new version
         attrs = xr3.attrs.copy()
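The two changes above thread the new from_mod_date/to_mod_date keyword arguments of get_results through to the chunk filtering and concatenation steps. A minimal usage sketch, assuming the fire-emergency-nz remote that also appears in the test file below (the dataset and station selections are placeholders):

from tethysts import Tethys

remote = {'bucket': 'fire-emergency-nz', 'public_url': 'https://b2.tethys-ts.xyz/file/', 'version': 4}

ts = Tethys([remote])
dataset_id = ts.datasets[0]['dataset_id']  # any dataset served by this remote

stns = ts.get_stations(dataset_id)
station_ids = [s['station_id'] for s in stns[:2]]

# Only pull results chunks modified on or after 2022-10-01
results = ts.get_results(dataset_id, station_ids, from_mod_date='2022-10-01')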
14 changes: 11 additions & 3 deletions tethysts/tests/utest_tethysts.py

@@ -215,7 +215,8 @@
 bands: int = None
 squeeze_dims: bool = False
 threads: int = 30
-
+output_path = None
+compression='zstd'

 remote = {'bucket': 'nz-open-modelling-consortium', 'public_url': 'https://b2.nzrivers.xyz/file/', 'version': 4}
 remote = {'bucket': 'fire-emergency-nz', 'public_url': 'https://b2.tethys-ts.xyz/file/', 'version': 4}
@@ -235,6 +236,7 @@
 remote = {'bucket': 'noaa-nwm', 'public_url': 'https://b2.tethys-ts.xyz/file', 'version': 4}
 remote = {'bucket': 'jaxa-data', 'public_url': 'https://b2.tethys-ts.xyz/file', 'version': 4}
 remote = {'bucket': 'linz-data', 'public_url': 'https://b2.tethys-ts.xyz/file', 'version': 4}
+remote = {'bucket': 'mdc-env', 'public_url': 'https://b2.tethys-ts.xyz/file', 'version': 4}

 cache = '/media/nvme1/cache/tethys'
 # cache = '/home/mike/cache/tethys'
@@ -287,6 +289,9 @@
 dataset_id = '980177538186a9bb9c9a0672'
 dataset_id = '54374801c0311a98a0f8e5ef'

+dataset_id = 'ef79659d4175c6d70d748b5e'
+station_ids = 'b8db7f021cf6e422d6d85881'
+
 self = Tethys([remote], cache=cache)
 self = Tethys([remote])
 self = Tethys()
@@ -295,7 +300,7 @@
 stns1 = self.get_stations(dataset_id)
 stns1 = self.get_stations(dataset_id, version_date=version_date)

-station_ids = [s['station_id'] for s in stns1[:1]]
+station_ids = [s['station_id'] for s in stns1[:2]]
 station_ids = [s['station_id'] for s in stns1 if ref in s['ref']]

 results1 = self.get_results(dataset_id, station_ids, heights=None)
@@ -310,6 +315,9 @@

 results1 = self.get_results(dataset_id, station_ids, heights=None, from_date='2015-04-01')

+results1 = self.get_results(dataset_id, station_ids, heights=None, from_mod_date='2022-10-01')
+results1 = self.get_results(dataset_id, station_ids, heights=None, to_mod_date='2022-10-01')
+

 for d in self.datasets:
     rv1 = self.get_versions(d['dataset_id'])
@@ -371,7 +379,7 @@


+stn_ids = {s['station_id']: pd.Timestamp(s['time_range']['to_date']) - pd.Timestamp(s['time_range']['from_date']) for s in stns1}

42 changes: 35 additions & 7 deletions tethysts/utils.py

@@ -433,30 +433,45 @@ def read_json_zstd(obj):
 #     return file_obj


-def chunk_filters(results_chunks, stn_ids, time_interval=None, from_date=None, to_date=None, heights=None, bands=None):
+def chunk_filters(results_chunks, stn_ids, time_interval=None, from_date=None, to_date=None, heights=None, bands=None, from_mod_date=None, to_mod_date=None):
     """
     """
     ## Stations filter
     rc2 = copy.deepcopy([rc for rc in results_chunks if rc['station_id'] in stn_ids])
+    first_one = rc2[0]

-    ## Temporal filter
-    if isinstance(from_date, (str, pd.Timestamp, datetime)) and ('chunk_day' in rc2[0]):
+    ## Temporal filters
+    if isinstance(from_date, (str, pd.Timestamp, datetime)) and ('chunk_day' in first_one):
         from_date1 = int(pd.Timestamp(from_date).timestamp()/60/60/24)
         rc2 = [rc for rc in rc2 if (rc['chunk_day'] + time_interval) >= from_date1]

         if len(rc2) == 0:
             return rc2

-    if isinstance(to_date, (str, pd.Timestamp, datetime)) and ('chunk_day' in rc2[0]):
+    if isinstance(to_date, (str, pd.Timestamp, datetime)) and ('chunk_day' in first_one):
         to_date1 = int(pd.Timestamp(to_date).timestamp()/60/60/24)
         rc2 = [rc for rc in rc2 if rc['chunk_day'] <= to_date1]

         if len(rc2) == 0:
             return rc2

+    if isinstance(from_mod_date, (str, pd.Timestamp, datetime)) and ('modified_date' in first_one):
+        from_mod_date1 = pd.Timestamp(from_mod_date)
+        rc2 = [rc for rc in rc2 if pd.Timestamp(rc['modified_date']) >= from_mod_date1]
+
+        if len(rc2) == 0:
+            return rc2
+
+    if isinstance(to_mod_date, (str, pd.Timestamp, datetime)) and ('modified_date' in first_one):
+        to_mod_date1 = pd.Timestamp(to_mod_date)
+        rc2 = [rc for rc in rc2 if pd.Timestamp(rc['modified_date']) <= to_mod_date1]
+
+        if len(rc2) == 0:
+            return rc2
+
     ## Heights and bands filter
-    if (heights is not None) and ('height' in rc2[0]):
+    if (heights is not None) and ('height' in first_one):
         if isinstance(heights, (int, float)):
             h1 = [int(heights*1000)]
         elif isinstance(heights, list):
@@ -468,7 +483,7 @@ def chunk_filters(results_chunks, stn_ids, time_interval=None, from_date=None, t

         if len(rc2) == 0:
             return rc2

-    if (bands is not None) and ('band' in rc2[0]):
+    if (bands is not None) and ('band' in first_one):
         if isinstance(bands, int):
             b1 = [bands]
         elif isinstance(bands, list):
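The two mod-date branches added to chunk_filters follow the same pattern as the existing date filters: coerce the bound to a pd.Timestamp, keep only the chunks inside it, and return early when nothing survives. A toy illustration of that comparison, with made-up chunk metadata:

import pandas as pd

# Hypothetical chunk entries, shaped like the results_chunks dicts filtered above
rc2 = [
    {'station_id': 'abc123', 'modified_date': '2022-09-15T00:00:00'},
    {'station_id': 'abc123', 'modified_date': '2022-10-20T00:00:00'},
]

from_mod_date1 = pd.Timestamp('2022-10-01')
rc2 = [rc for rc in rc2 if pd.Timestamp(rc['modified_date']) >= from_mod_date1]

print(rc2)  # only the chunk modified on 2022-10-20 remains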
@@ -841,7 +856,7 @@ def xr_concat(datasets: List[xr.Dataset]):
     return xr3


-def results_concat(results_list, output_path=None, from_date=None, to_date=None, compression='lzf'):
+def results_concat(results_list, output_path=None, from_date=None, to_date=None, from_mod_date=None, to_mod_date=None, compression='lzf'):
     """
     """
@@ -855,6 +870,19 @@ def results_concat(results_list, output_path=None, from_date=None, to_date=None,

     xr3 = xr.open_dataset(output_path, engine='h5netcdf', cache=False)

+    ## Deal with mod dates filters
+    if ((from_mod_date is not None) or (to_mod_date is not None)) and ('modified_date' in xr3):
+        mod_dates = xr3['modified_date'].copy().load()
+
+        if (from_mod_date is not None) and (to_mod_date is not None):
+            mod_bool = (mod_dates >= pd.Timestamp(from_mod_date)) & (mod_dates <= pd.Timestamp(to_mod_date))
+        elif (from_mod_date is not None):
+            mod_bool = (mod_dates >= pd.Timestamp(from_mod_date))
+        elif (to_mod_date is not None):
+            mod_bool = (mod_dates <= pd.Timestamp(to_mod_date))
+
+        xr3 = xr3.where(mod_bool, drop=True)
+
     return xr3
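results_concat applies the same bounds again after concatenation by masking the dataset's modified_date variable and dropping whatever falls outside them. A self-contained sketch of that masking step on a synthetic dataset (names and values invented for the example):

import numpy as np
import pandas as pd
import xarray as xr

# Synthetic stand-in for the concatenated results with a modified_date variable
xr3 = xr.Dataset(
    {'value': ('time', np.arange(4.0)),
     'modified_date': ('time', pd.to_datetime(['2022-09-01', '2022-09-20', '2022-10-05', '2022-10-10']))},
    coords={'time': pd.date_range('2022-01-01', periods=4)},
)

mod_dates = xr3['modified_date'].copy().load()
mod_bool = mod_dates >= pd.Timestamp('2022-10-01')  # from_mod_date bound only

xr3 = xr3.where(mod_bool, drop=True)  # keeps the last two time steps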

