Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for negative prices and energy delivery #31

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 77 additions & 40 deletions custom_components/dynamic_energy_cost/energy_based_sensors.py
Pluimvee marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ def __init__(self, hass, energy_sensor_id, price_sensor_id, interval):
self.hass = hass
self._energy_sensor_id = energy_sensor_id
self._price_sensor_id = price_sensor_id
self._state = None
self._unit_of_measurement = 'EUR' # Default to EUR, will update after entity addition
self._state = 0 # intermediate energy costs
self._unit_of_measurement = 'EUR' # Default to EUR, will update after entity addition
self._interval = interval
self._last_energy_reading = None
self._cumulative_energy_kwh = 0
self._last_energy_reading = None # updated on first energy change and any price change events
self._cumulative_energy = 0
self._cumulative_cost = 0 # updated on price change events and used for more precise cost calculations
self._last_reset_time = now()
self.schedule_next_reset()
_LOGGER.debug("Sensor initialized with energy sensor ID %s and price sensor ID %s.", energy_sensor_id, price_sensor_id)
Expand Down Expand Up @@ -54,7 +55,8 @@ def async_reset(self):
"""Reset the energy cost and cumulative energy kWh."""
_LOGGER.debug(f"Resetting cost for {self.entity_id}")
self._state = 0
self._cumulative_energy_kwh = 0
self._cumulative_energy = 0
self._cumulative_cost = 0
self.async_write_ha_state()

@property
Expand Down Expand Up @@ -100,11 +102,14 @@ def icon(self):
def extra_state_attributes(self):
"""Return the state attributes of the device."""
attrs = super().extra_state_attributes or {} # Ensure it's a dict
attrs['cumulative_energy_kwh'] = self._cumulative_energy_kwh
attrs['cumulative_energy'] = self._cumulative_energy
attrs['last_energy_reading'] = self._last_energy_reading
attrs['average_energy_cost'] = self._state / self._cumulative_energy_kwh if self._cumulative_energy_kwh else 0
attrs['cumulative_cost'] = self._cumulative_cost
if self._cumulative_energy > 0:
attrs['average_energy_cost'] = self._cumulative_cost / self._cumulative_energy
return attrs

# -----------------------------------------------------------------------------------------------
async def async_added_to_hass(self):
"""Load the last known state and subscribe to updates."""
await super().async_added_to_hass()
Expand All @@ -113,83 +118,115 @@ async def async_added_to_hass(self):
if last_state and last_state.state not in ['unknown', 'unavailable', None]:
self._state = float(last_state.state)
self._last_energy_reading = float(last_state.attributes.get('last_energy_reading'))
self._cumulative_energy_kwh = float(last_state.attributes.get('cumulative_energy_kwh'))
if last_state.attributes.get('cumulative_energy') is not None:
self._cumulative_energy = float(last_state.attributes.get('cumulative_energy'))
if last_state.attributes.get('cumulative_cost') is not None:
self._cumulative_cost = float(last_state.attributes.get('cumulative_cost'))
self.async_write_ha_state()
async_track_state_change_event(self.hass, self._energy_sensor_id, self._async_update_energy_price_event)
async_track_state_change_event(self.hass, self._energy_sensor_id, self._async_update_energy_event)
async_track_state_change_event(self.hass, self._price_sensor_id, self._async_update_price_event)
self.schedule_next_reset()

# -----------------------------------------------------------------------------------------------
def get_currency(self):
"""Extract the currency from the unit of measurement of the price sensor."""
price_entity = self.hass.states.get(self._price_sensor_id)
if price_entity and price_entity.attributes.get('unit_of_measurement'):
currency = price_entity.attributes['unit_of_measurement'].split('/')[0].strip()
if (currency == '€'):
currency = 'EUR'
_LOGGER.debug(f"Extracted currency '{currency}' from unit of measurement '{price_entity.attributes['unit_of_measurement']}'.")
return currency
else:
_LOGGER.warning(f"Unit of measurement not available or invalid for sensor {self._price_sensor_id}, defaulting to 'EUR'.")
return 'EUR' # Default to EUR if not found


# -----------------------------------------------------------------------------------------------
# determine the next reset time
def calculate_next_reset_time(self):
current_time = now()
if self._interval == "daily":
next_reset = current_time.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
next_reset = current_time.replace(hour=0, minute=0, second=1, microsecond=0) + timedelta(days=1)
elif self._interval == "monthly":
next_month = (current_time.replace(day=1) + timedelta(days=32)).replace(day=1)
next_reset = next_month.replace(hour=0, minute=0, second=0, microsecond=0)
next_reset = next_month.replace(hour=0, minute=0, second=1, microsecond=0)
elif self._interval == "yearly":
next_reset = current_time.replace(year=current_time.year + 1, month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
next_reset = current_time.replace(year=current_time.year + 1, month=1, day=1, hour=0, minute=0, second=1, microsecond=0)
return next_reset

# schedule a reset
def schedule_next_reset(self):
next_reset = self.calculate_next_reset_time()
async_track_point_in_time(self.hass, self._reset_meter, next_reset)

# reset meter to 0, called by track point in time
async def _reset_meter(self, _):
self._state = 0 # Reset the cost to zero
self._cumulative_energy_kwh = 0 # Reset the cumulative energy kWh count to zero
self._cumulative_cost = 0
self._cumulative_energy = 0 # Reset the cumulative energy kWh count to zero
self._last_energy_reading = None
self.async_write_ha_state() # Update the state in Home Assistant
self.schedule_next_reset() # Reschedule the next reset
_LOGGER.debug(f"Meter reset for {self.name} and cumulative energy reset to {self._cumulative_energy_kwh}. Next reset scheduled.")

async def _async_update_energy_price_event(self, event):
"""Handle sensor state changes based on event data."""
new_state = event.data.get('new_state')
if new_state is None or new_state.state in ['unknown', 'unavailable']:
_LOGGER.debug("New state is unknown or unavailable, skipping update.")
return
await self.async_update()
_LOGGER.debug(f"Meter reset for {self.name} and cumulative energy reset to {self._cumulative_energy}. Next reset scheduled.")

async def async_update(self):
"""Update the energy costs using the latest sensor states, only adding incremental costs."""
_LOGGER.debug("Attempting to update energy costs.")
energy_state = self.hass.states.get(self._energy_sensor_id)
price_state = self.hass.states.get(self._price_sensor_id)
# -----------------------------------------------------------------------------------------------
# when there is a price change we need to calculate the consumed energy used since last reading for the old price
async def _async_update_price_event(self, event):
"""Handle price sensor state changes."""
old_price_state = event.data.get('old_state')
energy_state = self.hass.states.get(self._energy_sensor_id) #current energy readings

if not energy_state or not price_state or energy_state.state in ['unknown', 'unavailable'] or price_state.state in ['unknown', 'unavailable']:
if not energy_state or not old_price_state or energy_state.state in ['unknown', 'unavailable'] or old_price_state.state in ['unknown', 'unavailable']:
_LOGGER.warning("One or more sensors are unavailable. Skipping update.")
return

try:
current_energy = float(energy_state.state)
price = float(price_state.state)
price = float(old_price_state.state)

if self._last_energy_reading is not None and current_energy >= self._last_energy_reading:
if self._last_energy_reading is not None:
energy_difference = current_energy - self._last_energy_reading
cost_increment = energy_difference * price
self._state = (self._state if self._state is not None else 0) + cost_increment
self._cumulative_energy_kwh += energy_difference # Add to the running total of energy
_LOGGER.info(f"Energy cost incremented by {cost_increment} EUR, total cost now {self._state} EUR")

elif self._last_energy_reading is not None and current_energy < self._last_energy_reading:
_LOGGER.debug("Possible meter reset or rollback detected; recalculating from new base.")
# Optionally reset the cost if you determine it's a complete reset
# self._state = 0 # Uncomment this if you need to reset the state
self._cumulative_cost += cost_increment
_LOGGER.info(f"Energy cost synchronized from {self._state} EUR to {self._cumulative_cost} EUR")
self._state = self._cumulative_cost
self._cumulative_energy += energy_difference # Add to the running total of energy

else:
_LOGGER.debug("No previous energy reading available; initializing with current reading.")

self._last_energy_reading = current_energy # Always update the last reading
self.async_write_ha_state()

except Exception as e:
_LOGGER.error(f"Failed to update energy costs due to an error: {e}", exc_info=True)
pass

# -----------------------------------------------------------------------------------------------
async def _async_update_energy_event(self, event):
"""Handle energy sensor state changes."""
"""Update the energy costs using the latest sensor states, adding both incremental as decremental costs."""
energy_state = event.data.get('new_state')
price_state = self.hass.states.get(self._price_sensor_id)

if not energy_state or not price_state or energy_state.state in ['unknown', 'unavailable'] or price_state.state in ['unknown', 'unavailable']:
_LOGGER.warning("One or more sensors are unavailable. Skipping update.")
return

if self._last_energy_reading is None:
_LOGGER.debug("No previous energy reading available; initializing with current reading.")
self._last_energy_reading = current_energy # Initialize with current reading
return

try:
current_energy = float(energy_state.state)
price = float(price_state.state)

energy_difference = current_energy - self._last_energy_reading
cost_increment = energy_difference * price
self._state = self._cumulative_cost + cost_increment # set state to the cumulative cost + increment since last energy reading
_LOGGER.info(f"Energy cost incremented by {cost_increment} EUR, total cost now {self._state} EUR")

self.async_write_ha_state()

except Exception as e:
Expand Down
4 changes: 0 additions & 4 deletions custom_components/dynamic_energy_cost/power_based_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,6 @@ def _handle_real_time_cost_update(self, event):
current_cost = Decimal(new_state.state)
_LOGGER.debug(f"Current cost retrieved from state: {current_cost}") # Log current cost

if current_cost <= 0: # Skip updates if the new cost is not positive
_LOGGER.debug(f"Skipping update as the calculated cost {current_cost} is not positive.")
return

time_difference = now() - self._last_update
hours_passed = Decimal(time_difference.total_seconds()) / Decimal(3600) # Convert time difference to hours as Decimal
_LOGGER.debug(f"Time difference calculated as: {time_difference}, which is {hours_passed} hours.") # Log time difference in hours
Expand Down