diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py index b33ab0ce36c..231433822ba 100644 --- a/ingestion_server/ingestion_server/cleanup.py +++ b/ingestion_server/ingestion_server/cleanup.py @@ -319,14 +319,19 @@ def _save_to_disk(self, field: str, force_upload=False): log.info( f"Saving {len(self.buffer[field].rows)} rows of `{field}` to local file." ) - with open(f"{field}.tsv", "a") as f: + filename = f"{field}.tsv" + + with open(filename, "a") as f: csv_writer = csv.writer(f, delimiter="\t") csv_writer.writerows(self.buffer[field].rows) # Clean memory buffer of saved rows self.buffer[field].rows = [] - if os.path.getsize(f"{field}.tsv") >= self.disk_buffer_size or force_upload: - self._upload_to_s3(field) + if os.path.getsize(filename) >= self.disk_buffer_size or force_upload: + try: + self._upload_to_s3(field) + except Exception as e: + log.error(f"Error uploading {field} to S3: {e}") def _upload_to_s3(self, field: str): part_number = self.buffer[field].part @@ -338,10 +343,12 @@ def _upload_to_s3(self, field: str): def save(self, result: dict) -> dict[str, int]: for field, cleaned_items in result.items(): - if cleaned_items: - self.buffer[field].rows += cleaned_items - if len(self.buffer[field].rows) >= self.mem_buffer_size: - self._save_to_disk(field) + if not cleaned_items: + continue + + self.buffer[field].rows += cleaned_items + if len(self.buffer[field].rows) >= self.mem_buffer_size: + self._save_to_disk(field) return {field: len(items) for field, items in result.items()}