Skip to content

Commit

Permalink
Rewrite ping logic for cloud API
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexxIT committed Oct 27, 2024
1 parent f5a62d6 commit 0eac42c
Showing 1 changed file with 15 additions and 39 deletions.
54 changes: 15 additions & 39 deletions custom_components/sonoff/core/ewelink/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,41 +293,6 @@ def sign(msg: bytes) -> bytes:
).digest()


# noinspection PyProtectedMember
class WebSocket:
"""Default asyncio.WebSocket keep-alive only incoming messages with heartbeats.
This is helpful if messages from the server don't come very often.
With this changes we also keep-alive outgoing messages with heartbeats.
This is helpful if our messages to the server are not sent very often.
"""

def __init__(self, ws: ClientWebSocketResponse):
self._heartbeat: float = ws._heartbeat
self._heartbeat_cb: asyncio.TimerHandle | None = None
self.ws = ws

def __aiter__(self):
return self.ws

async def __anext__(self):
return await self.ws.__anext__()

async def receive_json(self):
return await self.ws.receive_json()

async def send_json(self, data: dict):
if self._heartbeat_cb:
self._heartbeat_cb.cancel()
self._heartbeat_cb = None

self._heartbeat_cb = self.ws._loop.call_later(
self._heartbeat, self.ws._send_heartbeat
)

await self.ws.send_json(data)


class XRegistryCloud(ResponseWaiter, XRegistryBase):
auth: dict | None = None
devices: dict[str, dict] = None
Expand All @@ -336,7 +301,7 @@ class XRegistryCloud(ResponseWaiter, XRegistryBase):
region: str = None

task: asyncio.Task | None = None
ws: WebSocket = None
ws: ClientWebSocketResponse = None

@property
def host(self) -> str:
Expand Down Expand Up @@ -545,6 +510,8 @@ async def run_forever(self, **kwargs):
try:
msg: WSMessage
async for msg in self.ws:
if msg.data == "pong":
continue
resp = json.loads(msg.data)
_ = asyncio.create_task(self._process_ws_msg(resp))
except Exception as e:
Expand All @@ -557,10 +524,9 @@ async def connect(self) -> bool:
resp = await r.json()

# we can use IP, but using domain because security
ws = await self.session.ws_connect(
self.ws = await self.session.ws_connect(
f"wss://{resp['domain']}:{resp['port']}/api/ws", heartbeat=90
)
self.ws = WebSocket(ws)

# https://coolkit-technologies.github.io/eWeLink-API/#/en/APICenterV2?id=websocket-handshake
ts = time.time()
Expand Down Expand Up @@ -592,7 +558,7 @@ async def connect(self) -> bool:
raise Exception(resp)

if (config := resp.get("config")) and config.get("hb"):
self.ws._heartbeat = config.get("hbInterval")
asyncio.create_task(_ping(self.ws, config.get("hbInterval")))

return True

Expand Down Expand Up @@ -636,3 +602,13 @@ async def _process_ws_msg(self, data: dict):

else:
_LOGGER.warning(f"UNKNOWN cloud msg: {data}")


# https://coolkit-technologies.github.io/eWeLink-API/#/en/OAuth2.0?id=websocket-handshake
async def _ping(ws: ClientWebSocketResponse, heartbeat: int):
try:
while heartbeat:
await asyncio.sleep(heartbeat)
await ws.send_str("ping")
except:
pass

0 comments on commit 0eac42c

Please sign in to comment.