Skip to content

Commit

Permalink
Pekko & Akka backends
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Dec 30, 2024
1 parent 29e94ca commit 5857ff1
Show file tree
Hide file tree
Showing 13 changed files with 92 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@ import sttp.client4.compression.Compressor
import akka.stream.scaladsl.StreamConverters
import akka.stream.scaladsl.FileIO

trait AkkaCompressor[R <: AkkaStreams] extends Compressor[R] {
override abstract def apply(body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] =
trait AkkaCompressor extends Compressor[AkkaStreams] {
override abstract def apply[R2 <: AkkaStreams](body: GenericRequestBody[R2]): GenericRequestBody[AkkaStreams] =
body match {
case InputStreamBody(b, _) => StreamBody(AkkaStreams)(compressStream(StreamConverters.fromInputStream(() => b)))
case StreamBody(b) => StreamBody(AkkaStreams)(compressStream(b.asInstanceOf[Source[ByteString, Any]]))
case FileBody(f, _) => StreamBody(AkkaStreams)(compressStream(FileIO.fromPath(f.toPath)))
case _ => super.apply(body, encoding)
case _ => super.apply(body)
}

def compressStream(stream: Source[ByteString, Any]): Source[ByteString, Any]
}

class GZipAkkaCompressor[R <: AkkaStreams] extends GZipDefaultCompressor[R] with AkkaCompressor[R] {
object GZipAkkaCompressor extends GZipDefaultCompressor[AkkaStreams] with AkkaCompressor {
def compressStream(stream: Source[ByteString, Any]): Source[ByteString, Any] = stream.via(Compression.gzip)
}

class DeflateAkkaCompressor[R <: AkkaStreams] extends DeflateDefaultCompressor[R] with AkkaCompressor[R] {
object DeflateAkkaCompressor extends DeflateDefaultCompressor[AkkaStreams] with AkkaCompressor {
def compressStream(stream: Source[ByteString, Any]): Source[ByteString, Any] = stream.via(Compression.deflate)
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package sttp.client4.akkahttp

import java.io.UnsupportedEncodingException
import akka.{Done, NotUsed}
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.event.LoggingAdapter
import akka.http.scaladsl.coding.Coders
import akka.http.scaladsl.model.headers.{BasicHttpCredentials, HttpEncoding, HttpEncodings}
import akka.http.scaladsl.model.ws.{InvalidUpgradeResponse, Message, ValidUpgrade, WebSocketRequest}
import akka.http.scaladsl.model.{StatusCode => _, _}
Expand All @@ -14,12 +12,13 @@ import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink}
import sttp.capabilities.akka.AkkaStreams
import sttp.capabilities.{Effect, WebSockets}
import sttp.client4.akkahttp.AkkaHttpBackend.EncodingHandler
import sttp.client4.testing.WebSocketStreamBackendStub
import sttp.client4._
import sttp.client4.wrappers.FollowRedirectsBackend
import sttp.model.{ResponseMetadata, StatusCode}
import sttp.monad.{FutureMonad, MonadError}
import sttp.client4.compression.CompressionHandlers
import sttp.client4.compression.Decompressor

import scala.concurrent.{ExecutionContext, Future, Promise}

Expand All @@ -33,7 +32,7 @@ class AkkaHttpBackend private (
customizeRequest: HttpRequest => HttpRequest,
customizeWebsocketRequest: WebSocketRequest => WebSocketRequest,
customizeResponse: (HttpRequest, HttpResponse) => HttpResponse,
customEncodingHandler: EncodingHandler
compressionHandlers: CompressionHandlers[AkkaStreams, HttpResponse]
) extends WebSocketStreamBackend[Future, AkkaStreams] {
type R = AkkaStreams with WebSockets with Effect[Future]

Expand All @@ -49,11 +48,9 @@ class AkkaHttpBackend private (
if (r.isWebSocket) sendWebSocket(r) else sendRegular(r)
}

private val compressors = List(new GZipAkkaCompressor, new DeflateAkkaCompressor)

private def sendRegular[T](r: GenericRequest[T, R]): Future[Response[T]] =
Future
.fromTry(ToAkka.request(r).flatMap(BodyToAkka(r, _, compressors)))
.fromTry(ToAkka.request(r).flatMap(BodyToAkka(r, _, compressionHandlers.compressors)))
.map(customizeRequest)
.flatMap(request =>
http
Expand Down Expand Up @@ -143,14 +140,12 @@ class AkkaHttpBackend private (
// http://doc.akka.io/docs/akka-http/10.0.7/scala/http/common/de-coding.html
private def decodeAkkaResponse(response: HttpResponse, autoDecompressionEnabled: Boolean): HttpResponse =
if (!response.status.allowsEntity() || !autoDecompressionEnabled) response
else customEncodingHandler.orElse(EncodingHandler(standardEncoding)).apply(response -> response.encoding)

private def standardEncoding: (HttpResponse, HttpEncoding) => HttpResponse = {
case (body, HttpEncodings.gzip) => Coders.Gzip.decodeMessage(body)
case (body, HttpEncodings.deflate) => Coders.Deflate.decodeMessage(body)
case (body, HttpEncodings.identity) => Coders.NoCoding.decodeMessage(body)
case (_, ce) => throw new UnsupportedEncodingException(s"Unsupported encoding: $ce")
}
else
response.encoding match {
case HttpEncodings.identity => response
case encoding: HttpEncoding =>
Decompressor.decompressIfPossible(response, encoding.value, compressionHandlers.decompressors)
}

private def adjustExceptions[T](request: GenericRequest[_, _])(t: => Future[T]): Future[T] =
SttpClientException.adjustExceptions(monad)(t)(FromAkka.exception(request, _))
Expand All @@ -166,12 +161,11 @@ class AkkaHttpBackend private (
}

object AkkaHttpBackend {
type EncodingHandler = PartialFunction[(HttpResponse, HttpEncoding), HttpResponse]
object EncodingHandler {
def apply(f: (HttpResponse, HttpEncoding) => HttpResponse): EncodingHandler = { case (body, encoding) =>
f(body, encoding)
}
}
val DefaultCompressionHandlers: CompressionHandlers[AkkaStreams, HttpResponse] =
CompressionHandlers(
List(GZipAkkaCompressor, DeflateAkkaCompressor),
List(GZipAkkaDecompressor, DeflateAkkaDecompressor)
)

private def make(
actorSystem: ActorSystem,
Expand All @@ -183,7 +177,7 @@ object AkkaHttpBackend {
customizeRequest: HttpRequest => HttpRequest,
customizeWebsocketRequest: WebSocketRequest => WebSocketRequest = identity,
customizeResponse: (HttpRequest, HttpResponse) => HttpResponse = (_, r) => r,
customEncodingHandler: EncodingHandler = PartialFunction.empty
compressionHandlers: CompressionHandlers[AkkaStreams, HttpResponse]
): WebSocketStreamBackend[Future, AkkaStreams] =
FollowRedirectsBackend(
new AkkaHttpBackend(
Expand All @@ -196,7 +190,7 @@ object AkkaHttpBackend {
customizeRequest,
customizeWebsocketRequest,
customizeResponse,
customEncodingHandler
compressionHandlers
)
)

Expand All @@ -212,7 +206,7 @@ object AkkaHttpBackend {
customizeRequest: HttpRequest => HttpRequest = identity,
customizeWebsocketRequest: WebSocketRequest => WebSocketRequest = identity,
customizeResponse: (HttpRequest, HttpResponse) => HttpResponse = (_, r) => r,
customEncodingHandler: EncodingHandler = PartialFunction.empty
compressionHandlers: CompressionHandlers[AkkaStreams, HttpResponse] = DefaultCompressionHandlers
)(implicit
ec: Option[ExecutionContext] = None
): WebSocketStreamBackend[Future, AkkaStreams] = {
Expand All @@ -228,7 +222,7 @@ object AkkaHttpBackend {
customizeRequest,
customizeWebsocketRequest,
customizeResponse,
customEncodingHandler
compressionHandlers
)
}

Expand All @@ -247,7 +241,7 @@ object AkkaHttpBackend {
customizeRequest: HttpRequest => HttpRequest = identity,
customizeWebsocketRequest: WebSocketRequest => WebSocketRequest = identity,
customizeResponse: (HttpRequest, HttpResponse) => HttpResponse = (_, r) => r,
customEncodingHandler: EncodingHandler = PartialFunction.empty
compressionHandlers: CompressionHandlers[AkkaStreams, HttpResponse] = DefaultCompressionHandlers
)(implicit
ec: Option[ExecutionContext] = None
): WebSocketStreamBackend[Future, AkkaStreams] =
Expand All @@ -259,7 +253,7 @@ object AkkaHttpBackend {
customizeRequest,
customizeWebsocketRequest,
customizeResponse,
customEncodingHandler
compressionHandlers
)

/** @param actorSystem
Expand All @@ -276,7 +270,7 @@ object AkkaHttpBackend {
customizeRequest: HttpRequest => HttpRequest = identity,
customizeWebsocketRequest: WebSocketRequest => WebSocketRequest = identity,
customizeResponse: (HttpRequest, HttpResponse) => HttpResponse = (_, r) => r,
customEncodingHandler: EncodingHandler = PartialFunction.empty
compressionHandlers: CompressionHandlers[AkkaStreams, HttpResponse] = DefaultCompressionHandlers
)(implicit
ec: Option[ExecutionContext] = None
): WebSocketStreamBackend[Future, AkkaStreams] =
Expand All @@ -290,7 +284,7 @@ object AkkaHttpBackend {
customizeRequest,
customizeWebsocketRequest,
customizeResponse,
customEncodingHandler
compressionHandlers
)

/** Create a stub backend for testing, which uses the [[Future]] response wrapper, and doesn't support streaming.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package sttp.client4.akkahttp

import sttp.client4.compression.Decompressor
import sttp.model.Encodings
import akka.http.scaladsl.coding.Coders
import akka.http.scaladsl.model.HttpResponse

object GZipAkkaDecompressor extends Decompressor[HttpResponse] {
override val encoding: String = Encodings.Gzip
override def apply(body: HttpResponse): HttpResponse = Coders.Gzip.decodeMessage(body)
}

object DeflateAkkaDecompressor extends Decompressor[HttpResponse] {
override val encoding: String = Encodings.Deflate
override def apply(body: HttpResponse): HttpResponse = Coders.Deflate.decodeMessage(body)
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class HttpClientZioBackend private (
object HttpClientZioBackend {
val DefaultCompressionHandlers: CompressionHandlers[ZioStreams, ZioStreams.BinaryStream] =
CompressionHandlers(
List(new GZipZioCompressor(), new DeflateZioCompressor()),
List(GZipZioCompressor, DeflateZioCompressor),
List(GZipZioDecompressor, DeflateZioDecompressor)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import sttp.client4.compression.DeflateDefaultCompressor
import zio.stream.ZPipeline
import zio.stream.ZStream

trait ZioCompressor[R <: ZioStreams] extends Compressor[R] {
override abstract def apply[R2 <: R](body: GenericRequestBody[R2]): GenericRequestBody[R] =
trait ZioCompressor extends Compressor[ZioStreams] {
override abstract def apply[R2 <: ZioStreams](body: GenericRequestBody[R2]): GenericRequestBody[ZioStreams] =
body match {
case InputStreamBody(b, _) => StreamBody(ZioStreams)(compressStream(ZStream.fromInputStream(b)))
case StreamBody(b) => StreamBody(ZioStreams)(compressStream(b.asInstanceOf[Stream[Throwable, Byte]]))
Expand All @@ -22,10 +22,10 @@ trait ZioCompressor[R <: ZioStreams] extends Compressor[R] {
def compressStream(stream: Stream[Throwable, Byte]): Stream[Throwable, Byte]
}

class GZipZioCompressor[R <: ZioStreams] extends GZipDefaultCompressor[R] with ZioCompressor[R] {
object GZipZioCompressor extends GZipDefaultCompressor[ZioStreams] with ZioCompressor {
def compressStream(stream: Stream[Throwable, Byte]): Stream[Throwable, Byte] = stream.via(ZPipeline.gzip())
}

class DeflateZioCompressor[R <: ZioStreams] extends DeflateDefaultCompressor[R] with ZioCompressor[R] {
object DeflateZioCompressor extends DeflateDefaultCompressor[ZioStreams] with ZioCompressor {
def compressStream(stream: Stream[Throwable, Byte]): Stream[Throwable, Byte] = stream.via(ZPipeline.deflate())
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import sttp.client4.compression.Compressor
import org.apache.pekko.stream.scaladsl.StreamConverters
import org.apache.pekko.stream.scaladsl.FileIO

trait PekkoCompressor[R <: PekkoStreams] extends Compressor[R] {
override abstract def apply(body: GenericRequestBody[R]): GenericRequestBody[R] =
trait PekkoCompressor extends Compressor[PekkoStreams] {
override abstract def apply[R2 <: PekkoStreams](body: GenericRequestBody[R2]): GenericRequestBody[PekkoStreams] =
body match {
case InputStreamBody(b, _) => StreamBody(PekkoStreams)(compressStream(StreamConverters.fromInputStream(() => b)))
case StreamBody(b) => StreamBody(PekkoStreams)(compressStream(b.asInstanceOf[Source[ByteString, Any]]))
Expand All @@ -23,10 +23,10 @@ trait PekkoCompressor[R <: PekkoStreams] extends Compressor[R] {
def compressStream(stream: Source[ByteString, Any]): Source[ByteString, Any]
}

class GZipPekkoCompressor[R <: PekkoStreams] extends GZipDefaultCompressor[R] with PekkoCompressor[R] {
object GZipPekkoCompressor extends GZipDefaultCompressor[PekkoStreams] with PekkoCompressor {
def compressStream(stream: Source[ByteString, Any]): Source[ByteString, Any] = stream.via(Compression.gzip)
}

class DeflatePekkoCompressor[R <: PekkoStreams] extends DeflateDefaultCompressor[R] with PekkoCompressor[R] {
object DeflatePekkoCompressor extends DeflateDefaultCompressor[PekkoStreams] with PekkoCompressor {
def compressStream(stream: Source[ByteString, Any]): Source[ByteString, Any] = stream.via(Compression.deflate)
}
Loading

0 comments on commit 5857ff1

Please sign in to comment.