Skip to content

Commit

Permalink
deprecate total_timeout, allow timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasht86 committed Oct 17, 2024
1 parent 0b93eb9 commit 10a7372
Showing 1 changed file with 87 additions and 51 deletions.
138 changes: 87 additions & 51 deletions vespa/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ def __init__(
self.auth_method = "mtls"

def asyncio(
self, connections: Optional[int] = 8, total_timeout: int = 10
self,
connections: Optional[int] = 1,
total_timeout: Optional[int] = None,
timeout: Union[httpx.Timeout, int] = httpx.Timeout(5),
**kwargs,
) -> "VespaAsync":
"""
Access Vespa asynchronous connection layer.
Expand All @@ -146,14 +150,26 @@ def asyncio(
async with app.asyncio() as async_app:
response = await async_app.query(body=body)
See :class:`VespaAsync` for more details.
# passing kwargs
limits = httpx.Limits(max_keepalive_connections=5, max_connections=5, keepalive_expiry=15)
timeout = httpx.Timeout(connect=3, read=4, write=2, pool=5)
async with app.asyncio(connections=5, timeout=timeout, limits=limits) as async_app:
response = await async_app.query(body=body)
:param connections: Number of allowed concurrent connections
:param total_timeout: Total timeout in secs.
See :class:`VespaAsync` for more details on the parameters.
:param connections: Number of maximum_keepalive_connections.
:param total_timeout: Deprecated. Will be ignored. Use timeout instead.
:param timeout: httpx.Timeout object. See https://www.python-httpx.org/advanced/timeouts/. Defaults to 5 seconds.
:param kwargs: Additional arguments to be passed to the httpx.AsyncClient.
:return: Instance of Vespa asynchronous layer.
"""
return VespaAsync(
app=self, connections=connections, total_timeout=total_timeout
app=self,
connections=connections,
total_timeout=total_timeout,
timeout=timeout,
**kwargs,
)

def syncio(
Expand Down Expand Up @@ -1472,58 +1488,92 @@ def __init__(
self,
app: Vespa,
connections: Optional[int] = 1,
total_timeout: int = 10,
connect_timeout: int = 5,
read_timeout: int = 5,
write_timeout: int = 5,
pool_timeout: int = 5,
keepalive_expiry: int = 10,
total_timeout: Optional[int] = None,
timeout: Union[httpx.Timeout, int] = httpx.Timeout(5),
**kwargs,
) -> None:
"""
Class to handle async HTTP-connection(s) to Vespa.
Uses httpx as the async http client, and HTTP/2 by default.
Class to handle asynchronous HTTP connections to Vespa.
Uses `httpx` as the async HTTP client, and HTTP/2 by default.
This class is intended to be used as a context manager.
Example usage::
**Basic usage**::
async with VespaAsync(app) as async_app:
response = await async_app.query(body={"yql": "select * from sources * where title contains 'music';"})
async with VespaAsync(app) as async_app:
response = await async_app.query(
body={"yql": "select * from sources * where title contains 'music';"}
)
Can also be accessed directly from :func:`Vespa.asyncio` ::
**Passing custom timeout and limits**::
app = Vespa(url="localhost", port=8080)
async with app.asyncio() as async_app:
response = await async_app.query(body={"yql": "select * from sources * where title contains 'music';"})
import httpx
timeout = httpx.Timeout(10.0, connect=5.0)
limits = httpx.Limits(max_connections=10, max_keepalive_connections=5)
async with VespaAsync(app, timeout=timeout, limits=limits) as async_app:
response = await async_app.query(
body={"yql": "select * from sources * where title contains 'music';"}
)
**Using additional kwargs (e.g., proxies)**::
proxies = "http://localhost:8080"
async with VespaAsync(app, proxies=proxies) as async_app:
response = await async_app.query(
body={"yql": "select * from sources * where title contains 'music';"}
)
**Accessing via :func:`Vespa.asyncio`**::
app = Vespa(url="localhost", port=8080)
async with app.asyncio(timeout=timeout, limits=limits) as async_app:
response = await async_app.query(
body={"yql": "select * from sources * where title contains 'music';"}
)
See also :func:`Vespa.feed_async_iterable` for a convenient interface to async data feeding.
Args:
app (Vespa): Vespa application object.
connections (Optional[int], optional): number of connections. Defaults to 1 as HTTP/2 is multiplexed.
total_timeout (int, optional): total timeout in seconds for the HTTP request. Defaults to 10. Will be applied to connect, read, and write. See https://www.python-httpx.org/advanced/timeouts/
connect_timeout (int, optional): connect timeout in seconds for the HTTP request. Defaults to 5. Will not be applied if total_timeout is set. See https://www.python-httpx.org/advanced/timeouts/
read_timeout (int, optional): read timeout in seconds for the HTTP request. Defaults to 5. Will not be applied if total_timeout is set. See https://www.python-httpx.org/advanced/timeouts/
write_timeout (int, optional): write timeout in seconds for the HTTP request. Defaults to 5. Will not be applied if total_timeout is set. See https://www.python-httpx.org/advanced/timeouts/
pool_timeout (int, optional): pool timeout in seconds for the HTTP request. Defaults to 5. Will not be applied if total_timeout is set. See https://www.python-httpx.org/advanced/timeouts/
keepalive_expiry (int, optional): time limit on idle keep-alive connections in seconds. Defaults to 10. Setting higher than 30 seconds may cause the Vespa server to reset connection and will issue Warning. See https://www.python-httpx.org/advanced/resource-limits/
**kwargs: Additional arguments to be passed to the httpx.AsyncClient. See https://www.python-httpx.org/api/#asyncclient
connections (Optional[int], optional): Number of connections. Defaults to 1 as HTTP/2 is multiplexed.
total_timeout (int, optional): **Deprecated**. Will be ignored and removed in future versions.
Use `timeout` to pass an `httpx.Timeout` object instead.
timeout (httpx.Timeout, optional): Timeout settings for the `httpx.AsyncClient`. Defaults to `httpx.Timeout(5)`.
**kwargs: Additional arguments to be passed to the `httpx.AsyncClient`. See
[HTTPX AsyncClient documentation](https://www.python-httpx.org/api/#asyncclient) for more details.
Note:
- Passing `timeout` allows you to configure timeouts for connect, read, write, and overall request time.
- The `limits` parameter can be used to control connection pooling behavior, such as the maximum number of concurrent connections.
- See https://www.python-httpx.org/ for more information on `httpx` and its features.
"""
self.app = app
self.httpx_client = None
self.connections = connections
self.total_timeout = total_timeout
self.connect_timeout = connect_timeout
self.read_timeout = read_timeout
self.write_timeout = write_timeout
self.pool_timeout = pool_timeout
self.keepalive_expiry = keepalive_expiry
if self.keepalive_expiry > 30:
if self.total_timeout is not None:
# issue DeprecationWarning
warnings.warn(
"Setting keepalive_expiry higher than 30 seconds may cause the Vespa server to reset idle connection."
"total_timeout is deprecated, will be ignored and will be removed in future versions. Use timeout to pass a httpx.Timeout object instead.",
category=DeprecationWarning,
)
self.timeout = timeout
if isinstance(self.timeout, int):
self.timeout = httpx.Timeout(timeout)
self.kwargs = kwargs
self.headers = self.app.base_headers.copy()
self.limits = kwargs.get(
"limits", httpx.Limits(max_keepalive_connections=self.connections)
)
# Warn if limits.keepalive_expiry is higher than 30 seconds
if self.limits.keepalive_expiry and self.limits.keepalive_expiry > 30:
warnings.warn(
"Keepalive expiry is set to more than 30 seconds. Vespa server resets idle connections, so this may cause ConnectionResetError.",
category=UserWarning,
)
if self.app.auth_method == "token" and self.app.vespa_cloud_secret_token:
# Bearer and user-agent
self.headers.update(
Expand All @@ -1540,31 +1590,17 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
def _open_httpx_client(self):
if self.httpx_client is not None:
return
limits = httpx.Limits(
max_keepalive_connections=self.connections,
max_connections=self.connections,
keepalive_expiry=self.keepalive_expiry, # This should NOT exceed the keepalive_timeout on the Server, otherwise we will get ConnectionTerminated errors.
)
if self.total_timeout:
timeout = httpx.Timeout(self.total_timeout)
else:
timeout = httpx.Timeout(
pool=self.pool_timeout,
connect=self.connect_timeout,
read=self.read_timeout,
write=self.write_timeout,
)

if self.app.cert is not None:
sslcontext = httpx.create_ssl_context(cert=(self.app.cert, self.app.key))
else:
sslcontext = False
self.httpx_client = httpx.AsyncClient(
timeout=timeout,
timeout=self.timeout,
headers=self.headers,
verify=sslcontext,
http2=True, # HTTP/2 by default
http1=False,
limits=limits,
**self.kwargs,
)
return self.httpx_client
Expand Down

0 comments on commit 10a7372

Please sign in to comment.