diff --git a/scripts/docker-integration-tests/aggregator/m3aggregator.yml b/scripts/docker-integration-tests/aggregator/m3aggregator.yml index 16aed28011..309f52b41d 100644 --- a/scripts/docker-integration-tests/aggregator/m3aggregator.yml +++ b/scripts/docker-integration-tests/aggregator/m3aggregator.yml @@ -320,6 +320,8 @@ aggregator: flushInterval: 1s writeBufferSize: 16384 readBufferSize: 256 + passthrough: + enabled: true forwarding: maxConstDelay: 1m # Need to add some buffer window, since timed metrics by default are delayed by 1min. entryTTL: 1h diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go index 84e7cc3dd2..53925f1906 100644 --- a/src/aggregator/aggregator/aggregator.go +++ b/src/aggregator/aggregator/aggregator.go @@ -30,6 +30,7 @@ import ( "time" "github.com/m3db/m3/src/aggregator/aggregator/handler" + "github.com/m3db/m3/src/aggregator/aggregator/handler/writer" "github.com/m3db/m3/src/aggregator/client" "github.com/m3db/m3/src/aggregator/sharding" "github.com/m3db/m3/src/cluster/placement" @@ -39,6 +40,7 @@ import ( "github.com/m3db/m3/src/metrics/metric/aggregated" "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/metric/unaggregated" + "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/x/clock" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" @@ -77,6 +79,9 @@ type Aggregator interface { // AddForwarded adds a forwarded metric with metadata. AddForwarded(metric aggregated.ForwardedMetric, metadata metadata.ForwardMetadata) error + // AddPassthrough adds a passthrough metric with storage policy. + AddPassthrough(metric aggregated.Metric, storagePolicy policy.StoragePolicy) error + // Resign stops the aggregator from participating in leader election and resigns // from ongoing campaign if any. Resign() error @@ -103,6 +108,7 @@ type aggregator struct { electionManager ElectionManager flushManager FlushManager flushHandler handler.Handler + passthroughWriter writer.Writer adminClient client.AdminClient resignTimeout time.Duration @@ -137,6 +143,7 @@ func NewAggregator(opts Options) Aggregator { electionManager: opts.ElectionManager(), flushManager: opts.FlushManager(), flushHandler: opts.FlushHandler(), + passthroughWriter: opts.PassthroughWriter(), adminClient: opts.AdminClient(), resignTimeout: opts.ResignTimeout(), doneCh: make(chan struct{}), @@ -257,6 +264,43 @@ func (agg *aggregator) AddForwarded( return nil } +func (agg *aggregator) AddPassthrough( + metric aggregated.Metric, + storagePolicy policy.StoragePolicy, +) error { + callStart := agg.nowFn() + agg.metrics.passthrough.Inc(1) + + if agg.electionManager.ElectionState() == FollowerState { + agg.metrics.addPassthrough.ReportFollowerNoop() + return nil + } + + pw, err := agg.passWriter() + if err != nil { + agg.metrics.addPassthrough.ReportError(err) + return err + } + + mp := aggregated.ChunkedMetricWithStoragePolicy{ + ChunkedMetric: aggregated.ChunkedMetric{ + ChunkedID: id.ChunkedID{ + Data: []byte(metric.ID), + }, + TimeNanos: metric.TimeNanos, + Value: metric.Value, + }, + StoragePolicy: storagePolicy, + } + + if err := pw.Write(mp); err != nil { + agg.metrics.addPassthrough.ReportError(err) + return err + } + agg.metrics.addPassthrough.ReportSuccess(agg.nowFn().Sub(callStart)) + return nil +} + func (agg *aggregator) Resign() error { ctx, cancel := context.WithTimeout(context.Background(), agg.resignTimeout) defer cancel() @@ -284,6 +328,7 @@ func (agg *aggregator) Close() error { agg.closeShardSetWithLock() } agg.flushHandler.Close() + agg.passthroughWriter.Close() if agg.adminClient != nil { agg.adminClient.Close() } @@ -291,6 +336,21 @@ func (agg *aggregator) Close() error { return nil } +func (agg *aggregator) passWriter() (writer.Writer, error) { + agg.RLock() + defer agg.RUnlock() + + if agg.state != aggregatorOpen { + return nil, errAggregatorNotOpenOrClosed + } + + if agg.electionManager.ElectionState() == FollowerState { + return writer.NewBlackholeWriter(), nil + } + + return agg.passthroughWriter, nil +} + func (agg *aggregator) shardFor(id id.RawID) (*aggregatorShard, error) { agg.RLock() shard, err := agg.shardForWithLock(id, noUpdateShards) @@ -760,6 +820,29 @@ func (m *aggregatorAddTimedMetrics) ReportError(err error) { } } +type aggregatorAddPassthroughMetrics struct { + aggregatorAddMetricMetrics + followerNoop tally.Counter +} + +func newAggregatorAddPassthroughMetrics( + scope tally.Scope, + samplingRate float64, +) aggregatorAddPassthroughMetrics { + return aggregatorAddPassthroughMetrics{ + aggregatorAddMetricMetrics: newAggregatorAddMetricMetrics(scope, samplingRate), + followerNoop: scope.Counter("follower-noop"), + } +} + +func (m *aggregatorAddPassthroughMetrics) ReportError(err error) { + m.aggregatorAddMetricMetrics.ReportError(err) +} + +func (m *aggregatorAddPassthroughMetrics) ReportFollowerNoop() { + m.followerNoop.Inc(1) +} + type latencyBucketKey struct { resolution time.Duration numForwardedTimes int @@ -930,19 +1013,21 @@ func newAggregatorShardSetIDMetrics(scope tally.Scope) aggregatorShardSetIDMetri } type aggregatorMetrics struct { - counters tally.Counter - timers tally.Counter - timerBatches tally.Counter - gauges tally.Counter - forwarded tally.Counter - timed tally.Counter - addUntimed aggregatorAddUntimedMetrics - addTimed aggregatorAddTimedMetrics - addForwarded aggregatorAddForwardedMetrics - placement aggregatorPlacementMetrics - shards aggregatorShardsMetrics - shardSetID aggregatorShardSetIDMetrics - tick aggregatorTickMetrics + counters tally.Counter + timers tally.Counter + timerBatches tally.Counter + gauges tally.Counter + forwarded tally.Counter + timed tally.Counter + passthrough tally.Counter + addUntimed aggregatorAddUntimedMetrics + addTimed aggregatorAddTimedMetrics + addForwarded aggregatorAddForwardedMetrics + addPassthrough aggregatorAddPassthroughMetrics + placement aggregatorPlacementMetrics + shards aggregatorShardsMetrics + shardSetID aggregatorShardSetIDMetrics + tick aggregatorTickMetrics } func newAggregatorMetrics( @@ -953,24 +1038,27 @@ func newAggregatorMetrics( addUntimedScope := scope.SubScope("addUntimed") addTimedScope := scope.SubScope("addTimed") addForwardedScope := scope.SubScope("addForwarded") + addPassthroughScope := scope.SubScope("addPassthrough") placementScope := scope.SubScope("placement") shardsScope := scope.SubScope("shards") shardSetIDScope := scope.SubScope("shard-set-id") tickScope := scope.SubScope("tick") return aggregatorMetrics{ - counters: scope.Counter("counters"), - timers: scope.Counter("timers"), - timerBatches: scope.Counter("timer-batches"), - gauges: scope.Counter("gauges"), - forwarded: scope.Counter("forwarded"), - timed: scope.Counter("timed"), - addUntimed: newAggregatorAddUntimedMetrics(addUntimedScope, samplingRate), - addTimed: newAggregatorAddTimedMetrics(addTimedScope, samplingRate), - addForwarded: newAggregatorAddForwardedMetrics(addForwardedScope, samplingRate, maxAllowedForwardingDelayFn), - placement: newAggregatorPlacementMetrics(placementScope), - shards: newAggregatorShardsMetrics(shardsScope), - shardSetID: newAggregatorShardSetIDMetrics(shardSetIDScope), - tick: newAggregatorTickMetrics(tickScope), + counters: scope.Counter("counters"), + timers: scope.Counter("timers"), + timerBatches: scope.Counter("timer-batches"), + gauges: scope.Counter("gauges"), + forwarded: scope.Counter("forwarded"), + timed: scope.Counter("timed"), + passthrough: scope.Counter("passthrough"), + addUntimed: newAggregatorAddUntimedMetrics(addUntimedScope, samplingRate), + addTimed: newAggregatorAddTimedMetrics(addTimedScope, samplingRate), + addForwarded: newAggregatorAddForwardedMetrics(addForwardedScope, samplingRate, maxAllowedForwardingDelayFn), + addPassthrough: newAggregatorAddPassthroughMetrics(addPassthroughScope, samplingRate), + placement: newAggregatorPlacementMetrics(placementScope), + shards: newAggregatorShardsMetrics(shardsScope), + shardSetID: newAggregatorShardSetIDMetrics(shardSetIDScope), + tick: newAggregatorTickMetrics(tickScope), } } diff --git a/src/aggregator/aggregator/aggregator_test.go b/src/aggregator/aggregator/aggregator_test.go index 5a4b3cd079..9ecebb10c6 100644 --- a/src/aggregator/aggregator/aggregator_test.go +++ b/src/aggregator/aggregator/aggregator_test.go @@ -77,6 +77,12 @@ var ( TimeNanos: 12345, Values: []float64{76109, 23891}, } + testPassthroughMetric = aggregated.Metric{ + Type: metric.CounterType, + ID: []byte("testPassthrough"), + TimeNanos: 12345, + Value: 1000, + } testInvalidMetric = unaggregated.MetricUnion{ Type: metric.UnknownType, ID: []byte("testInvalid"), @@ -126,6 +132,7 @@ var ( SourceID: 1234, NumForwardedTimes: 3, } + testPassthroughStroagePolicy = policy.NewStoragePolicy(time.Minute, xtime.Minute, 12*time.Hour) ) func TestAggregatorOpenAlreadyOpen(t *testing.T) { @@ -664,6 +671,25 @@ func TestAggregatorResignSuccess(t *testing.T) { require.NoError(t, agg.Resign()) } +func TestAggregatorAddPassthroughNotOpen(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + agg, _ := testAggregator(t, ctrl) + err := agg.AddPassthrough(testPassthroughMetric, testPassthroughStroagePolicy) + require.Equal(t, errAggregatorNotOpenOrClosed, err) +} + +func TestAggregatorAddPassthroughSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + agg, _ := testAggregator(t, ctrl) + require.NoError(t, agg.Open()) + err := agg.AddPassthrough(testPassthroughMetric, testPassthroughStroagePolicy) + require.NoError(t, err) +} + func TestAggregatorStatus(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1100,6 +1126,7 @@ func testOptions(ctrl *gomock.Controller) Options { electionMgr.EXPECT().Reset().Return(nil).AnyTimes() electionMgr.EXPECT().Open(gomock.Any()).Return(nil).AnyTimes() electionMgr.EXPECT().Close().Return(nil).AnyTimes() + electionMgr.EXPECT().ElectionState().Return(LeaderState).AnyTimes() flushManager := NewMockFlushManager(ctrl) flushManager.EXPECT().Reset().Return(nil).AnyTimes() @@ -1117,6 +1144,11 @@ func testOptions(ctrl *gomock.Controller) Options { h.EXPECT().NewWriter(gomock.Any()).Return(w, nil).AnyTimes() h.EXPECT().Close().AnyTimes() + pw := writer.NewMockWriter(ctrl) + pw.EXPECT().Write(gomock.Any()).Return(nil).AnyTimes() + pw.EXPECT().Flush().Return(nil).AnyTimes() + pw.EXPECT().Close().Return(nil).AnyTimes() + cl := client.NewMockAdminClient(ctrl) cl.EXPECT().Flush().Return(nil).AnyTimes() cl.EXPECT().Close().AnyTimes() @@ -1133,6 +1165,7 @@ func testOptions(ctrl *gomock.Controller) Options { SetElectionManager(electionMgr). SetFlushManager(flushManager). SetFlushHandler(h). + SetPassthroughWriter(pw). SetAdminClient(cl). SetMaxAllowedForwardingDelayFn(infiniteAllowedDelayFn). SetBufferForFutureTimedMetric(math.MaxInt64). diff --git a/src/aggregator/aggregator/capture/aggregator.go b/src/aggregator/aggregator/capture/aggregator.go index 8b9c0cc3a5..d54ca38266 100644 --- a/src/aggregator/aggregator/capture/aggregator.go +++ b/src/aggregator/aggregator/capture/aggregator.go @@ -39,13 +39,14 @@ import ( type aggregator struct { sync.RWMutex - numMetricsAdded int - countersWithMetadatas []unaggregated.CounterWithMetadatas - batchTimersWithMetadatas []unaggregated.BatchTimerWithMetadatas - gaugesWithMetadatas []unaggregated.GaugeWithMetadatas - forwardedMetricsWithMetadata []aggregated.ForwardedMetricWithMetadata - timedMetricsWithMetadata []aggregated.TimedMetricWithMetadata - timedMetricsWithMetadatas []aggregated.TimedMetricWithMetadatas + numMetricsAdded int + countersWithMetadatas []unaggregated.CounterWithMetadatas + batchTimersWithMetadatas []unaggregated.BatchTimerWithMetadatas + gaugesWithMetadatas []unaggregated.GaugeWithMetadatas + forwardedMetricsWithMetadata []aggregated.ForwardedMetricWithMetadata + timedMetricsWithMetadata []aggregated.TimedMetricWithMetadata + timedMetricsWithMetadatas []aggregated.TimedMetricWithMetadatas + passthroughMetricsWithMetadata []aggregated.PassthroughMetricWithMetadata } // NewAggregator creates a new capturing aggregator. @@ -152,6 +153,26 @@ func (agg *aggregator) AddForwarded( return nil } +func (agg *aggregator) AddPassthrough( + metric aggregated.Metric, + storagePolicy policy.StoragePolicy, +) error { + // Clone the metric and timed metadata to ensure it cannot be mutated externally. + metric = cloneTimedMetric(metric) + storagePolicy = cloneStoragePolicy(storagePolicy) + + agg.Lock() + defer agg.Unlock() + + pm := aggregated.PassthroughMetricWithMetadata{ + Metric: metric, + StoragePolicy: storagePolicy, + } + agg.passthroughMetricsWithMetadata = append(agg.passthroughMetricsWithMetadata, pm) + agg.numMetricsAdded++ + return nil +} + func (agg *aggregator) Resign() error { return nil } func (agg *aggregator) Status() aggr.RuntimeStatus { return aggr.RuntimeStatus{} } func (agg *aggregator) Close() error { return nil } @@ -167,17 +188,19 @@ func (agg *aggregator) Snapshot() SnapshotResult { agg.Lock() result := SnapshotResult{ - CountersWithMetadatas: agg.countersWithMetadatas, - BatchTimersWithMetadatas: agg.batchTimersWithMetadatas, - GaugesWithMetadatas: agg.gaugesWithMetadatas, - ForwardedMetricsWithMetadata: agg.forwardedMetricsWithMetadata, - TimedMetricWithMetadata: agg.timedMetricsWithMetadata, + CountersWithMetadatas: agg.countersWithMetadatas, + BatchTimersWithMetadatas: agg.batchTimersWithMetadatas, + GaugesWithMetadatas: agg.gaugesWithMetadatas, + ForwardedMetricsWithMetadata: agg.forwardedMetricsWithMetadata, + TimedMetricWithMetadata: agg.timedMetricsWithMetadata, + PassthroughMetricWithMetadata: agg.passthroughMetricsWithMetadata, } agg.countersWithMetadatas = nil agg.batchTimersWithMetadatas = nil agg.gaugesWithMetadatas = nil agg.forwardedMetricsWithMetadata = nil agg.timedMetricsWithMetadata = nil + agg.passthroughMetricsWithMetadata = nil agg.numMetricsAdded = 0 agg.Unlock() @@ -263,3 +286,8 @@ func cloneTimedMetadata(meta metadata.TimedMetadata) metadata.TimedMetadata { cloned := meta return cloned } + +func cloneStoragePolicy(sp policy.StoragePolicy) policy.StoragePolicy { + cloned := sp + return cloned +} diff --git a/src/aggregator/aggregator/capture/aggregator_test.go b/src/aggregator/aggregator/capture/aggregator_test.go index 85bf3172ca..6032b4e0b5 100644 --- a/src/aggregator/aggregator/capture/aggregator_test.go +++ b/src/aggregator/aggregator/capture/aggregator_test.go @@ -47,17 +47,17 @@ var ( } testBatchTimer = unaggregated.MetricUnion{ Type: metric.TimerType, - ID: id.RawID("testCounter"), + ID: id.RawID("testBatchTimer"), BatchTimerVal: []float64{1.0, 3.5, 2.2, 6.5, 4.8}, } testGauge = unaggregated.MetricUnion{ Type: metric.GaugeType, - ID: id.RawID("testCounter"), + ID: id.RawID("testGauge"), GaugeVal: 123.456, } testTimed = aggregated.Metric{ Type: metric.CounterType, - ID: []byte("testForwarded"), + ID: []byte("testTimed"), TimeNanos: 12345, Value: -13.5, } @@ -67,6 +67,12 @@ var ( TimeNanos: 12345, Values: []float64{908, -13.5}, } + testPassthrough = aggregated.Metric{ + Type: metric.CounterType, + ID: []byte("testPassthrough"), + TimeNanos: 12345, + Value: -12.3, + } testInvalid = unaggregated.MetricUnion{ Type: metric.UnknownType, ID: id.RawID("invalid"), @@ -91,6 +97,7 @@ var ( SourceID: 1234, NumForwardedTimes: 3, } + testPassthroughStoragePolicy = policy.NewStoragePolicy(time.Minute, xtime.Minute, 12*time.Hour) ) func TestAggregator(t *testing.T) { @@ -155,6 +162,17 @@ func TestAggregator(t *testing.T) { require.Equal(t, 5, agg.NumMetricsAdded()) + // Add valid passthrough metrics with storage policy. + expected.PassthroughMetricWithMetadata = append( + expected.PassthroughMetricWithMetadata, + aggregated.PassthroughMetricWithMetadata{ + Metric: testPassthrough, + StoragePolicy: testPassthroughStoragePolicy, + }, + ) + require.NoError(t, agg.AddPassthrough(testPassthrough, testPassthroughStoragePolicy)) + require.Equal(t, 6, agg.NumMetricsAdded()) + res := agg.Snapshot() require.Equal(t, expected, res) } diff --git a/src/aggregator/aggregator/capture/types.go b/src/aggregator/aggregator/capture/types.go index 8e0ac3f2b2..da7c2d66aa 100644 --- a/src/aggregator/aggregator/capture/types.go +++ b/src/aggregator/aggregator/capture/types.go @@ -40,9 +40,10 @@ type Aggregator interface { // SnapshotResult is the snapshot result. type SnapshotResult struct { - CountersWithMetadatas []unaggregated.CounterWithMetadatas - BatchTimersWithMetadatas []unaggregated.BatchTimerWithMetadatas - GaugesWithMetadatas []unaggregated.GaugeWithMetadatas - ForwardedMetricsWithMetadata []aggregated.ForwardedMetricWithMetadata - TimedMetricWithMetadata []aggregated.TimedMetricWithMetadata + CountersWithMetadatas []unaggregated.CounterWithMetadatas + BatchTimersWithMetadatas []unaggregated.BatchTimerWithMetadatas + GaugesWithMetadatas []unaggregated.GaugeWithMetadatas + ForwardedMetricsWithMetadata []aggregated.ForwardedMetricWithMetadata + TimedMetricWithMetadata []aggregated.TimedMetricWithMetadata + PassthroughMetricWithMetadata []aggregated.PassthroughMetricWithMetadata } diff --git a/src/aggregator/aggregator/handler/writer/sharded.go b/src/aggregator/aggregator/handler/writer/sharded.go new file mode 100644 index 0000000000..6bfbf58963 --- /dev/null +++ b/src/aggregator/aggregator/handler/writer/sharded.go @@ -0,0 +1,154 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package writer + +import ( + "sync" + + "github.com/m3db/m3/src/aggregator/sharding" + "github.com/m3db/m3/src/metrics/metric/aggregated" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/instrument" + + "github.com/pkg/errors" +) + +var ( + errShardedWriterNoWriters = errors.New("no backing writers provided") + errShardedWriterClosed = errors.New("sharded writer closed") +) + +type shardedWriter struct { + mutex sync.RWMutex + closed bool + writers []*threadsafeWriter + shardFn sharding.AggregatedShardFn + numShards int +} + +var _ Writer = &shardedWriter{} + +// NewShardedWriter shards writes to the provided writers with the given sharding fn. +func NewShardedWriter( + writers []Writer, + shardFn sharding.AggregatedShardFn, + iOpts instrument.Options, +) (Writer, error) { + if len(writers) == 0 { + return nil, errShardedWriterNoWriters + } + + threadsafeWriters := make([]*threadsafeWriter, 0, len(writers)) + for _, w := range writers { + threadsafeWriters = append(threadsafeWriters, &threadsafeWriter{ + writer: w, + }) + } + + return &shardedWriter{ + numShards: len(writers), + writers: threadsafeWriters, + shardFn: shardFn, + }, nil +} + +func (w *shardedWriter) Write(mp aggregated.ChunkedMetricWithStoragePolicy) error { + w.mutex.RLock() + if w.closed { + w.mutex.RUnlock() + return errShardedWriterClosed + } + + shardID := w.shardFn(mp.ChunkedID, w.numShards) + writerErr := w.writers[shardID].Write(mp) + w.mutex.RUnlock() + + return writerErr +} + +func (w *shardedWriter) Flush() error { + w.mutex.RLock() + defer w.mutex.RUnlock() + + if w.closed { + return errShardedWriterClosed + } + + var multiErr xerrors.MultiError + for i := 0; i < w.numShards; i++ { + multiErr = multiErr.Add(w.writers[i].Flush()) + } + + if multiErr.Empty() { + return nil + } + + return errors.WithMessage(multiErr.FinalError(), "failed to flush sharded writer") +} + +func (w *shardedWriter) Close() error { + w.mutex.Lock() + defer w.mutex.Unlock() + + if w.closed { + return errShardedWriterClosed + } + w.closed = true + + var multiErr xerrors.MultiError + for i := 0; i < w.numShards; i++ { + multiErr = multiErr.Add(w.writers[i].Close()) + } + + if multiErr.Empty() { + return nil + } + + return errors.WithMessage(multiErr.FinalError(), "failed to close sharded writer") +} + +type threadsafeWriter struct { + mutex sync.Mutex + writer Writer +} + +var _ Writer = &threadsafeWriter{} + +func (w *threadsafeWriter) Write(mp aggregated.ChunkedMetricWithStoragePolicy) error { + w.mutex.Lock() + err := w.writer.Write(mp) + w.mutex.Unlock() + return err +} + +func (w *threadsafeWriter) Flush() error { + w.mutex.Lock() + err := w.writer.Flush() + w.mutex.Unlock() + return err +} + +func (w *threadsafeWriter) Close() error { + w.mutex.Lock() + err := w.writer.Close() + w.mutex.Unlock() + return err +} diff --git a/src/aggregator/aggregator/handler/writer/sharded_test.go b/src/aggregator/aggregator/handler/writer/sharded_test.go new file mode 100644 index 0000000000..a07e4d288d --- /dev/null +++ b/src/aggregator/aggregator/handler/writer/sharded_test.go @@ -0,0 +1,67 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package writer + +import ( + "testing" + + "github.com/m3db/m3/src/metrics/metric/aggregated" + "github.com/m3db/m3/src/metrics/metric/id" + "github.com/m3db/m3/src/x/instrument" + xtest "github.com/m3db/m3/src/x/test" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" +) + +func TestNewShardedWriter(t *testing.T) { + ctrl := gomock.NewController(xtest.Reporter{t}) + defer ctrl.Finish() + + w1, w2 := NewMockWriter(ctrl), NewMockWriter(ctrl) + writers := []Writer{w1, w2} + + shardFn := func(_ id.ChunkedID, _ int) uint32 { + return 0 + } + + w, err := NewShardedWriter(writers, shardFn, instrument.NewOptions()) + require.NoError(t, err) + + metric := aggregated.ChunkedMetricWithStoragePolicy{ + ChunkedMetric: aggregated.ChunkedMetric{ + ChunkedID: id.ChunkedID{ + Data: []byte("some-random-id"), + }, + }, + } + + w1.EXPECT().Write(metric).Return(nil) + require.NoError(t, w.Write(metric)) + + w1.EXPECT().Flush().Return(nil) + w2.EXPECT().Flush().Return(nil) + require.NoError(t, w.Flush()) + + w1.EXPECT().Close().Return(nil) + w2.EXPECT().Close().Return(nil) + require.NoError(t, w.Close()) +} diff --git a/src/aggregator/aggregator/options.go b/src/aggregator/aggregator/options.go index c511fb0bce..006801f365 100644 --- a/src/aggregator/aggregator/options.go +++ b/src/aggregator/aggregator/options.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/aggregator/aggregation/quantile/cm" "github.com/m3db/m3/src/aggregator/aggregator/handler" + "github.com/m3db/m3/src/aggregator/aggregator/handler/writer" "github.com/m3db/m3/src/aggregator/client" "github.com/m3db/m3/src/aggregator/runtime" "github.com/m3db/m3/src/aggregator/sharding" @@ -199,6 +200,12 @@ type Options interface { // FlushHandler returns the handler that flushes buffered encoders. FlushHandler() handler.Handler + // SetPassthroughWriter sets the writer for passthrough metrics. + SetPassthroughWriter(value writer.Writer) Options + + // PassthroughWriter returns the writer for passthrough metrics. + PassthroughWriter() writer.Writer + // SetEntryTTL sets the ttl for expiring stale entries. SetEntryTTL(value time.Duration) Options @@ -328,6 +335,7 @@ type options struct { bufferDurationAfterShardCutoff time.Duration flushManager FlushManager flushHandler handler.Handler + passthroughWriter writer.Writer entryTTL time.Duration entryCheckInterval time.Duration entryCheckBatchPercent float64 @@ -374,6 +382,7 @@ func NewOptions() Options { shardFn: sharding.Murmur32Hash.MustShardFn(), bufferDurationBeforeShardCutover: defaultBufferDurationBeforeShardCutover, bufferDurationAfterShardCutoff: defaultBufferDurationAfterShardCutoff, + passthroughWriter: writer.NewBlackholeWriter(), entryTTL: defaultEntryTTL, entryCheckInterval: defaultEntryCheckInterval, entryCheckBatchPercent: defaultEntryCheckBatchPercent, @@ -591,6 +600,16 @@ func (o *options) FlushHandler() handler.Handler { return o.flushHandler } +func (o *options) SetPassthroughWriter(value writer.Writer) Options { + opts := *o + opts.passthroughWriter = value + return &opts +} + +func (o *options) PassthroughWriter() writer.Writer { + return o.passthroughWriter +} + func (o *options) SetEntryTTL(value time.Duration) Options { opts := *o opts.entryTTL = value diff --git a/src/aggregator/aggregator/options_test.go b/src/aggregator/aggregator/options_test.go index 75c5545a71..be97fffd6d 100644 --- a/src/aggregator/aggregator/options_test.go +++ b/src/aggregator/aggregator/options_test.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/aggregator/aggregation/quantile/cm" "github.com/m3db/m3/src/aggregator/aggregator/handler" + "github.com/m3db/m3/src/aggregator/aggregator/handler/writer" "github.com/m3db/m3/src/aggregator/client" "github.com/m3db/m3/src/aggregator/runtime" "github.com/m3db/m3/src/x/clock" @@ -149,6 +150,15 @@ func TestSetFlushHandler(t *testing.T) { require.Equal(t, h, o.FlushHandler()) } +func TestSetPassthroughWriter(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + w := writer.NewMockWriter(ctrl) + o := NewOptions().SetPassthroughWriter(w) + require.Equal(t, w, o.PassthroughWriter()) +} + func TestSetEntryTTL(t *testing.T) { value := time.Minute o := NewOptions().SetEntryTTL(value) diff --git a/src/aggregator/client/client.go b/src/aggregator/client/client.go index 2317b947ab..fbe60516fe 100644 --- a/src/aggregator/client/client.go +++ b/src/aggregator/client/client.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3/src/metrics/metric/aggregated" "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/metric/unaggregated" + "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/x/clock" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" @@ -75,6 +76,12 @@ type Client interface { metadata metadata.TimedMetadata, ) error + // WritePassthrough writes passthrough metrics. + WritePassthrough( + metric aggregated.Metric, + storagePolicy policy.StoragePolicy, + ) error + // WriteTimedWithStagedMetadatas writes timed metrics with staged metadatas. WriteTimedWithStagedMetadatas( metric aggregated.Metric, @@ -113,6 +120,7 @@ type clientMetrics struct { writeUntimedCounter instrument.MethodMetrics writeUntimedBatchTimer instrument.MethodMetrics writeUntimedGauge instrument.MethodMetrics + writePassthrough instrument.MethodMetrics writeForwarded instrument.MethodMetrics flush instrument.MethodMetrics shardNotOwned tally.Counter @@ -124,6 +132,7 @@ func newClientMetrics(scope tally.Scope, sampleRate float64) clientMetrics { writeUntimedCounter: instrument.NewMethodMetrics(scope, "writeUntimedCounter", sampleRate), writeUntimedBatchTimer: instrument.NewMethodMetrics(scope, "writeUntimedBatchTimer", sampleRate), writeUntimedGauge: instrument.NewMethodMetrics(scope, "writeUntimedGauge", sampleRate), + writePassthrough: instrument.NewMethodMetrics(scope, "writePassthrough", sampleRate), writeForwarded: instrument.NewMethodMetrics(scope, "writeForwarded", sampleRate), flush: instrument.NewMethodMetrics(scope, "flush", sampleRate), shardNotOwned: scope.Counter("shard-not-owned"), @@ -262,6 +271,23 @@ func (c *client) WriteTimed( return err } +func (c *client) WritePassthrough( + metric aggregated.Metric, + storagePolicy policy.StoragePolicy, +) error { + callStart := c.nowFn() + payload := payloadUnion{ + payloadType: passthroughType, + passthrough: passthroughPayload{ + metric: metric, + storagePolicy: storagePolicy, + }, + } + err := c.write(metric.ID, metric.TimeNanos, payload) + c.metrics.writePassthrough.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) + return err +} + func (c *client) WriteTimedWithStagedMetadatas( metric aggregated.Metric, metadatas metadata.StagedMetadatas, diff --git a/src/aggregator/client/client_mock.go b/src/aggregator/client/client_mock.go index a36674b14b..e63c8ab47c 100644 --- a/src/aggregator/client/client_mock.go +++ b/src/aggregator/client/client_mock.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/metrics/metadata" "github.com/m3db/m3/src/metrics/metric/aggregated" "github.com/m3db/m3/src/metrics/metric/unaggregated" + "github.com/m3db/m3/src/metrics/policy" "github.com/golang/mock/gomock" ) @@ -99,6 +100,20 @@ func (mr *MockClientMockRecorder) Init() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockClient)(nil).Init)) } +// WritePassthrough mocks base method +func (m *MockClient) WritePassthrough(arg0 aggregated.Metric, arg1 policy.StoragePolicy) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WritePassthrough", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// WritePassthrough indicates an expected call of WritePassthrough +func (mr *MockClientMockRecorder) WritePassthrough(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WritePassthrough", reflect.TypeOf((*MockClient)(nil).WritePassthrough), arg0, arg1) +} + // WriteTimed mocks base method func (m *MockClient) WriteTimed(arg0 aggregated.Metric, arg1 metadata.TimedMetadata) error { m.ctrl.T.Helper() @@ -248,6 +263,20 @@ func (mr *MockAdminClientMockRecorder) WriteForwarded(arg0, arg1 interface{}) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteForwarded", reflect.TypeOf((*MockAdminClient)(nil).WriteForwarded), arg0, arg1) } +// WritePassthrough mocks base method +func (m *MockAdminClient) WritePassthrough(arg0 aggregated.Metric, arg1 policy.StoragePolicy) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WritePassthrough", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// WritePassthrough indicates an expected call of WritePassthrough +func (mr *MockAdminClientMockRecorder) WritePassthrough(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WritePassthrough", reflect.TypeOf((*MockAdminClient)(nil).WritePassthrough), arg0, arg1) +} + // WriteTimed mocks base method func (m *MockAdminClient) WriteTimed(arg0 aggregated.Metric, arg1 metadata.TimedMetadata) error { m.ctrl.T.Helper() diff --git a/src/aggregator/client/client_test.go b/src/aggregator/client/client_test.go index bb024a9f5e..230ede3c4d 100644 --- a/src/aggregator/client/client_test.go +++ b/src/aggregator/client/client_test.go @@ -66,7 +66,7 @@ var ( } testTimed = aggregated.Metric{ Type: metric.CounterType, - ID: []byte("testForwarded"), + ID: []byte("testTimed"), TimeNanos: 1234, Value: 178, } @@ -76,6 +76,12 @@ var ( TimeNanos: 1234, Values: []float64{34567, 256, 178}, } + testPassthrough = aggregated.Metric{ + Type: metric.CounterType, + ID: []byte("testPassthrough"), + TimeNanos: 12345, + Value: 123, + } testStagedMetadatas = metadata.StagedMetadatas{ { CutoverNanos: 100, @@ -127,7 +133,8 @@ var ( SourceID: 1234, NumForwardedTimes: 3, } - testPlacementInstances = []placement.Instance{ + testPassthroughMetadata = policy.NewStoragePolicy(time.Minute, xtime.Minute, 12*time.Hour) + testPlacementInstances = []placement.Instance{ placement.NewInstance(). SetID("instance1"). SetEndpoint("instance1_endpoint"). @@ -645,6 +652,106 @@ func TestClientWriteForwardedMetricPartialError(t *testing.T) { require.Equal(t, testForwardMetadata, payloadRes.forwarded.metadata) } +func TestClientWritePassthroughMetricSuccess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + instancesRes []placement.Instance + shardRes uint32 + payloadRes payloadUnion + ) + writerMgr := NewMockinstanceWriterManager(ctrl) + writerMgr.EXPECT(). + Write(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func( + instance placement.Instance, + shardID uint32, + payload payloadUnion, + ) error { + instancesRes = append(instancesRes, instance) + shardRes = shardID + payloadRes = payload + return nil + }). + MinTimes(1) + stagedPlacement := placement.NewMockActiveStagedPlacement(ctrl) + stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1) + watcher := placement.NewMockStagedPlacementWatcher(ctrl) + watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1) + c := NewClient(testOptions()).(*client) + c.state = clientInitialized + c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) } + c.writerMgr = writerMgr + c.placementWatcher = watcher + + expectedInstances := []placement.Instance{ + testPlacementInstances[0], + testPlacementInstances[2], + } + testMetric := testPassthrough + testMetric.TimeNanos = testNowNanos + err := c.WritePassthrough(testMetric, testPassthroughMetadata) + require.NoError(t, err) + require.Equal(t, expectedInstances, instancesRes) + require.Equal(t, uint32(1), shardRes) + require.Equal(t, passthroughType, payloadRes.payloadType) + require.Equal(t, testMetric, payloadRes.passthrough.metric) + require.Equal(t, testPassthroughMetadata, payloadRes.passthrough.storagePolicy) +} + +func TestClientWritePassthroughMetricPartialError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + instancesRes []placement.Instance + shardRes uint32 + payloadRes payloadUnion + errInstanceWrite = errors.New("instance write error") + ) + writerMgr := NewMockinstanceWriterManager(ctrl) + writerMgr.EXPECT(). + Write(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func( + instance placement.Instance, + shardID uint32, + payload payloadUnion, + ) error { + if instance.ID() == testPlacementInstances[0].ID() { + return errInstanceWrite + } + instancesRes = append(instancesRes, instance) + shardRes = shardID + payloadRes = payload + return nil + }). + MinTimes(1) + stagedPlacement := placement.NewMockActiveStagedPlacement(ctrl) + stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1) + watcher := placement.NewMockStagedPlacementWatcher(ctrl) + watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1) + c := NewClient(testOptions()).(*client) + c.state = clientInitialized + c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) } + c.writerMgr = writerMgr + c.placementWatcher = watcher + + expectedInstances := []placement.Instance{ + testPlacementInstances[2], + } + testMetric := testPassthrough + testMetric.TimeNanos = testNowNanos + err := c.WritePassthrough(testMetric, testPassthroughMetadata) + require.Error(t, err) + require.True(t, strings.Contains(err.Error(), errInstanceWrite.Error())) + require.Equal(t, expectedInstances, instancesRes) + require.Equal(t, uint32(1), shardRes) + require.Equal(t, passthroughType, payloadRes.payloadType) + require.Equal(t, testMetric, payloadRes.passthrough.metric) + require.Equal(t, testPassthroughMetadata, payloadRes.passthrough.storagePolicy) +} + func TestClientFlushClosed(t *testing.T) { c := NewClient(testOptions()).(*client) c.state = clientClosed diff --git a/src/aggregator/client/payload.go b/src/aggregator/client/payload.go index 7e24a793e3..ffc2722450 100644 --- a/src/aggregator/client/payload.go +++ b/src/aggregator/client/payload.go @@ -24,6 +24,7 @@ import ( "github.com/m3db/m3/src/metrics/metadata" "github.com/m3db/m3/src/metrics/metric/aggregated" "github.com/m3db/m3/src/metrics/metric/unaggregated" + "github.com/m3db/m3/src/metrics/policy" ) type payloadType int @@ -35,6 +36,7 @@ const ( forwardedType timedType timedWithStagedMetadatasType + passthroughType ) type untimedPayload struct { @@ -57,10 +59,16 @@ type timedWithStagedMetadatas struct { metadatas metadata.StagedMetadatas } +type passthroughPayload struct { + metric aggregated.Metric + storagePolicy policy.StoragePolicy +} + type payloadUnion struct { payloadType payloadType untimed untimedPayload forwarded forwardedPayload timed timedPayload timedWithStagedMetadatas timedWithStagedMetadatas + passthrough passthroughPayload } diff --git a/src/aggregator/client/writer.go b/src/aggregator/client/writer.go index 266470e3d7..4d94a69319 100644 --- a/src/aggregator/client/writer.go +++ b/src/aggregator/client/writer.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/aggregated" "github.com/m3db/m3/src/metrics/metric/unaggregated" + "github.com/m3db/m3/src/metrics/policy" xerrors "github.com/m3db/m3/src/x/errors" "github.com/uber-go/tally" @@ -167,6 +168,8 @@ func (w *writer) encodeWithLock( case timedWithStagedMetadatasType: elem := payload.timedWithStagedMetadatas return w.encodeTimedWithStagedMetadatasWithLock(encoder, elem.metric, elem.metadatas) + case passthroughType: + return w.encodePassthroughWithLock(encoder, payload.passthrough.metric, payload.passthrough.storagePolicy) default: return fmt.Errorf("unknown payload type: %v", payload.payloadType) } @@ -416,6 +419,45 @@ func (w *writer) encodeTimedWithStagedMetadatasWithLock( return w.enqueueBuffer(buffer) } +func (w *writer) encodePassthroughWithLock( + encoder *lockedEncoder, + metric aggregated.Metric, + storagePolicy policy.StoragePolicy, +) error { + encoder.Lock() + + sizeBefore := encoder.Len() + msg := encoding.UnaggregatedMessageUnion{ + Type: encoding.PassthroughMetricWithMetadataType, + PassthroughMetricWithMetadata: aggregated.PassthroughMetricWithMetadata{ + Metric: metric, + StoragePolicy: storagePolicy, + }} + if err := encoder.EncodeMessage(msg); err != nil { + w.log.Error("encode passthrough metric error", + zap.Any("metric", metric), + zap.Any("storagepolicy", storagePolicy), + zap.Error(err), + ) + // Rewind buffer and clear out the encoder error. + encoder.Truncate(sizeBefore) + encoder.Unlock() + w.metrics.encodeErrors.Inc(1) + return err + } + + // If the buffer size is not big enough, do nothing. + if sizeAfter := encoder.Len(); sizeAfter < w.flushSize { + encoder.Unlock() + return nil + } + + // Otherwise we enqueue the current buffer. + buffer := w.prepareEnqueueBufferWithLock(encoder, sizeBefore) + encoder.Unlock() + return w.enqueueBuffer(buffer) +} + // prepareEnqueueBufferWithLock prepares the writer to enqueue a // buffer onto its instance queue. It gets a new buffer from pool, // copies the bytes exceeding sizeBefore to it, resets the encoder diff --git a/src/aggregator/client/writer_test.go b/src/aggregator/client/writer_test.go index d673f2a4c4..1dcd471508 100644 --- a/src/aggregator/client/writer_test.go +++ b/src/aggregator/client/writer_test.go @@ -802,14 +802,15 @@ func testWriterConcurrentWriteStress( defer ctrl.Finish() var ( - numIter = 3000 - shard = uint32(0) - counters = make([]unaggregated.Counter, numIter) - timers = make([]unaggregated.BatchTimer, numIter) - gauges = make([]unaggregated.Gauge, numIter) - forwarded = make([]aggregated.ForwardedMetric, numIter) - resultsLock sync.Mutex - results [][]byte + numIter = 3000 + shard = uint32(0) + counters = make([]unaggregated.Counter, numIter) + timers = make([]unaggregated.BatchTimer, numIter) + gauges = make([]unaggregated.Gauge, numIter) + forwarded = make([]aggregated.ForwardedMetric, numIter) + passthroughed = make([]aggregated.Metric, numIter) + resultsLock sync.Mutex + results [][]byte ) // Construct metrics input. @@ -841,6 +842,12 @@ func testWriterConcurrentWriteStress( TimeNanos: int64(i), Values: forwardedVals, } + passthroughed[i] = aggregated.Metric{ + Type: metric.GaugeType, + ID: []byte(fmt.Sprintf("passthroughed%d", i)), + TimeNanos: int64(i), + Value: float64(i), + } } queue := NewMockinstanceQueue(ctrl) @@ -863,7 +870,7 @@ func testWriterConcurrentWriteStress( w.queue = queue var wg sync.WaitGroup - wg.Add(4) + wg.Add(5) go func() { defer wg.Done() @@ -940,14 +947,30 @@ func testWriterConcurrentWriteStress( } }() + go func() { + defer wg.Done() + + for i := 0; i < numIter; i++ { + payload := payloadUnion{ + payloadType: passthroughType, + passthrough: passthroughPayload{ + metric: passthroughed[i], + storagePolicy: testPassthroughMetadata, + }, + } + require.NoError(t, w.Write(shard, payload)) + } + }() + wg.Wait() w.Flush() var ( - resCounters = make([]unaggregated.Counter, 0, numIter) - resTimers = make([]unaggregated.BatchTimer, 0, numIter) - resGauges = make([]unaggregated.Gauge, 0, numIter) - resForwarded = make([]aggregated.ForwardedMetric, 0, numIter) + resCounters = make([]unaggregated.Counter, 0, numIter) + resTimers = make([]unaggregated.BatchTimer, 0, numIter) + resGauges = make([]unaggregated.Gauge, 0, numIter) + resForwarded = make([]aggregated.ForwardedMetric, 0, numIter) + resPassthroughed = make([]aggregated.Metric, 0, numIter) ) for i := 0; i < len(results); i++ { buf := bytes.NewBuffer(results[i]) @@ -971,6 +994,10 @@ func testWriterConcurrentWriteStress( require.Equal(t, testForwardMetadata, msgResult.ForwardedMetricWithMetadata.ForwardMetadata) metric := cloneForwardedMetric(msgResult.ForwardedMetricWithMetadata.ForwardedMetric) resForwarded = append(resForwarded, metric) + case encoding.PassthroughMetricWithMetadataType: + require.Equal(t, testPassthroughMetadata, msgResult.PassthroughMetricWithMetadata.StoragePolicy) + metric := clonePassthroughedMetric(msgResult.PassthroughMetricWithMetadata.Metric) + resPassthroughed = append(resPassthroughed, metric) default: require.Fail(t, "unrecognized message type %v", msgResult.Type) } @@ -1078,3 +1105,10 @@ func cloneForwardedMetric(m aggregated.ForwardedMetric) aggregated.ForwardedMetr cloned.Values = append([]float64(nil), m.Values...) return cloned } + +func clonePassthroughedMetric(m aggregated.Metric) aggregated.Metric { + cloned := m + cloned.ID = append([]byte(nil), m.ID...) + cloned.Value = m.Value + return cloned +} diff --git a/src/aggregator/integration/client.go b/src/aggregator/integration/client.go index 7551a0cc4a..b83394a813 100644 --- a/src/aggregator/integration/client.go +++ b/src/aggregator/integration/client.go @@ -187,6 +187,20 @@ func (c *client) writeForwardedMetricWithMetadata( return c.writeUnaggregatedMessage(msg) } +func (c *client) writePassthroughMetricWithMetadata( + metric aggregated.Metric, + storagePolicy policy.StoragePolicy, +) error { + msg := encoding.UnaggregatedMessageUnion{ + Type: encoding.PassthroughMetricWithMetadataType, + PassthroughMetricWithMetadata: aggregated.PassthroughMetricWithMetadata{ + Metric: metric, + StoragePolicy: storagePolicy, + }, + } + return c.writeUnaggregatedMessage(msg) +} + func (c *client) writeUnaggregatedMessage( msg encoding.UnaggregatedMessageUnion, ) error { diff --git a/src/aggregator/integration/integration_data.go b/src/aggregator/integration/integration_data.go index c05a275294..220f8119e4 100644 --- a/src/aggregator/integration/integration_data.go +++ b/src/aggregator/integration/integration_data.go @@ -210,6 +210,8 @@ func generateTestDataset(opts datasetGenOpts) (testDataset, error) { mu = generateTestForwardedMetric(metricType, opts.ids[i], timestamp.UnixNano(), intervalIdx, i, opts.valueGenOpts.forwarded) case timedMetric: mu = generateTestTimedMetric(metricType, opts.ids[i], timestamp.UnixNano(), intervalIdx, i, opts.valueGenOpts.timed) + case passthroughMetric: + mu = generateTestPassthroughMetric(metricType, opts.ids[i], timestamp.UnixNano(), intervalIdx, i, opts.valueGenOpts.passthrough) default: return nil, fmt.Errorf("unrecognized metric category: %v", opts.category) } @@ -277,6 +279,24 @@ func generateTestTimedMetric( } } +func generateTestPassthroughMetric( + metricType metric.Type, + id string, + timeNanos int64, + intervalIdx, idIdx int, + valueGenOpts passthroughValueGenOpts, +) metricUnion { + return metricUnion{ + category: passthroughMetric, + passthrough: aggregated.Metric{ + Type: metricType, + ID: metricid.RawID(id), + TimeNanos: timeNanos, + Value: valueGenOpts.passthroughValueGenFn(intervalIdx, idIdx), + }, + } +} + func generateTestForwardedMetric( metricType metric.Type, id string, @@ -402,6 +422,9 @@ func computeExpectedAggregationBuckets( values, err = addForwardedMetricToAggregation(values, mu.forwarded) case timedMetric: values, err = addTimedMetricToAggregation(values, mu.timed) + case passthroughMetric: + // Passthrough metrics need no aggregation. + err = nil default: err = fmt.Errorf("unrecognized metric category: %v", mu.category) } @@ -691,6 +714,7 @@ const ( untimedMetric metricCategory = iota forwardedMetric timedMetric + passthroughMetric ) func (c metricCategory) TimestampNanosFn() timestampNanosFn { @@ -707,16 +731,21 @@ func (c metricCategory) TimestampNanosFn() timestampNanosFn { return func(windowStartAtNanos int64, resolution time.Duration) int64 { return windowStartAtNanos + resolution.Nanoseconds() } + case passthroughMetric: + return func(windowStartAtNanos int64, _ time.Duration) int64 { + return windowStartAtNanos + } default: panic(fmt.Errorf("unknown category type: %v", c)) } } type metricUnion struct { - category metricCategory - untimed unaggregated.MetricUnion - forwarded aggregated.ForwardedMetric - timed aggregated.Metric + category metricCategory + untimed unaggregated.MetricUnion + forwarded aggregated.ForwardedMetric + timed aggregated.Metric + passthrough aggregated.Metric } func (mu metricUnion) Type() metric.Type { @@ -727,6 +756,8 @@ func (mu metricUnion) Type() metric.Type { return mu.forwarded.Type case timedMetric: return mu.timed.Type + case passthroughMetric: + return mu.passthrough.Type default: panic(fmt.Errorf("unknown category type: %v", mu.category)) } @@ -740,6 +771,8 @@ func (mu metricUnion) ID() metricid.RawID { return mu.forwarded.ID case timedMetric: return mu.timed.ID + case passthroughMetric: + return mu.passthrough.ID default: panic(fmt.Errorf("unknown category type: %v", mu.category)) } @@ -752,16 +785,18 @@ const ( stagedMetadatasType forwardMetadataType timedMetadataType + passthroughMetadataType ) type metadataFn func(idx int) metadataUnion type metadataUnion struct { - mType metadataType - policiesList policy.PoliciesList - stagedMetadatas metadata.StagedMetadatas - forwardMetadata metadata.ForwardMetadata - timedMetadata metadata.TimedMetadata + mType metadataType + policiesList policy.PoliciesList + stagedMetadatas metadata.StagedMetadatas + forwardMetadata metadata.ForwardMetadata + timedMetadata metadata.TimedMetadata + passthroughMetadata policy.StoragePolicy } func (mu metadataUnion) expectedAggregationKeys( @@ -777,6 +812,8 @@ func (mu metadataUnion) expectedAggregationKeys( return computeExpectedAggregationKeysFromForwardMetadata(mu.forwardMetadata), nil case timedMetadataType: return computeExpectedAggregationKeysFromTimedMetadata(mu.timedMetadata), nil + case passthroughMetadataType: + return computeExpectedAggregationKeysFromPassthroughMetadata(mu.passthroughMetadata), nil default: return nil, fmt.Errorf("unexpected metadata type: %v", mu.mType) } @@ -873,6 +910,17 @@ func computeExpectedAggregationKeysFromTimedMetadata( } } +func computeExpectedAggregationKeysFromPassthroughMetadata( + metadata policy.StoragePolicy, +) aggregationKeys { + return aggregationKeys{ + { + aggregationID: maggregation.DefaultID, + storagePolicy: metadata, + }, + } +} + func computeExpectedAggregationKeysFromForwardMetadata( metadata metadata.ForwardMetadata, ) aggregationKeys { @@ -947,6 +995,21 @@ var defaultTimedValueGenOpts = timedValueGenOpts{ timedValueGenFn: defaultTimedValueGenFn, } +type passthroughValueGenFn func(intervalIdx, idIdx int) float64 + +func defaultPassthroughValueGenFn(intervalIdx, _ int) float64 { + testVal := 123.456 + return testVal + float64(intervalIdx) +} + +type passthroughValueGenOpts struct { + passthroughValueGenFn passthroughValueGenFn +} + +var defaultPassthroughValueGenOpts = passthroughValueGenOpts{ + passthroughValueGenFn: defaultPassthroughValueGenFn, +} + type forwardedValueGenFn func(intervalIdx, idIdx int) []float64 func defaultForwardedValueGenFn(intervalIdx, _ int) []float64 { @@ -967,15 +1030,17 @@ var defaultForwardedValueGenOpts = forwardedValueGenOpts{ } type valueGenOpts struct { - untimed untimedValueGenOpts - timed timedValueGenOpts - forwarded forwardedValueGenOpts + untimed untimedValueGenOpts + timed timedValueGenOpts + forwarded forwardedValueGenOpts + passthrough passthroughValueGenOpts } var defaultValueGenOpts = valueGenOpts{ - untimed: defaultUntimedValueGenOpts, - timed: defaultTimedValueGenOpts, - forwarded: defaultForwardedValueGenOpts, + untimed: defaultUntimedValueGenOpts, + timed: defaultTimedValueGenOpts, + forwarded: defaultForwardedValueGenOpts, + passthrough: defaultPassthroughValueGenOpts, } type datasetGenOpts struct { diff --git a/src/aggregator/integration/one_client_passthru_test.go b/src/aggregator/integration/one_client_passthru_test.go new file mode 100644 index 0000000000..c63134d66e --- /dev/null +++ b/src/aggregator/integration/one_client_passthru_test.go @@ -0,0 +1,183 @@ +// +build integration + +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package integration + +import ( + "reflect" + "sort" + "sync" + "testing" + "time" + + "github.com/m3db/m3/src/cluster/placement" + "github.com/m3db/m3/src/metrics/metric" + "github.com/m3db/m3/src/metrics/metric/aggregated" + "github.com/m3db/m3/src/metrics/policy" + "github.com/m3db/m3/src/x/clock" + xtime "github.com/m3db/m3/src/x/time" + + "github.com/stretchr/testify/require" +) + +func TestOneClientPassthroughMetrics(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + serverOpts := newTestServerOptions() + + // Clock setup. + var lock sync.RWMutex + now := time.Now().Truncate(time.Hour) + getNowFn := func() time.Time { + lock.RLock() + t := now + lock.RUnlock() + return t + } + setNowFn := func(t time.Time) { + lock.Lock() + now = t + lock.Unlock() + } + clockOpts := clock.NewOptions().SetNowFn(getNowFn) + serverOpts = serverOpts.SetClockOptions(clockOpts) + + // Placement setup. + numShards := 1024 + cfg := placementInstanceConfig{ + instanceID: serverOpts.InstanceID(), + shardSetID: serverOpts.ShardSetID(), + shardStartInclusive: 0, + shardEndExclusive: uint32(numShards), + } + instance := cfg.newPlacementInstance() + placement := newPlacement(numShards, []placement.Instance{instance}) + placementKey := serverOpts.PlacementKVKey() + placementStore := serverOpts.KVStore() + require.NoError(t, setPlacement(placementKey, placementStore, placement)) + + // Create server. + testServer := newTestServerSetup(t, serverOpts) + defer testServer.close() + + // Start the server. + log := testServer.aggregatorOpts.InstrumentOptions().Logger() + log.Info("test one client sending of passthrough metrics") + require.NoError(t, testServer.startServer()) + log.Info("server is now up") + require.NoError(t, testServer.waitUntilLeader()) + log.Info("server is now the leader") + + var ( + idPrefix = "full.passthru.id" + numIDs = 10 + start = getNowFn() + stop = start.Add(10 * time.Second) + interval = 2 * time.Second + ) + client := testServer.newClient() + require.NoError(t, client.connect()) + defer client.close() + + ids := generateTestIDs(idPrefix, numIDs) + metadataFn := func(idx int) metadataUnion { + return metadataUnion{ + mType: passthroughMetadataType, + passthroughMetadata: policy.NewStoragePolicy(2*time.Second, xtime.Second, time.Hour), + } + } + dataset := mustGenerateTestDataset(t, datasetGenOpts{ + start: start, + stop: stop, + interval: interval, + ids: ids, + category: passthroughMetric, + typeFn: constantMetricTypeFnFactory(metric.GaugeType), + valueGenOpts: defaultValueGenOpts, + metadataFn: metadataFn, + }) + + for _, data := range dataset { + setNowFn(data.timestamp) + for _, mm := range data.metricWithMetadatas { + require.NoError(t, client.writePassthroughMetricWithMetadata(mm.metric.passthrough, mm.metadata.passthroughMetadata)) + } + require.NoError(t, client.flush()) + + // Give server some time to process the incoming packets. + time.Sleep(100 * time.Millisecond) + } + + // Move time forward and wait for flushing to happen. + finalTime := stop.Add(time.Minute + 2*time.Second) + setNowFn(finalTime) + time.Sleep(2 * time.Second) + + // Stop the server. + require.NoError(t, testServer.stopServer()) + log.Info("server is now down") + + // Validate results. + expected := computeExpectedPassthroughResults(t, dataset) + actual := testServer.sortedResults() + require.Equal(t, dedupResults(expected), dedupResults(actual)) +} + +func computeExpectedPassthroughResults( + t *testing.T, + dataset testDataset, +) []aggregated.MetricWithStoragePolicy { + var expected []aggregated.MetricWithStoragePolicy + for _, testData := range dataset { + for _, metricWithMetadata := range testData.metricWithMetadatas { + require.Equal(t, passthroughMetric, metricWithMetadata.metric.category) + + expectedPassthrough := aggregated.MetricWithStoragePolicy{ + Metric: metricWithMetadata.metric.passthrough, + StoragePolicy: metricWithMetadata.metadata.passthroughMetadata, + } + + // The capturingWriter writes ChunkedMetricWithStoragePolicy which has no metric type defined. + expectedPassthrough.Metric.Type = metric.UnknownType + expected = append(expected, expectedPassthrough) + } + } + // Sort the aggregated metrics. + sort.Sort(byTimeIDPolicyAscending(expected)) + return expected +} + +func dedupResults( + results []aggregated.MetricWithStoragePolicy, +) []aggregated.MetricWithStoragePolicy { + var deduped []aggregated.MetricWithStoragePolicy + lenDeduped := 0 + for _, m := range results { + if lenDeduped == 0 || !reflect.DeepEqual(deduped[lenDeduped-1], m) { + deduped = append(deduped, m) + lenDeduped++ + } + } + return deduped +} diff --git a/src/aggregator/integration/setup.go b/src/aggregator/integration/setup.go index 6ec6edebb7..d28b15aaec 100644 --- a/src/aggregator/integration/setup.go +++ b/src/aggregator/integration/setup.go @@ -170,7 +170,11 @@ func newTestServerSetup(t *testing.T, opts testServerOptions) *testServerSetup { resultLock sync.Mutex ) handler := &capturingHandler{results: &results, resultLock: &resultLock} - aggregatorOpts = aggregatorOpts.SetFlushHandler(handler) + pw, err := handler.NewWriter(tally.NoopScope) + if err != nil { + panic(err.Error()) + } + aggregatorOpts = aggregatorOpts.SetFlushHandler(handler).SetPassthroughWriter(pw) // Set up entry pool. runtimeOpts := runtime.NewOptions() diff --git a/src/aggregator/server/rawtcp/server.go b/src/aggregator/server/rawtcp/server.go index cb6f4d9ba1..24876bc06a 100644 --- a/src/aggregator/server/rawtcp/server.go +++ b/src/aggregator/server/rawtcp/server.go @@ -38,6 +38,7 @@ import ( "github.com/m3db/m3/src/metrics/metadata" "github.com/m3db/m3/src/metrics/metric/aggregated" "github.com/m3db/m3/src/metrics/metric/unaggregated" + "github.com/m3db/m3/src/metrics/policy" xserver "github.com/m3db/m3/src/x/server" "github.com/uber-go/tally" @@ -61,6 +62,7 @@ type handlerMetrics struct { addUntimedErrors tally.Counter addTimedErrors tally.Counter addForwardedErrors tally.Counter + addPassthroughErrors tally.Counter unknownErrorTypeErrors tally.Counter decodeErrors tally.Counter errLogRateLimited tally.Counter @@ -72,6 +74,7 @@ func newHandlerMetrics(scope tally.Scope) handlerMetrics { addUntimedErrors: scope.Counter("add-untimed-errors"), addTimedErrors: scope.Counter("add-timed-errors"), addForwardedErrors: scope.Counter("add-forwarded-errors"), + addPassthroughErrors: scope.Counter("add-passthrough-errors"), unknownErrorTypeErrors: scope.Counter("unknown-error-type-errors"), decodeErrors: scope.Counter("decode-errors"), errLogRateLimited: scope.Counter("error-log-rate-limited"), @@ -124,13 +127,15 @@ func (s *handler) Handle(conn net.Conn) { // Iterate over the incoming metrics stream and queue up metrics. var ( - untimedMetric unaggregated.MetricUnion - stagedMetadatas metadata.StagedMetadatas - forwardedMetric aggregated.ForwardedMetric - forwardMetadata metadata.ForwardMetadata - timedMetric aggregated.Metric - timedMetadata metadata.TimedMetadata - err error + untimedMetric unaggregated.MetricUnion + stagedMetadatas metadata.StagedMetadatas + forwardedMetric aggregated.ForwardedMetric + forwardMetadata metadata.ForwardMetadata + timedMetric aggregated.Metric + timedMetadata metadata.TimedMetadata + passthroughMetric aggregated.Metric + passthroughMetadata policy.StoragePolicy + err error ) for it.Next() { current := it.Current() @@ -159,6 +164,10 @@ func (s *handler) Handle(conn net.Conn) { timedMetric = current.TimedMetricWithMetadatas.Metric stagedMetadatas = current.TimedMetricWithMetadatas.StagedMetadatas err = toAddTimedError(s.aggregator.AddTimedWithStagedMetadatas(timedMetric, stagedMetadatas)) + case encoding.PassthroughMetricWithMetadataType: + passthroughMetric = current.PassthroughMetricWithMetadata.Metric + passthroughMetadata = current.PassthroughMetricWithMetadata.StoragePolicy + err = toAddPassthroughError(s.aggregator.AddPassthrough(passthroughMetric, passthroughMetadata)) default: err = newUnknownMessageTypeError(current.Type) } @@ -207,6 +216,15 @@ func (s *handler) Handle(conn net.Conn) { zap.Float64("value", timedMetric.Value), zap.Error(err), ) + case addPassthroughError: + s.metrics.addPassthroughErrors.Inc(1) + s.log.Error("error adding passthrough metric", + zap.String("remoteAddress", remoteAddress), + zap.Stringer("id", timedMetric.ID), + zap.Time("timestamp", time.Unix(0, timedMetric.TimeNanos)), + zap.Float64("value", timedMetric.Value), + zap.Error(err), + ) default: s.metrics.unknownErrorTypeErrors.Inc(1) s.log.Error("unknown error type", @@ -285,3 +303,16 @@ func toAddForwardedError(err error) error { } func (e addForwardedError) Error() string { return e.err.Error() } + +type addPassthroughError struct { + err error +} + +func toAddPassthroughError(err error) error { + if err == nil { + return nil + } + return addPassthroughError{err: err} +} + +func (e addPassthroughError) Error() string { return e.err.Error() } diff --git a/src/aggregator/server/rawtcp/server_test.go b/src/aggregator/server/rawtcp/server_test.go index bb257a1f74..4e16f2c365 100644 --- a/src/aggregator/server/rawtcp/server_test.go +++ b/src/aggregator/server/rawtcp/server_test.go @@ -70,7 +70,7 @@ var ( } testTimed = aggregated.Metric{ Type: metric.CounterType, - ID: []byte("testForwarded"), + ID: []byte("testTimed"), TimeNanos: 12345, Value: -13, } @@ -80,6 +80,12 @@ var ( TimeNanos: 12345, Values: []float64{908, -13}, } + testPassthrough = aggregated.Metric{ + Type: metric.CounterType, + ID: []byte("testPassthrough"), + TimeNanos: 12345, + Value: -13, + } testDefaultPoliciesList = policy.DefaultPoliciesList testCustomPoliciesList = policy.PoliciesList{ policy.NewStagedPolicies( @@ -135,6 +141,7 @@ var ( SourceID: 1234, NumForwardedTimes: 3, } + testPassthroughStoragePolicy = policy.NewStoragePolicy(time.Minute, xtime.Minute, 12*time.Hour) testCounterWithPoliciesList = unaggregated.CounterWithPoliciesList{ Counter: testCounter.Counter(), PoliciesList: testDefaultPoliciesList, @@ -167,6 +174,10 @@ var ( ForwardedMetric: testForwarded, ForwardMetadata: testForwardMetadata, } + testPassthroughMetricWithMetadata = aggregated.PassthroughMetricWithMetadata{ + Metric: testPassthrough, + StoragePolicy: testPassthroughStoragePolicy, + } testCmpOpts = []cmp.Option{ cmpopts.EquateEmpty(), cmp.AllowUnexported(policy.StoragePolicy{}), @@ -224,6 +235,7 @@ func testRawTCPServerHandleUnaggregated( protocol := protocolSelector(i) if protocol == protobufEncoding { expectedResult.TimedMetricWithMetadata = append(expectedResult.TimedMetricWithMetadata, testTimedMetricWithMetadata) + expectedResult.PassthroughMetricWithMetadata = append(expectedResult.PassthroughMetricWithMetadata, testPassthroughMetricWithMetadata) expectedResult.ForwardedMetricsWithMetadata = append(expectedResult.ForwardedMetricsWithMetadata, testForwardedMetricWithMetadata) expectedTotalMetrics += 5 } else { @@ -262,6 +274,10 @@ func testRawTCPServerHandleUnaggregated( Type: encoding.TimedMetricWithMetadataType, TimedMetricWithMetadata: testTimedMetricWithMetadata, })) + require.NoError(t, encoder.EncodeMessage(encoding.UnaggregatedMessageUnion{ + Type: encoding.PassthroughMetricWithMetadataType, + PassthroughMetricWithMetadata: testPassthroughMetricWithMetadata, + })) require.NoError(t, encoder.EncodeMessage(encoding.UnaggregatedMessageUnion{ Type: encoding.ForwardedMetricWithMetadataType, ForwardedMetricWithMetadata: testForwardedMetricWithMetadata, diff --git a/src/cmd/services/m3aggregator/config/aggregator.go b/src/cmd/services/m3aggregator/config/aggregator.go index 8e18e28159..079c1816ca 100644 --- a/src/cmd/services/m3aggregator/config/aggregator.go +++ b/src/cmd/services/m3aggregator/config/aggregator.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3/src/aggregator/aggregation/quantile/cm" "github.com/m3db/m3/src/aggregator/aggregator" "github.com/m3db/m3/src/aggregator/aggregator/handler" + "github.com/m3db/m3/src/aggregator/aggregator/handler/writer" aggclient "github.com/m3db/m3/src/aggregator/client" aggruntime "github.com/m3db/m3/src/aggregator/runtime" "github.com/m3db/m3/src/aggregator/sharding" @@ -57,6 +58,10 @@ var ( errEmptyJitterBucketList = errors.New("empty jitter bucket list") ) +var ( + defaultNumPassthroughWriters = 8 +) + // AggregatorConfiguration contains aggregator configuration. type AggregatorConfiguration struct { // HostID is the local host ID configuration. @@ -123,6 +128,9 @@ type AggregatorConfiguration struct { // Flushing handler configuration. Flush handler.FlushHandlerConfiguration `yaml:"flush"` + // Passthrough controls the passthrough knobs. + Passthrough *passthroughConfiguration `yaml:"passthrough"` + // Forwarding configuration. Forwarding forwardingConfiguration `yaml:"forwarding"` @@ -368,6 +376,18 @@ func (c *AggregatorConfiguration) NewAggregatorOptions( } opts = opts.SetFlushHandler(flushHandler) + // Set passthrough writer. + aggShardFn, err := hashType.AggregatedShardFn() + if err != nil { + return nil, err + } + iOpts = instrumentOpts.SetMetricsScope(scope.SubScope("passthrough-writer")) + passthroughWriter, err := c.newPassthroughWriter(flushHandler, iOpts, aggShardFn) + if err != nil { + return nil, err + } + opts = opts.SetPassthroughWriter(passthroughWriter) + // Set max allowed forwarding delay function. jitterEnabled := flushManagerOpts.JitterEnabled() maxJitterFn := flushManagerOpts.MaxJitterFn() @@ -859,3 +879,40 @@ func setMetricPrefix( } return fn([]byte(*str)) } + +// PassthroughConfiguration contains the knobs for pass-through server. +type passthroughConfiguration struct { + // Enabled controls whether the passthrough server/writer is enabled. + Enabled bool `yaml:"enabled"` + + // NumWriters controls the number of passthrough writers used. + NumWriters int `yaml:"numWriters"` +} + +func (c *AggregatorConfiguration) newPassthroughWriter( + flushHandler handler.Handler, + iOpts instrument.Options, + shardFn sharding.AggregatedShardFn, +) (writer.Writer, error) { + // fallback gracefully + if c.Passthrough == nil || !c.Passthrough.Enabled { + iOpts.Logger().Info("passthrough writer disabled, blackholing all passthrough writes") + return writer.NewBlackholeWriter(), nil + } + + count := defaultNumPassthroughWriters + if c.Passthrough.NumWriters != 0 { + count = c.Passthrough.NumWriters + } + + writers := make([]writer.Writer, 0, count) + for i := 0; i < count; i++ { + writer, err := flushHandler.NewWriter(iOpts.MetricsScope()) + if err != nil { + return nil, err + } + writers = append(writers, writer) + } + + return writer.NewShardedWriter(writers, shardFn, iOpts) +} diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index 2bba042ac5..325a1a2de7 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -1091,6 +1091,14 @@ func (c *aggregatorLocalAdminClient) WriteForwarded( return c.agg.AddForwarded(metric, metadata) } +// WritePassthrough writes passthrough metrics. +func (c *aggregatorLocalAdminClient) WritePassthrough( + metric aggregated.Metric, + storagePolicy policy.StoragePolicy, +) error { + return c.agg.AddPassthrough(metric, storagePolicy) +} + // Flush flushes any remaining data buffered by the client. func (c *aggregatorLocalAdminClient) Flush() error { return nil diff --git a/src/metrics/encoding/protobuf/reset.go b/src/metrics/encoding/protobuf/reset.go index bfd999ae14..3641422e2e 100644 --- a/src/metrics/encoding/protobuf/reset.go +++ b/src/metrics/encoding/protobuf/reset.go @@ -49,6 +49,7 @@ func resetMetricWithMetadatasProto(pb *metricpb.MetricWithMetadatas) { resetForwardedMetricWithMetadataProto(pb.ForwardedMetricWithMetadata) resetTimedMetricWithMetadataProto(pb.TimedMetricWithMetadata) resetTimedMetricWithMetadatasProto(pb.TimedMetricWithMetadatas) + resetTimedMetricWithStoragePolicyProto(pb.TimedMetricWithStoragePolicy) } func resetCounterWithMetadatasProto(pb *metricpb.CounterWithMetadatas) { diff --git a/src/metrics/encoding/protobuf/unaggregated_encoder.go b/src/metrics/encoding/protobuf/unaggregated_encoder.go index 8ee584b751..cee6855a4b 100644 --- a/src/metrics/encoding/protobuf/unaggregated_encoder.go +++ b/src/metrics/encoding/protobuf/unaggregated_encoder.go @@ -69,6 +69,7 @@ type unaggregatedEncoder struct { fm metricpb.ForwardedMetricWithMetadata tm metricpb.TimedMetricWithMetadata tms metricpb.TimedMetricWithMetadatas + pm metricpb.TimedMetricWithStoragePolicy buf []byte used int @@ -130,6 +131,8 @@ func (enc *unaggregatedEncoder) EncodeMessage(msg encoding.UnaggregatedMessageUn return enc.encodeTimedMetricWithMetadata(msg.TimedMetricWithMetadata) case encoding.TimedMetricWithMetadatasType: return enc.encodeTimedMetricWithMetadatas(msg.TimedMetricWithMetadatas) + case encoding.PassthroughMetricWithMetadataType: + return enc.encodePassthroughMetricWithMetadata(msg.PassthroughMetricWithMetadata) default: return fmt.Errorf("unknown message type: %v", msg.Type) } @@ -201,6 +204,17 @@ func (enc *unaggregatedEncoder) encodeTimedMetricWithMetadatas(tms aggregated.Ti return enc.encodeMetricWithMetadatas(mm) } +func (enc *unaggregatedEncoder) encodePassthroughMetricWithMetadata(pm aggregated.PassthroughMetricWithMetadata) error { + if err := pm.ToProto(&enc.pm); err != nil { + return fmt.Errorf("passthrough metric with metadata proto conversion failed: %v", err) + } + mm := metricpb.MetricWithMetadatas{ + Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_STORAGE_POLICY, + TimedMetricWithStoragePolicy: &enc.pm, + } + return enc.encodeMetricWithMetadatas(mm) +} + func (enc *unaggregatedEncoder) encodeMetricWithMetadatas(pb metricpb.MetricWithMetadatas) error { msgSize := pb.Size() if msgSize > enc.maxMessageSize { diff --git a/src/metrics/encoding/protobuf/unaggregated_encoder_test.go b/src/metrics/encoding/protobuf/unaggregated_encoder_test.go index eaf67fe829..57364bd604 100644 --- a/src/metrics/encoding/protobuf/unaggregated_encoder_test.go +++ b/src/metrics/encoding/protobuf/unaggregated_encoder_test.go @@ -96,6 +96,18 @@ var ( TimeNanos: 82590, Value: 0, } + testPassthroughMetric1 = aggregated.Metric{ + Type: metric.CounterType, + ID: []byte("testPassthroughMetric1"), + TimeNanos: 11111, + Value: 1, + } + testPassthroughMetric2 = aggregated.Metric{ + Type: metric.GaugeType, + ID: []byte("testPassthroughMetric2"), + TimeNanos: 22222, + Value: 2, + } testStagedMetadatas1 = metadata.StagedMetadatas{ { CutoverNanos: 1234, @@ -239,6 +251,8 @@ var ( AggregationID: aggregation.MustCompressTypes(aggregation.Sum), StoragePolicy: policy.NewStoragePolicy(10*time.Second, xtime.Second, 6*time.Hour), } + testPassthroughMetadata1 = policy.NewStoragePolicy(time.Minute, xtime.Minute, 12*time.Hour) + testPassthroughMetadata2 = policy.NewStoragePolicy(10*time.Second, xtime.Second, 6*time.Hour) testCounter1Proto = metricpb.Counter{ Id: []byte("testCounter1"), Value: 123, @@ -287,6 +301,18 @@ var ( TimeNanos: 82590, Value: 0, } + testPassthroughMetric1Proto = metricpb.TimedMetric{ + Type: metricpb.MetricType_COUNTER, + Id: []byte("testPassthroughMetric1"), + TimeNanos: 11111, + Value: 1, + } + testPassthroughMetric2Proto = metricpb.TimedMetric{ + Type: metricpb.MetricType_GAUGE, + Id: []byte("testPassthroughMetric2"), + TimeNanos: 22222, + Value: 2, + } testStagedMetadatas1Proto = metricpb.StagedMetadatas{ Metadatas: []metricpb.StagedMetadata{ { @@ -516,6 +542,24 @@ var ( }, }, } + testPassthroughMetadata1Proto = policypb.StoragePolicy{ + Resolution: &policypb.Resolution{ + WindowSize: time.Minute.Nanoseconds(), + Precision: time.Minute.Nanoseconds(), + }, + Retention: &policypb.Retention{ + Period: (12 * time.Hour).Nanoseconds(), + }, + } + testPassthroughMetadata2Proto = policypb.StoragePolicy{ + Resolution: &policypb.Resolution{ + WindowSize: 10 * time.Second.Nanoseconds(), + Precision: time.Second.Nanoseconds(), + }, + Retention: &policypb.Retention{ + Period: (6 * time.Hour).Nanoseconds(), + }, + } testCmpOpts = []cmp.Option{ cmpopts.EquateEmpty(), cmp.AllowUnexported(policy.StoragePolicy{}), @@ -822,6 +866,66 @@ func TestUnaggregatedEncoderEncodeTimedMetricWithMetadata(t *testing.T) { } } +func TestUnaggregatedEncoderEncodePassthroughMetricWithMetadata(t *testing.T) { + inputs := []aggregated.PassthroughMetricWithMetadata{ + { + Metric: testPassthroughMetric1, + StoragePolicy: testPassthroughMetadata1, + }, + { + Metric: testPassthroughMetric1, + StoragePolicy: testPassthroughMetadata2, + }, + { + Metric: testPassthroughMetric2, + StoragePolicy: testPassthroughMetadata1, + }, + { + Metric: testPassthroughMetric2, + StoragePolicy: testPassthroughMetadata2, + }, + } + expected := []metricpb.TimedMetricWithStoragePolicy{ + { + TimedMetric: testPassthroughMetric1Proto, + StoragePolicy: testPassthroughMetadata1Proto, + }, + { + TimedMetric: testPassthroughMetric1Proto, + StoragePolicy: testPassthroughMetadata2Proto, + }, + { + TimedMetric: testPassthroughMetric2Proto, + StoragePolicy: testPassthroughMetadata1Proto, + }, + { + TimedMetric: testPassthroughMetric2Proto, + StoragePolicy: testPassthroughMetadata2Proto, + }, + } + + var ( + sizeRes int + pbRes metricpb.MetricWithMetadatas + ) + enc := NewUnaggregatedEncoder(NewUnaggregatedOptions()) + enc.(*unaggregatedEncoder).encodeMessageSizeFn = func(size int) { sizeRes = size } + enc.(*unaggregatedEncoder).encodeMessageFn = func(pb metricpb.MetricWithMetadatas) error { pbRes = pb; return nil } + for i, input := range inputs { + require.NoError(t, enc.EncodeMessage(encoding.UnaggregatedMessageUnion{ + Type: encoding.PassthroughMetricWithMetadataType, + PassthroughMetricWithMetadata: input, + })) + expectedProto := metricpb.MetricWithMetadatas{ + Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_STORAGE_POLICY, + TimedMetricWithStoragePolicy: &expected[i], + } + expectedMsgSize := expectedProto.Size() + require.Equal(t, expectedMsgSize, sizeRes) + require.Equal(t, expectedProto, pbRes) + } +} + func TestUnaggregatedEncoderStress(t *testing.T) { inputs := []interface{}{ unaggregated.CounterWithMetadatas{ @@ -844,6 +948,10 @@ func TestUnaggregatedEncoderStress(t *testing.T) { Metric: testTimedMetric1, TimedMetadata: testTimedMetadata1, }, + aggregated.PassthroughMetricWithMetadata{ + Metric: testPassthroughMetric1, + StoragePolicy: testPassthroughMetadata1, + }, unaggregated.CounterWithMetadatas{ Counter: testCounter2, StagedMetadatas: testStagedMetadatas1, @@ -896,6 +1004,10 @@ func TestUnaggregatedEncoderStress(t *testing.T) { Metric: testTimedMetric2, TimedMetadata: testTimedMetadata2, }, + aggregated.PassthroughMetricWithMetadata{ + Metric: testPassthroughMetric2, + StoragePolicy: testPassthroughMetadata2, + }, } expected := []interface{}{ @@ -919,6 +1031,10 @@ func TestUnaggregatedEncoderStress(t *testing.T) { Metric: testTimedMetric1Proto, Metadata: testTimedMetadata1Proto, }, + metricpb.TimedMetricWithStoragePolicy{ + TimedMetric: testPassthroughMetric1Proto, + StoragePolicy: testPassthroughMetadata1Proto, + }, metricpb.CounterWithMetadatas{ Counter: testCounter2Proto, Metadatas: testStagedMetadatas1Proto, @@ -971,6 +1087,10 @@ func TestUnaggregatedEncoderStress(t *testing.T) { Metric: testTimedMetric2Proto, Metadata: testTimedMetadata2Proto, }, + metricpb.TimedMetricWithStoragePolicy{ + TimedMetric: testPassthroughMetric2Proto, + StoragePolicy: testPassthroughMetadata2Proto, + }, } var ( @@ -1039,6 +1159,16 @@ func TestUnaggregatedEncoderStress(t *testing.T) { Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA, TimedMetricWithMetadata: &res, } + case aggregated.PassthroughMetricWithMetadata: + msg = encoding.UnaggregatedMessageUnion{ + Type: encoding.PassthroughMetricWithMetadataType, + PassthroughMetricWithMetadata: input, + } + res := expected[i].(metricpb.TimedMetricWithStoragePolicy) + expectedProto = metricpb.MetricWithMetadatas{ + Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_STORAGE_POLICY, + TimedMetricWithStoragePolicy: &res, + } default: require.Fail(t, "unrecognized type %T", input) } diff --git a/src/metrics/encoding/protobuf/unaggregated_iterator.go b/src/metrics/encoding/protobuf/unaggregated_iterator.go index 3cc73b489a..d682512df1 100644 --- a/src/metrics/encoding/protobuf/unaggregated_iterator.go +++ b/src/metrics/encoding/protobuf/unaggregated_iterator.go @@ -152,6 +152,9 @@ func (it *unaggregatedIterator) decodeMessage(size int) error { case metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATAS: it.msg.Type = encoding.TimedMetricWithMetadatasType it.err = it.msg.TimedMetricWithMetadatas.FromProto(it.pb.TimedMetricWithMetadatas) + case metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_STORAGE_POLICY: + it.msg.Type = encoding.PassthroughMetricWithMetadataType + it.err = it.msg.PassthroughMetricWithMetadata.FromProto(it.pb.TimedMetricWithStoragePolicy) default: it.err = fmt.Errorf("unrecognized message type: %v", it.pb.Type) } diff --git a/src/metrics/encoding/protobuf/unaggregated_iterator_test.go b/src/metrics/encoding/protobuf/unaggregated_iterator_test.go index ed9e517089..523e923b19 100644 --- a/src/metrics/encoding/protobuf/unaggregated_iterator_test.go +++ b/src/metrics/encoding/protobuf/unaggregated_iterator_test.go @@ -217,6 +217,51 @@ func TestUnaggregatedIteratorDecodeForwardedMetricWithMetadata(t *testing.T) { require.Equal(t, io.EOF, it.Err()) require.Equal(t, len(inputs), i) } +func TestUnaggregatedIteratorDecodePassthroughMetricWithMetadata(t *testing.T) { + inputs := []aggregated.PassthroughMetricWithMetadata{ + { + Metric: testPassthroughMetric1, + StoragePolicy: testPassthroughMetadata1, + }, + { + Metric: testPassthroughMetric2, + StoragePolicy: testPassthroughMetadata1, + }, + { + Metric: testPassthroughMetric1, + StoragePolicy: testPassthroughMetadata2, + }, + { + Metric: testPassthroughMetric2, + StoragePolicy: testPassthroughMetadata2, + }, + } + + enc := NewUnaggregatedEncoder(NewUnaggregatedOptions()) + for _, input := range inputs { + require.NoError(t, enc.EncodeMessage(encoding.UnaggregatedMessageUnion{ + Type: encoding.PassthroughMetricWithMetadataType, + PassthroughMetricWithMetadata: input, + })) + } + dataBuf := enc.Relinquish() + defer dataBuf.Close() + + var ( + i int + stream = bytes.NewReader(dataBuf.Bytes()) + ) + it := NewUnaggregatedIterator(stream, NewUnaggregatedOptions()) + defer it.Close() + for it.Next() { + res := it.Current() + require.Equal(t, encoding.PassthroughMetricWithMetadataType, res.Type) + require.Equal(t, inputs[i], res.PassthroughMetricWithMetadata) + i++ + } + require.Equal(t, io.EOF, it.Err()) + require.Equal(t, len(inputs), i) +} func TestUnaggregatedIteratorDecodeTimedMetricWithMetadata(t *testing.T) { inputs := []aggregated.TimedMetricWithMetadata{ @@ -286,6 +331,10 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) { Metric: testTimedMetric1, TimedMetadata: testTimedMetadata1, }, + aggregated.PassthroughMetricWithMetadata{ + Metric: testPassthroughMetric1, + StoragePolicy: testPassthroughMetadata1, + }, unaggregated.CounterWithMetadatas{ Counter: testCounter2, StagedMetadatas: testStagedMetadatas1, @@ -338,6 +387,10 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) { Metric: testTimedMetric2, TimedMetadata: testTimedMetadata2, }, + aggregated.PassthroughMetricWithMetadata{ + Metric: testPassthroughMetric2, + StoragePolicy: testPassthroughMetadata2, + }, } numIter := 1000 @@ -371,6 +424,11 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) { Type: encoding.TimedMetricWithMetadataType, TimedMetricWithMetadata: input, } + case aggregated.PassthroughMetricWithMetadata: + msg = encoding.UnaggregatedMessageUnion{ + Type: encoding.PassthroughMetricWithMetadataType, + PassthroughMetricWithMetadata: input, + } default: require.Fail(t, "unrecognized type %T", input) } @@ -405,6 +463,9 @@ func TestUnaggregatedIteratorDecodeStress(t *testing.T) { case aggregated.TimedMetricWithMetadata: require.Equal(t, encoding.TimedMetricWithMetadataType, res.Type) require.True(t, cmp.Equal(expectedRes, res.TimedMetricWithMetadata, testCmpOpts...)) + case aggregated.PassthroughMetricWithMetadata: + require.Equal(t, encoding.PassthroughMetricWithMetadataType, res.Type) + require.True(t, cmp.Equal(expectedRes, res.PassthroughMetricWithMetadata, testCmpOpts...)) default: require.Fail(t, "unknown input type: %T", inputs[j]) } diff --git a/src/metrics/encoding/types.go b/src/metrics/encoding/types.go index d66c4da2c9..2f361502e3 100644 --- a/src/metrics/encoding/types.go +++ b/src/metrics/encoding/types.go @@ -39,6 +39,7 @@ const ( ForwardedMetricWithMetadataType TimedMetricWithMetadataType TimedMetricWithMetadatasType + PassthroughMetricWithMetadataType ) // UnaggregatedMessageUnion is a union of different types of unaggregated messages. @@ -46,13 +47,14 @@ const ( // by the `Type` field of the union, which in turn determines which one // of the field in the union contains the corresponding message data. type UnaggregatedMessageUnion struct { - Type UnaggregatedMessageType - CounterWithMetadatas unaggregated.CounterWithMetadatas - BatchTimerWithMetadatas unaggregated.BatchTimerWithMetadatas - GaugeWithMetadatas unaggregated.GaugeWithMetadatas - ForwardedMetricWithMetadata aggregated.ForwardedMetricWithMetadata - TimedMetricWithMetadata aggregated.TimedMetricWithMetadata - TimedMetricWithMetadatas aggregated.TimedMetricWithMetadatas + Type UnaggregatedMessageType + CounterWithMetadatas unaggregated.CounterWithMetadatas + BatchTimerWithMetadatas unaggregated.BatchTimerWithMetadatas + GaugeWithMetadatas unaggregated.GaugeWithMetadatas + ForwardedMetricWithMetadata aggregated.ForwardedMetricWithMetadata + TimedMetricWithMetadata aggregated.TimedMetricWithMetadata + TimedMetricWithMetadatas aggregated.TimedMetricWithMetadatas + PassthroughMetricWithMetadata aggregated.PassthroughMetricWithMetadata } // ByteReadScanner is capable of reading and scanning bytes. diff --git a/src/metrics/generated/proto/metricpb/composite.pb.go b/src/metrics/generated/proto/metricpb/composite.pb.go index b42e69cf29..48154fd048 100644 --- a/src/metrics/generated/proto/metricpb/composite.pb.go +++ b/src/metrics/generated/proto/metricpb/composite.pb.go @@ -75,13 +75,14 @@ const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package type MetricWithMetadatas_Type int32 const ( - MetricWithMetadatas_UNKNOWN MetricWithMetadatas_Type = 0 - MetricWithMetadatas_COUNTER_WITH_METADATAS MetricWithMetadatas_Type = 1 - MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS MetricWithMetadatas_Type = 2 - MetricWithMetadatas_GAUGE_WITH_METADATAS MetricWithMetadatas_Type = 3 - MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA MetricWithMetadatas_Type = 4 - MetricWithMetadatas_TIMED_METRIC_WITH_METADATA MetricWithMetadatas_Type = 5 - MetricWithMetadatas_TIMED_METRIC_WITH_METADATAS MetricWithMetadatas_Type = 6 + MetricWithMetadatas_UNKNOWN MetricWithMetadatas_Type = 0 + MetricWithMetadatas_COUNTER_WITH_METADATAS MetricWithMetadatas_Type = 1 + MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS MetricWithMetadatas_Type = 2 + MetricWithMetadatas_GAUGE_WITH_METADATAS MetricWithMetadatas_Type = 3 + MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA MetricWithMetadatas_Type = 4 + MetricWithMetadatas_TIMED_METRIC_WITH_METADATA MetricWithMetadatas_Type = 5 + MetricWithMetadatas_TIMED_METRIC_WITH_METADATAS MetricWithMetadatas_Type = 6 + MetricWithMetadatas_TIMED_METRIC_WITH_STORAGE_POLICY MetricWithMetadatas_Type = 7 ) var MetricWithMetadatas_Type_name = map[int32]string{ @@ -92,15 +93,17 @@ var MetricWithMetadatas_Type_name = map[int32]string{ 4: "FORWARDED_METRIC_WITH_METADATA", 5: "TIMED_METRIC_WITH_METADATA", 6: "TIMED_METRIC_WITH_METADATAS", + 7: "TIMED_METRIC_WITH_STORAGE_POLICY", } var MetricWithMetadatas_Type_value = map[string]int32{ - "UNKNOWN": 0, - "COUNTER_WITH_METADATAS": 1, - "BATCH_TIMER_WITH_METADATAS": 2, - "GAUGE_WITH_METADATAS": 3, - "FORWARDED_METRIC_WITH_METADATA": 4, - "TIMED_METRIC_WITH_METADATA": 5, - "TIMED_METRIC_WITH_METADATAS": 6, + "UNKNOWN": 0, + "COUNTER_WITH_METADATAS": 1, + "BATCH_TIMER_WITH_METADATAS": 2, + "GAUGE_WITH_METADATAS": 3, + "FORWARDED_METRIC_WITH_METADATA": 4, + "TIMED_METRIC_WITH_METADATA": 5, + "TIMED_METRIC_WITH_METADATAS": 6, + "TIMED_METRIC_WITH_STORAGE_POLICY": 7, } func (x MetricWithMetadatas_Type) String() string { @@ -315,13 +318,14 @@ func (m *AggregatedMetric) GetEncodeNanos() int64 { // significant performance hit when such message type is used for encoding // and decoding high volume traffic. type MetricWithMetadatas struct { - Type MetricWithMetadatas_Type `protobuf:"varint,1,opt,name=type,proto3,enum=metricpb.MetricWithMetadatas_Type" json:"type,omitempty"` - CounterWithMetadatas *CounterWithMetadatas `protobuf:"bytes,2,opt,name=counter_with_metadatas,json=counterWithMetadatas" json:"counter_with_metadatas,omitempty"` - BatchTimerWithMetadatas *BatchTimerWithMetadatas `protobuf:"bytes,3,opt,name=batch_timer_with_metadatas,json=batchTimerWithMetadatas" json:"batch_timer_with_metadatas,omitempty"` - GaugeWithMetadatas *GaugeWithMetadatas `protobuf:"bytes,4,opt,name=gauge_with_metadatas,json=gaugeWithMetadatas" json:"gauge_with_metadatas,omitempty"` - ForwardedMetricWithMetadata *ForwardedMetricWithMetadata `protobuf:"bytes,5,opt,name=forwarded_metric_with_metadata,json=forwardedMetricWithMetadata" json:"forwarded_metric_with_metadata,omitempty"` - TimedMetricWithMetadata *TimedMetricWithMetadata `protobuf:"bytes,6,opt,name=timed_metric_with_metadata,json=timedMetricWithMetadata" json:"timed_metric_with_metadata,omitempty"` - TimedMetricWithMetadatas *TimedMetricWithMetadatas `protobuf:"bytes,7,opt,name=timed_metric_with_metadatas,json=timedMetricWithMetadatas" json:"timed_metric_with_metadatas,omitempty"` + Type MetricWithMetadatas_Type `protobuf:"varint,1,opt,name=type,proto3,enum=metricpb.MetricWithMetadatas_Type" json:"type,omitempty"` + CounterWithMetadatas *CounterWithMetadatas `protobuf:"bytes,2,opt,name=counter_with_metadatas,json=counterWithMetadatas" json:"counter_with_metadatas,omitempty"` + BatchTimerWithMetadatas *BatchTimerWithMetadatas `protobuf:"bytes,3,opt,name=batch_timer_with_metadatas,json=batchTimerWithMetadatas" json:"batch_timer_with_metadatas,omitempty"` + GaugeWithMetadatas *GaugeWithMetadatas `protobuf:"bytes,4,opt,name=gauge_with_metadatas,json=gaugeWithMetadatas" json:"gauge_with_metadatas,omitempty"` + ForwardedMetricWithMetadata *ForwardedMetricWithMetadata `protobuf:"bytes,5,opt,name=forwarded_metric_with_metadata,json=forwardedMetricWithMetadata" json:"forwarded_metric_with_metadata,omitempty"` + TimedMetricWithMetadata *TimedMetricWithMetadata `protobuf:"bytes,6,opt,name=timed_metric_with_metadata,json=timedMetricWithMetadata" json:"timed_metric_with_metadata,omitempty"` + TimedMetricWithMetadatas *TimedMetricWithMetadatas `protobuf:"bytes,7,opt,name=timed_metric_with_metadatas,json=timedMetricWithMetadatas" json:"timed_metric_with_metadatas,omitempty"` + TimedMetricWithStoragePolicy *TimedMetricWithStoragePolicy `protobuf:"bytes,8,opt,name=timed_metric_with_storage_policy,json=timedMetricWithStoragePolicy" json:"timed_metric_with_storage_policy,omitempty"` } func (m *MetricWithMetadatas) Reset() { *m = MetricWithMetadatas{} } @@ -378,6 +382,13 @@ func (m *MetricWithMetadatas) GetTimedMetricWithMetadatas() *TimedMetricWithMeta return nil } +func (m *MetricWithMetadatas) GetTimedMetricWithStoragePolicy() *TimedMetricWithStoragePolicy { + if m != nil { + return m.TimedMetricWithStoragePolicy + } + return nil +} + func init() { proto.RegisterType((*CounterWithMetadatas)(nil), "metricpb.CounterWithMetadatas") proto.RegisterType((*BatchTimerWithMetadatas)(nil), "metricpb.BatchTimerWithMetadatas") @@ -739,6 +750,16 @@ func (m *MetricWithMetadatas) MarshalTo(dAtA []byte) (int, error) { } i += n21 } + if m.TimedMetricWithStoragePolicy != nil { + dAtA[i] = 0x42 + i++ + i = encodeVarintComposite(dAtA, i, uint64(m.TimedMetricWithStoragePolicy.Size())) + n22, err := m.TimedMetricWithStoragePolicy.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n22 + } return i, nil } @@ -862,6 +883,10 @@ func (m *MetricWithMetadatas) Size() (n int) { l = m.TimedMetricWithMetadatas.Size() n += 1 + l + sovComposite(uint64(l)) } + if m.TimedMetricWithStoragePolicy != nil { + l = m.TimedMetricWithStoragePolicy.Size() + n += 1 + l + sovComposite(uint64(l)) + } return n } @@ -1993,6 +2018,39 @@ func (m *MetricWithMetadatas) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TimedMetricWithStoragePolicy", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowComposite + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthComposite + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.TimedMetricWithStoragePolicy == nil { + m.TimedMetricWithStoragePolicy = &TimedMetricWithStoragePolicy{} + } + if err := m.TimedMetricWithStoragePolicy.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipComposite(dAtA[iNdEx:]) @@ -2124,53 +2182,56 @@ func init() { } var fileDescriptorComposite = []byte{ - // 768 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x96, 0xdf, 0x6b, 0xd3, 0x6c, - 0x14, 0xc7, 0xf7, 0x6c, 0x5d, 0xb7, 0xf7, 0x74, 0xef, 0xde, 0xbe, 0x8f, 0x75, 0xad, 0xed, 0xc8, - 0x5c, 0x40, 0x11, 0xc4, 0x16, 0x57, 0x70, 0xc8, 0x50, 0xe8, 0xaf, 0x75, 0x45, 0xd6, 0x49, 0x9a, - 0x51, 0xf0, 0x62, 0x21, 0x49, 0xb3, 0x34, 0x62, 0x9b, 0x92, 0x3c, 0x65, 0x0c, 0x6f, 0xbc, 0xd4, - 0x1b, 0x11, 0xc4, 0xff, 0xc0, 0x3f, 0x66, 0x57, 0x22, 0x78, 0x2f, 0x32, 0xff, 0x11, 0x49, 0xf2, - 0xa4, 0xf9, 0xad, 0xd2, 0xde, 0x25, 0xe7, 0x9c, 0xef, 0xe7, 0x7c, 0xfb, 0xe4, 0x39, 0x87, 0x42, - 0x5b, 0xd5, 0xc8, 0x70, 0x2a, 0x95, 0x65, 0x7d, 0x54, 0x19, 0x55, 0x07, 0x52, 0x65, 0x54, 0xad, - 0x98, 0x86, 0x5c, 0x19, 0x29, 0xc4, 0xd0, 0x64, 0xb3, 0xa2, 0x2a, 0x63, 0xc5, 0x10, 0x89, 0x32, - 0xa8, 0x4c, 0x0c, 0x9d, 0xe8, 0x34, 0x3e, 0x91, 0x2a, 0xb2, 0x3e, 0x9a, 0xe8, 0xa6, 0x46, 0x94, - 0xb2, 0x9d, 0xc0, 0xeb, 0x6e, 0xa6, 0xf8, 0xc0, 0x87, 0x54, 0x75, 0x55, 0x77, 0x94, 0xd2, 0xf4, - 0xdc, 0x7e, 0x73, 0x30, 0xd6, 0x93, 0x23, 0x2c, 0x36, 0xe7, 0x75, 0xe0, 0x3c, 0x50, 0xca, 0xe1, - 0x02, 0x14, 0x71, 0x20, 0x12, 0x71, 0x4e, 0x37, 0x13, 0xfd, 0x95, 0x26, 0x5f, 0x4e, 0x24, 0xfa, - 0xe0, 0x50, 0xd8, 0xb7, 0x08, 0x72, 0x0d, 0x7d, 0x3a, 0x26, 0x8a, 0xd1, 0xd7, 0xc8, 0xf0, 0x98, - 0xf6, 0x30, 0xf1, 0x43, 0x58, 0x93, 0x9d, 0x78, 0x01, 0xdd, 0x46, 0xf7, 0x32, 0x7b, 0xff, 0x97, - 0x5d, 0x27, 0x65, 0x2a, 0xa8, 0xa7, 0xae, 0xbe, 0xef, 0x2c, 0x71, 0x6e, 0x1d, 0x7e, 0x02, 0xff, - 0xb8, 0x1e, 0xcd, 0xc2, 0xb2, 0x2d, 0xba, 0xe5, 0x89, 0x7a, 0x44, 0x54, 0x95, 0xc1, 0xac, 0x01, - 0x15, 0x7b, 0x0a, 0xf6, 0x13, 0x82, 0x7c, 0x5d, 0x24, 0xf2, 0x90, 0xd7, 0x46, 0x61, 0x37, 0x07, - 0x90, 0x91, 0xac, 0x94, 0x40, 0xac, 0x1c, 0x75, 0x94, 0xf3, 0xe0, 0x9e, 0x8e, 0x72, 0x41, 0x9a, - 0x45, 0x16, 0xf5, 0xf5, 0x06, 0x01, 0x6e, 0x8b, 0x53, 0x55, 0x09, 0x5a, 0xba, 0x0f, 0xab, 0xaa, - 0x15, 0xa5, 0x66, 0xfe, 0xf3, 0x88, 0x76, 0x31, 0xe5, 0x38, 0x35, 0x8b, 0x5a, 0xf8, 0x88, 0xa0, - 0x74, 0xa8, 0x1b, 0x17, 0xa2, 0x31, 0xb0, 0xeb, 0x0c, 0x4d, 0xf6, 0x9b, 0xc1, 0xfb, 0x90, 0x76, - 0x60, 0xd4, 0x8c, 0x8f, 0x1d, 0x92, 0x51, 0x36, 0x2d, 0xc7, 0x07, 0xb0, 0xee, 0x76, 0x89, 0xda, - 0xa2, 0x52, 0xb7, 0x0b, 0x95, 0xce, 0x04, 0xec, 0x3b, 0x04, 0x79, 0xeb, 0x84, 0xe3, 0x1c, 0x55, - 0x43, 0x8e, 0x6e, 0x7a, 0x58, 0x9f, 0x24, 0xe4, 0xe6, 0x71, 0xc4, 0x4d, 0x3e, 0x2a, 0x8b, 0xf7, - 0xf2, 0x1e, 0x41, 0x21, 0xc1, 0x8b, 0x39, 0x9f, 0x99, 0x05, 0x3f, 0xd9, 0x67, 0x04, 0xdb, 0x21, - 0x43, 0x3d, 0xa2, 0x1b, 0xa2, 0xaa, 0x3c, 0xb7, 0xe7, 0x0f, 0x3f, 0x85, 0x0d, 0xeb, 0x32, 0x0f, - 0x84, 0xbf, 0xb7, 0x96, 0x21, 0x5e, 0x08, 0x37, 0x61, 0xd3, 0x74, 0x80, 0x82, 0x33, 0xd1, 0xb3, - 0x23, 0x73, 0x27, 0xbd, 0x1c, 0x68, 0x48, 0x19, 0xff, 0x9a, 0xfe, 0x20, 0xfb, 0x1a, 0xb2, 0x35, - 0x55, 0x35, 0x14, 0xd5, 0xda, 0x14, 0x33, 0x72, 0xf0, 0xb8, 0xee, 0xc6, 0x7a, 0x8a, 0xfc, 0xa2, - 0xd0, 0xf9, 0xed, 0xc2, 0x86, 0x32, 0x96, 0xf5, 0x81, 0x22, 0x8c, 0xc5, 0xb1, 0xee, 0x1c, 0xe1, - 0x0a, 0x97, 0x71, 0x62, 0x5d, 0x2b, 0xc4, 0x7e, 0x4b, 0xc3, 0x8d, 0xb8, 0xef, 0xf5, 0x08, 0x52, - 0xe4, 0x72, 0xe2, 0x4c, 0xd6, 0xe6, 0x1e, 0xeb, 0xb5, 0x8f, 0x29, 0x2e, 0xf3, 0x97, 0x13, 0x85, - 0xb3, 0xeb, 0x31, 0x0f, 0x5b, 0x74, 0x17, 0x09, 0x17, 0x1a, 0x19, 0x0a, 0xe1, 0xef, 0xc7, 0x44, - 0x56, 0x58, 0x00, 0xc5, 0xe5, 0xe4, 0xb8, 0x4d, 0x78, 0x06, 0x45, 0xdf, 0xee, 0x09, 0x93, 0x57, - 0x6c, 0xf2, 0x6e, 0xdc, 0x2a, 0x0a, 0xc2, 0xf3, 0x52, 0xc2, 0x6e, 0xeb, 0x42, 0xce, 0x5e, 0x12, - 0x61, 0x72, 0xca, 0x26, 0x6f, 0x87, 0xf6, 0x4a, 0x10, 0x8a, 0xd5, 0xe8, 0x62, 0x7a, 0x09, 0xcc, - 0xb9, 0x3b, 0xf4, 0xf4, 0x72, 0x05, 0xd1, 0x85, 0x55, 0x9b, 0x7c, 0x27, 0x71, 0x49, 0xf8, 0x79, - 0x5c, 0xe9, 0xfc, 0x37, 0x8b, 0xe7, 0x0c, 0x8a, 0xfe, 0x4b, 0x1c, 0xea, 0x93, 0x0e, 0x9f, 0x4d, - 0xc2, 0x84, 0x72, 0x79, 0x92, 0xb0, 0x46, 0x44, 0x28, 0x25, 0xf3, 0xcd, 0xc2, 0x9a, 0xdd, 0x80, - 0xfd, 0x63, 0x03, 0x93, 0x2b, 0x24, 0x74, 0x30, 0xd9, 0x2f, 0x08, 0x52, 0xd6, 0x1d, 0xc2, 0x19, - 0x58, 0x3b, 0xed, 0x3e, 0xeb, 0x9e, 0xf4, 0xbb, 0xd9, 0x25, 0x5c, 0x84, 0xad, 0xc6, 0xc9, 0x69, - 0x97, 0x6f, 0x71, 0x42, 0xbf, 0xc3, 0x1f, 0x09, 0xc7, 0x2d, 0xbe, 0xd6, 0xac, 0xf1, 0xb5, 0x5e, - 0x16, 0x61, 0x06, 0x8a, 0xf5, 0x1a, 0xdf, 0x38, 0x12, 0xf8, 0xce, 0x71, 0x34, 0xbf, 0x8c, 0x0b, - 0x90, 0x6b, 0xd7, 0x4e, 0xdb, 0xad, 0x70, 0x66, 0x05, 0xb3, 0xc0, 0x1c, 0x9e, 0x70, 0xfd, 0x1a, - 0xd7, 0x6c, 0x35, 0xad, 0x04, 0xd7, 0x69, 0x04, 0x8b, 0xb2, 0x29, 0x8b, 0x6e, 0x71, 0x13, 0xf2, - 0xab, 0x78, 0x07, 0x4a, 0xc9, 0xf9, 0x5e, 0x36, 0x5d, 0xef, 0x5c, 0x5d, 0x33, 0xe8, 0xeb, 0x35, - 0x83, 0x7e, 0x5c, 0x33, 0xe8, 0xc3, 0x4f, 0x66, 0xe9, 0xc5, 0xfe, 0x9c, 0x7f, 0x39, 0xa4, 0xb4, - 0xfd, 0x5e, 0xfd, 0x15, 0x00, 0x00, 0xff, 0xff, 0x00, 0x47, 0x7a, 0x2d, 0x7c, 0x09, 0x00, 0x00, + // 808 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x96, 0xcd, 0x6e, 0xd3, 0x58, + 0x14, 0xc7, 0xeb, 0x36, 0x4d, 0x3a, 0x27, 0x9d, 0x4e, 0xe6, 0x4e, 0xa6, 0xc9, 0x24, 0x95, 0xdb, + 0x5a, 0x33, 0x23, 0x24, 0x44, 0x22, 0x1a, 0x89, 0x0a, 0x55, 0x20, 0x39, 0x1f, 0x4d, 0x23, 0x68, + 0x52, 0x39, 0xae, 0x22, 0x58, 0xd4, 0xb2, 0x1d, 0xd7, 0x31, 0x22, 0x71, 0x64, 0xdf, 0xa8, 0xaa, + 0xd8, 0xb0, 0x84, 0x0d, 0x42, 0x42, 0xbc, 0x01, 0x0f, 0xd3, 0x25, 0x4f, 0x80, 0x50, 0x59, 0xb1, + 0xe7, 0x01, 0x90, 0xed, 0xeb, 0xf8, 0x2b, 0x2e, 0x90, 0xec, 0xec, 0xf3, 0xf1, 0x3b, 0xff, 0x5c, + 0xdf, 0xf3, 0x57, 0xa0, 0xa9, 0x6a, 0x78, 0x30, 0x91, 0x4a, 0xb2, 0x3e, 0x2c, 0x0f, 0x2b, 0x7d, + 0xa9, 0x3c, 0xac, 0x94, 0x4d, 0x43, 0x2e, 0x0f, 0x15, 0x6c, 0x68, 0xb2, 0x59, 0x56, 0x95, 0x91, + 0x62, 0x88, 0x58, 0xe9, 0x97, 0xc7, 0x86, 0x8e, 0x75, 0x12, 0x1f, 0x4b, 0x65, 0x59, 0x1f, 0x8e, + 0x75, 0x53, 0xc3, 0x4a, 0xc9, 0x4e, 0xa0, 0x35, 0x37, 0x53, 0xb8, 0xe3, 0x43, 0xaa, 0xba, 0xaa, + 0x3b, 0x9d, 0xd2, 0xe4, 0xdc, 0x7e, 0x73, 0x30, 0xd6, 0x93, 0xd3, 0x58, 0xa8, 0xcf, 0xab, 0xc0, + 0x79, 0x20, 0x94, 0xc3, 0x05, 0x28, 0x62, 0x5f, 0xc4, 0xe2, 0x9c, 0x6a, 0xc6, 0xfa, 0x73, 0x4d, + 0xbe, 0x1c, 0x4b, 0xe4, 0xc1, 0xa1, 0x30, 0xaf, 0x28, 0xc8, 0xd6, 0xf4, 0xc9, 0x08, 0x2b, 0x46, + 0x4f, 0xc3, 0x83, 0x63, 0x32, 0xc3, 0x44, 0x77, 0x21, 0x25, 0x3b, 0xf1, 0x3c, 0xb5, 0x43, 0xdd, + 0x4a, 0xef, 0xfd, 0x59, 0x72, 0x95, 0x94, 0x48, 0x43, 0x35, 0x71, 0xf5, 0x69, 0x7b, 0x89, 0x73, + 0xeb, 0xd0, 0x03, 0xf8, 0xcd, 0xd5, 0x68, 0xe6, 0x97, 0xed, 0xa6, 0x7f, 0xbc, 0xa6, 0x2e, 0x16, + 0x55, 0xa5, 0x3f, 0x1d, 0x40, 0x9a, 0xbd, 0x0e, 0xe6, 0x3d, 0x05, 0xb9, 0xaa, 0x88, 0xe5, 0x01, + 0xaf, 0x0d, 0xc3, 0x6a, 0x0e, 0x20, 0x2d, 0x59, 0x29, 0x01, 0x5b, 0x39, 0xa2, 0x28, 0xeb, 0xc1, + 0xbd, 0x3e, 0xc2, 0x05, 0x69, 0x1a, 0x59, 0x54, 0xd7, 0x4b, 0x0a, 0x50, 0x53, 0x9c, 0xa8, 0x4a, + 0x50, 0xd2, 0x6d, 0x58, 0x55, 0xad, 0x28, 0x11, 0xf3, 0x87, 0x47, 0xb4, 0x8b, 0x09, 0xc7, 0xa9, + 0x59, 0x54, 0xc2, 0x3b, 0x0a, 0x8a, 0x87, 0xba, 0x71, 0x21, 0x1a, 0x7d, 0xbb, 0xce, 0xd0, 0x64, + 0xbf, 0x18, 0xb4, 0x0f, 0x49, 0x07, 0x46, 0xc4, 0xf8, 0xd8, 0xa1, 0x36, 0xc2, 0x26, 0xe5, 0xe8, + 0x00, 0xd6, 0xdc, 0x29, 0x51, 0x59, 0xa4, 0xd5, 0x9d, 0x42, 0x5a, 0xa7, 0x0d, 0xcc, 0x6b, 0x0a, + 0x72, 0xd6, 0x09, 0xcf, 0x52, 0x54, 0x09, 0x29, 0xfa, 0xdb, 0xc3, 0xfa, 0x5a, 0x42, 0x6a, 0xee, + 0x47, 0xd4, 0xe4, 0xa2, 0x6d, 0xb3, 0xb5, 0xbc, 0xa1, 0x20, 0x1f, 0xa3, 0xc5, 0x9c, 0x4f, 0xcc, + 0x82, 0x9f, 0xec, 0x03, 0x05, 0x5b, 0x21, 0x41, 0x5d, 0xac, 0x1b, 0xa2, 0xaa, 0x9c, 0xd8, 0xfb, + 0x87, 0x1e, 0xc2, 0xba, 0x75, 0x99, 0xfb, 0xc2, 0xcf, 0x4b, 0x4b, 0x63, 0x2f, 0x84, 0xea, 0xb0, + 0x61, 0x3a, 0x40, 0xc1, 0xd9, 0xe8, 0xe9, 0x91, 0xb9, 0x9b, 0x5e, 0x0a, 0x0c, 0x24, 0x8c, 0xdf, + 0x4d, 0x7f, 0x90, 0x79, 0x01, 0x19, 0x56, 0x55, 0x0d, 0x45, 0xb5, 0x9c, 0x62, 0x4a, 0x0e, 0x1e, + 0xd7, 0xff, 0x33, 0x35, 0x45, 0x7e, 0x51, 0xe8, 0xfc, 0x76, 0x61, 0x5d, 0x19, 0xc9, 0x7a, 0x5f, + 0x11, 0x46, 0xe2, 0x48, 0x77, 0x8e, 0x70, 0x85, 0x4b, 0x3b, 0xb1, 0xb6, 0x15, 0x62, 0xbe, 0xa6, + 0xe0, 0xaf, 0x59, 0xdf, 0xeb, 0x1e, 0x24, 0xf0, 0xe5, 0xd8, 0xd9, 0xac, 0x8d, 0x3d, 0xc6, 0x1b, + 0x3f, 0xa3, 0xb8, 0xc4, 0x5f, 0x8e, 0x15, 0xce, 0xae, 0x47, 0x3c, 0x6c, 0x12, 0x2f, 0x12, 0x2e, + 0x34, 0x3c, 0x10, 0xc2, 0xdf, 0x8f, 0x8e, 0x58, 0x58, 0x00, 0xc5, 0x65, 0xe5, 0x59, 0x4e, 0x78, + 0x06, 0x05, 0x9f, 0xf7, 0x84, 0xc9, 0x2b, 0x36, 0x79, 0x77, 0x96, 0x15, 0x05, 0xe1, 0x39, 0x29, + 0xc6, 0xdb, 0xda, 0x90, 0xb5, 0x4d, 0x22, 0x4c, 0x4e, 0xd8, 0xe4, 0xad, 0x90, 0xaf, 0x04, 0xa1, + 0x48, 0x8d, 0x1a, 0xd3, 0x33, 0xa0, 0xcf, 0xdd, 0xa5, 0x27, 0x97, 0x2b, 0x88, 0xce, 0xaf, 0xda, + 0xe4, 0xff, 0x62, 0x4d, 0xc2, 0xcf, 0xe3, 0x8a, 0xe7, 0x37, 0x18, 0xcf, 0x19, 0x14, 0xfc, 0x97, + 0x38, 0x34, 0x27, 0x19, 0x3e, 0x9b, 0x98, 0x0d, 0xe5, 0x72, 0x38, 0xc6, 0x46, 0x44, 0x28, 0xc6, + 0xf3, 0xcd, 0x7c, 0xca, 0x1e, 0xc0, 0xfc, 0x70, 0x80, 0xc9, 0xe5, 0x71, 0x9c, 0x39, 0x8c, 0x60, + 0x27, 0x3a, 0x22, 0xb4, 0x59, 0x6b, 0xbf, 0xb2, 0x07, 0xdc, 0x16, 0xbe, 0x21, 0xcb, 0x7c, 0xa3, + 0x20, 0x61, 0xdd, 0x59, 0x94, 0x86, 0xd4, 0x69, 0xfb, 0x51, 0xbb, 0xd3, 0x6b, 0x67, 0x96, 0x50, + 0x01, 0x36, 0x6b, 0x9d, 0xd3, 0x36, 0xdf, 0xe0, 0x84, 0x5e, 0x8b, 0x3f, 0x12, 0x8e, 0x1b, 0x3c, + 0x5b, 0x67, 0x79, 0xb6, 0x9b, 0xa1, 0x10, 0x0d, 0x85, 0x2a, 0xcb, 0xd7, 0x8e, 0x04, 0xbe, 0x75, + 0x1c, 0xcd, 0x2f, 0xa3, 0x3c, 0x64, 0x9b, 0xec, 0x69, 0xb3, 0x11, 0xce, 0xac, 0x20, 0x06, 0xe8, + 0xc3, 0x0e, 0xd7, 0x63, 0xb9, 0x7a, 0xa3, 0x6e, 0x25, 0xb8, 0x56, 0x2d, 0x58, 0x94, 0x49, 0x58, + 0x74, 0x8b, 0x1b, 0x93, 0x5f, 0x45, 0xdb, 0x50, 0x8c, 0xcf, 0x77, 0x33, 0x49, 0xf4, 0x2f, 0xec, + 0x44, 0x0b, 0xba, 0x7c, 0x87, 0x63, 0x9b, 0x0d, 0xe1, 0xa4, 0xf3, 0xb8, 0x55, 0x7b, 0x92, 0x49, + 0x55, 0x5b, 0x57, 0xd7, 0x34, 0xf5, 0xf1, 0x9a, 0xa6, 0x3e, 0x5f, 0xd3, 0xd4, 0xdb, 0x2f, 0xf4, + 0xd2, 0xd3, 0xfd, 0x39, 0xff, 0x08, 0x49, 0x49, 0xfb, 0xbd, 0xf2, 0x3d, 0x00, 0x00, 0xff, 0xff, + 0x34, 0x3f, 0x09, 0x4f, 0x12, 0x0a, 0x00, 0x00, } diff --git a/src/metrics/generated/proto/metricpb/composite.proto b/src/metrics/generated/proto/metricpb/composite.proto index ad54e6c119..61555eb196 100644 --- a/src/metrics/generated/proto/metricpb/composite.proto +++ b/src/metrics/generated/proto/metricpb/composite.proto @@ -84,6 +84,7 @@ message MetricWithMetadatas { FORWARDED_METRIC_WITH_METADATA = 4; TIMED_METRIC_WITH_METADATA = 5; TIMED_METRIC_WITH_METADATAS = 6; + TIMED_METRIC_WITH_STORAGE_POLICY = 7; } Type type = 1; CounterWithMetadatas counter_with_metadatas = 2; @@ -92,4 +93,5 @@ message MetricWithMetadatas { ForwardedMetricWithMetadata forwarded_metric_with_metadata = 5; TimedMetricWithMetadata timed_metric_with_metadata = 6; TimedMetricWithMetadatas timed_metric_with_metadatas = 7; + TimedMetricWithStoragePolicy timed_metric_with_storage_policy = 8; } diff --git a/src/metrics/metric/aggregated/types.go b/src/metrics/metric/aggregated/types.go index 65f9723b25..4415dfa039 100644 --- a/src/metrics/metric/aggregated/types.go +++ b/src/metrics/metric/aggregated/types.go @@ -33,8 +33,9 @@ import ( ) var ( - errNilForwardedMetricWithMetadataProto = errors.New("nil forwarded metric with metadata proto message") - errNilTimedMetricWithMetadataProto = errors.New("nil timed metric with metadata proto message") + errNilForwardedMetricWithMetadataProto = errors.New("nil forwarded metric with metadata proto message") + errNilTimedMetricWithMetadataProto = errors.New("nil timed metric with metadata proto message") + errNilPassthroughMetricWithMetadataProto = errors.New("nil passthrough metric with metadata proto message") ) // Metric is a metric, which is essentially a named value at certain time. @@ -254,3 +255,28 @@ func (tm *TimedMetricWithMetadatas) FromProto(pb *metricpb.TimedMetricWithMetada } return tm.StagedMetadatas.FromProto(pb.Metadatas) } + +// PassthroughMetricWithMetadata is a passthrough metric with metadata. +type PassthroughMetricWithMetadata struct { + Metric + policy.StoragePolicy +} + +// ToProto converts the passthrough metric with metadata to a protobuf message in place. +func (pm PassthroughMetricWithMetadata) ToProto(pb *metricpb.TimedMetricWithStoragePolicy) error { + if err := pm.Metric.ToProto(&pb.TimedMetric); err != nil { + return err + } + return pm.StoragePolicy.ToProto(&pb.StoragePolicy) +} + +// FromProto converts the protobuf message to a timed metric with metadata in place. +func (pm *PassthroughMetricWithMetadata) FromProto(pb *metricpb.TimedMetricWithStoragePolicy) error { + if pb == nil { + return errNilPassthroughMetricWithMetadataProto + } + if err := pm.Metric.FromProto(pb.TimedMetric); err != nil { + return err + } + return pm.StoragePolicy.FromProto(pb.StoragePolicy) +}