From ae647577db84280a0010b4d9e03ad1b6736acba3 Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Mon, 11 Sep 2023 09:14:49 -0700 Subject: [PATCH] wip --- .../http/client/HttpClientConfig.scala | 2 +- .../airframe/http/client/HttpClients.scala | 3 +- .../airframe/http/client/SyncClient.scala | 48 ++++++++++++++----- 3 files changed, 37 insertions(+), 16 deletions(-) diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientConfig.scala b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientConfig.scala index 27ed2375be..babc53e6dc 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientConfig.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClientConfig.scala @@ -84,7 +84,7 @@ case class HttpClientConfig( def noRequestFilter: HttpClientConfig = this.copy(requestFilter = identity) /** - * Add a custom response filter, mostly for debugging purpose + * Add a custom response filter to the last response, mostly for debugging purpose * @param newResponseFilter * @return */ diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClients.scala b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClients.scala index c60cbfb38f..a515eecb82 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClients.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/client/HttpClients.scala @@ -17,9 +17,8 @@ import wvlet.airframe.codec.MessageCodec import wvlet.airframe.codec.PrimitiveCodec.UnitCodec import wvlet.airframe.control.Retry.{MaxRetryException, RetryContext} import wvlet.airframe.control.{CircuitBreakerOpenException, Retry} -import wvlet.airframe.http.HttpMessage.{Request, Response} import wvlet.airframe.http.* -import wvlet.airframe.rx.Rx +import wvlet.airframe.http.HttpMessage.{Request, Response} import wvlet.airframe.surface.Surface import wvlet.log.LogSupport diff --git a/airframe-http/src/main/scala/wvlet/airframe/http/client/SyncClient.scala b/airframe-http/src/main/scala/wvlet/airframe/http/client/SyncClient.scala index 43659cc9ea..45609b0481 100644 --- a/airframe-http/src/main/scala/wvlet/airframe/http/client/SyncClient.scala +++ b/airframe-http/src/main/scala/wvlet/airframe/http/client/SyncClient.scala @@ -16,7 +16,7 @@ package wvlet.airframe.http.client import wvlet.airframe.control.CircuitBreaker import wvlet.airframe.http.HttpMessage.{Request, Response} import wvlet.airframe.http.* -import wvlet.airframe.rx.Rx +import wvlet.airframe.rx.{OnCompletion, OnError, OnNext, Rx, RxRunner} import wvlet.airframe.surface.Surface /** @@ -51,21 +51,43 @@ trait SyncClient extends SyncClientCompat with HttpClientFactory[SyncClient] wit val request = config.requestFilter(req) var lastResponse: Option[Response] = None - try { - config.retryContext.runWithContext(request, circuitBreaker) { - loggingFilter - .andThen(config.clientFilter) - .apply(context) - .andThen(req => Rx.single(channel.send(req, config))) - .apply(request) - .run { resp => - lastResponse = Some(config.responseFilter(resp)) + // Build a chain of request filters + def requestPipeline: RxHttpEndpoint = { + loggingFilter(context) + .andThen { req => + Rx.single(channel.send(req, config)) + .tap { resp => + // Remember the last response for the error reporting purpose + lastResponse = Some(resp) + } + } + } + + val rx = + // Apply the client filter first to handle only the last response + config.clientFilter.andThen { req => + config.retryContext + .runAsyncWithContext(req, circuitBreaker) { + requestPipeline(req) + } + .map { resp => + // Apply the response filter for the successful response + config.responseFilter(resp) + } + .recover { + // Or if request has been failing, apply the response filter only to the last response + val response = lastResponse.map(config.responseFilter(_)) + HttpClients.defaultHttpClientErrorHandler(response) } - lastResponse.get } - } catch { - HttpClients.defaultHttpClientErrorHandler(lastResponse) + + RxRunner.run(rx(request)) { + case OnError(e) => + throw e + case _ => + // } + lastResponse.get } /**