Skip to content

Commit

Permalink
Move logic to ping pong cache (#1299)
Browse files Browse the repository at this point in the history
* Move logic to ping pong cache

* Add debug log to cache expiration

* Update dynamic.py
  • Loading branch information
SukramJ authored Nov 21, 2023
1 parent 995d795 commit 2511270
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 127 deletions.
106 changes: 99 additions & 7 deletions hahomematic/caches/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,22 @@
from collections.abc import Mapping
from datetime import datetime
import logging
from typing import Any, Final
from typing import Any, Final, cast

from hahomematic import central as hmcu
from hahomematic.config import PING_PONG_MISMATCH_COUNT, PING_PONG_MISMATCH_COUNT_TTL
from hahomematic.const import (
EVENT_DATA,
EVENT_INSTANCE_NAME,
EVENT_INTERFACE_ID,
EVENT_PONG_MISMATCH_COUNT,
EVENT_TYPE,
INIT_DATETIME,
MAX_CACHE_AGE,
NO_CACHE_ENTRY,
CallSource,
EventType,
InterfaceEventType,
InterfaceName,
)
from hahomematic.platforms.device import HmDevice
Expand Down Expand Up @@ -194,17 +201,21 @@ class PingPongCache:

def __init__(
self,
central: hmcu.CentralUnit,
interface_id: str,
allowed_delta: int = PING_PONG_MISMATCH_COUNT,
ttl: int = PING_PONG_MISMATCH_COUNT_TTL,
):
"""Initialize the cache with ttl."""
assert ttl > 0
self._central: Final = central
self._interface_id: Final = interface_id
self._allowed_delta: Final = allowed_delta
self._ttl: Final = ttl
self._pending_pongs: Final[set[datetime]] = set()
self._unknown_pongs: Final[set[datetime]] = set()
self._pending_pong_logged: bool = False
self._unknown_pong_logged: bool = False

@property
def high_pending_pongs(self) -> bool:
Expand Down Expand Up @@ -244,12 +255,18 @@ def clear(self) -> None:
"""Clear the cache."""
self._pending_pongs.clear()
self._unknown_pongs.clear()
self._pending_pong_logged = False
self._unknown_pong_logged = False

def handle_send_ping(self, ping_ts: datetime) -> None:
"""Handle send ping timestamp."""
self._pending_pongs.add(ping_ts)
self._check_and_fire_pong_event(
event_type=InterfaceEventType.PENDING_PONG,
pong_mismatch_count=self.pending_pong_count,
)
_LOGGER.debug(
"PING PONG CACHE: Increase pending PING count: %s, %i for ts: %s",
"PING PONG CACHE: Increase pending PING count: %s - %i for ts: %s",
self._interface_id,
self.pending_pong_count,
ping_ts,
Expand All @@ -259,17 +276,25 @@ def handle_received_pong(self, pong_ts: datetime) -> None:
"""Handle received pong timestamp."""
if pong_ts in self._pending_pongs:
self._pending_pongs.remove(pong_ts)
self._check_and_fire_pong_event(
event_type=InterfaceEventType.PENDING_PONG,
pong_mismatch_count=self.pending_pong_count,
)
_LOGGER.debug(
"PING PONG CACHE: Reduce pending PING count: %s, %i for ts: %s",
"PING PONG CACHE: Reduce pending PING count: %s - %i for ts: %s",
self._interface_id,
self.pending_pong_count,
pong_ts,
)
return

self._unknown_pongs.add(pong_ts)
self._check_and_fire_pong_event(
event_type=InterfaceEventType.UNKNOWN_PONG,
pong_mismatch_count=self.unknown_pong_count,
)
_LOGGER.debug(
"PING PONG CACHE: Increase unknown PONG count: %s, %i for ts: %s",
"PING PONG CACHE: Increase unknown PONG count: %s - %i for ts: %s",
self._interface_id,
self.unknown_pong_count,
pong_ts,
Expand All @@ -278,10 +303,16 @@ def handle_received_pong(self, pong_ts: datetime) -> None:
def _cleanup_pending_pongs(self) -> None:
"""Cleanup too old pending pongs."""
dt_now = datetime.now()
for ping_ts in list(self._pending_pongs):
delta = dt_now - ping_ts
for pong_ts in list(self._pending_pongs):
delta = dt_now - pong_ts
if delta.seconds > self._ttl:
self._pending_pongs.remove(ping_ts)
self._pending_pongs.remove(pong_ts)
_LOGGER.debug(
"PING PONG CACHE: Removing expired pending PONG: %s - %i for ts: %s",
self._interface_id,
self.pending_pong_count,
pong_ts,
)

def _cleanup_unknown_pongs(self) -> None:
"""Cleanup too old unknown pongs."""
Expand All @@ -290,3 +321,64 @@ def _cleanup_unknown_pongs(self) -> None:
delta = dt_now - pong_ts
if delta.seconds > self._ttl:
self._unknown_pongs.remove(pong_ts)
_LOGGER.debug(
"PING PONG CACHE: Removing expired unknown PONG: %s - %i or ts: %s",
self._interface_id,
self.unknown_pong_count,
pong_ts,
)

def _check_and_fire_pong_event(
self, event_type: InterfaceEventType, pong_mismatch_count: int
) -> None:
"""Fire an event about the pong status."""

def _fire_event(mismatch_count: int) -> None:
self._central.fire_ha_event_callback(
event_type=EventType.INTERFACE,
event_data=cast(
dict[str, Any],
hmcu.INTERFACE_EVENT_SCHEMA(
{
EVENT_INTERFACE_ID: self._interface_id,
EVENT_TYPE: event_type,
EVENT_DATA: {
EVENT_INSTANCE_NAME: self._central.config.name,
EVENT_PONG_MISMATCH_COUNT: mismatch_count,
},
}
),
),
)

if self.low_pending_pongs and event_type == InterfaceEventType.PENDING_PONG:
_fire_event(mismatch_count=0)
self._pending_pong_logged = False
return

if self.low_unknown_pongs and event_type == InterfaceEventType.UNKNOWN_PONG:
self._unknown_pong_logged = False
return

if self.high_pending_pongs and event_type == InterfaceEventType.PENDING_PONG:
_fire_event(mismatch_count=pong_mismatch_count)
if self._pending_pong_logged is False:
_LOGGER.warning(
"Pending PONG mismatch: There is a mismatch between send ping events and received pong events for HA instance %s. "
"Possible reason 1: You are running multiple instances of HA with the same instance name configured for this integration. "
"Re-add one instance! Otherwise this HA instance will not receive update events from your CCU. "
"Possible reason 2: Something is stuck on the CCU or hasn't been cleaned up. Therefore, try a CCU restart.",
self._interface_id,
)
self._pending_pong_logged = True

if self.high_unknown_pongs and event_type == InterfaceEventType.UNKNOWN_PONG:
if self._unknown_pong_logged is False:
_LOGGER.warning(
"Unknown PONG Mismatch: Your HA instance %s receives PONG events, that it hasn't send. "
"Possible reason 1: You are running multiple instances of HA with the same instance name configured for this integration. "
"Re-add one instance! Otherwise the other HA instance will not receive update events from your CCU. "
"Possible reason 2: Something is stuck on the CCU or hasn't been cleaned up. Therefore, try a CCU restart.",
self._interface_id,
)
self._unknown_pong_logged = True
2 changes: 1 addition & 1 deletion hahomematic/central/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ def event(self, interface_id: str, channel_address: str, parameter: str, value:
if (
client := self.get_client(interface_id=interface_id)
) and client.supports_ping_pong:
client.handle_received_pong(
client.ping_pong_cache.handle_received_pong(
pong_ts=datetime.strptime(v_timestamp, DATETIME_FORMAT_MILLIS)
)
return
Expand Down
123 changes: 11 additions & 112 deletions hahomematic/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,13 @@
from hahomematic.const import (
DATETIME_FORMAT_MILLIS,
EVENT_AVAILABLE,
EVENT_DATA,
EVENT_INSTANCE_NAME,
EVENT_INTERFACE_ID,
EVENT_PONG_MISMATCH_COUNT,
EVENT_SECONDS_SINCE_LAST_EVENT,
EVENT_TYPE,
HOMEGEAR_SERIAL,
INIT_DATETIME,
VIRTUAL_REMOTE_TYPES,
Backend,
CallSource,
Description,
EventType,
ForcedDeviceAvailability,
InterfaceEventType,
InterfaceName,
Expand Down Expand Up @@ -68,9 +62,9 @@ def __init__(self, client_config: _ClientConfig) -> None:
self._connection_error_count: int = 0
self._is_callback_alive: bool = True
self.last_updated: datetime = INIT_DATETIME
self._ping_pong_cache: Final = PingPongCache(interface_id=client_config.interface_id)
self._pending_pong_logged: bool = False
self._unknown_pong_logged: bool = False
self._ping_pong_cache: Final = PingPongCache(
central=client_config.central, interface_id=client_config.interface_id
)

self._proxy: XmlRpcProxy
self._proxy_read: XmlRpcProxy
Expand All @@ -96,6 +90,11 @@ def available(self) -> bool:
def model(self) -> str:
"""Return the model of the backend."""

@property
def ping_pong_cache(self) -> PingPongCache:
"""Return the ping pong cache."""
return self._ping_pong_cache

@property
@abstractmethod
def supports_ping_pong(self) -> bool:
Expand All @@ -105,7 +104,7 @@ async def proxy_init(self) -> ProxyInitState:
"""Init the proxy has to tell the CCU / Homegear where to send the events."""
try:
_LOGGER.debug("PROXY_INIT: init('%s', '%s')", self._config.init_url, self.interface_id)
self._clear_ping_pong_cache()
self._ping_pong_cache.clear()
await self._proxy.init(self._config.init_url, self.interface_id)
self._mark_all_devices_forced_availability(
forced_availability=ForcedDeviceAvailability.NOT_SET
Expand All @@ -123,12 +122,6 @@ async def proxy_init(self) -> ProxyInitState:
self.last_updated = datetime.now()
return ProxyInitState.INIT_SUCCESS

def _clear_ping_pong_cache(self) -> None:
"""Clear the ping pong cache."""
self._ping_pong_cache.clear()
self._pending_pong_logged = False
self._unknown_pong_logged = False

async def proxy_de_init(self) -> ProxyInitState:
"""De-init to stop CCU from sending events for this remote."""
if self.last_updated == INIT_DATETIME:
Expand Down Expand Up @@ -335,100 +328,6 @@ async def get_device_description(self, device_address: str) -> tuple[dict[str, A
)
return None

def handle_send_ping(self, ping_ts: datetime) -> None:
"""Increase the number of send ping events."""
if self.supports_ping_pong is True:
self._ping_pong_cache.handle_send_ping(ping_ts=ping_ts)
self._check_and_fire_pending_pong_event()

def handle_received_pong(self, pong_ts: datetime) -> None:
"""Increase the number of send ping events."""
if self.supports_ping_pong is True:
self._ping_pong_cache.handle_received_pong(pong_ts=pong_ts)
self._check_and_fire_pending_pong_event()
self._check_and_fire_unknown_pong_event()

def _check_and_fire_pending_pong_event(self) -> None:
"""Fire an event about the pending pong status."""
self._check_and_fire_pong_event(
event_type=InterfaceEventType.PENDING_PONG,
pong_mismatch_count=self._ping_pong_cache.pending_pong_count,
)

def _check_and_fire_unknown_pong_event(self) -> None:
"""Fire an event about the unknown pong status."""
self._check_and_fire_pong_event(
event_type=InterfaceEventType.UNKNOWN_PONG,
pong_mismatch_count=self._ping_pong_cache.unknown_pong_count,
)

def _check_and_fire_pong_event(
self, event_type: InterfaceEventType, pong_mismatch_count: int
) -> None:
"""Fire an event about the pong status."""

def _fire_event(mismatch_count: int) -> None:
self.central.fire_ha_event_callback(
event_type=EventType.INTERFACE,
event_data=cast(
dict[str, Any],
hmcu.INTERFACE_EVENT_SCHEMA(
{
EVENT_INTERFACE_ID: self.interface_id,
EVENT_TYPE: event_type,
EVENT_DATA: {
EVENT_INSTANCE_NAME: self.central.config.name,
EVENT_PONG_MISMATCH_COUNT: mismatch_count,
},
}
),
),
)

if (
self._ping_pong_cache.low_pending_pongs
and event_type == InterfaceEventType.PENDING_PONG
):
_fire_event(mismatch_count=0)
self._pending_pong_logged = False
return

if (
self._ping_pong_cache.low_unknown_pongs
and event_type == InterfaceEventType.UNKNOWN_PONG
):
self._unknown_pong_logged = False
return

if (
self._ping_pong_cache.high_pending_pongs
and event_type == InterfaceEventType.PENDING_PONG
):
_fire_event(mismatch_count=pong_mismatch_count)
if self._pending_pong_logged is False:
_LOGGER.warning(
"Pending PONG mismatch: There is a mismatch between send ping events and received pong events for HA instance %s. "
"Possible reason 1: You are running multiple instances of HA with the same instance name configured for this integration. "
"Re-add one instance! Otherwise this HA instance will not receive update events from your CCU. "
"Possible reason 2: Something is stuck on the CCU or hasn't been cleaned up. Therefore, try a CCU restart.",
self.interface_id,
)
self._pending_pong_logged = True

if (
self._ping_pong_cache.high_unknown_pongs
and event_type == InterfaceEventType.UNKNOWN_PONG
):
if self._unknown_pong_logged is False:
_LOGGER.warning(
"Unknown PONG Mismatch: Your HA instance %s receives PONG events, that it hasn't send. "
"Possible reason 1: You are running multiple instances of HA with the same instance name configured for this integration. "
"Re-add one instance! Otherwise the other HA instance will not receive update events from your CCU. "
"Possible reason 2: Something is stuck on the CCU or hasn't been cleaned up. Therefore, try a CCU restart.",
self.interface_id,
)
self._unknown_pong_logged = True

async def set_install_mode(
self,
on: bool = True,
Expand Down Expand Up @@ -819,8 +718,8 @@ async def check_connection_availability(self, handle_ping_pong: bool) -> bool:
"""Check if _proxy is still initialized."""
try:
dt_now = datetime.now()
if handle_ping_pong:
self.handle_send_ping(ping_ts=dt_now)
if handle_ping_pong and self.supports_ping_pong:
self._ping_pong_cache.handle_send_ping(ping_ts=dt_now)
await self._proxy.ping(
f"{self.interface_id}#{dt_now.strftime(DATETIME_FORMAT_MILLIS)}"
)
Expand Down
4 changes: 2 additions & 2 deletions hahomematic_support/client_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ def is_callback_alive(self) -> bool:

async def check_connection_availability(self, handle_ping_pong: bool) -> bool:
"""Send ping to CCU to generate PONG event."""
if handle_ping_pong:
self.handle_send_ping(ping_ts=datetime.now())
if handle_ping_pong and self.supports_ping_pong:
self._ping_pong_cache.handle_send_ping(ping_ts=datetime.now())
return True

async def execute_program(self, pid: str) -> bool:
Expand Down
Loading

0 comments on commit 2511270

Please sign in to comment.