Make the InstrumentedSource.queue use the BoundedSourceQueue
CHANGELOG_BEGIN

- [Integration Kit] InstrumentedSource.queue.offer no longer returns a Future

CHANGELOG_END
hubert-da committed Nov 22, 2021
1 parent ab520fb commit 61e12a9
Showing 4 changed files with 70 additions and 132 deletions.
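
The user-visible change is the signature of offer: BoundedSourceQueue.offer returns a QueueOfferResult synchronously, whereas the previous SourceQueueWithComplete.offer returned a Future[QueueOfferResult]. A minimal caller-side sketch of the new contract (the element type and helper name are illustrative, not taken from this commit):

import akka.stream.{BoundedSourceQueue, QueueOfferResult}

// Returns true only if the element was actually enqueued; the other outcomes
// for a bounded queue are Dropped (buffer full), QueueClosed and Failure.
def offerOrReport(queue: BoundedSourceQueue[Int], element: Int): Boolean =
  queue.offer(element) match {
    case QueueOfferResult.Enqueued => true
    case _                         => false
  }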
@@ -3,42 +3,37 @@

package com.daml.metrics

import akka.Done
import akka.stream.scaladsl.{Source, SourceQueueWithComplete}
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.Source
import akka.stream.{BoundedSourceQueue, Materializer, OverflowStrategy, QueueOfferResult}
import com.codahale.metrics.{Counter, Timer}
import com.daml.dec.DirectExecutionContext

import scala.concurrent.Future

object InstrumentedSource {

final class QueueWithComplete[T](
delegate: SourceQueueWithComplete[(Timer.Context, T)],
final class InstrumentedBoundedSourceQueue[T](
delegate: BoundedSourceQueue[(Timer.Context, T)],
bufferSize: Int,
capacityCounter: Counter,
lengthCounter: Counter,
delayTimer: Timer,
) extends SourceQueueWithComplete[T] {
) extends BoundedSourceQueue[T] {

override def complete(): Unit = delegate.complete()
override def complete(): Unit = {
delegate.complete()
capacityCounter.dec(bufferSize.toLong)
}

override def fail(ex: Throwable): Unit = delegate.fail(ex)

override def watchCompletion(): Future[Done] = delegate.watchCompletion()

override def offer(elem: T): Future[QueueOfferResult] = {
override def offer(elem: T): QueueOfferResult = {
val result = delegate.offer(
delayTimer.time() -> elem
)
// Use the `DirectExecutionContext` to ensure that the
// counter is updated as closely as possible to the
// update of the queue, so to offer the most consistent
// reading possible via the counter
result.foreach {
result match {
case QueueOfferResult.Enqueued =>
lengthCounter.inc()

case _ => // do nothing
}(DirectExecutionContext)
}
result
}
}
@@ -63,26 +58,24 @@ object InstrumentedSource {
*/
def queue[T](
bufferSize: Int,
overflowStrategy: OverflowStrategy,
capacityCounter: Counter,
lengthCounter: Counter,
delayTimer: Timer,
)(implicit
materializer: Materializer
): Source[T, QueueWithComplete[T]] = {
val (queue, source) =
Source.queue[(Timer.Context, T)](bufferSize, overflowStrategy).preMaterialize()
): Source[T, BoundedSourceQueue[T]] = {
val (boundedQueue, source) =
Source.queue[(Timer.Context, T)](bufferSize).preMaterialize()

val instrumentedQueue =
new QueueWithComplete[T](queue, lengthCounter, delayTimer)
// Using `map` and not `wireTap` because the latter is skipped on backpressure.

new InstrumentedBoundedSourceQueue[T](
boundedQueue,
bufferSize,
capacityCounter,
lengthCounter,
delayTimer,
)
capacityCounter.inc(bufferSize.toLong)
instrumentedQueue
.watchCompletion()
.andThen { case _ =>
capacityCounter.dec(bufferSize.toLong)
}(DirectExecutionContext)

source.mapMaterializedValue(_ => instrumentedQueue).map { case (timingContext, item) =>
timingContext.stop()
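As a rough end-to-end illustration of the reworked factory, assuming the signature shown above (the actor system name, buffer size and sink are placeholders, not taken from this commit): the capacity counter is incremented by the buffer size on materialization and decremented again by complete().

import akka.actor.ActorSystem
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.{Keep, Sink}
import com.codahale.metrics.{Counter, Timer}
import com.daml.metrics.InstrumentedSource

object InstrumentedQueueExample extends App {
  implicit val system: ActorSystem = ActorSystem("instrumented-queue-example")

  val capacityCounter = new Counter()
  val lengthCounter = new Counter()
  val delayTimer = new Timer()

  // Materialize the instrumented queue; the materialized value is the
  // BoundedSourceQueue wrapper introduced by this commit.
  val (queue, drained) = InstrumentedSource
    .queue[Int](16, capacityCounter, lengthCounter, delayTimer)
    .toMat(Sink.seq)(Keep.both)
    .run()

  val accepted = (1 to 16).map(queue.offer).count(_ == QueueOfferResult.Enqueued)
  println(s"enqueued $accepted elements, capacity gauge at ${capacityCounter.getCount}")

  queue.complete() // decrements the capacity counter by the buffer size again
  drained.onComplete(_ => system.terminate())(system.dispatcher)
}
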
@@ -7,7 +7,7 @@ import scala.util.chaining._
import java.util.concurrent.atomic.AtomicLong

import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.QueueOfferResult
import com.codahale.metrics.{Counter, Timer}
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.metrics.InstrumentedSourceSpec.SamplingCounter
@@ -21,52 +21,15 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka

behavior of "InstrumentedSource.queue"

it should "correctly enqueue and track the buffer saturation" in {

val bufferSize = 500

val capacityCounter = new Counter()
val maxBuffered = new InstrumentedSourceSpec.MaxValueCounter()
val delayTimer = new Timer()

val (source, sink) =
InstrumentedSource
.queue[Int](
bufferSize,
OverflowStrategy.backpressure,
capacityCounter,
maxBuffered,
delayTimer,
)
.toMat(Sink.seq)(Keep.both)
.run()

// The values in the queue are not relevant, hence the random generation
val input = Seq.fill(bufferSize)(util.Random.nextInt())

for {
results <- Future.sequence(input.map(source.offer))
_ = capacityCounter.getCount shouldEqual bufferSize
_ = source.complete()
output <- sink
} yield {
all(results) shouldBe QueueOfferResult.Enqueued
output shouldEqual input
maxBuffered.getCount shouldEqual bufferSize
capacityCounter.getCount shouldEqual 0
maxBuffered.decrements.get shouldEqual bufferSize
}
}

it should "correctly measure queue delay" in {
it should "correctly enqueue and measure queue delay" in {
val capacityCounter = new Counter()
val maxBuffered = new InstrumentedSourceSpec.MaxValueCounter()
val delayTimer = new Timer()
val bufferSize = 2

val (source, sink) =
InstrumentedSource
.queue[Int](16, OverflowStrategy.backpressure, capacityCounter, maxBuffered, delayTimer)
.queue[Int](bufferSize, capacityCounter, maxBuffered, delayTimer)
.mapAsync(1) { x =>
akka.pattern.after(5.millis, system.scheduler)(Future(x))
}
@@ -75,19 +38,17 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka

val input = Seq.fill(bufferSize)(util.Random.nextInt())

for {
result <- Future.sequence(input.map(source.offer))
_ = source.complete()
output <- sink
} yield {
val result = input.map(source.offer)
source.complete()
sink.map { output =>
all(result) shouldBe QueueOfferResult.Enqueued
output shouldEqual input
delayTimer.getCount shouldEqual bufferSize
delayTimer.getSnapshot.getMax should be >= 5.millis.toNanos
}
}

it should "track the buffer saturation correctly when dropping items" in {
it should "track the buffer saturation correctly" in {

val bufferSize = 500

@@ -107,7 +68,7 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka

val (source, termination) =
InstrumentedSource
.queue[Int](bufferSize, OverflowStrategy.dropNew, capacityCounter, maxBuffered, delayTimer)
.queue[Int](bufferSize, capacityCounter, maxBuffered, delayTimer)
.mapAsync(1)(_ => stop.future) // Block until completed to overflow queue.
.watchTermination()(Keep.both)
.toMat(Sink.ignore)(Keep.left)
@@ -118,28 +79,25 @@ final class InstrumentedSourceSpec extends AsyncFlatSpec with Matchers with Akka
val inputSize = bufferSize * 2
val input = Seq.fill(inputSize)(util.Random.nextInt())

for {
results <- Future.sequence(input.map(source.offer))
_ = capacityCounter.getCount shouldEqual bufferSize
_ = stop.success(())
_ = source.complete()
_ <- termination
} yield {
val enqueued = results.count {
case QueueOfferResult.Enqueued => true
case _ => false
}
val dropped = results.count {
case QueueOfferResult.Dropped => true
case _ => false
}
val results = input.map(source.offer)
capacityCounter.getCount shouldEqual bufferSize
stop.success(())
source.complete()
val enqueued = results.count {
case QueueOfferResult.Enqueued => true
case _ => false
}
val dropped = results.count {
case QueueOfferResult.Dropped => true
case _ => false
}
termination.map { _ =>
inputSize shouldEqual (enqueued + dropped)
assert(enqueued >= bufferSize)
assert(dropped <= bufferSize)
assert(maxBuffered.getCount >= lowAcceptanceThreshold)
assert(maxBuffered.getCount <= highAcceptanceThreshold)
capacityCounter.getCount shouldEqual 0

}
}

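Since offer is now synchronous, the updated specs above no longer need to Future.sequence the offer results before asserting on them; a tiny sketch of that test-side pattern (the helper name is illustrative):

import akka.stream.QueueOfferResult

// True only when every offered element was accepted by the bounded queue.
def allEnqueued[T](offer: T => QueueOfferResult, items: Seq[T]): Boolean =
  items.map(offer).forall(_ == QueueOfferResult.Enqueued)
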
@@ -3,8 +3,8 @@

package com.daml.platform.apiserver.services.tracking

import akka.stream.scaladsl.{Flow, Keep, Sink, SourceQueueWithComplete}
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, Keep, Sink}
import akka.stream.{BoundedSourceQueue, Materializer, QueueOfferResult}
import akka.{Done, NotUsed}
import com.codahale.metrics.{Counter, Timer}
import com.daml.dec.DirectExecutionContext
@@ -28,7 +28,7 @@ import scala.util.{Failure, Success, Try}
* @param queue The input queue to the tracking flow.
*/
private[services] final class QueueBackedTracker(
queue: SourceQueueWithComplete[QueueBackedTracker.QueueInput],
queue: BoundedSourceQueue[QueueBackedTracker.QueueInput],
done: Future[Done],
errorFactories: ErrorFactories,
)(implicit loggingContext: LoggingContext)
@@ -47,34 +47,23 @@ private[services] final class QueueBackedTracker(
)
logger.trace("Tracking command")
val trackedPromise = Promise[Either[CompletionFailure, CompletionSuccess]]()
queue
.offer(Ctx(trackedPromise, submission))
.flatMap[Either[TrackedCompletionFailure, CompletionSuccess]] {
case QueueOfferResult.Enqueued =>
trackedPromise.future.map(
_.left.map(completionFailure => QueueCompletionFailure(completionFailure))
)
case QueueOfferResult.Failure(t) =>
toQueueSubmitFailure(
errorFactories.SubmissionQueueErrors
.failedToEnqueueCommandSubmission("Failed to enqueue")(t)
)
case QueueOfferResult.Dropped =>
toQueueSubmitFailure(errorFactories.bufferFull("The submission ingress buffer is full"))
case QueueOfferResult.QueueClosed =>
toQueueSubmitFailure(
errorFactories.SubmissionQueueErrors.queueClosed("Command service queue")
)
}
.recoverWith {
case i: IllegalStateException
if i.getMessage == "You have to wait for previous offer to be resolved to send another request" =>
toQueueSubmitFailure(errorFactories.bufferFull("The submission ingress buffer is full"))
case t =>
toQueueSubmitFailure(
errorFactories.SubmissionQueueErrors.failedToEnqueueCommandSubmission("Failed")(t)
)
}
queue.offer(Ctx(trackedPromise, submission)) match {
case QueueOfferResult.Enqueued =>
trackedPromise.future.map(
_.left.map(completionFailure => QueueCompletionFailure(completionFailure))
)
case QueueOfferResult.Failure(t) =>
toQueueSubmitFailure(
errorFactories.SubmissionQueueErrors
.failedToEnqueueCommandSubmission("Failed to enqueue")(t)
)
case QueueOfferResult.Dropped =>
toQueueSubmitFailure(errorFactories.bufferFull("The submission ingress buffer is full"))
case QueueOfferResult.QueueClosed =>
toQueueSubmitFailure(
errorFactories.SubmissionQueueErrors.queueClosed("Command service queue")
)
}
}

private def toQueueSubmitFailure(
@@ -86,7 +75,6 @@ private[services] final class QueueBackedTracker(
override def close(): Unit = {
logger.debug("Shutting down tracking component.")
queue.complete()
Await.result(queue.watchCompletion(), 30.seconds)
Await.result(done, 30.seconds)
()
}
@@ -113,7 +101,6 @@ private[services] object QueueBackedTracker {
val ((queue, mat), done) = InstrumentedSource
.queue[QueueInput](
inputBufferSize,
OverflowStrategy.dropNew,
capacityCounter,
lengthCounter,
delayTimer,
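QueueBackedTracker keeps its Future-based interface by lifting the now-synchronous offer result back into a Future, as the pattern match above does; a simplified sketch with stand-in types (SubmitError, enqueue and completion are illustrative, not the tracker's real types). Note also that close() no longer awaits queue.watchCompletion(), since BoundedSourceQueue exposes no completion future; shutdown only awaits the stream's own done future.

import akka.stream.QueueOfferResult
import scala.concurrent.{ExecutionContext, Future}

sealed trait SubmitError
case object BufferFull extends SubmitError
case object QueueUnavailable extends SubmitError

// Lift the synchronous QueueOfferResult into a Future-based submission API.
def track[A](enqueue: () => QueueOfferResult, completion: Future[A])(implicit
    ec: ExecutionContext
): Future[Either[SubmitError, A]] =
  enqueue() match {
    case QueueOfferResult.Enqueued => completion.map(Right(_))
    case QueueOfferResult.Dropped  => Future.successful(Left(BufferFull))
    case _                         => Future.successful(Left(QueueUnavailable))
  }
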
@@ -3,10 +3,10 @@

package com.daml.platform.apiserver.services.tracking

import akka.stream.scaladsl.{Keep, Source, SourceQueueWithComplete}
import akka.stream.scaladsl.{Keep, Source}
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.{BoundedSourceQueue, Materializer}
import akka.{Done, NotUsed}
import com.daml.grpc.RpcProtoExtractors
import com.daml.ledger.api.testing.utils.{AkkaBeforeAndAfterAll, TestingException}
@@ -38,7 +38,7 @@ class QueueBackedTrackerSpec
private implicit val loggingContext: LoggingContext = LoggingContext.ForTesting

private var consumer: TestSubscriber.Probe[NotUsed] = _
private var queue: SourceQueueWithComplete[QueueBackedTracker.QueueInput] = _
private var queue: BoundedSourceQueue[QueueBackedTracker.QueueInput] = _

override protected def beforeEach(): Unit = {
val (q, sink) = alwaysSuccessfulQueue(bufferSize = 1)
@@ -164,9 +164,9 @@ object QueueBackedTrackerSpec {

private def alwaysSuccessfulQueue(bufferSize: Int)(implicit
materializer: Materializer
): (SourceQueueWithComplete[QueueInput], TestSubscriber.Probe[NotUsed]) =
): (BoundedSourceQueue[QueueInput], TestSubscriber.Probe[NotUsed]) =
Source
.queue[QueueInput](bufferSize, OverflowStrategy.dropNew)
.queue[QueueInput](bufferSize)
.map { in =>
val completion = CompletionResponse.CompletionSuccess(
Completion(
