From bc33b1d1d361d1c77a7fbb4ab69f318635bd28d1 Mon Sep 17 00:00:00 2001 From: "Felipe Ramos (perenecabuto)" Date: Wed, 22 Jul 2020 21:06:02 -0300 Subject: [PATCH] add support for optimistic alignment to PeriodSnapshot.accumulator --- .../PeriodSnapshotAccumulatorSpec.scala | 32 +++++++++++++++++++ .../scala/kamon/metric/PeriodSnapshot.scala | 21 +++++++++--- 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/core/kamon-core-tests/src/test/scala/kamon/metric/PeriodSnapshotAccumulatorSpec.scala b/core/kamon-core-tests/src/test/scala/kamon/metric/PeriodSnapshotAccumulatorSpec.scala index f671b1b42..3dfd5f88b 100644 --- a/core/kamon-core-tests/src/test/scala/kamon/metric/PeriodSnapshotAccumulatorSpec.scala +++ b/core/kamon-core-tests/src/test/scala/kamon/metric/PeriodSnapshotAccumulatorSpec.scala @@ -93,7 +93,34 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr accumulator.add(fiveSecondsSeven) shouldBe empty // second 0:35 } + "not align snapshot when optimistic tick alignment is false" in { + // When the kamon.metric.optimistic-tick-alignment is false + // If accumulating over 11 seconds, the snapshots should be generated at 00:00:11, 00:00:22, 00:00:33 and so on. + // Thus the snapshot next tick is never aligned + applyConfig("kamon.metric.optimistic-tick-alignment = no") + + val accumulator = newAccumulator(11, 0) + // as the first add snapshot.to determines the first ticker use zero second + // to send snapshot at the seconds that are multiples of the duration + accumulator.add(zeroSecond) shouldBe empty // second 0:0 + accumulator.add(fiveSecondsOne) shouldBe empty // second 0:5 + accumulator.add(fiveSecondsTwo) shouldBe empty // second 0:10 + + val s15 = accumulator.add(fiveSecondsThree).value // second 0:15 + s15.from shouldBe(zeroSecond.from) + s15.to shouldBe(fiveSecondsThree.to) + + accumulator.add(fiveSecondsFour) shouldBe empty // second 0:20 + val s25 = accumulator.add(fiveSecondsFive).value // second 0:25 + s25.from shouldBe(fiveSecondsFour.from) + s25.to shouldBe(fiveSecondsFive.to) + + accumulator.add(fiveSecondsSix) shouldBe empty // second 0:30 + } + "do best effort to align when snapshots themselves are not aligned" in { + applyConfig("kamon.metric.optimistic-tick-alignment = yes") + val accumulator = newAccumulator(30, 0) accumulator.add(tenSecondsOne) shouldBe empty // second 0:13 accumulator.add(tenSecondsTwo) shouldBe empty // second 0:23 @@ -139,6 +166,8 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr } "produce a snapshot when enough data has been accumulated" in { + applyConfig("kamon.metric.optimistic-tick-alignment = yes") + val accumulator = newAccumulator(15, 1) accumulator.add(fiveSecondsOne) shouldBe empty accumulator.add(fiveSecondsTwo) shouldBe empty @@ -167,6 +196,8 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr val alignedZeroTime = Clock.nextAlignedInstant(Kamon.clock().instant(), Duration.ofSeconds(60)).minusSeconds(60) val unAlignedZeroTime = alignedZeroTime.plusSeconds(3) + val zeroSecond = createPeriodSnapshot(alignedZeroTime, alignedZeroTime, 13) + // Aligned snapshots, every 5 seconds from second 00. val fiveSecondsOne = createPeriodSnapshot(alignedZeroTime, alignedZeroTime.plusSeconds(5), 22) val fiveSecondsTwo = createPeriodSnapshot(alignedZeroTime.plusSeconds(5), alignedZeroTime.plusSeconds(10), 33) @@ -221,6 +252,7 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr ) override protected def beforeAll(): Unit = { + applyConfig("kamon.metric.optimistic-tick-alignment = yes") applyConfig("kamon.metric.tick-interval = 10 seconds") } } diff --git a/core/kamon-core/src/main/scala/kamon/metric/PeriodSnapshot.scala b/core/kamon-core/src/main/scala/kamon/metric/PeriodSnapshot.scala index d4b4d0622..6af92b8f2 100644 --- a/core/kamon-core/src/main/scala/kamon/metric/PeriodSnapshot.scala +++ b/core/kamon-core/src/main/scala/kamon/metric/PeriodSnapshot.scala @@ -97,10 +97,10 @@ object PeriodSnapshot { private var _accumulatingFrom: Option[Instant] = None def add(periodSnapshot: PeriodSnapshot): Option[PeriodSnapshot] = { - // Initialize the next tick based on incoming snapshots. - if(_nextTick == Instant.EPOCH) - _nextTick = Clock.nextAlignedInstant(periodSnapshot.to, period) + if (_nextTick == Instant.EPOCH) { + _nextTick = nextInstant(periodSnapshot.to) + } // short-circuit if there is no need to accumulate (e.g. when metrics tick-interval is the same as duration or the // snapshots have a longer period than the duration). @@ -116,10 +116,9 @@ object PeriodSnapshot { for(from <- _accumulatingFrom if isAroundNextTick(periodSnapshot.to)) yield { val accumulatedPeriodSnapshot = buildPeriodSnapshot(from, periodSnapshot.to, resetState = true) - _nextTick = Clock.nextAlignedInstant(_nextTick, period) + _nextTick = nextInstant(_nextTick) _accumulatingFrom = None clearAccumulatedData() - accumulatedPeriodSnapshot } } @@ -133,6 +132,18 @@ object PeriodSnapshot { Duration.between(instant, _nextTick.minus(margin)).toMillis() <= 0 } + private def nextInstant(from: Instant): Instant = { + if (isOptimisticAlignmentEnabled()) { + Clock.nextAlignedInstant(from, period) + } else { + Instant.ofEpochMilli(from.toEpochMilli + period.toMillis) + } + } + + private def isOptimisticAlignmentEnabled(): Boolean = { + Kamon.config().getBoolean("kamon.metric.optimistic-tick-alignment") + } + private def isSameDurationAsTickInterval(): Boolean = { Kamon.config().getDuration("kamon.metric.tick-interval") == period }