Linting
rzats committed Aug 7, 2023
1 parent 20abd8c commit b90e941
Showing 1 changed file with 61 additions and 60 deletions.
quidel_covidtest/delphi_quidel_covidtest/run.py (61 additions, 60 deletions)
@@ -7,7 +7,6 @@
 import atexit
 from datetime import datetime
 from multiprocessing import Manager, Pool, cpu_count, current_process
-import os
 import time
 from typing import Dict, Any
@@ -169,69 +168,71 @@ def run_module(params: Dict[str, Any]):
                          prefix="wip_")
     smoothers = get_smooth_info(sensors, SMOOTHERS)
     n_cpu = min(8, cpu_count()) # for parallelization
-    lock = Manager().Lock() # for using loggers in multiple threads
-    logger.info("Parallelizing sensor generation", n_cpu=n_cpu)
-    with Pool(n_cpu) as pool:
-        pool_results = []
-        for geo_res in NONPARENT_GEO_RESOLUTIONS:
-            geo_data, res_key = geo_map(geo_res, data)
-            geo_groups = geo_data.groupby(res_key)
-            for agegroup in AGE_GROUPS:
-                for sensor in sensors:
-                    if agegroup == "total":
-                        sensor_name = sensor
-                    else:
-                        sensor_name = "_".join([sensor, agegroup])
-                    pool_results.append(
-                        pool.apply_async(
-                            generate_and_export_for_nonparent_geo,
-                            args=(
-                                # generate_sensors_for_parent_geo
-                                geo_groups, res_key,
-                                smoothers[sensor][1], smoothers[sensor][0],
-                                first_date, last_date, agegroup,
-                                # create_export_csv
-                                geo_res, sensor_name, export_dir,
-                                export_start_date, export_end_date,
-                                # logger params
-                                lock,
-                                params["common"].get("log_filename"),
-                                params["common"].get("log_exceptions", True)
+    with Manager() as manager:
+        # for using loggers in multiple threads
+        lock = manager.Lock() # pylint: disable=no-member
+        logger.info("Parallelizing sensor generation", n_cpu=n_cpu)
+        with Pool(n_cpu) as pool:
+            pool_results = []
+            for geo_res in NONPARENT_GEO_RESOLUTIONS:
+                geo_data, res_key = geo_map(geo_res, data)
+                geo_groups = geo_data.groupby(res_key)
+                for agegroup in AGE_GROUPS:
+                    for sensor in sensors:
+                        if agegroup == "total":
+                            sensor_name = sensor
+                        else:
+                            sensor_name = "_".join([sensor, agegroup])
+                        pool_results.append(
+                            pool.apply_async(
+                                generate_and_export_for_nonparent_geo,
+                                args=(
+                                    # generate_sensors_for_parent_geo
+                                    geo_groups, res_key,
+                                    smoothers[sensor][1], smoothers[sensor][0],
+                                    first_date, last_date, agegroup,
+                                    # create_export_csv
+                                    geo_res, sensor_name, export_dir,
+                                    export_start_date, export_end_date,
+                                    # logger params
+                                    lock,
+                                    params["common"].get("log_filename"),
+                                    params["common"].get("log_exceptions", True)
+                                )
                             )
                         )
-                    )
-        assert geo_res == "state" # Make sure geo_groups is for state level
-        # County/HRR/MSA level
-        for geo_res in PARENT_GEO_RESOLUTIONS:
-            geo_data, res_key = geo_map(geo_res, data) # using the last geo_groups
-            for agegroup in AGE_GROUPS:
-                for sensor in sensors:
-                    if agegroup == "total":
-                        sensor_name = sensor
-                    else:
-                        sensor_name = "_".join([sensor, agegroup])
-                    pool_results.append(
-                        pool.apply_async(
-                            generate_and_export_for_parent_geo,
-                            args=(
-                                # generate_sensors_for_parent_geo
-                                geo_groups, geo_data, res_key,
-                                smoothers[sensor][1], smoothers[sensor][0],
-                                first_date, last_date, agegroup,
-                                # create_export_csv
-                                geo_res, sensor_name, export_dir,
-                                export_start_date, export_end_date,
-                                # logger params
-                                lock,
-                                params["common"].get("log_filename"),
-                                params["common"].get("log_exceptions", True)
+            assert geo_res == "state" # Make sure geo_groups is for state level
+            # County/HRR/MSA level
+            for geo_res in PARENT_GEO_RESOLUTIONS:
+                geo_data, res_key = geo_map(geo_res, data) # using the last geo_groups
+                for agegroup in AGE_GROUPS:
+                    for sensor in sensors:
+                        if agegroup == "total":
+                            sensor_name = sensor
+                        else:
+                            sensor_name = "_".join([sensor, agegroup])
+                        pool_results.append(
+                            pool.apply_async(
+                                generate_and_export_for_parent_geo,
+                                args=(
+                                    # generate_sensors_for_parent_geo
+                                    geo_groups, geo_data, res_key,
+                                    smoothers[sensor][1], smoothers[sensor][0],
+                                    first_date, last_date, agegroup,
+                                    # create_export_csv
+                                    geo_res, sensor_name, export_dir,
+                                    export_start_date, export_end_date,
+                                    # logger params
+                                    lock,
+                                    params["common"].get("log_filename"),
+                                    params["common"].get("log_exceptions", True)
+                                )
                             )
                         )
-                    )
-        pool_results = [proc.get() for proc in pool_results]
-    for dates in pool_results:
-        if len(dates) > 0:
-            stats.append((max(dates), len(dates)))
+            pool_results = [proc.get() for proc in pool_results]
+        for dates in pool_results:
+            if len(dates) > 0:
+                stats.append((max(dates), len(dates)))
 
     # Export the cache file if the pipeline runs successfully.
     # Otherwise, don't update the cache file
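The refactor keeps behavior identical but makes two idiomatic improvements: the Manager is now used as a context manager, so its server process is reliably shut down when the block exits, and the pylint disable acknowledges that SyncManager registers Lock dynamically, so static analysis cannot see the member. A minimal self-contained sketch of the same pattern, not taken from this repository - the work function and its inputs are hypothetical placeholders:

from multiprocessing import Manager, Pool, cpu_count

def work(item, lock):
    # The manager's proxy lock serializes access to a shared resource,
    # e.g. a single log file written to by many workers.
    with lock:
        print(f"processing {item}")
    return item * 2

if __name__ == "__main__":
    n_cpu = min(8, cpu_count())
    with Manager() as manager:
        lock = manager.Lock()  # pylint: disable=no-member
        with Pool(n_cpu) as pool:
            async_results = [pool.apply_async(work, args=(i, lock))
                             for i in range(10)]
            # .get() blocks until a task finishes and re-raises any
            # worker exception in the parent process.
            results = [r.get() for r in async_results]
    print(results)

Note that a plain multiprocessing.Lock() would not work here: it cannot be passed to Pool tasks through apply_async (such locks may only be shared through inheritance), which is why a manager proxy lock is used, and why the results are collected with .get() before the Pool block exits.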
