Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a bug where meta-indicators are dropped during region-processing #228

Merged
merged 9 commits into from
Mar 15, 2023
107 changes: 59 additions & 48 deletions nomenclature/processor/region.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from typing import Dict, List, Optional, Union

import jsonschema
import numpy as np
import pandas as pd
import pyam
import pydantic
import yaml
Expand Down Expand Up @@ -406,24 +408,27 @@ def apply(self, df: IamDataFrame) -> IamDataFrame:
"""
processed_dfs: List[IamDataFrame] = []
for model in df.model:

model_df = df.filter(model=model)

# If no mapping is defined the data frame is returned unchanged
if model not in self.mappings:
logger.info(f"No model mapping found for model {model}")
logger.info(f"No model mapping found for model '{model}'")
danielhuppmann marked this conversation as resolved.
Show resolved Hide resolved
processed_dfs.append(model_df)

# Otherwise we first rename, then aggregate
else:
# before aggregating, check that all regions are valid

# Before aggregating, check that all regions are valid
self.mappings[model].validate_regions(self.region_codelist)
file = self.mappings[model].file
logger.info(
f"Applying region-processing for model {model} from file "
f"{self.mappings[model].file}"
f"Applying region-processing for model '{model}' from '{file}'"
)

# Check for regions not mentioned in the model mapping
self.mappings[model].check_unexpected_regions(model_df)
_processed_dfs = []
_processed_data: List[pd.Series] = []

# Silence pyam's empty filter warnings
with adjust_log_level(logger="pyam", level="ERROR"):
Expand All @@ -433,8 +438,10 @@ def apply(self, df: IamDataFrame) -> IamDataFrame:
region=self.mappings[model].model_native_region_names
)
if not _df.empty:
_processed_dfs.append(
_df.rename(region=self.mappings[model].rename_mapping)
_processed_data.append(
_df.rename(
region=self.mappings[model].rename_mapping
)._data
)

# Aggregate
Expand All @@ -444,11 +451,11 @@ def apply(self, df: IamDataFrame) -> IamDataFrame:
# If the common region is only comprised of a single model
# native region, just rename
if cr.is_single_constituent_region:
_processed_dfs.append(
model_df.filter(
region=cr.constituent_regions[0]
).rename(region=cr.rename_dict)
)
_df = model_df.filter(
region=cr.constituent_regions[0]
).rename(region=cr.rename_dict)
if not _df.empty:
_processed_data.append(_df._data)
continue

# if there are multiple constituent regions, aggregate
Expand All @@ -461,12 +468,12 @@ def apply(self, df: IamDataFrame) -> IamDataFrame:
df.variable
)
]
_processed_dfs.append(
model_df.aggregate_region(
simple_vars,
*regions,
)
_df = model_df.aggregate_region(
simple_vars,
*regions,
)
if _df is not None and not _df.empty:
_processed_data.append(_df._data)

# Second, special weighted aggregation
for var in self.variable_codelist.vars_kwargs(df.variable):
Expand All @@ -478,7 +485,7 @@ def apply(self, df: IamDataFrame) -> IamDataFrame:
**var.pyam_agg_kwargs,
)
if _df is not None and not _df.empty:
_processed_dfs.append(_df)
_processed_data.append(_df._data)
else:
for rename_var in var.region_aggregation:
for _rename, _kwargs in rename_var.items():
Expand All @@ -489,10 +496,10 @@ def apply(self, df: IamDataFrame) -> IamDataFrame:
**_kwargs,
)
if _df is not None and not _df.empty:
_processed_dfs.append(
_processed_data.append(
_df.rename(
variable={var.name: _rename}
)
)._data
)

common_region_df = model_df.filter(
Expand All @@ -501,20 +508,24 @@ def apply(self, df: IamDataFrame) -> IamDataFrame:
)

# concatenate and merge with data provided at common-region level
if _processed_dfs:
processed_dfs.append(
_merge_with_provided_data(_processed_dfs, common_region_df)
)
if _processed_data:
_data = pd.concat(_processed_data)
if not common_region_df.empty:
_data = _compare_and_merge(common_region_df._data, _data)

# if data exists only at the common-region level
elif not common_region_df.empty:
processed_dfs.append(common_region_df)
_data = common_region_df._data

# raise an error if processed_dfs has no entries or all are empty
if not processed_dfs or all(df.empty for df in processed_dfs):
raise ValueError(
f"The region-processing for model(s) {df.model} returned an empty "
"dataset"
)
# raise an error if region-processing yields an empty result
else:
raise ValueError(
f"Region-processing for model '{model}' returned an empty "
"dataset"
)

# cast processed timeseries data and meta indicators to IamDataFrame
processed_dfs.append(IamDataFrame(_data, meta=model_df.meta))

return pyam.concat(processed_dfs)

Expand All @@ -532,28 +543,28 @@ def _aggregate_region(df, var, *regions, **kwargs):
raise e


def _merge_with_provided_data(
_processed_df: IamDataFrame, common_region_df: IamDataFrame
) -> IamDataFrame:
"""Compare and merge provided and aggregated results"""
def _compare_and_merge(original: pd.Series, aggregated: pd.Series, ) -> IamDataFrame:
"""Compare and merge original and aggregated results"""

# validate that aggregated data matches to original data
aggregate_df = pyam.concat(_processed_df)
compare = pyam.compare(
common_region_df,
aggregate_df,
left_label="common-region",
right_label="aggregation",
# compare processed (aggregated) data and data provided at the common-region level
compare = pd.merge(
left=original.rename(index="original"),
right=aggregated.rename(index="aggregated"),
how="outer",
left_index=True,
right_index=True,
)
# drop all data which is not in both data frames
diff = compare.dropna()

if diff is not None and len(diff):
logging.warning("Difference between original and aggregated data:\n" f"{diff}")
# drop rows that are not in conflict
compare = compare.dropna()
compare = compare[~np.isclose(compare["original"], compare["aggregated"])]

if compare is not None and len(compare):
logging.warning(f"Difference between original and aggregated data:\n{compare}")

# merge aggregated data onto original common-region data
index = aggregate_df._data.index.difference(common_region_df._data.index)
return common_region_df.append(aggregate_df._data[index])
index = aggregated.index.difference(original.index)
return pd.concat([original, aggregated[index]])


def _check_exclude_region_overlap(values: Dict, region_type: str) -> Dict:
Expand Down
15 changes: 14 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pathlib import Path
from typing import Dict, List
import pytest
import numpy as np
import pandas as pd
from pyam import IamDataFrame, IAMC_IDX
from nomenclature import DataStructureDefinition
Expand Down Expand Up @@ -33,7 +34,19 @@ def extras_definition():

@pytest.fixture(scope="function")
def simple_df():
    # Build the shared test IamDataFrame and attach simple meta indicators
    # so that meta handling is exercised by every test using this fixture.
    df = IamDataFrame(TEST_DF)
    add_meta(df)
    yield df


def add_meta(df):
    """Add simple meta indicators"""
    # Meta values keyed by the number of model-scenario combinations in `df`;
    # any other size leaves the dataframe untouched.
    meta_by_size = {
        1: {"number": [1.], "string": ["foo"]},
        2: {"number": [1., 2.], "string": ["foo", np.nan]},
    }
    indicators = meta_by_size.get(len(df.index))
    if indicators is not None:
        for name, values in indicators.items():
            df.set_meta(values, name)


def remove_file_from_mapping(mapping: Dict[str, Code]) -> List[Dict]:
Expand Down
33 changes: 24 additions & 9 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from nomenclature.processor.region import RegionProcessor
from pyam import IAMC_IDX, IamDataFrame, assert_iamframe_equal

from conftest import TEST_DATA_DIR
from conftest import TEST_DATA_DIR, add_meta


@pytest.mark.parametrize("model_name", ["model_a", "model_c"])
Expand All @@ -32,6 +32,7 @@ def test_region_processing_rename(model_name):
columns=IAMC_IDX + [2005, 2010],
)
)
add_meta(test_df)

exp = copy.deepcopy(test_df)
exp.filter(region=["region_a", "region_B"], inplace=True)
Expand Down Expand Up @@ -62,7 +63,7 @@ def test_region_processing_empty_raises(rp_dir):
columns=IAMC_IDX + [2005, 2010],
)
)
with pytest.raises(ValueError, match=("'model_a', 'model_b'.*empty dataset")):
with pytest.raises(ValueError, match=("Region.*'model_a'.*empty dataset")):
process(
test_df,
dsd := DataStructureDefinition(TEST_DATA_DIR / "region_processing/dsd"),
Expand Down Expand Up @@ -99,6 +100,8 @@ def test_region_processing_aggregate():
columns=IAMC_IDX + [2005, 2010],
)
)
add_meta(test_df)

exp = IamDataFrame(
pd.DataFrame(
[
Expand All @@ -108,6 +111,7 @@ def test_region_processing_aggregate():
columns=IAMC_IDX + [2005, 2010],
)
)
add_meta(exp)

obs = process(
test_df,
Expand Down Expand Up @@ -142,6 +146,8 @@ def test_region_processing_complete(directory):
columns=IAMC_IDX + [2005, 2010],
)
)
add_meta(test_df)

exp = IamDataFrame(
pd.DataFrame(
[
Expand All @@ -156,6 +162,7 @@ def test_region_processing_complete(directory):
columns=IAMC_IDX + [2005, 2010],
)
)
add_meta(exp)

obs = process(
test_df,
Expand Down Expand Up @@ -217,11 +224,13 @@ def test_region_processing_weighted_aggregation(folder, exp_df, args, caplog):
columns=IAMC_IDX + [2005, 2010],
)
)
add_meta(test_df)

if args is not None:
test_df = test_df.filter(**args)

exp = IamDataFrame(pd.DataFrame(exp_df, columns=IAMC_IDX + [2005, 2010]))
add_meta(exp)

obs = process(
test_df,
Expand Down Expand Up @@ -251,7 +260,7 @@ def test_region_processing_skip_aggregation(model_name, region_names):
# * model "m_a" renames native regions and the world region is skipped
# * model "m_b" renames single constituent common regions

input_df = IamDataFrame(
test_df = IamDataFrame(
pd.DataFrame(
[
[model_name, "s_a", region_names[0], "Primary Energy", "EJ/yr", 1, 2],
Expand All @@ -260,6 +269,8 @@ def test_region_processing_skip_aggregation(model_name, region_names):
columns=IAMC_IDX + [2005, 2010],
)
)
add_meta(test_df)

exp = IamDataFrame(
pd.DataFrame(
[
Expand All @@ -269,9 +280,10 @@ def test_region_processing_skip_aggregation(model_name, region_names):
columns=IAMC_IDX + [2005, 2010],
)
)
add_meta(exp)

obs = process(
input_df,
test_df,
dsd := DataStructureDefinition(
TEST_DATA_DIR / "region_processing/skip_aggregation/dsd"
),
Expand Down Expand Up @@ -351,8 +363,7 @@ def test_region_processing_skip_aggregation(model_name, region_names):
[["m_a", "s_a", "World", "Primary Energy", "EJ/yr", 5, 6]],
[
"Difference between original and aggregated data:",
"m_a s_a World Primary Energy",
"2005 5 4",
"m_a s_a World Primary Energy EJ/yr 2005 5 4",
],
),
( # Conflict between overlapping renamed variable and provided data
Expand All @@ -364,8 +375,7 @@ def test_region_processing_skip_aggregation(model_name, region_names):
[["m_a", "s_a", "World", "Variable B", "EJ/yr", 4, 6]],
[
"Difference between original and aggregated data:",
"m_a s_a World Variable B EJ/yr",
"2005 4 3",
"m_a s_a World Variable B EJ/yr 2005 4 3",
],
),
],
Expand All @@ -381,14 +391,19 @@ def test_partial_aggregation(input_data, exp_data, warning, caplog):
# * Using the region-aggregation attribute to create an additional variable
# * Variable is available in provided and aggregated data but different

test_df = IamDataFrame(pd.DataFrame(input_data, columns=IAMC_IDX + [2005, 2010]))
add_meta(test_df)

obs = process(
IamDataFrame(pd.DataFrame(input_data, columns=IAMC_IDX + [2005, 2010])),
test_df,
dsd := DataStructureDefinition(TEST_DATA_DIR / "region_processing/dsd"),
processor=RegionProcessor.from_directory(
TEST_DATA_DIR / "region_processing/partial_aggregation", dsd
),
)

exp = IamDataFrame(pd.DataFrame(exp_data, columns=IAMC_IDX + [2005, 2010]))
add_meta(exp)

# Assert that we get the expected values
assert_iamframe_equal(obs, exp)
Expand Down