Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
xerial committed Sep 11, 2023
1 parent 724d58f commit ae64757
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ case class HttpClientConfig(
def noRequestFilter: HttpClientConfig = this.copy(requestFilter = identity)

/**
* Add a custom response filter, mostly for debugging purpose
* Add a custom response filter to the last response, mostly for debugging purpose
* @param newResponseFilter
* @return
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import wvlet.airframe.codec.MessageCodec
import wvlet.airframe.codec.PrimitiveCodec.UnitCodec
import wvlet.airframe.control.Retry.{MaxRetryException, RetryContext}
import wvlet.airframe.control.{CircuitBreakerOpenException, Retry}
import wvlet.airframe.http.HttpMessage.{Request, Response}
import wvlet.airframe.http.*
import wvlet.airframe.rx.Rx
import wvlet.airframe.http.HttpMessage.{Request, Response}
import wvlet.airframe.surface.Surface
import wvlet.log.LogSupport

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package wvlet.airframe.http.client
import wvlet.airframe.control.CircuitBreaker
import wvlet.airframe.http.HttpMessage.{Request, Response}
import wvlet.airframe.http.*
import wvlet.airframe.rx.Rx
import wvlet.airframe.rx.{OnCompletion, OnError, OnNext, Rx, RxRunner}
import wvlet.airframe.surface.Surface

/**
Expand Down Expand Up @@ -51,21 +51,43 @@ trait SyncClient extends SyncClientCompat with HttpClientFactory[SyncClient] wit
val request = config.requestFilter(req)
var lastResponse: Option[Response] = None

try {
config.retryContext.runWithContext(request, circuitBreaker) {
loggingFilter
.andThen(config.clientFilter)
.apply(context)
.andThen(req => Rx.single(channel.send(req, config)))
.apply(request)
.run { resp =>
lastResponse = Some(config.responseFilter(resp))
// Build a chain of request filters
def requestPipeline: RxHttpEndpoint = {
loggingFilter(context)
.andThen { req =>
Rx.single(channel.send(req, config))
.tap { resp =>
// Remember the last response for the error reporting purpose
lastResponse = Some(resp)
}
}
}

val rx =
// Apply the client filter first to handle only the last response
config.clientFilter.andThen { req =>
config.retryContext
.runAsyncWithContext(req, circuitBreaker) {
requestPipeline(req)
}
.map { resp =>
// Apply the response filter for the successful response
config.responseFilter(resp)
}
.recover {
// Or if request has been failing, apply the response filter only to the last response
val response = lastResponse.map(config.responseFilter(_))
HttpClients.defaultHttpClientErrorHandler(response)
}
lastResponse.get
}
} catch {
HttpClients.defaultHttpClientErrorHandler(lastResponse)

RxRunner.run(rx(request)) {
case OnError(e) =>
throw e
case _ =>
//
}
lastResponse.get
}

/**
Expand Down

0 comments on commit ae64757

Please sign in to comment.