From dc38cef2cdd07321cfc688a5b66c76e0cea7ef4b Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Wed, 11 Oct 2023 21:02:06 +0200 Subject: [PATCH 01/14] Add Pipeline conversions and rewrite internals with channels --- .../interop/reactivestreams/Adapters.scala | 497 +++++++++--------- .../zio/interop/reactivestreams/package.scala | 16 +- .../PipelineToProcessorSpec.scala | 78 +++ .../ProcessorToPipelineSpec.scala | 84 +++ .../PublisherToStreamSpec.scala | 1 + .../SinkToSubscriberSpec.scala | 2 +- .../StreamToPublisherSpec.scala | 15 +- 7 files changed, 440 insertions(+), 253 deletions(-) create mode 100644 zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/PipelineToProcessorSpec.scala create mode 100644 zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala diff --git a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala index c0859cf..16bc3e6 100644 --- a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala +++ b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala @@ -3,34 +3,32 @@ package zio.interop.reactivestreams import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import org.reactivestreams.Subscription +import org.reactivestreams.Processor import zio._ import zio.Unsafe._ import zio.internal.RingBuffer import zio.stream._ -import zio.stream.ZStream.Pull -import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference +import zio.stream.internal.AsyncInputConsumer +import zio.stream.internal.AsyncInputProducer +import zio.UIO +import java.util.concurrent.atomic.AtomicBoolean object Adapters { def streamToPublisher[R, E <: Throwable, O]( stream: => ZStream[R, E, O] )(implicit trace: Trace): ZIO[R, Nothing, Publisher[O]] = - ZIO.runtime.map { runtime => subscriber => + ZIO.runtime[R].map { runtime => subscriber => if (subscriber == null) { throw new NullPointerException("Subscriber must not be null.") } else unsafe { implicit unsafe => - val subscription = new DemandTrackingSubscription(subscriber) + val subscription = new SubscriptionProducer[O](subscriber) + subscriber.onSubscribe(subscription) runtime.unsafe.fork( - for { - _ <- ZIO.succeed(subscriber.onSubscribe(subscription)) - _ <- stream - .run(demandUnfoldSink(subscriber, subscription)) - .catchAll(e => ZIO.succeed(subscriber.onError(e))) - .forkDaemon - } yield () + (stream.toChannel >>> ZChannel.fromZIO(subscription.awaitCancellation).embedInput(subscription)).runDrain ) () } @@ -38,269 +36,290 @@ object Adapters { def subscriberToSink[E <: Throwable, I]( subscriber: => Subscriber[I] - )(implicit trace: Trace): ZIO[Scope, Nothing, (E => UIO[Unit], ZSink[Any, Nothing, I, I, Unit])] = + ): ZIO[Scope, Nothing, (E => UIO[Unit], ZSink[Any, Nothing, I, I, Unit])] = ZIO.succeed { unsafe { implicit unsafe => - val sub = subscriber - for { - error <- Promise.make[E, Nothing] - subscription = new DemandTrackingSubscription(sub) - _ <- ZIO.succeed(sub.onSubscribe(subscription)) - fiber <- error.await.interruptible.catchAll(t => ZIO.succeed(sub.onError(t))).forkScoped - } yield (error.fail(_) *> fiber.join, demandUnfoldSink(sub, subscription)) + val subscription = new SubscriptionProducer[I](subscriber) + + def reader: ZChannel[Any, ZNothing, Chunk[I], Any, Nothing, Chunk[I], Unit] = ZChannel.readWith( + i => ZChannel.fromZIO(subscription.emit(i)) *> reader, + _ => ???, // impossible + d => ZChannel.fromZIO(subscription.done(d)) *> ZChannel.succeed(()) + ) + + subscriber.onSubscribe(subscription) + ((e: E) => subscription.error(Cause.fail(e)).sandbox.ignore, ZSink.fromChannel(reader)) } + } def publisherToStream[O]( publisher: => Publisher[O], bufferSize: => Int - )(implicit trace: Trace): ZStream[Any, Throwable, O] = { - - val pullOrFail = - for { - subscriberP <- makeSubscriber[O](bufferSize) - (subscriber, p) = subscriberP - _ <- ZIO.acquireRelease(ZIO.succeed(publisher.subscribe(subscriber)))(_ => ZIO.succeed(subscriber.interrupt())) - subQ <- p.await - (sub, q) = subQ - process <- process(sub, q, () => subscriber.await(), () => subscriber.isDone) - } yield process - val pull = pullOrFail.catchAll(e => ZIO.succeed(Pull.fail(e))) - fromPull[Any, Throwable, O](pull) + )(implicit trace: Trace): ZStream[Any, Throwable, O] = ZStream.unwrapScoped { + val subscribe = ZIO.succeed { + val subscriber = new SubscriberConsumer[O](bufferSize)(unsafe) + publisher.subscribe(subscriber) + subscriber + } + + subscribe + .tap(s => ZIO.addFinalizer(s.cancelSubscription.forkDaemon)) + .uninterruptible + .map(c => ZStream.fromChannel(ZChannel.fromInput(c))) } def sinkToSubscriber[R, I, L, Z]( sink: => ZSink[R, Throwable, I, L, Z], bufferSize: => Int - )(implicit trace: Trace): ZIO[R with Scope, Throwable, (Subscriber[I], IO[Throwable, Z])] = - for { - subscriberP <- makeSubscriber[I](bufferSize) - (subscriber, p) = subscriberP - pull = p.await.flatMap { case (subscription, q) => - process(subscription, q, () => subscriber.await(), () => subscriber.isDone, bufferSize) - } - .catchAll(e => ZIO.succeedNow(Pull.fail(e))) - fiber <- fromPull(pull).run(sink).forkScoped - } yield (subscriber, fiber.join) - - private def process[A]( - sub: Subscription, - q: RingBuffer[A], - await: () => IO[Option[Throwable], Unit], - isDone: () => Boolean, - maxChunkSize: Int = Int.MaxValue - ): ZIO[Scope, Nothing, ZIO[Any, Option[Throwable], Chunk[A]]] = + )(implicit trace: Trace): ZIO[R with Scope, Throwable, (Subscriber[I], IO[Throwable, Z])] = { + val subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) for { - _ <- ZIO.succeed(sub.request(q.capacity.toLong)) - requestedRef <- Ref.make(q.capacity.toLong) // TODO: maybe turn into unfold? - } yield { - def pull: Pull[Any, Throwable, A] = - for { - requested <- requestedRef.get - pollSize = Math.min(requested, maxChunkSize.toLong).toInt - chunk <- ZIO.succeed(q.pollUpTo(pollSize)) - r <- - if (chunk.isEmpty) - await() *> pull - else - (if (chunk.size == pollSize && !isDone()) - ZIO.succeed(sub.request(q.capacity.toLong)) *> requestedRef.set(q.capacity.toLong) - else requestedRef.set(requested - chunk.size)) *> - Pull.emit(chunk) - } yield r - - pull - } - - private trait InterruptibleSubscriber[A] extends Subscriber[A] { - def interrupt(): Unit - def await(): IO[Option[Throwable], Unit] - def isDone: Boolean + _ <- ZIO.addFinalizer(subscriber.cancelSubscription.forkDaemon) + sinkFiber <- (ZChannel.fromInput(subscriber) pipeToOrFail sink.channel).runDrain.forkScoped + } yield (subscriber, sinkFiber.join) } - private def makeSubscriber[A]( - capacity: Int - ): ZIO[ - Scope, - Nothing, - ( - InterruptibleSubscriber[A], - Promise[Throwable, (Subscription, RingBuffer[A])] - ) - ] = - for { - q <- ZIO.succeed(RingBuffer[A](capacity)) - p <- ZIO.acquireRelease( - Promise - .make[Throwable, (Subscription, RingBuffer[A])] - )( - _.poll.flatMap(_.fold(ZIO.unit)(_.foldZIO(_ => ZIO.unit, { case (sub, _) => ZIO.succeed(sub.cancel()) }))) - ) - } yield unsafe { implicit unsafe => - val subscriber = - new InterruptibleSubscriber[A] { - - val isSubscribedOrInterrupted = new AtomicBoolean - @volatile - var done: Option[Option[Throwable]] = None - @volatile - var toNotify: Option[Promise[Option[Throwable], Unit]] = None - - override def interrupt(): Unit = - isSubscribedOrInterrupted.set(true) - - override def await(): IO[Option[Throwable], Unit] = - done match { - case Some(value) => - if (q.isEmpty()) ZIO.fail(value) else ZIO.unit - case None => - val p = Promise.unsafe.make[Option[Throwable], Unit](FiberId.None) - toNotify = Some(p) - // An element has arrived in the meantime, we do not need to start waiting. - if (!q.isEmpty()) { - toNotify = None - ZIO.unit - } else - done.fold(p.await) { e => - // The producer has canceled or errored in the meantime. - toNotify = None - if (q.isEmpty()) ZIO.fail(e) else ZIO.unit - } - } + def processorToPipeline[I, O]( + processor: Processor[I, O], + bufferSize: Int = 16 + )(implicit trace: Trace): ZPipeline[Any, Throwable, I, O] = ZPipeline.unwrapScoped { + val subscription = new SubscriptionProducer[I](processor)(unsafe) + val subscriber = new SubscriberConsumer[O](bufferSize)(unsafe) - override def isDone: Boolean = done.isDefined + for { + _ <- ZIO.succeed(processor.onSubscribe(subscription)) + _ <- ZIO.succeed(processor.subscribe(subscriber)) + _ <- ZIO.addFinalizer(subscriber.cancelSubscription) + _ <- ZIO.addFinalizer(ZIO.succeed(subscription.cancel())) + } yield ZPipeline.fromChannel(ZChannel.fromInput(subscriber).embedInput(subscription)) + } - override def onSubscribe(s: Subscription): Unit = - if (s == null) { - val e = new NullPointerException("s was null in onSubscribe") - p.unsafe.done(ZIO.fail(e)) - throw e - } else { - val shouldCancel = isSubscribedOrInterrupted.getAndSet(true) - if (shouldCancel) - s.cancel() - else - p.unsafe.done(ZIO.succeedNow((s, q))) - } + def pipelineToProcessor[R <: Scope, I, O]( + pipeline: ZPipeline[R, Throwable, I, O], + bufferSize: Int = 16 + )(implicit trace: Trace): ZIO[R, Nothing, Processor[I, O]] = ZIO.scoped[R] { + for { + runtime <- ZIO.runtime[R] + subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) + } yield new Processor[I, O] { + def onSubscribe(s: Subscription): Unit = subscriber.onSubscribe(s) - override def onNext(t: A): Unit = - if (t == null) { - failNPE("t was null in onNext") - } else { - q.offer(t) - toNotify.foreach(_.unsafe.done(ZIO.unit)) - } + def onNext(t: I): Unit = subscriber.onNext(t) - override def onError(e: Throwable): Unit = - if (e == null) - failNPE("t was null in onError") - else - fail(e) + def onError(t: Throwable): Unit = subscriber.onError(t) - override def onComplete(): Unit = { - done = Some(None) - toNotify.foreach(_.unsafe.done(ZIO.fail(None))) - } + def onComplete(): Unit = subscriber.onComplete() - private def failNPE(msg: String) = { - val e = new NullPointerException(msg) - fail(e) - throw e + def subscribe(s: Subscriber[_ >: O <: Object]): Unit = { + val subscription = new SubscriptionProducer[O](s)(unsafe) + s.onSubscribe(subscription) + unsafe { implicit u => + runtime.unsafe.fork { + ((ZChannel.fromInput(subscriber) pipeToOrFail pipeline.toChannel) >>> ZChannel + .fromZIO(subscription.awaitCancellation *> subscriber.cancelSubscription) + .embedInput(subscription)).runDrain } + () + } + } - private def fail(e: Throwable) = { - done = Some(Some(e)) - toNotify.foreach(_.unsafe.done(ZIO.fail(Some(e)))) - } + } + } + private class SubscriptionProducer[A](sub: Subscriber[_ >: A])(implicit unsafe: Unsafe) + extends Subscription + with AsyncInputProducer[Throwable, Chunk[A], Any] { + import SubscriptionProducer.State + + private val state: AtomicReference[State[A]] = new AtomicReference(State.initial[A]) + private val canceled: Promise[Nothing, Unit] = Promise.unsafe.make(FiberId.None) + + val awaitCancellation: UIO[Unit] = canceled.await + + def request(n: Long): Unit = + if (n <= 0) sub.onError(new IllegalArgumentException("non-positive subscription request")) + else { + state.getAndUpdate { + case State.Running(demand) => State.Running(demand + n) + case State.Waiting(resume) => State.Running(n) + case other => other + } match { + case State.Waiting(resume) => resume.unsafe.done(ZIO.unit) + case _ => () } + } - (subscriber, p) + def cancel(): Unit = { + state.getAndSet(State.Cancelled) match { + case State.Waiting(resume) => resume.unsafe.done(ZIO.interrupt) + case _ => () + } + canceled.unsafe.done(ZIO.unit) } - private def demandUnfoldSink[I]( - subscriber: Subscriber[I], - subscription: DemandTrackingSubscription - ): ZSink[Any, Nothing, I, I, Unit] = - ZSink - .foldChunksZIO[Any, Nothing, I, Boolean](true)(identity) { (_, chunk) => - ZIO - .iterate(chunk)(!_.isEmpty) { chunk => - subscription - .offer(chunk.size) - .flatMap { acceptedCount => - ZIO - .foreach(chunk.take(acceptedCount))(a => ZIO.succeed(subscriber.onNext(a))) - .as(chunk.drop(acceptedCount)) - } + def emit(el: Chunk[A])(implicit trace: zio.Trace): UIO[Any] = ZIO.suspendSucceed { + if (el.isEmpty) ZIO.unit + else + ZIO.suspendSucceed { + state.getAndUpdate { + case State.Running(demand) => + if (demand > el.size) + State.Running(demand - el.size) + else + State.Waiting(Promise.unsafe.make[Nothing, Unit](FiberId.None)) + case other => other + } match { + case State.Waiting(resume) => + resume.await *> emit(el) + case State.Running(demand) => + if (demand > el.size) + ZIO.succeed(el.foreach(sub.onNext(_))) + else + ZIO.succeed(el.take(demand.toInt).foreach(sub.onNext(_))) *> emit(el.drop(demand.toInt)) + case State.Cancelled => + ZIO.interrupt } - .fold( - _ => false, // canceled - _ => true - ) + } + } + + def done(a: Any)(implicit trace: zio.Trace): UIO[Any] = ZIO.suspendSucceed { + state.getAndSet(State.Cancelled) match { + case State.Running(_) => ZIO.succeed(sub.onComplete()) *> canceled.succeed(()) + case State.Cancelled => ZIO.interrupt + case State.Waiting(resume) => ZIO.succeed(sub.onComplete()) *> resume.interrupt *> canceled.succeed(()) } - .map(_ => if (!subscription.isCanceled) subscriber.onComplete()) - - private class DemandTrackingSubscription(subscriber: Subscriber[_])(implicit val unsafe: Unsafe) - extends Subscription { - - private case class State( - requestedCount: Long, // -1 when cancelled - toNotify: Option[(Int, Promise[Unit, Int])] - ) - - private val initial = State(0L, None) - private val canceled = State(-1, None) - private def requested(n: Long) = State(n, None) - private def awaiting(n: Int, p: Promise[Unit, Int]) = State(0L, Some((n, p))) - - private val state = new AtomicReference(initial) - - def offer(n: Int): IO[Unit, Int] = { - var result: IO[Unit, Int] = null - state.updateAndGet { - case `canceled` => - result = ZIO.fail(()) - canceled - case State(0L, _) => - val p = Promise.unsafe.make[Unit, Int](FiberId.None) - result = p.await - awaiting(n, p) - case State(requestedCount, _) => - val newRequestedCount = Math.max(requestedCount - n, 0L) - val accepted = Math.min(requestedCount, n.toLong).toInt - result = ZIO.succeedNow(accepted) - requested(newRequestedCount) + } + + def error(cause: Cause[Throwable])(implicit trace: zio.Trace): UIO[Any] = ZIO.suspendSucceed { + state.getAndSet(State.Cancelled) match { + case State.Running(_) => + ZIO.succeed { + cause.failureOrCause.fold( + sub.onError, + c => sub.onError(new FiberFailure(c)) + ) + } *> canceled.succeed(()) + case State.Cancelled => ZIO.interrupt + case State.Waiting(resume) => + ZIO.succeed { + cause.failureOrCause.fold( + sub.onError, + c => sub.onError(new FiberFailure(c)) + ) + } *> resume.interrupt *> canceled.succeed(()) } - result } - def isCanceled: Boolean = state.get().requestedCount < 0 - - override def request(n: Long): Unit = { - if (n <= 0) subscriber.onError(new IllegalArgumentException("non-positive subscription request")) - var notification: () => Unit = () => () - state.getAndUpdate { - case `canceled` => - canceled - case State(requestedCount, Some((offered, toNotify))) => - val newRequestedCount = requestedCount + n - val accepted = Math.min(offered.toLong, newRequestedCount) - val remaining = newRequestedCount - accepted - notification = () => toNotify.unsafe.done(ZIO.succeedNow(accepted.toInt)) - requested(remaining) - case State(requestedCount, _) if ((Long.MaxValue - n) > requestedCount) => - requested(requestedCount + n) - case _ => - requested(Long.MaxValue) + def awaitRead(implicit trace: zio.Trace): UIO[Any] = ZIO.unit + } + + private object SubscriptionProducer { + sealed trait State[+A] + object State { + def initial[A](implicit unsafe: Unsafe): State[A] = Waiting(Promise.unsafe.make[Nothing, Unit](FiberId.None)) + + final case class Waiting(resume: Promise[Nothing, Unit]) extends State[Nothing] + final case class Running(demand: Long) extends State[Nothing] + case object Cancelled extends State[Nothing] + } + } + + private class SubscriberConsumer[A](capacity: Int)(implicit unsafe: Unsafe) + extends Subscriber[A] + with AsyncInputConsumer[Throwable, Chunk[A], Any] { + import SubscriberConsumer.State + + private val subscription: Promise[Nothing, Subscription] = Promise.unsafe.make(FiberId.None) + private val buffer: RingBuffer[A] = RingBuffer(capacity) + private val state: AtomicReference[State] = new AtomicReference(State.Drained) + private val isSubscribed: AtomicBoolean = new AtomicBoolean(false) + + def onSubscribe(s: Subscription): Unit = + if (!isSubscribed.compareAndSet(false, true)) { + s.cancel() + } else { + subscription.unsafe.done(ZIO.succeedNow(s)) + s.request(buffer.capacity.toLong) + } + + def onNext(t: A): Unit = + if (t == null) { + throw new NullPointerException("t was null in onNext") + } else { + buffer.offer(t) + state.getAndUpdate { + case State.Drained => State.Full + case State.Waiting(_) => State.Full + case other => other + } match { + case State.Waiting(promise) => promise.unsafe.done(ZIO.unit) + case _ => () + } + } + + def onError(t: Throwable): Unit = + if (t == null) { + throw new NullPointerException("t was null in onError") + } else { + state.getAndSet(State.Failed(t)) match { + case State.Waiting(promise) => promise.unsafe.done(ZIO.unit) + case _ => () + } + } + + def onComplete(): Unit = + state.getAndSet(State.Ended) match { + case State.Waiting(promise) => promise.unsafe.done(ZIO.unit) + case _ => () } - notification() + + def cancelSubscription: UIO[Unit] = + subscription.await.tap(s => ZIO.succeed(s.cancel())).unit + + def takeWith[B](onError: Cause[Throwable] => B, onElement: Chunk[A] => B, onDone: Any => B)(implicit + trace: zio.Trace + ): UIO[B] = subscription.await.flatMap { sub => + ZIO.suspendSucceed { + state.updateAndGet { + case State.Drained => State.Waiting(Promise.unsafe.make[Nothing, Unit](FiberId.None)) + case State.Full => State.Drained + case other => other + } match { + case State.Drained => + val data = buffer.pollUpTo(buffer.capacity) + val dataSize = data.size + if (dataSize > 0) { + sub.request(data.size.toLong) + ZIO.succeedNow(onElement(data)) + } else { + ZIO.succeedNow(onElement(Chunk.empty)) + } + case State.Full => ??? // impossible + case State.Waiting(promise) => promise.await *> takeWith(onError, onElement, onDone) + case State.Failed(t) => + // drain remaining data before failing + val data = buffer.pollUpTo(buffer.capacity) + if (data.nonEmpty) ZIO.succeedNow(onElement(data)) else ZIO.succeedNow(onError(Cause.fail(t))) + case State.Ended => + // drain remaining data before failing + val data = buffer.pollUpTo(buffer.capacity) + if (data.nonEmpty) ZIO.succeedNow(onElement(data)) else ZIO.succeedNow(onDone(())) + } + } + } + } + + private object SubscriberConsumer { + + sealed trait State + + object State { + case object Drained extends State + case object Full extends State + final case class Waiting(promise: Promise[Nothing, Unit]) extends State + final case class Failed(cause: Throwable) extends State + case object Ended extends State } - override def cancel(): Unit = - state.getAndSet(canceled).toNotify.foreach { case (_, p) => p.unsafe.done(ZIO.fail(())) } } - private def fromPull[R, E, A](zio: ZIO[R with Scope, Nothing, ZIO[R, Option[E], Chunk[A]]])(implicit - trace: Trace - ): ZStream[R, E, A] = - ZStream.unwrapScoped[R](zio.map(pull => ZStream.repeatZIOChunkOption(pull))) } diff --git a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/package.scala b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/package.scala index 6ac7b4a..8912a12 100644 --- a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/package.scala +++ b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/package.scala @@ -2,9 +2,11 @@ package zio.interop import org.reactivestreams.Publisher import org.reactivestreams.Subscriber -import zio.{ Scope, UIO, Task, ZIO, Trace } +import zio.{ Scope, UIO, Task, ZIO, Trace, URIO } import zio.stream.ZSink import zio.stream.ZStream +import org.reactivestreams.Processor +import zio.stream.ZPipeline package object reactivestreams { @@ -59,4 +61,16 @@ package object reactivestreams { Adapters.subscriberToSink(subscriber) } + final implicit class processorToPipeline[I, O](private val processor: Processor[I, O]) extends AnyVal { + + def toZIOPipeline(implicit trace: Trace): ZPipeline[Any, Throwable, I, O] = + Adapters.processorToPipeline(processor) + } + + final implicit class pipelineToProcessor[R <: Scope, I, O](private val pipeline: ZPipeline[R, Throwable, I, O]) + extends AnyVal { + + def toProcessor(implicit trace: Trace): URIO[R, Processor[I, O]] = + Adapters.pipelineToProcessor(pipeline) + } } diff --git a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/PipelineToProcessorSpec.scala b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/PipelineToProcessorSpec.scala new file mode 100644 index 0000000..bfb19a8 --- /dev/null +++ b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/PipelineToProcessorSpec.scala @@ -0,0 +1,78 @@ +package zio.interop.reactivestreams + +import zio.test.Assertion._ +import zio.test._ +import org.reactivestreams.tck.IdentityProcessorVerification +import org.testng.annotations.Test +import zio._ +import zio.stream.ZPipeline +import org.reactivestreams.tck +import zio.Unsafe.unsafe +import org.reactivestreams.Processor +import java.util.concurrent.Executors +import java.lang.reflect.InvocationTargetException +import org.testng.SkipException + +object PipelineToProcessorSpec extends ZIOSpecDefault { + + override def spec = + suite("Converting a `Pipeline` to a `Processor`")( + suite("passes all required and optional TCK tests")( + tckTests: _* + ) + ) + + val managedVerification = + for { + runtime <- ZIO.runtime[Scope] + executor <- ZIO.succeed(Executors.newFixedThreadPool(4)) + _ <- ZIO.addFinalizer(ZIO.succeed(executor.shutdown())) + env = new tck.TestEnvironment(1000, 500) + ver = new IdentityProcessorVerification[Int](env) { + override def createIdentityProcessor( + bufferSize: Int + ): Processor[Int, Int] = + unsafe { implicit u => + runtime.unsafe.run(Adapters.pipelineToProcessor(ZPipeline.identity[Int], bufferSize)).getOrThrow() + } + + override def createElement(n: Int): Int = n + + override def createFailedPublisher() = null + + override def publisherExecutorService() = executor + + override def maxSupportedSubscribers() = 1 + + override def boundedDepthOfOnNextAndRequestRecursion() = 1 + } + _ <- ZIO.succeed(ver.setUp()) + } yield ver + + val tckTests = + classOf[IdentityProcessorVerification[Int]] + .getMethods() + .toList + .filter { method => + method + .getAnnotations() + .exists(annotation => classOf[Test].isAssignableFrom(annotation.annotationType())) + } + .collect { + case method if method.getName().startsWith("untested") => + test(method.getName())(assert(())(anything)) @@ TestAspect.ignore + case method => + test(method.getName())( + ZIO.scoped[Any] { + for { + ver <- managedVerification + r <- ZIO + .attemptBlockingInterrupt(method.invoke(ver)) + .unit + .refineOrDie { case e: InvocationTargetException => e.getTargetException() } + .exit + } yield assert(r)(fails(isSubtype[SkipException](anything)) || succeeds(isUnit)) + } + ) + } +} diff --git a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala new file mode 100644 index 0000000..7f1876e --- /dev/null +++ b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala @@ -0,0 +1,84 @@ +package zio.interop.reactivestreams + +import zio.Chunk +import zio.UIO +import zio.ZIO +import zio.stream.ZStream +import zio.test.Assertion._ +import zio.test._ +import scala.collection.mutable.ListBuffer +import java.util.concurrent.SubmissionPublisher +import java.util.concurrent.Flow +import org.reactivestreams.FlowAdapters + +object ProcessorToPipelineSpec extends ZIOSpecDefault { + + override def spec = + suite("Converting a `Processor` to a `Pipeline`")( + test("works with a well behaved `Publisher`") { + val processor = new TestProcessor((i: Int) => i.toString()) + + val effect = ZStream(1, 2, 3, 4, 5).via(processor.asPipeline).runCollect + + for { + result <- effect + events <- processor.getEvents + } yield assert(result)(equalTo(Chunk("1", "2", "3", "4", "5"))) && + assert(events)( + equalTo( + List( + ProcessorEvent.OnSubscribe, + ProcessorEvent.OnNext(1), + ProcessorEvent.OnNext(2), + ProcessorEvent.OnNext(3), + ProcessorEvent.OnNext(4), + ProcessorEvent.OnNext(5), + ProcessorEvent.OnComplete + ) + ) + ) + } + ) @@ TestAspect.withLiveClock + + sealed trait ProcessorEvent[+A] + object ProcessorEvent { + final case object OnSubscribe extends ProcessorEvent[Nothing] + final case class OnNext[A](item: A) extends ProcessorEvent[A] + final case class OnError(error: Throwable) extends ProcessorEvent[Nothing] + final case object OnComplete extends ProcessorEvent[Nothing] + } + + final class TestProcessor[A, B](f: A => B) extends SubmissionPublisher[B] with Flow.Processor[A, B] { + + private var subscription: Flow.Subscription = null + private val events = ListBuffer[ProcessorEvent[A]]() + + def onSubscribe(subscription: Flow.Subscription): Unit = { + this.events.addOne(ProcessorEvent.OnSubscribe) + this.subscription = subscription; + println(subscription) + subscription.request(1); + } + + def onNext(item: A): Unit = { + this.events.addOne(ProcessorEvent.OnNext(item)) + submit(f(item)); + subscription.request(1); + } + + def onError(error: Throwable): Unit = { + this.events.addOne(ProcessorEvent.OnError(error)) + closeExceptionally(error); + } + + def onComplete(): Unit = { + this.events.addOne(ProcessorEvent.OnComplete) + close(); + } + + def getEvents: UIO[List[ProcessorEvent[A]]] = + ZIO.succeed(this.events.toList) + + def asPipeline = Adapters.processorToPipeline(FlowAdapters.toProcessor(this)) + } +} diff --git a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/PublisherToStreamSpec.scala b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/PublisherToStreamSpec.scala index bef3763..18404a5 100644 --- a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/PublisherToStreamSpec.scala +++ b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/PublisherToStreamSpec.scala @@ -193,6 +193,7 @@ object PublisherToStreamSpec extends ZIOSpecDefault { Adapters.publisherToStream(new NumberIterablePublisher(0, 1, executor.asJava), 16).runCount } .map(_.sum) + } yield assert(sum)(equalTo(10000L)) } ) diff --git a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/SinkToSubscriberSpec.scala b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/SinkToSubscriberSpec.scala index 9c735d0..553e923 100644 --- a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/SinkToSubscriberSpec.scala +++ b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/SinkToSubscriberSpec.scala @@ -4,7 +4,7 @@ import org.reactivestreams.{ Publisher, Subscriber, Subscription } import org.reactivestreams.tck.SubscriberWhiteboxVerification.{ SubscriberPuppet, WhiteboxSubscriberProbe } import org.reactivestreams.tck.{ SubscriberWhiteboxVerification, TestEnvironment } import org.testng.annotations.Test -import zio.{ Chunk, Promise, ZIO, durationInt, durationLong } +import zio.{ Promise, ZIO, durationInt, durationLong, Chunk } import zio.stream.ZSink import zio.test.Assertion._ import zio.test._ diff --git a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/StreamToPublisherSpec.scala b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/StreamToPublisherSpec.scala index a0d5a2e..0f6210a 100644 --- a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/StreamToPublisherSpec.scala +++ b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/StreamToPublisherSpec.scala @@ -10,6 +10,7 @@ import zio.test._ import java.lang.reflect.InvocationTargetException import zio.Unsafe +import org.testng.SkipException object StreamToPublisherSpec extends ZIOSpecDefault { override def spec = @@ -31,17 +32,7 @@ object StreamToPublisherSpec extends ZIOSpecDefault { .getOrThrowFiberFailure() } - override def createFailedPublisher(): Publisher[Int] = - Unsafe.unsafe { implicit unsafe => - runtime.unsafe - .run( - ZStream - .fail(new RuntimeException("boom!")) - .map(_.asInstanceOf[Int]) - .toPublisher - ) - .getOrThrowFiberFailure() - } + override def createFailedPublisher(): Publisher[Int] = null } val tests = @@ -67,7 +58,7 @@ object StreamToPublisherSpec extends ZIOSpecDefault { .unit .refineOrDie { case e: InvocationTargetException => e.getTargetException() } .exit - } yield assert(r)(succeeds(isUnit)) + } yield assert(r)(fails(isSubtype[SkipException](anything)) || succeeds(isUnit)) ) } } From edea4b63c73a8f4f250488f74f8bed4cf90d45a0 Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Wed, 11 Oct 2023 21:03:45 +0200 Subject: [PATCH 02/14] Readd stray trace --- .../src/main/scala/zio/interop/reactivestreams/Adapters.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala index 16bc3e6..52e2bee 100644 --- a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala +++ b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala @@ -36,7 +36,7 @@ object Adapters { def subscriberToSink[E <: Throwable, I]( subscriber: => Subscriber[I] - ): ZIO[Scope, Nothing, (E => UIO[Unit], ZSink[Any, Nothing, I, I, Unit])] = ZIO.succeed { + )(implicit trace: Trace): ZIO[Scope, Nothing, (E => UIO[Unit], ZSink[Any, Nothing, I, I, Unit])] = ZIO.succeed { unsafe { implicit unsafe => val subscription = new SubscriptionProducer[I](subscriber) From dfde4513449cbcbdf3d806fbcdce2d1c51ba25c6 Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Wed, 11 Oct 2023 21:28:44 +0200 Subject: [PATCH 03/14] Use finalizers where possible --- .../interop/reactivestreams/Adapters.scala | 55 +++++++++---------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala index 52e2bee..039f85b 100644 --- a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala +++ b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala @@ -36,35 +36,35 @@ object Adapters { def subscriberToSink[E <: Throwable, I]( subscriber: => Subscriber[I] - )(implicit trace: Trace): ZIO[Scope, Nothing, (E => UIO[Unit], ZSink[Any, Nothing, I, I, Unit])] = ZIO.succeed { - unsafe { implicit unsafe => - val subscription = new SubscriptionProducer[I](subscriber) - - def reader: ZChannel[Any, ZNothing, Chunk[I], Any, Nothing, Chunk[I], Unit] = ZChannel.readWith( - i => ZChannel.fromZIO(subscription.emit(i)) *> reader, - _ => ???, // impossible - d => ZChannel.fromZIO(subscription.done(d)) *> ZChannel.succeed(()) - ) - - subscriber.onSubscribe(subscription) - ((e: E) => subscription.error(Cause.fail(e)).sandbox.ignore, ZSink.fromChannel(reader)) + )(implicit trace: Trace): ZIO[Scope, Nothing, (E => UIO[Unit], ZSink[Any, Nothing, I, I, Unit])] = + ZIO.suspendSucceed { + unsafe { implicit unsafe => + val subscription = new SubscriptionProducer[I](subscriber) + + def reader: ZChannel[Any, ZNothing, Chunk[I], Any, Nothing, Chunk[I], Unit] = ZChannel.readWith( + i => ZChannel.fromZIO(subscription.emit(i)) *> reader, + _ => ???, // impossible + d => ZChannel.fromZIO(subscription.done(d)) *> ZChannel.succeed(()) + ) + + for { + _ <- ZIO.acquireRelease(ZIO.succeed(subscriber.onSubscribe(subscription)))(_ => + ZIO.succeed(subscription.cancel()) + ) + } yield ((e: E) => subscription.error(Cause.fail(e)).sandbox.ignore, ZSink.fromChannel(reader)) + } } - } def publisherToStream[O]( publisher: => Publisher[O], bufferSize: => Int )(implicit trace: Trace): ZStream[Any, Throwable, O] = ZStream.unwrapScoped { - val subscribe = ZIO.succeed { - val subscriber = new SubscriberConsumer[O](bufferSize)(unsafe) - publisher.subscribe(subscriber) - subscriber - } + val subscriber = new SubscriberConsumer[O](bufferSize)(unsafe) - subscribe - .tap(s => ZIO.addFinalizer(s.cancelSubscription.forkDaemon)) - .uninterruptible - .map(c => ZStream.fromChannel(ZChannel.fromInput(c))) + for { + _ <- + ZIO.acquireRelease(ZIO.succeed(publisher.subscribe(subscriber)))(_ => subscriber.cancelSubscription.forkDaemon) + } yield ZStream.fromChannel(ZChannel.fromInput(subscriber)) } def sinkToSubscriber[R, I, L, Z]( @@ -86,17 +86,16 @@ object Adapters { val subscriber = new SubscriberConsumer[O](bufferSize)(unsafe) for { - _ <- ZIO.succeed(processor.onSubscribe(subscription)) - _ <- ZIO.succeed(processor.subscribe(subscriber)) - _ <- ZIO.addFinalizer(subscriber.cancelSubscription) - _ <- ZIO.addFinalizer(ZIO.succeed(subscription.cancel())) + _ <- + ZIO.acquireRelease(ZIO.succeed(processor.subscribe(subscriber)))(_ => subscriber.cancelSubscription.forkDaemon) + _ <- ZIO.acquireRelease(ZIO.succeed(processor.onSubscribe(subscription)))(_ => ZIO.succeed(subscription.cancel())) } yield ZPipeline.fromChannel(ZChannel.fromInput(subscriber).embedInput(subscription)) } def pipelineToProcessor[R <: Scope, I, O]( pipeline: ZPipeline[R, Throwable, I, O], bufferSize: Int = 16 - )(implicit trace: Trace): ZIO[R, Nothing, Processor[I, O]] = ZIO.scoped[R] { + )(implicit trace: Trace): ZIO[R, Nothing, Processor[I, O]] = for { runtime <- ZIO.runtime[R] subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) @@ -121,9 +120,7 @@ object Adapters { () } } - } - } private class SubscriptionProducer[A](sub: Subscriber[_ >: A])(implicit unsafe: Unsafe) extends Subscription From a312edc6686ad8d841f98af799382fb856103a9d Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Wed, 11 Oct 2023 21:31:54 +0200 Subject: [PATCH 04/14] underscore unused --- .../src/main/scala/zio/interop/reactivestreams/Adapters.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala index 039f85b..407a051 100644 --- a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala +++ b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala @@ -108,7 +108,7 @@ object Adapters { def onComplete(): Unit = subscriber.onComplete() - def subscribe(s: Subscriber[_ >: O <: Object]): Unit = { + def subscribe(s: Subscriber[_ >: O]): Unit = { val subscription = new SubscriptionProducer[O](s)(unsafe) s.onSubscribe(subscription) unsafe { implicit u => @@ -137,7 +137,7 @@ object Adapters { else { state.getAndUpdate { case State.Running(demand) => State.Running(demand + n) - case State.Waiting(resume) => State.Running(n) + case State.Waiting(_) => State.Running(n) case other => other } match { case State.Waiting(resume) => resume.unsafe.done(ZIO.unit) From 5fba6b5fc07daca6172e6bc8175f6b376f057e0a Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Wed, 11 Oct 2023 21:52:55 +0200 Subject: [PATCH 05/14] add type anotations --- .../reactivestreams/ProcessorToPipelineSpec.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala index 7f1876e..a3ed3a1 100644 --- a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala +++ b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala @@ -54,31 +54,30 @@ object ProcessorToPipelineSpec extends ZIOSpecDefault { private val events = ListBuffer[ProcessorEvent[A]]() def onSubscribe(subscription: Flow.Subscription): Unit = { - this.events.addOne(ProcessorEvent.OnSubscribe) + this.events += ProcessorEvent.OnSubscribe this.subscription = subscription; - println(subscription) subscription.request(1); } def onNext(item: A): Unit = { - this.events.addOne(ProcessorEvent.OnNext(item)) + this.events += ProcessorEvent.OnNext(item) submit(f(item)); subscription.request(1); } def onError(error: Throwable): Unit = { - this.events.addOne(ProcessorEvent.OnError(error)) + this.events += ProcessorEvent.OnError(error) closeExceptionally(error); } def onComplete(): Unit = { - this.events.addOne(ProcessorEvent.OnComplete) + this.events += ProcessorEvent.OnComplete close(); } def getEvents: UIO[List[ProcessorEvent[A]]] = ZIO.succeed(this.events.toList) - def asPipeline = Adapters.processorToPipeline(FlowAdapters.toProcessor(this)) + def asPipeline = Adapters.processorToPipeline(FlowAdapters.toProcessor[A, B](this)) } } From 592fc8fa911ec21c76ff004c1a8c4c9f6f7cd0ed Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Thu, 12 Oct 2023 17:44:38 +0200 Subject: [PATCH 06/14] channel conversions --- .../interop/reactivestreams/Adapters.scala | 98 ++++++++++++++++--- .../zio/interop/reactivestreams/package.scala | 15 +++ .../ProcessorToPipelineSpec.scala | 47 ++++++++- 3 files changed, 147 insertions(+), 13 deletions(-) diff --git a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala index 407a051..5510dac 100644 --- a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala +++ b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala @@ -12,8 +12,9 @@ import zio.stream._ import java.util.concurrent.atomic.AtomicReference import zio.stream.internal.AsyncInputConsumer import zio.stream.internal.AsyncInputProducer -import zio.UIO import java.util.concurrent.atomic.AtomicBoolean +import zio.UIO +import scala.util.control.NoStackTrace object Adapters { @@ -43,7 +44,7 @@ object Adapters { def reader: ZChannel[Any, ZNothing, Chunk[I], Any, Nothing, Chunk[I], Unit] = ZChannel.readWith( i => ZChannel.fromZIO(subscription.emit(i)) *> reader, - _ => ???, // impossible + ZChannel.fail, d => ZChannel.fromZIO(subscription.done(d)) *> ZChannel.succeed(()) ) @@ -78,18 +79,58 @@ object Adapters { } yield (subscriber, sinkFiber.join) } + /** Upstream errors will not be passed to the processor. If you want errors to be passed, convert the processor to a + * channel instead. + */ def processorToPipeline[I, O]( processor: Processor[I, O], bufferSize: Int = 16 )(implicit trace: Trace): ZPipeline[Any, Throwable, I, O] = ZPipeline.unwrapScoped { val subscription = new SubscriptionProducer[I](processor)(unsafe) val subscriber = new SubscriberConsumer[O](bufferSize)(unsafe) + val passthrough = new PassthroughAsyncInput(subscription, subscriber) + + for { + _ <- + ZIO.acquireRelease(ZIO.succeed(processor.subscribe(subscriber)))(_ => subscriber.cancelSubscription.forkDaemon) + _ <- ZIO.acquireRelease(ZIO.succeed(processor.onSubscribe(subscription)))(_ => ZIO.succeed(subscription.cancel())) + } yield ZPipeline.fromChannel(ZChannel.fromInput(passthrough).embedInput(passthrough)) + } + + def publisherToChannel[O]( + publisher: Publisher[O], + bufferSize: Int = 16 + )(implicit trace: Trace): ZChannel[Any, Any, Any, Any, Throwable, Chunk[O], Any] = ZChannel.unwrapScoped[Any] { + val subscriber = new SubscriberConsumer[O](bufferSize)(unsafe) + + for { + _ <- + ZIO.acquireRelease(ZIO.succeed(publisher.subscribe(subscriber)))(_ => subscriber.cancelSubscription.forkDaemon) + } yield ZChannel.fromInput(subscriber) + } + + def subscriberToChannel[I]( + consumer: Subscriber[I] + )(implicit trace: Trace): ZChannel[Any, Throwable, Chunk[I], Any, Any, Any, Any] = ZChannel.unwrapScoped[Any] { + val subscription = new SubscriptionProducer[I](consumer)(unsafe) + + for { + _ <- ZIO.acquireRelease(ZIO.succeed(consumer.onSubscribe(subscription)))(_ => ZIO.succeed(subscription.cancel())) + } yield ZChannel.fromZIO(subscription.awaitCancellation).embedInput(subscription) + } + + def processorToChannel[I, O]( + processor: Processor[I, O], + bufferSize: Int = 16 + )(implicit trace: Trace): ZChannel[Any, Throwable, Chunk[I], Any, Throwable, Chunk[O], Any] = ZChannel.unwrapScoped { + val subscription = new SubscriptionProducer[I](processor)(unsafe) + val subscriber = new SubscriberConsumer[O](bufferSize)(unsafe) for { _ <- ZIO.acquireRelease(ZIO.succeed(processor.subscribe(subscriber)))(_ => subscriber.cancelSubscription.forkDaemon) _ <- ZIO.acquireRelease(ZIO.succeed(processor.onSubscribe(subscription)))(_ => ZIO.succeed(subscription.cancel())) - } yield ZPipeline.fromChannel(ZChannel.fromInput(subscriber).embedInput(subscription)) + } yield ZChannel.fromInput(subscriber).embedInput(subscription) } def pipelineToProcessor[R <: Scope, I, O]( @@ -192,7 +233,7 @@ object Adapters { ZIO.succeed { cause.failureOrCause.fold( sub.onError, - c => sub.onError(new FiberFailure(c)) + c => sub.onError(UpstreamDefect(c)) ) } *> canceled.succeed(()) case State.Cancelled => ZIO.interrupt @@ -200,7 +241,7 @@ object Adapters { ZIO.succeed { cause.failureOrCause.fold( sub.onError, - c => sub.onError(new FiberFailure(c)) + c => sub.onError(UpstreamDefect(c)) ) } *> resume.interrupt *> canceled.succeed(()) } @@ -276,26 +317,36 @@ object Adapters { trace: zio.Trace ): UIO[B] = subscription.await.flatMap { sub => ZIO.suspendSucceed { - state.updateAndGet { + state.getAndUpdate { case State.Drained => State.Waiting(Promise.unsafe.make[Nothing, Unit](FiberId.None)) case State.Full => State.Drained case other => other } match { case State.Drained => + // next iteration will wait + takeWith(onError, onElement, onDone) + case State.Full => val data = buffer.pollUpTo(buffer.capacity) - val dataSize = data.size + val dataSize = data.size.toLong if (dataSize > 0) { sub.request(data.size.toLong) ZIO.succeedNow(onElement(data)) } else { ZIO.succeedNow(onElement(Chunk.empty)) } - case State.Full => ??? // impossible - case State.Waiting(promise) => promise.await *> takeWith(onError, onElement, onDone) - case State.Failed(t) => + + case State.Waiting(promise) => + promise.await *> takeWith(onError, onElement, onDone) + case State.Failed(t) => // drain remaining data before failing val data = buffer.pollUpTo(buffer.capacity) - if (data.nonEmpty) ZIO.succeedNow(onElement(data)) else ZIO.succeedNow(onError(Cause.fail(t))) + if (data.nonEmpty) ZIO.succeedNow(onElement(data)) + else { + t match { + case UpstreamDefect(cause) => ZIO.succeedNow(onError(cause)) + case err => ZIO.succeedNow(onError(Cause.fail(t))) + } + } case State.Ended => // drain remaining data before failing val data = buffer.pollUpTo(buffer.capacity) @@ -319,4 +370,29 @@ object Adapters { } + private final case class UpstreamDefect(cause: Cause[Nothing]) extends NoStackTrace { + override def getMessage(): String = s"Upsteam defect: ${cause.prettyPrint}" + } + + class PassthroughAsyncInput[I, O]( + producer: AsyncInputProducer[Nothing, I, Any], + consumer: AsyncInputConsumer[Throwable, O, Any] + ) extends AsyncInputProducer[Throwable, I, Any] + with AsyncInputConsumer[Throwable, O, Any] { + private val error: Promise[Nothing, Cause[Throwable]] = unsafe(implicit u => Promise.unsafe.make(FiberId.None)) + + def takeWith[A](onError: Cause[Throwable] => A, onElement: O => A, onDone: Any => A)(implicit + trace: zio.Trace + ): UIO[A] = + consumer.takeWith(onError, onElement, onDone) race error.await.map(onError) + + def emit(el: I)(implicit trace: zio.Trace): UIO[Any] = producer.emit(el) + + def done(a: Any)(implicit trace: zio.Trace): UIO[Any] = producer.done(a) + + def error(cause: Cause[Throwable])(implicit trace: zio.Trace): UIO[Any] = error.succeed(cause) + def awaitRead(implicit trace: zio.Trace): UIO[Any] = producer.awaitRead + + } + } diff --git a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/package.scala b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/package.scala index 8912a12..497530d 100644 --- a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/package.scala +++ b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/package.scala @@ -7,6 +7,8 @@ import zio.stream.ZSink import zio.stream.ZStream import org.reactivestreams.Processor import zio.stream.ZPipeline +import zio.stream.ZChannel +import zio.Chunk package object reactivestreams { @@ -41,6 +43,11 @@ package object reactivestreams { */ def toZIOStream(qSize: Int = 16)(implicit trace: Trace): ZStream[Any, Throwable, O] = Adapters.publisherToStream(publisher, qSize) + + def toZIOChannel(bufferSize: Int = 16)(implicit + trace: Trace + ): ZChannel[Any, Any, Any, Any, Throwable, Chunk[O], Any] = + Adapters.publisherToChannel(publisher, bufferSize) } final implicit class subscriberToSink[I](private val subscriber: Subscriber[I]) extends AnyVal { @@ -59,12 +66,20 @@ package object reactivestreams { trace: Trace ): ZIO[Scope, Nothing, (E => UIO[Unit], ZSink[Any, Nothing, I, I, Unit])] = Adapters.subscriberToSink(subscriber) + + def toZIOChannel(implicit trace: Trace): ZChannel[Any, Throwable, Chunk[I], Any, Any, Any, Any] = + Adapters.subscriberToChannel(subscriber) } final implicit class processorToPipeline[I, O](private val processor: Processor[I, O]) extends AnyVal { def toZIOPipeline(implicit trace: Trace): ZPipeline[Any, Throwable, I, O] = Adapters.processorToPipeline(processor) + + def toProcessorZIOChannel(implicit + trace: Trace + ): ZChannel[Any, Throwable, Chunk[I], Any, Throwable, Chunk[O], Any] = + Adapters.processorToChannel(processor) } final implicit class pipelineToProcessor[R <: Scope, I, O](private val pipeline: ZPipeline[R, Throwable, I, O]) diff --git a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala index a3ed3a1..c43cfba 100644 --- a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala +++ b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala @@ -37,15 +37,56 @@ object ProcessorToPipelineSpec extends ZIOSpecDefault { ) ) ) + }, + test("passes through errors without offering them to the processor") { + val processor = new TestProcessor((i: Int) => i.toString()) + val err = new RuntimeException() + + val effect = (ZStream(1, 2) ++ ZStream.fail(err)).via(processor.asPipeline).runCollect + + for { + result <- effect.exit + events <- processor.getEvents + } yield assert(result)(fails(equalTo(err))) && + assert(events)( + equalTo( + List( + ProcessorEvent.OnSubscribe, + ProcessorEvent.OnNext(1), + ProcessorEvent.OnNext(2) + ) + ) + ) + }, + test("passes through errors when converting to a raw channel") { + val processor = new TestProcessor((i: Int) => i.toString()) + val err = new RuntimeException() + + val effect = ((ZStream(1, 2) ++ ZStream.fail(err)).channel >>> processor.asChannel).runCollect + + for { + result <- effect.exit + events <- processor.getEvents + } yield assert(result)(fails(equalTo(err))) && + assert(events)( + equalTo( + List( + ProcessorEvent.OnSubscribe, + ProcessorEvent.OnNext(1), + ProcessorEvent.OnNext(2), + ProcessorEvent.OnError(err) + ) + ) + ) } ) @@ TestAspect.withLiveClock sealed trait ProcessorEvent[+A] object ProcessorEvent { - final case object OnSubscribe extends ProcessorEvent[Nothing] + case object OnSubscribe extends ProcessorEvent[Nothing] final case class OnNext[A](item: A) extends ProcessorEvent[A] final case class OnError(error: Throwable) extends ProcessorEvent[Nothing] - final case object OnComplete extends ProcessorEvent[Nothing] + case object OnComplete extends ProcessorEvent[Nothing] } final class TestProcessor[A, B](f: A => B) extends SubmissionPublisher[B] with Flow.Processor[A, B] { @@ -79,5 +120,7 @@ object ProcessorToPipelineSpec extends ZIOSpecDefault { ZIO.succeed(this.events.toList) def asPipeline = Adapters.processorToPipeline(FlowAdapters.toProcessor[A, B](this)) + + def asChannel = Adapters.processorToChannel(FlowAdapters.toProcessor[A, B](this)) } } From 01548e136b636ccd35610ca2ebc6e6671419f494 Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Fri, 13 Oct 2023 00:12:23 +0200 Subject: [PATCH 07/14] wip --- .../interop/reactivestreams/Adapters.scala | 67 +++++++++++++++++-- 1 file changed, 63 insertions(+), 4 deletions(-) diff --git a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala index 5510dac..f4c4920 100644 --- a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala +++ b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala @@ -97,6 +97,67 @@ object Adapters { } yield ZPipeline.fromChannel(ZChannel.fromInput(passthrough).embedInput(passthrough)) } + def channelToPublisher[O]( + channel: ZChannel[Any, Any, Any, Any, Throwable, Chunk[O], Any] + ): ZIO[Scope, Nothing, Publisher[O]] = ZIO.runtime[Scope].map { runtime => + new Publisher[O] { + def subscribe(subscriber: Subscriber[_ >: O]): Unit = + if (subscriber == null) { + throw new NullPointerException("Subscriber must not be null.") + } else { + val subscription = new SubscriptionProducer[O](subscriber)(unsafe) + unsafe { implicit u => + runtime.unsafe.run { + for { + _ <- ZIO.acquireRelease(ZIO.succeed(subscriber.onSubscribe(subscription)))(_ => ZIO.succeed(subscription.cancel())) + _ <- (channel >>> ZChannel.fromZIO(subscription.awaitCancellation).embedInput(subscription)).runDrain.forkScoped + } yield () + }.getOrThrow() + } + } + } + } + + def channelToSubscriber[I]( + channel: ZChannel[Any, Throwable, Chunk[I], Any, Any, Any, Any], + bufferSize: Int = 16 + ): ZIO[Scope, Nothing, Subscriber[I]] = ZIO.suspendSucceed { + val subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) + for { + _ <- ZIO.addFinalizer(subscriber.cancelSubscription.forkDaemon) + _ <- (ZChannel.fromInput(subscriber) >>> channel).runDrain.forkScoped + } yield subscriber + } + + def channelToProcessor[I, O]( + channel: ZChannel[Any, Throwable, Chunk[I], Any, Throwable, Chunk[O], Any], + bufferSize: Int = 16 + ): ZIO[Scope, Nothing, Processor[I, O]] = + for { + runtime <- ZIO.runtime[Scope] + subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) + } yield new Processor[I, O] { + def onSubscribe(s: Subscription): Unit = subscriber.onSubscribe(s) + + def onNext(t: I): Unit = subscriber.onNext(t) + + def onError(t: Throwable): Unit = subscriber.onError(t) + + def onComplete(): Unit = subscriber.onComplete() + + def subscribe(s: Subscriber[_ >: O]): Unit = { + val subscription = new SubscriptionProducer[O](s)(unsafe) + s.onSubscribe(subscription) + unsafe { implicit u => + runtime.unsafe.fork { + (ZChannel.fromInput(subscriber) >>> channel >>> ZChannel + .fromZIO(subscription.awaitCancellation *> subscriber.cancelSubscription).embedInput(subscription)).runDrain + } + } + () + } + } + def publisherToChannel[O]( publisher: Publisher[O], bufferSize: Int = 16 @@ -154,12 +215,10 @@ object Adapters { s.onSubscribe(subscription) unsafe { implicit u => runtime.unsafe.fork { - ((ZChannel.fromInput(subscriber) pipeToOrFail pipeline.toChannel) >>> ZChannel - .fromZIO(subscription.awaitCancellation *> subscriber.cancelSubscription) - .embedInput(subscription)).runDrain + ((ZChannel.fromInput(subscriber) pipeToOrFail pipeline.toChannel) >>> ZChannel.fromZIO(subscription.awaitCancellation *> subscriber.cancelSubscription).embedInput(subscription)).runDrain } - () } + () } } From c494cda957fb770da623d472e9186489ec069e35 Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Fri, 13 Oct 2023 17:52:29 +0200 Subject: [PATCH 08/14] wip --- .../interop/reactivestreams/Adapters.scala | 43 +++++++++++-------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala index f4c4920..d430b0f 100644 --- a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala +++ b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala @@ -29,7 +29,7 @@ object Adapters { val subscription = new SubscriptionProducer[O](subscriber) subscriber.onSubscribe(subscription) runtime.unsafe.fork( - (stream.toChannel >>> ZChannel.fromZIO(subscription.awaitCancellation).embedInput(subscription)).runDrain + (stream.toChannel >>> ZChannel.fromZIO(subscription.awaitCompletion).embedInput(subscription)).runDrain ) () } @@ -110,7 +110,7 @@ object Adapters { runtime.unsafe.run { for { _ <- ZIO.acquireRelease(ZIO.succeed(subscriber.onSubscribe(subscription)))(_ => ZIO.succeed(subscription.cancel())) - _ <- (channel >>> ZChannel.fromZIO(subscription.awaitCancellation).embedInput(subscription)).runDrain.forkScoped + _ <- (channel >>> ZChannel.fromZIO(subscription.awaitCompletion).embedInput(subscription)).runDrain.forkScoped } yield () }.getOrThrow() } @@ -136,6 +136,7 @@ object Adapters { for { runtime <- ZIO.runtime[Scope] subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) + _ <- ZIO.addFinalizer(subscriber.cancelSubscription.forkDaemon) } yield new Processor[I, O] { def onSubscribe(s: Subscription): Unit = subscriber.onSubscribe(s) @@ -147,14 +148,14 @@ object Adapters { def subscribe(s: Subscriber[_ >: O]): Unit = { val subscription = new SubscriptionProducer[O](s)(unsafe) - s.onSubscribe(subscription) unsafe { implicit u => - runtime.unsafe.fork { - (ZChannel.fromInput(subscriber) >>> channel >>> ZChannel - .fromZIO(subscription.awaitCancellation *> subscriber.cancelSubscription).embedInput(subscription)).runDrain - } + runtime.unsafe.run { + for { + _ <- ZIO.acquireRelease(ZIO.succeed(s.onSubscribe(subscription)))(_ => ZIO.succeed(subscription.cancel())) + _ <- (ZChannel.fromInput(subscriber) >>> channel >>> ZChannel.fromZIO(subscription.awaitCompletion *> subscriber.cancelSubscription).embedInput(subscription)).runDrain.forkScoped + } yield () + }.getOrThrow() } - () } } @@ -177,7 +178,7 @@ object Adapters { for { _ <- ZIO.acquireRelease(ZIO.succeed(consumer.onSubscribe(subscription)))(_ => ZIO.succeed(subscription.cancel())) - } yield ZChannel.fromZIO(subscription.awaitCancellation).embedInput(subscription) + } yield ZChannel.fromZIO(subscription.awaitCompletion).embedInput(subscription) } def processorToChannel[I, O]( @@ -194,7 +195,7 @@ object Adapters { } yield ZChannel.fromInput(subscriber).embedInput(subscription) } - def pipelineToProcessor[R <: Scope, I, O]( + def pipelineToProcessor[R, I, O]( pipeline: ZPipeline[R, Throwable, I, O], bufferSize: Int = 16 )(implicit trace: Trace): ZIO[R, Nothing, Processor[I, O]] = @@ -210,12 +211,16 @@ object Adapters { def onComplete(): Unit = subscriber.onComplete() - def subscribe(s: Subscriber[_ >: O]): Unit = { + def subscribe(s: Subscriber[_ >: O]): Unit = + if (s == null) { + throw new NullPointerException("Subscriber must not be null.") + } else { val subscription = new SubscriptionProducer[O](s)(unsafe) s.onSubscribe(subscription) unsafe { implicit u => runtime.unsafe.fork { - ((ZChannel.fromInput(subscriber) pipeToOrFail pipeline.toChannel) >>> ZChannel.fromZIO(subscription.awaitCancellation *> subscriber.cancelSubscription).embedInput(subscription)).runDrain + ((ZChannel.fromInput(subscriber) pipeToOrFail pipeline.toChannel) >>> + ZChannel.fromZIO(subscription.awaitCompletion *> subscriber.cancelSubscription).embedInput(subscription)).runDrain } } () @@ -228,9 +233,9 @@ object Adapters { import SubscriptionProducer.State private val state: AtomicReference[State[A]] = new AtomicReference(State.initial[A]) - private val canceled: Promise[Nothing, Unit] = Promise.unsafe.make(FiberId.None) + private val completed: Promise[Nothing, Unit] = Promise.unsafe.make(FiberId.None) - val awaitCancellation: UIO[Unit] = canceled.await + val awaitCompletion: UIO[Unit] = completed.await def request(n: Long): Unit = if (n <= 0) sub.onError(new IllegalArgumentException("non-positive subscription request")) @@ -250,7 +255,7 @@ object Adapters { case State.Waiting(resume) => resume.unsafe.done(ZIO.interrupt) case _ => () } - canceled.unsafe.done(ZIO.unit) + completed.unsafe.done(ZIO.unit) } def emit(el: Chunk[A])(implicit trace: zio.Trace): UIO[Any] = ZIO.suspendSucceed { @@ -280,9 +285,9 @@ object Adapters { def done(a: Any)(implicit trace: zio.Trace): UIO[Any] = ZIO.suspendSucceed { state.getAndSet(State.Cancelled) match { - case State.Running(_) => ZIO.succeed(sub.onComplete()) *> canceled.succeed(()) + case State.Running(_) => ZIO.succeed(sub.onComplete()) *> completed.succeed(()) case State.Cancelled => ZIO.interrupt - case State.Waiting(resume) => ZIO.succeed(sub.onComplete()) *> resume.interrupt *> canceled.succeed(()) + case State.Waiting(resume) => ZIO.succeed(sub.onComplete()) *> resume.interrupt *> completed.succeed(()) } } @@ -294,7 +299,7 @@ object Adapters { sub.onError, c => sub.onError(UpstreamDefect(c)) ) - } *> canceled.succeed(()) + } *> completed.succeed(()) case State.Cancelled => ZIO.interrupt case State.Waiting(resume) => ZIO.succeed { @@ -302,7 +307,7 @@ object Adapters { sub.onError, c => sub.onError(UpstreamDefect(c)) ) - } *> resume.interrupt *> canceled.succeed(()) + } *> resume.interrupt *> completed.succeed(()) } } From 6dbb015f45573398070372e8341700f157d6a0d5 Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Sun, 15 Oct 2023 22:49:41 +0200 Subject: [PATCH 09/14] wip --- .../interop/reactivestreams/Adapters.scala | 48 ++++++++++--------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala index d430b0f..019018d 100644 --- a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala +++ b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala @@ -64,7 +64,7 @@ object Adapters { for { _ <- - ZIO.acquireRelease(ZIO.succeed(publisher.subscribe(subscriber)))(_ => subscriber.cancelSubscription.forkDaemon) + ZIO.acquireRelease(ZIO.succeed(publisher.subscribe(subscriber)))(_ => subscriber.cancelSubscription) } yield ZStream.fromChannel(ZChannel.fromInput(subscriber)) } @@ -74,7 +74,7 @@ object Adapters { )(implicit trace: Trace): ZIO[R with Scope, Throwable, (Subscriber[I], IO[Throwable, Z])] = { val subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) for { - _ <- ZIO.addFinalizer(subscriber.cancelSubscription.forkDaemon) + _ <- ZIO.addFinalizer(subscriber.cancelSubscription) sinkFiber <- (ZChannel.fromInput(subscriber) pipeToOrFail sink.channel).runDrain.forkScoped } yield (subscriber, sinkFiber.join) } @@ -92,7 +92,7 @@ object Adapters { for { _ <- - ZIO.acquireRelease(ZIO.succeed(processor.subscribe(subscriber)))(_ => subscriber.cancelSubscription.forkDaemon) + ZIO.acquireRelease(ZIO.succeed(processor.subscribe(subscriber)))(_ => subscriber.cancelSubscription) _ <- ZIO.acquireRelease(ZIO.succeed(processor.onSubscribe(subscription)))(_ => ZIO.succeed(subscription.cancel())) } yield ZPipeline.fromChannel(ZChannel.fromInput(passthrough).embedInput(passthrough)) } @@ -124,7 +124,7 @@ object Adapters { ): ZIO[Scope, Nothing, Subscriber[I]] = ZIO.suspendSucceed { val subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) for { - _ <- ZIO.addFinalizer(subscriber.cancelSubscription.forkDaemon) + _ <- ZIO.addFinalizer(subscriber.cancelSubscription) _ <- (ZChannel.fromInput(subscriber) >>> channel).runDrain.forkScoped } yield subscriber } @@ -136,7 +136,7 @@ object Adapters { for { runtime <- ZIO.runtime[Scope] subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) - _ <- ZIO.addFinalizer(subscriber.cancelSubscription.forkDaemon) + _ <- ZIO.addFinalizer(subscriber.cancelSubscription) } yield new Processor[I, O] { def onSubscribe(s: Subscription): Unit = subscriber.onSubscribe(s) @@ -167,7 +167,7 @@ object Adapters { for { _ <- - ZIO.acquireRelease(ZIO.succeed(publisher.subscribe(subscriber)))(_ => subscriber.cancelSubscription.forkDaemon) + ZIO.acquireRelease(ZIO.succeed(publisher.subscribe(subscriber)))(_ => subscriber.cancelSubscription) } yield ZChannel.fromInput(subscriber) } @@ -190,18 +190,20 @@ object Adapters { for { _ <- - ZIO.acquireRelease(ZIO.succeed(processor.subscribe(subscriber)))(_ => subscriber.cancelSubscription.forkDaemon) + ZIO.acquireRelease(ZIO.succeed(processor.subscribe(subscriber)))(_ => subscriber.cancelSubscription) _ <- ZIO.acquireRelease(ZIO.succeed(processor.onSubscribe(subscription)))(_ => ZIO.succeed(subscription.cancel())) } yield ZChannel.fromInput(subscriber).embedInput(subscription) } - def pipelineToProcessor[R, I, O]( + def pipelineToProcessor[R <: Scope, I, O]( pipeline: ZPipeline[R, Throwable, I, O], bufferSize: Int = 16 )(implicit trace: Trace): ZIO[R, Nothing, Processor[I, O]] = for { - runtime <- ZIO.runtime[R] - subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) + runtime <- ZIO.runtime[R] + subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) + outerFinalizerRef <- Ref.make(subscriber.cancelSubscription) + _ <- ZIO.addFinalizer(outerFinalizerRef.get.flatten) } yield new Processor[I, O] { def onSubscribe(s: Subscription): Unit = subscriber.onSubscribe(s) @@ -211,19 +213,18 @@ object Adapters { def onComplete(): Unit = subscriber.onComplete() - def subscribe(s: Subscriber[_ >: O]): Unit = - if (s == null) { - throw new NullPointerException("Subscriber must not be null.") - } else { + def subscribe(s: Subscriber[_ >: O]): Unit = { val subscription = new SubscriptionProducer[O](s)(unsafe) - s.onSubscribe(subscription) unsafe { implicit u => - runtime.unsafe.fork { - ((ZChannel.fromInput(subscriber) pipeToOrFail pipeline.toChannel) >>> - ZChannel.fromZIO(subscription.awaitCompletion *> subscriber.cancelSubscription).embedInput(subscription)).runDrain - } + runtime.unsafe.run { + for { + finalizerRef <- Ref.make(ZIO.unit) + _ <- ZIO.addFinalizer(finalizerRef.get.flatten) + _ <- (ZIO.succeed(s.onSubscribe(subscription)) *> finalizerRef.set(ZIO.succeed(subscription.cancel()))).uninterruptible + _ <- ((ZChannel.fromInput(subscriber) pipeToOrFail pipeline.toChannel) >>> ZChannel.fromZIO(subscription.awaitCompletion *> ZIO.succeed(subscription.cancel()) *> finalizerRef.set(ZIO.unit) *> subscriber.cancelSubscription *> outerFinalizerRef.set(ZIO.unit)).embedInput(subscription)).runDrain.forkScoped + } yield () + }.getOrThrow() } - () } } @@ -333,10 +334,10 @@ object Adapters { private val subscription: Promise[Nothing, Subscription] = Promise.unsafe.make(FiberId.None) private val buffer: RingBuffer[A] = RingBuffer(capacity) private val state: AtomicReference[State] = new AtomicReference(State.Drained) - private val isSubscribed: AtomicBoolean = new AtomicBoolean(false) + private val isSubscribedOrCanceled: AtomicBoolean = new AtomicBoolean(false) def onSubscribe(s: Subscription): Unit = - if (!isSubscribed.compareAndSet(false, true)) { + if (!isSubscribedOrCanceled.compareAndSet(false, true)) { s.cancel() } else { subscription.unsafe.done(ZIO.succeedNow(s)) @@ -375,7 +376,8 @@ object Adapters { } def cancelSubscription: UIO[Unit] = - subscription.await.tap(s => ZIO.succeed(s.cancel())).unit + ZIO.succeed(isSubscribedOrCanceled.set(true)) *> + subscription.poll.flatMap(ZIO.foreachDiscard(_)(_.map(_.cancel()))) def takeWith[B](onError: Cause[Throwable] => B, onElement: Chunk[A] => B, onDone: Any => B)(implicit trace: zio.Trace From 84c182d955ce6806987d3b6b501f4431e4c350fb Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Mon, 16 Oct 2023 16:43:32 +0200 Subject: [PATCH 10/14] cleanup --- .../interop/reactivestreams/Adapters.scala | 165 +++++++++--------- 1 file changed, 80 insertions(+), 85 deletions(-) diff --git a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala index 019018d..56eef42 100644 --- a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala +++ b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala @@ -20,20 +20,19 @@ object Adapters { def streamToPublisher[R, E <: Throwable, O]( stream: => ZStream[R, E, O] - )(implicit trace: Trace): ZIO[R, Nothing, Publisher[O]] = - ZIO.runtime[R].map { runtime => subscriber => - if (subscriber == null) { - throw new NullPointerException("Subscriber must not be null.") - } else - unsafe { implicit unsafe => - val subscription = new SubscriptionProducer[O](subscriber) - subscriber.onSubscribe(subscription) - runtime.unsafe.fork( - (stream.toChannel >>> ZChannel.fromZIO(subscription.awaitCompletion).embedInput(subscription)).runDrain - ) - () - } - } + )(implicit trace: Trace): ZIO[R, Nothing, Publisher[O]] = ZIO.runtime[R].map { runtime => subscriber => + if (subscriber == null) { + throw new NullPointerException("Subscriber must not be null.") + } else + unsafe { implicit unsafe => + val subscription = new SubscriptionProducer[O](subscriber) + subscriber.onSubscribe(subscription) + runtime.unsafe.fork( + (stream.toChannel >>> ZChannel.fromZIO(subscription.awaitCompletion).embedInput(subscription)).runDrain + ) + () + } + } def subscriberToSink[E <: Throwable, I]( subscriber: => Subscriber[I] @@ -42,9 +41,9 @@ object Adapters { unsafe { implicit unsafe => val subscription = new SubscriptionProducer[I](subscriber) - def reader: ZChannel[Any, ZNothing, Chunk[I], Any, Nothing, Chunk[I], Unit] = ZChannel.readWith( + def reader: ZChannel[Any, ZNothing, Chunk[I], Any, Nothing, Chunk[I], Unit] = ZChannel.readWithCause( i => ZChannel.fromZIO(subscription.emit(i)) *> reader, - ZChannel.fail, + e => ZChannel.failCause(e), d => ZChannel.fromZIO(subscription.done(d)) *> ZChannel.succeed(()) ) @@ -59,24 +58,26 @@ object Adapters { def publisherToStream[O]( publisher: => Publisher[O], bufferSize: => Int - )(implicit trace: Trace): ZStream[Any, Throwable, O] = ZStream.unwrapScoped { - val subscriber = new SubscriberConsumer[O](bufferSize)(unsafe) - - for { - _ <- - ZIO.acquireRelease(ZIO.succeed(publisher.subscribe(subscriber)))(_ => subscriber.cancelSubscription) - } yield ZStream.fromChannel(ZChannel.fromInput(subscriber)) - } + )(implicit trace: Trace): ZStream[Any, Throwable, O] = publisherToChannel(publisher, bufferSize).toStream def sinkToSubscriber[R, I, L, Z]( sink: => ZSink[R, Throwable, I, L, Z], bufferSize: => Int )(implicit trace: Trace): ZIO[R with Scope, Throwable, (Subscriber[I], IO[Throwable, Z])] = { - val subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) + def promChannel(prom: Promise[Throwable, Z]): ZChannel[R, Throwable, Chunk[L], Z, Any, Any, Any] = + ZChannel.readWithCause( + ZChannel.write(_) *> promChannel(prom), + e => ZChannel.fromZIO(prom.failCause(e)) *> ZChannel.failCause(e), + d => ZChannel.fromZIO(prom.succeed(d)) *> ZChannel.succeed(d) + ) + for { - _ <- ZIO.addFinalizer(subscriber.cancelSubscription) - sinkFiber <- (ZChannel.fromInput(subscriber) pipeToOrFail sink.channel).runDrain.forkScoped - } yield (subscriber, sinkFiber.join) + prom <- Promise.make[Throwable, Z] + subscriber <- channelToSubscriber( + (ZChannel.identity[Throwable, Chunk[I], Any] pipeToOrFail sink.channel) >>> promChannel(prom), + bufferSize + ) + } yield (subscriber, prom.await) } /** Upstream errors will not be passed to the processor. If you want errors to be passed, convert the processor to a @@ -97,9 +98,15 @@ object Adapters { } yield ZPipeline.fromChannel(ZChannel.fromInput(passthrough).embedInput(passthrough)) } - def channelToPublisher[O]( - channel: ZChannel[Any, Any, Any, Any, Throwable, Chunk[O], Any] - ): ZIO[Scope, Nothing, Publisher[O]] = ZIO.runtime[Scope].map { runtime => + def pipelineToProcessor[R <: Scope, I, O]( + pipeline: ZPipeline[R, Throwable, I, O], + bufferSize: Int = 16 + )(implicit trace: Trace): ZIO[R, Nothing, Processor[I, O]] = + channelToProcessor(ZChannel.identity pipeToOrFail pipeline.channel, bufferSize) + + def channelToPublisher[R <: Scope, O]( + channel: ZChannel[R, Any, Any, Any, Throwable, Chunk[O], Any] + ): ZIO[R, Nothing, Publisher[O]] = ZIO.runtime[R].map { runtime => new Publisher[O] { def subscribe(subscriber: Subscriber[_ >: O]): Unit = if (subscriber == null) { @@ -109,8 +116,12 @@ object Adapters { unsafe { implicit u => runtime.unsafe.run { for { - _ <- ZIO.acquireRelease(ZIO.succeed(subscriber.onSubscribe(subscription)))(_ => ZIO.succeed(subscription.cancel())) - _ <- (channel >>> ZChannel.fromZIO(subscription.awaitCompletion).embedInput(subscription)).runDrain.forkScoped + _ <- ZIO.acquireRelease(ZIO.succeed(subscriber.onSubscribe(subscription)))(_ => + ZIO.succeed(subscription.cancel()) + ) + _ <- (channel >>> ZChannel + .fromZIO(subscription.awaitCompletion) + .embedInput(subscription)).runDrain.forkScoped } yield () }.getOrThrow() } @@ -118,23 +129,23 @@ object Adapters { } } - def channelToSubscriber[I]( - channel: ZChannel[Any, Throwable, Chunk[I], Any, Any, Any, Any], + def channelToSubscriber[R <: Scope, I]( + channel: ZChannel[R, Throwable, Chunk[I], Any, Any, Any, Any], bufferSize: Int = 16 - ): ZIO[Scope, Nothing, Subscriber[I]] = ZIO.suspendSucceed { - val subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) + ): ZIO[R, Nothing, Subscriber[I]] = ZIO.suspendSucceed { + val subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) for { _ <- ZIO.addFinalizer(subscriber.cancelSubscription) _ <- (ZChannel.fromInput(subscriber) >>> channel).runDrain.forkScoped } yield subscriber } - def channelToProcessor[I, O]( - channel: ZChannel[Any, Throwable, Chunk[I], Any, Throwable, Chunk[O], Any], + def channelToProcessor[R <: Scope, I, O]( + channel: ZChannel[R, Throwable, Chunk[I], Any, Throwable, Chunk[O], Any], bufferSize: Int = 16 - ): ZIO[Scope, Nothing, Processor[I, O]] = + ): ZIO[R, Nothing, Processor[I, O]] = for { - runtime <- ZIO.runtime[Scope] + runtime <- ZIO.runtime[R] subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) _ <- ZIO.addFinalizer(subscriber.cancelSubscription) } yield new Processor[I, O] { @@ -151,8 +162,15 @@ object Adapters { unsafe { implicit u => runtime.unsafe.run { for { - _ <- ZIO.acquireRelease(ZIO.succeed(s.onSubscribe(subscription)))(_ => ZIO.succeed(subscription.cancel())) - _ <- (ZChannel.fromInput(subscriber) >>> channel >>> ZChannel.fromZIO(subscription.awaitCompletion *> subscriber.cancelSubscription).embedInput(subscription)).runDrain.forkScoped + finalizerRef <- Ref.make(ZIO.unit) + _ <- ZIO.addFinalizer(finalizerRef.get.flatten) + _ <- (ZIO.succeed(s.onSubscribe(subscription)) *> finalizerRef.set( + ZIO.succeed(subscription.cancel()) + )).uninterruptible + _ <- + (ZChannel.fromInput(subscriber) >>> channel >>> ZChannel + .fromZIO(subscription.awaitCompletion *> finalizerRef.set(ZIO.unit) *> subscriber.cancelSubscription) + .embedInput(subscription)).runDrain.forkScoped } yield () }.getOrThrow() } @@ -195,45 +213,12 @@ object Adapters { } yield ZChannel.fromInput(subscriber).embedInput(subscription) } - def pipelineToProcessor[R <: Scope, I, O]( - pipeline: ZPipeline[R, Throwable, I, O], - bufferSize: Int = 16 - )(implicit trace: Trace): ZIO[R, Nothing, Processor[I, O]] = - for { - runtime <- ZIO.runtime[R] - subscriber = new SubscriberConsumer[I](bufferSize)(unsafe) - outerFinalizerRef <- Ref.make(subscriber.cancelSubscription) - _ <- ZIO.addFinalizer(outerFinalizerRef.get.flatten) - } yield new Processor[I, O] { - def onSubscribe(s: Subscription): Unit = subscriber.onSubscribe(s) - - def onNext(t: I): Unit = subscriber.onNext(t) - - def onError(t: Throwable): Unit = subscriber.onError(t) - - def onComplete(): Unit = subscriber.onComplete() - - def subscribe(s: Subscriber[_ >: O]): Unit = { - val subscription = new SubscriptionProducer[O](s)(unsafe) - unsafe { implicit u => - runtime.unsafe.run { - for { - finalizerRef <- Ref.make(ZIO.unit) - _ <- ZIO.addFinalizer(finalizerRef.get.flatten) - _ <- (ZIO.succeed(s.onSubscribe(subscription)) *> finalizerRef.set(ZIO.succeed(subscription.cancel()))).uninterruptible - _ <- ((ZChannel.fromInput(subscriber) pipeToOrFail pipeline.toChannel) >>> ZChannel.fromZIO(subscription.awaitCompletion *> ZIO.succeed(subscription.cancel()) *> finalizerRef.set(ZIO.unit) *> subscriber.cancelSubscription *> outerFinalizerRef.set(ZIO.unit)).embedInput(subscription)).runDrain.forkScoped - } yield () - }.getOrThrow() - } - } - } - private class SubscriptionProducer[A](sub: Subscriber[_ >: A])(implicit unsafe: Unsafe) extends Subscription with AsyncInputProducer[Throwable, Chunk[A], Any] { import SubscriptionProducer.State - private val state: AtomicReference[State[A]] = new AtomicReference(State.initial[A]) + private val state: AtomicReference[State[A]] = new AtomicReference(State.initial[A]) private val completed: Promise[Nothing, Unit] = Promise.unsafe.make(FiberId.None) val awaitCompletion: UIO[Unit] = completed.await @@ -347,8 +332,9 @@ object Adapters { def onNext(t: A): Unit = if (t == null) { throw new NullPointerException("t was null in onNext") + } else if (!buffer.offer(t)) { + throw new IllegalStateException("buffer is full") } else { - buffer.offer(t) state.getAndUpdate { case State.Drained => State.Full case State.Waiting(_) => State.Full @@ -377,21 +363,23 @@ object Adapters { def cancelSubscription: UIO[Unit] = ZIO.succeed(isSubscribedOrCanceled.set(true)) *> - subscription.poll.flatMap(ZIO.foreachDiscard(_)(_.map(_.cancel()))) + subscription.poll.flatMap(ZIO.foreachDiscard(_)(_.map(_.cancel()))) *> + subscription.interrupt.unit *> + ZIO.succeed(state.getAndSet(State.Canceled) match { + case State.Waiting(promise) => promise.unsafe.done(ZIO.unit) + case _ => () + }) def takeWith[B](onError: Cause[Throwable] => B, onElement: Chunk[A] => B, onDone: Any => B)(implicit trace: zio.Trace ): UIO[B] = subscription.await.flatMap { sub => ZIO.suspendSucceed { - state.getAndUpdate { + state.updateAndGet { case State.Drained => State.Waiting(Promise.unsafe.make[Nothing, Unit](FiberId.None)) case State.Full => State.Drained case other => other } match { case State.Drained => - // next iteration will wait - takeWith(onError, onElement, onDone) - case State.Full => val data = buffer.pollUpTo(buffer.capacity) val dataSize = data.size.toLong if (dataSize > 0) { @@ -401,8 +389,11 @@ object Adapters { ZIO.succeedNow(onElement(Chunk.empty)) } + case State.Full => ??? // impossible + case State.Waiting(promise) => promise.await *> takeWith(onError, onElement, onDone) + case State.Failed(t) => // drain remaining data before failing val data = buffer.pollUpTo(buffer.capacity) @@ -417,6 +408,9 @@ object Adapters { // drain remaining data before failing val data = buffer.pollUpTo(buffer.capacity) if (data.nonEmpty) ZIO.succeedNow(onElement(data)) else ZIO.succeedNow(onDone(())) + + case State.Canceled => + ZIO.interrupt } } } @@ -427,11 +421,13 @@ object Adapters { sealed trait State object State { + final case class Waiting(promise: Promise[Nothing, Unit]) extends State case object Drained extends State case object Full extends State - final case class Waiting(promise: Promise[Nothing, Unit]) extends State final case class Failed(cause: Throwable) extends State case object Ended extends State + case object Canceled extends State + } } @@ -460,5 +456,4 @@ object Adapters { def awaitRead(implicit trace: zio.Trace): UIO[Any] = producer.awaitRead } - } From 141bfa1e4d22bbcafc0aea4b6c0835a6ea5f4fc7 Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Mon, 16 Oct 2023 17:16:19 +0200 Subject: [PATCH 11/14] add subscriber -> channel test --- .../zio/interop/reactivestreams/Adapters.scala | 6 +++--- .../reactivestreams/SubscriberToSinkSpec.scala | 17 ++++++++++++++++- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala index 56eef42..b661c56 100644 --- a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala +++ b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala @@ -313,7 +313,7 @@ object Adapters { private class SubscriberConsumer[A](capacity: Int)(implicit unsafe: Unsafe) extends Subscriber[A] - with AsyncInputConsumer[Throwable, Chunk[A], Any] { + with AsyncInputConsumer[Throwable, Chunk[A], Unit] { import SubscriberConsumer.State private val subscription: Promise[Nothing, Subscription] = Promise.unsafe.make(FiberId.None) @@ -370,7 +370,7 @@ object Adapters { case _ => () }) - def takeWith[B](onError: Cause[Throwable] => B, onElement: Chunk[A] => B, onDone: Any => B)(implicit + def takeWith[B](onError: Cause[Throwable] => B, onElement: Chunk[A] => B, onDone: Unit => B)(implicit trace: zio.Trace ): UIO[B] = subscription.await.flatMap { sub => ZIO.suspendSucceed { @@ -401,7 +401,7 @@ object Adapters { else { t match { case UpstreamDefect(cause) => ZIO.succeedNow(onError(cause)) - case err => ZIO.succeedNow(onError(Cause.fail(t))) + case err => ZIO.succeedNow(onError(Cause.fail(err))) } } case State.Ended => diff --git a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/SubscriberToSinkSpec.scala b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/SubscriberToSinkSpec.scala index c7128c8..193f305 100644 --- a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/SubscriberToSinkSpec.scala +++ b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/SubscriberToSinkSpec.scala @@ -88,7 +88,22 @@ object SubscriberToSinkSpec extends ZIOSpecDefault { err2 <- probe.expectError.timeout(100.millis).exit } yield assert(err)(succeeds(equalTo(e))) && assert(err2)(fails(anything)) } - } + }, + test("transports errors when transforming to channel") { + makeSubscriber.flatMap(probe => + ZIO.scoped[Any] { + val channel = probe.underlying.toZIOChannel + for { + fiber <- ((ZStream.fromIterable(seq) ++ ZStream.fail(e)).channel >>> channel).runDrain.fork + _ <- ZIO.sleep(100.millis) + _ <- probe.request(length + 1) + elements <- probe.nextElements(length).exit + err <- probe.expectError.exit + _ <- fiber.join + } yield assert(elements)(succeeds(equalTo(seq))) && assert(err)(succeeds(equalTo(e))) + } + ) + } @@ TestAspect.withLiveClock ) val seq: List[Int] = List.range(0, 31) From 1218676819995a24485314d9d5d335725f315358 Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Mon, 16 Oct 2023 17:26:37 +0200 Subject: [PATCH 12/14] adjust ci --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9e8bba7..5e79307 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,7 +32,7 @@ jobs: strategy: fail-fast: false matrix: - java: ['adopt@1.8', 'adopt@1.11'] + java: ['adopt@1.9', 'adopt@1.11'] scala: ['2.11.12', '2.12.15', '2.13.8', '3.2.1'] steps: - name: Checkout current branch @@ -60,7 +60,7 @@ jobs: uses: coursier/cache-action@v6 - name: Check Document Generation run: sbt docs/compileDocs - + publish: runs-on: ubuntu-20.04 needs: [lint, test, website] From 0e5d429187993136dc1434f9e6cab16b0d2d2a61 Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Mon, 16 Oct 2023 17:53:28 +0200 Subject: [PATCH 13/14] make tests run on 8 --- .github/workflows/ci.yml | 2 +- build.sbt | 13 +++---- .../interop/reactivestreams/Adapters.scala | 2 +- .../ProcessorToPipelineSpec.scala | 35 ++++++++++++++++--- .../StreamToPublisherSpec.scala | 15 ++++++-- 5 files changed, 51 insertions(+), 16 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5e79307..c0e7801 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,7 +32,7 @@ jobs: strategy: fail-fast: false matrix: - java: ['adopt@1.9', 'adopt@1.11'] + java: ['adopt@1.8', 'adopt@1.11'] scala: ['2.11.12', '2.12.15', '2.13.8', '3.2.1'] steps: - name: Checkout current branch diff --git a/build.sbt b/build.sbt index 4d55ac0..a135eff 100644 --- a/build.sbt +++ b/build.sbt @@ -55,12 +55,13 @@ lazy val interopReactiveStreams = project .settings(testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")) .settings( libraryDependencies ++= Seq( - "dev.zio" %% "zio" % zioVersion, - "dev.zio" %% "zio-streams" % zioVersion, - "dev.zio" %% "zio-test" % zioVersion % Test, - "dev.zio" %% "zio-test-sbt" % zioVersion % Test, - "org.reactivestreams" % "reactive-streams" % rsVersion, - "org.reactivestreams" % "reactive-streams-tck" % rsVersion % Test + "dev.zio" %% "zio" % zioVersion, + "dev.zio" %% "zio-streams" % zioVersion, + "dev.zio" %% "zio-test" % zioVersion % Test, + "dev.zio" %% "zio-test-sbt" % zioVersion % Test, + "org.reactivestreams" % "reactive-streams" % rsVersion, + "org.reactivestreams" % "reactive-streams-tck" % rsVersion % Test, + "net.sourceforge.streamsupport" % "streamsupport-flow" % "1.7.4" % Test ), libraryDependencies ++= { if (scalaVersion.value == ScalaDotty) diff --git a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala index b661c56..a8b93a5 100644 --- a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala +++ b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala @@ -364,7 +364,7 @@ object Adapters { def cancelSubscription: UIO[Unit] = ZIO.succeed(isSubscribedOrCanceled.set(true)) *> subscription.poll.flatMap(ZIO.foreachDiscard(_)(_.map(_.cancel()))) *> - subscription.interrupt.unit *> + subscription.interrupt.exit *> ZIO.succeed(state.getAndSet(State.Canceled) match { case State.Waiting(promise) => promise.unsafe.done(ZIO.unit) case _ => () diff --git a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala index c43cfba..a93717d 100644 --- a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala +++ b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala @@ -7,7 +7,8 @@ import zio.stream.ZStream import zio.test.Assertion._ import zio.test._ import scala.collection.mutable.ListBuffer -import java.util.concurrent.SubmissionPublisher +import java8.util.concurrent.SubmissionPublisher +import java8.util.concurrent.{ Flow => Flow8 } import java.util.concurrent.Flow import org.reactivestreams.FlowAdapters @@ -89,9 +90,10 @@ object ProcessorToPipelineSpec extends ZIOSpecDefault { case object OnComplete extends ProcessorEvent[Nothing] } - final class TestProcessor[A, B](f: A => B) extends SubmissionPublisher[B] with Flow.Processor[A, B] { + final class TestProcessor[A, B](f: A => B) extends Flow.Processor[A, B] { private var subscription: Flow.Subscription = null + private val submissionPublisher = new SubmissionPublisher[B]() private val events = ListBuffer[ProcessorEvent[A]]() def onSubscribe(subscription: Flow.Subscription): Unit = { @@ -102,25 +104,48 @@ object ProcessorToPipelineSpec extends ZIOSpecDefault { def onNext(item: A): Unit = { this.events += ProcessorEvent.OnNext(item) - submit(f(item)); + submissionPublisher.submit(f(item)); subscription.request(1); } def onError(error: Throwable): Unit = { this.events += ProcessorEvent.OnError(error) - closeExceptionally(error); + submissionPublisher.closeExceptionally(error); } def onComplete(): Unit = { this.events += ProcessorEvent.OnComplete - close(); + submissionPublisher.close(); } def getEvents: UIO[List[ProcessorEvent[A]]] = ZIO.succeed(this.events.toList) + def subscribe(subscriber: Flow.Subscriber[_ >: B]): Unit = + submissionPublisher.subscribe(new CompatSubscriber[B](subscriber)) + def asPipeline = Adapters.processorToPipeline(FlowAdapters.toProcessor[A, B](this)) def asChannel = Adapters.processorToChannel(FlowAdapters.toProcessor[A, B](this)) + + } + + final class CompatSubscriber[B](underlying: Flow.Subscriber[_ >: B]) extends Flow8.Subscriber[B] { + def onSubscribe(subscription: Flow8.Subscription): Unit = + underlying.onSubscribe(new CompatSubscription(subscription)) + + def onNext(item: B): Unit = underlying.onNext(item) + + def onError(throwable: Throwable): Unit = underlying.onError(throwable) + + def onComplete(): Unit = underlying.onComplete() + + } + + final class CompatSubscription(underlying: Flow8.Subscription) extends Flow.Subscription { + def request(n: Long): Unit = + underlying.request(n) + def cancel(): Unit = + underlying.cancel() } } diff --git a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/StreamToPublisherSpec.scala b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/StreamToPublisherSpec.scala index 0f6210a..a0d5a2e 100644 --- a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/StreamToPublisherSpec.scala +++ b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/StreamToPublisherSpec.scala @@ -10,7 +10,6 @@ import zio.test._ import java.lang.reflect.InvocationTargetException import zio.Unsafe -import org.testng.SkipException object StreamToPublisherSpec extends ZIOSpecDefault { override def spec = @@ -32,7 +31,17 @@ object StreamToPublisherSpec extends ZIOSpecDefault { .getOrThrowFiberFailure() } - override def createFailedPublisher(): Publisher[Int] = null + override def createFailedPublisher(): Publisher[Int] = + Unsafe.unsafe { implicit unsafe => + runtime.unsafe + .run( + ZStream + .fail(new RuntimeException("boom!")) + .map(_.asInstanceOf[Int]) + .toPublisher + ) + .getOrThrowFiberFailure() + } } val tests = @@ -58,7 +67,7 @@ object StreamToPublisherSpec extends ZIOSpecDefault { .unit .refineOrDie { case e: InvocationTargetException => e.getTargetException() } .exit - } yield assert(r)(fails(isSubtype[SkipException](anything)) || succeeds(isUnit)) + } yield assert(r)(succeeds(isUnit)) ) } } From cbdef490ee04e4fbd8d1b718e718a254e4871bfc Mon Sep 17 00:00:00 2001 From: Maxim Schuwalow Date: Mon, 16 Oct 2023 18:07:25 +0200 Subject: [PATCH 14/14] again --- .../interop/reactivestreams/Adapters.scala | 4 +-- .../zio/interop/reactivestreams/package.scala | 4 +-- .../ProcessorToPipelineSpec.scala | 30 ++++++++----------- .../SubscriberToSinkSpec.scala | 2 +- 4 files changed, 17 insertions(+), 23 deletions(-) diff --git a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala index a8b93a5..3cbfbd3 100644 --- a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala +++ b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/Adapters.scala @@ -363,8 +363,8 @@ object Adapters { def cancelSubscription: UIO[Unit] = ZIO.succeed(isSubscribedOrCanceled.set(true)) *> - subscription.poll.flatMap(ZIO.foreachDiscard(_)(_.map(_.cancel()))) *> - subscription.interrupt.exit *> + subscription.poll.flatMap(ZIO.foreachDiscard(_)(_.map(_.cancel()).exit)) *> + subscription.interrupt.unit *> ZIO.succeed(state.getAndSet(State.Canceled) match { case State.Waiting(promise) => promise.unsafe.done(ZIO.unit) case _ => () diff --git a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/package.scala b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/package.scala index 497530d..879930e 100644 --- a/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/package.scala +++ b/zio-interop-reactivestreams/src/main/scala/zio/interop/reactivestreams/package.scala @@ -44,7 +44,7 @@ package object reactivestreams { def toZIOStream(qSize: Int = 16)(implicit trace: Trace): ZStream[Any, Throwable, O] = Adapters.publisherToStream(publisher, qSize) - def toZIOChannel(bufferSize: Int = 16)(implicit + def toPublisherZIOChannel(bufferSize: Int = 16)(implicit trace: Trace ): ZChannel[Any, Any, Any, Any, Throwable, Chunk[O], Any] = Adapters.publisherToChannel(publisher, bufferSize) @@ -67,7 +67,7 @@ package object reactivestreams { ): ZIO[Scope, Nothing, (E => UIO[Unit], ZSink[Any, Nothing, I, I, Unit])] = Adapters.subscriberToSink(subscriber) - def toZIOChannel(implicit trace: Trace): ZChannel[Any, Throwable, Chunk[I], Any, Any, Any, Any] = + def toSubscriberZIOChannel(implicit trace: Trace): ZChannel[Any, Throwable, Chunk[I], Any, Any, Any, Any] = Adapters.subscriberToChannel(subscriber) } diff --git a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala index a93717d..e002066 100644 --- a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala +++ b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/ProcessorToPipelineSpec.scala @@ -9,8 +9,7 @@ import zio.test._ import scala.collection.mutable.ListBuffer import java8.util.concurrent.SubmissionPublisher import java8.util.concurrent.{ Flow => Flow8 } -import java.util.concurrent.Flow -import org.reactivestreams.FlowAdapters +import org.reactivestreams.{ Processor, Subscriber, Subscription } object ProcessorToPipelineSpec extends ZIOSpecDefault { @@ -19,7 +18,7 @@ object ProcessorToPipelineSpec extends ZIOSpecDefault { test("works with a well behaved `Publisher`") { val processor = new TestProcessor((i: Int) => i.toString()) - val effect = ZStream(1, 2, 3, 4, 5).via(processor.asPipeline).runCollect + val effect = ZStream(1, 2, 3, 4, 5).via(processor.toZIOPipeline).runCollect for { result <- effect @@ -43,7 +42,7 @@ object ProcessorToPipelineSpec extends ZIOSpecDefault { val processor = new TestProcessor((i: Int) => i.toString()) val err = new RuntimeException() - val effect = (ZStream(1, 2) ++ ZStream.fail(err)).via(processor.asPipeline).runCollect + val effect = (ZStream(1, 2) ++ ZStream.fail(err)).via(processor.toZIOPipeline).runCollect for { result <- effect.exit @@ -63,7 +62,7 @@ object ProcessorToPipelineSpec extends ZIOSpecDefault { val processor = new TestProcessor((i: Int) => i.toString()) val err = new RuntimeException() - val effect = ((ZStream(1, 2) ++ ZStream.fail(err)).channel >>> processor.asChannel).runCollect + val effect = ((ZStream(1, 2) ++ ZStream.fail(err)).channel >>> processor.toProcessorZIOChannel).runCollect for { result <- effect.exit @@ -90,13 +89,13 @@ object ProcessorToPipelineSpec extends ZIOSpecDefault { case object OnComplete extends ProcessorEvent[Nothing] } - final class TestProcessor[A, B](f: A => B) extends Flow.Processor[A, B] { + final class TestProcessor[A, B](f: A => B) extends Processor[A, B] { - private var subscription: Flow.Subscription = null - private val submissionPublisher = new SubmissionPublisher[B]() - private val events = ListBuffer[ProcessorEvent[A]]() + private var subscription: Subscription = null + private val submissionPublisher = new SubmissionPublisher[B]() + private val events = ListBuffer[ProcessorEvent[A]]() - def onSubscribe(subscription: Flow.Subscription): Unit = { + def onSubscribe(subscription: Subscription): Unit = { this.events += ProcessorEvent.OnSubscribe this.subscription = subscription; subscription.request(1); @@ -121,16 +120,11 @@ object ProcessorToPipelineSpec extends ZIOSpecDefault { def getEvents: UIO[List[ProcessorEvent[A]]] = ZIO.succeed(this.events.toList) - def subscribe(subscriber: Flow.Subscriber[_ >: B]): Unit = + def subscribe(subscriber: Subscriber[_ >: B]): Unit = submissionPublisher.subscribe(new CompatSubscriber[B](subscriber)) - - def asPipeline = Adapters.processorToPipeline(FlowAdapters.toProcessor[A, B](this)) - - def asChannel = Adapters.processorToChannel(FlowAdapters.toProcessor[A, B](this)) - } - final class CompatSubscriber[B](underlying: Flow.Subscriber[_ >: B]) extends Flow8.Subscriber[B] { + final class CompatSubscriber[B](underlying: Subscriber[_ >: B]) extends Flow8.Subscriber[B] { def onSubscribe(subscription: Flow8.Subscription): Unit = underlying.onSubscribe(new CompatSubscription(subscription)) @@ -142,7 +136,7 @@ object ProcessorToPipelineSpec extends ZIOSpecDefault { } - final class CompatSubscription(underlying: Flow8.Subscription) extends Flow.Subscription { + final class CompatSubscription(underlying: Flow8.Subscription) extends Subscription { def request(n: Long): Unit = underlying.request(n) def cancel(): Unit = diff --git a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/SubscriberToSinkSpec.scala b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/SubscriberToSinkSpec.scala index 193f305..c55c536 100644 --- a/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/SubscriberToSinkSpec.scala +++ b/zio-interop-reactivestreams/src/test/scala/zio/interop/reactivestreams/SubscriberToSinkSpec.scala @@ -92,7 +92,7 @@ object SubscriberToSinkSpec extends ZIOSpecDefault { test("transports errors when transforming to channel") { makeSubscriber.flatMap(probe => ZIO.scoped[Any] { - val channel = probe.underlying.toZIOChannel + val channel = probe.underlying.toSubscriberZIOChannel for { fiber <- ((ZStream.fromIterable(seq) ++ ZStream.fail(e)).channel >>> channel).runDrain.fork _ <- ZIO.sleep(100.millis)