From 02ece6c57d980117f8b1477ca37f82a75fcdcf0a Mon Sep 17 00:00:00 2001 From: brharrington Date: Fri, 23 Sep 2022 11:17:30 -0500 Subject: [PATCH] optimize result tags computation (#1469) A long time ago the tag set was made deterministic based on the query expression where it only includes tags with an exact match or that are listed in the group by. As a result, it isn't necessary to compute the intersection of dimensions while aggregating the matching time series. --- .../atlas/core/db/AggregateCollector.scala | 6 +- .../atlas/core/db/MemoryDatabase.scala | 15 ++-- .../atlas/core/db/TimeSeriesBuffer.scala | 38 +++------- .../core/db/AggregateCollectorSuite.scala | 2 +- .../atlas/core/db/TimeSeriesBufferSuite.scala | 72 ++----------------- 5 files changed, 27 insertions(+), 106 deletions(-) diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/db/AggregateCollector.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/db/AggregateCollector.scala index 9a3227169..e19205ff7 100644 --- a/atlas-core/src/main/scala/com/netflix/atlas/core/db/AggregateCollector.scala +++ b/atlas-core/src/main/scala/com/netflix/atlas/core/db/AggregateCollector.scala @@ -123,11 +123,11 @@ abstract class SimpleAggregateCollector extends AggregateCollector { blocks.foreach { b => if (valueMask != null) { - val v = buffer.aggrBlock(tags, b, aggr, ConsolidationFunction.Sum, multiple, op) + val v = buffer.aggrBlock(b, aggr, ConsolidationFunction.Sum, multiple, op) buffer.valueMask(valueMask, b, multiple) valueCount += v } else { - val v = buffer.aggrBlock(tags, b, aggr, cf, multiple, op) + val v = buffer.aggrBlock(b, aggr, cf, multiple, op) valueCount += v } } @@ -286,7 +286,7 @@ class AllAggregateCollector extends LimitedAggregateCollector { } blocks.foreach { b => - val v = buffer.aggrBlock(tags, b, aggr, cf, multiple, op) + val v = buffer.aggrBlock(b, aggr, cf, multiple, op) valueCount += v } diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/db/MemoryDatabase.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/db/MemoryDatabase.scala index 61ec5d14c..25248ada1 100644 --- a/atlas-core/src/main/scala/com/netflix/atlas/core/db/MemoryDatabase.scala +++ b/atlas-core/src/main/scala/com/netflix/atlas/core/db/MemoryDatabase.scala @@ -172,13 +172,7 @@ class MemoryDatabase(registry: Registry, config: Config) extends Database { val bufEnd = bufStart + cfStepLength * cfStep - cfStep def newBuffer(tags: Map[String, String]): TimeSeriesBuffer = { - val resultTags = expr match { - case _: DataExpr.All => tags - case _ => - val resultKeys = Query.exactKeys(expr.query) ++ expr.finalGrouping - tags.filter(t => resultKeys.contains(t._1)) - } - TimeSeriesBuffer(resultTags, cfStep, bufStart, bufEnd) + TimeSeriesBuffer(tags, cfStep, bufStart, bufEnd) } index.findItems(query).foreach { item => @@ -199,9 +193,14 @@ class MemoryDatabase(registry: Registry, config: Config) extends Database { queryInputDatapoints.increment(stats.inputDatapoints) queryOutputDatapoints.increment(stats.outputDatapoints) + val resultKeys = Query.exactKeys(expr.query) ++ expr.finalGrouping val vs = collector.result .map { t => - DataExpr.withDefaultLabel(expr, t) + val resultTags = expr match { + case _: DataExpr.All => t.tags + case _ => t.tags.filter(t => resultKeys.contains(t._1)) + } + DataExpr.withDefaultLabel(expr, t.withTags(resultTags)) } .sortWith { _.label < _.label } finalValues(context, expr, vs) diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/db/TimeSeriesBuffer.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/db/TimeSeriesBuffer.scala index 50b2d2a5a..023e3b42d 100644 --- a/atlas-core/src/main/scala/com/netflix/atlas/core/db/TimeSeriesBuffer.scala +++ b/atlas-core/src/main/scala/com/netflix/atlas/core/db/TimeSeriesBuffer.scala @@ -129,7 +129,7 @@ object TimeSeriesBuffer { /** * Mutable buffer for efficiently manipulating metric data. */ -final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeSeq) +final class TimeSeriesBuffer(val tags: Map[String, String], val data: ArrayTimeSeq) extends TimeSeries with TimeSeq with LazyTaggedItem { @@ -167,24 +167,14 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS new TimeSeriesBuffer(tags, new ArrayTimeSeq(data.dsType, start, step, values.clone)) } - /** - * Compute the new tags for the aggregate buffer. The tags are the - * intersection of tag values. - */ - private def aggrTags(t: Map[String, String]): Map[String, String] = { - tags.toSet.intersect(t.toSet).toMap - } - /** Aggregate the data from the block into this buffer. */ private[db] def aggrBlock( - blkTags: Map[String, String], block: Block, aggr: Int, cf: ConsolidationFunction, multiple: Int, op: (Double, Double) => Double ): Int = { - tags = aggrTags(blkTags) val s = start / step val e = values.length + s - 1 val bs = block.start / step @@ -260,11 +250,10 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS /** * Add the corresponding positions of the two buffers. The buffers must have - * the same settings. The tags for the new buffer will be the intersection. + * the same settings. */ def add(ts: TimeSeriesBuffer): Unit = { val nts = ts.normalize(step, start, values.length) - tags = aggrTags(nts.tags) val length = scala.math.min(values.length, nts.values.length) var pos = 0 while (pos < length) { @@ -273,8 +262,8 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS } } - def add(tags: Map[String, String], block: Block): Int = { - aggrBlock(tags, block, Block.Sum, ConsolidationFunction.Sum, 1, Math.addNaN) + def add(block: Block): Int = { + aggrBlock(block, Block.Sum, ConsolidationFunction.Sum, 1, Math.addNaN) } /** @@ -295,7 +284,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS */ def subtract(ts: TimeSeriesBuffer): Unit = { val nts = ts.normalize(step, start, values.length) - tags = aggrTags(nts.tags) val length = scala.math.min(values.length, nts.values.length) var pos = 0 while (pos < length) { @@ -322,7 +310,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS */ def multiply(ts: TimeSeriesBuffer): Unit = { val nts = ts.normalize(step, start, values.length) - tags = aggrTags(nts.tags) val length = scala.math.min(values.length, nts.values.length) var pos = 0 while (pos < length) { @@ -349,7 +336,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS */ def divide(ts: TimeSeriesBuffer): Unit = { val nts = ts.normalize(step, start, values.length) - tags = aggrTags(nts.tags) val length = scala.math.min(values.length, nts.values.length) var pos = 0 while (pos < length) { @@ -376,7 +362,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS */ def max(ts: TimeSeriesBuffer): Unit = { val nts = ts.normalize(step, start, values.length) - tags = aggrTags(nts.tags) val length = scala.math.min(values.length, nts.values.length) var pos = 0 while (pos < length) { @@ -387,8 +372,8 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS } } - def max(tags: Map[String, String], block: Block): Int = { - aggrBlock(tags, block, Block.Max, ConsolidationFunction.Sum, 1, Math.maxNaN) + def max(block: Block): Int = { + aggrBlock(block, Block.Max, ConsolidationFunction.Sum, 1, Math.maxNaN) } /** @@ -398,7 +383,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS */ def min(ts: TimeSeriesBuffer): Unit = { val nts = ts.normalize(step, start, values.length) - tags = aggrTags(nts.tags) val length = scala.math.min(values.length, nts.values.length) var pos = 0 while (pos < length) { @@ -409,8 +393,8 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS } } - def min(tags: Map[String, String], block: Block): Int = { - aggrBlock(tags, block, Block.Min, ConsolidationFunction.Sum, 1, Math.minNaN) + def min(block: Block): Int = { + aggrBlock(block, Block.Min, ConsolidationFunction.Sum, 1, Math.minNaN) } /** @@ -431,7 +415,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS */ def count(ts: TimeSeriesBuffer): Unit = { val nts = ts.normalize(step, start, values.length) - tags = aggrTags(nts.tags) val length = scala.math.min(values.length, nts.values.length) var pos = 0 while (pos < length) { @@ -442,8 +425,8 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS } } - def count(tags: Map[String, String], block: Block): Int = { - aggrBlock(tags, block, Block.Count, ConsolidationFunction.Sum, 1, Math.addNaN) + def count(block: Block): Int = { + aggrBlock(block, Block.Count, ConsolidationFunction.Sum, 1, Math.addNaN) } /** @@ -452,7 +435,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS def merge(ts: TimeSeriesBuffer): Unit = { require(step == ts.step, "step sizes must be the same") require(start == ts.start, "start times must be the same") - tags = aggrTags(ts.tags) val length = math.min(values.length, ts.values.length) var i = 0 while (i < length) { diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/db/AggregateCollectorSuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/db/AggregateCollectorSuite.scala index 194d1e669..b530dac37 100644 --- a/atlas-core/src/test/scala/com/netflix/atlas/core/db/AggregateCollectorSuite.scala +++ b/atlas-core/src/test/scala/com/netflix/atlas/core/db/AggregateCollectorSuite.scala @@ -149,7 +149,7 @@ class AggregateCollectorSuite extends FunSuite { newTaggedBuffer(Map("a" -> "4", "b" -> "2", "c" -> "7"), 5.0) ) val expected = List( - newTaggedBuffer(Map("b" -> "2"), 9.0), + newTaggedBuffer(Map("a" -> "1", "b" -> "2"), 9.0), newTaggedBuffer(Map("a" -> "3", "b" -> "3"), 2.0) ) val by = DataExpr.GroupBy(DataExpr.Sum(Query.False), List("b")) diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/db/TimeSeriesBufferSuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/db/TimeSeriesBufferSuite.scala index 7bf5fef8b..7d643b3a5 100644 --- a/atlas-core/src/test/scala/com/netflix/atlas/core/db/TimeSeriesBufferSuite.scala +++ b/atlas-core/src/test/scala/com/netflix/atlas/core/db/TimeSeriesBufferSuite.scala @@ -71,7 +71,7 @@ class TimeSeriesBufferSuite extends FunSuite { val buffer = TimeSeriesBuffer(tags, step, 1 * step, 19 * step) blocks.foreach { b => - buffer.add(tags, b) + buffer.add(b) } val m = buffer assertEquals(m.step, step) @@ -93,7 +93,7 @@ class TimeSeriesBufferSuite extends FunSuite { val buffer = TimeSeriesBuffer(tags, 6 * step, step, 18 * step) blocks.foreach { b => - buffer.aggrBlock(tags, b, Block.Sum, ConsolidationFunction.Max, 6, Math.addNaN) + buffer.aggrBlock(b, Block.Sum, ConsolidationFunction.Max, 6, Math.addNaN) } val m = buffer assertEquals(m.step, 6 * step) @@ -115,7 +115,7 @@ class TimeSeriesBufferSuite extends FunSuite { val consol = multiple * step val buffer = TimeSeriesBuffer(tags, consol, consol, 20 * consol) blocks.foreach { b => - buffer.aggrBlock(tags, b, Block.Sum, ConsolidationFunction.Max, multiple, Math.addNaN) + buffer.aggrBlock(b, Block.Sum, ConsolidationFunction.Max, multiple, Math.addNaN) } val m = buffer assertEquals(m.step, consol) @@ -123,75 +123,15 @@ class TimeSeriesBufferSuite extends FunSuite { assert(m.values.forall(_ == 4.0)) } - test("cf with start".ignore) { - val tags = emptyTags - val step = 60000L - val block = ArrayBlock(0L, 60) - (8 until 60).foreach { i => - block.buffer(i) = 1.0 - } - - val buffer = TimeSeriesBuffer(tags, 6 * step, step, 60 * step) - buffer.aggrBlock(tags, block, Block.Sum, ConsolidationFunction.Avg, 6, Math.addNaN) - val m = buffer - println(m) - } - - val pairs = List( - (ConsolidationFunction.Avg -> Math.addNaN _), - (ConsolidationFunction.Max -> Math.maxNaN _), - (ConsolidationFunction.Min -> Math.minNaN _) - ) - - /*pairs.foreach { - case (name, af) => - test(s"$name: consolidate then aggregate === aggregate then consolidate") { - val cf = name - val tags = emptyTags[String, String] - val step = 60000L - val blocks = (0 until 1000).map(_ => newBlock(0, 60)) - - val afFirst = TimeSeriesBuffer(tags, step, 0L, 60 * step) - val cfFirst = TimeSeriesBuffer(tags, 6 * step, 0L, 60 * step - 1) - blocks.foreach { b => - afFirst.aggrBlock(tags, b, Block.Sum, cf, 1, af) - cfFirst.aggrBlock(tags, b, Block.Sum, cf, 6, af) - } - - val cfSecond = afFirst.consolidate(6, cf) - (0 until cfFirst.values.length).foreach { i => - assertEquals(cfSecond.values(i), (cfFirst.values(i) +- 1e-9), s"position $i") - } - } - - test(s"$name with NaN: consolidate then aggregate === aggregate then consolidate") { - val cf = name - val tags = emptyTags[String, String] - val step = 60000L - val blocks = (0 until 1000).map(_ => newBlockWithNaN(0, 60)) - - val afFirst = TimeSeriesBuffer(tags, step, 0L, 60 * step) - val cfFirst = TimeSeriesBuffer(tags, 6 * step, 0L, 60 * step - 1) - blocks.foreach { b => - afFirst.aggrBlock(tags, b, Block.Sum, cf, 1, af) - cfFirst.aggrBlock(tags, b, Block.Sum, cf, 6, af) - } - - val cfSecond = afFirst.consolidate(6, cf) - (0 until cfFirst.values.length).foreach { i => - assertEquals(cfSecond.values(i), (cfFirst.values(i) +- 1e-9), s"position $i") - } - } - }*/ - test("aggregate tags") { + // No longer done, it will always use the tags from the initial buffer val common = Map("a" -> "b", "c" -> "d") val t1 = common + ("c" -> "e") val t2 = common + ("z" -> "y") val b1 = TimeSeriesBuffer(t1, 60000, 0, Array.fill(1)(0.0)) val b2 = TimeSeriesBuffer(t2, 60000, 0, Array.fill(1)(0.0)) b1.add(b2) - assertEquals(b1.tags, Map("a" -> "b")) + assertEquals(b1.tags, t1) } test("add buffer") { @@ -548,7 +488,7 @@ class TimeSeriesBufferSuite extends FunSuite { val end = bufStart + step * 12 val buffer = TimeSeriesBuffer(emptyTags, step, bufStart, end) - buffer.aggrBlock(emptyTags, block, Block.Sum, ConsolidationFunction.Avg, 5, Math.addNaN) + buffer.aggrBlock(block, Block.Sum, ConsolidationFunction.Avg, 5, Math.addNaN) buffer.values.foreach { v => assert(v.isNaN || v <= 0.0) }