Skip to content

Commit

Permalink
optimize result tags computation (Netflix#1469)
Browse files Browse the repository at this point in the history
A long time ago the result tag set was made deterministic
based on the query expression: it only includes keys
that have an exact match in the query or that are listed
in the group by. As a result, it isn't necessary to compute
the intersection of dimensions while aggregating the
matching time series. The tags are instead filtered down to
those keys when the final results are produced.
  • Loading branch information
brharrington authored and manolama committed May 22, 2024
1 parent f6132ff commit 02ece6c
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ abstract class SimpleAggregateCollector extends AggregateCollector {

blocks.foreach { b =>
if (valueMask != null) {
val v = buffer.aggrBlock(tags, b, aggr, ConsolidationFunction.Sum, multiple, op)
val v = buffer.aggrBlock(b, aggr, ConsolidationFunction.Sum, multiple, op)
buffer.valueMask(valueMask, b, multiple)
valueCount += v
} else {
val v = buffer.aggrBlock(tags, b, aggr, cf, multiple, op)
val v = buffer.aggrBlock(b, aggr, cf, multiple, op)
valueCount += v
}
}
Expand Down Expand Up @@ -286,7 +286,7 @@ class AllAggregateCollector extends LimitedAggregateCollector {
}

blocks.foreach { b =>
val v = buffer.aggrBlock(tags, b, aggr, cf, multiple, op)
val v = buffer.aggrBlock(b, aggr, cf, multiple, op)
valueCount += v
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,7 @@ class MemoryDatabase(registry: Registry, config: Config) extends Database {
val bufEnd = bufStart + cfStepLength * cfStep - cfStep

def newBuffer(tags: Map[String, String]): TimeSeriesBuffer = {
val resultTags = expr match {
case _: DataExpr.All => tags
case _ =>
val resultKeys = Query.exactKeys(expr.query) ++ expr.finalGrouping
tags.filter(t => resultKeys.contains(t._1))
}
TimeSeriesBuffer(resultTags, cfStep, bufStart, bufEnd)
TimeSeriesBuffer(tags, cfStep, bufStart, bufEnd)
}

index.findItems(query).foreach { item =>
Expand All @@ -199,9 +193,14 @@ class MemoryDatabase(registry: Registry, config: Config) extends Database {
queryInputDatapoints.increment(stats.inputDatapoints)
queryOutputDatapoints.increment(stats.outputDatapoints)

val resultKeys = Query.exactKeys(expr.query) ++ expr.finalGrouping
val vs = collector.result
.map { t =>
DataExpr.withDefaultLabel(expr, t)
val resultTags = expr match {
case _: DataExpr.All => t.tags
case _ => t.tags.filter(t => resultKeys.contains(t._1))
}
DataExpr.withDefaultLabel(expr, t.withTags(resultTags))
}
.sortWith { _.label < _.label }
finalValues(context, expr, vs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ object TimeSeriesBuffer {
/**
* Mutable buffer for efficiently manipulating metric data.
*/
final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeSeq)
final class TimeSeriesBuffer(val tags: Map[String, String], val data: ArrayTimeSeq)
extends TimeSeries
with TimeSeq
with LazyTaggedItem {
Expand Down Expand Up @@ -167,24 +167,14 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS
new TimeSeriesBuffer(tags, new ArrayTimeSeq(data.dsType, start, step, values.clone))
}

/**
* Compute the new tags for the aggregate buffer. The tags are the
* intersection of tag values.
*/
private def aggrTags(t: Map[String, String]): Map[String, String] = {
tags.toSet.intersect(t.toSet).toMap
}

/** Aggregate the data from the block into this buffer. */
private[db] def aggrBlock(
blkTags: Map[String, String],
block: Block,
aggr: Int,
cf: ConsolidationFunction,
multiple: Int,
op: (Double, Double) => Double
): Int = {
tags = aggrTags(blkTags)
val s = start / step
val e = values.length + s - 1
val bs = block.start / step
Expand Down Expand Up @@ -260,11 +250,10 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS

/**
* Add the corresponding positions of the two buffers. The buffers must have
* the same settings. The tags for the new buffer will be the intersection.
* the same settings.
*/
def add(ts: TimeSeriesBuffer): Unit = {
val nts = ts.normalize(step, start, values.length)
tags = aggrTags(nts.tags)
val length = scala.math.min(values.length, nts.values.length)
var pos = 0
while (pos < length) {
Expand All @@ -273,8 +262,8 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS
}
}

def add(tags: Map[String, String], block: Block): Int = {
aggrBlock(tags, block, Block.Sum, ConsolidationFunction.Sum, 1, Math.addNaN)
def add(block: Block): Int = {
aggrBlock(block, Block.Sum, ConsolidationFunction.Sum, 1, Math.addNaN)
}

/**
Expand All @@ -295,7 +284,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS
*/
def subtract(ts: TimeSeriesBuffer): Unit = {
val nts = ts.normalize(step, start, values.length)
tags = aggrTags(nts.tags)
val length = scala.math.min(values.length, nts.values.length)
var pos = 0
while (pos < length) {
Expand All @@ -322,7 +310,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS
*/
def multiply(ts: TimeSeriesBuffer): Unit = {
val nts = ts.normalize(step, start, values.length)
tags = aggrTags(nts.tags)
val length = scala.math.min(values.length, nts.values.length)
var pos = 0
while (pos < length) {
Expand All @@ -349,7 +336,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS
*/
def divide(ts: TimeSeriesBuffer): Unit = {
val nts = ts.normalize(step, start, values.length)
tags = aggrTags(nts.tags)
val length = scala.math.min(values.length, nts.values.length)
var pos = 0
while (pos < length) {
Expand All @@ -376,7 +362,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS
*/
def max(ts: TimeSeriesBuffer): Unit = {
val nts = ts.normalize(step, start, values.length)
tags = aggrTags(nts.tags)
val length = scala.math.min(values.length, nts.values.length)
var pos = 0
while (pos < length) {
Expand All @@ -387,8 +372,8 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS
}
}

def max(tags: Map[String, String], block: Block): Int = {
aggrBlock(tags, block, Block.Max, ConsolidationFunction.Sum, 1, Math.maxNaN)
def max(block: Block): Int = {
aggrBlock(block, Block.Max, ConsolidationFunction.Sum, 1, Math.maxNaN)
}

/**
Expand All @@ -398,7 +383,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS
*/
def min(ts: TimeSeriesBuffer): Unit = {
val nts = ts.normalize(step, start, values.length)
tags = aggrTags(nts.tags)
val length = scala.math.min(values.length, nts.values.length)
var pos = 0
while (pos < length) {
Expand All @@ -409,8 +393,8 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS
}
}

def min(tags: Map[String, String], block: Block): Int = {
aggrBlock(tags, block, Block.Min, ConsolidationFunction.Sum, 1, Math.minNaN)
def min(block: Block): Int = {
aggrBlock(block, Block.Min, ConsolidationFunction.Sum, 1, Math.minNaN)
}

/**
Expand All @@ -431,7 +415,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS
*/
def count(ts: TimeSeriesBuffer): Unit = {
val nts = ts.normalize(step, start, values.length)
tags = aggrTags(nts.tags)
val length = scala.math.min(values.length, nts.values.length)
var pos = 0
while (pos < length) {
Expand All @@ -442,8 +425,8 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS
}
}

def count(tags: Map[String, String], block: Block): Int = {
aggrBlock(tags, block, Block.Count, ConsolidationFunction.Sum, 1, Math.addNaN)
def count(block: Block): Int = {
aggrBlock(block, Block.Count, ConsolidationFunction.Sum, 1, Math.addNaN)
}

/**
Expand All @@ -452,7 +435,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS
def merge(ts: TimeSeriesBuffer): Unit = {
require(step == ts.step, "step sizes must be the same")
require(start == ts.start, "start times must be the same")
tags = aggrTags(ts.tags)
val length = math.min(values.length, ts.values.length)
var i = 0
while (i < length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class AggregateCollectorSuite extends FunSuite {
newTaggedBuffer(Map("a" -> "4", "b" -> "2", "c" -> "7"), 5.0)
)
val expected = List(
newTaggedBuffer(Map("b" -> "2"), 9.0),
newTaggedBuffer(Map("a" -> "1", "b" -> "2"), 9.0),
newTaggedBuffer(Map("a" -> "3", "b" -> "3"), 2.0)
)
val by = DataExpr.GroupBy(DataExpr.Sum(Query.False), List("b"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class TimeSeriesBufferSuite extends FunSuite {

val buffer = TimeSeriesBuffer(tags, step, 1 * step, 19 * step)
blocks.foreach { b =>
buffer.add(tags, b)
buffer.add(b)
}
val m = buffer
assertEquals(m.step, step)
Expand All @@ -93,7 +93,7 @@ class TimeSeriesBufferSuite extends FunSuite {

val buffer = TimeSeriesBuffer(tags, 6 * step, step, 18 * step)
blocks.foreach { b =>
buffer.aggrBlock(tags, b, Block.Sum, ConsolidationFunction.Max, 6, Math.addNaN)
buffer.aggrBlock(b, Block.Sum, ConsolidationFunction.Max, 6, Math.addNaN)
}
val m = buffer
assertEquals(m.step, 6 * step)
Expand All @@ -115,83 +115,23 @@ class TimeSeriesBufferSuite extends FunSuite {
val consol = multiple * step
val buffer = TimeSeriesBuffer(tags, consol, consol, 20 * consol)
blocks.foreach { b =>
buffer.aggrBlock(tags, b, Block.Sum, ConsolidationFunction.Max, multiple, Math.addNaN)
buffer.aggrBlock(b, Block.Sum, ConsolidationFunction.Max, multiple, Math.addNaN)
}
val m = buffer
assertEquals(m.step, consol)
assertEquals(m.start, consol)
assert(m.values.forall(_ == 4.0))
}

test("cf with start".ignore) {
val tags = emptyTags
val step = 60000L
val block = ArrayBlock(0L, 60)
(8 until 60).foreach { i =>
block.buffer(i) = 1.0
}

val buffer = TimeSeriesBuffer(tags, 6 * step, step, 60 * step)
buffer.aggrBlock(tags, block, Block.Sum, ConsolidationFunction.Avg, 6, Math.addNaN)
val m = buffer
println(m)
}

val pairs = List(
(ConsolidationFunction.Avg -> Math.addNaN _),
(ConsolidationFunction.Max -> Math.maxNaN _),
(ConsolidationFunction.Min -> Math.minNaN _)
)

/*pairs.foreach {
case (name, af) =>
test(s"$name: consolidate then aggregate === aggregate then consolidate") {
val cf = name
val tags = emptyTags[String, String]
val step = 60000L
val blocks = (0 until 1000).map(_ => newBlock(0, 60))
val afFirst = TimeSeriesBuffer(tags, step, 0L, 60 * step)
val cfFirst = TimeSeriesBuffer(tags, 6 * step, 0L, 60 * step - 1)
blocks.foreach { b =>
afFirst.aggrBlock(tags, b, Block.Sum, cf, 1, af)
cfFirst.aggrBlock(tags, b, Block.Sum, cf, 6, af)
}
val cfSecond = afFirst.consolidate(6, cf)
(0 until cfFirst.values.length).foreach { i =>
assertEquals(cfSecond.values(i), (cfFirst.values(i) +- 1e-9), s"position $i")
}
}
test(s"$name with NaN: consolidate then aggregate === aggregate then consolidate") {
val cf = name
val tags = emptyTags[String, String]
val step = 60000L
val blocks = (0 until 1000).map(_ => newBlockWithNaN(0, 60))
val afFirst = TimeSeriesBuffer(tags, step, 0L, 60 * step)
val cfFirst = TimeSeriesBuffer(tags, 6 * step, 0L, 60 * step - 1)
blocks.foreach { b =>
afFirst.aggrBlock(tags, b, Block.Sum, cf, 1, af)
cfFirst.aggrBlock(tags, b, Block.Sum, cf, 6, af)
}
val cfSecond = afFirst.consolidate(6, cf)
(0 until cfFirst.values.length).foreach { i =>
assertEquals(cfSecond.values(i), (cfFirst.values(i) +- 1e-9), s"position $i")
}
}
}*/

test("aggregate tags") {
// No longer done, it will always use the tags from the initial buffer
val common = Map("a" -> "b", "c" -> "d")
val t1 = common + ("c" -> "e")
val t2 = common + ("z" -> "y")
val b1 = TimeSeriesBuffer(t1, 60000, 0, Array.fill(1)(0.0))
val b2 = TimeSeriesBuffer(t2, 60000, 0, Array.fill(1)(0.0))
b1.add(b2)
assertEquals(b1.tags, Map("a" -> "b"))
assertEquals(b1.tags, t1)
}

test("add buffer") {
Expand Down Expand Up @@ -548,7 +488,7 @@ class TimeSeriesBufferSuite extends FunSuite {
val end = bufStart + step * 12
val buffer = TimeSeriesBuffer(emptyTags, step, bufStart, end)

buffer.aggrBlock(emptyTags, block, Block.Sum, ConsolidationFunction.Avg, 5, Math.addNaN)
buffer.aggrBlock(block, Block.Sum, ConsolidationFunction.Avg, 5, Math.addNaN)
buffer.values.foreach { v =>
assert(v.isNaN || v <= 0.0)
}
Expand Down

0 comments on commit 02ece6c

Please sign in to comment.