Skip to content

Commit

Permalink
Custom timeout for requests (#687)
Browse files Browse the repository at this point in the history
* setup uniform interfaces for all Client._request_impl overridden functions


* ensure that timeout is a keyword-only arg. this avoids mistaken usage of this parameter

---------

Co-authored-by: justinr1234 <[email protected]>
  • Loading branch information
ckeshava and justinr1234 authored May 7, 2024
1 parent 6491781 commit 260d7dc
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 15 deletions.
7 changes: 5 additions & 2 deletions xrpl/asyncio/clients/async_websocket_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, Dict, Type

from xrpl.asyncio.clients.async_client import AsyncClient
from xrpl.asyncio.clients.client import REQUEST_TIMEOUT
from xrpl.asyncio.clients.exceptions import XRPLWebsocketException
from xrpl.asyncio.clients.websocket_base import WebsocketBase
from xrpl.models.requests.request import Request
Expand Down Expand Up @@ -262,7 +263,9 @@ async def send(self: AsyncWebsocketClient, request: Request) -> None:
raise XRPLWebsocketException("Websocket is not open")
await self._do_send(request)

async def _request_impl(self: WebsocketBase, request: Request) -> Response:
async def _request_impl(
self: WebsocketBase, request: Request, *, timeout: float = REQUEST_TIMEOUT
) -> Response:
"""
``_request_impl`` implementation for async websocket.
Expand All @@ -280,4 +283,4 @@ async def _request_impl(self: WebsocketBase, request: Request) -> Response:
"""
if not self.is_open():
raise XRPLWebsocketException("Websocket is not open")
return await self._do_request_impl(request)
return await self._do_request_impl(request, timeout)
11 changes: 10 additions & 1 deletion xrpl/asyncio/clients/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@
from abc import ABC, abstractmethod
from typing import Optional

from typing_extensions import Final

from xrpl.models.requests.request import Request
from xrpl.models.response import Response

# The default request timeout duration. Set in Client._request_impl to allow more time
# for longer running commands.
REQUEST_TIMEOUT: Final[float] = 10.0


class Client(ABC):
"""
Expand All @@ -27,14 +33,17 @@ def __init__(self: Client, url: str) -> None:
self.build_version: Optional[str] = None

@abstractmethod
async def _request_impl(self: Client, request: Request) -> Response:
async def _request_impl(
self: Client, request: Request, *, timeout: float = REQUEST_TIMEOUT
) -> Response:
"""
This is the actual driver for a given Client's request. It must be
async because all of the helper functions in this library are
async-first. Implement this in a given Client.
Arguments:
request: An object representing information about a rippled request.
timeout: The maximum tolerable delay on waiting for a response.
Returns:
The response from the server, as a Response object.
Expand Down
13 changes: 7 additions & 6 deletions xrpl/asyncio/clients/json_rpc_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@
from json import JSONDecodeError

from httpx import AsyncClient
from typing_extensions import Final

from xrpl.asyncio.clients.client import Client
from xrpl.asyncio.clients.client import REQUEST_TIMEOUT, Client
from xrpl.asyncio.clients.exceptions import XRPLRequestFailureException
from xrpl.asyncio.clients.utils import json_to_response, request_to_json_rpc
from xrpl.models.requests.request import Request
from xrpl.models.response import Response

_TIMEOUT: Final[float] = 10.0


class JsonRpcBase(Client):
"""
Expand All @@ -22,12 +19,16 @@ class JsonRpcBase(Client):
:meta private:
"""

async def _request_impl(self: JsonRpcBase, request: Request) -> Response:
async def _request_impl(
self: JsonRpcBase, request: Request, *, timeout: float = REQUEST_TIMEOUT
) -> Response:
"""
Base ``_request_impl`` implementation for JSON RPC.
Arguments:
request: An object representing information about a rippled request.
timeout: The duration within which we expect to hear a response from the
rippled validator.
Returns:
The response from the server, as a Response object.
Expand All @@ -37,7 +38,7 @@ async def _request_impl(self: JsonRpcBase, request: Request) -> Response:
:meta private:
"""
async with AsyncClient(timeout=_TIMEOUT) as http_client:
async with AsyncClient(timeout=timeout) as http_client:
response = await http_client.post(
self.url,
json=request_to_json_rpc(request),
Expand Down
16 changes: 12 additions & 4 deletions xrpl/asyncio/clients/websocket_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ async def _do_pop_message(self: WebsocketBase) -> Dict[str, Any]:
cast(_MESSAGES_TYPE, self._messages).task_done()
return msg

async def _do_request_impl(self: WebsocketBase, request: Request) -> Response:
async def _do_request_impl(
self: WebsocketBase, request: Request, timeout: float
) -> Response:
"""
Base ``_request_impl`` implementation for websockets.
Expand All @@ -221,8 +223,14 @@ async def _do_request_impl(self: WebsocketBase, request: Request) -> Response:

# fire-and-forget the send, and await the Future
asyncio.create_task(self._do_send_no_future(request_with_id))
raw_response = await self._open_requests[request_str]

# remove the resolved Future, hopefully getting it garbage colleted
del self._open_requests[request_str]
try:
raw_response = await asyncio.wait_for(
self._open_requests[request_str], timeout
)
finally:
# remove the resolved Future, hopefully getting it garbage colleted
# Ensure the request is removed whether it times out or not
del self._open_requests[request_str]

return websocket_to_response(raw_response)
7 changes: 5 additions & 2 deletions xrpl/clients/websocket_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from types import TracebackType
from typing import Any, Dict, Iterator, Optional, Type, Union, cast

from xrpl.asyncio.clients.client import REQUEST_TIMEOUT
from xrpl.asyncio.clients.exceptions import XRPLWebsocketException
from xrpl.asyncio.clients.websocket_base import WebsocketBase
from xrpl.clients.sync_client import SyncClient
Expand Down Expand Up @@ -200,7 +201,9 @@ def send(self: WebsocketClient, request: Request) -> None:
self._do_send(request), cast(asyncio.AbstractEventLoop, self._loop)
).result()

async def _request_impl(self: WebsocketClient, request: Request) -> Response:
async def _request_impl(
self: WebsocketClient, request: Request, *, timeout: float = REQUEST_TIMEOUT
) -> Response:
"""
``_request_impl`` implementation for sync websockets that ensures the
``WebsocketBase._do_request_impl`` implementation is run on the other thread.
Expand Down Expand Up @@ -232,6 +235,6 @@ async def _request_impl(self: WebsocketClient, request: Request) -> Response:
# completely block the main thread until completed,
# just as if it were not async.
return asyncio.run_coroutine_threadsafe(
self._do_request_impl(request),
self._do_request_impl(request, timeout),
cast(asyncio.AbstractEventLoop, self._loop),
).result()

0 comments on commit 260d7dc

Please sign in to comment.