Skip to content

Commit

Permalink
Issue #115 allow AggregatorBatchJobs to get backend id for given coll…
Browse files Browse the repository at this point in the history
…ection id
  • Loading branch information
soxofaan committed Sep 5, 2023
1 parent b4fc95a commit a81f8fb
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
20 changes: 15 additions & 5 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def set_backends_for_collection(self, cid: str, backends: Iterable[str]):
self._data[cid]["backends"] = list(backends)

def get_backends_for_collection(self, cid: str) -> List[str]:
"""Get backend ids that provide given collection id."""
if cid not in self._data:
raise CollectionNotFoundException(collection_id=cid)
return self._data[cid]["backends"]
Expand Down Expand Up @@ -205,6 +206,11 @@ def evaluate(backend_id, pg):

return [functools.partial(evaluate, pg=pg) for pg in process_graphs]

def get_backends_for_collection(self, cid: str) -> List[str]:
"""Get backend ids that provide given collection id."""
metadata, internal = self._get_all_metadata_cached()
return internal.get_backends_for_collection(cid=cid)

def get_backend_candidates_for_collections(self, collections: Iterable[str]) -> List[str]:
"""
Get backend ids providing all given collections
Expand Down Expand Up @@ -568,13 +574,16 @@ def _process_load_ml_model(
class AggregatorBatchJobs(BatchJobs):

def __init__(
self,
backends: MultiBackendConnection,
processing: AggregatorProcessing,
partitioned_job_tracker: Optional[PartitionedJobTracker] = None,
self,
*,
backends: MultiBackendConnection,
catalog: AggregatorCollectionCatalog,
processing: AggregatorProcessing,
partitioned_job_tracker: Optional[PartitionedJobTracker] = None,
):
super(AggregatorBatchJobs, self).__init__()
self.backends = backends
self._catalog = catalog
self.processing = processing
self.partitioned_job_tracker = partitioned_job_tracker

Expand Down Expand Up @@ -1127,8 +1136,9 @@ def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig):

batch_jobs = AggregatorBatchJobs(
backends=backends,
catalog=catalog,
processing=processing,
partitioned_job_tracker=partitioned_job_tracker
partitioned_job_tracker=partitioned_job_tracker,
)

secondary_services = AggregatorSecondaryServices(backends=backends, processing=processing, config=config)
Expand Down
1 change: 1 addition & 0 deletions src/openeo_aggregator/partitionedjobs/crossbackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def run_partitioned_job(
) -> dict:
"""
Run partitioned job (probably with dependencies between subjobs)
with an active polling loop for tracking and scheduling the subjobs
.. warning::
this is experimental functionality
Expand Down

0 comments on commit a81f8fb

Please sign in to comment.