Save cleaned data of Ingestion Server to AWS S3 (#4163)
* Use proper perf_counter for timers
* Refine local file management and catch exceptions
* Adjust tests
* Simplify saving cleaned rows
* Load the bucket's attributes

---------

Co-authored-by: Madison Swain-Bowden <[email protected]>
krysal and AetherUnbound authored May 9, 2024
1 parent b868207 commit 43d4d09
Showing 3 changed files with 97 additions and 29 deletions.
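The first item in the commit message, switching the timers to `time.perf_counter()`, matters because `time.time()` reads the wall clock, which can jump when the system clock is adjusted, whereas `perf_counter()` is a monotonic, high-resolution counter meant for measuring elapsed durations. A minimal sketch (not part of the commit) of the pattern the batch timers in cleanup.py below now use:

```python
import time

start = time.perf_counter()
time.sleep(0.1)  # stand-in for cleaning one batch of rows
elapsed = time.perf_counter() - start
print(f"Batch finished in {elapsed:.3f}s")  # unaffected by wall-clock changes
```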
2 changes: 1 addition & 1 deletion docker/minio/env.template
@@ -5,4 +5,4 @@ MINIO_ROOT_USER=test_key
MINIO_ROOT_PASSWORD=test_secret

# Comma separated list of buckets to create on startup
BUCKETS_TO_CREATE=openverse-storage,openverse-airflow-logs
BUCKETS_TO_CREATE=openverse-catalog,openverse-storage,openverse-airflow-logs
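The new `openverse-catalog` bucket is the default destination the uploader in cleanup.py (below) falls back to. A hedged way to confirm the local MinIO container created it, assuming the defaults this commit uses (`AWS_S3_ENDPOINT=http://s3:5000` and the `test_key`/`test_secret` credentials from this template); the snippet is illustrative, not part of the commit:

```python
import boto3

# Connect to the local MinIO endpoint with the development credentials above;
# this only works inside the compose network where the `s3` hostname resolves.
s3 = boto3.resource(
    "s3",
    endpoint_url="http://s3:5000",
    aws_access_key_id="test_key",
    aws_secret_access_key="test_secret",
)
print([bucket.name for bucket in s3.buckets.all()])  # expect "openverse-catalog"
```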
122 changes: 95 additions & 27 deletions ingestion_server/ingestion_server/cleanup.py
@@ -7,12 +7,16 @@
import csv
import logging as log
import multiprocessing
import os
import time
import uuid
from dataclasses import dataclass
from urllib.parse import urlparse

import boto3
import requests as re
import tldextract
from decouple import config
from psycopg2.extras import DictCursor, Json

from ingestion_server.db_helpers import database_connect
@@ -217,7 +221,7 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
write_cur = worker_conn.cursor(cursor_factory=DictCursor)
log.info(f"Cleaning {len(rows)} rows")

start_time = time.time()
start_time = time.perf_counter()
cleaned_values = {field: [] for field in all_fields}
for row in rows:
source, _id, identifier = row["source"], row["id"], row["identifier"]
@@ -251,9 +255,6 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
for field, clean_value in cleaned_data.items():
update_field_expressions.append(f"{field} = {clean_value}")
# Save cleaned values for later
# (except for tags, which take up too much space)
if field == "tags":
continue
cleaned_values[field].append((identifier, clean_value))

if len(update_field_expressions) > 0:
@@ -266,30 +267,95 @@ def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
worker_conn.commit()
write_cur.close()
worker_conn.close()
end_time = time.time()
end_time = time.perf_counter()
total_time = end_time - start_time
log.info(f"Worker finished batch in {total_time}")
return cleaned_values


def save_cleaned_data(result: dict) -> dict[str, int]:
log.info("Saving cleaned data...")
start_time = time.time()
@dataclass
class FieldBuffered:
part: int
rows: list[tuple[str, str]]

cleanup_counts = {field: len(items) for field, items in result.items()}
for field, cleaned_items in result.items():
# Skip the tag field because the file is too large and fills up the disk
if field == "tag":
continue
if cleaned_items:
with open(f"{field}.tsv", "a") as f:
csv_writer = csv.writer(f, delimiter="\t")
csv_writer.writerows(cleaned_items)

end_time = time.time()
total_time = end_time - start_time
log.info(f"Finished saving cleaned data in {total_time},\n{cleanup_counts}")
return cleanup_counts
class CleanDataUploader:
# Number of lines to keep in memory before writing to S3
buffer_size: int

s3_path = "shared/data-refresh-cleaned-data"

buffer = {
field: FieldBuffered(part=1, rows=[])
for field in _cleanup_config["tables"]["image"]["sources"]["*"]["fields"]
}

def __init__(self):
self.date = time.strftime("%Y-%m-%d")
self.buffer_size = config("CLEANUP_BUFFER_SIZE", default=10_000_000, cast=int)
bucket_name = config("OPENVERSE_BUCKET", default="openverse-catalog")
try:
self.s3 = self._get_s3_resource()
self.s3_bucket = self.s3.Bucket(bucket_name)
# Try loading the bucket's attributes to check the connection works.
self.s3_bucket.load()
except Exception as e:
log.error(f"Error connecting to S3 or creating bucket: {e}")
self.s3 = None
self.s3_bucket = None

@staticmethod
def _get_s3_resource():
if config("ENVIRONMENT", default="local") == "local":
return boto3.resource(
"s3",
endpoint_url=config("AWS_S3_ENDPOINT", default="http://s3:5000"),
aws_access_key_id=config("AWS_ACCESS_KEY_ID", default="test_key"),
aws_secret_access_key=config(
"AWS_SECRET_ACCESS_KEY", default="test_secret"
),
)

return boto3.resource(
"s3", region_name=config("AWS_REGION", default="us-east-1")
)

def _upload_to_s3(self, field: str):
if not self.s3_bucket:
log.warning("No S3 bucket available, skipping upload.")
return

part_number = self.buffer[field].part
log.info(f"Uploading file part {part_number} of `{field}` to S3...")
s3_file_name = f"{self.s3_path}/{self.date}_{field}_{part_number}.tsv"
tsv_file = f"{field}.tsv"
with open(tsv_file, "w") as f:
csv_writer = csv.writer(f, delimiter="\t")
csv_writer.writerows(self.buffer[field].rows)
try:
self.s3_bucket.upload_file(tsv_file, s3_file_name)
except Exception as e:
log.error(f"Error uploading {field} to S3: {e}")
os.remove(tsv_file)
self.buffer[field].part += 1
self.buffer[field].rows = []

def save(self, result: dict) -> dict[str, int]:
for field, cleaned_items in result.items():
if not cleaned_items:
continue

self.buffer[field].rows += cleaned_items
if len(self.buffer[field].rows) >= self.buffer_size:
self._upload_to_s3(field)

return {field: len(items) for field, items in result.items()}

def flush(self):
log.info("Clearing buffer.")
for field in self.buffer:
if self.buffer[field].rows:
self._upload_to_s3(field)


def clean_image_data(table):
@@ -299,11 +365,12 @@ def clean_image_data(table):
:param table: The staging table for the new data
:return: None
"""
data_uploader = CleanDataUploader()

# Map each table to the fields that need to be cleaned up. Then, map each
# field to its cleanup function.
log.info("Cleaning up data...")
start_time = time.time()
start_time = time.perf_counter()
table_config = _cleanup_config["tables"][table]

# Pull data from selected sources only.
@@ -341,7 +408,7 @@ def clean_image_data(table):

while batch:
# Divide updates into jobs for parallel execution.
batch_start_time = time.time()
batch_start_time = time.perf_counter()
temp_table = f"temp_import_{table}"
job_size = int(len(batch) / num_workers)
last_end = -1
@@ -364,14 +431,14 @@ def clean_image_data(table):
log.info(f"Starting {len(jobs)} cleaning jobs")

for result in pool.starmap(_clean_data_worker, jobs):
batch_cleaned_counts = save_cleaned_data(result)
batch_cleaned_counts = data_uploader.save(result)
for field in batch_cleaned_counts:
cleaned_counts_by_field[field] += batch_cleaned_counts[field]
pool.close()
pool.join()

num_cleaned += len(batch)
batch_end_time = time.time()
batch_end_time = time.perf_counter()
rate = len(batch) / (batch_end_time - batch_start_time)
log.info(
f"Batch finished, records/s: cleanup_rate={rate}, "
@@ -383,9 +450,10 @@ def clean_image_data(table):
conn.commit()
iter_cur.close()
conn.close()
end_time = time.time()
data_uploader.flush()
end_time = time.perf_counter()
cleanup_time = end_time - start_time
log.info(
f"Cleaned all records in {cleanup_time} seconds,"
f"Cleaned all records in {cleanup_time:.3f} seconds,"
f"counts: {cleaned_counts_by_field}"
)
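Taken together, the diff replaces the per-batch `save_cleaned_data()` function with a buffered `CleanDataUploader` that `clean_image_data()` drives: `save()` accumulates rows per field and uploads a numbered TSV part once the buffer fills, and `flush()` pushes whatever remains after the last batch. A rough usage sketch, with an illustrative payload shaped like the `{field: [(identifier, cleaned_value), ...]}` dict that `_clean_data_worker()` returns:

```python
from ingestion_server.cleanup import CleanDataUploader

# Illustrative batch result; the identifier and cleaned value are made up.
batch_result = {
    "url": [("4bbfe191-1cca-4b9e-aff0-1d3044ef3f2d", "'https://example.com/a.jpg'")],
    "tags": [],
}

uploader = CleanDataUploader()
counts = uploader.save(batch_result)  # buffers rows; uploads a part only once
                                      # CLEANUP_BUFFER_SIZE rows have accumulated
uploader.flush()                      # push any remaining buffered rows
print(counts)                         # {'url': 1, 'tags': 0}
```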
2 changes: 1 addition & 1 deletion ingestion_server/test/unit_tests/test_slack.py
@@ -32,7 +32,7 @@
"environment",
[
# Default environment
None,
"local",
# Different, explicit environment
"staging",
],
