diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/InstrumentedSource.scala b/ledger/metrics/src/main/scala/com/daml/metrics/InstrumentedSource.scala index e513c6b8bbad..ff1c4a6e01c7 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/InstrumentedSource.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/InstrumentedSource.scala @@ -3,42 +3,37 @@ package com.daml.metrics -import akka.Done -import akka.stream.scaladsl.{Source, SourceQueueWithComplete} -import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult} +import akka.stream.scaladsl.Source +import akka.stream.{BoundedSourceQueue, Materializer, OverflowStrategy, QueueOfferResult} import com.codahale.metrics.{Counter, Timer} -import com.daml.dec.DirectExecutionContext - -import scala.concurrent.Future object InstrumentedSource { - final class QueueWithComplete[T]( - delegate: SourceQueueWithComplete[(Timer.Context, T)], + final class InstrumentedBoundedSourceQueue[T]( + delegate: BoundedSourceQueue[(Timer.Context, T)], + bufferSize: Int, + capacityCounter: Counter, lengthCounter: Counter, delayTimer: Timer, - ) extends SourceQueueWithComplete[T] { + ) extends BoundedSourceQueue[T] { - override def complete(): Unit = delegate.complete() + override def complete(): Unit = { + delegate.complete() + capacityCounter.dec(bufferSize.toLong) + } override def fail(ex: Throwable): Unit = delegate.fail(ex) - override def watchCompletion(): Future[Done] = delegate.watchCompletion() - - override def offer(elem: T): Future[QueueOfferResult] = { + override def offer(elem: T): QueueOfferResult = { val result = delegate.offer( delayTimer.time() -> elem ) - // Use the `DirectExecutionContext` to ensure that the - // counter is updated as closely as possible to the - // update of the queue, so to offer the most consistent - // reading possible via the counter - result.foreach { + result match { case QueueOfferResult.Enqueued => lengthCounter.inc() case _ => // do nothing - }(DirectExecutionContext) + } result } } @@ -63,26 +58,24 @@ object InstrumentedSource { */ def queue[T]( bufferSize: Int, - overflowStrategy: OverflowStrategy, capacityCounter: Counter, lengthCounter: Counter, delayTimer: Timer, )(implicit materializer: Materializer - ): Source[T, QueueWithComplete[T]] = { - val (queue, source) = - Source.queue[(Timer.Context, T)](bufferSize, overflowStrategy).preMaterialize() + ): Source[T, BoundedSourceQueue[T]] = { + val (boundedQueue, source) = + Source.queue[(Timer.Context, T)](bufferSize).preMaterialize() val instrumentedQueue = - new QueueWithComplete[T](queue, lengthCounter, delayTimer) - // Using `map` and not `wireTap` because the latter is skipped on backpressure. - + new InstrumentedBoundedSourceQueue[T]( + boundedQueue, + bufferSize, + capacityCounter, + lengthCounter, + delayTimer, + ) capacityCounter.inc(bufferSize.toLong) - instrumentedQueue - .watchCompletion() - .andThen { case _ => - capacityCounter.dec(bufferSize.toLong) - }(DirectExecutionContext) source.mapMaterializedValue(_ => instrumentedQueue).map { case (timingContext, item) => timingContext.stop() diff --git a/ledger/metrics/src/test/suite/scala/com/daml/metrics/InstrumentedSourceSpec.scala b/ledger/metrics/src/test/suite/scala/com/daml/metrics/InstrumentedSourceSpec.scala index 424c7325ce0a..7a6704f4a52d 100644 --- a/ledger/metrics/src/test/suite/scala/com/daml/metrics/InstrumentedSourceSpec.scala +++ b/ledger/metrics/src/test/suite/scala/com/daml/metrics/InstrumentedSourceSpec.scala @@ -7,7 +7,7 @@ import scala.util.chaining._ import java.util.concurrent.atomic.AtomicLong import akka.stream.scaladsl.{Keep, Sink, Source} -import akka.stream.{OverflowStrategy, QueueOfferResult} +import akka.stream.QueueOfferResult import com.codahale.metrics.{Counter, Timer} import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll import com.daml.metrics.InstrumentedSourceSpec.SamplingCounter @@ -21,44 +21,7 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka behavior of "InstrumentedSource.queue" - it should "correctly enqueue and track the buffer saturation" in { - - val bufferSize = 500 - - val capacityCounter = new Counter() - val maxBuffered = new InstrumentedSourceSpec.MaxValueCounter() - val delayTimer = new Timer() - - val (source, sink) = - InstrumentedSource - .queue[Int]( - bufferSize, - OverflowStrategy.backpressure, - capacityCounter, - maxBuffered, - delayTimer, - ) - .toMat(Sink.seq)(Keep.both) - .run() - - // The values in the queue are not relevant, hence the random generation - val input = Seq.fill(bufferSize)(util.Random.nextInt()) - - for { - results <- Future.sequence(input.map(source.offer)) - _ = capacityCounter.getCount shouldEqual bufferSize - _ = source.complete() - output <- sink - } yield { - all(results) shouldBe QueueOfferResult.Enqueued - output shouldEqual input - maxBuffered.getCount shouldEqual bufferSize - capacityCounter.getCount shouldEqual 0 - maxBuffered.decrements.get shouldEqual bufferSize - } - } - - it should "correctly measure queue delay" in { + it should "correctly enqueue and measure queue delay" in { val capacityCounter = new Counter() val maxBuffered = new InstrumentedSourceSpec.MaxValueCounter() val delayTimer = new Timer() @@ -66,7 +29,7 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka val (source, sink) = InstrumentedSource - .queue[Int](16, OverflowStrategy.backpressure, capacityCounter, maxBuffered, delayTimer) + .queue[Int](bufferSize, capacityCounter, maxBuffered, delayTimer) .mapAsync(1) { x => akka.pattern.after(5.millis, system.scheduler)(Future(x)) } @@ -75,11 +38,9 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka val input = Seq.fill(bufferSize)(util.Random.nextInt()) - for { - result <- Future.sequence(input.map(source.offer)) - _ = source.complete() - output <- sink - } yield { + val result = input.map(source.offer) + source.complete() + sink.map { output => all(result) shouldBe QueueOfferResult.Enqueued output shouldEqual input delayTimer.getCount shouldEqual bufferSize @@ -87,7 +48,7 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka } } - it should "track the buffer saturation correctly when dropping items" in { + it should "track the buffer saturation correctly" in { val bufferSize = 500 @@ -107,7 +68,7 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka val (source, termination) = InstrumentedSource - .queue[Int](bufferSize, OverflowStrategy.dropNew, capacityCounter, maxBuffered, delayTimer) + .queue[Int](bufferSize, capacityCounter, maxBuffered, delayTimer) .mapAsync(1)(_ => stop.future) // Block until completed to overflow queue. .watchTermination()(Keep.both) .toMat(Sink.ignore)(Keep.left) @@ -118,28 +79,25 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka val inputSize = bufferSize * 2 val input = Seq.fill(inputSize)(util.Random.nextInt()) - for { - results <- Future.sequence(input.map(source.offer)) - _ = capacityCounter.getCount shouldEqual bufferSize - _ = stop.success(()) - _ = source.complete() - _ <- termination - } yield { - val enqueued = results.count { - case QueueOfferResult.Enqueued => true - case _ => false - } - val dropped = results.count { - case QueueOfferResult.Dropped => true - case _ => false - } + val results = input.map(source.offer) + capacityCounter.getCount shouldEqual bufferSize + stop.success(()) + source.complete() + val enqueued = results.count { + case QueueOfferResult.Enqueued => true + case _ => false + } + val dropped = results.count { + case QueueOfferResult.Dropped => true + case _ => false + } + termination.map { _ => inputSize shouldEqual (enqueued + dropped) assert(enqueued >= bufferSize) assert(dropped <= bufferSize) assert(maxBuffered.getCount >= lowAcceptanceThreshold) assert(maxBuffered.getCount <= highAcceptanceThreshold) capacityCounter.getCount shouldEqual 0 - } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/QueueBackedTracker.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/QueueBackedTracker.scala index 656e88915a49..f5382f7f3226 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/QueueBackedTracker.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/services/tracking/QueueBackedTracker.scala @@ -3,8 +3,8 @@ package com.daml.platform.apiserver.services.tracking -import akka.stream.scaladsl.{Flow, Keep, Sink, SourceQueueWithComplete} -import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult} +import akka.stream.scaladsl.{Flow, Keep, Sink} +import akka.stream.{BoundedSourceQueue, Materializer, QueueOfferResult} import akka.{Done, NotUsed} import com.codahale.metrics.{Counter, Timer} import com.daml.dec.DirectExecutionContext @@ -28,7 +28,7 @@ import scala.util.{Failure, Success, Try} * @param queue The input queue to the tracking flow. */ private[services] final class QueueBackedTracker( - queue: SourceQueueWithComplete[QueueBackedTracker.QueueInput], + queue: BoundedSourceQueue[QueueBackedTracker.QueueInput], done: Future[Done], errorFactories: ErrorFactories, )(implicit loggingContext: LoggingContext) @@ -47,34 +47,23 @@ private[services] final class QueueBackedTracker( ) logger.trace("Tracking command") val trackedPromise = Promise[Either[CompletionFailure, CompletionSuccess]]() - queue - .offer(Ctx(trackedPromise, submission)) - .flatMap[Either[TrackedCompletionFailure, CompletionSuccess]] { - case QueueOfferResult.Enqueued => - trackedPromise.future.map( - _.left.map(completionFailure => QueueCompletionFailure(completionFailure)) - ) - case QueueOfferResult.Failure(t) => - toQueueSubmitFailure( - errorFactories.SubmissionQueueErrors - .failedToEnqueueCommandSubmission("Failed to enqueue")(t) - ) - case QueueOfferResult.Dropped => - toQueueSubmitFailure(errorFactories.bufferFull("The submission ingress buffer is full")) - case QueueOfferResult.QueueClosed => - toQueueSubmitFailure( - errorFactories.SubmissionQueueErrors.queueClosed("Command service queue") - ) - } - .recoverWith { - case i: IllegalStateException - if i.getMessage == "You have to wait for previous offer to be resolved to send another request" => - toQueueSubmitFailure(errorFactories.bufferFull("The submission ingress buffer is full")) - case t => - toQueueSubmitFailure( - errorFactories.SubmissionQueueErrors.failedToEnqueueCommandSubmission("Failed")(t) - ) - } + queue.offer(Ctx(trackedPromise, submission)) match { + case QueueOfferResult.Enqueued => + trackedPromise.future.map( + _.left.map(completionFailure => QueueCompletionFailure(completionFailure)) + ) + case QueueOfferResult.Failure(t) => + toQueueSubmitFailure( + errorFactories.SubmissionQueueErrors + .failedToEnqueueCommandSubmission("Failed to enqueue")(t) + ) + case QueueOfferResult.Dropped => + toQueueSubmitFailure(errorFactories.bufferFull("The submission ingress buffer is full")) + case QueueOfferResult.QueueClosed => + toQueueSubmitFailure( + errorFactories.SubmissionQueueErrors.queueClosed("Command service queue") + ) + } } private def toQueueSubmitFailure( @@ -86,7 +75,6 @@ private[services] final class QueueBackedTracker( override def close(): Unit = { logger.debug("Shutting down tracking component.") queue.complete() - Await.result(queue.watchCompletion(), 30.seconds) Await.result(done, 30.seconds) () } @@ -113,7 +101,6 @@ private[services] object QueueBackedTracker { val ((queue, mat), done) = InstrumentedSource .queue[QueueInput]( inputBufferSize, - OverflowStrategy.dropNew, capacityCounter, lengthCounter, delayTimer, diff --git a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/QueueBackedTrackerSpec.scala b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/QueueBackedTrackerSpec.scala index 5157f970c3af..30817469b3da 100644 --- a/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/QueueBackedTrackerSpec.scala +++ b/ledger/participant-integration-api/src/test/suite/scala/platform/apiserver/services/tracking/QueueBackedTrackerSpec.scala @@ -3,10 +3,10 @@ package com.daml.platform.apiserver.services.tracking -import akka.stream.scaladsl.{Keep, Source, SourceQueueWithComplete} +import akka.stream.scaladsl.{Keep, Source} import akka.stream.testkit.TestSubscriber import akka.stream.testkit.scaladsl.TestSink -import akka.stream.{Materializer, OverflowStrategy} +import akka.stream.{BoundedSourceQueue, Materializer} import akka.{Done, NotUsed} import com.daml.grpc.RpcProtoExtractors import com.daml.ledger.api.testing.utils.{AkkaBeforeAndAfterAll, TestingException} @@ -38,7 +38,7 @@ class QueueBackedTrackerSpec private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting private var consumer: TestSubscriber.Probe[NotUsed] = _ - private var queue: SourceQueueWithComplete[QueueBackedTracker.QueueInput] = _ + private var queue: BoundedSourceQueue[QueueBackedTracker.QueueInput] = _ override protected def beforeEach(): Unit = { val (q, sink) = alwaysSuccessfulQueue(bufferSize = 1) @@ -164,9 +164,9 @@ object QueueBackedTrackerSpec { private def alwaysSuccessfulQueue(bufferSize: Int)(implicit materializer: Materializer - ): (SourceQueueWithComplete[QueueInput], TestSubscriber.Probe[NotUsed]) = + ): (BoundedSourceQueue[QueueInput], TestSubscriber.Probe[NotUsed]) = Source - .queue[QueueInput](bufferSize, OverflowStrategy.dropNew) + .queue[QueueInput](bufferSize) .map { in => val completion = CompletionResponse.CompletionSuccess( Completion(