Skip to content

Commit

Permalink
Expose power distribution results from PVPool and EVChargerPool
Browse files Browse the repository at this point in the history
Signed-off-by: Sahas Subramanian <[email protected]>
  • Loading branch information
shsms committed Jul 2, 2024
1 parent 2e255ad commit ddf92b0
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 7 deletions.
6 changes: 6 additions & 0 deletions src/frequenz/sdk/microgrid/_data_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ def new_ev_charger_pool(
power_manager_bounds_subs_sender=(
self._ev_power_wrapper.bounds_subscription_channel.new_sender()
),
power_distribution_results_fetcher=(
self._ev_power_wrapper.distribution_results_fetcher()
),
component_ids=component_ids,
)
)
Expand Down Expand Up @@ -334,6 +337,9 @@ def new_pv_pool(
power_manager_bounds_subs_sender=(
self._pv_power_wrapper.bounds_subscription_channel.new_sender()
),
power_distribution_results_fetcher=(
self._pv_power_wrapper.distribution_results_fetcher()
),
component_ids=component_ids,
)

Expand Down
19 changes: 17 additions & 2 deletions src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from collections import abc
from datetime import timedelta

from ..._internal._channels import ReceiverFetcher
from ...actor import _power_managing
from ..._internal._channels import ReceiverFetcher, ReceiverFetcherWith
from ...actor import _power_managing, power_distributing
from ...timeseries import Bounds
from .._base_types import SystemBounds
from .._quantities import Current, Power
Expand Down Expand Up @@ -231,6 +231,21 @@ def power_status(self) -> ReceiverFetcher[EVChargerPoolReport]:

return channel

@property
def power_distribution_results(self) -> ReceiverFetcher[power_distributing.Result]:
"""Get a receiver to receive power distribution results.
Returns:
A receiver that will stream power distribution results for the pool's set of
EV chargers.
"""
return ReceiverFetcherWith(
self._pool_ref_store.power_distribution_results_fetcher,
lambda recv: recv.filter(
lambda x: x.request.component_ids == self._pool_ref_store.component_ids
),
)

async def stop(self) -> None:
"""Stop all tasks and channels owned by the EVChargerPool."""
await self._pool_ref_store.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@

"""Manages shared state/tasks for a set of EV chargers."""


import asyncio
import uuid
from collections import abc

from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.client.microgrid import ComponentCategory

from ..._internal._channels import ReceiverFetcher
from ...actor import ChannelRegistry, ComponentMetricRequest
from ...actor._power_managing._base_classes import Proposal, ReportRequest
from ...actor.power_distributing import ComponentPoolStatus
from ...actor.power_distributing import ComponentPoolStatus, Result
from ...microgrid import connection_manager
from .._base_types import SystemBounds
from ..formula_engine._formula_engine_pool import FormulaEnginePool
Expand All @@ -40,6 +40,7 @@ def __init__( # pylint: disable=too-many-arguments
status_receiver: Receiver[ComponentPoolStatus],
power_manager_requests_sender: Sender[Proposal],
power_manager_bounds_subs_sender: Sender[ReportRequest],
power_distribution_results_fetcher: ReceiverFetcher[Result],
component_ids: abc.Set[int] | None = None,
):
"""Create an instance of the class.
Expand All @@ -55,6 +56,8 @@ def __init__( # pylint: disable=too-many-arguments
requests to the power managing actor.
power_manager_bounds_subs_sender: A Channel sender for sending power bounds
subscription requests to the power managing actor.
power_distribution_results_fetcher: A ReceiverFetcher for the results from
the power distributing actor.
component_ids: An optional list of component_ids belonging to this pool. If
not specified, IDs of all EV Chargers in the microgrid will be fetched
from the component graph.
Expand All @@ -64,6 +67,7 @@ def __init__( # pylint: disable=too-many-arguments
self.status_receiver = status_receiver
self.power_manager_requests_sender = power_manager_requests_sender
self.power_manager_bounds_subs_sender = power_manager_bounds_subs_sender
self.power_distribution_results_fetcher = power_distribution_results_fetcher

if component_ids is not None:
self.component_ids: frozenset[int] = frozenset(component_ids)
Expand Down
19 changes: 17 additions & 2 deletions src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from collections import abc
from datetime import timedelta

from ..._internal._channels import ReceiverFetcher
from ...actor import _power_managing
from ..._internal._channels import ReceiverFetcher, ReceiverFetcherWith
from ...actor import _power_managing, power_distributing
from ...timeseries import Bounds
from .._base_types import SystemBounds
from .._quantities import Power
Expand Down Expand Up @@ -190,6 +190,21 @@ def power_status(self) -> ReceiverFetcher[PVPoolReport]:

return channel

@property
def power_distribution_results(self) -> ReceiverFetcher[power_distributing.Result]:
"""Get a receiver to receive power distribution results.
Returns:
A receiver that will stream power distribution results for the pool's set of
PV inverters.
"""
return ReceiverFetcherWith(
self._pool_ref_store.power_distribution_results_fetcher,
lambda recv: recv.filter(
lambda x: x.request.component_ids == self._pool_ref_store.component_ids
),
)

async def stop(self) -> None:
"""Stop all tasks and channels owned by the PVPool."""
await self._pool_ref_store.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.client.microgrid import ComponentCategory, InverterType

from ..._internal._channels import ReceiverFetcher
from ...actor import ChannelRegistry, ComponentMetricRequest
from ...actor._power_managing._base_classes import Proposal, ReportRequest
from ...actor.power_distributing import ComponentPoolStatus
from ...actor.power_distributing import ComponentPoolStatus, Result
from ...microgrid import connection_manager
from .._base_types import SystemBounds
from ..formula_engine._formula_engine_pool import FormulaEnginePool
Expand All @@ -40,6 +41,7 @@ def __init__( # pylint: disable=too-many-arguments
status_receiver: Receiver[ComponentPoolStatus],
power_manager_requests_sender: Sender[Proposal],
power_manager_bounds_subs_sender: Sender[ReportRequest],
power_distribution_results_fetcher: ReceiverFetcher[Result],
component_ids: abc.Set[int] | None = None,
):
"""Initialize this instance.
Expand All @@ -55,6 +57,8 @@ def __init__( # pylint: disable=too-many-arguments
requests to the power managing actor.
power_manager_bounds_subs_sender: A Channel sender for sending power bounds
subscription requests to the power managing actor.
power_distribution_results_fetcher: A ReceiverFetcher for the results from
the power distributing actor.
component_ids: An optional list of component_ids belonging to this pool. If
not specified, IDs of all PV inverters in the microgrid will be fetched
from the component graph.
Expand All @@ -64,6 +68,7 @@ def __init__( # pylint: disable=too-many-arguments
self.status_receiver = status_receiver
self.power_manager_requests_sender = power_manager_requests_sender
self.power_manager_bounds_subs_sender = power_manager_bounds_subs_sender
self.power_distribution_results_fetcher = power_distribution_results_fetcher

if component_ids is not None:
self.component_ids: frozenset[int] = frozenset(component_ids)
Expand Down

0 comments on commit ddf92b0

Please sign in to comment.