Skip to content

Commit

Permalink
Add basic support for TCP connections
Browse files Browse the repository at this point in the history
Create generic InverterProtocol and create both UDP and TCP based subclasses, keeping connection/socket open by default.
(TCP is at this moment Modbus/RTU over TPC, not Modbus/TCP).
  • Loading branch information
mletenay committed Apr 23, 2024
1 parent d8cea46 commit 28642e3
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 97 deletions.
16 changes: 3 additions & 13 deletions goodwe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async def discover(host: str, port: int = GOODWE_UDP_PORT, timeout: int = 1, ret
# Try the common AA55C07F0102000241 command first and detect inverter type from serial_number
try:
logger.debug("Probing inverter at %s.", host)
response = await DISCOVERY_COMMAND.execute(host, port, timeout, retries)
response = await DISCOVERY_COMMAND.execute(UdpInverterProtocol(host, port, timeout, retries))
response = response.response_data()
model_name = response[5:15].decode("ascii").rstrip()
serial_number = response[31:47].decode("ascii")
Expand Down Expand Up @@ -120,22 +120,12 @@ async def search_inverters() -> bytes:
Raise InverterError if unable to contact any inverter
"""
logger.debug("Searching inverters by broadcast to port 48899")
loop = asyncio.get_running_loop()
command = ProtocolCommand("WIFIKIT-214028-READ".encode("utf-8"), lambda r: True)
response_future = loop.create_future()
transport, _ = await loop.create_datagram_endpoint(
lambda: UdpInverterProtocol(response_future, command, 1, 3),
remote_addr=("255.255.255.255", 48899),
allow_broadcast=True,
)
try:
await response_future
result = response_future.result()
result = await command.execute(UdpInverterProtocol("255.255.255.255", 48899, 1, 0))
if result is not None:
return result
return result.response_data()
else:
raise InverterError("No response received to broadcast request.")
except asyncio.CancelledError:
raise InverterError("No valid response received to broadcast request.") from None
finally:
transport.close()
1 change: 1 addition & 0 deletions goodwe/const.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Dict

GOODWE_TCP_PORT = 502
GOODWE_UDP_PORT = 8899

BATTERY_MODES: Dict[int, str] = {
Expand Down
25 changes: 15 additions & 10 deletions goodwe/inverter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import Any, Callable, Dict, Tuple, Optional

from .exceptions import MaxRetriesException, RequestFailedException
from .protocol import ProtocolCommand, ProtocolResponse
from .protocol import InverterProtocol, ProtocolCommand, ProtocolResponse, TcpInverterProtocol, UdpInverterProtocol

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -88,15 +88,13 @@ class Inverter(ABC):
"""

def __init__(self, host: str, port: int, comm_addr: int = 0, timeout: int = 1, retries: int = 3):
self.host: str = host
self.port: int = port
self.comm_addr: int = comm_addr
self.timeout: int = timeout
self.retries: int = retries
self._protocol: InverterProtocol = self._create_protocol(host, port, timeout, retries)
self._running_loop: asyncio.AbstractEventLoop | None = None
self._lock: asyncio.Lock | None = None
self._consecutive_failures_count: int = 0

self.comm_addr: int = comm_addr

self.model_name: str | None = None
self.serial_number: str | None = None
self.rated_power: int = 0
Expand Down Expand Up @@ -130,12 +128,12 @@ def _ensure_lock(self) -> asyncio.Lock:
async def _read_from_socket(self, command: ProtocolCommand) -> ProtocolResponse:
async with self._ensure_lock():
try:
result = await command.execute(self.host, self.port, self.timeout, self.retries)
result = await command.execute(self._protocol)
self._consecutive_failures_count = 0
return result
except MaxRetriesException:
self._consecutive_failures_count += 1
raise RequestFailedException(f'No valid response received even after {self.retries} retries',
raise RequestFailedException(f'No valid response received even after {self._protocol.retries} retries',
self._consecutive_failures_count)
except RequestFailedException as ex:
self._consecutive_failures_count += 1
Expand Down Expand Up @@ -191,8 +189,8 @@ async def send_command(
self, command: bytes, validator: Callable[[bytes], bool] = lambda x: True
) -> ProtocolResponse:
"""
Send low level udp command (as bytes).
Answer command's raw response data.
Send low level command (as bytes).
Answer ProtocolResponse with command's raw response data.
"""
return await self._read_from_socket(ProtocolCommand(command, validator))

Expand Down Expand Up @@ -278,6 +276,13 @@ def settings(self) -> Tuple[Sensor, ...]:
"""
raise NotImplementedError()

@staticmethod
def _create_protocol(host: str, port: int, timeout: int, retries: int) -> InverterProtocol:
if port == 502:
return TcpInverterProtocol(host, port, timeout, retries)
else:
return UdpInverterProtocol(host, port, timeout, retries)

@staticmethod
def _map_response(response: ProtocolResponse, sensors: Tuple[Sensor, ...]) -> Dict[str, Any]:
"""Process the response data and return dictionary with runtime values"""
Expand Down
215 changes: 171 additions & 44 deletions goodwe/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,43 @@
logger = logging.getLogger(__name__)


class UdpInverterProtocol(asyncio.DatagramProtocol):
def __init__(
self,
response_future: Future,
command: ProtocolCommand,
timeout: int,
retries: int
):
super().__init__()
self.response_future: Future = response_future
self.command: ProtocolCommand = command
class InverterProtocol:

def __init__(self, host: str, port: int, timeout: int, retries: int):
self._host: str = host
self._port: int = port
self.timeout: int = timeout
self.retries: int = retries
self.protocol: asyncio.Protocol | None = None
self.response_future: Future | None = None
self.command: ProtocolCommand | None = None

async def send_request(self, command: ProtocolCommand) -> Future:
raise NotImplementedError()


class UdpInverterProtocol(InverterProtocol, asyncio.DatagramProtocol):
def __init__(self, host: str, port: int, timeout: int = 1, retries: int = 3):
super().__init__(host, port, timeout, retries)
self._transport: asyncio.transports.DatagramTransport | None = None
self._retry_timeout: int = timeout
self._max_retries: int = retries
self._retries: int = 0
self._retry: int = 0

async def _connect(self) -> None:
if not self._transport or self._transport.is_closing():
self._transport, self.protocol = await asyncio.get_running_loop().create_datagram_endpoint(
lambda: self,
remote_addr=(self._host, self._port),
)

def connection_made(self, transport: asyncio.DatagramTransport) -> None:
"""On connection made"""
self._transport = transport
self._send_request()

def connection_lost(self, exc: Optional[Exception]) -> None:
"""On connection lost"""
if exc is not None:
logger.debug("Socket closed with error: %s.", exc)
# Cancel Future on connection lost
if not self.response_future.done():
self.response_future.cancel()
self._close_transport()

def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
"""On datagram received"""
Expand All @@ -50,35 +59,163 @@ def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None:
self.response_future.set_result(data)
else:
logger.debug("Received invalid response: %s", data.hex())
self._retries += 1
self._send_request()
self._retry += 1
self._send_request(self.command, self.response_future)
except RequestRejectedException as ex:
logger.debug("Received exception response: %s", data.hex())
self.response_future.set_exception(ex)
self._close_transport()

def error_received(self, exc: Exception) -> None:
"""On error received"""
logger.debug("Received error: %s", exc)
self.response_future.set_exception(exc)
self._close_transport()

def _send_request(self) -> None:
async def send_request(self, command: ProtocolCommand) -> Future:
"""Send message via transport"""
await self._connect()
response_future = asyncio.get_running_loop().create_future()
self._retry = 0
self._send_request(command, response_future)
await response_future
return response_future

def _send_request(self, command: ProtocolCommand, response_future: Future) -> None:
"""Send message via transport"""
self.command = command
self.response_future = response_future
logger.debug("Sending: %s%s", self.command,
f' - retry #{self._retries}/{self._max_retries}' if self._retries > 0 else '')
f' - retry #{self._retry}/{self.retries}' if self._retry > 0 else '')
self._transport.sendto(self.command.request)
asyncio.get_event_loop().call_later(self._retry_timeout, self._retry_mechanism)
asyncio.get_running_loop().call_later(self.timeout, self._retry_mechanism)

def _retry_mechanism(self) -> None:
"""Retry mechanism to prevent hanging transport"""
if self.response_future.done():
self._transport.close()
elif self._retries < self._max_retries:
logger.debug("Failed to receive response to %s in time (%ds).", self.command, self._retry_timeout)
self._retries += 1
self._send_request()
self._close_transport()
elif self._retry < self.retries:
logger.debug("Failed to receive response to %s in time (%ds).", self.command, self.timeout)
self._retry += 1
self._send_request(self.command, self.response_future)
else:
logger.debug("Max number of retries (%d) reached, request %s failed.", self._max_retries, self.command)
logger.debug("Max number of retries (%d) reached, request %s failed.", self.retries, self.command)
self.response_future.set_exception(MaxRetriesException)
self._close_transport()

def _close_transport(self) -> None:
if self._transport:
self._transport.close()
self._transport = None
# Cancel Future on connection close
if self.response_future and not self.response_future.done():
self.response_future.cancel()


class TcpInverterProtocol(InverterProtocol, asyncio.Protocol):
def __init__(self, host: str, port: int, timeout: int = 1, retries: int = 0):
super().__init__(host, port, timeout, retries)
self._transport: asyncio.transports.Transport | None = None
self._retry: int = 0

async def _connect(self) -> None:
if not self._transport or self._transport.is_closing():
self._transport, self.protocol = await asyncio.get_running_loop().create_connection(
lambda: self,
host=self._host, port=self._port,
)

def connection_made(self, transport: asyncio.DatagramTransport) -> None:
"""On connection made"""
logger.debug("Connection opened.")
pass

def eof_received(self) -> None:
logger.debug("Connection closed.")
self._close_transport()

def connection_lost(self, exc: Optional[Exception]) -> None:
"""On connection lost"""
if exc is not None:
logger.debug("Connection closed with error: %s.", exc)
self._close_transport()

def data_received(self, data: bytes) -> None:
"""On data received"""
try:
if self.command.validator(data):
logger.debug("Received: %s", data.hex())
self._retry = 0
self.response_future.set_result(data)
else:
logger.debug("Received invalid response: %s", data.hex())
self.response_future.set_exception(RequestRejectedException())
self._close_transport()
except RequestRejectedException as ex:
logger.debug("Received exception response: %s", data.hex())
self.response_future.set_exception(ex)
# self._close_transport()

def error_received(self, exc: Exception) -> None:
"""On error received"""
logger.debug("Received error: %s", exc)
self.response_future.set_exception(exc)
self._close_transport()

async def send_request(self, command: ProtocolCommand) -> Future:
"""Send message via transport"""
try:
await self._connect()
response_future = asyncio.get_running_loop().create_future()
self._send_request(command, response_future)
await response_future
return response_future
except asyncio.CancelledError:
if self._retry < self.retries:
logger.debug("Connection broken error")
self._retry += 1
self._close_transport()
return await self.send_request(command)
else:
return self._max_retries_reached()
except ConnectionRefusedError as exc:
if self._retry < self.retries:
logger.debug("Connection refused error: %s", exc)
self._retry += 1
return await self.send_request(command)
else:
return self._max_retries_reached()

def _send_request(self, command: ProtocolCommand, response_future: Future) -> None:
"""Send message via transport"""
self.command = command
self.response_future = response_future
logger.debug("Sending: %s%s", self.command,
f' - retry #{self._retry}/{self.retries}' if self._retry > 0 else '')
self._transport.write(self.command.request)
asyncio.get_running_loop().call_later(self.timeout, self._timeout_mechanism)

def _timeout_mechanism(self) -> None:
"""Retry mechanism to prevent hanging transport"""
if self.response_future.done():
self._retry = 0
else:
self._close_transport()

def _max_retries_reached(self) -> Future:
logger.debug("Max number of retries (%d) reached, request %s failed.", self.retries, self.command)
self._close_transport()
self.response_future = asyncio.get_running_loop().create_future()
self.response_future.set_exception(MaxRetriesException)
return self.response_future

def _close_transport(self) -> None:
if self._transport:
self._transport.close()
self._transport = None
# Cancel Future on connection lost
if self.response_future and not self.response_future.done():
self.response_future.cancel()


class ProtocolResponse:
Expand Down Expand Up @@ -135,35 +272,25 @@ def get_offset(self, address: int):
"""Calculate relative offset to start of the response bytes"""
return address

async def execute(self, host: str, port: int, timeout: int, retries: int) -> ProtocolResponse:
async def execute(self, protocol: InverterProtocol) -> ProtocolResponse:
"""
Execute the udp protocol command on the specified address/port.
Since the UDP communication is by definition unreliable, when no (valid) response is received by specified
timeout, the command will be re-tried up to retries times.
Execute the protocol command on the specified connection.
Return raw response data
Return ProtocolResponse with raw response data
"""
loop = asyncio.get_running_loop()
response_future = loop.create_future()
transport, _ = await loop.create_datagram_endpoint(
lambda: UdpInverterProtocol(response_future, self, timeout, retries),
remote_addr=(host, port),
)
try:
await response_future
response_future = await protocol.send_request(self)
result = response_future.result()
if result is not None:
return ProtocolResponse(result, self)
else:
raise RequestFailedException(
"No response received to '" + self.request.hex() + "' request."
)
except asyncio.CancelledError:
except (asyncio.CancelledError, ConnectionRefusedError):
raise RequestFailedException(
"No valid response received to '" + self.request.hex() + "' request."
) from None
finally:
transport.close()


class Aa55ProtocolCommand(ProtocolCommand):
Expand Down
Loading

0 comments on commit 28642e3

Please sign in to comment.