Skip to content

Commit

Permalink
add support for optimistic alignment to PeriodSnapshot.accumulator
Browse files Browse the repository at this point in the history
  • Loading branch information
perenecabuto committed Feb 12, 2021
1 parent cc1c821 commit e62848a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,34 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr
accumulator.add(fiveSecondsSeven) shouldBe empty // second 0:35
}

"not align snapshot when optimistic tick alignment is false" in {
// When the kamon.metric.optimistic-tick-alignment is false
// If accumulating over 11 seconds, the snapshots should be generated at 00:00:11, 00:00:22, 00:00:33 and so on.
// Thus the snapshot next tick is never aligned
applyConfig("kamon.metric.optimistic-tick-alignment = no")

val accumulator = newAccumulator(11, 0)
// as the first add snapshot.to determines the first ticker use zero second
// to send snapshot at the seconds that are multiples of the duration
accumulator.add(zeroSecond) shouldBe empty // second 0:0
accumulator.add(fiveSecondsOne) shouldBe empty // second 0:5
accumulator.add(fiveSecondsTwo) shouldBe empty // second 0:10

val s15 = accumulator.add(fiveSecondsThree).value // second 0:15
s15.from shouldBe(zeroSecond.from)
s15.to shouldBe(fiveSecondsThree.to)

accumulator.add(fiveSecondsFour) shouldBe empty // second 0:20
val s25 = accumulator.add(fiveSecondsFive).value // second 0:25
s25.from shouldBe(fiveSecondsFour.from)
s25.to shouldBe(fiveSecondsFive.to)

accumulator.add(fiveSecondsSix) shouldBe empty // second 0:30
}

"do best effort to align when snapshots themselves are not aligned" in {
applyConfig("kamon.metric.optimistic-tick-alignment = yes")

val accumulator = newAccumulator(30, 0)
accumulator.add(tenSecondsOne) shouldBe empty // second 0:13
accumulator.add(tenSecondsTwo) shouldBe empty // second 0:23
Expand Down Expand Up @@ -139,6 +166,8 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr
}

"produce a snapshot when enough data has been accumulated" in {
applyConfig("kamon.metric.optimistic-tick-alignment = yes")

val accumulator = newAccumulator(15, 1)
accumulator.add(fiveSecondsOne) shouldBe empty
accumulator.add(fiveSecondsTwo) shouldBe empty
Expand Down Expand Up @@ -167,6 +196,8 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr
val alignedZeroTime = Clock.nextAlignedInstant(Kamon.clock().instant(), Duration.ofSeconds(60)).minusSeconds(60)
val unAlignedZeroTime = alignedZeroTime.plusSeconds(3)

val zeroSecond = createPeriodSnapshot(alignedZeroTime, alignedZeroTime, 13)

// Aligned snapshots, every 5 seconds from second 00.
val fiveSecondsOne = createPeriodSnapshot(alignedZeroTime, alignedZeroTime.plusSeconds(5), 22)
val fiveSecondsTwo = createPeriodSnapshot(alignedZeroTime.plusSeconds(5), alignedZeroTime.plusSeconds(10), 33)
Expand Down Expand Up @@ -221,6 +252,7 @@ class PeriodSnapshotAccumulatorSpec extends WordSpec with Reconfigure with Instr
)

override protected def beforeAll(): Unit = {
applyConfig("kamon.metric.optimistic-tick-alignment = yes")
applyConfig("kamon.metric.tick-interval = 10 seconds")
}
}
21 changes: 16 additions & 5 deletions core/kamon-core/src/main/scala/kamon/metric/PeriodSnapshot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ object PeriodSnapshot {
private var _accumulatingFrom: Option[Instant] = None

def add(periodSnapshot: PeriodSnapshot): Option[PeriodSnapshot] = {

// Initialize the next tick based on incoming snapshots.
if(_nextTick == Instant.EPOCH)
_nextTick = Clock.nextAlignedInstant(periodSnapshot.to, period)
if (_nextTick == Instant.EPOCH) {
_nextTick = nextInstant(periodSnapshot.to)
}

// short-circuit if there is no need to accumulate (e.g. when metrics tick-interval is the same as duration or the
// snapshots have a longer period than the duration).
Expand All @@ -116,10 +116,9 @@ object PeriodSnapshot {

for(from <- _accumulatingFrom if isAroundNextTick(periodSnapshot.to)) yield {
val accumulatedPeriodSnapshot = buildPeriodSnapshot(from, periodSnapshot.to, resetState = true)
_nextTick = Clock.nextAlignedInstant(_nextTick, period)
_nextTick = nextInstant(_nextTick)
_accumulatingFrom = None
clearAccumulatedData()

accumulatedPeriodSnapshot
}
}
Expand All @@ -133,6 +132,18 @@ object PeriodSnapshot {
Duration.between(instant, _nextTick.minus(margin)).toMillis() <= 0
}

private def nextInstant(from: Instant): Instant = {
if (isOptimisticAlignmentEnabled()) {
Clock.nextAlignedInstant(from, period)
} else {
Instant.ofEpochMilli(from.toEpochMilli + period.toMillis)
}
}

private def isOptimisticAlignmentEnabled(): Boolean = {
Kamon.config().getBoolean("kamon.metric.optimistic-tick-alignment")
}

private def isSameDurationAsTickInterval(): Boolean = {
Kamon.config().getDuration("kamon.metric.tick-interval") == period
}
Expand Down

0 comments on commit e62848a

Please sign in to comment.