diff --git a/build.sbt b/build.sbt index 34b68ef70..f50527959 100644 --- a/build.sbt +++ b/build.sbt @@ -100,7 +100,8 @@ lazy val zioKafka = .settings( libraryDependencies ++= Seq( kafkaClients, - scalaCollectionCompat + scalaCollectionCompat, + "dev.zio" %% "zio-concurrent" % zioVersion.value ) ) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index ee456d1ab..fa10ff9ef 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -445,6 +445,39 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .provideSomeLayer[Kafka](consumer(client, Some(group))) } yield assert(offsets.values.headOption.flatten.map(_.metadata))(isSome(equalTo(metadata))) }, + test("access to the java consumer must be fair") { + val kvs = (1 to 10).toList.map(i => (s"key$i", s"msg$i")) + + val expectedResult = (0 to 9).toList.map(i => i.toLong -> i.toLong) + + for { + topic <- randomTopic + client <- randomClient + group <- randomGroup + + _ <- produceMany(topic, kvs) + committedOffsetRef <- Ref.make(Seq.empty[(Long, Long)]) + + topicPartition = new TopicPartition(topic, 0) + + _ <- Consumer + .plainStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string) + .take(10) + .map(_.offset) + .mapZIO(offsetBatch => + Consumer + .committed(Set(topicPartition)) + .flatMap(offset => + committedOffsetRef.update(map => + map :+ (offsetBatch.offset -> offset(topicPartition).map(_.offset()).getOrElse(0L)) + ) *> offsetBatch.commit + ) + ) + .runDrain + .provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 2.seconds)) + offsets <- committedOffsetRef.get + } yield assert(offsets)(equalTo(expectedResult)) + } @@ TestAspect.timeout(20.seconds), test("handle rebalancing by completing topic-partition streams") { val nrMessages = 50 val nrPartitions = 6 // Must be even and strictly positive diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala index 3d2f2e3dd..a3d6e1fbb 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala @@ -8,16 +8,18 @@ import zio.kafka.consumer.ConsumerSettings import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer import scala.jdk.CollectionConverters._ +import zio.concurrent.ReentrantLock private[consumer] final class ConsumerAccess( private[consumer] val consumer: ByteArrayKafkaConsumer, - access: Semaphore + access: ReentrantLock ) { + def withConsumer[A](f: ByteArrayKafkaConsumer => A): Task[A] = withConsumerZIO[Any, A](c => ZIO.attempt(f(c))) def withConsumerZIO[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] = - access.withPermit(withConsumerNoPermit(f)) + access.lock.zipRight(withConsumerNoPermit(f)).ensuring(access.unlock) private[consumer] def withConsumerNoPermit[R, A]( f: ByteArrayKafkaConsumer => RIO[R, A] @@ -34,7 +36,7 @@ private[consumer] final class ConsumerAccess( * Do not use this method outside of the Runloop */ private[internal] def runloopAccess[R, E, A](f: ByteArrayKafkaConsumer => ZIO[R, E, A]): ZIO[R, E, A] = - access.withPermit(f(consumer)) + access.lock.zipRight(f(consumer)).ensuring(access.unlock) } private[consumer] object ConsumerAccess { @@ -58,6 +60,6 @@ private[consumer] object ConsumerAccess { def make(consumer: ByteArrayKafkaConsumer): ZIO[Scope, Throwable, ConsumerAccess] = for { - access <- Semaphore.make(1) + access <- ReentrantLock.make(fairness = true) } yield new ConsumerAccess(consumer, access) }