Skip to content

Commit

Permalink
Move _ensure_lock() from Inverter to InverterProtocol
Browse files Browse the repository at this point in the history
Move _ensure_lock() from Inverter to InverterProtocol and re-use UDP transport betweem call (do not close it after each request).
  • Loading branch information
mletenay committed May 1, 2024
1 parent 32d9422 commit 7120000
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 60 deletions.
43 changes: 11 additions & 32 deletions goodwe/inverter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import asyncio
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
Expand Down Expand Up @@ -89,8 +88,6 @@ class Inverter(ABC):

def __init__(self, host: str, port: int, comm_addr: int = 0, timeout: int = 1, retries: int = 3):
self._protocol: InverterProtocol = self._create_protocol(host, port, timeout, retries)
self._running_loop: asyncio.AbstractEventLoop | None = None
self._lock: asyncio.Lock | None = None
self._consecutive_failures_count: int = 0

self.comm_addr: int = comm_addr
Expand Down Expand Up @@ -120,36 +117,18 @@ def _write_multi_command(self, offset: int, values: bytes) -> ProtocolCommand:
"""Create write multiple protocol command."""
return self._protocol.write_multi_command(self.comm_addr, offset, values)

def _ensure_lock(self) -> asyncio.Lock:
"""Validate (or create) asyncio Lock.
The asyncio.Lock must always be created from within's asyncio loop,
so it cannot be eagerly created in constructor.
Additionally, since asyncio.run() creates and closes its own loop,
the lock's scope (its creating loop) mus be verified to support proper
behavior in subsequent asyncio.run() invocations.
"""
if self._lock and self._running_loop == asyncio.get_event_loop():
return self._lock
else:
logger.debug("Creating lock instance for current event loop.")
self._lock = asyncio.Lock()
self._running_loop = asyncio.get_event_loop()
return self._lock

async def _read_from_socket(self, command: ProtocolCommand) -> ProtocolResponse:
async with self._ensure_lock():
try:
result = await command.execute(self._protocol)
self._consecutive_failures_count = 0
return result
except MaxRetriesException:
self._consecutive_failures_count += 1
raise RequestFailedException(f'No valid response received even after {self._protocol.retries} retries',
self._consecutive_failures_count) from None
except RequestFailedException as ex:
self._consecutive_failures_count += 1
raise RequestFailedException(ex.message, self._consecutive_failures_count) from None
try:
result = await command.execute(self._protocol)
self._consecutive_failures_count = 0
return result
except MaxRetriesException:
self._consecutive_failures_count += 1
raise RequestFailedException(f'No valid response received even after {self._protocol.retries} retries',
self._consecutive_failures_count) from None
except RequestFailedException as ex:
self._consecutive_failures_count += 1
raise RequestFailedException(ex.message, self._consecutive_failures_count) from None

@abstractmethod
async def read_device_info(self):
Expand Down
80 changes: 52 additions & 28 deletions goodwe/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,36 @@ class InverterProtocol:
def __init__(self, host: str, port: int, timeout: int, retries: int):
self._host: str = host
self._port: int = port
self._running_loop: asyncio.AbstractEventLoop | None = None
self._lock: asyncio.Lock | None = None
self._timer: asyncio.TimerHandle | None = None
self.timeout: int = timeout
self.retries: int = retries
self.protocol: asyncio.Protocol | None = None
self.response_future: Future | None = None
self.command: ProtocolCommand | None = None

def _ensure_lock(self) -> asyncio.Lock:
"""Validate (or create) asyncio Lock.
The asyncio.Lock must always be created from within's asyncio loop,
so it cannot be eagerly created in constructor.
Additionally, since asyncio.run() creates and closes its own loop,
the lock's scope (its creating loop) mus be verified to support proper
behavior in subsequent asyncio.run() invocations.
"""
if self._lock and self._running_loop == asyncio.get_event_loop():
return self._lock
else:
logger.debug("Creating lock instance for current event loop.")
self._lock = asyncio.Lock()
self._running_loop = asyncio.get_event_loop()
self._close_transport()
return self._lock

def _close_transport(self) -> None:
raise NotImplementedError()

async def send_request(self, command: ProtocolCommand) -> Future:
raise NotImplementedError()

Expand Down Expand Up @@ -88,7 +111,6 @@ def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
if self.command.validator(data):
logger.debug("Received: %s", data.hex())
self.response_future.set_result(data)
self._close_transport()
else:
logger.debug("Received invalid response: %s", data.hex())
asyncio.get_running_loop().call_soon(self._retry_mechanism)
Expand All @@ -105,12 +127,13 @@ def error_received(self, exc: Exception) -> None:

async def send_request(self, command: ProtocolCommand) -> Future:
"""Send message via transport"""
await self._connect()
response_future = asyncio.get_running_loop().create_future()
self._retry = 0
self._send_request(command, response_future)
await response_future
return response_future
async with self._ensure_lock():
await self._connect()
response_future = asyncio.get_running_loop().create_future()
self._retry = 0
self._send_request(command, response_future)
await response_future
return response_future

def _send_request(self, command: ProtocolCommand, response_future: Future) -> None:
"""Send message via transport"""
Expand Down Expand Up @@ -217,27 +240,28 @@ def error_received(self, exc: Exception) -> None:

async def send_request(self, command: ProtocolCommand) -> Future:
"""Send message via transport"""
try:
await self._connect()
response_future = asyncio.get_running_loop().create_future()
self._send_request(command, response_future)
await response_future
return response_future
except asyncio.CancelledError:
if self._retry < self.retries:
logger.debug("Connection broken error")
self._retry += 1
self._close_transport()
return await self.send_request(command)
else:
return self._max_retries_reached()
except ConnectionRefusedError as exc:
if self._retry < self.retries:
logger.debug("Connection refused error: %s", exc)
self._retry += 1
return await self.send_request(command)
else:
return self._max_retries_reached()
async with self._ensure_lock():
try:
await self._connect()
response_future = asyncio.get_running_loop().create_future()
self._send_request(command, response_future)
await response_future
return response_future
except asyncio.CancelledError:
if self._retry < self.retries:
logger.debug("Connection broken error")
self._retry += 1
self._close_transport()
return await self.send_request(command)
else:
return self._max_retries_reached()
except ConnectionRefusedError as exc:
if self._retry < self.retries:
logger.debug("Connection refused error: %s", exc)
self._retry += 1
return await self.send_request(command)
else:
return self._max_retries_reached()

def _send_request(self, command: ProtocolCommand, response_future: Future) -> None:
"""Send message via transport"""
Expand Down

0 comments on commit 7120000

Please sign in to comment.