Skip to content

Commit

Permalink
Add Reset Transformation (#2794)
Browse files Browse the repository at this point in the history
* Add Reset Transformation to reset prometheus counters

This transformation resets an aggregated prometheus counter by emitting
a zero. This transformation fixes the display issue when an M3Aggregator
failover occurs. When the Add transformation is used to generate a cumulative
counter for prometheus, the values in the leader and follower are different
since they started counting at different times. When the failover happens, the
value might decrease. When a cumulative counter decreases, PromQL assumes the
counter reset. This leads to strange display issues, since the counter did not
actually reset. By explicitly reseting the counter each resolution period and
not accumulating a counter, PromQL can correclty graph the rate across failovers.

This does have the downside of an extra 0 datapoint per resolution period. The storage
cost is more than just the extra 0 since the extra 0 is stored 1 second after the actual
datapoint. This degrades the timestamp encoding since the timestamps are no longer at
a fixed interval. In practice we see a 3x increase in storage for these aggregated counters.
  • Loading branch information
ryanhall07 authored Oct 28, 2020
1 parent 7ed094e commit 89215d5
Show file tree
Hide file tree
Showing 14 changed files with 382 additions and 39 deletions.
3 changes: 3 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
mock_*.go linguist-generated
*_mock.go linguist-generated

# genny files
*_gen.go linguist-generated

# generated manifests
kube/bundle.yaml linguist-generated=true

Expand Down
9 changes: 8 additions & 1 deletion scripts/auto-gen-helpers.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#!/bin/bash
source "${GOPATH}/src/github.com/m3db/m3/.ci/variables.sh"

if [ -z "$GOPATH" ]; then
# assume a developer is running locally without a GOPATH
source .ci/variables.sh
else
source "${GOPATH}/src/github.com/m3db/m3/.ci/variables.sh"
fi


set -e

Expand Down
31 changes: 26 additions & 5 deletions src/aggregator/aggregator/counter_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 90 additions & 0 deletions src/aggregator/aggregator/elem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1613,6 +1613,96 @@ func TestGaugeElemConsumeCustomAggregationCustomPipeline(t *testing.T) {
require.Equal(t, 0, len(e.values))
}

func TestGaugeElemReset(t *testing.T) {
alignedstartAtNanos := []int64{
time.Unix(210, 0).UnixNano(),
time.Unix(220, 0).UnixNano(),
time.Unix(230, 0).UnixNano(),
time.Unix(240, 0).UnixNano(),
}
gaugeVals := []float64{123.0, 456.0, 589.0}
aggregationTypes := maggregation.Types{maggregation.Sum}
isEarlierThanFn := isStandardMetricEarlierThan
timestampNanosFn := standardMetricTimestampNanos
opts := NewOptions().SetDiscardNaNAggregatedValues(false)

testPipeline := applied.NewPipeline([]applied.OpUnion{
{
Type: pipeline.TransformationOpType,
Transformation: pipeline.TransformationOp{Type: transformation.Reset},
},
})

e := testGaugeElem(alignedstartAtNanos[:3], gaugeVals, aggregationTypes, testPipeline, opts)

// Consume values before an early-enough time.
localFn, localRes := testFlushLocalMetricFn()
forwardFn, forwardRes := testFlushForwardedMetricFn()
onForwardedFlushedFn, onForwardedFlushedRes := testOnForwardedFlushedFn()
require.False(t, e.Consume(0, isEarlierThanFn, timestampNanosFn, localFn, forwardFn, onForwardedFlushedFn))
require.Equal(t, 0, len(*onForwardedFlushedRes))
require.Equal(t, 0, len(*localRes))
require.Equal(t, 0, len(*forwardRes))
require.Equal(t, 3, len(e.values))

// Consume one value.
localFn, localRes = testFlushLocalMetricFn()
forwardFn, forwardRes = testFlushForwardedMetricFn()
onForwardedFlushedFn, onForwardedFlushedRes = testOnForwardedFlushedFn()
require.False(t, e.Consume(alignedstartAtNanos[1], isEarlierThanFn, timestampNanosFn, localFn, forwardFn, onForwardedFlushedFn))
require.Equal(t, (*localRes)[0].timeNanos, alignedstartAtNanos[1])
require.Equal(t, (*localRes)[0].value, 123.0)
require.Equal(t, (*localRes)[1].timeNanos, alignedstartAtNanos[1]+int64(time.Second))
require.Equal(t, (*localRes)[1].value, 0.0)
require.Equal(t, 0, len(*forwardRes))
require.Equal(t, 0, len(*onForwardedFlushedRes))
require.Equal(t, 2, len(e.values))
require.Equal(t, time.Unix(220, 0).UnixNano(), e.lastConsumedAtNanos)
require.Equal(t, 0, len(e.lastConsumedValues))

// Consume all values.
localFn, localRes = testFlushLocalMetricFn()
forwardFn, forwardRes = testFlushForwardedMetricFn()
onForwardedFlushedFn, onForwardedFlushedRes = testOnForwardedFlushedFn()
require.False(t, e.Consume(alignedstartAtNanos[3], isEarlierThanFn, timestampNanosFn, localFn, forwardFn, onForwardedFlushedFn))
require.Equal(t, (*localRes)[0].timeNanos, alignedstartAtNanos[2])
require.Equal(t, (*localRes)[0].value, 456.0)
require.Equal(t, (*localRes)[1].timeNanos, alignedstartAtNanos[2]+int64(time.Second))
require.Equal(t, (*localRes)[1].value, 0.0)
require.Equal(t, (*localRes)[2].timeNanos, alignedstartAtNanos[3])
require.Equal(t, (*localRes)[2].value, 589.0)
require.Equal(t, (*localRes)[3].timeNanos, alignedstartAtNanos[3]+int64(time.Second))
require.Equal(t, (*localRes)[3].value, 0.0)
require.Equal(t, 0, len(*forwardRes))
require.Equal(t, 0, len(*onForwardedFlushedRes))
require.Equal(t, 0, len(e.values))
require.Equal(t, time.Unix(240, 0).UnixNano(), e.lastConsumedAtNanos)
require.Equal(t, 0, len(e.lastConsumedValues))

// Tombstone the element and discard all values.
e.tombstoned = true
localFn, localRes = testFlushLocalMetricFn()
forwardFn, forwardRes = testFlushForwardedMetricFn()
onForwardedFlushedFn, onForwardedFlushedRes = testOnForwardedFlushedFn()
require.True(t, e.Consume(alignedstartAtNanos[3], isEarlierThanFn, timestampNanosFn, localFn, forwardFn, onForwardedFlushedFn))
//verifyOnForwardedFlushResult(t, expectedOnFlushedRes, *onForwardedFlushedRes)
require.Equal(t, 0, len(*forwardRes))
require.Equal(t, 0, len(*localRes))
require.Equal(t, 0, len(*forwardRes))
require.Equal(t, 0, len(e.values))

// Reading and discarding values from a closed element is no op.
e.closed = true
localFn, localRes = testFlushLocalMetricFn()
forwardFn, forwardRes = testFlushForwardedMetricFn()
onForwardedFlushedFn, onForwardedFlushedRes = testOnForwardedFlushedFn()
require.False(t, e.Consume(alignedstartAtNanos[3], isEarlierThanFn, timestampNanosFn, localFn, forwardFn, onForwardedFlushedFn))
require.Equal(t, 0, len(*localRes))
require.Equal(t, 0, len(*forwardRes))
require.Equal(t, 0, len(*onForwardedFlushedRes))
require.Equal(t, 0, len(e.values))
}

func TestGaugeElemClose(t *testing.T) {
e := testGaugeElem(testAlignedStarts[:len(testAlignedStarts)-1], testGaugeVals, maggregation.DefaultTypes, applied.DefaultPipeline, NewOptions())
require.False(t, e.closed)
Expand Down
31 changes: 26 additions & 5 deletions src/aggregator/aggregator/gauge_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 26 additions & 5 deletions src/aggregator/aggregator/generic_elem.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,10 +468,12 @@ func (e *GenericElem) processValueWithAggregationLock(
discardNaNValues = e.opts.DiscardNaNAggregatedValues()
)
for aggTypeIdx, aggType := range e.aggTypes {
var extraDp transformation.Datapoint
value := lockedAgg.aggregation.ValueOf(aggType)
for _, transformOp := range transformations {
unaryOp, isUnaryOp := transformOp.UnaryTransform()
binaryOp, isBinaryOp := transformOp.BinaryTransform()
unaryMultiOp, isUnaryMultiOp := transformOp.UnaryMultiOutputTransform()
switch {
case isUnaryOp:
curr := transformation.Datapoint{
Expand Down Expand Up @@ -507,7 +509,15 @@ func (e *GenericElem) processValueWithAggregationLock(
}

value = res.Value
case isUnaryMultiOp:
curr := transformation.Datapoint{
TimeNanos: timeNanos,
Value: value,
}

var res transformation.Datapoint
res, extraDp = unaryMultiOp.Evaluate(curr)
value = res.Value
}
}

Expand All @@ -516,11 +526,22 @@ func (e *GenericElem) processValueWithAggregationLock(
}

if !e.parsedPipeline.HasRollup {
switch e.idPrefixSuffixType {
case NoPrefixNoSuffix:
flushLocalFn(nil, e.id, nil, timeNanos, value, e.sp)
case WithPrefixWithSuffix:
flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), timeNanos, value, e.sp)
toFlush := make([]transformation.Datapoint, 0, 2)
toFlush = append(toFlush, transformation.Datapoint{
TimeNanos: timeNanos,
Value: value,
})
if extraDp.TimeNanos != 0 {
toFlush = append(toFlush, extraDp)
}
for _, point := range toFlush {
switch e.idPrefixSuffixType {
case NoPrefixNoSuffix:
flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp)
case WithPrefixWithSuffix:
flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType),
point.TimeNanos, point.Value, e.sp)
}
}
} else {
forwardedAggregationKey, _ := e.ForwardedAggregationKey()
Expand Down
31 changes: 26 additions & 5 deletions src/aggregator/aggregator/timer_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 10 additions & 6 deletions src/metrics/generated/proto/transformationpb/transformation.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 89215d5

Please sign in to comment.