diff --git a/.gitignore b/.gitignore index fa1f0d74..2e8d38d6 100644 --- a/.gitignore +++ b/.gitignore @@ -9,7 +9,6 @@ secrets_emhass.yaml .vscode/tasks.json *.html *.pkl -data/actionLogs.txt **/app @@ -41,6 +40,10 @@ share/python-wheels/ *.egg MANIFEST +# Local session data +data/actionLogs.txt +data/entities/*.json + # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. diff --git a/.vscode/launch.json b/.vscode/launch.json index f0ceae3a..1800d75c 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -23,6 +23,7 @@ "OPTIONS_PATH": "/workspaces/emhass/options.json", "SECRETS_PATH": "/workspaces/emhass/secrets_emhass.yaml", "DATA_PATH": "/workspaces/emhass/data/", + "LOGGING_LEVEL": "DEBUG" } }, { @@ -52,6 +53,7 @@ "LAT": "45.83", //optional change "LON": "6.86", //optional change "ALT": "4807.8", //optional change + "LOGGING_LEVEL": "DEBUG" //optional change }, } ] diff --git a/README.md b/README.md index ece28630..fa608b8d 100644 --- a/README.md +++ b/README.md @@ -250,6 +250,8 @@ sudo chmod +x /home/user/emhass/scripts/publish_data.sh ``` ### Common for any installation method +#### Options 1, Home Assistant automate publish + In `automations.yaml`: ```yaml - alias: EMHASS day-ahead optimization @@ -265,9 +267,36 @@ In `automations.yaml`: action: - service: shell_command.publish_data ``` -In these automations the day-ahead optimization is performed everyday at 5:30am and the data is published every 5 minutes. +In these automation's the day-ahead optimization is performed once a day, everyday at 5:30am, and the data *(output of automation)* is published every 5 minutes. + +#### Option 2, EMHASS automate publish -The final action will be to link a sensor value in Home Assistant to control the switch of a desired controllable load. For example imagine that I want to control my water heater and that the `publish-data` action is publishing the optimized value of a deferrable load that I want to be linked to my water heater desired behavior. In this case we could use an automation like this one below to control the desired real switch: +In `automations.yaml`: +```yaml +- alias: EMHASS day-ahead optimization + trigger: + platform: time + at: '05:30:00' + action: + - service: shell_command.dayahead_optim + - service: shell_command.publish_data +``` +in configuration page/`config_emhass.yaml` +```json +"method_ts_round": "first" +"continual_publish": true +``` +In this automation the day-ahead optimization is performed once a day, everyday at 5:30am. +If the `freq` parameter is set to `30` *(default)* in the configuration, the results of the day-ahead optimization will generate 48 values *(for each entity)*, a value for each 30 minutes in a day *(i.e. 24 hrs x 2)*. + +Setting the parameter `continual_publish` to `true` in the configuration page, will allow EMHASS to store the optimization results as entities/sensors into seperate json files. `continual_publish` will periodically (every `freq` amount of minutes) run a publish, and publish the optimization results of each generated entities/sensors to Home Assistant. The current state of the sensor/entity being updated every time publish runs, selecting one of the 48 stored values, by comparing the stored values timestamps, the current timestamp and [`"method_ts_round": "first"`](#the-publish-data-specificities) to select the optimal stored value for the current state. + +option 1 and 2 are very similar, however option 2 (`continual_publish`) will require a cpu thread to constantly be run inside of EMHASS, lowering efficiency. The reason why you may pick one over the other is explained in more detail bellow in [continual_publish](#continual_publish-emhass-automation). + +Lastly, we can link a EMHASS published entities/sensor's current state to a Home Assistant entity on/off switch, controlling a desired controllable load. +For example, imagine that I want to control my water heater. I can use a published `deferrable` EMHASS entity to control my water heaters desired behavior. In this case, we could use an automation like below, to control the desired water heater on and off: + +on: ```yaml automation: - alias: Water Heater Optimized ON @@ -282,7 +311,7 @@ automation: - service: homeassistant.turn_on entity_id: switch.water_heater_switch ``` -A second automation should be used to turn off the switch: +off: ```yaml automation: - alias: Water Heater Optimized OFF @@ -297,14 +326,15 @@ automation: - service: homeassistant.turn_off entity_id: switch.water_heater_switch ``` +The result of these automation's will turn on and off the Home Assistant entity `switch.water_heater_switch` using the current state from the EMHASS entity `sensor.p_deferrable0`. `sensor.p_deferrable0` being the entity generated from the EMHASS day-ahead optimization and published by examples above. The `sensor.p_deferrable0` entity current state being updated every 30 minutes (or `freq` minutes) via a automated publish option 1 or 2. *(selecting one of the 48 stored data values)* ## The publish-data specificities -The `publish-data` command will push to Home Assistant the optimization results for each deferrable load defined in the configuration. For example if you have defined two deferrable loads, then the command will publish `sensor.p_deferrable0` and `sensor.p_deferrable1` to Home Assistant. When the `dayahead-optim` is launched, after the optimization, a csv file will be saved on disk. The `publish-data` command will load the latest csv file and look for the closest timestamp that match the current time using the `datetime.now()` method in Python. This means that if EMHASS is configured for 30min time step optimizations, the csv will be saved with timestamps 00:00, 00:30, 01:00, 01:30, ... and so on. If the current time is 00:05, then the closest timestamp of the optimization results that will be published is 00:00. If the current time is 00:25, then the closest timestamp of the optimization results that will be published is 00:30. +`publish-data` (which is either run manually, or automatically via `continual_publish` or Home Assistant automation), will push the optimization results to Home Assistant for each deferrable load defined in the configuration. For example if you have defined two deferrable loads, then the command will publish `sensor.p_deferrable0` and `sensor.p_deferrable1` to Home Assistant. When the `dayahead-optim` is launched, after the optimization, either entity json files or a csv file will be saved on disk. The `publish-data` command will load the latest csv/json files to look for the closest timestamp that match the current time using the `datetime.now()` method in Python. This means that if EMHASS is configured for 30min time step optimizations, the csv/json will be saved with timestamps 00:00, 00:30, 01:00, 01:30, ... and so on. If the current time is 00:05, and parameter `method_ts_round` is set to `nearest` in the configuration, then the closest timestamp of the optimization results that will be published is 00:00. If the current time is 00:25, then the closest timestamp of the optimization results that will be published is 00:30. The `publish-data` command will also publish PV and load forecast data on sensors `p_pv_forecast` and `p_load_forecast`. If using a battery, then the battery optimized power and the SOC will be published on sensors `p_batt_forecast` and `soc_batt_forecast`. On these sensors the future values are passed as nested attributes. -It is possible to provide custm sensor names for all the data exported by the `publish-data` command. For this, when using the `publish-data` endpoint just add some runtime parameters as dictionaries like this: +If you run publish manually *(or via a Home Assistant Automation)*, it is possible to provide custom sensor names for all the data exported by the `publish-data` command. For this, when using the `publish-data` endpoint we can just add some runtime parameters as dictionaries like this: ```yaml shell_command: publish_data: "curl -i -H \"Content-Type:application/json\" -X POST -d '{\"custom_load_forecast_id\": {\"entity_id\": \"sensor.p_load_forecast\", \"unit_of_measurement\": \"W\", \"friendly_name\": \"Load Power Forecast\"}}' http://localhost:5000/action/publish-data" @@ -352,12 +382,85 @@ In EMHASS we have basically 4 forecasts to deal with: - PV production selling price forecast: at what price are you selling your excess PV production on the next 24h. This is given in EUR/kWh. -The sensor containing the load data should be specified in parameter `var_load` in the configuration file. As we want to optimize the household energies, when need to forecast the load power conumption. The default method for this is a naive approach using 1-day persistence. The load data variable should not contain the data from the deferrable loads themselves. For example, lets say that you set your deferrable load to be the washing machine. The variable that you should enter in EMHASS will be: `var_load: 'sensor.power_load_no_var_loads'` and `sensor.power_load_no_var_loads = sensor.power_load - sensor.power_washing_machine`. This is supposing that the overall load of your house is contained in variable: `sensor.power_load`. The sensor `sensor.power_load_no_var_loads` can be easily created with a new template sensor in Home Assistant. +The sensor containing the load data should be specified in parameter `var_load` in the configuration file. As we want to optimize the household energies, when need to forecast the load power consumption. The default method for this is a naive approach using 1-day persistence. The load data variable should not contain the data from the deferrable loads themselves. For example, lets say that you set your deferrable load to be the washing machine. The variable that you should enter in EMHASS will be: `var_load: 'sensor.power_load_no_var_loads'` and `sensor.power_load_no_var_loads = sensor.power_load - sensor.power_washing_machine`. This is supposing that the overall load of your house is contained in variable: `sensor.power_load`. The sensor `sensor.power_load_no_var_loads` can be easily created with a new template sensor in Home Assistant. If you are implementing a MPC controller, then you should also need to provide some data at the optimization runtime using the key `runtimeparams`. The valid values to pass for both forecast data and MPC related data are explained below. +### Alternative publish methods +Due to the flexibility of EMHASS, multiple different approaches to publishing the optimization results have been created. Select a option that best meets your use case: + +#### publish last optimization *(manual)* +By default, running an optimization in EMHASS will output the results into the csv file: `data_path/opt_res_latest.csv` *(overriding the existing data on that file)*. We run the publish command to publish the last optimization saved in the `opt_res_latest.csv`: +```bash +# RUN dayahead +curl -i -H 'Content-Type:application/json' -X POST -d {} http://localhost:5000/action/dayahead-optim +# Then publish teh results of dayahead +curl -i -H 'Content-Type:application/json' -X POST -d {} http://localhost:5000/action/publish-data +``` +*Note, the published entities from the publish-data action will not automatically update the entities current state (current state being used to check when to turn on and off appliances via Home Assistant automatons). To update the EMHASS entities state, another publish would have to be re-run later when the current time matches the next values timestamp (E.g every 30 minutes). See examples bellow for methods to automate the publish-action.* + +#### continual_publish *(EMHASS Automation)* +As discussed in [Common for any installation method - option 2](#option-2-emhass-automate-publish), setting `continual_publish` to `true` in the configuration saves the output of the optimization into the `data_path/entities` folder *(a .json file for each sensor/entity)*. A constant loop (in `freq` minutes) will run, observe the .json files in that folder, and publish the saved files periodically (updating the current state of the entity by comparing date.now with the saved data value timestamps). + +For users that wish to run multiple different optimizations, you can set the runtime parameter: `publish_prefix` to something like: `"mpc_"` or `"dh_"`. This will generate unique entity_id names per optimization and save these unique entities as separate files in the folder. All the entity files will then be updated when the next loop iteration runs. If a different `freq` integer was passed as a runtime parameter in an optimization, the `continual_publish` loop will be based on the lowest `freq` saved. An example: + +```bash +# RUN dayahead, with freq=30 (default), prefix=dh_ +curl -i -H 'Content-Type:application/json' -X POST -d '{"publish_prefix":"dh_"}' http://localhost:5000/action/dayahead-optim +# RUN MPC, with freq=5, prefix=mpc_ +curl -i -H 'Content-Type:application/json' -X POST -d '{"freq":5,"publish_prefix":"mpc_"}' http://localhost:5000/action/naive-mpc-optim +``` +This will tell continual_publish to loop every 5 minutes based on the freq passed in MPC. All entities from the output of dayahead "dh_" and MPC "mpc_" will be published every 5 minutes. + +
+ +*It is recommended to use the 2 other options bellow once you have a more advance understanding of EMHASS and/or Home Assistant.* + +#### Mixture of continual_publish and manual *(Home Assistant Automation for Publish)* + +You can choose to save one optimization for continual_publish and bypass another optimization by setting `"continual_publish":false` runtime parameter: +```bash +# RUN dayahead, with freq=30 (default), prefix=dh_, included into continual_publish +curl -i -H 'Content-Type:application/json' -X POST -d '{"publish_prefix":"dh_"}' http://localhost:5000/action/dayahead-optim + +# RUN MPC, with freq=5, prefix=mpc_, Manually publish, excluded from continual_publish loop +curl -i -H 'Content-Type:application/json' -X POST -d '{"continual_publish":false,"freq":5,"publish_prefix":"mpc_"}' http://localhost:5000/action/naive-mpc-optim +# Publish MPC output +curl -i -H 'Content-Type:application/json' -X POST -d {} http://localhost:5000/action/publish-data +``` +This example saves the dayahead optimization into `data_path/entities` as .json files, being included in the `continutal_publish` loop (publishing every 30 minutes). The MPC optimization will not be saved in `data_path/entities`, and therefore only into `data_path/opt_res_latest.csv`. Requiring a publish-data action to be run manually (or via a Home Assistant) Automation for the MPC results. + +#### Manual *(Home Assistant Automation for Publish)* + +For users who wish to have full control of exactly when they will like to run a publish and have the ability to save multiple different optimizations. The `entity_save` runtime parameter has been created to save the optimization output entities to .json files whilst `continual_publish` is set to `false` in the configuration. Allowing the user to reference the saved .json files manually via a publish: + +in configuration page/`config_emhass.yaml` : +```json +"continual_publish": false +``` +POST action : +```bash +# RUN dayahead, with freq=30 (default), prefix=dh_, save entity +curl -i -H 'Content-Type:application/json' -X POST -d '{"entity_save": true, "publish_prefix":"dh_"}' http://localhost:5000/action/dayahead-optim +# RUN MPC, with freq=5, prefix=mpc_, save entity +curl -i -H 'Content-Type:application/json' -X POST -d '{"entity_save": true", "freq":5,"publish_prefix":"mpc_"}' http://localhost:5000/action/naive-mpc-optim +``` +You can then reference these .json saved entities via their `publish_prefix`. Include the same `publish_prefix` in the `publish_data` action: +```bash +#Publish the MPC optimization ran above +curl -i -H 'Content-Type:application/json' -X POST -d '{"publish_prefix":"mpc_"}' http://localhost:5000/action/publish-data +``` +This will publish all entities from the MPC (_mpc) optimization above. +
+Alternatively, you can choose to publish all the saved files .json files with `publish_prefix` = all: +```bash +#Publish all saved entities +curl -i -H 'Content-Type:application/json' -X POST -d '{"publish_prefix":"all"}' http://localhost:5000/action/publish-data +``` +This action will publish the dayahead (_dh) and MPC (_mpc) optimization results from the optimizations above. + ### Forecast data It is possible to provide EMHASS with your own forecast data. For this just add the data as list of values to a data dictionary during the call to `emhass` using the `runtimeparams` option. @@ -456,7 +559,7 @@ Check the dedicated section in the documentation here: [https://emhass.readthedo ## Development -Pull request are very much accepted on this project. For development you can find some instructions here [Development](https://emhass.readthedocs.io/en/latest/develop.html) +Pull request are very much accepted on this project. For development you can find some instructions here [Development](https://emhass.readthedocs.io/en/latest/develop.html). ## Troubleshooting diff --git a/config_emhass.yaml b/config_emhass.yaml index b73e6d99..6d306e51 100644 --- a/config_emhass.yaml +++ b/config_emhass.yaml @@ -13,6 +13,7 @@ retrieve_hass_conf: - 'sensor.power_photovoltaics' - 'sensor.power_load_no_var_loads' method_ts_round: 'nearest' # Set the method for timestamp rounding, options are: first, last and nearest + continual_publish: False # Save published sensor data and check for state change every freq minutes optim_conf: set_use_battery: False # consider a battery storage diff --git a/docs/config.md b/docs/config.md index ef46ad20..c2fce240 100644 --- a/docs/config.md +++ b/docs/config.md @@ -24,6 +24,7 @@ These are the parameters that we will need to define to retrieve data from Home - 'sensor.power_photovoltaics' - 'sensor.power_load_no_var_loads' - `method_ts_round`: Set the method for timestamp rounding, options are: first, last and nearest. +- `continual_publish`: set to True to save entities to .json after optimization run. Then automatically republish the saved entities *(with updated current state value)* every freq minutes. *entity data saved to data_path/entities.* A second part of this section is given by some privacy-sensitive parameters that should be included in a `secrets_emhass.yaml` file alongside the `config_emhass.yaml` file. diff --git a/docs/differences.md b/docs/differences.md index 68fbf5f1..1ed50434 100644 --- a/docs/differences.md +++ b/docs/differences.md @@ -22,6 +22,7 @@ See bellow for a list of associations between the parameters from `config_emhass | retrieve_hass_conf | load_negative | load_negative | | | retrieve_hass_conf | set_zero_min | set_zero_min | | | retrieve_hass_conf | method_ts_round | method_ts_round | | +| retrieve_hass_conf | continual_publish | continual_publish | | | params_secrets | solcast_api_key | optional_solcast_api_key | | | params_secrets | solcast_rooftop_id | optional_solcast_rooftop_id | | | params_secrets | solar_forecast_kwp | optional_solar_forecast_kwp | | diff --git a/options.json b/options.json index b0b55076..0b154ed1 100644 --- a/options.json +++ b/options.json @@ -6,6 +6,7 @@ "optimization_time_step": 30, "historic_days_to_retrieve": 2, "method_ts_round": "nearest", + "continual_publish": false, "optional_solcast_api_key": "empty", "optional_solcast_rooftop_id": "empty", "optional_solar_forecast_kwp": 5, diff --git a/src/emhass/command_line.py b/src/emhass/command_line.py index 7bb455b3..d2e62696 100644 --- a/src/emhass/command_line.py +++ b/src/emhass/command_line.py @@ -3,6 +3,7 @@ import argparse import os +import time import pathlib import logging import json @@ -285,6 +286,18 @@ def perfect_forecast_optim(input_data_dict: dict, logger: logging.Logger, if not debug: opt_res.to_csv( input_data_dict['emhass_conf']['data_path'] / filename, index_label='timestamp') + + + if not isinstance(input_data_dict["params"],dict): + params = json.loads(input_data_dict["params"]) + else: + params = input_data_dict["params"] + + # if continual_publish, save perfect results to data_path/entities json + if input_data_dict["retrieve_hass_conf"].get("continual_publish",False) or params["passed_data"].get("entity_save",False): + #Trigger the publish function, save entity data and not post to HA + publish_data(input_data_dict, logger, entity_save=True, dont_post=True) + return opt_res @@ -330,7 +343,19 @@ def dayahead_forecast_optim(input_data_dict: dict, logger: logging.Logger, filename = "opt_res_latest.csv" if not debug: opt_res_dayahead.to_csv( - input_data_dict['emhass_conf']['data_path'] / filename, index_label='timestamp') + input_data_dict['emhass_conf']['data_path'] / filename, index_label='timestamp') + + if not isinstance(input_data_dict["params"],dict): + params = json.loads(input_data_dict["params"]) + else: + params = input_data_dict["params"] + + + # if continual_publish, save day_ahead results to data_path/entities json + if input_data_dict["retrieve_hass_conf"].get("continual_publish",False) or params["passed_data"].get("entity_save",False): + #Trigger the publish function, save entity data and not post to HA + publish_data(input_data_dict, logger, entity_save=True, dont_post=True) + return opt_res_dayahead @@ -384,7 +409,18 @@ def naive_mpc_optim(input_data_dict: dict, logger: logging.Logger, filename = "opt_res_latest.csv" if not debug: opt_res_naive_mpc.to_csv( - input_data_dict['emhass_conf']['data_path'] / filename, index_label='timestamp') + input_data_dict['emhass_conf']['data_path'] / filename, index_label='timestamp') + + if not isinstance(input_data_dict["params"],dict): + params = json.loads(input_data_dict["params"]) + else: + params = input_data_dict["params"] + + # if continual_publish, save mpc results to data_path/entities json + if input_data_dict["retrieve_hass_conf"].get("continual_publish",False) or params["passed_data"].get("entity_save",False): + #Trigger the publish function, save entity data and not post to HA + publish_data(input_data_dict, logger, entity_save=True, dont_post=True) + return opt_res_naive_mpc @@ -648,9 +684,12 @@ def regressor_model_predict(input_data_dict: dict, logger: logging.Logger, return prediction + def publish_data(input_data_dict: dict, logger: logging.Logger, save_data_to_file: Optional[bool] = False, - opt_res_latest: Optional[pd.DataFrame] = None) -> pd.DataFrame: + opt_res_latest: Optional[pd.DataFrame] = None, + entity_save: Optional[bool] = False, + dont_post: Optional[bool] = False) -> pd.DataFrame: """ Publish the data obtained from the optimization results. @@ -662,15 +701,55 @@ def publish_data(input_data_dict: dict, logger: logging.Logger, :type save_data_to_file: bool, optional :return: The output data of the optimization readed from a CSV file in the data folder :rtype: pd.DataFrame + :param entity_save: Save built entities to data_path/entities + :type entity_save: bool, optional + :param dont_post: Do not post to Home Assistant. Works with entity_save + :type dont_post: bool, optional """ logger.info("Publishing data to HASS instance") + + if not isinstance(input_data_dict["params"],dict): + params = json.loads(input_data_dict["params"]) + else: + params = input_data_dict["params"] + # Check if a day ahead optimization has been performed (read CSV file) if save_data_to_file: today = datetime.now(timezone.utc).replace( hour=0, minute=0, second=0, microsecond=0 ) filename = "opt_res_dayahead_" + today.strftime("%Y_%m_%d") + ".csv" + # If publish_prefix is passed, check if there is saved entities in data_path/entities with prefix, publish to results + elif params["passed_data"].get("publish_prefix","") != "" and not dont_post: + opt_res_list = [] + opt_res_list_names = [] + publish_prefix = params["passed_data"]["publish_prefix"] + entity_path = input_data_dict['emhass_conf']['data_path'] / "entities" + + # Check if items in entity_path + if os.path.exists(entity_path) and len(os.listdir(entity_path)) > 0: + # Obtain all files in entity_path + entity_path_contents = os.listdir(entity_path) + for entity in entity_path_contents: + if entity != "metadata.json": + # If publish_prefix is "all" publish all saved entities to Home Assistant + # If publish_prefix matches the prefix from saved entities, publish to Home Assistant + if publish_prefix in entity or publish_prefix == "all": + entity_data = publish_json(entity,input_data_dict,entity_path,logger) + if not isinstance(entity_data, bool): + opt_res_list.append(entity_data) + opt_res_list_names.append(entity.replace(".json", "")) + else: + return False + # Build a DataFrame with published entities + opt_res = pd.concat(opt_res_list, axis=1) + opt_res.columns = opt_res_list_names + return opt_res + else: + logger.warning("no saved entity json files in path:" + str(entity_path)) + logger.warning("falling back to opt_res_latest") + filename = "opt_res_latest.csv" else: filename = "opt_res_latest.csv" if opt_res_latest is None: @@ -698,7 +777,6 @@ def publish_data(input_data_dict: dict, logger: logging.Logger, if idx_closest == -1: idx_closest = opt_res_latest.index.get_indexer([now_precise], method="nearest")[0] # Publish the data - params = json.loads(input_data_dict["params"]) publish_prefix = params["passed_data"]["publish_prefix"] # Publish PV forecast custom_pv_forecast_id = params["passed_data"]["custom_pv_forecast_id"] @@ -710,6 +788,8 @@ def publish_data(input_data_dict: dict, logger: logging.Logger, custom_pv_forecast_id["friendly_name"], type_var="power", publish_prefix=publish_prefix, + save_entities=entity_save, + dont_post=dont_post ) # Publish Load forecast custom_load_forecast_id = params["passed_data"]["custom_load_forecast_id"] @@ -721,6 +801,8 @@ def publish_data(input_data_dict: dict, logger: logging.Logger, custom_load_forecast_id["friendly_name"], type_var="power", publish_prefix=publish_prefix, + save_entities=entity_save, + dont_post=dont_post ) cols_published = ["P_PV", "P_Load"] # Publish deferrable loads @@ -742,6 +824,8 @@ def publish_data(input_data_dict: dict, logger: logging.Logger, custom_deferrable_forecast_id[k]["friendly_name"], type_var="deferrable", publish_prefix=publish_prefix, + save_entities=entity_save, + dont_post=dont_post ) cols_published = cols_published + ["P_deferrable{}".format(k)] # Publish battery power @@ -760,6 +844,8 @@ def publish_data(input_data_dict: dict, logger: logging.Logger, custom_batt_forecast_id["friendly_name"], type_var="batt", publish_prefix=publish_prefix, + save_entities=entity_save, + dont_post=dont_post ) cols_published = cols_published + ["P_batt"] custom_batt_soc_forecast_id = params["passed_data"][ @@ -773,6 +859,8 @@ def publish_data(input_data_dict: dict, logger: logging.Logger, custom_batt_soc_forecast_id["friendly_name"], type_var="SOC", publish_prefix=publish_prefix, + save_entities=entity_save, + dont_post=dont_post ) cols_published = cols_published + ["SOC_opt"] # Publish grid power @@ -785,6 +873,8 @@ def publish_data(input_data_dict: dict, logger: logging.Logger, custom_grid_forecast_id["friendly_name"], type_var="power", publish_prefix=publish_prefix, + save_entities=entity_save, + dont_post=dont_post ) cols_published = cols_published + ["P_grid"] # Publish total value of cost function @@ -798,7 +888,10 @@ def publish_data(input_data_dict: dict, logger: logging.Logger, custom_cost_fun_id["friendly_name"], type_var="cost_fun", publish_prefix=publish_prefix, + save_entities=entity_save, + dont_post=dont_post ) + # cols_published = cols_published + col_cost_fun # Publish the optimization status custom_cost_fun_id = params["passed_data"]["custom_optim_status_id"] if "optim_status" not in opt_res_latest: @@ -814,6 +907,8 @@ def publish_data(input_data_dict: dict, logger: logging.Logger, custom_cost_fun_id["friendly_name"], type_var="optim_status", publish_prefix=publish_prefix, + save_entities=entity_save, + dont_post=dont_post ) cols_published = cols_published + ["optim_status"] # Publish unit_load_cost @@ -826,6 +921,8 @@ def publish_data(input_data_dict: dict, logger: logging.Logger, custom_unit_load_cost_id["friendly_name"], type_var="unit_load_cost", publish_prefix=publish_prefix, + save_entities=entity_save, + dont_post=dont_post ) cols_published = cols_published + ["unit_load_cost"] # Publish unit_prod_price @@ -838,6 +935,8 @@ def publish_data(input_data_dict: dict, logger: logging.Logger, custom_unit_prod_price_id["friendly_name"], type_var="unit_prod_price", publish_prefix=publish_prefix, + save_entities=entity_save, + dont_post=dont_post ) cols_published = cols_published + ["unit_prod_price"] # Create a DF resuming what has been published @@ -845,6 +944,108 @@ def publish_data(input_data_dict: dict, logger: logging.Logger, opt_res_latest.index[idx_closest]]] return opt_res +def continual_publish(input_data_dict,entity_path,logger): + """ + If continual_publish is true and a entity file saved in /data_path/entities, continually publish sensor on freq rate, updating entity current state value based on timestamp + + :param input_data_dict: A dictionnary with multiple data used by the action functions + :type input_data_dict: dict + :param entity_path: Path for entities folder in data_path + :type entity_path: Path + :param logger: The passed logger object + :type logger: logging.Logger + + """ + logger.info("Continual publish thread service started") + freq = input_data_dict['retrieve_hass_conf'].get("freq", pd.to_timedelta(1, "minutes")) + entity_path_contents = [] + + while True: + # Sleep for x seconds (using current time as a reference for time left) + time.sleep(max(0,freq.total_seconds() - (datetime.now(input_data_dict["retrieve_hass_conf"]["time_zone"]).timestamp() % 60))) + + # Loop through all saved entity files + if os.path.exists(entity_path) and len(os.listdir(entity_path)) > 0: + entity_path_contents = os.listdir(entity_path) + for entity in entity_path_contents: + if entity != "metadata.json": + # Call publish_json with entity file, build entity, and publish + publish_json(entity,input_data_dict,entity_path,logger,"continual_publish") + pass + # This function should never return + return False + +def publish_json(entity,input_data_dict,entity_path,logger,reference: Optional[str] = ""): + """ + Extract saved entity data from .json (in data_path/entities), build entity, post results to post_data + + :param entity: json file containing entity data + :type entity: dict + :param input_data_dict: A dictionnary with multiple data used by the action functions + :type input_data_dict: dict + :param entity_path: Path for entities folder in data_path + :type entity_path: Path + :param logger: The passed logger object + :type logger: logging.Logger + :param reference: String for identifying who ran the function + :type reference: str, optional + + """ + + # Retrieve entity metadata from file + if os.path.isfile(entity_path / "metadata.json"): + with open(entity_path / "metadata.json", "r") as file: + metadata = json.load(file) + if not metadata.get("lowest_freq",None) == None: + freq = pd.to_timedelta(metadata["lowest_freq"], "minutes") + else: + logger.error("unable to located metadata.json in:" + entity_path) + return False + + # Round current timecode (now) + now_precise = datetime.now(input_data_dict["retrieve_hass_conf"]["time_zone"]).replace(second=0, microsecond=0) + + # Retrieve entity data from file + entity_data = pd.read_json(entity_path / entity , orient='index') + + # Remove ".json" from string for entity_id + entity_id = entity.replace(".json", "") + + # Adjust Dataframe from received entity json file + entity_data.columns = [metadata[entity_id]["name"]] + entity_data.index.name = "timestamp" + entity_data.index = pd.to_datetime(entity_data.index).tz_convert(input_data_dict["retrieve_hass_conf"]["time_zone"]) + entity_data.index.freq = pd.to_timedelta(int(metadata[entity_id]["freq"]), "minutes") + # Calculate the current state value + if input_data_dict["retrieve_hass_conf"]["method_ts_round"] == "nearest": + idx_closest = entity_data.index.get_indexer([now_precise], method="nearest")[0] + elif input_data_dict["retrieve_hass_conf"]["method_ts_round"] == "first": + idx_closest = entity_data.index.get_indexer([now_precise], method="ffill")[0] + elif input_data_dict["retrieve_hass_conf"]["method_ts_round"] == "last": + idx_closest = entity_data.index.get_indexer([now_precise], method="bfill")[0] + if idx_closest == -1: + idx_closest = entity_data.index.get_indexer([now_precise], method="nearest")[0] + + # Call post data + if reference == "continual_publish": + logger.debug("Auto Published sensor:") + logger_levels = "DEBUG" + else: + logger_levels = "INFO" + + #post/save entity + input_data_dict["rh"].post_data( + data_df=entity_data[metadata[entity_id]["name"]], + idx=idx_closest, + entity_id=entity_id, + unit_of_measurement=metadata[entity_id]["unit_of_measurement"], + friendly_name=metadata[entity_id]["friendly_name"], + type_var=metadata[entity_id].get("type_var",""), + save_entities=False, + logger_levels=logger_levels + ) + return entity_data[metadata[entity_id]["name"]] + def main(): r"""Define the main command line entry function. @@ -979,7 +1180,7 @@ def main(): prediction = regressor_model_predict(input_data_dict, logger, debug=args.debug,mlr=mlr) opt_res = None elif args.action == "publish-data": - opt_res = publish_data(input_data_dict, logger) + opt_res = publish_data(input_data_dict,logger) else: logger.error("The passed action argument is not valid") logger.error("Try setting --action: perfect-optim, dayahead-optim, naive-mpc-optim, forecast-model-fit, forecast-model-predict, forecast-model-tune or publish-data") diff --git a/src/emhass/retrieve_hass.py b/src/emhass/retrieve_hass.py index 476a7c9b..9737bc81 100644 --- a/src/emhass/retrieve_hass.py +++ b/src/emhass/retrieve_hass.py @@ -3,6 +3,8 @@ import json import copy +import os +import pathlib import datetime import logging from typing import Optional @@ -61,7 +63,7 @@ def __init__(self, hass_url: str, long_lived_token: str, freq: pd.Timedelta, self.freq = freq self.time_zone = time_zone self.params = params - # self.emhass_conf = emhass_conf + self.emhass_conf = emhass_conf self.logger = logger self.get_data_from_file = get_data_from_file @@ -304,9 +306,11 @@ def get_attr_data_dict(data_df: pd.DataFrame, idx: int, entity_id: str, unit_of_ } return data + def post_data(self, data_df: pd.DataFrame, idx: int, entity_id: str, unit_of_measurement: str, friendly_name: str, type_var: str, from_mlforecaster: Optional[bool] = False, - publish_prefix: Optional[str] = "") -> None: + publish_prefix: Optional[str] = "", save_entities: Optional[bool] = False, + logger_levels: Optional[str] = "info", dont_post: Optional[bool] = False) -> None: r""" Post passed data to hass. @@ -326,6 +330,12 @@ def post_data(self, data_df: pd.DataFrame, idx: int, entity_id: str, unit_of_mea :type type_var: str :param publish_prefix: A common prefix for all published data entity_id. :type publish_prefix: str, optional + :param save_entities: if entity data should be saved in data_path/entities + :type save_entities: bool, optional + :param logger_levels: set logger level, info or debug, to output + :type logger_levels: str, optional + :param dont_post: dont post to HA + :type dont_post: bool, optional """ # Add a possible prefix to the entity ID @@ -340,10 +350,12 @@ def post_data(self, data_df: pd.DataFrame, idx: int, entity_id: str, unit_of_mea headers = { "Authorization": "Bearer " + self.long_lived_token, "content-type": "application/json", - } + } # Preparing the data dict to be published if type_var == "cost_fun": - state = np.round(data_df.sum()[0], 2) + if isinstance(data_df.iloc[0],pd.Series): #if Series extract + data_df = data_df.iloc[:, 0] + state = np.round(data_df.sum(), 2) elif type_var == "unit_load_cost" or type_var == "unit_prod_price": state = np.round(data_df.loc[data_df.index[idx]], 4) elif type_var == "optim_status": @@ -398,18 +410,54 @@ def post_data(self, data_df: pd.DataFrame, idx: int, entity_id: str, unit_of_mea }, } # Actually post the data - if self.get_data_from_file: + if self.get_data_from_file or dont_post: class response: pass response.status_code = 200 response.ok = True else: response = post(url, headers=headers, data=json.dumps(data)) + # Treating the response status and posting them on the logger if response.ok: - self.logger.info("Successfully posted to " + entity_id + " = " + str(state)) + + if logger_levels == "DEBUG": + self.logger.debug("Successfully posted to " + entity_id + " = " + str(state)) + else: + self.logger.info("Successfully posted to " + entity_id + " = " + str(state)) + + # If save entities is set, save entity data to /data_path/entities + if (save_entities): + entities_path = self.emhass_conf['data_path'] / "entities" + + # Clarify folder exists + pathlib.Path(entities_path).mkdir(parents=True, exist_ok=True) + + # Save entity data to json file + result = data_df.to_json(index="timestamp", orient='index', date_unit='s', date_format='iso') + parsed = json.loads(result) + with open(entities_path / (entity_id + ".json"), "w") as file: + json.dump(parsed, file, indent=4) + + # Save the required metadata to json file + if os.path.isfile(entities_path / "metadata.json"): + with open(entities_path / "metadata.json", "r") as file: + metadata = json.load(file) + else: + metadata = {} + with open(entities_path / "metadata.json", "w") as file: + # Save entity metadata, key = entity_id + metadata[entity_id] = {'name': data_df.name, 'unit_of_measurement': unit_of_measurement,'friendly_name': friendly_name,'type_var': type_var, 'freq': int(self.freq.seconds / 60)} + + # Find lowest frequency to set for continual loop freq + if metadata.get("lowest_freq",None) == None or metadata["lowest_freq"] > int(self.freq.seconds / 60): + metadata["lowest_freq"] = int(self.freq.seconds / 60) + json.dump(metadata,file, indent=4) + + self.logger.debug("Saved " + entity_id + " to json file") + else: - self.logger.info( + self.logger.warning( "The status code for received curl command response is: " + str(response.status_code) ) diff --git a/src/emhass/utils.py b/src/emhass/utils.py index f32a136d..2cf2e9e3 100644 --- a/src/emhass/utils.py +++ b/src/emhass/utils.py @@ -12,7 +12,6 @@ import yaml import pytz - import plotly.express as px pd.options.plotting.backend = "plotly" @@ -443,6 +442,8 @@ def treat_runtimeparams(runtimeparams: str, params: str, retrieve_hass_conf: dic optim_conf["weight_battery_charge"] = runtimeparams["weight_battery_charge"] if 'freq' in runtimeparams.keys(): retrieve_hass_conf['freq'] = pd.to_timedelta(runtimeparams['freq'], "minutes") + if 'continual_publish' in runtimeparams.keys(): + retrieve_hass_conf['continual_publish'] = bool(runtimeparams['continual_publish']) # Treat plant configuration parameters passed at runtime if "SOCtarget" in runtimeparams.keys(): plant_conf["SOCtarget"] = runtimeparams["SOCtarget"] @@ -487,12 +488,18 @@ def treat_runtimeparams(runtimeparams: str, params: str, retrieve_hass_conf: dic params["passed_data"]["custom_deferrable_forecast_id"] = runtimeparams[ "custom_deferrable_forecast_id" ] - # A condition to put a prefix on all published data + # A condition to put a prefix on all published data, or check for saved data under prefix name if "publish_prefix" not in runtimeparams.keys(): publish_prefix = "" else: publish_prefix = runtimeparams["publish_prefix"] params["passed_data"]["publish_prefix"] = publish_prefix + # A condition to manually save entity data under data_path/entities after optimization + if "entity_save" not in runtimeparams.keys(): + entity_save = "" + else: + entity_save = runtimeparams["entity_save"] + params["passed_data"]["entity_save"] = entity_save # Serialize the final params params = json.dumps(params) return params, retrieve_hass_conf, optim_conf, plant_conf @@ -767,6 +774,9 @@ def build_params(params: dict, params_secrets: dict, options: dict, addon: int, params["retrieve_hass_conf"]["method_ts_round"] = options.get( "method_ts_round", params["retrieve_hass_conf"]["method_ts_round"] ) + params["retrieve_hass_conf"]["continual_publish"] = options.get( + "continual_publish", params["retrieve_hass_conf"]["continual_publish"] + ) # Update params Secrets if specified params["params_secrets"] = params_secrets params["params_secrets"]["time_zone"] = options.get( diff --git a/src/emhass/web_server.py b/src/emhass/web_server.py index afb5370a..3b2dad4e 100644 --- a/src/emhass/web_server.py +++ b/src/emhass/web_server.py @@ -7,21 +7,21 @@ from waitress import serve from importlib.metadata import version, PackageNotFoundError from pathlib import Path -import os, json, argparse, pickle, yaml, logging, re +import os, json, argparse, pickle, yaml, logging, re, threading from distutils.util import strtobool from emhass.command_line import set_input_data_dict from emhass.command_line import perfect_forecast_optim, dayahead_forecast_optim, naive_mpc_optim from emhass.command_line import forecast_model_fit, forecast_model_predict, forecast_model_tune from emhass.command_line import regressor_model_fit, regressor_model_predict -from emhass.command_line import publish_data +from emhass.command_line import publish_data, continual_publish from emhass.utils import get_injection_dict, get_injection_dict_forecast_model_fit, \ get_injection_dict_forecast_model_tune, build_params # Define the Flask instance app = Flask(__name__) -#check logfile for error, anything after string match if provided +# Check logfile for error, anything after string match if provided def checkFileLog(refString=None): if (refString is not None): logArray = grabLog(refString) #grab reduced log array @@ -34,7 +34,7 @@ def checkFileLog(refString=None): return True return False -#find string in logs, append all lines after to return +# Find string in logs, append all lines after to return def grabLog(refString): isFound = [] output = [] @@ -49,13 +49,14 @@ def grabLog(refString): output.append(logArray[x]) return output -#clear the log file +# Clear the log file def clearFileLog(): if ((emhass_conf['data_path'] / 'actionLogs.txt')).exists(): with open(str(emhass_conf['data_path'] / 'actionLogs.txt'), "w") as fp: fp.truncate() -#initial index page render + +# Initial index page render @app.route('/') def index(): app.logger.info("EMHASS server online, serving index.html...") @@ -97,6 +98,7 @@ def template_action(action_name): #post actions @app.route('/action/', methods=['POST']) def action_call(action_name): + # Setting up parameters with open(str(emhass_conf['data_path'] / 'params.pkl'), "rb") as fid: emhass_conf['config_path'], params = pickle.load(fid) runtimeparams = request.get_json(force=True) @@ -110,6 +112,15 @@ def action_call(action_name): params, runtimeparams, action_name, app.logger) if not input_data_dict: return make_response(grabLog(ActionStr), 400) + + # If continual_publish is True, start thread with loop function + if len(continual_publish_thread) == 0 and input_data_dict['retrieve_hass_conf'].get("continual_publish",False): + # Start Thread + continualLoop = threading.Thread(name="continual_publish",target=continual_publish,args=[input_data_dict,entity_path,app.logger]) + continualLoop.start() + continual_publish_thread.append(continualLoop) + + # run action based on POST request if action_name == 'publish-data': ActionStr = " >> Publishing data..." app.logger.info(ActionStr) @@ -446,6 +457,18 @@ def action_call(action_name): app.logger.addHandler(fileLogger) clearFileLog() #Clear Action File logger file, ready for new instance + + #If entity_path exists, remove any entity/metadata files + entity_path = emhass_conf['data_path'] / "entities" + if os.path.exists(entity_path): + entity_pathContents = os.listdir(entity_path) + if len(entity_pathContents) > 0: + for entity in entity_pathContents: + os.remove(entity_path / entity) + + # Initialise continual publish thread list + continual_publish_thread = [] + # Launch server port = int(os.environ.get('PORT', 5000)) app.logger.info("Launching the emhass webserver at: http://"+web_ui_url+":"+str(port))