Skip to content

Commit

Permalink
Add Reset Transformation to reset prometheus counters
Browse files Browse the repository at this point in the history
This transformation resets an aggregated prometheus counter by emitting
a zero. This transformation fixes the display issue when an M3Aggregator
failover occurs. When the Add transformation is used to generate a cumulative
counter for prometheus, the values in the leader and follower are different
since they started counting at different times. When the failover happens, the
value might decrease. When a cumulative counter decreases, PromQL assumes the
counter reset. This leads to strange display issues, since the counter did not
actually reset. By explicitly resetting the counter each resolution period and
not accumulating a counter, PromQL can correctly graph the rate across failovers.

This does have the downside of an extra 0 datapoint per resolution period. The storage
cost is more than just the extra 0 since the extra 0 is stored 1 second after the actual
datapoint. This degrades the timestamp encoding since the timestamps are no longer at
a fixed interval. In practice we see a 3x increase in storage for these aggregated counters.
  • Loading branch information
ryanhall07 committed Oct 23, 2020
1 parent 50c2f97 commit 93210f8
Show file tree
Hide file tree
Showing 13 changed files with 374 additions and 43 deletions.
9 changes: 8 additions & 1 deletion scripts/auto-gen-helpers.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#!/bin/bash
source "${GOPATH}/src/github.com/m3db/m3/.ci/variables.sh"

if [ -z "$GOPATH" ]; then
# assume a developer is running locally without a GOPATH
source .ci/variables.sh
else
source "${GOPATH}/src/github.com/m3db/m3/.ci/variables.sh"
fi


set -e

Expand Down
31 changes: 25 additions & 6 deletions src/aggregator/aggregator/counter_elem_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,13 @@ func (e *CounterElem) processValueWithAggregationLock(
discardNaNValues = e.opts.DiscardNaNAggregatedValues()
)
for aggTypeIdx, aggType := range e.aggTypes {
toFlush := make([]transformation.Datapoint, 0)
value := lockedAgg.aggregation.ValueOf(aggType)
for _, transformOp := range transformations {

unaryOp, isUnaryOp := transformOp.UnaryTransform()
binaryOp, isBinaryOp := transformOp.BinaryTransform()
unaryMultiOp, isUnaryMultiOp := transformOp.UnaryMultiOutputTransform()
switch {
case isUnaryOp:
curr := transformation.Datapoint{
Expand Down Expand Up @@ -450,7 +453,16 @@ func (e *CounterElem) processValueWithAggregationLock(
}

value = res.Value

case isUnaryMultiOp:
curr := transformation.Datapoint{
TimeNanos: timeNanos,
Value: value,
}
res, others := unaryMultiOp.Evaluate(curr)
for _, o := range others {
toFlush = append(toFlush, o)
}
value = res.Value
}
}

Expand All @@ -459,11 +471,18 @@ func (e *CounterElem) processValueWithAggregationLock(
}

if !e.parsedPipeline.HasRollup {
switch e.idPrefixSuffixType {
case NoPrefixNoSuffix:
flushLocalFn(nil, e.id, nil, timeNanos, value, e.sp)
case WithPrefixWithSuffix:
flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), timeNanos, value, e.sp)
toFlush = append(toFlush, transformation.Datapoint{
TimeNanos: timeNanos,
Value: value,
})
for _, point := range toFlush {
switch e.idPrefixSuffixType {
case NoPrefixNoSuffix:
flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp)
case WithPrefixWithSuffix:
flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType),
point.TimeNanos, point.Value, e.sp)
}
}
} else {
forwardedAggregationKey, _ := e.ForwardedAggregationKey()
Expand Down
90 changes: 90 additions & 0 deletions src/aggregator/aggregator/elem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1613,6 +1613,96 @@ func TestGaugeElemConsumeCustomAggregationCustomPipeline(t *testing.T) {
require.Equal(t, 0, len(e.values))
}

// TestGaugeElemReset verifies that a gauge element whose pipeline contains only
// the Reset transformation flushes an explicit zero datapoint (timestamped one
// second after the aggregation time) in addition to the aggregated value, and
// that consumed values are removed from the element. It also checks that
// tombstoned and closed elements discard values without flushing.
func TestGaugeElemReset(t *testing.T) {
	alignedStartAtNanos := []int64{
		time.Unix(210, 0).UnixNano(),
		time.Unix(220, 0).UnixNano(),
		time.Unix(230, 0).UnixNano(),
		time.Unix(240, 0).UnixNano(),
	}
	gaugeVals := []float64{123.0, 456.0, 589.0}
	aggregationTypes := maggregation.Types{maggregation.Sum}
	isEarlierThanFn := isStandardMetricEarlierThan
	timestampNanosFn := standardMetricTimestampNanos
	opts := NewOptions().SetDiscardNaNAggregatedValues(false)

	// Pipeline containing only the Reset transformation under test.
	testPipeline := applied.NewPipeline([]applied.OpUnion{
		{
			Type:           pipeline.TransformationOpType,
			Transformation: pipeline.TransformationOp{Type: transformation.Reset},
		},
	})

	e := testGaugeElem(alignedStartAtNanos[:3], gaugeVals, aggregationTypes, testPipeline, opts)

	// Consume values before an early-enough time: nothing is flushed or removed.
	localFn, localRes := testFlushLocalMetricFn()
	forwardFn, forwardRes := testFlushForwardedMetricFn()
	onForwardedFlushedFn, onForwardedFlushedRes := testOnForwardedFlushedFn()
	require.False(t, e.Consume(0, isEarlierThanFn, timestampNanosFn, localFn, forwardFn, onForwardedFlushedFn))
	require.Equal(t, 0, len(*onForwardedFlushedRes))
	require.Equal(t, 0, len(*localRes))
	require.Equal(t, 0, len(*forwardRes))
	require.Equal(t, 3, len(e.values))

	// Consume one value: the Reset transformation emits a zero datapoint one
	// second after the aggregation time, followed by the aggregated value at
	// the aggregation time itself.
	localFn, localRes = testFlushLocalMetricFn()
	forwardFn, forwardRes = testFlushForwardedMetricFn()
	onForwardedFlushedFn, onForwardedFlushedRes = testOnForwardedFlushedFn()
	require.False(t, e.Consume(alignedStartAtNanos[1], isEarlierThanFn, timestampNanosFn, localFn, forwardFn, onForwardedFlushedFn))
	require.Equal(t, alignedStartAtNanos[1]+int64(time.Second), (*localRes)[0].timeNanos)
	require.Equal(t, 0.0, (*localRes)[0].value)
	require.Equal(t, alignedStartAtNanos[1], (*localRes)[1].timeNanos)
	require.Equal(t, 123.0, (*localRes)[1].value)
	require.Equal(t, 0, len(*forwardRes))
	require.Equal(t, 0, len(*onForwardedFlushedRes))
	require.Equal(t, 2, len(e.values))
	require.Equal(t, time.Unix(220, 0).UnixNano(), e.lastConsumedAtNanos)
	require.Equal(t, 0, len(e.lastConsumedValues))

	// Consume all remaining values: each aggregation produces a zero datapoint
	// plus the aggregated value, in flush order.
	localFn, localRes = testFlushLocalMetricFn()
	forwardFn, forwardRes = testFlushForwardedMetricFn()
	onForwardedFlushedFn, onForwardedFlushedRes = testOnForwardedFlushedFn()
	require.False(t, e.Consume(alignedStartAtNanos[3], isEarlierThanFn, timestampNanosFn, localFn, forwardFn, onForwardedFlushedFn))
	require.Equal(t, alignedStartAtNanos[2]+int64(time.Second), (*localRes)[0].timeNanos)
	require.Equal(t, 0.0, (*localRes)[0].value)
	require.Equal(t, alignedStartAtNanos[2], (*localRes)[1].timeNanos)
	require.Equal(t, 456.0, (*localRes)[1].value)
	require.Equal(t, alignedStartAtNanos[3]+int64(time.Second), (*localRes)[2].timeNanos)
	require.Equal(t, 0.0, (*localRes)[2].value)
	require.Equal(t, alignedStartAtNanos[3], (*localRes)[3].timeNanos)
	require.Equal(t, 589.0, (*localRes)[3].value)
	require.Equal(t, 0, len(*forwardRes))
	require.Equal(t, 0, len(*onForwardedFlushedRes))
	require.Equal(t, 0, len(e.values))
	require.Equal(t, time.Unix(240, 0).UnixNano(), e.lastConsumedAtNanos)
	require.Equal(t, 0, len(e.lastConsumedValues))

	// Tombstone the element and discard all values: Consume reports the
	// element is done, and nothing is flushed.
	e.tombstoned = true
	localFn, localRes = testFlushLocalMetricFn()
	forwardFn, forwardRes = testFlushForwardedMetricFn()
	onForwardedFlushedFn, onForwardedFlushedRes = testOnForwardedFlushedFn()
	require.True(t, e.Consume(alignedStartAtNanos[3], isEarlierThanFn, timestampNanosFn, localFn, forwardFn, onForwardedFlushedFn))
	require.Equal(t, 0, len(*forwardRes))
	require.Equal(t, 0, len(*localRes))
	require.Equal(t, 0, len(e.values))

	// Reading and discarding values from a closed element is a no op.
	e.closed = true
	localFn, localRes = testFlushLocalMetricFn()
	forwardFn, forwardRes = testFlushForwardedMetricFn()
	onForwardedFlushedFn, onForwardedFlushedRes = testOnForwardedFlushedFn()
	require.False(t, e.Consume(alignedStartAtNanos[3], isEarlierThanFn, timestampNanosFn, localFn, forwardFn, onForwardedFlushedFn))
	require.Equal(t, 0, len(*localRes))
	require.Equal(t, 0, len(*forwardRes))
	require.Equal(t, 0, len(*onForwardedFlushedRes))
	require.Equal(t, 0, len(e.values))
}

func TestGaugeElemClose(t *testing.T) {
e := testGaugeElem(testAlignedStarts[:len(testAlignedStarts)-1], testGaugeVals, maggregation.DefaultTypes, applied.DefaultPipeline, NewOptions())
require.False(t, e.closed)
Expand Down
31 changes: 25 additions & 6 deletions src/aggregator/aggregator/gauge_elem_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,13 @@ func (e *GaugeElem) processValueWithAggregationLock(
discardNaNValues = e.opts.DiscardNaNAggregatedValues()
)
for aggTypeIdx, aggType := range e.aggTypes {
toFlush := make([]transformation.Datapoint, 0)
value := lockedAgg.aggregation.ValueOf(aggType)
for _, transformOp := range transformations {

unaryOp, isUnaryOp := transformOp.UnaryTransform()
binaryOp, isBinaryOp := transformOp.BinaryTransform()
unaryMultiOp, isUnaryMultiOp := transformOp.UnaryMultiOutputTransform()
switch {
case isUnaryOp:
curr := transformation.Datapoint{
Expand Down Expand Up @@ -450,7 +453,16 @@ func (e *GaugeElem) processValueWithAggregationLock(
}

value = res.Value

case isUnaryMultiOp:
curr := transformation.Datapoint{
TimeNanos: timeNanos,
Value: value,
}
res, others := unaryMultiOp.Evaluate(curr)
for _, o := range others {
toFlush = append(toFlush, o)
}
value = res.Value
}
}

Expand All @@ -459,11 +471,18 @@ func (e *GaugeElem) processValueWithAggregationLock(
}

if !e.parsedPipeline.HasRollup {
switch e.idPrefixSuffixType {
case NoPrefixNoSuffix:
flushLocalFn(nil, e.id, nil, timeNanos, value, e.sp)
case WithPrefixWithSuffix:
flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), timeNanos, value, e.sp)
toFlush = append(toFlush, transformation.Datapoint{
TimeNanos: timeNanos,
Value: value,
})
for _, point := range toFlush {
switch e.idPrefixSuffixType {
case NoPrefixNoSuffix:
flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp)
case WithPrefixWithSuffix:
flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType),
point.TimeNanos, point.Value, e.sp)
}
}
} else {
forwardedAggregationKey, _ := e.ForwardedAggregationKey()
Expand Down
31 changes: 25 additions & 6 deletions src/aggregator/aggregator/generic_elem.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,10 +468,13 @@ func (e *GenericElem) processValueWithAggregationLock(
discardNaNValues = e.opts.DiscardNaNAggregatedValues()
)
for aggTypeIdx, aggType := range e.aggTypes {
toFlush := make([]transformation.Datapoint, 0)
value := lockedAgg.aggregation.ValueOf(aggType)
for _, transformOp := range transformations {

unaryOp, isUnaryOp := transformOp.UnaryTransform()
binaryOp, isBinaryOp := transformOp.BinaryTransform()
unaryMultiOp, isUnaryMultiOp := transformOp.UnaryMultiOutputTransform()
switch {
case isUnaryOp:
curr := transformation.Datapoint{
Expand Down Expand Up @@ -507,7 +510,16 @@ func (e *GenericElem) processValueWithAggregationLock(
}

value = res.Value

case isUnaryMultiOp:
curr := transformation.Datapoint{
TimeNanos: timeNanos,
Value: value,
}
res, others := unaryMultiOp.Evaluate(curr)
for _, o := range others {
toFlush = append(toFlush, o)
}
value = res.Value
}
}

Expand All @@ -516,11 +528,18 @@ func (e *GenericElem) processValueWithAggregationLock(
}

if !e.parsedPipeline.HasRollup {
switch e.idPrefixSuffixType {
case NoPrefixNoSuffix:
flushLocalFn(nil, e.id, nil, timeNanos, value, e.sp)
case WithPrefixWithSuffix:
flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), timeNanos, value, e.sp)
toFlush = append(toFlush, transformation.Datapoint{
TimeNanos: timeNanos,
Value: value,
})
for _, point := range toFlush {
switch e.idPrefixSuffixType {
case NoPrefixNoSuffix:
flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp)
case WithPrefixWithSuffix:
flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType),
point.TimeNanos, point.Value, e.sp)
}
}
} else {
forwardedAggregationKey, _ := e.ForwardedAggregationKey()
Expand Down
31 changes: 25 additions & 6 deletions src/aggregator/aggregator/timer_elem_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,13 @@ func (e *TimerElem) processValueWithAggregationLock(
discardNaNValues = e.opts.DiscardNaNAggregatedValues()
)
for aggTypeIdx, aggType := range e.aggTypes {
toFlush := make([]transformation.Datapoint, 0)
value := lockedAgg.aggregation.ValueOf(aggType)
for _, transformOp := range transformations {

unaryOp, isUnaryOp := transformOp.UnaryTransform()
binaryOp, isBinaryOp := transformOp.BinaryTransform()
unaryMultiOp, isUnaryMultiOp := transformOp.UnaryMultiOutputTransform()
switch {
case isUnaryOp:
curr := transformation.Datapoint{
Expand Down Expand Up @@ -450,7 +453,16 @@ func (e *TimerElem) processValueWithAggregationLock(
}

value = res.Value

case isUnaryMultiOp:
curr := transformation.Datapoint{
TimeNanos: timeNanos,
Value: value,
}
res, others := unaryMultiOp.Evaluate(curr)
for _, o := range others {
toFlush = append(toFlush, o)
}
value = res.Value
}
}

Expand All @@ -459,11 +471,18 @@ func (e *TimerElem) processValueWithAggregationLock(
}

if !e.parsedPipeline.HasRollup {
switch e.idPrefixSuffixType {
case NoPrefixNoSuffix:
flushLocalFn(nil, e.id, nil, timeNanos, value, e.sp)
case WithPrefixWithSuffix:
flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), timeNanos, value, e.sp)
toFlush = append(toFlush, transformation.Datapoint{
TimeNanos: timeNanos,
Value: value,
})
for _, point := range toFlush {
switch e.idPrefixSuffixType {
case NoPrefixNoSuffix:
flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp)
case WithPrefixWithSuffix:
flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType),
point.TimeNanos, point.Value, e.sp)
}
}
} else {
forwardedAggregationKey, _ := e.ForwardedAggregationKey()
Expand Down
16 changes: 10 additions & 6 deletions src/metrics/generated/proto/transformationpb/transformation.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ enum TransformationType {
PERSECOND = 2;
INCREASE = 3;
ADD = 4;
RESET = 5;
}
Loading

0 comments on commit 93210f8

Please sign in to comment.