Skip to content
This repository has been archived by the owner. It is now read-only.

Commit

Permalink
Introduce pekko interop
Browse files Browse the repository at this point in the history
  • Loading branch information
greenhost87 committed Aug 13, 2023
1 parent 97af24d commit 457287e
Show file tree
Hide file tree
Showing 11 changed files with 805 additions and 1 deletion.
21 changes: 20 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ val levshaVersion = "1.3.0"
val akkaVersion = "2.6.19"
val akkaHttpVersion = "10.2.9"

val pekkoVersion = "1.0.0"
val pekkoHttpVersion = "0.0.0+4469-fb6a5426-SNAPSHOT"

val circeVersion = "0.14.1"
val ce2Version = "2.5.5"
val ce3Version = "3.3.12"
Expand Down Expand Up @@ -220,6 +223,22 @@ lazy val akka = project
)
.dependsOn(korolev)

lazy val pekko = project
.in(interop / "pekko")
.enablePlugins(GitVersioning)
.settings(crossVersionSettings)
.settings(commonSettings: _*)
.settings(
resolvers += "pekko-http-snapshot-repository" at "https://repository.apache.org/content/repositories/snapshots",
normalizedName := "korolev-pekko",
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-actor" % pekkoVersion,
"org.apache.pekko" %% "pekko-stream" % pekkoVersion,
"org.apache.pekko" %% "pekko-http" % pekkoHttpVersion
)
)
.dependsOn(korolev)

lazy val zioHttp = project
.in(interop / "zio-http")
.enablePlugins(GitVersioning)
Expand Down Expand Up @@ -556,7 +575,7 @@ lazy val root = project
korolev, effect, web, http, standalone, testkit,
bytes, webDsl,
// Interop
akka, ce2, ce3, monix, zio, zioStreams, zio2, zio2Streams, slf4j,
akka, pekko, ce2, ce3, monix, zio, zioStreams, zio2, zio2Streams, slf4j,
scodec, fs2ce2, fs2ce3, zioHttp,
// Examples
simpleExample, routingExample, gameOfLifeExample,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2017-2020 Aleksey Fomkin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package korolev.pekko

import scala.concurrent.duration._

case class PekkoHttpServerConfig(maxRequestBodySize: Int = PekkoHttpServerConfig.DefaultMaxRequestBodySize,
outputBufferSize: Int = PekkoHttpServerConfig.DefaultOutputBufferSize,
wsStreamedCompletionTimeout: FiniteDuration = PekkoHttpServerConfig.DefaultWsStreamedCompletionTimeout,
wsStreamedParallelism: Int = PekkoHttpServerConfig.DefaultWsStreamedParallelism)

object PekkoHttpServerConfig {
val DefaultMaxRequestBodySize: Int = 8 * 1024 * 1024
val DefaultOutputBufferSize: Int = 1000
val DefaultWsStreamedCompletionTimeout: FiniteDuration = 30.seconds
val DefaultWsStreamedParallelism: Int = 2
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2017-2020 Aleksey Fomkin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package korolev.pekko

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http

abstract class SimplePekkoHttpKorolevApp(config: PekkoHttpServerConfig = null) {

implicit val actorSystem: ActorSystem = ActorSystem()

def service: PekkoHttpService

def main(args: Array[String]): Unit = {
val escapedConfig =
if (config == null) PekkoHttpServerConfig()
else config
val route = service(escapedConfig)
Http().newServerAt("0.0.0.0", 8080).bindFlow(route)
()
}
}
73 changes: 73 additions & 0 deletions interop/pekko/src/main/scala/korolev/pekko/instances.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2017-2020 Aleksey Fomkin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package korolev.pekko

import org.apache.pekko.NotUsed
import org.apache.pekko.stream.OverflowStrategy
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.ByteString
import korolev.pekko.util.{PekkoByteStringBytesLike, KorolevStreamPublisher, KorolevStreamSubscriber}
import korolev.data.BytesLike
import korolev.effect.{Effect, Stream}
import org.reactivestreams.Publisher

import scala.concurrent.ExecutionContext

object instances {

implicit final class SinkCompanionOps(value: Sink.type) {
def korolevStream[F[_]: Effect, T]: Sink[T, Stream[F, T]] = {
val subscriber = new KorolevStreamSubscriber[F, T]()
Sink
.fromSubscriber(subscriber)
.mapMaterializedValue(_ => subscriber)
}
}

implicit final class StreamCompanionOps(value: Stream.type) {
def fromPublisher[F[_]: Effect, T](publisher: Publisher[T]): Stream[F, T] = {
val result = new KorolevStreamSubscriber[F, T]()
publisher.subscribe(result)
result
}
}

implicit final class KorolevStreamsOps[F[_]: Effect, T](stream: Stream[F, T]) {

/**
* Converts korolev [[korolev.effect.Stream]] to [[Publisher]].
*
* If `fanout` is `true`, the `Publisher` will support multiple `Subscriber`s and
* the size of the `inputBuffer` configured for this operator becomes the maximum number of elements that
* the fastest [[org.reactivestreams.Subscriber]] can be ahead of the slowest one before slowing
* the processing down due to back pressure.
*
* If `fanout` is `false` then the `Publisher` will only support a single `Subscriber` and
* reject any additional `Subscriber`s with [[korolev.pekko.util.KorolevStreamPublisher.MultipleSubscribersProhibitedException]].
*/
def asPublisher(fanout: Boolean = false)(implicit ec: ExecutionContext): Publisher[T] =
new KorolevStreamPublisher(stream, fanout)

def asPekkoSource(implicit ec: ExecutionContext): Source[T, NotUsed] = {
val publisher = new KorolevStreamPublisher(stream, fanout = false)
Source.fromPublisher(publisher)
}
}

implicit final val pekkoByteStringBytesLike: BytesLike[ByteString] =
new PekkoByteStringBytesLike()
}
194 changes: 194 additions & 0 deletions interop/pekko/src/main/scala/korolev/pekko/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Copyright 2017-2020 Aleksey Fomkin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package korolev

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model._
import org.apache.pekko.http.scaladsl.model.headers.RawHeader
import org.apache.pekko.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import org.apache.pekko.http.scaladsl.server.Directives._
import org.apache.pekko.http.scaladsl.server.Route
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{Flow, Keep, Sink}
import org.apache.pekko.util.ByteString
import korolev.pekko.util.LoggingReporter
import korolev.data.{Bytes, BytesLike}
import korolev.effect.{Effect, Reporter, Stream}
import korolev.server.internal.BadRequestException
import korolev.server.{WebSocketRequest as KorolevWebSocketRequest, WebSocketResponse as KorolevWebSocketResponse}
import korolev.server.{KorolevService, KorolevServiceConfig, HttpRequest as KorolevHttpRequest}
import korolev.state.{StateDeserializer, StateSerializer}
import korolev.web.{PathAndQuery, Request as KorolevRequest, Response as KorolevResponse}

import scala.concurrent.{ExecutionContext, Future}

package object pekko {

type PekkoHttpService = PekkoHttpServerConfig => Route

import instances._

def pekkoHttpService[F[_]: Effect, S: StateSerializer: StateDeserializer, M]
(config: KorolevServiceConfig[F, S, M], wsLoggingEnabled: Boolean = false)
(implicit actorSystem: ActorSystem, materializer: Materializer, ec: ExecutionContext): PekkoHttpService = { pekkoHttpConfig =>
// If reporter wasn't overridden, use pekko-logging reporter.
val actualConfig =
if (config.reporter != Reporter.PrintReporter) config
else config.copy(reporter = new LoggingReporter(actorSystem))

val korolevServer = korolev.server.korolevService(actualConfig)
val wsRouter = configureWsRoute(korolevServer, pekkoHttpConfig, actualConfig, wsLoggingEnabled)
val httpRoute = configureHttpRoute(korolevServer)

wsRouter ~ httpRoute
}

private def configureWsRoute[F[_]: Effect, S: StateSerializer: StateDeserializer, M]
(korolevServer: KorolevService[F],
pekkoHttpConfig: PekkoHttpServerConfig,
korolevServiceConfig: KorolevServiceConfig[F, S, M],
wsLoggingEnabled: Boolean)
(implicit materializer: Materializer, ec: ExecutionContext): Route =
extractRequest { request =>
extractUnmatchedPath { path =>
extractWebSocketUpgrade { upgrade =>
// inSink - consume messages from the client
// outSource - push messages to the client
val (inStream, inSink) = Sink.korolevStream[F, Bytes].preMaterialize()
val korolevRequest = mkKorolevRequest(request, path.toString, inStream)

complete {
val korolevWsRequest = KorolevWebSocketRequest(korolevRequest, upgrade.requestedProtocols)
Effect[F].toFuture(korolevServer.ws(korolevWsRequest)).map {
case KorolevWebSocketResponse(KorolevResponse(_, outStream, _, _), selectedProtocol) =>
val source = outStream
.asPekkoSource
.map(text => BinaryMessage.Strict(text.as[ByteString]))
val sink = Flow[Message]
.mapAsync(pekkoHttpConfig.wsStreamedParallelism) {
case TextMessage.Strict(message) =>
Future.successful(Some(BytesLike[Bytes].utf8(message)))
case TextMessage.Streamed(stream) =>
stream
.completionTimeout(pekkoHttpConfig.wsStreamedCompletionTimeout)
.runFold("")(_ + _)
.map(message => Some(BytesLike[Bytes].utf8(message)))
case BinaryMessage.Strict(data) =>
Future.successful(Some(Bytes.wrap(data)))
case BinaryMessage.Streamed(stream) =>
stream
.completionTimeout(pekkoHttpConfig.wsStreamedCompletionTimeout)
.runFold(ByteString.empty)(_ ++ _)
.map(message => Some(Bytes.wrap(message)))
}
.recover {
case ex =>
korolevServiceConfig.reporter.error(s"WebSocket exception ${ex.getMessage}, shutdown output stream", ex)
outStream.cancel()
None
}
.collect {
case Some(message) =>
message
}
.to(inSink)

upgrade.handleMessages(
if(wsLoggingEnabled) {
Flow.fromSinkAndSourceCoupled(sink, source).log("korolev-ws")
} else {
Flow.fromSinkAndSourceCoupled(sink, source)
},
Some(selectedProtocol)
)
case _ =>
throw new RuntimeException // cannot happen
}.recover {
case BadRequestException(message) =>
HttpResponse(StatusCodes.BadRequest, entity = HttpEntity(message))
}
}
}
}
}

private def configureHttpRoute[F[_]](korolevServer: KorolevService[F])(implicit mat: Materializer, async: Effect[F], ec: ExecutionContext): Route =
extractUnmatchedPath { path =>
extractRequest { request =>
val sink = Sink.korolevStream[F, Bytes]
val body =
if (request.method == HttpMethods.GET) {
Stream.empty[F, Bytes]
} else {
request
.entity
.dataBytes
.map(Bytes.wrap(_))
.toMat(sink)(Keep.right)
.run()
}
val korolevRequest = mkKorolevRequest(request, path.toString, body)
val responseF = handleHttpResponse(korolevServer, korolevRequest)
complete(responseF)
}
}

private def mkKorolevRequest[F[_], Body](request: HttpRequest,
path: String,
body: Body): KorolevRequest[Body] =
KorolevRequest(
pq = PathAndQuery.fromString(path).withParams(request.uri.rawQueryString),
method = KorolevRequest.Method.fromString(request.method.value),
contentLength = request.headers.find(_.is("content-length")).map(_.value().toLong),
renderedCookie = request.headers.find(_.is("cookie")).map(_.value()).getOrElse(""),
headers = {
val contentType = request.entity.contentType
val contentTypeHeaders =
if (contentType.mediaType.isMultipart) Seq("content-type" -> contentType.toString) else Seq.empty
request.headers.map(h => (h.name(), h.value())) ++ contentTypeHeaders
},
body = body
)

private def handleHttpResponse[F[_]: Effect](korolevServer: KorolevService[F],
korolevRequest: KorolevHttpRequest[F])(implicit ec: ExecutionContext): Future[HttpResponse] =
Effect[F].toFuture(korolevServer.http(korolevRequest)).map {
case response @ KorolevResponse(status, body, responseHeaders, _) =>
val (contentTypeOpt, otherHeaders) = getContentTypeAndResponseHeaders(responseHeaders)
val bytesSource = body.asPekkoSource.map(_.as[ByteString])
HttpResponse(
StatusCode.int2StatusCode(status.code),
otherHeaders,
response.contentLength match {
case Some(bytesLength) => HttpEntity(contentTypeOpt.getOrElse(ContentTypes.NoContentType), bytesLength, bytesSource)
case None => HttpEntity(contentTypeOpt.getOrElse(ContentTypes.NoContentType), bytesSource)
}
)
}

private def getContentTypeAndResponseHeaders(responseHeaders: Seq[(String, String)]): (Option[ContentType], List[HttpHeader]) = {
val headers = responseHeaders.map { case (name, value) =>
HttpHeader.parse(name, value) match {
case HttpHeader.ParsingResult.Ok(header, _) => header
case _ => RawHeader(name, value)
}
}
val (contentTypeHeaders, otherHeaders) = headers.partition(_.lowercaseName() == "content-type")
val contentTypeOpt = contentTypeHeaders.headOption.flatMap(h => ContentType.parse(h.value()).right.toOption)
(contentTypeOpt, otherHeaders.toList)
}
}
Loading

0 comments on commit 457287e

Please sign in to comment.