diff --git a/vespa/application.py b/vespa/application.py index ce27f058..ebdbffba 100644 --- a/vespa/application.py +++ b/vespa/application.py @@ -135,7 +135,11 @@ def __init__( self.auth_method = "mtls" def asyncio( - self, connections: Optional[int] = 8, total_timeout: int = 10 + self, + connections: Optional[int] = 1, + total_timeout: Optional[int] = None, + timeout: Union[httpx.Timeout, int] = httpx.Timeout(5), + **kwargs, ) -> "VespaAsync": """ Access Vespa asynchronous connection layer. @@ -146,14 +150,26 @@ def asyncio( async with app.asyncio() as async_app: response = await async_app.query(body=body) - See :class:`VespaAsync` for more details. + # passing kwargs + limits = httpx.Limits(max_keepalive_connections=5, max_connections=5, keepalive_expiry=15) + timeout = httpx.Timeout(connect=3, read=4, write=2, pool=5) + async with app.asyncio(connections=5, timeout=timeout, limits=limits) as async_app: + response = await async_app.query(body=body) - :param connections: Number of allowed concurrent connections - :param total_timeout: Total timeout in secs. + See :class:`VespaAsync` for more details on the parameters. + + :param connections: Number of maximum_keepalive_connections. + :param total_timeout: Deprecated. Will be ignored. Use timeout instead. + :param timeout: httpx.Timeout object. See https://www.python-httpx.org/advanced/timeouts/. Defaults to 5 seconds. + :param kwargs: Additional arguments to be passed to the httpx.AsyncClient. :return: Instance of Vespa asynchronous layer. """ return VespaAsync( - app=self, connections=connections, total_timeout=total_timeout + app=self, + connections=connections, + total_timeout=total_timeout, + timeout=timeout, + **kwargs, ) def syncio( @@ -1472,58 +1488,92 @@ def __init__( self, app: Vespa, connections: Optional[int] = 1, - total_timeout: int = 10, - connect_timeout: int = 5, - read_timeout: int = 5, - write_timeout: int = 5, - pool_timeout: int = 5, - keepalive_expiry: int = 10, + total_timeout: Optional[int] = None, + timeout: Union[httpx.Timeout, int] = httpx.Timeout(5), **kwargs, ) -> None: """ - Class to handle async HTTP-connection(s) to Vespa. - Uses httpx as the async http client, and HTTP/2 by default. + Class to handle asynchronous HTTP connections to Vespa. + + Uses `httpx` as the async HTTP client, and HTTP/2 by default. This class is intended to be used as a context manager. - Example usage:: + **Basic usage**:: - async with VespaAsync(app) as async_app: - response = await async_app.query(body={"yql": "select * from sources * where title contains 'music';"}) + async with VespaAsync(app) as async_app: + response = await async_app.query( + body={"yql": "select * from sources * where title contains 'music';"} + ) - Can also be accessed directly from :func:`Vespa.asyncio` :: + **Passing custom timeout and limits**:: - app = Vespa(url="localhost", port=8080) - async with app.asyncio() as async_app: - response = await async_app.query(body={"yql": "select * from sources * where title contains 'music';"}) + import httpx + + timeout = httpx.Timeout(10.0, connect=5.0) + limits = httpx.Limits(max_connections=10, max_keepalive_connections=5) + + async with VespaAsync(app, timeout=timeout, limits=limits) as async_app: + response = await async_app.query( + body={"yql": "select * from sources * where title contains 'music';"} + ) + + **Using additional kwargs (e.g., proxies)**:: + + proxies = "http://localhost:8080" + + async with VespaAsync(app, proxies=proxies) as async_app: + response = await async_app.query( + body={"yql": "select * from sources * where title contains 'music';"} + ) + + **Accessing via :func:`Vespa.asyncio`**:: + + app = Vespa(url="localhost", port=8080) + async with app.asyncio(timeout=timeout, limits=limits) as async_app: + response = await async_app.query( + body={"yql": "select * from sources * where title contains 'music';"} + ) See also :func:`Vespa.feed_async_iterable` for a convenient interface to async data feeding. Args: app (Vespa): Vespa application object. - connections (Optional[int], optional): number of connections. Defaults to 1 as HTTP/2 is multiplexed. - total_timeout (int, optional): total timeout in seconds for the HTTP request. Defaults to 10. Will be applied to connect, read, and write. See https://www.python-httpx.org/advanced/timeouts/ - connect_timeout (int, optional): connect timeout in seconds for the HTTP request. Defaults to 5. Will not be applied if total_timeout is set. See https://www.python-httpx.org/advanced/timeouts/ - read_timeout (int, optional): read timeout in seconds for the HTTP request. Defaults to 5. Will not be applied if total_timeout is set. See https://www.python-httpx.org/advanced/timeouts/ - write_timeout (int, optional): write timeout in seconds for the HTTP request. Defaults to 5. Will not be applied if total_timeout is set. See https://www.python-httpx.org/advanced/timeouts/ - pool_timeout (int, optional): pool timeout in seconds for the HTTP request. Defaults to 5. Will not be applied if total_timeout is set. See https://www.python-httpx.org/advanced/timeouts/ - keepalive_expiry (int, optional): time limit on idle keep-alive connections in seconds. Defaults to 10. Setting higher than 30 seconds may cause the Vespa server to reset connection and will issue Warning. See https://www.python-httpx.org/advanced/resource-limits/ - **kwargs: Additional arguments to be passed to the httpx.AsyncClient. See https://www.python-httpx.org/api/#asyncclient + connections (Optional[int], optional): Number of connections. Defaults to 1 as HTTP/2 is multiplexed. + total_timeout (int, optional): **Deprecated**. Will be ignored and removed in future versions. + Use `timeout` to pass an `httpx.Timeout` object instead. + timeout (httpx.Timeout, optional): Timeout settings for the `httpx.AsyncClient`. Defaults to `httpx.Timeout(5)`. + **kwargs: Additional arguments to be passed to the `httpx.AsyncClient`. See + [HTTPX AsyncClient documentation](https://www.python-httpx.org/api/#asyncclient) for more details. + + Note: + - Passing `timeout` allows you to configure timeouts for connect, read, write, and overall request time. + - The `limits` parameter can be used to control connection pooling behavior, such as the maximum number of concurrent connections. + - See https://www.python-httpx.org/ for more information on `httpx` and its features. """ self.app = app self.httpx_client = None self.connections = connections self.total_timeout = total_timeout - self.connect_timeout = connect_timeout - self.read_timeout = read_timeout - self.write_timeout = write_timeout - self.pool_timeout = pool_timeout - self.keepalive_expiry = keepalive_expiry - if self.keepalive_expiry > 30: + if self.total_timeout is not None: + # issue DeprecationWarning warnings.warn( - "Setting keepalive_expiry higher than 30 seconds may cause the Vespa server to reset idle connection." + "total_timeout is deprecated, will be ignored and will be removed in future versions. Use timeout to pass a httpx.Timeout object instead.", + category=DeprecationWarning, ) + self.timeout = timeout + if isinstance(self.timeout, int): + self.timeout = httpx.Timeout(timeout) self.kwargs = kwargs self.headers = self.app.base_headers.copy() + self.limits = kwargs.get( + "limits", httpx.Limits(max_keepalive_connections=self.connections) + ) + # Warn if limits.keepalive_expiry is higher than 30 seconds + if self.limits.keepalive_expiry and self.limits.keepalive_expiry > 30: + warnings.warn( + "Keepalive expiry is set to more than 30 seconds. Vespa server resets idle connections, so this may cause ConnectionResetError.", + category=UserWarning, + ) if self.app.auth_method == "token" and self.app.vespa_cloud_secret_token: # Bearer and user-agent self.headers.update( @@ -1540,31 +1590,17 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): def _open_httpx_client(self): if self.httpx_client is not None: return - limits = httpx.Limits( - max_keepalive_connections=self.connections, - max_connections=self.connections, - keepalive_expiry=self.keepalive_expiry, # This should NOT exceed the keepalive_timeout on the Server, otherwise we will get ConnectionTerminated errors. - ) - if self.total_timeout: - timeout = httpx.Timeout(self.total_timeout) - else: - timeout = httpx.Timeout( - pool=self.pool_timeout, - connect=self.connect_timeout, - read=self.read_timeout, - write=self.write_timeout, - ) + if self.app.cert is not None: sslcontext = httpx.create_ssl_context(cert=(self.app.cert, self.app.key)) else: sslcontext = False self.httpx_client = httpx.AsyncClient( - timeout=timeout, + timeout=self.timeout, headers=self.headers, verify=sslcontext, http2=True, # HTTP/2 by default http1=False, - limits=limits, **self.kwargs, ) return self.httpx_client