diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py index 2b196e7c370..7257431700f 100644 --- a/ingestion_server/ingestion_server/cleanup.py +++ b/ingestion_server/ingestion_server/cleanup.py @@ -291,12 +291,14 @@ class CleanDataUploader: } def __init__(self): + self.date = time.strftime("%Y-%m-%d") self.buffer_size = config("CLEANUP_BUFFER_SIZE", default=10_000_000, cast=int) bucket_name = config("OPENVERSE_BUCKET", default="openverse-catalog") try: self.s3 = self._get_s3_resource() self.s3_bucket = self.s3.Bucket(bucket_name) - self.date = time.strftime("%Y-%m-%d") + # Try loading the bucket's attributes to check the connection works. + self.s3_bucket.load() except Exception as e: log.error(f"Error connecting to S3 or creating bucket: {e}") self.s3 = None @@ -319,6 +321,10 @@ def _get_s3_resource(): ) def _upload_to_s3(self, field: str): + if not self.s3_bucket: + log.warning("No S3 bucket available, skipping upload.") + return + part_number = self.buffer[field].part log.info(f"Uploading file part {part_number} of `{field}` to S3...") s3_file_name = f"{self.s3_path}/{self.date}_{field}_{part_number}.tsv" @@ -336,7 +342,7 @@ def _upload_to_s3(self, field: str): def save(self, result: dict) -> dict[str, int]: for field, cleaned_items in result.items(): - if not cleaned_items or not self.s3_bucket: + if not cleaned_items: continue self.buffer[field].rows += cleaned_items @@ -348,7 +354,7 @@ def save(self, result: dict) -> dict[str, int]: def flush(self): log.info("Clearing buffer.") for field in self.buffer: - if self.buffer[field].rows and self.s3_bucket is not None: + if self.buffer[field].rows: self._upload_to_s3(field) diff --git a/ingestion_server/justfile b/ingestion_server/justfile index e3ea1a41412..2bd74fca727 100644 --- a/ingestion_server/justfile +++ b/ingestion_server/justfile @@ -98,7 +98,7 @@ create-and-populate-filtered-index model="image" destination_suffix="init": ######### # Run ingestion-server tests locally -test-local *args="--exitfirst": +test-local *args="--verbose --exitfirst": # populate the tldextract cache before running tests to prevent unnecessary network requests during tests # and from needing to mock essentially unmockable responses pipenv run tldextract --update