Skip to content

Commit

Permalink
Parallelize model data generation
Browse files Browse the repository at this point in the history
The tasks for model data generation are dynamically generated using a predefined bunch size. An additional task for each scenario handles the remaining grid districts using the real number/ids of the grid districts.
  • Loading branch information
nesnoj committed Jun 3, 2022
1 parent 2497eb9 commit c3542f3
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/egon/data/datasets.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1058,3 +1058,4 @@ emobility_mit:
model_timeseries:
reduce_memory: True
export_results_to_csv: True
parallel_tasks: 10
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
"""

from functools import partial
from pathlib import Path
from urllib.request import urlretrieve
import os
Expand All @@ -93,8 +94,9 @@
allocate_evs_to_grid_districts
)
from egon.data.datasets.emobility.motorized_individual_travel.model_timeseries import (
generate_model_data_eGon2035,
generate_model_data_eGon100RE
generate_model_data_bunch,
generate_model_data_eGon2035_remaining,
generate_model_data_eGon100RE_remaining,
)
from egon.data.datasets.emobility.motorized_individual_travel.helpers import (
COLUMNS_KBA,
Expand All @@ -103,6 +105,7 @@
DATA_BUNDLE_DIR,
TESTMODE_OFF,
TRIP_COLUMN_MAPPING,
MVGD_MIN_COUNT,
)

# ========== Register np datatypes with SQLA ==========
Expand Down Expand Up @@ -351,6 +354,46 @@ def import_csv(f):

class MotorizedIndividualTravel(Dataset):
def __init__(self, dependencies):
def generate_model_data_tasks(scenario_name):
"""Dynamically generate tasks for model data creation.
The goal is to speed up the creation of model timeseries. However,
the exact number of parallel task cannot be determined during the
DAG building as the number of grid districts (MVGD) is calculated
within another pipeline task.
Approach: assuming an approx. count of `mvgd_min_count` of 3700,
the majority of the MVGDs can be parallelized. The remainder is
handled subsequently in XXX.
The number of parallel tasks is defined via parameter
`parallel_tasks` in the dataset config `datasets.yml`.
Parameters
----------
scenario_name : str
Scenario name
Returns
-------
set of functools.partial
The tasks. Each element is of
:func:`egon.data.datasets.emobility.motorized_individual_travel.model_timeseries.generate_model_data`
"""
parallel_tasks = DATASET_CFG["model_timeseries"].get(
"parallel_tasks",
1
)
mvgd_bunch_size = divmod(
MVGD_MIN_COUNT,
parallel_tasks
)[0]
return list(
partial(
generate_model_data_bunch,
scenario_name=scenario_name,
bunch=range(_ * mvgd_bunch_size, (_ + 1) * mvgd_bunch_size)
) for _ in range(parallel_tasks)
)

super().__init__(
name="MotorizedIndividualTravel",
version="0.0.0.dev",
Expand All @@ -360,6 +403,11 @@ def __init__(self, dependencies):
{(download_and_preprocess, allocate_evs_numbers),
(extract_trip_file, write_evs_trips_to_db)},
allocate_evs_to_grid_districts,
{generate_model_data_eGon2035, generate_model_data_eGon100RE}
{
{generate_model_data_tasks(scenario_name="eGon2035"),
generate_model_data_eGon2035_remaining},
{generate_model_data_tasks(scenario_name="eGon100RE"),
generate_model_data_eGon100RE_remaining}
},
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
"drive_end_timesteps": "drive_end",
"consumption_kWh": "consumption"
}
MVGD_MIN_COUNT = 3700 if TESTMODE_OFF else 150


def read_kba_data():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from egon.data.datasets.emobility.motorized_individual_travel.helpers import (
DATASET_CFG,
WORKING_DIR,
MVGD_MIN_COUNT,
read_simbev_metadata_file,
reduce_mem_usage,
)
Expand All @@ -56,6 +57,7 @@
EgonPfHvStore,
EgonPfHvStoreTimeseries,
)
from egon.data.datasets.mv_grid_districts import MvGridDistricts
from egon.data.datasets.scenario_parameters import get_sector_parameters


Expand Down Expand Up @@ -754,6 +756,19 @@ def write_to_file():
write_to_file()


def load_grid_district_ids() -> pd.Series:
"""Load bus IDs of all grid districts"""
with db.session_scope() as session:
query_mvgd = session.query(
MvGridDistricts.bus_id
)
return pd.read_sql(
query_mvgd.statement,
query_mvgd.session.bind,
index_col=None
).bus_id.sort_values()


def generate_model_data_grid_district(
scenario_name: str,
scenario_variation_parameters: dict,
Expand Down Expand Up @@ -813,21 +828,31 @@ def generate_model_data_grid_district(
return static_params, load_ts



def generate_model_data(scenario_name: str):
"""Generates timeseries from simBEV trip data for all MV grid districts
def generate_model_data_bunch(scenario_name: str, bunch: range) -> None:
"""Generates timeseries from simBEV trip data for a bunch of MV grid
districts.
Parameters
----------
scenario_name : str
Scenario name
bunch : list
Bunch of grid districts to generate data for, e.g. [1,2,..,100].
Note: `bunch` is NOT a list of grid districts but is used for slicing
the ordered list (by bus_id) of grid districts! This is used for
parallelization. See
:meth:`egon.data.datasets.emobility.motorized_individual_travel.MotorizedIndividualTravel.generate_model_data_tasks`
"""
# Get list of grid districts / substations for this bunch
mvgd_bus_ids = load_grid_district_ids().iloc[bunch]

# Get scenario variation name
scenario_var_name = DATASET_CFG["scenario"]["variation"][scenario_name]

print(
f"SCENARIO: {scenario_name}, SCENARIO VARIATION: {scenario_var_name}"
f"SCENARIO: {scenario_name}, "
f"SCENARIO VARIATION: {scenario_var_name}, "
f"BUNCH: {bunch[0]}-{bunch[-1]}"
)

# Load scenario params for scenario and scenario variation
Expand All @@ -841,12 +866,15 @@ def generate_model_data(scenario_name: str):
session.query(
EgonEvMvGridDistrict.bus_id,
EgonEvMvGridDistrict.egon_ev_pool_ev_id.label("ev_id"),
)
.filter(EgonEvMvGridDistrict.scenario == scenario_name)
.filter(
).filter(
EgonEvMvGridDistrict.scenario == scenario_name
).filter(
EgonEvMvGridDistrict.scenario_variation == scenario_var_name
).filter(
EgonEvMvGridDistrict.bus_id.in_(mvgd_bus_ids)
).filter(
EgonEvMvGridDistrict.egon_ev_pool_ev_id.isnot(None)
)
.filter(EgonEvMvGridDistrict.egon_ev_pool_ev_id.isnot(None))
)
evs_grid_district = pd.read_sql(
query.statement, query.session.bind, index_col=None
Expand Down Expand Up @@ -894,11 +922,21 @@ def generate_model_data(scenario_name: str):
)


def generate_model_data_eGon2035():
"""Generates timeseries for eGon2035 scenario"""
generate_model_data(scenario_name="eGon2035")
def generate_model_data_eGon2035_remaining():
"""Generates timeseries for eGon2035 scenario for grid districts which
has not been processed in the parallel tasks before.
"""
generate_model_data_bunch(
scenario_name="eGon2035",
bunch=range(MVGD_MIN_COUNT, len(load_grid_district_ids()))
)


def generate_model_data_eGon100RE():
"""Generates timeseries for eGon100RE scenario"""
generate_model_data(scenario_name="eGon100RE")
def generate_model_data_eGon100RE_remaining():
"""Generates timeseries for eGon100RE scenario for grid districts which
has not been processed in the parallel tasks before.
"""
generate_model_data_bunch(
scenario_name="eGon100RE",
bunch=range(MVGD_MIN_COUNT, len(load_grid_district_ids()))
)

0 comments on commit c3542f3

Please sign in to comment.