Commit

Save cleaned up data during the cleanup step (#904)
* Save cleaned up data during the cleanup step

* Update ingestion_server/ingestion_server/cleanup.py

Co-authored-by: Krystle Salazar <[email protected]>

* Add .tsv files to .gitignore

* Refactor for readability

* Improve TLS_CACHE

---------

Co-authored-by: Krystle Salazar <[email protected]>
obulat and krysal authored Mar 28, 2023
1 parent 3245120 commit fd199b9
Showing 3 changed files with 98 additions and 25 deletions.
1 change: 1 addition & 0 deletions ingestion_server/.gitignore
@@ -1,3 +1,4 @@
# Mutex
ingestion_server/db
ingestion_server/lock
ingestion_server/**.tsv
116 changes: 92 additions & 24 deletions ingestion_server/ingestion_server/cleanup.py
@@ -3,7 +3,7 @@
This includes cleaning up malformed URLs and filtering out undesirable tags.
"""

import csv
import logging as log
import multiprocessing
import time
@@ -50,6 +50,18 @@
# may be inaccurate.
TAG_MIN_CONFIDENCE = 0.90

# We know that flickr and wikimedia support TLS, so we can add them here
TLS_CACHE = {
"www.flickr.com": True,
"commons.wikimedia.org": True,
".geograph.org.uk": True,
"www.geograph.org.uk": True,
".eol.org": True,
"www.eol.org": True,
".digitaltmuseum.org": True,
"collections.musee-mccord.qc.ca": False,
}


def _tag_denylisted(tag):
"""Check if a tag is banned or contains a banned substring."""
@@ -89,7 +101,7 @@ def cleanup_url(url, tls_support):
except KeyError:
tls_supported = TlsTest.test_tls_supported(url)
tls_support[_tld] = tls_supported
log.info(f"Tested domain {_tld}")
log.debug(f"Tested domain {_tld}")

if tls_supported:
return f"'https://{url}'"
@@ -154,6 +166,17 @@ def cleanup_tags(tags):
}


def _get_cleanable_fields(table):
"""
Extract the global and source-specific field names from
_cleanup_config for a specific table.
"""
cleanable_fields = []
for source in _cleanup_config["tables"][table]["sources"].values():
cleanable_fields += list(source["fields"].keys())
return cleanable_fields

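For illustration only, here is a made-up config shaped like the real `_cleanup_config` (the actual one lives elsewhere in `cleanup.py` and maps fields to `CleanupFunctions`), together with the same flattening the helper performs:

```python
# Hypothetical config; field and source names are examples only.
example_config = {
    "tables": {
        "image": {
            "sources": {
                "*": {"fields": {"tags": "cleanup_tags", "url": "cleanup_url"}},
                "flickr": {"fields": {"creator_url": "cleanup_url"}},
            }
        }
    }
}

# Applying the same flattening as _get_cleanable_fields("image"):
fields = []
for source in example_config["tables"]["image"]["sources"].values():
    fields += list(source["fields"].keys())
# fields == ["tags", "url", "creator_url"]
```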

class TlsTest:
"""
Test URLs to add the correct protocol when missing and use HTTPS when available.
@@ -184,25 +207,26 @@ def test_tls_supported(cls, url):
return True

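The body of `TlsTest.test_tls_supported` is collapsed in this hunk; as a rough, hedged sketch of what such a probe can look like (not necessarily the real implementation):

```python
import requests

def probe_tls_supported(url: str) -> bool:
    """Hedged sketch: treat a successful HTTPS request as TLS support."""
    try:
        requests.head(f"https://{url}", timeout=2)
        return True
    except requests.RequestException:  # SSL errors, timeouts, refusals, ...
        return False
```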

def _clean_data_worker(rows, temp_table, sources_config):
def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
log.info("Starting data cleaning worker")
global_field_to_func = sources_config["*"]["fields"]
worker_conn = database_connect()
log.info("Data cleaning worker connected to database")
write_cur = worker_conn.cursor(cursor_factory=DictCursor)
log.info(f"Cleaning {len(rows)} rows")
tls_cache = {}

start_time = time.time()
cleaned_values = {field: [] for field in all_fields}
for row in rows:
source, _id, identifier = row["source"], row["id"], row["identifier"]

# Map fields that need updating to their cleaning functions
source = row["source"]
_id = row["id"]
fields_to_update = {**global_field_to_func}
if source in sources_config:
source_field_to_func = sources_config[source]["fields"]
# Merge source-local and global function field mappings
fields_to_update = {**global_field_to_func, **source_field_to_func}
else:
fields_to_update = global_field_to_func
fields_to_update |= {**source_field_to_func}

# Map fields to their cleaned data
cleaned_data = {}
for update_field in fields_to_update:
@@ -211,37 +235,59 @@ def _clean_data_worker(rows, temp_table, sources_config):
continue
cleaning_func = fields_to_update[update_field]
if cleaning_func == CleanupFunctions.cleanup_url:
clean = cleaning_func(url=dirty_value, tls_support=tls_cache)
clean = cleaning_func(url=dirty_value, tls_support=TLS_CACHE)
else:
clean = cleaning_func(dirty_value)
if clean:
cleaned_data[update_field] = clean
log.debug(
f"Updated {update_field} for {identifier} "
f"from '{dirty_value}' to '{clean}'"
)
# Generate SQL update for all the fields we just cleaned
update_field_expressions = []
for field in cleaned_data:
update_field_expressions.append(f"{field} = {cleaned_data[field]}")
for field, clean_value in cleaned_data.items():
update_field_expressions.append(f"{field} = {clean_value}")
cleaned_values[field].append((identifier, clean_value))

if len(update_field_expressions) > 0:
update_query = f"""UPDATE {temp_table} SET
{', '.join(update_field_expressions)} WHERE id = {_id}
"""
write_cur.execute(update_query)
log.info(f"TLS cache: {tls_cache}")
log.info(f"TLS cache: {TLS_CACHE}")
log.info("Worker committing changes...")
worker_conn.commit()
write_cur.close()
worker_conn.close()
end_time = time.time()
total_time = end_time - start_time
log.info(f"Worker finished batch in {total_time}")
return True
return cleaned_values

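The worker now returns the per-field cleaned values instead of a bare `True`. Roughly, the return value looks like this (field names, identifier, and values are illustrative only):

```python
# Illustrative shape of the dict returned by _clean_data_worker: one key per
# cleanable field, each holding (identifier, cleaned_value) pairs. URL values
# keep the SQL-style quoting added by cleanup_url.
example_result = {
    "url": [
        ("4bbfe191-1cca-4b9e-aff0-1d3044ef3f2d", "'https://www.flickr.com/photos/example'"),
    ],
    "tags": [],  # nothing in this batch needed cleaning for this field
}
```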

def save_cleaned_data(result: dict) -> dict[str, int]:
log.info("Saving cleaned data...")
start_time = time.time()

cleanup_counts = {field: len(items) for field, items in result.items()}
for field, cleaned_items in result.items():
if cleaned_items:
with open(f"{field}.tsv", "a") as f:
csv_writer = csv.writer(f, delimiter="\t")
csv_writer.writerows(cleaned_items)

end_time = time.time()
total_time = end_time - start_time
log.info(f"Finished saving cleaned data in {total_time},\n{cleanup_counts}")
return cleanup_counts

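Each field's pairs are appended to a `<field>.tsv` file next to the server code (hence the new `.gitignore` entry). A small, hedged example of reading one back, assuming a field named `url`:

```python
import csv

# "url.tsv" is an assumed file name: save_cleaned_data writes one file per
# cleaned field, each row being identifier<TAB>cleaned_value.
with open("url.tsv", newline="") as f:
    for identifier, cleaned_value in csv.reader(f, delimiter="\t"):
        print(identifier, cleaned_value)
```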

def clean_image_data(table):
"""
Clean up data loaded from upstream that is unsuitable for prod before going live.
:param table: The staging table for the new data
:return: None
"""

@@ -258,11 +304,11 @@ def clean_image_data(table):
fields_to_clean = set()
for p in sources:
_fields = list(table_config["sources"][p]["fields"])
for f in _fields:
fields_to_clean.add(f)
fields_to_clean.update(_fields)

cleanup_selection = (
f"SELECT id, source, " f"{', '.join(fields_to_clean)} from temp_import_{table}"
f"SELECT id, identifier, source, "
f"{', '.join(fields_to_clean)} from temp_import_{table}"
)
log.info(f'Running cleanup on selection "{cleanup_selection}"')
conn = database_connect(autocommit=True)
@@ -281,6 +327,9 @@ def clean_image_data(table):
jobs = []
num_workers = multiprocessing.cpu_count()
num_cleaned = 0
cleaned_counts_by_field = {field: 0 for field in fields_to_clean}
cleanable_fields_for_table = _get_cleanable_fields("image")

while batch:
# Divide updates into jobs for parallel execution.
batch_start_time = time.time()
@@ -294,22 +343,41 @@ def clean_image_data(table):
end = job_size * n
last_end = end
# Arguments for parallel _clean_data_worker calls
jobs.append((batch[start:end], temp_table, source_config))
jobs.append(
(
batch[start:end],
temp_table,
source_config,
cleanable_fields_for_table,
)
)
pool = multiprocessing.Pool(processes=num_workers)
log.info(f"Starting {len(jobs)} cleaning jobs")
conn.commit()
pool.starmap(_clean_data_worker, jobs)

results = pool.starmap(_clean_data_worker, jobs)

for result in results:
batch_cleaned_counts = save_cleaned_data(result)
for field in batch_cleaned_counts:
cleaned_counts_by_field[field] += batch_cleaned_counts[field]
pool.close()

num_cleaned += len(batch)
batch_end_time = time.time()
rate = len(batch) / (batch_end_time - batch_start_time)
log.info(f"Batch finished, records/s: cleanup_rate={rate}")
log.info(f"Fetching next batch. Records cleaned so far: {num_cleaned}")
log.info(
f"Batch finished, records/s: cleanup_rate={rate}, "
f"items cleaned: {batch_cleaned_counts}.\n"
f"Fetching next batch."
)
jobs = []
batch = iter_cur.fetchmany(size=CLEANUP_BUFFER_SIZE)
conn.commit()
iter_cur.close()
conn.close()
end_time = time.time()
cleanup_time = end_time - start_time
log.info(f"Cleaned all records in {cleanup_time} seconds")
log.info(
f"Cleaned all records in {cleanup_time} seconds,"
f"counts: {cleaned_counts_by_field}"
)
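
Putting the loop above together, a condensed sketch of the per-batch flow (parameter names are assumed and the slicing is simplified; it reuses `_clean_data_worker` and `save_cleaned_data` from this module, so it is not the production code):

```python
import multiprocessing

def run_batch(batch, temp_table, source_config, cleanable_fields, num_workers):
    # Slice the batch into one job per worker (ceiling division keeps the
    # remainder in the last slice).
    job_size = max(1, -(-len(batch) // num_workers))
    jobs = [
        (batch[i : i + job_size], temp_table, source_config, cleanable_fields)
        for i in range(0, len(batch), job_size)
    ]
    with multiprocessing.Pool(processes=num_workers) as pool:
        results = pool.starmap(_clean_data_worker, jobs)

    # Persist each worker's cleaned values and aggregate the counts per field.
    totals = {field: 0 for field in cleanable_fields}
    for result in results:
        for field, count in save_cleaned_data(result).items():
            totals[field] += count
    return totals
```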
6 changes: 5 additions & 1 deletion ingestion_server/test/integration_test.py
@@ -253,8 +253,12 @@ def _ingest_upstream(self, model, suffix="integration"):
stat_msg = "The job should launch successfully and return 202 ACCEPTED."
self.assertEqual(res.status_code, 202, msg=stat_msg)

logging.info(
f"Waiting for the task to send us a callback {self.__class__.cb_queue}"
)

# Wait for the task to send us a callback.
assert self.__class__.cb_queue.get(timeout=120) == "CALLBACK!"
assert self.__class__.cb_queue.get(timeout=240) == "CALLBACK!"

# Check that the indices remained the same
after_indices = self._get_indices(self.downstream_db, model)
