From 8d07267c034332a697ac1280d147d727f910a473 Mon Sep 17 00:00:00 2001 From: Aaron Spring Date: Thu, 6 May 2021 00:43:10 +0200 Subject: [PATCH 01/13] Update cat.py --- intake_thredds/cat.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/intake_thredds/cat.py b/intake_thredds/cat.py index 126f8a8..03454a9 100644 --- a/intake_thredds/cat.py +++ b/intake_thredds/cat.py @@ -27,6 +27,10 @@ class ThreddsCatalog(Catalog): def __init__(self, url: str, driver: str = 'opendap', **kwargs): self.url = url self.driver = driver + if "decode_times" in kwargs: + self.xarray_kwargs = {"decode_times": kwargs["decode_times"]} + else: + self.xarray_kwargs = None super().__init__(**kwargs) def _load(self): From d4aefb90fb0c716e95b12c346237fdc5f8fb7365 Mon Sep 17 00:00:00 2001 From: Aaron Spring Date: Thu, 6 May 2021 00:47:16 +0200 Subject: [PATCH 02/13] Update source.py --- intake_thredds/source.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/intake_thredds/source.py b/intake_thredds/source.py index be3e868..04211e2 100644 --- a/intake_thredds/source.py +++ b/intake_thredds/source.py @@ -87,7 +87,10 @@ def _open_dataset(self): break path = self.path[i:] if self.progressbar: - data = [ds.to_dask() for ds in tqdm(_match(cat, path), desc='Dataset(s)', ncols=79)] + if self.xarray_kwargs: + data = [ds(**self.xarray_kwargs).to_dask() for ds in tqdm(_match(cat, path), desc='Dataset(s)', ncols=79)] + else: + data = [ds.to_dask() for ds in tqdm(_match(cat, path), desc='Dataset(s)', ncols=79)] else: data = [ds.to_dask() for ds in _match(cat, path)] self._ds = xr.combine_by_coords(data, combine_attrs='override') From 09f0019a9eafc8dc23796566bd8b69320fdbe67a Mon Sep 17 00:00:00 2001 From: Aaron Spring Date: Thu, 6 May 2021 00:48:46 +0200 Subject: [PATCH 03/13] Update cat.py --- intake_thredds/cat.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/intake_thredds/cat.py b/intake_thredds/cat.py index 03454a9..14aef7d 100644 --- a/intake_thredds/cat.py +++ b/intake_thredds/cat.py @@ -28,9 +28,9 @@ def __init__(self, url: str, driver: str = 'opendap', **kwargs): self.url = url self.driver = driver if "decode_times" in kwargs: - self.xarray_kwargs = {"decode_times": kwargs["decode_times"]} + self.xarray_kwargs = {"xarray_kwargs": {"decode_times": kwargs["decode_times"]}} else: - self.xarray_kwargs = None + self.xarray_kwargs = {} super().__init__(**kwargs) def _load(self): From c5e77ebde392c7f04636322ff55f54838f752410 Mon Sep 17 00:00:00 2001 From: Aaron Spring Date: Thu, 6 May 2021 00:51:36 +0200 Subject: [PATCH 04/13] Update source.py --- intake_thredds/source.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/intake_thredds/source.py b/intake_thredds/source.py index 04211e2..cf5f0a3 100644 --- a/intake_thredds/source.py +++ b/intake_thredds/source.py @@ -87,12 +87,9 @@ def _open_dataset(self): break path = self.path[i:] if self.progressbar: - if self.xarray_kwargs: - data = [ds(**self.xarray_kwargs).to_dask() for ds in tqdm(_match(cat, path), desc='Dataset(s)', ncols=79)] - else: - data = [ds.to_dask() for ds in tqdm(_match(cat, path), desc='Dataset(s)', ncols=79)] + data = [ds(**self.xarray_kwargs).to_dask() for ds in tqdm(_match(cat, path), desc='Dataset(s)', ncols=79)] else: - data = [ds.to_dask() for ds in _match(cat, path)] + data = [ds(**self.xarray_kwargs).to_dask() for ds in _match(cat, path)] self._ds = xr.combine_by_coords(data, combine_attrs='override') From ad84c4aec9bc3a105df1ea3217f6613b0c6bb7be Mon Sep 17 00:00:00 2001 From: Aaron Spring Date: Thu, 6 May 2021 00:56:13 +0200 Subject: [PATCH 05/13] Update test_cat.py --- tests/test_cat.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_cat.py b/tests/test_cat.py index ef8743a..734c81d 100644 --- a/tests/test_cat.py +++ b/tests/test_cat.py @@ -23,10 +23,11 @@ def test_ThreddsCatalog_init_catalog(thredds_cat_url): assert 'random_attribute' in cat.metadata +@pytest.mark.parametrize('decode_times', [True, False]) @pytest.mark.parametrize('driver', ['netcdf', 'opendap']) def test_ThreddsCatalog(thredds_cat_url, driver): """Test entry.to_dask() is xr.Dataset and allows opendap and netcdf as source.""" - cat = intake.open_thredds_cat(thredds_cat_url, driver=driver) + cat = intake.open_thredds_cat(thredds_cat_url, driver=driver, decode_times=decode_times) entry = cat['sst.mon.19712000.ltm.v3.nc'] if driver == 'opendap': assert isinstance(entry, intake_xarray.opendap.OpenDapSource) From 977cf2ab5aa1d875a9cf9e2ee61701e9ad29f7ed Mon Sep 17 00:00:00 2001 From: Aaron Spring Date: Thu, 6 May 2021 01:00:50 +0200 Subject: [PATCH 06/13] Update source.py --- intake_thredds/source.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/intake_thredds/source.py b/intake_thredds/source.py index cf5f0a3..709e619 100644 --- a/intake_thredds/source.py +++ b/intake_thredds/source.py @@ -57,7 +57,7 @@ class THREDDSMergedSource(DataSourceMixin): name = 'thredds_merged' partition_access = True - def __init__(self, url, path, driver='opendap', progressbar=True, metadata=None): + def __init__(self, url, path, driver='opendap', progressbar=True, xarray_kwargs={}, metadata=None): super(THREDDSMergedSource, self).__init__(metadata=metadata) self.urlpath = url @@ -65,6 +65,7 @@ def __init__(self, url, path, driver='opendap', progressbar=True, metadata=None) self.metadata.update({'fsspec_pre_url': 'simplecache::'}) self.path = path self.driver = driver + self.xarray_kwargs = xarray_kwargs self._ds = None self.progressbar = progressbar if self.progressbar and tqdm is None: From 88b45bd017b331271da1b34de5c700d58166717d Mon Sep 17 00:00:00 2001 From: Aaron Spring Date: Thu, 6 May 2021 01:02:11 +0200 Subject: [PATCH 07/13] Update cat.py --- intake_thredds/cat.py | 1 + 1 file changed, 1 insertion(+) diff --git a/intake_thredds/cat.py b/intake_thredds/cat.py index 14aef7d..9f65f9d 100644 --- a/intake_thredds/cat.py +++ b/intake_thredds/cat.py @@ -29,6 +29,7 @@ def __init__(self, url: str, driver: str = 'opendap', **kwargs): self.driver = driver if "decode_times" in kwargs: self.xarray_kwargs = {"xarray_kwargs": {"decode_times": kwargs["decode_times"]}} + del xarray_kwargs["decode_times"] else: self.xarray_kwargs = {} super().__init__(**kwargs) From 58b88ebba3b30ea05f9b446dd2da15d9482c66f4 Mon Sep 17 00:00:00 2001 From: Aaron Spring Date: Thu, 6 May 2021 01:06:38 +0200 Subject: [PATCH 08/13] Update test_cat.py --- tests/test_cat.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_cat.py b/tests/test_cat.py index 734c81d..3a3ce2a 100644 --- a/tests/test_cat.py +++ b/tests/test_cat.py @@ -25,7 +25,7 @@ def test_ThreddsCatalog_init_catalog(thredds_cat_url): @pytest.mark.parametrize('decode_times', [True, False]) @pytest.mark.parametrize('driver', ['netcdf', 'opendap']) -def test_ThreddsCatalog(thredds_cat_url, driver): +def test_ThreddsCatalog(thredds_cat_url, driver, decode_times): """Test entry.to_dask() is xr.Dataset and allows opendap and netcdf as source.""" cat = intake.open_thredds_cat(thredds_cat_url, driver=driver, decode_times=decode_times) entry = cat['sst.mon.19712000.ltm.v3.nc'] From 0795a0b6d1021a47d6259200b8c6a5619a6e985b Mon Sep 17 00:00:00 2001 From: Aaron Spring Date: Thu, 6 May 2021 14:16:13 +0200 Subject: [PATCH 09/13] Update cat.py --- intake_thredds/cat.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/intake_thredds/cat.py b/intake_thredds/cat.py index 9f65f9d..a2025ab 100644 --- a/intake_thredds/cat.py +++ b/intake_thredds/cat.py @@ -29,7 +29,7 @@ def __init__(self, url: str, driver: str = 'opendap', **kwargs): self.driver = driver if "decode_times" in kwargs: self.xarray_kwargs = {"xarray_kwargs": {"decode_times": kwargs["decode_times"]}} - del xarray_kwargs["decode_times"] + del kwargs["decode_times"] else: self.xarray_kwargs = {} super().__init__(**kwargs) From 5125f9f0066622600dbed12020a43412078ca4a1 Mon Sep 17 00:00:00 2001 From: AS Date: Thu, 6 May 2021 16:40:42 +0200 Subject: [PATCH 10/13] fix --- CHANGELOG.md | 6 ++++++ intake_thredds/cat.py | 5 ----- intake_thredds/source.py | 11 ++++++++--- tests/test_cat.py | 5 +++++ 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ff1e01f..ac24e95 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Intake-thredds v2021.x.xx + +### Features + +- Allow `xarray_kwargs` for :py:class:`~intake_thredds.THREDDSMergedSource` [#32](https://github.com/NCAR/intake-thredds/pull/32) ([@aaronspring](https://github.com/aaronspring)) + ## Intake-thredds v2021.2.17 ([full changelog](https://github.com/NCAR/intake-thredds/compare/792fdc08e7fbbf66455fe554ca9a0f1f8a14ae32...ccb3c469a07cc7adf058ce0f8ba41197ebc5b7c7)) diff --git a/intake_thredds/cat.py b/intake_thredds/cat.py index a2025ab..126f8a8 100644 --- a/intake_thredds/cat.py +++ b/intake_thredds/cat.py @@ -27,11 +27,6 @@ class ThreddsCatalog(Catalog): def __init__(self, url: str, driver: str = 'opendap', **kwargs): self.url = url self.driver = driver - if "decode_times" in kwargs: - self.xarray_kwargs = {"xarray_kwargs": {"decode_times": kwargs["decode_times"]}} - del kwargs["decode_times"] - else: - self.xarray_kwargs = {} super().__init__(**kwargs) def _load(self): diff --git a/intake_thredds/source.py b/intake_thredds/source.py index 709e619..cf0c090 100644 --- a/intake_thredds/source.py +++ b/intake_thredds/source.py @@ -57,7 +57,9 @@ class THREDDSMergedSource(DataSourceMixin): name = 'thredds_merged' partition_access = True - def __init__(self, url, path, driver='opendap', progressbar=True, xarray_kwargs={}, metadata=None): + def __init__( + self, url, path, driver='opendap', progressbar=True, xarray_kwargs={}, metadata=None + ): super(THREDDSMergedSource, self).__init__(metadata=metadata) self.urlpath = url @@ -88,9 +90,12 @@ def _open_dataset(self): break path = self.path[i:] if self.progressbar: - data = [ds(**self.xarray_kwargs).to_dask() for ds in tqdm(_match(cat, path), desc='Dataset(s)', ncols=79)] + data = [ + ds(xarray_kwargs=self.xarray_kwargs).to_dask() + for ds in tqdm(_match(cat, path), desc='Dataset(s)', ncols=79) + ] else: - data = [ds(**self.xarray_kwargs).to_dask() for ds in _match(cat, path)] + data = [ds(xarray_kwargs=self.xarray_kwargs).to_dask() for ds in _match(cat, path)] self._ds = xr.combine_by_coords(data, combine_attrs='override') diff --git a/tests/test_cat.py b/tests/test_cat.py index 3a3ce2a..60b8d54 100644 --- a/tests/test_cat.py +++ b/tests/test_cat.py @@ -47,6 +47,11 @@ def test_ThreddsCatalog(thredds_cat_url, driver, decode_times): ) ds = entry(chunks={}).to_dask() assert isinstance(ds, xr.Dataset) + # check xarray_kwargs + if decode_times: + assert 'units' not in ds.time.attrs + else: + assert 'units' in ds.time.attrs def test_ThreddsCatalog_simplecache_netcdf(thredds_cat_url): From 6e62019de3e9d6e3120c3684efbcd2b96054d5a3 Mon Sep 17 00:00:00 2001 From: AS Date: Thu, 6 May 2021 16:56:40 +0200 Subject: [PATCH 11/13] fix tests --- tests/test_cat.py | 10 ++-------- tests/test_source.py | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/tests/test_cat.py b/tests/test_cat.py index 60b8d54..ef8743a 100644 --- a/tests/test_cat.py +++ b/tests/test_cat.py @@ -23,11 +23,10 @@ def test_ThreddsCatalog_init_catalog(thredds_cat_url): assert 'random_attribute' in cat.metadata -@pytest.mark.parametrize('decode_times', [True, False]) @pytest.mark.parametrize('driver', ['netcdf', 'opendap']) -def test_ThreddsCatalog(thredds_cat_url, driver, decode_times): +def test_ThreddsCatalog(thredds_cat_url, driver): """Test entry.to_dask() is xr.Dataset and allows opendap and netcdf as source.""" - cat = intake.open_thredds_cat(thredds_cat_url, driver=driver, decode_times=decode_times) + cat = intake.open_thredds_cat(thredds_cat_url, driver=driver) entry = cat['sst.mon.19712000.ltm.v3.nc'] if driver == 'opendap': assert isinstance(entry, intake_xarray.opendap.OpenDapSource) @@ -47,11 +46,6 @@ def test_ThreddsCatalog(thredds_cat_url, driver, decode_times): ) ds = entry(chunks={}).to_dask() assert isinstance(ds, xr.Dataset) - # check xarray_kwargs - if decode_times: - assert 'units' not in ds.time.attrs - else: - assert 'units' in ds.time.attrs def test_ThreddsCatalog_simplecache_netcdf(thredds_cat_url): diff --git a/tests/test_source.py b/tests/test_source.py index 9ba48cb..f7d9017 100644 --- a/tests/test_source.py +++ b/tests/test_source.py @@ -97,3 +97,19 @@ def test_THREDDSMergedSource_simplecache_fails_opendap(THREDDSMergedSource_cat_s intake.open_thredds_cat( f'simplecache::{THREDDSMergedSource_cat_short_url}', driver='opendap' ) + + +@pytest.mark.parametrize('driver', ['netcdf', 'opendap']) +@pytest.mark.parametrize('decode_times', [True, False]) +def test_THREDDSMergedSource_xarray_kwargs(THREDDSMergedSource_cat_short_url, driver, decode_times): + """Test xarray_kwargs.""" + ds = intake.open_thredds_cat( + THREDDSMergedSource_cat_short_url, + driver='opendap', + xarray_kwargs={'decode_times': decode_times}, + ).to_dask() + # check xarray_kwargs + if decode_times: + assert 'units' not in ds.time.attrs + else: + assert 'units' in ds.time.attrs From 15f8f9401991f0235a2e3065370b0e4fbba924a6 Mon Sep 17 00:00:00 2001 From: AS Date: Thu, 6 May 2021 17:17:34 +0200 Subject: [PATCH 12/13] fix tests, fail for netcdfsource still --- intake_thredds/source.py | 4 ++-- tests/test_source.py | 12 +++++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/intake_thredds/source.py b/intake_thredds/source.py index cf0c090..bfa5aad 100644 --- a/intake_thredds/source.py +++ b/intake_thredds/source.py @@ -91,11 +91,11 @@ def _open_dataset(self): path = self.path[i:] if self.progressbar: data = [ - ds(xarray_kwargs=self.xarray_kwargs).to_dask() + ds(**self.xarray_kwargs).to_dask() for ds in tqdm(_match(cat, path), desc='Dataset(s)', ncols=79) ] else: - data = [ds(xarray_kwargs=self.xarray_kwargs).to_dask() for ds in _match(cat, path)] + data = [ds(**self.xarray_kwargs).to_dask() for ds in _match(cat, path)] self._ds = xr.combine_by_coords(data, combine_attrs='override') diff --git a/tests/test_source.py b/tests/test_source.py index f7d9017..ac9f07b 100644 --- a/tests/test_source.py +++ b/tests/test_source.py @@ -103,9 +103,15 @@ def test_THREDDSMergedSource_simplecache_fails_opendap(THREDDSMergedSource_cat_s @pytest.mark.parametrize('decode_times', [True, False]) def test_THREDDSMergedSource_xarray_kwargs(THREDDSMergedSource_cat_short_url, driver, decode_times): """Test xarray_kwargs.""" - ds = intake.open_thredds_cat( - THREDDSMergedSource_cat_short_url, - driver='opendap', + ds = intake.open_thredds_merged( + 'https://psl.noaa.gov/thredds/catalog.xml', + [ + 'Datasets', + 'ncep.reanalysis.dailyavgs', + 'surface', + 'air.sig995.194*.nc', + ], + driver=driver, xarray_kwargs={'decode_times': decode_times}, ).to_dask() # check xarray_kwargs From efcbaf0a6fe710fdb2f621d47417d22f68482448 Mon Sep 17 00:00:00 2001 From: AS Date: Thu, 6 May 2021 17:22:47 +0200 Subject: [PATCH 13/13] working --- intake_thredds/source.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/intake_thredds/source.py b/intake_thredds/source.py index bfa5aad..cf0c090 100644 --- a/intake_thredds/source.py +++ b/intake_thredds/source.py @@ -91,11 +91,11 @@ def _open_dataset(self): path = self.path[i:] if self.progressbar: data = [ - ds(**self.xarray_kwargs).to_dask() + ds(xarray_kwargs=self.xarray_kwargs).to_dask() for ds in tqdm(_match(cat, path), desc='Dataset(s)', ncols=79) ] else: - data = [ds(**self.xarray_kwargs).to_dask() for ds in _match(cat, path)] + data = [ds(xarray_kwargs=self.xarray_kwargs).to_dask() for ds in _match(cat, path)] self._ds = xr.combine_by_coords(data, combine_attrs='override')