diff --git a/build.sbt b/build.sbt index 8ebaa048..a387329f 100644 --- a/build.sbt +++ b/build.sbt @@ -9,6 +9,9 @@ val levshaVersion = "1.3.0" val akkaVersion = "2.6.19" val akkaHttpVersion = "10.2.9" +val pekkoVersion = "1.0.0" +val pekkoHttpVersion = "0.0.0+4469-fb6a5426-SNAPSHOT" + val circeVersion = "0.14.1" val ce2Version = "2.5.5" val ce3Version = "3.3.12" @@ -220,6 +223,22 @@ lazy val akka = project ) .dependsOn(korolev) +lazy val pekko = project + .in(interop / "pekko") + .enablePlugins(GitVersioning) + .settings(crossVersionSettings) + .settings(commonSettings: _*) + .settings( + resolvers += "pekko-http-snapshot-repository" at "https://repository.apache.org/content/repositories/snapshots", + normalizedName := "korolev-pekko", + libraryDependencies ++= Seq( + "org.apache.pekko" %% "pekko-actor" % pekkoVersion, + "org.apache.pekko" %% "pekko-stream" % pekkoVersion, + "org.apache.pekko" %% "pekko-http" % pekkoHttpVersion + ) + ) + .dependsOn(korolev) + lazy val zioHttp = project .in(interop / "zio-http") .enablePlugins(GitVersioning) @@ -556,7 +575,7 @@ lazy val root = project korolev, effect, web, http, standalone, testkit, bytes, webDsl, // Interop - akka, ce2, ce3, monix, zio, zioStreams, zio2, zio2Streams, slf4j, + akka, pekko, ce2, ce3, monix, zio, zioStreams, zio2, zio2Streams, slf4j, scodec, fs2ce2, fs2ce3, zioHttp, // Examples simpleExample, routingExample, gameOfLifeExample, diff --git a/interop/pekko/src/main/scala/korolev/pekko/PekkoHttpServerConfig.scala b/interop/pekko/src/main/scala/korolev/pekko/PekkoHttpServerConfig.scala new file mode 100644 index 00000000..014bc284 --- /dev/null +++ b/interop/pekko/src/main/scala/korolev/pekko/PekkoHttpServerConfig.scala @@ -0,0 +1,31 @@ +/* + * Copyright 2017-2020 Aleksey Fomkin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package korolev.pekko + +import scala.concurrent.duration._ + +case class PekkoHttpServerConfig(maxRequestBodySize: Int = PekkoHttpServerConfig.DefaultMaxRequestBodySize, + outputBufferSize: Int = PekkoHttpServerConfig.DefaultOutputBufferSize, + wsStreamedCompletionTimeout: FiniteDuration = PekkoHttpServerConfig.DefaultWsStreamedCompletionTimeout, + wsStreamedParallelism: Int = PekkoHttpServerConfig.DefaultWsStreamedParallelism) + +object PekkoHttpServerConfig { + val DefaultMaxRequestBodySize: Int = 8 * 1024 * 1024 + val DefaultOutputBufferSize: Int = 1000 + val DefaultWsStreamedCompletionTimeout: FiniteDuration = 30.seconds + val DefaultWsStreamedParallelism: Int = 2 +} diff --git a/interop/pekko/src/main/scala/korolev/pekko/SimplePekkoHttpKorolevApp.scala b/interop/pekko/src/main/scala/korolev/pekko/SimplePekkoHttpKorolevApp.scala new file mode 100644 index 00000000..a6448348 --- /dev/null +++ b/interop/pekko/src/main/scala/korolev/pekko/SimplePekkoHttpKorolevApp.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2017-2020 Aleksey Fomkin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package korolev.pekko + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.Http + +abstract class SimplePekkoHttpKorolevApp(config: PekkoHttpServerConfig = null) { + + implicit val actorSystem: ActorSystem = ActorSystem() + + def service: PekkoHttpService + + def main(args: Array[String]): Unit = { + val escapedConfig = + if (config == null) PekkoHttpServerConfig() + else config + val route = service(escapedConfig) + Http().newServerAt("0.0.0.0", 8080).bindFlow(route) + () + } +} diff --git a/interop/pekko/src/main/scala/korolev/pekko/instances.scala b/interop/pekko/src/main/scala/korolev/pekko/instances.scala new file mode 100644 index 00000000..e21dd0a5 --- /dev/null +++ b/interop/pekko/src/main/scala/korolev/pekko/instances.scala @@ -0,0 +1,73 @@ +/* + * Copyright 2017-2020 Aleksey Fomkin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package korolev.pekko + +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.OverflowStrategy +import org.apache.pekko.stream.scaladsl.{Sink, Source} +import org.apache.pekko.util.ByteString +import korolev.pekko.util.{PekkoByteStringBytesLike, KorolevStreamPublisher, KorolevStreamSubscriber} +import korolev.data.BytesLike +import korolev.effect.{Effect, Stream} +import org.reactivestreams.Publisher + +import scala.concurrent.ExecutionContext + +object instances { + + implicit final class SinkCompanionOps(value: Sink.type) { + def korolevStream[F[_]: Effect, T]: Sink[T, Stream[F, T]] = { + val subscriber = new KorolevStreamSubscriber[F, T]() + Sink + .fromSubscriber(subscriber) + .mapMaterializedValue(_ => subscriber) + } + } + + implicit final class StreamCompanionOps(value: Stream.type) { + def fromPublisher[F[_]: Effect, T](publisher: Publisher[T]): Stream[F, T] = { + val result = new KorolevStreamSubscriber[F, T]() + publisher.subscribe(result) + result + } + } + + implicit final class KorolevStreamsOps[F[_]: Effect, T](stream: Stream[F, T]) { + + /** + * Converts korolev [[korolev.effect.Stream]] to [[Publisher]]. + * + * If `fanout` is `true`, the `Publisher` will support multiple `Subscriber`s and + * the size of the `inputBuffer` configured for this operator becomes the maximum number of elements that + * the fastest [[org.reactivestreams.Subscriber]] can be ahead of the slowest one before slowing + * the processing down due to back pressure. + * + * If `fanout` is `false` then the `Publisher` will only support a single `Subscriber` and + * reject any additional `Subscriber`s with [[korolev.pekko.util.KorolevStreamPublisher.MultipleSubscribersProhibitedException]]. + */ + def asPublisher(fanout: Boolean = false)(implicit ec: ExecutionContext): Publisher[T] = + new KorolevStreamPublisher(stream, fanout) + + def asPekkoSource(implicit ec: ExecutionContext): Source[T, NotUsed] = { + val publisher = new KorolevStreamPublisher(stream, fanout = false) + Source.fromPublisher(publisher) + } + } + + implicit final val pekkoByteStringBytesLike: BytesLike[ByteString] = + new PekkoByteStringBytesLike() +} diff --git a/interop/pekko/src/main/scala/korolev/pekko/package.scala b/interop/pekko/src/main/scala/korolev/pekko/package.scala new file mode 100644 index 00000000..b0fe3fa5 --- /dev/null +++ b/interop/pekko/src/main/scala/korolev/pekko/package.scala @@ -0,0 +1,194 @@ +/* + * Copyright 2017-2020 Aleksey Fomkin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package korolev + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.model._ +import org.apache.pekko.http.scaladsl.model.headers.RawHeader +import org.apache.pekko.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage} +import org.apache.pekko.http.scaladsl.server.Directives._ +import org.apache.pekko.http.scaladsl.server.Route +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.scaladsl.{Flow, Keep, Sink} +import org.apache.pekko.util.ByteString +import korolev.pekko.util.LoggingReporter +import korolev.data.{Bytes, BytesLike} +import korolev.effect.{Effect, Reporter, Stream} +import korolev.server.internal.BadRequestException +import korolev.server.{WebSocketRequest as KorolevWebSocketRequest, WebSocketResponse as KorolevWebSocketResponse} +import korolev.server.{KorolevService, KorolevServiceConfig, HttpRequest as KorolevHttpRequest} +import korolev.state.{StateDeserializer, StateSerializer} +import korolev.web.{PathAndQuery, Request as KorolevRequest, Response as KorolevResponse} + +import scala.concurrent.{ExecutionContext, Future} + +package object pekko { + + type PekkoHttpService = PekkoHttpServerConfig => Route + + import instances._ + + def pekkoHttpService[F[_]: Effect, S: StateSerializer: StateDeserializer, M] + (config: KorolevServiceConfig[F, S, M], wsLoggingEnabled: Boolean = false) + (implicit actorSystem: ActorSystem, materializer: Materializer, ec: ExecutionContext): PekkoHttpService = { pekkoHttpConfig => + // If reporter wasn't overridden, use pekko-logging reporter. + val actualConfig = + if (config.reporter != Reporter.PrintReporter) config + else config.copy(reporter = new LoggingReporter(actorSystem)) + + val korolevServer = korolev.server.korolevService(actualConfig) + val wsRouter = configureWsRoute(korolevServer, pekkoHttpConfig, actualConfig, wsLoggingEnabled) + val httpRoute = configureHttpRoute(korolevServer) + + wsRouter ~ httpRoute + } + + private def configureWsRoute[F[_]: Effect, S: StateSerializer: StateDeserializer, M] + (korolevServer: KorolevService[F], + pekkoHttpConfig: PekkoHttpServerConfig, + korolevServiceConfig: KorolevServiceConfig[F, S, M], + wsLoggingEnabled: Boolean) + (implicit materializer: Materializer, ec: ExecutionContext): Route = + extractRequest { request => + extractUnmatchedPath { path => + extractWebSocketUpgrade { upgrade => + // inSink - consume messages from the client + // outSource - push messages to the client + val (inStream, inSink) = Sink.korolevStream[F, Bytes].preMaterialize() + val korolevRequest = mkKorolevRequest(request, path.toString, inStream) + + complete { + val korolevWsRequest = KorolevWebSocketRequest(korolevRequest, upgrade.requestedProtocols) + Effect[F].toFuture(korolevServer.ws(korolevWsRequest)).map { + case KorolevWebSocketResponse(KorolevResponse(_, outStream, _, _), selectedProtocol) => + val source = outStream + .asPekkoSource + .map(text => BinaryMessage.Strict(text.as[ByteString])) + val sink = Flow[Message] + .mapAsync(pekkoHttpConfig.wsStreamedParallelism) { + case TextMessage.Strict(message) => + Future.successful(Some(BytesLike[Bytes].utf8(message))) + case TextMessage.Streamed(stream) => + stream + .completionTimeout(pekkoHttpConfig.wsStreamedCompletionTimeout) + .runFold("")(_ + _) + .map(message => Some(BytesLike[Bytes].utf8(message))) + case BinaryMessage.Strict(data) => + Future.successful(Some(Bytes.wrap(data))) + case BinaryMessage.Streamed(stream) => + stream + .completionTimeout(pekkoHttpConfig.wsStreamedCompletionTimeout) + .runFold(ByteString.empty)(_ ++ _) + .map(message => Some(Bytes.wrap(message))) + } + .recover { + case ex => + korolevServiceConfig.reporter.error(s"WebSocket exception ${ex.getMessage}, shutdown output stream", ex) + outStream.cancel() + None + } + .collect { + case Some(message) => + message + } + .to(inSink) + + upgrade.handleMessages( + if(wsLoggingEnabled) { + Flow.fromSinkAndSourceCoupled(sink, source).log("korolev-ws") + } else { + Flow.fromSinkAndSourceCoupled(sink, source) + }, + Some(selectedProtocol) + ) + case _ => + throw new RuntimeException // cannot happen + }.recover { + case BadRequestException(message) => + HttpResponse(StatusCodes.BadRequest, entity = HttpEntity(message)) + } + } + } + } + } + + private def configureHttpRoute[F[_]](korolevServer: KorolevService[F])(implicit mat: Materializer, async: Effect[F], ec: ExecutionContext): Route = + extractUnmatchedPath { path => + extractRequest { request => + val sink = Sink.korolevStream[F, Bytes] + val body = + if (request.method == HttpMethods.GET) { + Stream.empty[F, Bytes] + } else { + request + .entity + .dataBytes + .map(Bytes.wrap(_)) + .toMat(sink)(Keep.right) + .run() + } + val korolevRequest = mkKorolevRequest(request, path.toString, body) + val responseF = handleHttpResponse(korolevServer, korolevRequest) + complete(responseF) + } + } + + private def mkKorolevRequest[F[_], Body](request: HttpRequest, + path: String, + body: Body): KorolevRequest[Body] = + KorolevRequest( + pq = PathAndQuery.fromString(path).withParams(request.uri.rawQueryString), + method = KorolevRequest.Method.fromString(request.method.value), + contentLength = request.headers.find(_.is("content-length")).map(_.value().toLong), + renderedCookie = request.headers.find(_.is("cookie")).map(_.value()).getOrElse(""), + headers = { + val contentType = request.entity.contentType + val contentTypeHeaders = + if (contentType.mediaType.isMultipart) Seq("content-type" -> contentType.toString) else Seq.empty + request.headers.map(h => (h.name(), h.value())) ++ contentTypeHeaders + }, + body = body + ) + + private def handleHttpResponse[F[_]: Effect](korolevServer: KorolevService[F], + korolevRequest: KorolevHttpRequest[F])(implicit ec: ExecutionContext): Future[HttpResponse] = + Effect[F].toFuture(korolevServer.http(korolevRequest)).map { + case response @ KorolevResponse(status, body, responseHeaders, _) => + val (contentTypeOpt, otherHeaders) = getContentTypeAndResponseHeaders(responseHeaders) + val bytesSource = body.asPekkoSource.map(_.as[ByteString]) + HttpResponse( + StatusCode.int2StatusCode(status.code), + otherHeaders, + response.contentLength match { + case Some(bytesLength) => HttpEntity(contentTypeOpt.getOrElse(ContentTypes.NoContentType), bytesLength, bytesSource) + case None => HttpEntity(contentTypeOpt.getOrElse(ContentTypes.NoContentType), bytesSource) + } + ) + } + + private def getContentTypeAndResponseHeaders(responseHeaders: Seq[(String, String)]): (Option[ContentType], List[HttpHeader]) = { + val headers = responseHeaders.map { case (name, value) => + HttpHeader.parse(name, value) match { + case HttpHeader.ParsingResult.Ok(header, _) => header + case _ => RawHeader(name, value) + } + } + val (contentTypeHeaders, otherHeaders) = headers.partition(_.lowercaseName() == "content-type") + val contentTypeOpt = contentTypeHeaders.headOption.flatMap(h => ContentType.parse(h.value()).right.toOption) + (contentTypeOpt, otherHeaders.toList) + } +} diff --git a/interop/pekko/src/main/scala/korolev/pekko/util/Countdown.scala b/interop/pekko/src/main/scala/korolev/pekko/util/Countdown.scala new file mode 100644 index 00000000..afd1aa7f --- /dev/null +++ b/interop/pekko/src/main/scala/korolev/pekko/util/Countdown.scala @@ -0,0 +1,66 @@ +package korolev.pekko.util + +import korolev.effect.Effect + +import java.util.concurrent.atomic.AtomicReference +import scala.annotation.tailrec + +final class Countdown[F[_]: Effect] { + + private final val state = new AtomicReference(Countdown.State(None, 0)) + + def value(): F[Long] = Effect[F].delay(unsafeValue) + + def add(n: Int): F[Unit] = Effect[F].delay(unsafeAdd(n)) + + def decOrLock(): F[Unit] = Effect[F].promise[Unit] { cb => + @tailrec + def aux(): Unit = { + val ref = state.get() + if (ref.n == 0) { + val newValue = ref.copy(pending = Some(cb)) + if (!state.compareAndSet(ref, newValue)) { + aux() + } + } else { + val newValue = ref.copy(n = ref.n - 1) + if (state.compareAndSet(ref, newValue)) { + cb(Countdown.unitToken) + } else { + aux() + } + } + } + aux() + } + + def unsafeAdd(x: Long): Unit = { + // x should be positive + if (x > 0) { + @tailrec + def aux(): Unit = { + val ref = state.get() + ref.pending match { + case Some(cb) => + if (state.compareAndSet(ref, Countdown.State(pending = None, n = ref.n + x - 1))) { + cb(Countdown.unitToken) + } else { + aux() + } + case None => + if (!state.compareAndSet(ref, ref.copy(n = ref.n + x))) { + aux() + } + } + } + aux() + } + } + + def unsafeValue: Long = state.get().n +} + +object Countdown { + val unitToken = Right(()) + case class State(pending: Option[Effect.Promise[Unit]], n: Long) +} diff --git a/interop/pekko/src/main/scala/korolev/pekko/util/KorolevStreamPublisher.scala b/interop/pekko/src/main/scala/korolev/pekko/util/KorolevStreamPublisher.scala new file mode 100644 index 00000000..fabfcb2b --- /dev/null +++ b/interop/pekko/src/main/scala/korolev/pekko/util/KorolevStreamPublisher.scala @@ -0,0 +1,91 @@ +/* + * Copyright 2017-2020 Aleksey Fomkin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package korolev.pekko.util + +import korolev.pekko.util.KorolevStreamPublisher.MultipleSubscribersProhibitedException +import korolev.effect.syntax._ +import korolev.effect.{Effect, Hub, Reporter, Stream} +import org.reactivestreams.{Publisher, Subscriber, Subscription} + +import scala.concurrent.ExecutionContext + +final class KorolevStreamPublisher[F[_] : Effect, T](stream: Stream[F, T], + fanout: Boolean) + (implicit ec: ExecutionContext) extends Publisher[T] { + + private implicit val reporter: Reporter = Reporter.PrintReporter + + private var hasActualSubscriber = false + + private val hub = + if (fanout) Hub(stream) + else null + + private final class StreamSubscription(stream: Stream[F, T], + subscriber: Subscriber[_ >: T]) extends Subscription { + + private val countdown = new Countdown[F]() + + private def loop(): F[Unit] = + for { + _ <- countdown.decOrLock() + maybeItem <- stream.pull() + _ <- maybeItem match { + case Some(item) => + subscriber.onNext(item) + loop() + case None => + subscriber.onComplete() + Effect[F].unit + } + } yield () + + loop().runAsync { + case Left(error) => subscriber.onError(error) + case Right(_) => () + } + + def request(n: Long): Unit = { + countdown.unsafeAdd(n) + } + + def cancel(): Unit = { + stream + .cancel() + .runAsyncForget + } + } + + def subscribe(subscriber: Subscriber[_ >: T]): Unit = { + if (hub != null) { + hub.newStream().runAsyncSuccess { newStream => + val subscription = new StreamSubscription(newStream, subscriber) + subscriber.onSubscribe(subscription) + } + } else { + if (hasActualSubscriber) + throw MultipleSubscribersProhibitedException() + subscriber.onSubscribe(new StreamSubscription(stream, subscriber)) + } + hasActualSubscriber = true + } +} + +object KorolevStreamPublisher { + final case class MultipleSubscribersProhibitedException() + extends Exception("Multiple subscribers prohibited for this KorolevStreamPublisher") +} diff --git a/interop/pekko/src/main/scala/korolev/pekko/util/KorolevStreamSubscriber.scala b/interop/pekko/src/main/scala/korolev/pekko/util/KorolevStreamSubscriber.scala new file mode 100644 index 00000000..a28653a6 --- /dev/null +++ b/interop/pekko/src/main/scala/korolev/pekko/util/KorolevStreamSubscriber.scala @@ -0,0 +1,72 @@ +/* + * Copyright 2017-2020 Aleksey Fomkin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package korolev.pekko.util + +import korolev.effect.{Effect, Stream} +import org.reactivestreams.{Subscriber, Subscription} + +final class KorolevStreamSubscriber[F[_]: Effect,T] extends Stream[F, T] with Subscriber[T] { + + @volatile private var subscription: Subscription = _ + + @volatile private var pullCallback: Either[Throwable, Option[T]] => Unit = _ + + @volatile private var completeValue: Either[Throwable, None.type] = _ + + def onSubscribe(subscription: Subscription): Unit = { + this.subscription = subscription + if (pullCallback != null) + subscription.request(1) + } + + def onNext(value: T): Unit = { + val cb = pullCallback + pullCallback = null + cb(Right(Some(value))) + } + + def onError(error: Throwable): Unit = { + completeWith(Left(error)) + } + + def onComplete(): Unit = { + completeWith(Right(None)) + } + + private def completeWith(that: Either[Throwable, None.type]): Unit = { + completeValue = that + if (pullCallback != null) { + val cb = pullCallback + pullCallback = null + cb(that) + } + } + + def pull(): F[Option[T]] = Effect[F].promise { cb => + if (completeValue == null) { + pullCallback = cb + if (subscription != null) { + subscription.request(1) + } + } else { + cb(completeValue) + } + } + + def cancel(): F[Unit] = + Effect[F].delay(subscription.cancel()) +} diff --git a/interop/pekko/src/main/scala/korolev/pekko/util/LoggingReporter.scala b/interop/pekko/src/main/scala/korolev/pekko/util/LoggingReporter.scala new file mode 100644 index 00000000..d69d5cdb --- /dev/null +++ b/interop/pekko/src/main/scala/korolev/pekko/util/LoggingReporter.scala @@ -0,0 +1,40 @@ +/* + * Copyright 2017-2020 Aleksey Fomkin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package korolev.pekko.util + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.event.{LogSource, Logging} +import korolev.effect.Reporter + +final class LoggingReporter(actorSystem: ActorSystem) extends Reporter { + + private implicit val logSource: LogSource[LoggingReporter] = new LogSource[LoggingReporter] { + def genString(t: LoggingReporter): String = "korolev" + } + + private val log = Logging(actorSystem, this) + + def error(message: String, cause: Throwable): Unit = log.error(cause, message) + def error(message: String): Unit = log.error(message) + def warning(message: String, cause: Throwable): Unit = log.warning(s"$message: {}", cause) + def warning(message: String): Unit = log.warning(message) + def info(message: String): Unit = log.info(message) + def debug(message: String): Unit = log.debug(message) + def debug(message: String, arg1: Any): Unit = log.debug(message, arg1) + def debug(message: String, arg1: Any, arg2: Any): Unit = log.debug(message, arg1, arg2) + def debug(message: String, arg1: Any, arg2: Any, arg3: Any): Unit = log.debug(message, arg1, arg2, arg3) +} diff --git a/interop/pekko/src/main/scala/korolev/pekko/util/PekkoByteStringBytesLike.scala b/interop/pekko/src/main/scala/korolev/pekko/util/PekkoByteStringBytesLike.scala new file mode 100644 index 00000000..c17039be --- /dev/null +++ b/interop/pekko/src/main/scala/korolev/pekko/util/PekkoByteStringBytesLike.scala @@ -0,0 +1,86 @@ +package korolev.pekko.util + +import org.apache.pekko.util.ByteString +import korolev.data.BytesLike + +import java.nio.ByteBuffer +import java.nio.charset.{Charset, StandardCharsets} + +final class PekkoByteStringBytesLike extends BytesLike[ByteString] { + + val empty: ByteString = ByteString.empty + + def ascii(s: String): ByteString = ByteString.fromString(s, StandardCharsets.US_ASCII) + + def utf8(s: String): ByteString = ByteString.fromString(s, StandardCharsets.UTF_8) + + def wrapArray(bytes: Array[Byte]): ByteString = ByteString.fromArrayUnsafe(bytes) + + def copyBuffer(buffer: ByteBuffer): ByteString = ByteString.fromByteBuffer(buffer) + + def copyToBuffer(b: ByteString, buffer: ByteBuffer): Int = b.copyToBuffer(buffer) + + def copyFromArray(bytes: Array[Byte]): ByteString = ByteString.fromArray(bytes) + + def copyFromArray(bytes: Array[Byte], offset: Int, size: Int): ByteString = ByteString.fromArray(bytes, offset, size) + + def copyToArray(value: ByteString, array: Array[Byte], sourceOffset: Int, targetOffset: Int, length: Int): Unit = + value.drop(sourceOffset).copyToArray(array, targetOffset, length) + + def asAsciiString(bytes: ByteString): String = + bytes.decodeString(StandardCharsets.US_ASCII) + + def asUtf8String(bytes: ByteString): String = + bytes.decodeString(StandardCharsets.UTF_8) + + def asString(bytes: ByteString, charset: Charset): String = + bytes.decodeString(charset) + + def asArray(bytes: ByteString): Array[Byte] = + bytes.toArray + + def asBuffer(bytes: ByteString): ByteBuffer = + bytes.asByteBuffer + + def eq(l: ByteString, r: ByteString): Boolean = + l.equals(r) + + def get(bytes: ByteString, i: Long): Byte = + bytes(i.toInt) + + def length(bytes: ByteString): Long = + bytes.length.toLong + + def concat(left: ByteString, right: ByteString): ByteString = + left.concat(right) + + def slice(bytes: ByteString, start: Long, end: Long): ByteString = + bytes.slice(start.toInt, end.toInt) + + def mapWithIndex(bytes: ByteString, f: (Byte, Long) => Byte): ByteString = { + var i = 0 + bytes.map { x => + val res = f(x, i) + i += 1 + res + } + } + + def indexOf(where: ByteString, that: Byte): Long = + where.indexOf(that) + + def indexOf(where: ByteString, that: Byte, from: Long): Long = + where.indexOf(that, from.toInt) + + def lastIndexOf(where: ByteString, that: Byte): Long = + where.lastIndexOf(that) + + def indexOfSlice(where: ByteString, that: ByteString): Long = + where.indexOfSlice(that) + + def lastIndexOfSlice(where: ByteString, that: ByteString): Long = + where.lastIndexOfSlice(that) + + def foreach(bytes: ByteString, f: Byte => Unit): Unit = + bytes.foreach(f) +} diff --git a/interop/pekko/src/test/scala/CountdownSpec.scala b/interop/pekko/src/test/scala/CountdownSpec.scala new file mode 100644 index 00000000..c2da4862 --- /dev/null +++ b/interop/pekko/src/test/scala/CountdownSpec.scala @@ -0,0 +1,96 @@ +import korolev.pekko.util.Countdown +import korolev.effect.Effect.FutureEffect +import korolev.effect.syntax._ +import org.scalatest.freespec.AsyncFreeSpec + +import java.util.concurrent.atomic.AtomicReference +import scala.concurrent.Future + +class CountdownSpec extends AsyncFreeSpec { + + private case class Promise() extends Exception + + class ThrowOnPromise(var switch: Boolean = false) extends FutureEffect { + + def toggle(): Future[Unit] = Future.successful { + switch = !switch + } + + override def promise[A](cb: (Either[Throwable, A] => Unit) => Unit): Future[A] = { + var released = false + val p = super.promise { (f: Either[Throwable, A] => Unit) => + cb { x => + released = true + f(x) + } + } + if (switch) { + if (released) p + else Future.failed(Promise()) + } else { + p + } + } + } + + "decOrLock should lock when count is 0" in recoverToSucceededIf[Promise] { + val countdown = new Countdown[Future]()(new ThrowOnPromise(switch = true)) + for { + _ <- countdown.add(3) + _ <- countdown.decOrLock() + _ <- countdown.decOrLock() + _ <- countdown.decOrLock() + _ <- countdown.decOrLock() // should lock + } yield () + } + + "decOrLock should not lock until count > 0" in { + val countdown = new Countdown[Future]()(new ThrowOnPromise) + for { + _ <- countdown.add(3) + _ <- countdown.decOrLock() + _ <- countdown.decOrLock() + _ <- countdown.decOrLock() + } yield succeed + } + + "add should release taken lock" in { + val effectInstance = new ThrowOnPromise(switch = false) + val countdown = new Countdown[Future]()(effectInstance) + val result = new AtomicReference[String]("") + for { + _ <- countdown.add(3) + _ <- countdown.decOrLock() + _ <- countdown.decOrLock() + _ <- countdown.decOrLock() + fiber <- countdown + .decOrLock() // should lock + .map(_ => result.getAndUpdate(_ + " world")) + .start + _ = result.set("hello") + _ <- countdown.add(3) + _ <- fiber.join() + } yield assert(result.get == "hello world") + } + + "count the lock released by the add invocation" in recoverToSucceededIf[Promise] { + val effectInstance = new ThrowOnPromise(switch = false) + val countdown = new Countdown[Future]()(effectInstance) + for { + _ <- countdown.add(3) + _ <- countdown.decOrLock() + _ <- countdown.decOrLock() + _ <- countdown.decOrLock() + fiber <- countdown + .decOrLock() // should lock + .start + _ <- countdown.add(3) + _ <- fiber.join() + _ <- effectInstance.toggle() + _ <- countdown.decOrLock() + _ <- countdown.decOrLock() + _ <- countdown.decOrLock() // should lock + } yield () + } + +}