Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support to retrieve configuration data directly from HA #399

Merged
merged 7 commits into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified data/test_df_final.pkl
Binary file not shown.
15 changes: 15 additions & 0 deletions docs/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,21 @@ Lastly, to support the configuration website to generate the parameter in the li

![Screenshot from 2024-09-09 16-45-32](https://github.com/user-attachments/assets/01e7984f-3332-4e25-8076-160f51a2e0c4)

If you are only adding another option for an existing parameter, editing the `param_definitions.json` file should be all you need; this allows the user to select the new option from the configuration page:
```json
"load_forecast_method": {
"friendly_name": "Load forecast method",
"Description": "The load forecast method that will be used. The options are ‘csv’ to load a CSV file or ‘naive’ for a simple 1-day persistence model.",
"input": "select",
"select_options": [
"naive",
"mlforecaster",
"csv",
"CALL_NEW_OPTION"
],
"default_value": "naive"
},
```

## Step 3 - Pull request

Expand Down
31 changes: 22 additions & 9 deletions src/emhass/command_line.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def set_input_data_dict(
retrieve_hass_conf, optim_conf, plant_conf = utils.get_yaml_parse(params, logger)
if type(retrieve_hass_conf) is bool:
return False

# Treat runtimeparams
params, retrieve_hass_conf, optim_conf, plant_conf = utils.treat_runtimeparams(
runtimeparams,
Expand All @@ -81,7 +81,8 @@ def set_input_data_dict(
logger,
emhass_conf,
)
# Define main objects

# Define the data retrieve object
rh = RetrieveHass(
retrieve_hass_conf["hass_url"],
retrieve_hass_conf["long_lived_token"],
Expand All @@ -92,6 +93,21 @@ def set_input_data_dict(
logger,
get_data_from_file=get_data_from_file,
)

# Retrieve basic configuration data from hass
if get_data_from_file:
with open(emhass_conf["data_path"] / "test_df_final.pkl", "rb") as inp:
_, _, _, rh.ha_config = pickle.load(inp)
else:
rh.get_ha_config()

# Update the params dict using data from the HA configuration
params = utils.update_params_with_ha_config(
params,
rh.ha_config,
)

# Define the forecast and optimization objects
fcst = Forecast(
retrieve_hass_conf,
optim_conf,
Expand All @@ -111,12 +127,13 @@ def set_input_data_dict(
emhass_conf,
logger,
)

# Perform setup based on type of action
if set_type == "perfect-optim":
# Retrieve data from hass
if get_data_from_file:
with open(emhass_conf["data_path"] / "test_df_final.pkl", "rb") as inp:
rh.df_final, days_list, var_list = pickle.load(inp)
rh.df_final, days_list, var_list, rh.ha_config = pickle.load(inp)
retrieve_hass_conf["sensor_power_load_no_var_loads"] = str(var_list[0])
retrieve_hass_conf["sensor_power_photovoltaics"] = str(var_list[1])
retrieve_hass_conf["sensor_linear_interp"] = [
Expand Down Expand Up @@ -208,7 +225,7 @@ def set_input_data_dict(
# Retrieve data from hass
if get_data_from_file:
with open(emhass_conf["data_path"] / "test_df_final.pkl", "rb") as inp:
rh.df_final, days_list, var_list = pickle.load(inp)
rh.df_final, days_list, var_list, rh.ha_config = pickle.load(inp)
retrieve_hass_conf["sensor_power_load_no_var_loads"] = str(var_list[0])
retrieve_hass_conf["sensor_power_photovoltaics"] = str(var_list[1])
retrieve_hass_conf["sensor_linear_interp"] = [
Expand Down Expand Up @@ -403,10 +420,8 @@ def weather_forecast_cache(
:rtype: bool

"""

# Parsing yaml
retrieve_hass_conf, optim_conf, plant_conf = utils.get_yaml_parse(params, logger)

# Treat runtimeparams
params, retrieve_hass_conf, optim_conf, plant_conf = utils.treat_runtimeparams(
runtimeparams,
Expand All @@ -417,21 +432,19 @@ def weather_forecast_cache(
"forecast",
logger,
emhass_conf,
{},
)

# Make sure weather_forecast_cache is true
if (params != None) and (params != "null"):
params = json.loads(params)
else:
params = {}
params["passed_data"]["weather_forecast_cache"] = True
params = json.dumps(params)

# Create Forecast object
fcst = Forecast(
retrieve_hass_conf, optim_conf, plant_conf, params, emhass_conf, logger
)

result = fcst.get_weather_forecast(optim_conf["weather_forecast_method"])
if isinstance(result, bool) and not result:
return False
Expand Down
102 changes: 98 additions & 4 deletions src/emhass/forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -886,10 +886,73 @@ def get_forecast_out_from_csv_or_list(
forecast_out = pd.concat([forecast_out, forecast_tp], axis=0)
return forecast_out

@staticmethod
def resample_data(data, freq, current_freq):
    r"""
    Resample a time series DataFrame to a custom frequency.

    Downsamples by mean aggregation, upsamples by time-based interpolation,
    and returns an untouched copy when the frequencies already match.

    :param data: Original time series data with a DateTimeIndex.
    :type data: pd.DataFrame
    :param freq: Desired frequency for resampling (e.g., pd.Timedelta("10min")).
    :type freq: pd.Timedelta
    :param current_freq: The current frequency of the data, used to decide \
        between downsampling and upsampling.
    :type current_freq: pd.Timedelta
    :return: Resampled data at the specified frequency.
    :rtype: pd.DataFrame
    """
    if freq > current_freq:
        # Downsampling: aggregate with 'mean' (other options: 'sum', 'max', etc.)
        resampled_data = data.resample(freq).mean()
    elif freq < current_freq:
        # Upsampling: 'asfreq' creates empty slots, then interpolate over time
        resampled_data = data.resample(freq).asfreq()
        resampled_data = resampled_data.interpolate(method='time')
    else:
        # No resampling needed
        resampled_data = data.copy()
    return resampled_data

@staticmethod
def get_typical_load_forecast(data, forecast_date):
    r"""
    Forecast the load profile for a given day based on historic data.

    Historic days sharing both the month and the day of the week with
    ``forecast_date`` are selected, re-indexed onto the forecast day, and
    averaged timestamp by timestamp.

    :param data: A DataFrame with a DateTimeIndex containing the historic load data.
        Must include a 'load' column.
    :type data: pd.DataFrame
    :param forecast_date: The date for which the forecast will be generated.
    :type forecast_date: pd.Timestamp
    :return: A DataFrame with the forecasted load profile for the day and the
        array of historic days used to calculate the forecast.
    :rtype: tuple (pd.DataFrame, np.ndarray)
    :raises ValueError: If data has no 'load' column or no matching historic days.
    """
    # Ensure the 'load' column exists
    if 'load' not in data.columns:
        raise ValueError("Data must have a 'load' column.")
    # Filter historic data for the same month and day of the week
    month = forecast_date.month
    day_of_week = forecast_date.dayofweek
    historic_data = data[(data.index.month == month) & (data.index.dayofweek == day_of_week)]
    used_days = np.unique(historic_data.index.date)
    # Guard against an empty selection: pd.concat([]) would raise an opaque error
    if len(used_days) == 0:
        raise ValueError("No historic data matches the forecast date's month and day of week.")
    # Align all historic data to the forecast day
    aligned_data = []
    for day in used_days:
        # Select from the pre-filtered frame (cheaper than re-scanning all of data)
        daily_data = historic_data[historic_data.index.date == pd.Timestamp(day).date()]
        aligned_daily_data = daily_data.copy()
        aligned_daily_data.index = aligned_daily_data.index.map(
            lambda x: x.replace(year=forecast_date.year, month=forecast_date.month, day=forecast_date.day)
        )
        aligned_data.append(aligned_daily_data)
    # Combine all aligned historic data into a single DataFrame
    combined_data = pd.concat(aligned_data)
    # Compute the mean load for each timestamp across the selected days
    forecast = combined_data.groupby(combined_data.index).mean()
    return forecast, used_days

def get_load_forecast(
self,
days_min_load_forecast: Optional[int] = 3,
method: Optional[str] = "naive",
method: Optional[str] = "typical",
csv_path: Optional[str] = "data_load_forecast.csv",
set_mix_forecast: Optional[bool] = False,
df_now: Optional[pd.DataFrame] = pd.DataFrame(),
Expand All @@ -904,10 +967,11 @@ def get_load_forecast(
will be used to generate a naive forecast, defaults to 3
:type days_min_load_forecast: int, optional
:param method: The method to be used to generate load forecast, the options \
are 'typical' for a typical household load consumption curve, \
are 'naive' for a persistance model, 'mlforecaster' for using a custom \
previously fitted machine learning model, 'csv' to read the forecast from \
a CSV file and 'list' to use data directly passed at runtime as a list of \
values. Defaults to 'naive'.
values. Defaults to 'typical'.
:type method: str, optional
:param csv_path: The path to the CSV file used when method = 'csv', \
defaults to "/data/data_load_forecast.csv"
Expand Down Expand Up @@ -956,7 +1020,7 @@ def get_load_forecast(
if self.get_data_from_file:
filename_path = self.emhass_conf["data_path"] / "test_df_final.pkl"
with open(filename_path, "rb") as inp:
rh.df_final, days_list, var_list = pickle.load(inp)
rh.df_final, days_list, var_list, rh.ha_config = pickle.load(inp)
self.var_load = var_list[0]
self.retrieve_hass_conf["sensor_power_load_no_var_loads"] = (
self.var_load
Expand All @@ -977,7 +1041,37 @@ def get_load_forecast(
):
return False
df = rh.df_final.copy()[[self.var_load_new]]
if method == "naive": # using a naive approach
if method == "typical": # using typical statistical data from a household power consumption
# Loading data from history file
model_type = "load_clustering"
data_path = self.emhass_conf["data_path"] / str("data_train_" + model_type + ".pkl")
with open(data_path, "rb") as fid:
data, _ = pickle.load(fid)
# Resample the data if needed
current_freq = pd.Timedelta('30min')
if self.freq != current_freq:
data = Forecast.resample_data(data, self.freq, current_freq)
# Generate forecast
data_list = []
dates_list = np.unique(self.forecast_dates.date).tolist()
forecast = pd.DataFrame()
for date in dates_list:
forecast_date = pd.Timestamp(date)
data.columns = ['load']
forecast_tmp, used_days = Forecast.get_typical_load_forecast(data, forecast_date)
self.logger.debug(f"Using {len(used_days)} days of data to generate the forecast.")
# Normalize the forecast
forecast_tmp = forecast_tmp*self.plant_conf['maximum_power_from_grid']/9000
data_list.extend(forecast_tmp.values.ravel().tolist())
if len(forecast) == 0:
forecast = forecast_tmp
else:
forecast = pd.concat([forecast, forecast_tmp], axis=0)
forecast.index = forecast.index.tz_convert(self.time_zone)
forecast_out = forecast.loc[forecast.index.intersection(self.forecast_dates)]
forecast_out.index.name = 'ts'
forecast_out = forecast_out.rename(columns={'load': 'yhat'})
elif method == "naive": # using a naive approach
mask_forecast_out = (
df.index > days_list[-1] - self.optim_conf["delta_forecast_daily"]
)
Expand Down
2 changes: 1 addition & 1 deletion src/emhass/optimization.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ def create_matrix(input_list, n):
cooling_constant
* (
predicted_temp[I - 1]
- outdoor_temperature_forecast[I - 1]
- outdoor_temperature_forecast.iloc[I - 1]
)
)
)
Expand Down
3 changes: 2 additions & 1 deletion src/emhass/static/data/param_definitions.json
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,12 @@
"Description": "The load forecast method that will be used. The options are ‘csv’ to load a CSV file or ‘naive’ for a simple 1-day persistence model.",
"input": "select",
"select_options": [
"typical",
"naive",
"mlforecaster",
"csv"
],
"default_value": "naive"
"default_value": "typical"
},
"set_total_pv_sell": {
"friendly_name": "PV straight to grid",
Expand Down
76 changes: 72 additions & 4 deletions src/emhass/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,70 @@ def get_forecast_dates(
return forecast_dates


def update_params_with_ha_config(
    params: str,
    ha_config: dict,
) -> str:
    """
    Update the params with the Home Assistant configuration.

    The currency and temperature unit retrieved from Home Assistant are
    propagated to the ``unit_of_measurement`` of the relevant sensor
    definitions in ``passed_data``. Note that ``ha_config`` is normalized
    in place (currency symbol substitution, defaults filled in).

    Parameters
    ----------
    params : str
        The JSON-serialized params.
    ha_config : dict
        The Home Assistant configuration; may contain 'currency' and
        'unit_system' keys.

    Returns
    -------
    str
        The updated, JSON-serialized params.
    """
    # Load serialized params
    params = json.loads(params)
    # Map ISO currency codes to display symbols
    currency_to_symbol = {
        'EUR': '€',
        'USD': '$',
        'GBP': '£',
        'YEN': '¥',
        'JPY': '¥',
        'AUD': 'A$',
        'CAD': 'C$',
        'CHF': 'CHF',  # Swiss Franc has no special symbol
        'CNY': '¥',
        'INR': '₹',
        # Add more as needed
    }
    if 'currency' in ha_config.keys():
        # Fall back to the raw currency code for unmapped currencies so sensors
        # get e.g. 'SEK/kWh' instead of the meaningless 'Unknown/kWh'
        ha_config['currency'] = currency_to_symbol.get(ha_config['currency'], ha_config['currency'])
    else:
        ha_config['currency'] = '€'
    if 'unit_system' not in ha_config.keys():
        ha_config['unit_system'] = {'temperature': '°C'}
    # Use .get: a present unit_system dict may still lack the temperature key
    temperature_unit = ha_config['unit_system'].get('temperature', '°C')

    # Propagate the temperature unit to every predicted temperature sensor
    for k in range(params["optim_conf"]["number_of_deferrable_loads"]):
        params['passed_data']['custom_predicted_temperature_id'][k].update(
            {"unit_of_measurement": temperature_unit}
        )
    # Propagate the currency to the cost-related sensors
    updated_passed_dict = {
        "custom_cost_fun_id": {
            "unit_of_measurement": ha_config['currency'],
        },
        "custom_unit_load_cost_id": {
            "unit_of_measurement": f"{ha_config['currency']}/kWh",
        },
        "custom_unit_prod_price_id": {
            "unit_of_measurement": f"{ha_config['currency']}/kWh",
        },
    }
    for key, value in updated_passed_dict.items():
        params["passed_data"][key]["unit_of_measurement"] = value["unit_of_measurement"]
    # Serialize the final params
    params = json.dumps(params, default=str)
    return params


def treat_runtimeparams(
runtimeparams: str,
params: str,
Expand Down Expand Up @@ -183,6 +247,10 @@ def treat_runtimeparams(
params["optim_conf"].update(optim_conf)
params["plant_conf"].update(plant_conf)

# Check defaults on HA retrieved config
default_currency_unit = '€'
default_temperature_unit = '°C'

# Some default data needed
custom_deferrable_forecast_id = []
custom_predicted_temperature_id = []
Expand All @@ -197,7 +265,7 @@ def treat_runtimeparams(
custom_predicted_temperature_id.append(
{
"entity_id": "sensor.temp_predicted{}".format(k),
"unit_of_measurement": "°C",
"unit_of_measurement": default_temperature_unit,
"friendly_name": "Predicted temperature {}".format(k),
}
)
Expand Down Expand Up @@ -239,7 +307,7 @@ def treat_runtimeparams(
},
"custom_cost_fun_id": {
"entity_id": "sensor.total_cost_fun_value",
"unit_of_measurement": "",
"unit_of_measurement": default_currency_unit,
"friendly_name": "Total cost function value",
},
"custom_optim_status_id": {
Expand All @@ -249,12 +317,12 @@ def treat_runtimeparams(
},
"custom_unit_load_cost_id": {
"entity_id": "sensor.unit_load_cost",
"unit_of_measurement": "€/kWh",
"unit_of_measurement": f"{default_currency_unit}/kWh",
"friendly_name": "Unit Load Cost",
},
"custom_unit_prod_price_id": {
"entity_id": "sensor.unit_prod_price",
"unit_of_measurement": "€/kWh",
"unit_of_measurement": f"{default_currency_unit}/kWh",
"friendly_name": "Unit Prod Price",
},
"custom_deferrable_forecast_id": custom_deferrable_forecast_id,
Expand Down
Loading
Loading