diff --git a/src/finch/_base_client.py b/src/finch/_base_client.py index a168301f..89d9ce48 100644 --- a/src/finch/_base_client.py +++ b/src/finch/_base_client.py @@ -72,6 +72,7 @@ DEFAULT_TIMEOUT, DEFAULT_MAX_RETRIES, RAW_RESPONSE_HEADER, + STREAMED_RAW_RESPONSE_HEADER, ) from ._streaming import Stream, AsyncStream from ._exceptions import ( @@ -363,14 +364,21 @@ def _make_status_error_from_response( self, response: httpx.Response, ) -> APIStatusError: - err_text = response.text.strip() - body = err_text + if response.is_closed and not response.is_stream_consumed: + # We can't read the response body as it has been closed + # before it was read. This can happen if an event hook + # raises a status error. + body = None + err_msg = f"Error code: {response.status_code}" + else: + err_text = response.text.strip() + body = err_text - try: - body = json.loads(err_text) - err_msg = f"Error code: {response.status_code} - {body}" - except Exception: - err_msg = err_text or f"Error code: {response.status_code}" + try: + body = json.loads(err_text) + err_msg = f"Error code: {response.status_code} - {body}" + except Exception: + err_msg = err_text or f"Error code: {response.status_code}" return self._make_status_error(err_msg, body=body, response=response) @@ -534,6 +542,12 @@ def _process_response_data( except pydantic.ValidationError as err: raise APIResponseValidationError(response=response, body=data) from err + def _should_stream_response_body(self, *, request: httpx.Request) -> bool: + if request.headers.get(STREAMED_RAW_RESPONSE_HEADER) == "true": + return True + + return False + @property def qs(self) -> Querystring: return Querystring() @@ -606,7 +620,7 @@ def _calculate_retry_timeout( if response_headers is not None: retry_header = response_headers.get("retry-after") try: - retry_after = int(retry_header) + retry_after = float(retry_header) except Exception: retry_date_tuple = email.utils.parsedate_tz(retry_header) if retry_date_tuple is None: @@ -862,14 +876,21 @@ def _request( request = self._build_request(options) self._prepare_request(request) + response = None + try: - response = self._client.send(request, auth=self.custom_auth, stream=stream) + response = self._client.send( + request, + auth=self.custom_auth, + stream=stream or self._should_stream_response_body(request=request), + ) log.debug( 'HTTP Request: %s %s "%i %s"', request.method, request.url, response.status_code, response.reason_phrase ) response.raise_for_status() except httpx.HTTPStatusError as err: # thrown on 4xx and 5xx status code if retries > 0 and self._should_retry(err.response): + err.response.close() return self._retry_request( options, cast_to, @@ -881,9 +902,14 @@ def _request( # If the response is streamed then we need to explicitly read the response # to completion before attempting to access the response text. - err.response.read() + if not err.response.is_closed: + err.response.read() + raise self._make_status_error_from_response(err.response) from None except httpx.TimeoutException as err: + if response is not None: + response.close() + if retries > 0: return self._retry_request( options, @@ -891,9 +917,14 @@ def _request( retries, stream=stream, stream_cls=stream_cls, + response_headers=response.headers if response is not None else None, ) + raise APITimeoutError(request=request) from err except Exception as err: + if response is not None: + response.close() + if retries > 0: return self._retry_request( options, @@ -901,7 +932,9 @@ def _request( retries, stream=stream, stream_cls=stream_cls, + response_headers=response.headers if response is not None else None, ) + raise APIConnectionError(request=request) from err return self._process_response( @@ -917,7 +950,7 @@ def _retry_request( options: FinalRequestOptions, cast_to: Type[ResponseT], remaining_retries: int, - response_headers: Optional[httpx.Headers] = None, + response_headers: httpx.Headers | None, *, stream: bool, stream_cls: type[_StreamT] | None, @@ -1303,14 +1336,21 @@ async def _request( request = self._build_request(options) await self._prepare_request(request) + response = None + try: - response = await self._client.send(request, auth=self.custom_auth, stream=stream) + response = await self._client.send( + request, + auth=self.custom_auth, + stream=stream or self._should_stream_response_body(request=request), + ) log.debug( 'HTTP Request: %s %s "%i %s"', request.method, request.url, response.status_code, response.reason_phrase ) response.raise_for_status() except httpx.HTTPStatusError as err: # thrown on 4xx and 5xx status code if retries > 0 and self._should_retry(err.response): + await err.response.aclose() return await self._retry_request( options, cast_to, @@ -1322,19 +1362,39 @@ async def _request( # If the response is streamed then we need to explicitly read the response # to completion before attempting to access the response text. - await err.response.aread() + if not err.response.is_closed: + await err.response.aread() + raise self._make_status_error_from_response(err.response) from None - except httpx.ConnectTimeout as err: - if retries > 0: - return await self._retry_request(options, cast_to, retries, stream=stream, stream_cls=stream_cls) - raise APITimeoutError(request=request) from err except httpx.TimeoutException as err: + if response is not None: + await response.aclose() + if retries > 0: - return await self._retry_request(options, cast_to, retries, stream=stream, stream_cls=stream_cls) + return await self._retry_request( + options, + cast_to, + retries, + stream=stream, + stream_cls=stream_cls, + response_headers=response.headers if response is not None else None, + ) + raise APITimeoutError(request=request) from err except Exception as err: + if response is not None: + await response.aclose() + if retries > 0: - return await self._retry_request(options, cast_to, retries, stream=stream, stream_cls=stream_cls) + return await self._retry_request( + options, + cast_to, + retries, + stream=stream, + stream_cls=stream_cls, + response_headers=response.headers if response is not None else None, + ) + raise APIConnectionError(request=request) from err return self._process_response( @@ -1350,7 +1410,7 @@ async def _retry_request( options: FinalRequestOptions, cast_to: Type[ResponseT], remaining_retries: int, - response_headers: Optional[httpx.Headers] = None, + response_headers: httpx.Headers | None, *, stream: bool, stream_cls: type[_AsyncStreamT] | None, diff --git a/src/finch/_constants.py b/src/finch/_constants.py index 0c3f31df..39b46eb0 100644 --- a/src/finch/_constants.py +++ b/src/finch/_constants.py @@ -3,6 +3,7 @@ import httpx RAW_RESPONSE_HEADER = "X-Stainless-Raw-Response" +STREAMED_RAW_RESPONSE_HEADER = "X-Stainless-Streamed-Raw-Response" # default timeout is 1 minute DEFAULT_TIMEOUT = httpx.Timeout(timeout=60.0, connect=5.0) diff --git a/tests/test_client.py b/tests/test_client.py index 5c5b16b1..a95af2f6 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -18,7 +18,7 @@ from finch._types import Omit from finch._client import Finch, AsyncFinch from finch._models import BaseModel, FinalRequestOptions -from finch._exceptions import APIResponseValidationError +from finch._exceptions import APIStatusError, APIResponseValidationError from finch._base_client import ( DEFAULT_TIMEOUT, HTTPX_DEFAULT_TIMEOUT, @@ -704,6 +704,31 @@ def test_parse_retry_after_header(self, remaining_retries: int, retry_after: str calculated = client._calculate_retry_timeout(remaining_retries, options, headers) assert calculated == pytest.approx(timeout, 0.5 * 0.875) # pyright: ignore[reportUnknownMemberType] + @pytest.mark.respx(base_url=base_url) + def test_status_error_within_httpx(self, respx_mock: MockRouter) -> None: + respx_mock.post("/foo").mock(return_value=httpx.Response(200, json={"foo": "bar"})) + + def on_response(response: httpx.Response) -> None: + raise httpx.HTTPStatusError( + "Simulating an error inside httpx", + response=response, + request=response.request, + ) + + client = Finch( + base_url=base_url, + access_token=access_token, + _strict_response_validation=True, + http_client=httpx.Client( + event_hooks={ + "response": [on_response], + } + ), + max_retries=0, + ) + with pytest.raises(APIStatusError): + client.post("/foo", cast_to=httpx.Response) + class TestAsyncFinch: client = AsyncFinch(base_url=base_url, access_token=access_token, _strict_response_validation=True) @@ -1377,3 +1402,29 @@ async def test_parse_retry_after_header(self, remaining_retries: int, retry_afte options = FinalRequestOptions(method="get", url="/foo", max_retries=3) calculated = client._calculate_retry_timeout(remaining_retries, options, headers) assert calculated == pytest.approx(timeout, 0.5 * 0.875) # pyright: ignore[reportUnknownMemberType] + + @pytest.mark.respx(base_url=base_url) + @pytest.mark.asyncio + async def test_status_error_within_httpx(self, respx_mock: MockRouter) -> None: + respx_mock.post("/foo").mock(return_value=httpx.Response(200, json={"foo": "bar"})) + + def on_response(response: httpx.Response) -> None: + raise httpx.HTTPStatusError( + "Simulating an error inside httpx", + response=response, + request=response.request, + ) + + client = AsyncFinch( + base_url=base_url, + access_token=access_token, + _strict_response_validation=True, + http_client=httpx.AsyncClient( + event_hooks={ + "response": [on_response], + } + ), + max_retries=0, + ) + with pytest.raises(APIStatusError): + await client.post("/foo", cast_to=httpx.Response)