Skip to content

Commit

Permalink
Use asyncio.timeout instead of asyncio.wait_for (#1246)
Browse files Browse the repository at this point in the history
* Use asyncio.timeout instead of asyncio.wait_for

* Update changelog.md
  • Loading branch information
farmio authored Apr 13, 2023
1 parent 728e953 commit 5b3444e
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 34 deletions.
6 changes: 6 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ nav_order: 2

# Changelog

# Unreleased changes

### Internals

- Replace `asyncio.wait_for` with `asyncio.timeout`. For Python <3.11 a backport package is used.

# 2.8.0 Hostnames 2023-04-12

### Connection
Expand Down
1 change: 1 addition & 0 deletions requirements/production.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
async-timeout==4.0.2;python_version<"3.11"
cryptography==40.0.1
ifaddr==0.2.0
6 changes: 5 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
with open(path.join(THIS_DIRECTORY, "xknx/__version__.py"), encoding="utf-8") as fp:
exec(fp.read(), VERSION)

REQUIRES = ["cryptography>=35.0.0", "ifaddr>=0.1.7"]
REQUIRES = [
"async_timeout>=4.0.0;python_version<'3.11'",
"cryptography>=35.0.0",
"ifaddr>=0.1.7",
]

setup(
name="xknx",
Expand Down
4 changes: 3 additions & 1 deletion test/core_tests/connection_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from xknx import XKNX
from xknx.core import XknxConnectionState, XknxConnectionType
from xknx.io import ConnectionConfig
from xknx.util import asyncio_timeout


class TestConnectionManager:
Expand Down Expand Up @@ -108,7 +109,8 @@ async def set_connected():
with patch("xknx.io.KNXIPInterface._start", side_effect=set_connected):
await xknx.start()
# wait for side_effect to finish
await asyncio.wait_for(xknx.connection_manager.connected.wait(), timeout=1)
async with asyncio_timeout(1):
await xknx.connection_manager.connected.wait()
await xknx.stop()

async def test_connection_information(self):
Expand Down
7 changes: 3 additions & 4 deletions xknx/cemi/cemi_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from xknx.secure.data_secure import DataSecure
from xknx.secure.keyring import Keyring
from xknx.telegram import IndividualAddress, Telegram, TelegramDirection, tpci
from xknx.util import asyncio_timeout

from .cemi_frame import CEMIFrame, CEMILData
from .const import CEMIMessageCode
Expand Down Expand Up @@ -77,10 +78,8 @@ async def send_telegram(self, telegram: Telegram) -> None:
raise ex

try:
await asyncio.wait_for(
self._l_data_confirmation_event.wait(),
timeout=REQUEST_TO_CONFIRMATION_TIMEOUT,
)
async with asyncio_timeout(REQUEST_TO_CONFIRMATION_TIMEOUT):
await self._l_data_confirmation_event.wait()
except asyncio.TimeoutError:
self.xknx.connection_manager.cemi_count_outgoing_error += 1
raise ConfirmationError(
Expand Down
7 changes: 3 additions & 4 deletions xknx/core/value_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from xknx.telegram import Telegram
from xknx.telegram.address import GroupAddress, InternalGroupAddress
from xknx.telegram.apci import GroupValueRead, GroupValueResponse, GroupValueWrite
from xknx.util import asyncio_timeout

if TYPE_CHECKING:
from xknx.xknx import XKNX
Expand Down Expand Up @@ -49,10 +50,8 @@ async def read(self) -> Telegram | None:
await self.send_group_read()

try:
await asyncio.wait_for(
self.response_received_event.wait(),
timeout=self.timeout_in_seconds,
)
async with asyncio_timeout(self.timeout_in_seconds):
await self.response_received_event.wait()
except asyncio.TimeoutError:
logger.warning(
"Error: KNX bus did not respond in time (%s secs) to GroupValueRead request for: %s",
Expand Down
7 changes: 3 additions & 4 deletions xknx/io/gateway_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
TunnelingSlotStatus,
)
from xknx.telegram import IndividualAddress
from xknx.util import asyncio_timeout

from .transport import UDPTransport

Expand Down Expand Up @@ -266,10 +267,8 @@ async def _scan(
)
try:
await self._send_search_requests(udp_transport=udp_transport)
await asyncio.wait_for(
self._response_received_event.wait(),
timeout=self.timeout_in_seconds,
)
async with asyncio_timeout(self.timeout_in_seconds):
await self._response_received_event.wait()
except asyncio.TimeoutError:
pass
except asyncio.CancelledError:
Expand Down
13 changes: 6 additions & 7 deletions xknx/io/ip_secure.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
generate_ecdh_key_pair,
)
from xknx.secure.util import bytes_xor, sha256_hash
from xknx.util import asyncio_timeout

from .const import SESSION_KEEPALIVE_RATE, XKNX_SERIAL_NUMBER
from .request_response import Authenticate, Session
Expand Down Expand Up @@ -672,13 +673,11 @@ async def synchronize(self) -> None:
self._expected_notify_handler = message_tag, waiter_fut
self.send_timer_notify(message_tag=message_tag)
try:
timer_value = await asyncio.wait_for(
waiter_fut,
timeout=( # 3.3 seconds at latency_ms=1000, sync_latency_fraction=10%
self.max_delay_time_follower_update_notify
+ 2 * self.latency_tolerance_ms / 1000
),
)
async with asyncio_timeout( # 3.3 seconds at latency_ms=1000, sync_latency_fraction=10%
self.max_delay_time_follower_update_notify
+ 2 * self.latency_tolerance_ms / 1000
):
timer_value = await waiter_fut
self.update(new_value=timer_value)
except asyncio.TimeoutError:
# use highest received timer value of TimerNotify or SecureWrapper frames
Expand Down
7 changes: 3 additions & 4 deletions xknx/io/request_response/request_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from xknx.exceptions import CommunicationError
from xknx.io.transport import KNXIPTransport
from xknx.knxip import HPAI, ErrorCode, KNXIPBody, KNXIPBodyResponse, KNXIPFrame
from xknx.util import asyncio_timeout

logger = logging.getLogger("xknx.log")

Expand Down Expand Up @@ -44,10 +45,8 @@ async def start(self) -> None:
)
try:
await self.send_request()
await asyncio.wait_for(
self.response_received_event.wait(),
timeout=self.timeout_in_seconds,
)
async with asyncio_timeout(self.timeout_in_seconds):
await self.response_received_event.wait()
except asyncio.TimeoutError:
logger.debug(
"Error: KNX bus did not respond in time (%s secs) to request of type '%s'",
Expand Down
7 changes: 3 additions & 4 deletions xknx/io/self_description.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
SearchRequestExtended,
SearchResponseExtended,
)
from xknx.util import asyncio_timeout

from .const import DEFAULT_MCAST_PORT
from .transport import UDPTransport
Expand Down Expand Up @@ -128,10 +129,8 @@ async def start(self) -> None:
frame = self.create_knxipframe()
try:
self.transport.send(frame)
await asyncio.wait_for(
self.response_received_event.wait(),
timeout=DESCRIPTION_TIMEOUT,
)
async with asyncio_timeout(DESCRIPTION_TIMEOUT):
await self.response_received_event.wait()
except asyncio.TimeoutError:
logger.debug(
"Error: KNX bus did not respond in time (%s secs) to request of type '%s'",
Expand Down
12 changes: 7 additions & 5 deletions xknx/management/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from xknx.telegram import IndividualAddress, Telegram
from xknx.telegram.apci import APCI
from xknx.telegram.tpci import TAck, TConnect, TDataConnected, TDisconnect, TNak
from xknx.util import asyncio_timeout

if TYPE_CHECKING:
from xknx.xknx import XKNX
Expand Down Expand Up @@ -242,7 +243,8 @@ async def _send_data(self, payload: APCI) -> None:
)
try:
await self.xknx.cemi_handler.send_telegram(telegram)
ack = await asyncio.wait_for(self._ack_waiter, MANAGAMENT_ACK_TIMEOUT)
async with asyncio_timeout(MANAGAMENT_ACK_TIMEOUT):
ack = await self._ack_waiter
except asyncio.TimeoutError:
logger.info(
"%s: timeout while waiting for ACK. Resending Telegram.", self.address
Expand All @@ -252,7 +254,8 @@ async def _send_data(self, payload: APCI) -> None:
self._ack_waiter = asyncio.get_event_loop().create_future()
await self.xknx.cemi_handler.send_telegram(telegram)
try:
ack = await asyncio.wait_for(self._ack_waiter, MANAGAMENT_ACK_TIMEOUT)
async with asyncio_timeout(MANAGAMENT_ACK_TIMEOUT):
ack = await self._ack_waiter
except asyncio.TimeoutError:
raise ManagementConnectionTimeout(
"No ACK received for repeated telegram."
Expand All @@ -278,9 +281,8 @@ async def _send_data(self, payload: APCI) -> None:
async def _receive(self, expected_payload: type[APCI] | None) -> Telegram:
"""Wait for a telegram from the KNX device."""
try:
telegram = await asyncio.wait_for(
self._response_waiter, timeout=MANAGAMENT_CONNECTION_TIMEOUT
)
async with asyncio_timeout(MANAGAMENT_CONNECTION_TIMEOUT):
telegram = await self._response_waiter
except asyncio.TimeoutError:
raise ManagementConnectionTimeout(
f"Timeout while waiting for {expected_payload}"
Expand Down
12 changes: 12 additions & 0 deletions xknx/util/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
"""Helper functions for XKNX."""
import sys

# Backport of `asyncio.timeout` to be able to replace `asyncio.wait_for`
# in py3.9 and py3.10 see https://github.com/python/cpython/pull/98518
if sys.version_info[:2] < (3, 11):
from async_timeout import timeout as asyncio_timeout
else:
from asyncio import timeout as asyncio_timeout


__all__ = ["asyncio_timeout"]

0 comments on commit 5b3444e

Please sign in to comment.