diff --git a/.gitattributes b/.gitattributes index c523710ab2..767e2ee130 100644 --- a/.gitattributes +++ b/.gitattributes @@ -2,6 +2,9 @@ mock_*.go linguist-generated *_mock.go linguist-generated +# genny files +*_gen.go linguist-generated + # generated manifests kube/bundle.yaml linguist-generated=true diff --git a/scripts/auto-gen-helpers.sh b/scripts/auto-gen-helpers.sh index 0c3acdcb2e..76be3ad059 100755 --- a/scripts/auto-gen-helpers.sh +++ b/scripts/auto-gen-helpers.sh @@ -1,5 +1,12 @@ #!/bin/bash -source "${GOPATH}/src/github.com/m3db/m3/.ci/variables.sh" + +if [ -z "$GOPATH" ]; then + # assume a developer is running locally without a GOPATH + source .ci/variables.sh +else + source "${GOPATH}/src/github.com/m3db/m3/.ci/variables.sh" +fi + set -e diff --git a/src/aggregator/aggregator/counter_elem_gen.go b/src/aggregator/aggregator/counter_elem_gen.go index a6f3c720d3..625ce7de2d 100644 --- a/src/aggregator/aggregator/counter_elem_gen.go +++ b/src/aggregator/aggregator/counter_elem_gen.go @@ -411,10 +411,12 @@ func (e *CounterElem) processValueWithAggregationLock( discardNaNValues = e.opts.DiscardNaNAggregatedValues() ) for aggTypeIdx, aggType := range e.aggTypes { + var extraDp transformation.Datapoint value := lockedAgg.aggregation.ValueOf(aggType) for _, transformOp := range transformations { unaryOp, isUnaryOp := transformOp.UnaryTransform() binaryOp, isBinaryOp := transformOp.BinaryTransform() + unaryMultiOp, isUnaryMultiOp := transformOp.UnaryMultiOutputTransform() switch { case isUnaryOp: curr := transformation.Datapoint{ @@ -450,7 +452,15 @@ func (e *CounterElem) processValueWithAggregationLock( } value = res.Value + case isUnaryMultiOp: + curr := transformation.Datapoint{ + TimeNanos: timeNanos, + Value: value, + } + var res transformation.Datapoint + res, extraDp = unaryMultiOp.Evaluate(curr) + value = res.Value } } @@ -459,11 +469,22 @@ func (e *CounterElem) processValueWithAggregationLock( } if !e.parsedPipeline.HasRollup { - switch 
e.idPrefixSuffixType { - case NoPrefixNoSuffix: - flushLocalFn(nil, e.id, nil, timeNanos, value, e.sp) - case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), timeNanos, value, e.sp) + toFlush := make([]transformation.Datapoint, 0, 2) + toFlush = append(toFlush, transformation.Datapoint{ + TimeNanos: timeNanos, + Value: value, + }) + if extraDp.TimeNanos != 0 { + toFlush = append(toFlush, extraDp) + } + for _, point := range toFlush { + switch e.idPrefixSuffixType { + case NoPrefixNoSuffix: + flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) + case WithPrefixWithSuffix: + flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), + point.TimeNanos, point.Value, e.sp) + } } } else { forwardedAggregationKey, _ := e.ForwardedAggregationKey() diff --git a/src/aggregator/aggregator/elem_test.go b/src/aggregator/aggregator/elem_test.go index f3e223897b..52c89da25b 100644 --- a/src/aggregator/aggregator/elem_test.go +++ b/src/aggregator/aggregator/elem_test.go @@ -1613,6 +1613,96 @@ func TestGaugeElemConsumeCustomAggregationCustomPipeline(t *testing.T) { require.Equal(t, 0, len(e.values)) } +func TestGaugeElemReset(t *testing.T) { + alignedstartAtNanos := []int64{ + time.Unix(210, 0).UnixNano(), + time.Unix(220, 0).UnixNano(), + time.Unix(230, 0).UnixNano(), + time.Unix(240, 0).UnixNano(), + } + gaugeVals := []float64{123.0, 456.0, 589.0} + aggregationTypes := maggregation.Types{maggregation.Sum} + isEarlierThanFn := isStandardMetricEarlierThan + timestampNanosFn := standardMetricTimestampNanos + opts := NewOptions().SetDiscardNaNAggregatedValues(false) + + testPipeline := applied.NewPipeline([]applied.OpUnion{ + { + Type: pipeline.TransformationOpType, + Transformation: pipeline.TransformationOp{Type: transformation.Reset}, + }, + }) + + e := testGaugeElem(alignedstartAtNanos[:3], gaugeVals, aggregationTypes, testPipeline, opts) + + // Consume values before an early-enough 
time. + localFn, localRes := testFlushLocalMetricFn() + forwardFn, forwardRes := testFlushForwardedMetricFn() + onForwardedFlushedFn, onForwardedFlushedRes := testOnForwardedFlushedFn() + require.False(t, e.Consume(0, isEarlierThanFn, timestampNanosFn, localFn, forwardFn, onForwardedFlushedFn)) + require.Equal(t, 0, len(*onForwardedFlushedRes)) + require.Equal(t, 0, len(*localRes)) + require.Equal(t, 0, len(*forwardRes)) + require.Equal(t, 3, len(e.values)) + + // Consume one value. + localFn, localRes = testFlushLocalMetricFn() + forwardFn, forwardRes = testFlushForwardedMetricFn() + onForwardedFlushedFn, onForwardedFlushedRes = testOnForwardedFlushedFn() + require.False(t, e.Consume(alignedstartAtNanos[1], isEarlierThanFn, timestampNanosFn, localFn, forwardFn, onForwardedFlushedFn)) + require.Equal(t, (*localRes)[0].timeNanos, alignedstartAtNanos[1]) + require.Equal(t, (*localRes)[0].value, 123.0) + require.Equal(t, (*localRes)[1].timeNanos, alignedstartAtNanos[1]+int64(time.Second)) + require.Equal(t, (*localRes)[1].value, 0.0) + require.Equal(t, 0, len(*forwardRes)) + require.Equal(t, 0, len(*onForwardedFlushedRes)) + require.Equal(t, 2, len(e.values)) + require.Equal(t, time.Unix(220, 0).UnixNano(), e.lastConsumedAtNanos) + require.Equal(t, 0, len(e.lastConsumedValues)) + + // Consume all values. 
+ localFn, localRes = testFlushLocalMetricFn() + forwardFn, forwardRes = testFlushForwardedMetricFn() + onForwardedFlushedFn, onForwardedFlushedRes = testOnForwardedFlushedFn() + require.False(t, e.Consume(alignedstartAtNanos[3], isEarlierThanFn, timestampNanosFn, localFn, forwardFn, onForwardedFlushedFn)) + require.Equal(t, (*localRes)[0].timeNanos, alignedstartAtNanos[2]) + require.Equal(t, (*localRes)[0].value, 456.0) + require.Equal(t, (*localRes)[1].timeNanos, alignedstartAtNanos[2]+int64(time.Second)) + require.Equal(t, (*localRes)[1].value, 0.0) + require.Equal(t, (*localRes)[2].timeNanos, alignedstartAtNanos[3]) + require.Equal(t, (*localRes)[2].value, 589.0) + require.Equal(t, (*localRes)[3].timeNanos, alignedstartAtNanos[3]+int64(time.Second)) + require.Equal(t, (*localRes)[3].value, 0.0) + require.Equal(t, 0, len(*forwardRes)) + require.Equal(t, 0, len(*onForwardedFlushedRes)) + require.Equal(t, 0, len(e.values)) + require.Equal(t, time.Unix(240, 0).UnixNano(), e.lastConsumedAtNanos) + require.Equal(t, 0, len(e.lastConsumedValues)) + + // Tombstone the element and discard all values. + e.tombstoned = true + localFn, localRes = testFlushLocalMetricFn() + forwardFn, forwardRes = testFlushForwardedMetricFn() + onForwardedFlushedFn, onForwardedFlushedRes = testOnForwardedFlushedFn() + require.True(t, e.Consume(alignedstartAtNanos[3], isEarlierThanFn, timestampNanosFn, localFn, forwardFn, onForwardedFlushedFn)) + //verifyOnForwardedFlushResult(t, expectedOnFlushedRes, *onForwardedFlushedRes) + require.Equal(t, 0, len(*forwardRes)) + require.Equal(t, 0, len(*localRes)) + require.Equal(t, 0, len(*forwardRes)) + require.Equal(t, 0, len(e.values)) + + // Reading and discarding values from a closed element is no op. 
+ e.closed = true + localFn, localRes = testFlushLocalMetricFn() + forwardFn, forwardRes = testFlushForwardedMetricFn() + onForwardedFlushedFn, onForwardedFlushedRes = testOnForwardedFlushedFn() + require.False(t, e.Consume(alignedstartAtNanos[3], isEarlierThanFn, timestampNanosFn, localFn, forwardFn, onForwardedFlushedFn)) + require.Equal(t, 0, len(*localRes)) + require.Equal(t, 0, len(*forwardRes)) + require.Equal(t, 0, len(*onForwardedFlushedRes)) + require.Equal(t, 0, len(e.values)) +} + func TestGaugeElemClose(t *testing.T) { e := testGaugeElem(testAlignedStarts[:len(testAlignedStarts)-1], testGaugeVals, maggregation.DefaultTypes, applied.DefaultPipeline, NewOptions()) require.False(t, e.closed) diff --git a/src/aggregator/aggregator/gauge_elem_gen.go b/src/aggregator/aggregator/gauge_elem_gen.go index 0a7de840b7..20efbc03fa 100644 --- a/src/aggregator/aggregator/gauge_elem_gen.go +++ b/src/aggregator/aggregator/gauge_elem_gen.go @@ -411,10 +411,12 @@ func (e *GaugeElem) processValueWithAggregationLock( discardNaNValues = e.opts.DiscardNaNAggregatedValues() ) for aggTypeIdx, aggType := range e.aggTypes { + var extraDp transformation.Datapoint value := lockedAgg.aggregation.ValueOf(aggType) for _, transformOp := range transformations { unaryOp, isUnaryOp := transformOp.UnaryTransform() binaryOp, isBinaryOp := transformOp.BinaryTransform() + unaryMultiOp, isUnaryMultiOp := transformOp.UnaryMultiOutputTransform() switch { case isUnaryOp: curr := transformation.Datapoint{ @@ -450,7 +452,15 @@ func (e *GaugeElem) processValueWithAggregationLock( } value = res.Value + case isUnaryMultiOp: + curr := transformation.Datapoint{ + TimeNanos: timeNanos, + Value: value, + } + var res transformation.Datapoint + res, extraDp = unaryMultiOp.Evaluate(curr) + value = res.Value } } @@ -459,11 +469,22 @@ func (e *GaugeElem) processValueWithAggregationLock( } if !e.parsedPipeline.HasRollup { - switch e.idPrefixSuffixType { - case NoPrefixNoSuffix: - flushLocalFn(nil, e.id, nil, 
timeNanos, value, e.sp) - case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), timeNanos, value, e.sp) + toFlush := make([]transformation.Datapoint, 0, 2) + toFlush = append(toFlush, transformation.Datapoint{ + TimeNanos: timeNanos, + Value: value, + }) + if extraDp.TimeNanos != 0 { + toFlush = append(toFlush, extraDp) + } + for _, point := range toFlush { + switch e.idPrefixSuffixType { + case NoPrefixNoSuffix: + flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) + case WithPrefixWithSuffix: + flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), + point.TimeNanos, point.Value, e.sp) + } } } else { forwardedAggregationKey, _ := e.ForwardedAggregationKey() diff --git a/src/aggregator/aggregator/generic_elem.go b/src/aggregator/aggregator/generic_elem.go index f5c57e2a54..eab77d6b34 100644 --- a/src/aggregator/aggregator/generic_elem.go +++ b/src/aggregator/aggregator/generic_elem.go @@ -468,10 +468,12 @@ func (e *GenericElem) processValueWithAggregationLock( discardNaNValues = e.opts.DiscardNaNAggregatedValues() ) for aggTypeIdx, aggType := range e.aggTypes { + var extraDp transformation.Datapoint value := lockedAgg.aggregation.ValueOf(aggType) for _, transformOp := range transformations { unaryOp, isUnaryOp := transformOp.UnaryTransform() binaryOp, isBinaryOp := transformOp.BinaryTransform() + unaryMultiOp, isUnaryMultiOp := transformOp.UnaryMultiOutputTransform() switch { case isUnaryOp: curr := transformation.Datapoint{ @@ -507,7 +509,15 @@ func (e *GenericElem) processValueWithAggregationLock( } value = res.Value + case isUnaryMultiOp: + curr := transformation.Datapoint{ + TimeNanos: timeNanos, + Value: value, + } + var res transformation.Datapoint + res, extraDp = unaryMultiOp.Evaluate(curr) + value = res.Value } } @@ -516,11 +526,22 @@ func (e *GenericElem) processValueWithAggregationLock( } if !e.parsedPipeline.HasRollup { - switch e.idPrefixSuffixType { - 
case NoPrefixNoSuffix: - flushLocalFn(nil, e.id, nil, timeNanos, value, e.sp) - case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), timeNanos, value, e.sp) + toFlush := make([]transformation.Datapoint, 0, 2) + toFlush = append(toFlush, transformation.Datapoint{ + TimeNanos: timeNanos, + Value: value, + }) + if extraDp.TimeNanos != 0 { + toFlush = append(toFlush, extraDp) + } + for _, point := range toFlush { + switch e.idPrefixSuffixType { + case NoPrefixNoSuffix: + flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) + case WithPrefixWithSuffix: + flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), + point.TimeNanos, point.Value, e.sp) + } } } else { forwardedAggregationKey, _ := e.ForwardedAggregationKey() diff --git a/src/aggregator/aggregator/timer_elem_gen.go b/src/aggregator/aggregator/timer_elem_gen.go index a40370f3ac..52e0d2be88 100644 --- a/src/aggregator/aggregator/timer_elem_gen.go +++ b/src/aggregator/aggregator/timer_elem_gen.go @@ -411,10 +411,12 @@ func (e *TimerElem) processValueWithAggregationLock( discardNaNValues = e.opts.DiscardNaNAggregatedValues() ) for aggTypeIdx, aggType := range e.aggTypes { + var extraDp transformation.Datapoint value := lockedAgg.aggregation.ValueOf(aggType) for _, transformOp := range transformations { unaryOp, isUnaryOp := transformOp.UnaryTransform() binaryOp, isBinaryOp := transformOp.BinaryTransform() + unaryMultiOp, isUnaryMultiOp := transformOp.UnaryMultiOutputTransform() switch { case isUnaryOp: curr := transformation.Datapoint{ @@ -450,7 +452,15 @@ func (e *TimerElem) processValueWithAggregationLock( } value = res.Value + case isUnaryMultiOp: + curr := transformation.Datapoint{ + TimeNanos: timeNanos, + Value: value, + } + var res transformation.Datapoint + res, extraDp = unaryMultiOp.Evaluate(curr) + value = res.Value } } @@ -459,11 +469,22 @@ func (e *TimerElem) processValueWithAggregationLock( } if 
!e.parsedPipeline.HasRollup { - switch e.idPrefixSuffixType { - case NoPrefixNoSuffix: - flushLocalFn(nil, e.id, nil, timeNanos, value, e.sp) - case WithPrefixWithSuffix: - flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), timeNanos, value, e.sp) + toFlush := make([]transformation.Datapoint, 0, 2) + toFlush = append(toFlush, transformation.Datapoint{ + TimeNanos: timeNanos, + Value: value, + }) + if extraDp.TimeNanos != 0 { + toFlush = append(toFlush, extraDp) + } + for _, point := range toFlush { + switch e.idPrefixSuffixType { + case NoPrefixNoSuffix: + flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp) + case WithPrefixWithSuffix: + flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType), + point.TimeNanos, point.Value, e.sp) + } } } else { forwardedAggregationKey, _ := e.ForwardedAggregationKey() diff --git a/src/metrics/generated/proto/transformationpb/transformation.pb.go b/src/metrics/generated/proto/transformationpb/transformation.pb.go index 808d10685a..88f42764c8 100644 --- a/src/metrics/generated/proto/transformationpb/transformation.pb.go +++ b/src/metrics/generated/proto/transformationpb/transformation.pb.go @@ -54,6 +54,7 @@ const ( TransformationType_PERSECOND TransformationType = 2 TransformationType_INCREASE TransformationType = 3 TransformationType_ADD TransformationType = 4 + TransformationType_RESET TransformationType = 5 ) var TransformationType_name = map[int32]string{ @@ -62,6 +63,7 @@ var TransformationType_name = map[int32]string{ 2: "PERSECOND", 3: "INCREASE", 4: "ADD", + 5: "RESET", } var TransformationType_value = map[string]int32{ "UNKNOWN": 0, @@ -69,6 +71,7 @@ var TransformationType_value = map[string]int32{ "PERSECOND": 2, "INCREASE": 3, "ADD": 4, + "RESET": 5, } func (x TransformationType) String() string { @@ -87,18 +90,19 @@ func init() { } var fileDescriptorTransformation = []byte{ - // 201 bytes of a gzipped FileDescriptorProto + // 212 bytes of a gzipped 
FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x0a, 0x49, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0xcf, 0x35, 0x4e, 0x49, 0xd2, 0xcf, 0x35, 0xd6, 0x2f, 0x2e, 0x4a, 0xd6, 0xcf, 0x4d, 0x2d, 0x29, 0xca, 0x4c, 0x2e, 0xd6, 0x4f, 0x4f, 0xcd, 0x4b, 0x2d, 0x4a, 0x2c, 0x49, 0x4d, 0xd1, 0x2f, 0x28, 0xca, 0x2f, 0xc9, 0xd7, 0x2f, 0x29, 0x4a, 0xcc, 0x2b, 0x4e, 0xcb, 0x2f, 0xca, 0x4d, 0x2c, 0xc9, 0xcc, 0xcf, 0x2b, 0x48, 0x42, 0x13, 0xd0, 0x03, 0xab, - 0x12, 0x12, 0x40, 0x57, 0xa6, 0x15, 0xca, 0x25, 0x14, 0x82, 0x22, 0x16, 0x52, 0x59, 0x90, 0x2a, + 0x12, 0x12, 0x40, 0x57, 0xa6, 0x95, 0xc0, 0x25, 0x14, 0x82, 0x22, 0x16, 0x52, 0x59, 0x90, 0x2a, 0xc4, 0xcd, 0xc5, 0x1e, 0xea, 0xe7, 0xed, 0xe7, 0x1f, 0xee, 0x27, 0xc0, 0x20, 0xc4, 0xc3, 0xc5, 0xe1, 0xe8, 0x14, 0xec, 0xef, 0x13, 0x1a, 0xe2, 0x2a, 0xc0, 0x28, 0xc4, 0xcb, 0xc5, 0x19, 0xe0, 0x1a, 0x14, 0xec, 0xea, 0xec, 0xef, 0xe7, 0x22, 0xc0, 0x04, 0x92, 0xf4, 0xf4, 0x73, 0x0e, 0x72, - 0x75, 0x0c, 0x76, 0x15, 0x60, 0x16, 0x62, 0xe7, 0x62, 0x76, 0x74, 0x71, 0x11, 0x60, 0x71, 0x0a, - 0x3c, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, - 0x63, 0x88, 0xb2, 0xa7, 0xd0, 0x43, 0x49, 0x6c, 0x60, 0x71, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, - 0xff, 0xf3, 0x67, 0xfb, 0xf2, 0x1a, 0x01, 0x00, 0x00, + 0x75, 0x0c, 0x76, 0x15, 0x60, 0x16, 0x62, 0xe7, 0x62, 0x76, 0x74, 0x71, 0x11, 0x60, 0x11, 0xe2, + 0xe4, 0x62, 0x0d, 0x72, 0x0d, 0x76, 0x0d, 0x11, 0x60, 0x75, 0x0a, 0x3c, 0xf1, 0x48, 0x8e, 0xf1, + 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, 0x88, 0xb2, 0xa7, 0xd0, + 0x6f, 0x49, 0x6c, 0x60, 0x71, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x71, 0x38, 0xb4, 0xa1, + 0x25, 0x01, 0x00, 0x00, } diff --git a/src/metrics/generated/proto/transformationpb/transformation.proto b/src/metrics/generated/proto/transformationpb/transformation.proto index 640f043bfc..2857644b01 100644 --- 
a/src/metrics/generated/proto/transformationpb/transformation.proto +++ b/src/metrics/generated/proto/transformationpb/transformation.proto @@ -30,4 +30,5 @@ enum TransformationType { PERSECOND = 2; INCREASE = 3; ADD = 4; + RESET = 5; } diff --git a/src/metrics/transformation/func.go b/src/metrics/transformation/func.go index 80016bb49b..4be747b6fd 100644 --- a/src/metrics/transformation/func.go +++ b/src/metrics/transformation/func.go @@ -66,3 +66,16 @@ type BinaryTransformFn func(prev, curr Datapoint) Datapoint func (fn BinaryTransformFn) Evaluate(prev, curr Datapoint) Datapoint { return fn(prev, curr) } + +// UnaryMultiOutputTransform is like UnaryTransform, but can output an additional datapoint. +// The additional datapoint is not passed to subsequent transforms. +type UnaryMultiOutputTransform interface { + Evaluate(dp Datapoint) (Datapoint, Datapoint) +} + +// UnaryMultiOutputTransformFn implements UnaryMultiOutputTransform as a function. +type UnaryMultiOutputTransformFn func(dp Datapoint) (Datapoint, Datapoint) + +func (fn UnaryMultiOutputTransformFn) Evaluate(dp Datapoint) (Datapoint, Datapoint) { + return fn(dp) +} diff --git a/src/metrics/transformation/type.go b/src/metrics/transformation/type.go index fe0edae0f1..e66ebbe4c3 100644 --- a/src/metrics/transformation/type.go +++ b/src/metrics/transformation/type.go @@ -37,11 +37,12 @@ const ( PerSecond Increase Add + Reset ) // IsValid checks if the transformation type is valid. func (t Type) IsValid() bool { - return t.IsUnaryTransform() || t.IsBinaryTransform() + return t.IsUnaryTransform() || t.IsBinaryTransform() || t.IsUnaryMultiOutputTransform() } // IsUnaryTransform returns whether this is a unary transformation. @@ -56,19 +57,27 @@ func (t Type) IsBinaryTransform() bool { return exists } +func (t Type) IsUnaryMultiOutputTransform() bool { + _, exists := unaryMultiOutputTransforms[t] + return exists +} + // NewOp returns a constructed operation that is allocated once and can be // reused. 
func (t Type) NewOp() (Op, error) { var ( - err error - unary UnaryTransform - binary BinaryTransform + err error + unary UnaryTransform + binary BinaryTransform + unaryMulti UnaryMultiOutputTransform ) switch { case t.IsUnaryTransform(): unary, err = t.UnaryTransform() case t.IsBinaryTransform(): binary, err = t.BinaryTransform() + case t.IsUnaryMultiOutputTransform(): + unaryMulti, err = t.UnaryMultiOutputTransform() default: err = fmt.Errorf("unknown transformation type: %v", t) } @@ -76,9 +85,10 @@ func (t Type) NewOp() (Op, error) { return Op{}, err } return Op{ - opType: t, - unary: unary, - binary: binary, + opType: t, + unary: unary, + binary: binary, + unaryMulti: unaryMulti, }, nil } @@ -122,6 +132,26 @@ func (t Type) MustBinaryTransform() BinaryTransform { return tf } +// UnaryMultiOutputTransform returns the unary multi-output transformation function associated with +// the transformation type if applicable, or an error otherwise. +func (t Type) UnaryMultiOutputTransform() (UnaryMultiOutputTransform, error) { + tf, exists := unaryMultiOutputTransforms[t] + if !exists { + return nil, fmt.Errorf("%v is not a unary multi-output transformation", t) + } + return tf(), nil +} + +// MustUnaryMultiOutputTransform returns the unary multi-output transformation function associated with +// the transformation type if applicable, or panics otherwise. +func (t Type) MustUnaryMultiOutputTransform() UnaryMultiOutputTransform { + tf, err := t.UnaryMultiOutputTransform() + if err != nil { + panic(err) + } + return tf +} + +// ToProto converts the transformation type to a protobuf message in place. 
func (t Type) ToProto(pb *transformationpb.TransformationType) error { switch t { @@ -133,6 +163,8 @@ func (t Type) ToProto(pb *transformationpb.TransformationType) error { *pb = transformationpb.TransformationType_INCREASE case Add: *pb = transformationpb.TransformationType_ADD + case Reset: + *pb = transformationpb.TransformationType_RESET default: return fmt.Errorf("unknown transformation type: %v", t) } @@ -150,6 +182,8 @@ func (t *Type) FromProto(pb transformationpb.TransformationType) error { *t = Increase case transformationpb.TransformationType_ADD: *t = Add + case transformationpb.TransformationType_RESET: + *t = Reset default: return fmt.Errorf("unknown transformation type in proto: %v", pb) } @@ -201,9 +235,10 @@ func ParseType(str string) (Type, error) { type Op struct { opType Type - // might have either unary or binary - unary UnaryTransform - binary BinaryTransform + // has one of the following + unary UnaryTransform + binary BinaryTransform + unaryMulti UnaryMultiOutputTransform } // Type returns the op type. @@ -227,6 +262,14 @@ func (o Op) BinaryTransform() (BinaryTransform, bool) { return o.binary, true } +// UnaryMultiOutputTransform returns the active unary multi transform if op is unary multi transform. 
+func (o Op) UnaryMultiOutputTransform() (UnaryMultiOutputTransform, bool) { + if !o.Type().IsUnaryMultiOutputTransform() { + return nil, false + } + return o.unaryMulti, true +} + var ( unaryTransforms = map[Type]func() UnaryTransform{ Absolute: transformAbsolute, @@ -236,6 +279,9 @@ var ( PerSecond: transformPerSecond, Increase: transformIncrease, } + unaryMultiOutputTransforms = map[Type]func() UnaryMultiOutputTransform{ + Reset: transformReset, + } typeStringMap map[string]Type ) @@ -247,4 +293,7 @@ func init() { for t := range binaryTransforms { typeStringMap[t.String()] = t } + for t := range unaryMultiOutputTransforms { + typeStringMap[t.String()] = t + } } diff --git a/src/metrics/transformation/type_string.go b/src/metrics/transformation/type_string.go index 028d8a4bc1..0509f1f1ed 100644 --- a/src/metrics/transformation/type_string.go +++ b/src/metrics/transformation/type_string.go @@ -33,11 +33,12 @@ func _() { _ = x[PerSecond-2] _ = x[Increase-3] _ = x[Add-4] + _ = x[Reset-5] } -const _Type_name = "UnknownTypeAbsolutePerSecondIncreaseAdd" +const _Type_name = "UnknownTypeAbsolutePerSecondIncreaseAddReset" -var _Type_index = [...]uint8{0, 11, 19, 28, 36, 39} +var _Type_index = [...]uint8{0, 11, 19, 28, 36, 39, 44} func (i Type) String() string { if i < 0 || i >= Type(len(_Type_index)-1) { diff --git a/src/metrics/transformation/unary_multi.go b/src/metrics/transformation/unary_multi.go new file mode 100644 index 0000000000..dc8ba9e4d8 --- /dev/null +++ b/src/metrics/transformation/unary_multi.go @@ -0,0 +1,47 @@ +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package transformation + +import "time" + +// transformReset returns the provided datapoint and a zero datapoint one second later. +// +// This transform is useful for force resetting a counter value in Prometheus. When running the M3Aggregator in HA, both +// the follower and leader are computing aggregate counters, but they started counting at different times. If these +// counters are emitted as monotonic cumulative counters, during failover the counter decreases if the new leader +// started counting later. Prometheus assumes any decrease in a counter is due to a counter reset, which leads to strange +// display results since the counter did not actually reset. +// +// This transform gets around this issue by explicitly not accumulating results, like Add, and force resets the counter +// with a zero value so PromQL properly graphs the delta as the rate value. 
+// +// This does have the downside of an extra 0 datapoint per resolution period. The storage cost is more than just the +// extra 0 value since the value is stored 1 second after the actual datapoint. This degrades the timestamp encoding +// since the timestamps are no longer at a fixed interval. In practice we see a 3x increase in storage for these +// aggregated counters. +// +// Currently only a single extra datapoint per aggregation is supported. If multiple transforms in an aggregation emit +// an additional datapoint, only the last one is used. +func transformReset() UnaryMultiOutputTransform { + return UnaryMultiOutputTransformFn(func(dp Datapoint) (Datapoint, Datapoint) { + return dp, Datapoint{Value: 0, TimeNanos: dp.TimeNanos + int64(time.Second)} + }) +} diff --git a/src/metrics/transformation/unary_multi_test.go b/src/metrics/transformation/unary_multi_test.go new file mode 100644 index 0000000000..68764b942e --- /dev/null +++ b/src/metrics/transformation/unary_multi_test.go @@ -0,0 +1,44 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package transformation + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestReset(t *testing.T) { + reset, err := Reset.UnaryMultiOutputTransform() + require.NoError(t, err) + now := time.Now() + dp := Datapoint{Value: 1000, TimeNanos: now.UnixNano()} + this, other := reset.Evaluate(dp) + require.Equal(t, dp, this) + require.Equal(t, Datapoint{Value: 0, TimeNanos: now.Add(time.Second).UnixNano()}, other) +} + +func TestUnaryMultiParse(t *testing.T) { + parsed, err := ParseType("Reset") + require.NoError(t, err) + require.Equal(t, Reset, parsed) +}