From 666690752a881a24620fe87de1cbae4854773a66 Mon Sep 17 00:00:00 2001 From: Jonas Hoersch Date: Mon, 8 Jul 2024 19:25:21 +0200 Subject: [PATCH 1/8] Add resolver to iamc sub-package --- src/pandas_indexing/iamc/__init__.py | 1 + src/pandas_indexing/iamc/resolver.py | 519 +++++++++++++++++++++++++++ 2 files changed, 520 insertions(+) create mode 100644 src/pandas_indexing/iamc/__init__.py create mode 100644 src/pandas_indexing/iamc/resolver.py diff --git a/src/pandas_indexing/iamc/__init__.py b/src/pandas_indexing/iamc/__init__.py new file mode 100644 index 0000000..bd1cb42 --- /dev/null +++ b/src/pandas_indexing/iamc/__init__.py @@ -0,0 +1 @@ +from .resolver import Resolver diff --git a/src/pandas_indexing/iamc/resolver.py b/src/pandas_indexing/iamc/resolver.py new file mode 100644 index 0000000..2b91a0b --- /dev/null +++ b/src/pandas_indexing/iamc/resolver.py @@ -0,0 +1,519 @@ +from __future__ import annotations + +import operator +from contextlib import contextmanager +from functools import reduce +from itertools import product +from typing import Any, Callable, Sequence, TypeVar + +from attrs import define, evolve +from pandas import DataFrame, Index, MultiIndex, Series + +from .. import arithmetics +from ..core import concat, isin +from ..utils import print_list + + +def _summarize(index: MultiIndex, names: Sequence[str]) -> DataFrame: + """Summarize unique level values grouped by `names` levels. + + Parameters + ---------- + index : MultiIndex + Index to summarize + names : Sequence[str] + Levels to group by values by + + Returns + ------- + DataFrame + Summary frame + """ + return ( + index.to_frame() + .pix.project(names)[index.names.difference(names)] + .groupby(names) + .agg(lambda x: print_list(set(x), n=42)) + ) + + +def maybe_parens(provenance: str) -> str: + if any(op in provenance for op in [" + ", " - ", " * ", " / "]): + return f"({provenance})" + return provenance + + +@define +class Context: + """Context shared between all Vars instances in a Resolver. + + Notes + ----- + Not to be instantiated by the user herself + """ + + level: str + full_index: MultiIndex + columns: Index + index: list[str] + optional_combinations: bool = False + + +SelfVar = TypeVar("SelfVar", bound="Var") + + +@define +class Var: + """Instance for a single variant. + + Attributes + ---------- + data : DataFrame + Calculated data + provenance : str + Formula for how the variant was calculated + + Notes + ----- + User does not interact with individual `Var` instances, instead she only ever holds + `Vars` instances. 
+ """ + + data: DataFrame + provenance: str + + def __repr__(self) -> str: + return f"Var {self.provenance}\n{self.data.pix}" + + @property + def empty(self) -> bool: + return self.data.empty + + def index(self, levels: Sequence[str]) -> MultiIndex: + if not set(levels).issubset(self.data.index.names): + return MultiIndex.from_tuples([], names=levels) + return self.data.pix.unique(levels) + + def _binop( + self, op, x: SelfVar, y: SelfVar, provenance_maker: Callable[[str, str], str] + ) -> SelfVar: + if not all(isinstance(v, Var) for v in (x, y)): + return NotImplemented + provenance = provenance_maker(x.provenance, y.provenance) + return self.__class__(op(x.data, y.data, join="inner"), provenance) + + def __add__(self, other: SelfVar) -> SelfVar: + return self._binop(arithmetics.add, self, other, lambda x, y: f"{x} + {y}") + + def __sub__(self, other: SelfVar) -> SelfVar: + return self._binop( + arithmetics.sub, self, other, lambda x, y: f"{x} - {maybe_parens(y)}" + ) + + def __mul__(self, other: SelfVar) -> SelfVar: + return self._binop( + arithmetics.mul, + self, + other, + lambda x, y: f"{maybe_parens(x)} * {maybe_parens(y)}", + ) + + def __truediv__(self, other: SelfVar) -> SelfVar: + return self._binop( + arithmetics.div, + self, + other, + lambda x, y: f"{maybe_parens(x)} / {maybe_parens(y)}", + ) + + def as_df(self, **assign: str) -> DataFrame: + return self.data.pix.assign(**(assign | dict(provenance=self.provenance))) + + +SelfVars = TypeVar("SelfVars", bound="Vars") +T = TypeVar("T", bound=Index | DataFrame | Series) + + +@define +class Vars: + """`Vars` holds several derivations of a variable from data in a `Resolver` + + Attributes + ---------- + data : list of Var + Disjunct derivations of a single variable + context : Context + Shared context from the resolver + index : MultiIndex for which any derivation has data + + Notes + ----- + `Vars` are created with a Resolver and are to be composed with one another. 
+ + Example + ------- + >>> r = Resolver.from_data(co2emissions, level="sector") + >>> energy = r["Energy"] | (r["Energy|Supply"] + r["Energy|Demand"]) + >>> r["Energy and Industrial Processes"] | (energy + r["Industrial Processes"]) + """ + + data: list[Var] + context: Context + + @classmethod + def from_data( + cls, + data: DataFrame, + value: Any, + *, + context: Context, + provenance: str | None = None, + ) -> SelfVars: + if provenance is None: + provenance = value + data = data.loc[isin(**{context.level: value})].droplevel(context.level) + return cls([Var(data, provenance)] if not data.empty else [], context) + + def __repr__(self) -> str: + index = self.index + incomplete = not self._missing(index).empty + return ( + f"Vars for {len(index)}{'*' if incomplete else ''} scenarios:\n" + + "\n".join( + ( + f"* {v.provenance} (" + f"{len(v.index(self.context.index))}" + f"{'*' if not self._missing(v).empty else ''})" + ) + for v in self.data + ) + + "\n" + ) + + @property + def index(self) -> MultiIndex: + if not self.data: + return MultiIndex.from_tuples([], names=self.context.index) + + return concat(v.index(self.context.index) for v in self.data).unique() + + def __bool__(self) -> bool: + return bool(self.data) + + def __len__(self) -> int: + return len(self.data) + + def __iter__(self): + return iter(self.data) + + def __getitem__(self, i: int) -> SelfVars: + ret = self.data[i] + if isinstance(ret, Var): + ret = [ret] + return self.__class__(ret, self.context) + + class _LocIndexer: + def __init__(self, obj: SelfVars): + self._obj = obj + + def __getitem__(self, x: Any) -> SelfVars: + obj = self._obj + return obj.__class__( + [ + var.__class__(z, var.provenance) + for var in obj.data + if not (z := var.data.loc[x]).empty + ], + obj.context, + ) + + @property + def loc(self) -> SelfVars: + return self._LocIndexer(self) + + @staticmethod + def _antijoin(data: T, vars: SelfVars) -> T: + return reduce(lambda d, v: d.pix.antijoin(v.data.index), vars.data, data) + + def antijoin(self, other: SelfVars) -> SelfVars: + """Remove everything from self that is already in `other` + + Parameters + ---------- + other : SelfVars + Another set of derivations for the same variable + + Returns + ------- + SelfVars + Subset of self that is not already provided by `other` + """ + return self.__class__( + [ + z + for var in self.data + if not (z := Var(self._antijoin(var.data, other), var.provenance)).empty + ], + self.context, + ) + + def _missing(self, partial: bool | Var | MultiIndex = False) -> MultiIndex: + full_index = self.context.full_index + if isinstance(partial, Var): + return full_index.join( + partial.index(self.context.index), how="inner" + ).pix.antijoin(partial.data.index) + + if isinstance(partial, MultiIndex): + full_index = full_index.join(partial, how="inner") + elif partial: + full_index = full_index.join(self.index, how="inner") + return self._antijoin(full_index, self) + + def missing( + self, partial: bool = True, summarize: bool = True + ) -> DataFrame | MultiIndex: + index = self._missing(partial) + return _summarize(index, self.context.index) if summarize else index + + def existing(self, summarize: bool = True) -> DataFrame | MultiIndex: + index = concat(var.data.index for var in self.data) + return _summarize(index, self.context.index) if summarize else index + + def _binop(self, op, x: SelfVars, y: SelfVars) -> SelfVars: + if not all(isinstance(v, Vars) for v in (x, y)): + return NotImplemented + + res = self.__class__( + [z for u, v in product(x, y) if not (z := op(u, 
v)).empty], self.context + ) + if self.context.optional_combinations: + res = res | x | y + return res + + def __add__(self, other: SelfVars) -> SelfVars: + if other == 0: + return self + return self._binop(operator.add, self, other) + + def __radd__(self, other: SelfVars) -> SelfVars: + if other == 0: + return self + return self._binop(operator.add, other, self) + + def __sub__(self, other: SelfVars) -> SelfVars: + if other == 0: + return self + return self._binop(operator.sub, self, other) + + def __rsub__(self, other: SelfVars) -> SelfVars: + if other == 0: + return self + return self._binop(operator.sub, other, self) + + def __mul__(self, other: SelfVars) -> SelfVars: + if other == 1: + return self + return self._binop(operator.mul, self, other) + + def __rmul__(self, other: SelfVars) -> SelfVars: + if other == 1: + return self + return self._binop(operator.mul, other, self) + + def __or__(self, other: SelfVars | float | int) -> SelfVars: + if isinstance(other, (float, int)): + provenance = str(other) + other_index = self._missing() + if other_index.empty: + return self + other = DataFrame( + other, + index=other_index, + columns=self.context.columns, + ) + return self.__class__(self.data + [Var(other, provenance)], self.context) + + return self ^ other.antijoin(self) + + def __ror__(self, other: SelfVars) -> SelfVars: + return other ^ self.antijoin(other) + + def __xor__(self, other: SelfVars) -> SelfVars: + if not isinstance(other, Vars): + return NotImplemented + return self.__class__(self.data + other.data, self.context) + + def as_df(self, **assign: str) -> DataFrame: + return concat(var.as_df(**assign) for var in self.data) + + +SelfResolver = TypeVar("SelfResolver", bound="Resolver") + + +@define +class Resolver: + """Resolver allows to consolidate variables by composing variants. 
+ + Examples + -------- + >>> co2emissions = ar6.loc[isin(gas="CO2")] + >>> r = Resolver.from_data(co2emissions, "sector", ["AFOLU", "Energy"]) + >>> r["Energy"] |= r["Energy|Demand"] + r["Energy|Supply"] + >>> r["AFOLU"] |= r["AFOLU|Land"] + r["AFOLU|Agriculture"] + >>> r.as_df() + """ + + vars: dict[str, Vars] + data: DataFrame + context: Context # context is shared with all Vars created + external_data: dict[str, DataFrame] + + @classmethod + def from_data( + cls, + data: DataFrame, + level: str, + values: Sequence[str] | None = None, + index: Sequence[str] = ("model", "scenario"), + **external_data: DataFrame, + ) -> SelfResolver: + context = Context( + level, + full_index=data.index.droplevel(level).unique(), + columns=data.columns, + index=list(index), + ) + inst = cls({}, data, context, external_data) + if values is not None: + for value in values: + inst.add(value) + return inst + + def add(self, value: str) -> Vars: + self.vars[value] = vars = Vars.from_data(self.data, value, context=self.context) + return vars + + class _LocIndexer: + def __init__(self, obj: SelfResolver): + self._obj = obj + + def __getitem__(self, x: Any) -> SelfResolver: + obj = self._obj + vars = {name: z for name, vars in obj.vars.items() if (z := vars.loc[x])} + data = obj.data.loc[x] + context = evolve( + obj.context, full_index=data.index.droplevel(obj.context.level).unique() + ) + return obj.__class__(vars, data, context, obj.external_data) + + @property + def loc(self) -> SelfResolver: + return self._LocIndexer(self) + + def __len__(self) -> int: + return len(self.vars) + + def __getitem__(self, value: str) -> Vars: + vars = self.vars.get(value) + if vars is not None: + return vars + + vars = Vars.from_data(self.data, value, context=self.context) + if not vars: + try: + # get variable from additional data + prefix, rem_value = value.split("|", 1) + vars = Vars.from_data( + self.external_data[prefix], + rem_value, + provenance=value, + context=self.context, + ) + except (KeyError, ValueError): + raise KeyError( + f"{value} is not a {self.context.level} in data or external_data" + ) from None + + return vars + + def _ipython_key_completions_(self) -> list[str]: + comps = list(self.vars) + comps.extend(self.data.pix.unique(self.context.level).difference(comps)) + for n, v in self.external_data.items(): + comps.extend(f"{n}|" + v.pix.unique(self.context.level)) + return comps + + @property + def index(self) -> MultiIndex: + if not self.vars: + return MultiIndex.from_tuples([], names=self.context.index) + + return reduce( + MultiIndex.intersection, (vars.index for vars in self.vars.values()) + ) + + def __repr__(self) -> str: + num_scenarios = len(self.data.pix.unique(self.context.index)) + level = self.context.level + + s = ( + f"Resolver with data for {num_scenarios} scenarios, " + f"and {len(self)} defined {level}s for {len(self.index)} scenarios:\n" + ) + for name, vars in self.vars.items(): + s += ( + f"* {name} ({len(vars)}): " + + ", ".join( + str(len(var.data.pix.unique(self.context.index))) + for var in vars.data + ) + + "\n" + ) + + existing_provenances = [ + v.provenance for vars in self.vars.values() for v in vars + ] + + unused_values = ( + self.data.pix.unique([*self.context.index, level]) + .pix.antijoin(Index(self.vars, name=level).union(existing_provenances)) + .pix.project(level) + .value_counts() + .loc[lambda s: s > num_scenarios // 20] + ) + s += f"{len(unused_values)} {level}s for more than 5% of scenarios unused:\n" + for value, num in unused_values.items(): + s += f"* {value} 
({num})\n" + return s + + def __setitem__(self, value: str, vars: Vars) -> Vars: + if not isinstance(vars, Vars): + raise TypeError(f"Expected Vars instance, found: {type(vars)}") + self.vars[value] = vars + return vars + + @contextmanager + def optional_combinations(self): + active = self.context.optional_combinations + try: + self.context.optional_combinations = True + yield + finally: + self.context.optional_combinations = active + + def as_df(self, only_consistent: bool = True) -> DataFrame: + if only_consistent: + index = self.index + + def maybe_consistent(df): + return df.pix.semijoin(index, how="right") + else: + + def maybe_consistent(df): + return df + + return concat( + vars.as_df(**{self.context.level: name}).pipe(maybe_consistent) + for name, vars in self.vars.items() + ) From 1d61e8235590e158ea1c07bb67fdcc1c01e52300 Mon Sep 17 00:00:00 2001 From: Jonas Hoersch Date: Fri, 23 Aug 2024 17:16:19 +0200 Subject: [PATCH 2/8] Add typing to resolver --- src/pandas_indexing/iamc/resolver.py | 123 ++++++++++++++------------- 1 file changed, 64 insertions(+), 59 deletions(-) diff --git a/src/pandas_indexing/iamc/resolver.py b/src/pandas_indexing/iamc/resolver.py index 2b91a0b..ae811e5 100644 --- a/src/pandas_indexing/iamc/resolver.py +++ b/src/pandas_indexing/iamc/resolver.py @@ -4,7 +4,7 @@ from contextlib import contextmanager from functools import reduce from itertools import product -from typing import Any, Callable, Sequence, TypeVar +from typing import Any, Callable, Iterator, Sequence, TypeVar from attrs import define, evolve from pandas import DataFrame, Index, MultiIndex, Series @@ -31,7 +31,7 @@ def _summarize(index: MultiIndex, names: Sequence[str]) -> DataFrame: """ return ( index.to_frame() - .pix.project(names)[index.names.difference(names)] + .pix.project(names)[index.names.difference(names)] # type: ignore .groupby(names) .agg(lambda x: print_list(set(x), n=42)) ) @@ -59,9 +59,6 @@ class Context: optional_combinations: bool = False -SelfVar = TypeVar("SelfVar", bound="Var") - - @define class Var: """Instance for a single variant. 
@@ -82,6 +79,8 @@ class Var: data: DataFrame provenance: str + SV = TypeVar("SV", bound="Var") + def __repr__(self) -> str: return f"Var {self.provenance}\n{self.data.pix}" @@ -92,35 +91,42 @@ def empty(self) -> bool: def index(self, levels: Sequence[str]) -> MultiIndex: if not set(levels).issubset(self.data.index.names): return MultiIndex.from_tuples([], names=levels) - return self.data.pix.unique(levels) + return self.data.pix.unique(levels) # type: ignore def _binop( - self, op, x: SelfVar, y: SelfVar, provenance_maker: Callable[[str, str], str] - ) -> SelfVar: + self: SV, + op, + x: SV, + y: SV, + provenance_maker: Callable[[str, str], str], + ) -> SV: if not all(isinstance(v, Var) for v in (x, y)): return NotImplemented provenance = provenance_maker(x.provenance, y.provenance) return self.__class__(op(x.data, y.data, join="inner"), provenance) - def __add__(self, other: SelfVar) -> SelfVar: - return self._binop(arithmetics.add, self, other, lambda x, y: f"{x} + {y}") + def __add__(self: SV, other: SV) -> SV: + return self._binop(arithmetics.add, self, other, lambda x, y: f"{x} + {y}") # type: ignore - def __sub__(self, other: SelfVar) -> SelfVar: + def __sub__(self: SV, other: SV) -> SV: return self._binop( - arithmetics.sub, self, other, lambda x, y: f"{x} - {maybe_parens(y)}" + arithmetics.sub, # type: ignore + self, + other, + lambda x, y: f"{x} - {maybe_parens(y)}", ) - def __mul__(self, other: SelfVar) -> SelfVar: + def __mul__(self: SV, other: SV) -> SV: return self._binop( - arithmetics.mul, + arithmetics.mul, # type: ignore self, other, lambda x, y: f"{maybe_parens(x)} * {maybe_parens(y)}", ) - def __truediv__(self, other: SelfVar) -> SelfVar: + def __truediv__(self: SV, other: SV) -> SV: return self._binop( - arithmetics.div, + arithmetics.div, # type: ignore self, other, lambda x, y: f"{maybe_parens(x)} / {maybe_parens(y)}", @@ -130,10 +136,6 @@ def as_df(self, **assign: str) -> DataFrame: return self.data.pix.assign(**(assign | dict(provenance=self.provenance))) -SelfVars = TypeVar("SelfVars", bound="Vars") -T = TypeVar("T", bound=Index | DataFrame | Series) - - @define class Vars: """`Vars` holds several derivations of a variable from data in a `Resolver` @@ -160,18 +162,21 @@ class Vars: data: list[Var] context: Context + SV = TypeVar("SV", bound="Vars") + T = TypeVar("T", bound=Index | DataFrame | Series) + @classmethod def from_data( - cls, + cls: type[SV], data: DataFrame, - value: Any, + value: str, *, context: Context, provenance: str | None = None, - ) -> SelfVars: + ) -> SV: if provenance is None: provenance = value - data = data.loc[isin(**{context.level: value})].droplevel(context.level) + data = data.loc[isin(**{context.level: value})].droplevel(context.level) # type: ignore return cls([Var(data, provenance)] if not data.empty else [], context) def __repr__(self) -> str: @@ -203,20 +208,20 @@ def __bool__(self) -> bool: def __len__(self) -> int: return len(self.data) - def __iter__(self): + def __iter__(self) -> Iterator[Var]: return iter(self.data) - def __getitem__(self, i: int) -> SelfVars: + def __getitem__(self: SV, i: int) -> SV: ret = self.data[i] if isinstance(ret, Var): ret = [ret] return self.__class__(ret, self.context) class _LocIndexer: - def __init__(self, obj: SelfVars): + def __init__(self, obj): self._obj = obj - def __getitem__(self, x: Any) -> SelfVars: + def __getitem__(self, x: Any) -> Vars: obj = self._obj return obj.__class__( [ @@ -228,24 +233,24 @@ def __getitem__(self, x: Any) -> SelfVars: ) @property - def loc(self) -> SelfVars: + def 
loc(self) -> _LocIndexer: return self._LocIndexer(self) @staticmethod - def _antijoin(data: T, vars: SelfVars) -> T: - return reduce(lambda d, v: d.pix.antijoin(v.data.index), vars.data, data) + def _antijoin(data: T, vars: Vars) -> T: + return reduce(lambda d, v: d.pix.antijoin(v.data.index), vars.data, data) # type: ignore - def antijoin(self, other: SelfVars) -> SelfVars: + def antijoin(self: SV, other: SV) -> SV: """Remove everything from self that is already in `other` Parameters ---------- - other : SelfVars + other : Vars Another set of derivations for the same variable Returns ------- - SelfVars + Vars Subset of self that is not already provided by `other` """ return self.__class__( @@ -278,9 +283,10 @@ def missing( def existing(self, summarize: bool = True) -> DataFrame | MultiIndex: index = concat(var.data.index for var in self.data) + assert isinstance(index, MultiIndex) return _summarize(index, self.context.index) if summarize else index - def _binop(self, op, x: SelfVars, y: SelfVars) -> SelfVars: + def _binop(self: SV, op: Callable[[Var, Var], Var], x: SV, y: SV) -> SV: if not all(isinstance(v, Vars) for v in (x, y)): return NotImplemented @@ -291,37 +297,37 @@ def _binop(self, op, x: SelfVars, y: SelfVars) -> SelfVars: res = res | x | y return res - def __add__(self, other: SelfVars) -> SelfVars: + def __add__(self: SV, other: SV) -> SV: if other == 0: return self return self._binop(operator.add, self, other) - def __radd__(self, other: SelfVars) -> SelfVars: + def __radd__(self: SV, other: SV) -> SV: if other == 0: return self return self._binop(operator.add, other, self) - def __sub__(self, other: SelfVars) -> SelfVars: + def __sub__(self: SV, other: SV) -> SV: if other == 0: return self return self._binop(operator.sub, self, other) - def __rsub__(self, other: SelfVars) -> SelfVars: + def __rsub__(self: SV, other: SV) -> SV: if other == 0: return self return self._binop(operator.sub, other, self) - def __mul__(self, other: SelfVars) -> SelfVars: + def __mul__(self: SV, other: SV) -> SV: if other == 1: return self return self._binop(operator.mul, self, other) - def __rmul__(self, other: SelfVars) -> SelfVars: + def __rmul__(self: SV, other: SV) -> SV: if other == 1: return self return self._binop(operator.mul, other, self) - def __or__(self, other: SelfVars | float | int) -> SelfVars: + def __or__(self: SV, other: SV | float | int) -> SV: if isinstance(other, (float, int)): provenance = str(other) other_index = self._missing() @@ -336,10 +342,10 @@ def __or__(self, other: SelfVars | float | int) -> SelfVars: return self ^ other.antijoin(self) - def __ror__(self, other: SelfVars) -> SelfVars: + def __ror__(self: SV, other: SV) -> SV: return other ^ self.antijoin(other) - def __xor__(self, other: SelfVars) -> SelfVars: + def __xor__(self: SV, other: SV) -> SV: if not isinstance(other, Vars): return NotImplemented return self.__class__(self.data + other.data, self.context) @@ -348,15 +354,12 @@ def as_df(self, **assign: str) -> DataFrame: return concat(var.as_df(**assign) for var in self.data) -SelfResolver = TypeVar("SelfResolver", bound="Resolver") - - @define class Resolver: """Resolver allows to consolidate variables by composing variants. 
- Examples - -------- + Usage + ----- >>> co2emissions = ar6.loc[isin(gas="CO2")] >>> r = Resolver.from_data(co2emissions, "sector", ["AFOLU", "Energy"]) >>> r["Energy"] |= r["Energy|Demand"] + r["Energy|Supply"] @@ -369,15 +372,17 @@ class Resolver: context: Context # context is shared with all Vars created external_data: dict[str, DataFrame] + SR = TypeVar("SR", bound="Resolver") + @classmethod def from_data( - cls, + cls: type[SR], data: DataFrame, level: str, values: Sequence[str] | None = None, index: Sequence[str] = ("model", "scenario"), **external_data: DataFrame, - ) -> SelfResolver: + ) -> SR: context = Context( level, full_index=data.index.droplevel(level).unique(), @@ -395,10 +400,10 @@ def add(self, value: str) -> Vars: return vars class _LocIndexer: - def __init__(self, obj: SelfResolver): + def __init__(self, obj: Resolver): self._obj = obj - def __getitem__(self, x: Any) -> SelfResolver: + def __getitem__(self, x: Any) -> Resolver: obj = self._obj vars = {name: z for name, vars in obj.vars.items() if (z := vars.loc[x])} data = obj.data.loc[x] @@ -408,7 +413,7 @@ def __getitem__(self, x: Any) -> SelfResolver: return obj.__class__(vars, data, context, obj.external_data) @property - def loc(self) -> SelfResolver: + def loc(self) -> _LocIndexer: return self._LocIndexer(self) def __len__(self) -> int: @@ -439,9 +444,9 @@ def __getitem__(self, value: str) -> Vars: def _ipython_key_completions_(self) -> list[str]: comps = list(self.vars) - comps.extend(self.data.pix.unique(self.context.level).difference(comps)) + comps.extend(self.data.pix.unique(self.context.level).difference(comps)) # type: ignore for n, v in self.external_data.items(): - comps.extend(f"{n}|" + v.pix.unique(self.context.level)) + comps.extend(f"{n}|" + v.pix.unique(self.context.level)) # type: ignore return comps @property @@ -454,7 +459,7 @@ def index(self) -> MultiIndex: ) def __repr__(self) -> str: - num_scenarios = len(self.data.pix.unique(self.context.index)) + num_scenarios = len(self.data.pix.unique(self.context.index)) # type: ignore level = self.context.level s = ( @@ -465,7 +470,7 @@ def __repr__(self) -> str: s += ( f"* {name} ({len(vars)}): " + ", ".join( - str(len(var.data.pix.unique(self.context.index))) + str(len(var.data.pix.unique(self.context.index))) # type: ignore for var in vars.data ) + "\n" @@ -476,7 +481,7 @@ def __repr__(self) -> str: ] unused_values = ( - self.data.pix.unique([*self.context.index, level]) + self.data.pix.unique([*self.context.index, level]) # type: ignore .pix.antijoin(Index(self.vars, name=level).union(existing_provenances)) .pix.project(level) .value_counts() From 6edd4878caaead24542fc04536410486459d05fd Mon Sep 17 00:00:00 2001 From: Jonas Hoersch Date: Thu, 10 Oct 2024 11:38:47 +0200 Subject: [PATCH 3/8] feat (iamc.resolver): Add iamc_aggregate --- src/pandas_indexing/iamc/resolver.py | 60 ++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 4 deletions(-) diff --git a/src/pandas_indexing/iamc/resolver.py b/src/pandas_indexing/iamc/resolver.py index ae811e5..84a2e35 100644 --- a/src/pandas_indexing/iamc/resolver.py +++ b/src/pandas_indexing/iamc/resolver.py @@ -1,6 +1,7 @@ from __future__ import annotations import operator +import re from contextlib import contextmanager from functools import reduce from itertools import product @@ -10,8 +11,9 @@ from pandas import DataFrame, Index, MultiIndex, Series from .. 
import arithmetics -from ..core import concat, isin -from ..utils import print_list +from ..core import concat +from ..selectors import isin, ismatch +from ..utils import print_list, shell_pattern_to_regex def _summarize(index: MultiIndex, names: Sequence[str]) -> DataFrame: @@ -395,8 +397,11 @@ def from_data( inst.add(value) return inst - def add(self, value: str) -> Vars: - self.vars[value] = vars = Vars.from_data(self.data, value, context=self.context) + def add(self, value: str, iamc_aggregate: bool = False) -> Vars: + vars = Vars.from_data(self.data, value, context=self.context) + if iamc_aggregate: + vars = vars | self.iamc_aggregate(value) + self.vars[value] = vars return vars class _LocIndexer: @@ -498,6 +503,53 @@ def __setitem__(self, value: str, vars: Vars) -> Vars: self.vars[value] = vars return vars + def iamc_aggregate(self, value: str) -> Vars: + pattern = f"{value}|*" + overwritten_variables = { + name: var + for name, var in self.vars.items() + if ( + re.match(shell_pattern_to_regex(pattern), name) + and (var.data[0].provenance != name or len(var.data) > 1) + ) + } + + data = ( + concat( + [ + self.data.loc[ + ismatch(**{self.context.level: pattern}) + ].pix.antijoin( + Index(overwritten_variables, name=self.context.level) + ), + *( + v.as_df(**{self.context.level: n}).droplevel("provenance") + for n, v in overwritten_variables.items() + ), + ] + ) + .groupby(self.data.index.names.difference([self.context.level])) + .sum() + ) + if data.empty: + return Vars([], self.context) + + conditions = ( + ( + " with special " + + ", ".join(name.removeprefix(value) for name in overwritten_variables) + ) + if overwritten_variables + else "" + ) + provenance = f"sum({pattern}{conditions})" + + return Vars([Var(data, provenance)], self.context) + + def add_iamc_aggregate(self, value: str) -> Resolver: + self[value] |= self.iamc_aggregate(value) + return self[value] + @contextmanager def optional_combinations(self): active = self.context.optional_combinations From 85ec3dbba2cbeb4febf607bd248112784dbfe562 Mon Sep 17 00:00:00 2001 From: Jonas Hoersch Date: Thu, 10 Oct 2024 16:39:32 +0200 Subject: [PATCH 4/8] enh (iamc.resolver): Allow __getitem__ to return empty Vars --- src/pandas_indexing/iamc/resolver.py | 76 +++++++++++++++------------- 1 file changed, 41 insertions(+), 35 deletions(-) diff --git a/src/pandas_indexing/iamc/resolver.py b/src/pandas_indexing/iamc/resolver.py index 84a2e35..ca26634 100644 --- a/src/pandas_indexing/iamc/resolver.py +++ b/src/pandas_indexing/iamc/resolver.py @@ -184,22 +184,30 @@ def from_data( def __repr__(self) -> str: index = self.index incomplete = not self._missing(index).empty + return ( - f"Vars for {len(index)}{'*' if incomplete else ''} scenarios:\n" - + "\n".join( - ( - f"* {v.provenance} (" - f"{len(v.index(self.context.index))}" - f"{'*' if not self._missing(v).empty else ''})" + ( + f"Vars for {len(index)}{'*' if incomplete else ''} scenarios:\n" + + "\n".join( + ( + f"* {v.provenance} (" + f"{len(v.index(self.context.index))}" + f"{'*' if not self._missing(v).empty else ''})" + ) + for v in self.data ) - for v in self.data ) - + "\n" + if self.data + else "Vars empty" ) + @property + def empty(self) -> bool: + return not self + @property def index(self) -> MultiIndex: - if not self.data: + if self.empty: return MultiIndex.from_tuples([], names=self.context.index) return concat(v.index(self.context.index) for v in self.data).unique() @@ -397,12 +405,12 @@ def from_data( inst.add(value) return inst - def add(self, value: str, iamc_aggregate: 
bool = False) -> Vars: + def add(self: SR, value: str, iamc_aggregate: bool = False) -> SR: vars = Vars.from_data(self.data, value, context=self.context) if iamc_aggregate: vars = vars | self.iamc_aggregate(value) self.vars[value] = vars - return vars + return self class _LocIndexer: def __init__(self, obj: Resolver): @@ -441,9 +449,7 @@ def __getitem__(self, value: str) -> Vars: context=self.context, ) except (KeyError, ValueError): - raise KeyError( - f"{value} is not a {self.context.level} in data or external_data" - ) from None + vars = Vars([], context=self.context) return vars @@ -464,27 +470,26 @@ def index(self) -> MultiIndex: ) def __repr__(self) -> str: + lines = [] num_scenarios = len(self.data.pix.unique(self.context.index)) # type: ignore level = self.context.level - s = ( + lines.append( f"Resolver with data for {num_scenarios} scenarios, " - f"and {len(self)} defined {level}s for {len(self.index)} scenarios:\n" + f"and {len(self)} defined {level}s for {len(self.index)} scenarios:" ) - for name, vars in self.vars.items(): - s += ( - f"* {name} ({len(vars)}): " - + ", ".join( - str(len(var.data.pix.unique(self.context.index))) # type: ignore - for var in vars.data - ) - + "\n" + lines.extend( + f"* {name} ({len(vars)}): " + + ", ".join( + str(len(var.data.pix.unique(self.context.index))) # type: ignore + for var in vars.data ) + for name, vars in self.vars.items() + ) - existing_provenances = [ + existing_provenances = set( v.provenance for vars in self.vars.values() for v in vars - ] - + ) unused_values = ( self.data.pix.unique([*self.context.index, level]) # type: ignore .pix.antijoin(Index(self.vars, name=level).union(existing_provenances)) @@ -492,10 +497,11 @@ def __repr__(self) -> str: .value_counts() .loc[lambda s: s > num_scenarios // 20] ) - s += f"{len(unused_values)} {level}s for more than 5% of scenarios unused:\n" - for value, num in unused_values.items(): - s += f"* {value} ({num})\n" - return s + lines.append( + f"{len(unused_values)} {level}s for more than 5% of scenarios unused:" + ) + lines.extend(f"* {value} ({num})" for value, num in unused_values.items()) + return "\n".join(lines) def __setitem__(self, value: str, vars: Vars) -> Vars: if not isinstance(vars, Vars): @@ -503,11 +509,11 @@ def __setitem__(self, value: str, vars: Vars) -> Vars: self.vars[value] = vars return vars - def iamc_aggregate(self, value: str) -> Vars: + def iamc_aggregate(self, value: str, **overwrites) -> Vars: pattern = f"{value}|*" overwritten_variables = { name: var - for name, var in self.vars.items() + for name, var in (self.vars | overwrites).items() if ( re.match(shell_pattern_to_regex(pattern), name) and (var.data[0].provenance != name or len(var.data) > 1) @@ -546,9 +552,9 @@ def iamc_aggregate(self, value: str) -> Vars: return Vars([Var(data, provenance)], self.context) - def add_iamc_aggregate(self, value: str) -> Resolver: + def add_iamc_aggregate(self: SR, value: str) -> SR: self[value] |= self.iamc_aggregate(value) - return self[value] + return self @contextmanager def optional_combinations(self): From b5519ed2fe05a0bb695f64520af94d8733e6066f Mon Sep 17 00:00:00 2001 From: Jonas Hoersch Date: Tue, 15 Oct 2024 14:35:18 +0200 Subject: [PATCH 5/8] feat(iamc.resolver): Support stacking resolvers --- src/pandas_indexing/iamc/resolver.py | 35 +++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/src/pandas_indexing/iamc/resolver.py b/src/pandas_indexing/iamc/resolver.py index ca26634..c53fb4c 100644 --- 
a/src/pandas_indexing/iamc/resolver.py +++ b/src/pandas_indexing/iamc/resolver.py @@ -134,6 +134,9 @@ def __truediv__(self: SV, other: SV) -> SV: lambda x, y: f"{maybe_parens(x)} / {maybe_parens(y)}", ) + def __neg__(self: SV) -> SV: + return self.__class__(-self.data, "- " + maybe_parens(self.provenance)) + def as_df(self, **assign: str) -> DataFrame: return self.data.pix.assign(**(assign | dict(provenance=self.provenance))) @@ -181,6 +184,17 @@ def from_data( data = data.loc[isin(**{context.level: value})].droplevel(context.level) # type: ignore return cls([Var(data, provenance)] if not data.empty else [], context) + @classmethod + def from_additionalresolver( + cls: type[SV], + vars: SV, + prefix: str, + *, + context: Context, + ) -> SV: + data = [evolve(var, provenance=f"{prefix}({var.provenance})") for var in vars] + return cls(data, context) + def __repr__(self) -> str: index = self.index incomplete = not self._missing(index).empty @@ -337,6 +351,9 @@ def __rmul__(self: SV, other: SV) -> SV: return self return self._binop(operator.mul, other, self) + def __neg__(self: SV) -> SV: + return self.__class__([-v for v in self], self.context) + def __or__(self: SV, other: SV | float | int) -> SV: if isinstance(other, (float, int)): provenance = str(other) @@ -442,12 +459,18 @@ def __getitem__(self, value: str) -> Vars: try: # get variable from additional data prefix, rem_value = value.split("|", 1) - vars = Vars.from_data( - self.external_data[prefix], - rem_value, - provenance=value, - context=self.context, - ) + data = self.external_data[prefix] + if isinstance(data, Resolver): + vars = Vars.from_additionalresolver( + data[rem_value], prefix=prefix, context=self.context + ) + else: + vars = Vars.from_data( + self.external_data[prefix], + rem_value, + provenance=value, + context=self.context, + ) except (KeyError, ValueError): vars = Vars([], context=self.context) From 8f702ff779e181fec5b1a3375698962e1f103f2f Mon Sep 17 00:00:00 2001 From: Jonas Hoersch Date: Tue, 15 Oct 2024 15:06:29 +0200 Subject: [PATCH 6/8] enh(iamc.resolver): Improve Vars representation --- src/pandas_indexing/iamc/resolver.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/src/pandas_indexing/iamc/resolver.py b/src/pandas_indexing/iamc/resolver.py index c53fb4c..054fc08 100644 --- a/src/pandas_indexing/iamc/resolver.py +++ b/src/pandas_indexing/iamc/resolver.py @@ -199,22 +199,25 @@ def __repr__(self) -> str: index = self.index incomplete = not self._missing(index).empty - return ( - ( - f"Vars for {len(index)}{'*' if incomplete else ''} scenarios:\n" - + "\n".join( - ( - f"* {v.provenance} (" - f"{len(v.index(self.context.index))}" - f"{'*' if not self._missing(v).empty else ''})" - ) - for v in self.data + if self.empty: + return "Vars empty" + + s = ( + f"Vars for {len(index)}{'*' if incomplete else ''} scenarios:\n" + + "\n".join( + ( + f"* {v.provenance} (" + f"{len(v.index(self.context.index))}" + f"{'*' if not self._missing(v).empty else ''})" ) + for v in self.data ) - if self.data - else "Vars empty" ) + if len(self) == 1: + s += f"\n\nDetails (since only a single variant):\n{self.data[0]}" + return s + @property def empty(self) -> bool: return not self From 87e3673070c181faafdc9e3822fa60539c406b99 Mon Sep 17 00:00:00 2001 From: Jonas Hoersch Date: Thu, 24 Oct 2024 12:01:11 +0200 Subject: [PATCH 7/8] fix(iamc.resolver): Share optional_combinations trigger into subsets --- src/pandas_indexing/iamc/resolver.py | 36 ++++++++++++++++++---------- 1 file changed, 23 
insertions(+), 13 deletions(-) diff --git a/src/pandas_indexing/iamc/resolver.py b/src/pandas_indexing/iamc/resolver.py index 054fc08..b4c1b79 100644 --- a/src/pandas_indexing/iamc/resolver.py +++ b/src/pandas_indexing/iamc/resolver.py @@ -2,12 +2,11 @@ import operator import re -from contextlib import contextmanager from functools import reduce from itertools import product from typing import Any, Callable, Iterator, Sequence, TypeVar -from attrs import define, evolve +from attrs import define, evolve, field from pandas import DataFrame, Index, MultiIndex, Series from .. import arithmetics @@ -45,6 +44,22 @@ def maybe_parens(provenance: str) -> str: return provenance +@define +class SharedTrigger: + active: int = field(default=0, converter=int) + + def __enter__(self): + self.active += 1 + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.active -= 1 + return False + + def __bool__(self): + return bool(self.active) + + @define class Context: """Context shared between all Vars instances in a Resolver. @@ -58,7 +73,7 @@ class Context: full_index: MultiIndex columns: Index index: list[str] - optional_combinations: bool = False + optional_combinations: SharedTrigger = field(factory=SharedTrigger) @define @@ -193,7 +208,7 @@ def from_additionalresolver( context: Context, ) -> SV: data = [evolve(var, provenance=f"{prefix}({var.provenance})") for var in vars] - return cls(data, context) + return cls(data, evolve(context, full_index=vars.context.full_index)) def __repr__(self) -> str: index = self.index @@ -486,6 +501,10 @@ def _ipython_key_completions_(self) -> list[str]: comps.extend(f"{n}|" + v.pix.unique(self.context.level)) # type: ignore return comps + @property + def optional_combinations(self): + return self.context.optional_combinations + @property def index(self) -> MultiIndex: if not self.vars: @@ -582,15 +601,6 @@ def add_iamc_aggregate(self: SR, value: str) -> SR: self[value] |= self.iamc_aggregate(value) return self - @contextmanager - def optional_combinations(self): - active = self.context.optional_combinations - try: - self.context.optional_combinations = True - yield - finally: - self.context.optional_combinations = active - def as_df(self, only_consistent: bool = True) -> DataFrame: if only_consistent: index = self.index From d6a5cd2701553adb993d116b4a1e50a732d8b1ad Mon Sep 17 00:00:00 2001 From: Jonas Hoersch Date: Thu, 24 Oct 2024 16:42:48 +0200 Subject: [PATCH 8/8] Update CHANGELOG --- CHANGELOG.rst | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a5d081f..5ed0e08 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,6 +3,11 @@ Changelog ========= +v0.6.0 (2024-10-24) +------------------------------------------------------------ +* Add :mod:`~iamc.resolver` module with the class :class:`~iamc.Resolver` that + supports consolidating IAMC-style scenario data with non-homogeneous variable + coverage. Documentation is unfortunately still missing. * Add support for so-called optional patterns to :func:`~core.extractlevel` and :func:`~core.formatlevel`, for instance: ``df.pix.extract(variable="Emissions|{gas}|{sector}", optional=["sector"])``
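
A minimal end-to-end sketch of the workflow this series enables, assembled from the Resolver/Vars docstring examples in PATCH 1/8. It assumes the series is applied on top of pandas-indexing (so that `pandas_indexing.iamc.Resolver` and the `.pix` accessor are importable); the model, scenario and sector labels, years and numbers are made up for illustration only.

    import pandas as pd

    from pandas_indexing.iamc import Resolver  # provided by PATCH 1/8

    # Toy CO2 emissions in IAMC-like wide format: scen-a reports the "Energy"
    # total directly, scen-b only reports its Supply/Demand split.
    index = pd.MultiIndex.from_tuples(
        [
            ("model-1", "scen-a", "Energy"),
            ("model-1", "scen-b", "Energy|Supply"),
            ("model-1", "scen-b", "Energy|Demand"),
        ],
        names=["model", "scenario", "sector"],
    )
    co2emissions = pd.DataFrame(
        {2030: [30.0, 12.0, 17.0], 2050: [10.0, 4.0, 5.0]}, index=index
    )

    # Register the "Energy" variable and compose variants: prefer the reported
    # total where it exists, otherwise fall back to the sum of its components.
    r = Resolver.from_data(co2emissions, "sector", ["Energy"])
    r["Energy"] |= r["Energy|Supply"] + r["Energy|Demand"]

    # Consolidated frame with a "provenance" level recording how each row was derived
    print(r.as_df())
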
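A second sketch under the same assumptions and toy data: instead of spelling out the component sum by hand, the `iamc_aggregate` fallback from PATCH 3/8 sums every reported "Energy|*" sub-sector.

    # Direct report where available, otherwise sum(Energy|*)
    r2 = Resolver.from_data(co2emissions, "sector")
    r2.add("Energy", iamc_aggregate=True)

    print(r2["Energy"])  # lists both derivation variants and their scenario coverage
    print(r2.as_df())

For a variable that is already registered, `add_iamc_aggregate(value)` extends it in place the same way, via `self[value] |= self.iamc_aggregate(value)`.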