Skip to content

Commit

Permalink
mosmix caching improved
Browse files Browse the repository at this point in the history
  • Loading branch information
grro committed Nov 8, 2022
1 parent 3f2caab commit 4aee02a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 76 deletions.
122 changes: 47 additions & 75 deletions pvpower/mosmix.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
from random import randrange
import pytz
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from typing import List, Dict
import xml.etree.ElementTree as ET



class IssueTimeCollector:

def __init__(self):
Expand Down Expand Up @@ -82,38 +81,41 @@ def consume(self, event, elem):

class ParameterUtcSeries:

def __init__(self, name: str, series: Dict[str, float]):
self.name = name
self.__series = series # time_utc: value
TIME_PATTERN = "%Y.%m.%dT%H"

def size(self) -> int:
return len(self.__series.keys())

def value_at(self, local_datetime: datetime) -> float:
dt_utc = local_datetime.astimezone(pytz.UTC)
return self.__series.get(dt_utc.strftime("%Y.%m.%d %H"))

def to_dict(self) -> Dict[str, Any]:
return { "name": self.name,
"series": self.__series }

def merge(self, other, min_local_datetime: datetime):
min_datetime_utc = min_local_datetime.astimezone(pytz.UTC)
min_datetime_utc_str = min_datetime_utc.strftime("%Y.%m.%d %H")
merged_series = {}
for time_utc in other.__series.keys():
if time_utc >= min_datetime_utc_str:
merged_series[time_utc] = other.__series[time_utc]
merged_series.update(self.__series)
return ParameterUtcSeries(self.name, merged_series)
@staticmethod
def __datetime_to_utc_string(dt: datetime) -> str:
return dt.astimezone(pytz.UTC).strftime(ParameterUtcSeries.TIME_PATTERN)

@staticmethod
def create(name: str, time_series: List[datetime], values: Dict[str, List[float]]):
return ParameterUtcSeries(name, {time_series[i].astimezone(pytz.UTC).strftime("%Y.%m.%d %H"): values[name][i] for i in range(0, len(time_series))})
def __utc_string_to_datetime(utc_string: str) -> datetime:
return datetime.strptime(utc_string, ParameterUtcSeries.TIME_PATTERN)

@staticmethod
def from_dict(map: Dict[str, Any]):
return ParameterUtcSeries(map['name'], map['series'])
def create(name: str, time_series: List[datetime], values: Dict[str, List[float]]):
return ParameterUtcSeries({ParameterUtcSeries.__datetime_to_utc_string(time_series[i]): values[name][i] for i in range(0, len(time_series))})

def __init__(self, utc_series: Dict[str, float]):
self.utc_series = utc_series

def size(self) -> int:
return len(self.utc_series.keys())

@property
def date_from_utc(self) -> datetime:
return self.__utc_string_to_datetime(sorted(self.utc_series.keys())[0])

@property
def date_to_utc(self) -> datetime:
return self.__utc_string_to_datetime(sorted(self.utc_series.keys())[-1])

def value_at(self, dt: datetime) -> float:
return self.utc_series.get(self.__datetime_to_utc_string(dt))

def merge(self, old, min_datetime: datetime):
series = {time_utc: old.utc_series[time_utc] for time_utc in old.utc_series.keys() if time_utc >= self.__datetime_to_utc_string(min_datetime)}
series.update(self.utc_series)
return ParameterUtcSeries(series)


class MosmixS:
Expand All @@ -125,8 +127,6 @@ def create(station_id: str,
parameters: Dict[str, List[float]]):
return MosmixS(station_id,
issue_time_utc,
timesteps_utc[0],
timesteps_utc[-1],
{parameter: ParameterUtcSeries.create(parameter, timesteps_utc, parameters) for parameter in parameters})

def __utc_to_local(utc: datetime) -> datetime:
Expand All @@ -135,37 +135,21 @@ def __utc_to_local(utc: datetime) -> datetime:
def __init__(self,
station_id: str,
issue_time_utc: datetime,
date_from_utc: datetime,
date_to_utc: datetime,
parameter_series: Dict[str, ParameterUtcSeries]):
self.station_id = station_id
self.__issue_time_utc = issue_time_utc
self.__date_from_utc = date_from_utc
self.__date_to_utc = date_to_utc
self.__parameter_series = parameter_series
self.issue_time = MosmixS.__utc_to_local(self.__issue_time_utc)
self.date_from = MosmixS.__utc_to_local(self.__parameter_series["Rad1h"].date_from_utc)
self.date_to = MosmixS.__utc_to_local(self.__parameter_series["Rad1h"].date_to_utc)

@property
def issue_time(self) -> datetime:
return MosmixS.__utc_to_local(self.__issue_time_utc)

@property
def date_from(self) -> datetime:
return MosmixS.__utc_to_local(self.__date_from_utc)

@property
def date_to(self) -> datetime:
return MosmixS.__utc_to_local(self.__date_to_utc)

def merge(self, old_mosmix, min_local_datetime: datetime = None):
if min_local_datetime is None:
min_local_datetime = datetime.now() - timedelta(days=5)
def merge(self, old_mosmix):
if old_mosmix is None:
return self
else:
min_local_datetime = datetime.now() - timedelta(days=5)
merged = MosmixS(self.station_id,
self.__issue_time_utc,
old_mosmix.__date_from_utc,
self.__date_to_utc,
{parameter: self.__parameter_series[parameter].merge(old_mosmix.__parameter_series[parameter], min_local_datetime) for parameter in self.__parameter_series.keys()})
logging.debug("merging \nold mosmix: " + str(old_mosmix) + " \nnew mosmix: " + str(self) + " \nmerged mosmix: " + str(merged))
return merged
Expand Down Expand Up @@ -199,9 +183,7 @@ def save(self, filename: str = "mosmix.json"):
with open(filename, "w") as file:
data = json.dumps({ "station_id": self.station_id,
"issue_time_utc": self.__issue_time_utc.isoformat(),
"utc_date_from": self.__date_from_utc.isoformat(),
"utc_date_to": self.__date_to_utc.isoformat(),
"parameter_series": { parameter: self.__parameter_series[parameter].to_dict() for parameter in self.__parameter_series.keys()}})
"parameter_series": { parameter: self.__parameter_series[parameter].utc_series for parameter in self.__parameter_series.keys()}})
file.write(data)

@staticmethod
Expand All @@ -212,20 +194,13 @@ def load(filename: str = "mosmix.json"):
data = json.loads(file.read())
station_id = data['station_id']
issue_time_utc = datetime.fromisoformat(data['issue_time_utc'])
utc_date_from = datetime.fromisoformat(data['utc_date_from'])
utc_date_to = datetime.fromisoformat(data['utc_date_to'])
parameter_series = {parameter: ParameterUtcSeries.from_dict(data['parameter_series'][parameter]) for parameter in data['parameter_series'].keys()}
return MosmixS(station_id,
issue_time_utc,
utc_date_from,
utc_date_to,
parameter_series)
parameter_series = {parameter: ParameterUtcSeries(data['parameter_series'][parameter]) for parameter in data['parameter_series'].keys()}
return MosmixS(station_id, issue_time_utc, parameter_series)
except Exception as e:
logging.warning("error occurred loading mosmix cache file " + filename, e)
return None



class MosmixLoader(ABC):

@abstractmethod
Expand Down Expand Up @@ -266,7 +241,6 @@ def get(self) -> MosmixS:
return mosmix



class FileCachedMosmixLoader(MosmixLoader):

def __init__(self, station_id: str):
Expand All @@ -278,17 +252,15 @@ def __init__(self, station_id: str):

def get(self) -> MosmixS:
cache_filename = os.path.join(self.__dir, "mosmixs_" + self.__station_id + ".json")
cached_mosmix = MosmixS.load(cache_filename)
if cached_mosmix is not None:
if cached_mosmix.is_expired():
elasped_minutes_since_last_cache_refresh = int((time.time() - os.path.getmtime(cache_filename)) / 60)
if elasped_minutes_since_last_cache_refresh < 10: # at maximum all 10 min the (large!) mosmix file will be loaded via web
logging.debug("filebased mosmix cache is expired, however last refresh is < 10 min. return cached one -> " + str(cached_mosmix))
return cached_mosmix
mosmix = MosmixS.load(cache_filename)
if mosmix is not None:
if mosmix.is_expired():
if int((time.time() - os.path.getmtime(cache_filename)) / 60) < 10: # at maximum all 10 min the (large!) mosmix file will be loaded via web
return mosmix
else:
return cached_mosmix
mosmix = self.__parent_loader.get()
merged_mosmix = mosmix.merge(cached_mosmix)
return mosmix
new_mosmix = self.__parent_loader.get()
merged_mosmix = new_mosmix.merge(mosmix)
merged_mosmix.save(cache_filename)
return merged_mosmix

Expand Down
2 changes: 1 addition & 1 deletion pvpower/traindata.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def from_csv(line: str):


class TrainSampleLog:
COMPACTION_PERIOD_DAYS = 10
COMPACTION_PERIOD_DAYS = 15

def __init__(self, dirname: str):
self.lock = RLock()
Expand Down

0 comments on commit 4aee02a

Please sign in to comment.