From 74fa6d07b034bf9aca43948e0c794d15c34db397 Mon Sep 17 00:00:00 2001 From: Vladislav Dolbilov Date: Sun, 16 Oct 2022 17:54:40 +0300 Subject: [PATCH] fix race in Adapters.publisherToStream (#339) * fix race in Adapters.publisherToStream * fix race in Adapters.publisherToStream Co-authored-by: Simon Schenk --- .../scala/zio/interop/reactivestreams/Adapters.scala | 5 +++-- .../reactivestreams/PublisherToStreamSpec.scala | 11 +++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/main/scala/zio/interop/reactivestreams/Adapters.scala b/src/main/scala/zio/interop/reactivestreams/Adapters.scala index 70b6b4e..22e1efe 100644 --- a/src/main/scala/zio/interop/reactivestreams/Adapters.scala +++ b/src/main/scala/zio/interop/reactivestreams/Adapters.scala @@ -149,7 +149,8 @@ object Adapters { override def await(): IO[Option[Throwable], Unit] = done match { - case Some(value) => ZIO.fail(value) + case Some(value) => + if (q.isEmpty()) ZIO.fail(value) else ZIO.unit case None => val p = Promise.unsafe.make[Option[Throwable], Unit](FiberId.None) toNotify = Some(p) @@ -161,7 +162,7 @@ object Adapters { done.fold(p.await) { e => // The producer has canceled or errored in the meantime. toNotify = None - ZIO.fail(e) + if (q.isEmpty()) ZIO.fail(e) else ZIO.unit } } diff --git a/src/test/scala/zio/interop/reactivestreams/PublisherToStreamSpec.scala b/src/test/scala/zio/interop/reactivestreams/PublisherToStreamSpec.scala index 4f2cc0d..bef3763 100644 --- a/src/test/scala/zio/interop/reactivestreams/PublisherToStreamSpec.scala +++ b/src/test/scala/zio/interop/reactivestreams/PublisherToStreamSpec.scala @@ -3,6 +3,7 @@ package zio.interop.reactivestreams import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import org.reactivestreams.Subscription +import org.reactivestreams.example.unicast.NumberIterablePublisher import org.reactivestreams.tck.TestEnvironment import org.reactivestreams.tck.TestEnvironment.ManualPublisher import zio.Chunk @@ -183,6 +184,16 @@ object PublisherToStreamSpec extends ZIOSpecDefault { succeeds(isUnit) ) ) + }, + test("collect all messages") { + for { + executor <- ZIO.executor + sum <- ZIO + .foreach((1 to 10000).toVector) { _ => + Adapters.publisherToStream(new NumberIterablePublisher(0, 1, executor.asJava), 16).runCount + } + .map(_.sum) + } yield assert(sum)(equalTo(10000L)) } )