Skip to content

Commit

Permalink
Adding skip for intake.
Browse files Browse the repository at this point in the history
  • Loading branch information
rhysrevans3 committed Apr 4, 2024
1 parent f05e3ea commit b48be58
Showing 1 changed file with 26 additions and 13 deletions.
39 changes: 26 additions & 13 deletions stac_generator/plugins/inputs/intake_esm.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,24 @@ def __init__(self, **kwargs):
self.uri = kwargs["uri"]

self.object_attr = kwargs["object_path_attr"]
self.skip = kwargs.get("skip", -1)

self.intake_kwargs = kwargs.get("catalog_kwargs", {})
self.search_kwargs = kwargs.get("search_kwargs")

def open_catalog(self):
"""Open the ESM catalog and perform a search, if required."""
LOGGER.info(f"Opening catalog {self.uri}")
catalog = intake.open_esm_datastore(self.uri, **self.intake_kwargs)
def open_catalog(self, uri, intake_kwargs):
"""Open the ESM catalog."""
LOGGER.info(f"Opening catalog {uri}")
catalog = intake.open_esm_datastore(uri, **intake_kwargs)

return catalog

def search_catalog(self, catalog, search_kwargs):
"""Perform a search ESM catalog."""
LOGGER.info(f"Searching catalog")

if self.search_kwargs:
catalog = catalog.search(**self.search_kwargs)
catalog = catalog.search(**search_kwargs)

LOGGER.info(f"Found {len(catalog.df)} items")
return catalog
Expand All @@ -92,17 +99,23 @@ def run(self, generator: BaseGenerator):
total_files = 0
start = datetime.now()

catalog = self.open_catalog()
catalog = self.open_catalog(self.uri, **self.intake_kwargs)

if self.search_kwargs:
catalog = self.open_catalog(catalog, **self.intake_kwargs)

count = 0
for _, row in catalog.df.iterrows():
uri = getattr(row, self.object_attr)
if count > self.skip:
uri = getattr(row, self.object_attr)

if self.should_process(uri):
generator.process(uri)
LOGGER.debug(f"Input processing: {uri}")
else:
LOGGER.debug(f"Input skipping: {uri}")
if self.should_process(uri):
generator.process(uri)
LOGGER.debug(f"Input processing: {uri}")
else:
LOGGER.debug(f"Input skipping: {uri}")

total_files += 1
total_files += 1

end = datetime.now()
print(f"Processed {total_files} files from {self.uri} in {end-start}")

0 comments on commit b48be58

Please sign in to comment.