Handle exception in case S3 upload fails
krysal committed Apr 21, 2024
1 parent 24a8905 commit a9d9894
Showing 1 changed file with 14 additions and 7 deletions: ingestion_server/ingestion_server/cleanup.py
@@ -319,14 +319,19 @@ def _save_to_disk(self, field: str, force_upload=False):
         log.info(
             f"Saving {len(self.buffer[field].rows)} rows of `{field}` to local file."
         )
-        with open(f"{field}.tsv", "a") as f:
+        filename = f"{field}.tsv"
+
+        with open(filename, "a") as f:
             csv_writer = csv.writer(f, delimiter="\t")
             csv_writer.writerows(self.buffer[field].rows)
             # Clean memory buffer of saved rows
             self.buffer[field].rows = []
 
-        if os.path.getsize(f"{field}.tsv") >= self.disk_buffer_size or force_upload:
-            self._upload_to_s3(field)
+        if os.path.getsize(filename) >= self.disk_buffer_size or force_upload:
+            try:
+                self._upload_to_s3(field)
+            except Exception as e:
+                log.error(f"Error uploading {field} to S3: {e}")
 
     def _upload_to_s3(self, field: str):
         part_number = self.buffer[field].part
@@ -338,10 +343,12 @@ def _upload_to_s3(self, field: str):
 
     def save(self, result: dict) -> dict[str, int]:
         for field, cleaned_items in result.items():
-            if cleaned_items:
-                self.buffer[field].rows += cleaned_items
-                if len(self.buffer[field].rows) >= self.mem_buffer_size:
-                    self._save_to_disk(field)
+            if not cleaned_items:
+                continue
+
+            self.buffer[field].rows += cleaned_items
+            if len(self.buffer[field].rows) >= self.mem_buffer_size:
+                self._save_to_disk(field)
 
         return {field: len(items) for field, items in result.items()}
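
The key change is in _save_to_disk: a failed upload is now logged instead of propagating, so the cleanup run continues and the buffered TSV stays on disk. Below is a minimal, self-contained sketch of that pattern, not the repository's code; the boto3 client, the bucket and key parameters, and the upload_tsv_if_large_enough helper are illustrative assumptions.

    import logging
    import os

    import boto3

    log = logging.getLogger(__name__)


    def upload_tsv_if_large_enough(
        filename: str, bucket: str, key: str, threshold: int
    ) -> bool:
        """Upload a buffered TSV to S3 once it reaches threshold bytes.

        Hypothetical helper mirroring the commit's behavior: an upload
        failure is logged rather than raised, leaving the local file in
        place. Returns True only if the upload succeeded.
        """
        if os.path.getsize(filename) < threshold:
            return False

        s3 = boto3.client("s3")
        try:
            s3.upload_file(filename, bucket, key)
        except Exception as e:
            # Same idea as the patched _save_to_disk: report and continue.
            log.error(f"Error uploading {filename} to S3: {e}")
            return False
        return True

With this approach a transient S3 outage shows up in the logs but does not abort the caller's loop; the local TSV remains available, presumably for a retry on a later call or a manual upload.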
