diff --git a/docs/changelog.md b/docs/changelog.md index d9abd04bc..6daa34ab5 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -6,6 +6,12 @@ nav_order: 2 # Changelog +# Unreleased changes + +### Internals + +- Replace `asyncio.wait_for` with `asyncio.timeout`. For Python <3.11 a backport package is used. + # 2.8.0 Hostnames 2023-04-12 ### Connection diff --git a/requirements/production.txt b/requirements/production.txt index 0b7cc2cf2..2c3fb0cae 100644 --- a/requirements/production.txt +++ b/requirements/production.txt @@ -1,2 +1,3 @@ +async-timeout==4.0.2;python_version<"3.11" cryptography==40.0.1 ifaddr==0.2.0 diff --git a/setup.py b/setup.py index d8ef6753d..d4c913bde 100644 --- a/setup.py +++ b/setup.py @@ -12,7 +12,11 @@ with open(path.join(THIS_DIRECTORY, "xknx/__version__.py"), encoding="utf-8") as fp: exec(fp.read(), VERSION) -REQUIRES = ["cryptography>=35.0.0", "ifaddr>=0.1.7"] +REQUIRES = [ + "async_timeout>=4.0.0;python_version<'3.11'", + "cryptography>=35.0.0", + "ifaddr>=0.1.7", +] setup( name="xknx", diff --git a/test/core_tests/connection_manager_test.py b/test/core_tests/connection_manager_test.py index e48407aa3..5a87f3ac6 100644 --- a/test/core_tests/connection_manager_test.py +++ b/test/core_tests/connection_manager_test.py @@ -7,6 +7,7 @@ from xknx import XKNX from xknx.core import XknxConnectionState, XknxConnectionType from xknx.io import ConnectionConfig +from xknx.util import asyncio_timeout class TestConnectionManager: @@ -108,7 +109,8 @@ async def set_connected(): with patch("xknx.io.KNXIPInterface._start", side_effect=set_connected): await xknx.start() # wait for side_effect to finish - await asyncio.wait_for(xknx.connection_manager.connected.wait(), timeout=1) + async with asyncio_timeout(1): + await xknx.connection_manager.connected.wait() await xknx.stop() async def test_connection_information(self): diff --git a/xknx/cemi/cemi_handler.py b/xknx/cemi/cemi_handler.py index f095c6bbf..293586942 100644 --- a/xknx/cemi/cemi_handler.py +++ b/xknx/cemi/cemi_handler.py @@ -22,6 +22,7 @@ from xknx.secure.data_secure import DataSecure from xknx.secure.keyring import Keyring from xknx.telegram import IndividualAddress, Telegram, TelegramDirection, tpci +from xknx.util import asyncio_timeout from .cemi_frame import CEMIFrame, CEMILData from .const import CEMIMessageCode @@ -77,10 +78,8 @@ async def send_telegram(self, telegram: Telegram) -> None: raise ex try: - await asyncio.wait_for( - self._l_data_confirmation_event.wait(), - timeout=REQUEST_TO_CONFIRMATION_TIMEOUT, - ) + async with asyncio_timeout(REQUEST_TO_CONFIRMATION_TIMEOUT): + await self._l_data_confirmation_event.wait() except asyncio.TimeoutError: self.xknx.connection_manager.cemi_count_outgoing_error += 1 raise ConfirmationError( diff --git a/xknx/core/value_reader.py b/xknx/core/value_reader.py index c58c933da..9e15ebd33 100644 --- a/xknx/core/value_reader.py +++ b/xknx/core/value_reader.py @@ -16,6 +16,7 @@ from xknx.telegram import Telegram from xknx.telegram.address import GroupAddress, InternalGroupAddress from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite +from xknx.util import asyncio_timeout if TYPE_CHECKING: from xknx.xknx import XKNX @@ -49,10 +50,8 @@ async def read(self) -> Telegram | None: await self.send_group_read() try: - await asyncio.wait_for( - self.response_received_event.wait(), - timeout=self.timeout_in_seconds, - ) + async with asyncio_timeout(self.timeout_in_seconds): + await self.response_received_event.wait() except asyncio.TimeoutError: logger.warning( "Error: KNX bus did not respond in time (%s secs) to GroupValueRead request for: %s", diff --git a/xknx/io/gateway_scanner.py b/xknx/io/gateway_scanner.py index 3eb0a479c..1d2bad9a5 100644 --- a/xknx/io/gateway_scanner.py +++ b/xknx/io/gateway_scanner.py @@ -35,6 +35,7 @@ TunnelingSlotStatus, ) from xknx.telegram import IndividualAddress +from xknx.util import asyncio_timeout from .transport import UDPTransport @@ -266,10 +267,8 @@ async def _scan( ) try: await self._send_search_requests(udp_transport=udp_transport) - await asyncio.wait_for( - self._response_received_event.wait(), - timeout=self.timeout_in_seconds, - ) + async with asyncio_timeout(self.timeout_in_seconds): + await self._response_received_event.wait() except asyncio.TimeoutError: pass except asyncio.CancelledError: diff --git a/xknx/io/ip_secure.py b/xknx/io/ip_secure.py index d58194a91..326a7b405 100644 --- a/xknx/io/ip_secure.py +++ b/xknx/io/ip_secure.py @@ -37,6 +37,7 @@ generate_ecdh_key_pair, ) from xknx.secure.util import bytes_xor, sha256_hash +from xknx.util import asyncio_timeout from .const import SESSION_KEEPALIVE_RATE, XKNX_SERIAL_NUMBER from .request_response import Authenticate, Session @@ -672,13 +673,11 @@ async def synchronize(self) -> None: self._expected_notify_handler = message_tag, waiter_fut self.send_timer_notify(message_tag=message_tag) try: - timer_value = await asyncio.wait_for( - waiter_fut, - timeout=( # 3.3 seconds at latency_ms=1000, sync_latency_fraction=10% - self.max_delay_time_follower_update_notify - + 2 * self.latency_tolerance_ms / 1000 - ), - ) + async with asyncio_timeout( # 3.3 seconds at latency_ms=1000, sync_latency_fraction=10% + self.max_delay_time_follower_update_notify + + 2 * self.latency_tolerance_ms / 1000 + ): + timer_value = await waiter_fut self.update(new_value=timer_value) except asyncio.TimeoutError: # use highest received timer value of TimerNotify or SecureWrapper frames diff --git a/xknx/io/request_response/request_response.py b/xknx/io/request_response/request_response.py index 85db907fa..f2ffd17bd 100644 --- a/xknx/io/request_response/request_response.py +++ b/xknx/io/request_response/request_response.py @@ -11,6 +11,7 @@ from xknx.exceptions import CommunicationError from xknx.io.transport import KNXIPTransport from xknx.knxip import HPAI, ErrorCode, KNXIPBody, KNXIPBodyResponse, KNXIPFrame +from xknx.util import asyncio_timeout logger = logging.getLogger("xknx.log") @@ -44,10 +45,8 @@ async def start(self) -> None: ) try: await self.send_request() - await asyncio.wait_for( - self.response_received_event.wait(), - timeout=self.timeout_in_seconds, - ) + async with asyncio_timeout(self.timeout_in_seconds): + await self.response_received_event.wait() except asyncio.TimeoutError: logger.debug( "Error: KNX bus did not respond in time (%s secs) to request of type '%s'", diff --git a/xknx/io/self_description.py b/xknx/io/self_description.py index e0b42b8b1..16317c904 100644 --- a/xknx/io/self_description.py +++ b/xknx/io/self_description.py @@ -19,6 +19,7 @@ SearchRequestExtended, SearchResponseExtended, ) +from xknx.util import asyncio_timeout from .const import DEFAULT_MCAST_PORT from .transport import UDPTransport @@ -128,10 +129,8 @@ async def start(self) -> None: frame = self.create_knxipframe() try: self.transport.send(frame) - await asyncio.wait_for( - self.response_received_event.wait(), - timeout=DESCRIPTION_TIMEOUT, - ) + async with asyncio_timeout(DESCRIPTION_TIMEOUT): + await self.response_received_event.wait() except asyncio.TimeoutError: logger.debug( "Error: KNX bus did not respond in time (%s secs) to request of type '%s'", diff --git a/xknx/management/management.py b/xknx/management/management.py index 43c125e37..8a27b54bc 100644 --- a/xknx/management/management.py +++ b/xknx/management/management.py @@ -17,6 +17,7 @@ from xknx.telegram import IndividualAddress, Telegram from xknx.telegram.apci import APCI from xknx.telegram.tpci import TAck, TConnect, TDataConnected, TDisconnect, TNak +from xknx.util import asyncio_timeout if TYPE_CHECKING: from xknx.xknx import XKNX @@ -242,7 +243,8 @@ async def _send_data(self, payload: APCI) -> None: ) try: await self.xknx.cemi_handler.send_telegram(telegram) - ack = await asyncio.wait_for(self._ack_waiter, MANAGAMENT_ACK_TIMEOUT) + async with asyncio_timeout(MANAGAMENT_ACK_TIMEOUT): + ack = await self._ack_waiter except asyncio.TimeoutError: logger.info( "%s: timeout while waiting for ACK. Resending Telegram.", self.address @@ -252,7 +254,8 @@ async def _send_data(self, payload: APCI) -> None: self._ack_waiter = asyncio.get_event_loop().create_future() await self.xknx.cemi_handler.send_telegram(telegram) try: - ack = await asyncio.wait_for(self._ack_waiter, MANAGAMENT_ACK_TIMEOUT) + async with asyncio_timeout(MANAGAMENT_ACK_TIMEOUT): + ack = await self._ack_waiter except asyncio.TimeoutError: raise ManagementConnectionTimeout( "No ACK received for repeated telegram." @@ -278,9 +281,8 @@ async def _send_data(self, payload: APCI) -> None: async def _receive(self, expected_payload: type[APCI] | None) -> Telegram: """Wait for a telegram from the KNX device.""" try: - telegram = await asyncio.wait_for( - self._response_waiter, timeout=MANAGAMENT_CONNECTION_TIMEOUT - ) + async with asyncio_timeout(MANAGAMENT_CONNECTION_TIMEOUT): + telegram = await self._response_waiter except asyncio.TimeoutError: raise ManagementConnectionTimeout( f"Timeout while waiting for {expected_payload}" diff --git a/xknx/util/__init__.py b/xknx/util/__init__.py new file mode 100644 index 000000000..79d804904 --- /dev/null +++ b/xknx/util/__init__.py @@ -0,0 +1,12 @@ +"""Helper functions for XKNX.""" +import sys + +# Backport of `asyncio.timeout` to be able to replace `asyncio.wait_for` +# in py3.9 and py3.10 see https://github.com/python/cpython/pull/98518 +if sys.version_info[:2] < (3, 11): + from async_timeout import timeout as asyncio_timeout +else: + from asyncio import timeout as asyncio_timeout + + +__all__ = ["asyncio_timeout"]