diff --git a/ingestion_server/ingestion_server/cleanup.py b/ingestion_server/ingestion_server/cleanup.py index 8d1fae097b3..1aafd7a95e8 100644 --- a/ingestion_server/ingestion_server/cleanup.py +++ b/ingestion_server/ingestion_server/cleanup.py @@ -351,16 +351,19 @@ def clean_image_data(table): cleanable_fields_for_table, ) ) - with multiprocessing.Pool(processes=num_workers) as pool: - log.info(f"Starting {len(jobs)} cleaning jobs") - - for result in pool.starmap(_clean_data_worker, jobs): - batch_cleaned_counts = save_cleaned_data(result) - log.info(f"Batch cleaned counts: {batch_cleaned_counts}") - log.info(f"Multiprocessing batch finished, result: {result}") - for field in batch_cleaned_counts: - cleaned_counts_by_field[field] += batch_cleaned_counts[field] - log.info("Finished cleaning jobs") + pool = multiprocessing.Pool(processes=num_workers) + log.info(f"Starting {len(jobs)} cleaning jobs") + + results = pool.starmap(_clean_data_worker, jobs) + log.info(f"Multiprocessing pool finished, results: {results}") + for result in results: + batch_cleaned_counts = save_cleaned_data(result) + log.info(f"Batch cleaned counts: {batch_cleaned_counts}") + log.info(f"Multiprocessing batch finished, result: {result}") + for field in batch_cleaned_counts: + cleaned_counts_by_field[field] += batch_cleaned_counts[field] + pool.close() + log.info("Finished cleaning jobs") log.info("Finished multiprocessing pool") num_cleaned += len(batch) batch_end_time = time.time()