diff --git a/ingestion_server/.gitignore b/ingestion_server/.gitignore
index 8d789d0ab7b..2eb231ef50a 100644
--- a/ingestion_server/.gitignore
+++ b/ingestion_server/.gitignore
@@ -1,3 +1,4 @@
 # Mutex
 ingestion_server/db
 ingestion_server/lock
+ingestion_server/**.tsv
diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py
index c634e1ab5e8..20fc4d14a79 100644
--- a/ingestion_server/ingestion_server/cleanup.py
+++ b/ingestion_server/ingestion_server/cleanup.py
@@ -3,7 +3,7 @@
 This includes cleaning up malformed URLs and filtering out undesirable tags.
 """
 
-
+import csv
 import logging as log
 import multiprocessing
 import time
@@ -50,6 +50,18 @@
 # may be inaccurate.
 TAG_MIN_CONFIDENCE = 0.90
 
+# We know that flickr and wikimedia support TLS, so we can add them here
+TLS_CACHE = {
+    "www.flickr.com": True,
+    "commons.wikimedia.org": True,
+    ".geograph.org.uk": True,
+    "www.geograph.org.uk": True,
+    ".eol.org": True,
+    "www.eol.org": True,
+    ".digitaltmuseum.org": True,
+    "collections.musee-mccord.qc.ca": False,
+}
+
 
 def _tag_denylisted(tag):
     """Check if a tag is banned or contains a banned substring."""
@@ -89,7 +101,7 @@ def cleanup_url(url, tls_support):
             except KeyError:
                 tls_supported = TlsTest.test_tls_supported(url)
                 tls_support[_tld] = tls_supported
-                log.info(f"Tested domain {_tld}")
+                log.debug(f"Tested domain {_tld}")
 
             if tls_supported:
                 return f"'https://{url}'"
@@ -154,6 +166,17 @@ def cleanup_tags(tags):
 }
 
 
+def _get_cleanable_fields(table):
+    """
+    Extract global and sources-specific field names from
+    _cleanup_config for specific table.
+    """
+    cleanable_fields = []
+    for source in _cleanup_config["tables"][table]["sources"].values():
+        cleanable_fields += list(source["fields"].keys())
+    return cleanable_fields
+
+
 class TlsTest:
     """
     Test URLs to add the correct protocol when missing and use HTTPS when available.
@@ -184,25 +207,26 @@ def test_tls_supported(cls, url):
             return True
 
 
-def _clean_data_worker(rows, temp_table, sources_config):
+def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
     log.info("Starting data cleaning worker")
     global_field_to_func = sources_config["*"]["fields"]
     worker_conn = database_connect()
     log.info("Data cleaning worker connected to database")
     write_cur = worker_conn.cursor(cursor_factory=DictCursor)
     log.info(f"Cleaning {len(rows)} rows")
-    tls_cache = {}
+
     start_time = time.time()
+    cleaned_values = {field: [] for field in all_fields}
     for row in rows:
+        source, _id, identifier = row["source"], row["id"], row["identifier"]
+
         # Map fields that need updating to their cleaning functions
-        source = row["source"]
-        _id = row["id"]
+        fields_to_update = {**global_field_to_func}
         if source in sources_config:
            source_field_to_func = sources_config[source]["fields"]
            # Merge source-local and global function field mappings
-            fields_to_update = {**global_field_to_func, **source_field_to_func}
-        else:
-            fields_to_update = global_field_to_func
+            fields_to_update |= {**source_field_to_func}
+
         # Map fields to their cleaned data
         cleaned_data = {}
         for update_field in fields_to_update:
@@ -211,21 +235,27 @@ def _clean_data_worker(rows, temp_table, sources_config):
                 continue
             cleaning_func = fields_to_update[update_field]
             if cleaning_func == CleanupFunctions.cleanup_url:
-                clean = cleaning_func(url=dirty_value, tls_support=tls_cache)
+                clean = cleaning_func(url=dirty_value, tls_support=TLS_CACHE)
             else:
                 clean = cleaning_func(dirty_value)
             if clean:
                 cleaned_data[update_field] = clean
+                log.debug(
+                    f"Updated {update_field} for {identifier} "
+                    f"from '{dirty_value}' to '{clean}'"
+                )
         # Generate SQL update for all the fields we just cleaned
         update_field_expressions = []
-        for field in cleaned_data:
-            update_field_expressions.append(f"{field} = {cleaned_data[field]}")
+        for field, clean_value in cleaned_data.items():
+            update_field_expressions.append(f"{field} = {clean_value}")
+            cleaned_values[field].append((identifier, clean_value))
+
        if len(update_field_expressions) > 0:
            update_query = f"""UPDATE {temp_table}
            SET {', '.join(update_field_expressions)} WHERE id = {_id}
            """
            write_cur.execute(update_query)
-    log.info(f"TLS cache: {tls_cache}")
+    log.info(f"TLS cache: {TLS_CACHE}")
     log.info("Worker committing changes...")
     worker_conn.commit()
     write_cur.close()
@@ -233,7 +263,24 @@ def _clean_data_worker(rows, temp_table, sources_config):
     end_time = time.time()
     total_time = end_time - start_time
     log.info(f"Worker finished batch in {total_time}")
-    return True
+    return cleaned_values
+
+
+def save_cleaned_data(result: dict) -> dict[str, int]:
+    log.info("Saving cleaned data...")
+    start_time = time.time()
+
+    cleanup_counts = {field: len(items) for field, items in result.items()}
+    for field, cleaned_items in result.items():
+        if cleaned_items:
+            with open(f"{field}.tsv", "a") as f:
+                csv_writer = csv.writer(f, delimiter="\t")
+                csv_writer.writerows(cleaned_items)
+
+    end_time = time.time()
+    total_time = end_time - start_time
+    log.info(f"Finished saving cleaned data in {total_time},\n{cleanup_counts}")
+    return cleanup_counts
 
 
 def clean_image_data(table):
@@ -241,7 +288,6 @@ def clean_image_data(table):
     Clean up data loaded from upstream that is unsuitable for prod before going live.
 
     :param table: The staging table for the new data
-    :param upstream_db: A dict specifying the connection details of the upstream DB
     :return: None
     """
 
@@ -258,11 +304,11 @@ def clean_image_data(table):
     fields_to_clean = set()
     for p in sources:
         _fields = list(table_config["sources"][p]["fields"])
-        for f in _fields:
-            fields_to_clean.add(f)
+        fields_to_clean.update(_fields)
 
     cleanup_selection = (
-        f"SELECT id, source, " f"{', '.join(fields_to_clean)} from temp_import_{table}"
+        f"SELECT id, identifier, source, "
+        f"{', '.join(fields_to_clean)} from temp_import_{table}"
     )
     log.info(f'Running cleanup on selection "{cleanup_selection}"')
     conn = database_connect(autocommit=True)
@@ -281,6 +327,9 @@ def clean_image_data(table):
     jobs = []
     num_workers = multiprocessing.cpu_count()
     num_cleaned = 0
+    cleaned_counts_by_field = {field: 0 for field in fields_to_clean}
+    cleanable_fields_for_table = _get_cleanable_fields("image")
+
     while batch:
         # Divide updates into jobs for parallel execution.
         batch_start_time = time.time()
@@ -294,17 +343,33 @@ def clean_image_data(table):
             end = job_size * n
             last_end = end
             # Arguments for parallel _clean_data_worker calls
-            jobs.append((batch[start:end], temp_table, source_config))
+            jobs.append(
+                (
+                    batch[start:end],
+                    temp_table,
+                    source_config,
+                    cleanable_fields_for_table,
+                )
+            )
         pool = multiprocessing.Pool(processes=num_workers)
         log.info(f"Starting {len(jobs)} cleaning jobs")
-        conn.commit()
-        pool.starmap(_clean_data_worker, jobs)
+
+        results = pool.starmap(_clean_data_worker, jobs)
+
+        for result in results:
+            batch_cleaned_counts = save_cleaned_data(result)
+            for field in batch_cleaned_counts:
+                cleaned_counts_by_field[field] += batch_cleaned_counts[field]
         pool.close()
+
         num_cleaned += len(batch)
         batch_end_time = time.time()
         rate = len(batch) / (batch_end_time - batch_start_time)
-        log.info(f"Batch finished, records/s: cleanup_rate={rate}")
-        log.info(f"Fetching next batch. Records cleaned so far: {num_cleaned}")
+        log.info(
+            f"Batch finished, records/s: cleanup_rate={rate}, "
+            f"items cleaned: {batch_cleaned_counts}.\n"
+            f"Fetching next batch."
+        )
         jobs = []
         batch = iter_cur.fetchmany(size=CLEANUP_BUFFER_SIZE)
     conn.commit()
@@ -312,4 +377,7 @@ def clean_image_data(table):
     conn.close()
     end_time = time.time()
     cleanup_time = end_time - start_time
-    log.info(f"Cleaned all records in {cleanup_time} seconds")
+    log.info(
+        f"Cleaned all records in {cleanup_time} seconds,"
+        f"counts: {cleaned_counts_by_field}"
+    )
diff --git a/ingestion_server/test/integration_test.py b/ingestion_server/test/integration_test.py
index 98891195aae..7770c6483a4 100644
--- a/ingestion_server/test/integration_test.py
+++ b/ingestion_server/test/integration_test.py
@@ -253,8 +253,12 @@ def _ingest_upstream(self, model, suffix="integration"):
         stat_msg = "The job should launch successfully and return 202 ACCEPTED."
         self.assertEqual(res.status_code, 202, msg=stat_msg)
 
+        logging.info(
+            f"Waiting for the task to send us a callback {self.__class__.cb_queue}"
+        )
+
         # Wait for the task to send us a callback.
-        assert self.__class__.cb_queue.get(timeout=120) == "CALLBACK!"
+        assert self.__class__.cb_queue.get(timeout=240) == "CALLBACK!"
 
         # Check that the indices remained the same
         after_indices = self._get_indices(self.downstream_db, model)
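
A minimal sketch of how the new helpers fit together outside the worker pool: _get_cleanable_fields lists every field the cleanup config can rewrite, and save_cleaned_data appends each field's (identifier, cleaned_value) rows to a <field>.tsv file, which the new .gitignore rule keeps out of version control. The "url" field name, the UUID, and the example URL below are hypothetical, and the snippet assumes the ingestion_server package and its dependencies are importable.

# Illustrative sketch only, not part of the diff.
from ingestion_server.cleanup import _get_cleanable_fields, save_cleaned_data

# All fields the cleanup config allows to be rewritten for the image table.
fields = _get_cleanable_fields("image")

# _clean_data_worker returns one (identifier, cleaned_value) list per field;
# save_cleaned_data skips empty lists and appends the rest to "<field>.tsv".
result = {field: [] for field in fields}
result["url"] = [  # hypothetical field name, identifier, and cleaned value
    ("4bbfe191-1cca-4b9e-aff0-1d3044ef3f2d", "'https://example.com/image.jpg'"),
]

counts = save_cleaned_data(result)
print(counts)  # e.g. {"url": 1, ...} with zeros for untouched fields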