From 712000003cd9ebd16962c10b9e60eb9ded15fc85 Mon Sep 17 00:00:00 2001 From: mle Date: Wed, 1 May 2024 15:47:03 +0200 Subject: [PATCH] Move _ensure_lock() from Inverter to InverterProtocol Move _ensure_lock() from Inverter to InverterProtocol and re-use UDP transport betweem call (do not close it after each request). --- goodwe/inverter.py | 43 +++++++------------------ goodwe/protocol.py | 80 ++++++++++++++++++++++++++++++---------------- 2 files changed, 63 insertions(+), 60 deletions(-) diff --git a/goodwe/inverter.py b/goodwe/inverter.py index 9e905d7..a5cd660 100644 --- a/goodwe/inverter.py +++ b/goodwe/inverter.py @@ -1,6 +1,5 @@ from __future__ import annotations -import asyncio import logging from abc import ABC, abstractmethod from dataclasses import dataclass @@ -89,8 +88,6 @@ class Inverter(ABC): def __init__(self, host: str, port: int, comm_addr: int = 0, timeout: int = 1, retries: int = 3): self._protocol: InverterProtocol = self._create_protocol(host, port, timeout, retries) - self._running_loop: asyncio.AbstractEventLoop | None = None - self._lock: asyncio.Lock | None = None self._consecutive_failures_count: int = 0 self.comm_addr: int = comm_addr @@ -120,36 +117,18 @@ def _write_multi_command(self, offset: int, values: bytes) -> ProtocolCommand: """Create write multiple protocol command.""" return self._protocol.write_multi_command(self.comm_addr, offset, values) - def _ensure_lock(self) -> asyncio.Lock: - """Validate (or create) asyncio Lock. - - The asyncio.Lock must always be created from within's asyncio loop, - so it cannot be eagerly created in constructor. - Additionally, since asyncio.run() creates and closes its own loop, - the lock's scope (its creating loop) mus be verified to support proper - behavior in subsequent asyncio.run() invocations. - """ - if self._lock and self._running_loop == asyncio.get_event_loop(): - return self._lock - else: - logger.debug("Creating lock instance for current event loop.") - self._lock = asyncio.Lock() - self._running_loop = asyncio.get_event_loop() - return self._lock - async def _read_from_socket(self, command: ProtocolCommand) -> ProtocolResponse: - async with self._ensure_lock(): - try: - result = await command.execute(self._protocol) - self._consecutive_failures_count = 0 - return result - except MaxRetriesException: - self._consecutive_failures_count += 1 - raise RequestFailedException(f'No valid response received even after {self._protocol.retries} retries', - self._consecutive_failures_count) from None - except RequestFailedException as ex: - self._consecutive_failures_count += 1 - raise RequestFailedException(ex.message, self._consecutive_failures_count) from None + try: + result = await command.execute(self._protocol) + self._consecutive_failures_count = 0 + return result + except MaxRetriesException: + self._consecutive_failures_count += 1 + raise RequestFailedException(f'No valid response received even after {self._protocol.retries} retries', + self._consecutive_failures_count) from None + except RequestFailedException as ex: + self._consecutive_failures_count += 1 + raise RequestFailedException(ex.message, self._consecutive_failures_count) from None @abstractmethod async def read_device_info(self): diff --git a/goodwe/protocol.py b/goodwe/protocol.py index 57561c9..565a7a6 100644 --- a/goodwe/protocol.py +++ b/goodwe/protocol.py @@ -19,6 +19,8 @@ class InverterProtocol: def __init__(self, host: str, port: int, timeout: int, retries: int): self._host: str = host self._port: int = port + self._running_loop: asyncio.AbstractEventLoop | None = None + self._lock: asyncio.Lock | None = None self._timer: asyncio.TimerHandle | None = None self.timeout: int = timeout self.retries: int = retries @@ -26,6 +28,27 @@ def __init__(self, host: str, port: int, timeout: int, retries: int): self.response_future: Future | None = None self.command: ProtocolCommand | None = None + def _ensure_lock(self) -> asyncio.Lock: + """Validate (or create) asyncio Lock. + + The asyncio.Lock must always be created from within's asyncio loop, + so it cannot be eagerly created in constructor. + Additionally, since asyncio.run() creates and closes its own loop, + the lock's scope (its creating loop) mus be verified to support proper + behavior in subsequent asyncio.run() invocations. + """ + if self._lock and self._running_loop == asyncio.get_event_loop(): + return self._lock + else: + logger.debug("Creating lock instance for current event loop.") + self._lock = asyncio.Lock() + self._running_loop = asyncio.get_event_loop() + self._close_transport() + return self._lock + + def _close_transport(self) -> None: + raise NotImplementedError() + async def send_request(self, command: ProtocolCommand) -> Future: raise NotImplementedError() @@ -88,7 +111,6 @@ def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: if self.command.validator(data): logger.debug("Received: %s", data.hex()) self.response_future.set_result(data) - self._close_transport() else: logger.debug("Received invalid response: %s", data.hex()) asyncio.get_running_loop().call_soon(self._retry_mechanism) @@ -105,12 +127,13 @@ def error_received(self, exc: Exception) -> None: async def send_request(self, command: ProtocolCommand) -> Future: """Send message via transport""" - await self._connect() - response_future = asyncio.get_running_loop().create_future() - self._retry = 0 - self._send_request(command, response_future) - await response_future - return response_future + async with self._ensure_lock(): + await self._connect() + response_future = asyncio.get_running_loop().create_future() + self._retry = 0 + self._send_request(command, response_future) + await response_future + return response_future def _send_request(self, command: ProtocolCommand, response_future: Future) -> None: """Send message via transport""" @@ -217,27 +240,28 @@ def error_received(self, exc: Exception) -> None: async def send_request(self, command: ProtocolCommand) -> Future: """Send message via transport""" - try: - await self._connect() - response_future = asyncio.get_running_loop().create_future() - self._send_request(command, response_future) - await response_future - return response_future - except asyncio.CancelledError: - if self._retry < self.retries: - logger.debug("Connection broken error") - self._retry += 1 - self._close_transport() - return await self.send_request(command) - else: - return self._max_retries_reached() - except ConnectionRefusedError as exc: - if self._retry < self.retries: - logger.debug("Connection refused error: %s", exc) - self._retry += 1 - return await self.send_request(command) - else: - return self._max_retries_reached() + async with self._ensure_lock(): + try: + await self._connect() + response_future = asyncio.get_running_loop().create_future() + self._send_request(command, response_future) + await response_future + return response_future + except asyncio.CancelledError: + if self._retry < self.retries: + logger.debug("Connection broken error") + self._retry += 1 + self._close_transport() + return await self.send_request(command) + else: + return self._max_retries_reached() + except ConnectionRefusedError as exc: + if self._retry < self.retries: + logger.debug("Connection refused error: %s", exc) + self._retry += 1 + return await self.send_request(command) + else: + return self._max_retries_reached() def _send_request(self, command: ProtocolCommand, response_future: Future) -> None: """Send message via transport"""