From 04d3a68bc4fdf5b6044c46cd0f10658b49ba19f2 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 16 Sep 2020 00:28:26 -0400 Subject: [PATCH 01/10] [coord] Configurable blackholed SP's back from agg There are cases where we want to ignore metrics for a given storage policy at the coordinator as they come back from the aggregator over m3msg. Specifically, the coordinators may still receive aggregated metrics with storage policies that they no longer have namespace configuration for. This PR allows dropping those metrics as they come back from the aggregator based on their storage policy. --- .../server/m3msg/protobuf_handler.go | 38 +++++++++-- .../server/m3msg/protobuf_handler_test.go | 67 +++++++++++++++++++ 2 files changed, 99 insertions(+), 6 deletions(-) diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go index 4c3d78b242..5e9512f118 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go @@ -25,6 +25,7 @@ import ( "sync" "github.com/m3db/m3/src/metrics/encoding/protobuf" + "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/msg/consumer" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" @@ -38,11 +39,13 @@ type Options struct { InstrumentOptions instrument.Options WriteFn WriteFn ProtobufDecoderPoolOptions pool.ObjectPoolOptions + BlockholePolicies []policy.StoragePolicy } type handlerMetrics struct { messageReadError tally.Counter metricAccepted tally.Counter + droppedMetricBlackholePolicy tally.Counter droppedMetricDecodeError tally.Counter droppedMetricDecodeMalformed tally.Counter } @@ -55,6 +58,9 @@ func newHandlerMetrics(scope tally.Scope) handlerMetrics { droppedMetricDecodeError: messageScope.Tagged(map[string]string{ "reason": "decode-error", }).Counter("dropped"), + droppedMetricBlackholePolicy: messageScope.Tagged(map[string]string{ + "reason": "blackhole-policy", + }).Counter("dropped"), droppedMetricDecodeMalformed: messageScope.Tagged(map[string]string{ "reason": "decode-malformed", }).Counter("dropped"), @@ -68,18 +74,28 @@ type pbHandler struct { wg *sync.WaitGroup logger *zap.Logger m handlerMetrics + + // Set of policies for which when we see a metric we drop it on the floor. + blackholePolicies map[policy.StoragePolicy]struct{} } func newProtobufProcessor(opts Options) consumer.MessageProcessor { p := protobuf.NewAggregatedDecoderPool(opts.ProtobufDecoderPoolOptions) p.Init() + + blackholePolicies := make(map[policy.StoragePolicy]struct{}, len(opts.BlockholePolicies)) + for _, sp := range opts.BlockholePolicies { + blackholePolicies[sp] = struct{}{} + } + return &pbHandler{ - ctx: context.Background(), - writeFn: opts.WriteFn, - pool: p, - wg: &sync.WaitGroup{}, - logger: opts.InstrumentOptions.Logger(), - m: newHandlerMetrics(opts.InstrumentOptions.MetricsScope()), + ctx: context.Background(), + writeFn: opts.WriteFn, + pool: p, + wg: &sync.WaitGroup{}, + logger: opts.InstrumentOptions.Logger(), + m: newHandlerMetrics(opts.InstrumentOptions.MetricsScope()), + blackholePolicies: blackholePolicies, } } @@ -96,10 +112,20 @@ func (h *pbHandler) Process(msg consumer.Message) { h.m.droppedMetricDecodeMalformed.Inc(1) return } + h.m.metricAccepted.Inc(1) h.wg.Add(1) r := NewProtobufCallback(msg, dec, h.wg) + + // If storage policy is blackholed, ack the message immediately and don't + // bother passing down the write path. + if _, ok := h.blackholePolicies[sp]; ok { + h.m.droppedMetricBlackholePolicy.Inc(1) + r.Callback(OnSuccess) + return + } + h.writeFn(h.ctx, dec.ID(), dec.TimeNanos(), dec.EncodeNanos(), dec.Value(), sp, r) } diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go index b67319836f..7394081a5d 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go @@ -127,6 +127,73 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) { require.Equal(t, m2.StoragePolicy, payload.sp) } +func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + w := &mockWriter{m: make(map[string]payload)} + hOpts := Options{ + WriteFn: w.write, + InstrumentOptions: instrument.NewOptions(), + BlockholePolicies: []policy.StoragePolicy{validStoragePolicy}, + } + opts := consumer.NewOptions(). + SetAckBufferSize(1). + SetConnectionWriteBufferSize(1) + + s := server.NewServer( + "a", + consumer.NewMessageHandler(newProtobufProcessor(hOpts), opts), + server.NewOptions(), + ) + s.Serve(l) + + conn, err := net.Dial("tcp", l.Addr().String()) + require.NoError(t, err) + m1 := aggregated.MetricWithStoragePolicy{ + Metric: aggregated.Metric{ + ID: []byte(testID), + TimeNanos: 1000, + Value: 1, + Type: metric.GaugeType, + }, + StoragePolicy: validStoragePolicy, + } + + encoder := protobuf.NewAggregatedEncoder(nil) + require.NoError(t, encoder.Encode(m1, 2000)) + enc := proto.NewEncoder(opts.EncoderOptions()) + require.NoError(t, enc.Encode(&msgpb.Message{ + Value: encoder.Buffer().Bytes(), + })) + _, err = conn.Write(enc.Bytes()) + require.NoError(t, err) + + var a msgpb.Ack + dec := proto.NewDecoder(conn, opts.DecoderOptions(), 10) + require.NoError(t, dec.Decode(&a)) + require.Equal(t, 0, w.ingested()) + + m2 := aggregated.MetricWithStoragePolicy{ + Metric: aggregated.Metric{ + ID: []byte{}, + TimeNanos: 0, + Value: 0, + Type: metric.UnknownType, + }, + StoragePolicy: validStoragePolicy, + } + require.NoError(t, encoder.Encode(m2, 3000)) + enc = proto.NewEncoder(opts.EncoderOptions()) + require.NoError(t, enc.Encode(&msgpb.Message{ + Value: encoder.Buffer().Bytes(), + })) + _, err = conn.Write(enc.Bytes()) + require.NoError(t, err) + require.NoError(t, dec.Decode(&a)) + require.Equal(t, 0, w.ingested()) +} + type mockWriter struct { sync.Mutex From d76edd763273f1963451185b313dce70f156f5a3 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 16 Sep 2020 00:38:30 -0400 Subject: [PATCH 02/10] wire up to config --- src/cmd/services/m3coordinator/server/m3msg/config.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/cmd/services/m3coordinator/server/m3msg/config.go b/src/cmd/services/m3coordinator/server/m3msg/config.go index f135b3ae39..962b1caa53 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/config.go +++ b/src/cmd/services/m3coordinator/server/m3msg/config.go @@ -21,6 +21,7 @@ package m3msg import ( + "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/msg/consumer" "github.com/m3db/m3/src/x/instrument" xio "github.com/m3db/m3/src/x/io" @@ -67,6 +68,7 @@ func (c Configuration) NewServer( type handlerConfiguration struct { // ProtobufDecoderPool configs the protobuf decoder pool. ProtobufDecoderPool pool.ObjectPoolConfiguration `yaml:"protobufDecoderPool"` + BlackholePolicies []policy.StoragePolicy `yaml:"blackholePolicies"` } func (c handlerConfiguration) newHandler( From 6d4931f4ca1dea7159699930c098475f95014518 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 16 Sep 2020 11:22:21 -0400 Subject: [PATCH 03/10] add log message --- .../server/m3msg/protobuf_handler.go | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go index 5e9512f118..88ee9aa902 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go @@ -83,20 +83,27 @@ func newProtobufProcessor(opts Options) consumer.MessageProcessor { p := protobuf.NewAggregatedDecoderPool(opts.ProtobufDecoderPoolOptions) p.Init() - blackholePolicies := make(map[policy.StoragePolicy]struct{}, len(opts.BlockholePolicies)) - for _, sp := range opts.BlockholePolicies { - blackholePolicies[sp] = struct{}{} - } - - return &pbHandler{ + h := &pbHandler{ ctx: context.Background(), writeFn: opts.WriteFn, pool: p, wg: &sync.WaitGroup{}, logger: opts.InstrumentOptions.Logger(), m: newHandlerMetrics(opts.InstrumentOptions.MetricsScope()), - blackholePolicies: blackholePolicies, + blackholePolicies: make(map[policy.StoragePolicy]struct{}, len(opts.BlockholePolicies)), + } + + if len(opts.BlockholePolicies) > 0 { + policyNames := make([]string, len(opts.BlockholePolicies)) + for i, sp := range opts.BlockholePolicies { + h.blackholePolicies[sp] = struct{}{} + policyNames[i] = sp.String() + } + + h.logger.Info("m3msg handler blackholing metrics for configured policies", zap.Strings("policyNames", policyNames)) } + + return h } func (h *pbHandler) Process(msg consumer.Message) { From 35a8954a7da33de0ae4f87d839e2fa28b6bf9603 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 16 Sep 2020 11:40:42 -0400 Subject: [PATCH 04/10] pass policies to handler --- src/cmd/services/m3coordinator/server/m3msg/config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cmd/services/m3coordinator/server/m3msg/config.go b/src/cmd/services/m3coordinator/server/m3msg/config.go index 962b1caa53..953f62a2d0 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/config.go +++ b/src/cmd/services/m3coordinator/server/m3msg/config.go @@ -84,6 +84,7 @@ func (c handlerConfiguration) newHandler( }), ), ProtobufDecoderPoolOptions: c.ProtobufDecoderPool.NewObjectPoolOptions(iOpts), + BlockholePolicies: c.BlackholePolicies, }) return consumer.NewMessageHandler(p, cOpts), nil } From 8489665b14238c793f56ff81e2ca67fdddbf12ed Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 16 Sep 2020 12:19:44 -0400 Subject: [PATCH 05/10] truncate SPs --- .../server/m3msg/protobuf_handler.go | 5 ++++- .../server/m3msg/protobuf_handler_test.go | 21 ++++++++++++------ src/metrics/policy/resolution.go | 7 ++++++ src/metrics/policy/resolution_test.go | 22 +++++++++++++++++++ src/metrics/policy/storage_policy.go | 13 +++++++++-- src/metrics/policy/storage_policy_test.go | 9 ++++++++ 6 files changed, 67 insertions(+), 10 deletions(-) diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go index 88ee9aa902..a433052b65 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go @@ -96,6 +96,9 @@ func newProtobufProcessor(opts Options) consumer.MessageProcessor { if len(opts.BlockholePolicies) > 0 { policyNames := make([]string, len(opts.BlockholePolicies)) for i, sp := range opts.BlockholePolicies { + // We only match incoming policies by their stripped resoluton (no + // precision) and retention. + sp := sp.StripPrecision() h.blackholePolicies[sp] = struct{}{} policyNames[i] = sp.String() } @@ -127,7 +130,7 @@ func (h *pbHandler) Process(msg consumer.Message) { // If storage policy is blackholed, ack the message immediately and don't // bother passing down the write path. - if _, ok := h.blackholePolicies[sp]; ok { + if _, ok := h.blackholePolicies[sp.StripPrecision()]; ok { h.m.droppedMetricBlackholePolicy.Inc(1) r.Callback(OnSuccess) return diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go index 7394081a5d..3196cb6566 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go @@ -26,6 +26,7 @@ import ( "net" "sync" "testing" + "time" "github.com/m3db/m3/src/metrics/encoding/protobuf" "github.com/m3db/m3/src/metrics/metric" @@ -36,13 +37,19 @@ import ( "github.com/m3db/m3/src/msg/protocol/proto" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/server" + xtime "github.com/m3db/m3/src/x/time" "github.com/stretchr/testify/require" ) var ( - testID = "stats.sjc1.gauges.m3+some-name+dc=sjc1,env=production,service=foo,type=gauge" - validStoragePolicy = policy.MustParseStoragePolicy("1m:40d") + testID = "stats.sjc1.gauges.m3+some-name+dc=sjc1,env=production,service=foo,type=gauge" + + // baseStoragePolicy represents what we typically define in config for SP. + // precisionStoragePolicy is the same retention/resolution, but includes the + // precision (which is often included with incoming writes). + baseStoragePolicy = policy.MustParseStoragePolicy("1m:40d") + precisionStoragePolicy = policy.NewStoragePolicy(time.Minute, xtime.Second, 40*24*time.Hour) ) func TestM3MsgServerWithProtobufHandler(t *testing.T) { @@ -74,7 +81,7 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) { Value: 1, Type: metric.GaugeType, }, - StoragePolicy: validStoragePolicy, + StoragePolicy: precisionStoragePolicy, } encoder := protobuf.NewAggregatedEncoder(nil) @@ -98,7 +105,7 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) { Value: 0, Type: metric.UnknownType, }, - StoragePolicy: validStoragePolicy, + StoragePolicy: precisionStoragePolicy, } require.NoError(t, encoder.Encode(m2, 3000)) enc = proto.NewEncoder(opts.EncoderOptions()) @@ -135,7 +142,7 @@ func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) { hOpts := Options{ WriteFn: w.write, InstrumentOptions: instrument.NewOptions(), - BlockholePolicies: []policy.StoragePolicy{validStoragePolicy}, + BlockholePolicies: []policy.StoragePolicy{baseStoragePolicy}, } opts := consumer.NewOptions(). SetAckBufferSize(1). @@ -157,7 +164,7 @@ func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) { Value: 1, Type: metric.GaugeType, }, - StoragePolicy: validStoragePolicy, + StoragePolicy: precisionStoragePolicy, } encoder := protobuf.NewAggregatedEncoder(nil) @@ -181,7 +188,7 @@ func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) { Value: 0, Type: metric.UnknownType, }, - StoragePolicy: validStoragePolicy, + StoragePolicy: precisionStoragePolicy, } require.NoError(t, encoder.Encode(m2, 3000)) enc = proto.NewEncoder(opts.EncoderOptions()) diff --git a/src/metrics/policy/resolution.go b/src/metrics/policy/resolution.go index c2cf1cb829..874048a0e3 100644 --- a/src/metrics/policy/resolution.go +++ b/src/metrics/policy/resolution.go @@ -73,6 +73,13 @@ func (r *Resolution) FromProto(pb *policypb.Resolution) error { return nil } +// StripPrecision returns a copy of the resolution without its precision. +func (r Resolution) StripPrecision() Resolution { + return Resolution{ + Window: r.Window, + } +} + // String is the string representation of a resolution. func (r Resolution) String() string { _, maxUnit := xtime.MaxUnitForDuration(r.Window) diff --git a/src/metrics/policy/resolution_test.go b/src/metrics/policy/resolution_test.go index 9767ca0b0e..9b8ea04461 100644 --- a/src/metrics/policy/resolution_test.go +++ b/src/metrics/policy/resolution_test.go @@ -208,3 +208,25 @@ func TestParseResolutionNoPrecisionErrors(t *testing.T) { require.Error(t, err) } } + +func TestResolutionStripPrecision(t *testing.T) { + inputs := []Resolution{ + {Window: time.Second, Precision: xtime.Second}, + {Window: 10 * time.Second, Precision: xtime.Second}, + {Window: time.Minute, Precision: xtime.Minute}, + {Window: 5 * time.Minute, Precision: xtime.Minute}, + {Window: 10 * time.Minute, Precision: xtime.Minute}, + } + + expected := []Resolution{ + {Window: time.Second}, + {Window: 10 * time.Second}, + {Window: time.Minute}, + {Window: 5 * time.Minute}, + {Window: 10 * time.Minute}, + } + + for i, input := range inputs { + require.Equal(t, expected[i], input.StripPrecision()) + } +} diff --git a/src/metrics/policy/storage_policy.go b/src/metrics/policy/storage_policy.go index 1c4221a22e..45b20a3c1c 100644 --- a/src/metrics/policy/storage_policy.go +++ b/src/metrics/policy/storage_policy.go @@ -77,8 +77,17 @@ func NewStoragePolicyFromProto(pb *policypb.StoragePolicy) (StoragePolicy, error // retention width and resolution. The resolution precision is ignored // for equivalency (hence why the method is not named Equal). func (p StoragePolicy) Equivalent(other StoragePolicy) bool { - return p.resolution.Window == other.resolution.Window && - p.retention == other.retention + return p.StripPrecision() == other.StripPrecision() +} + +// StripPrecision returns the storage policy with its resolution's precision +// stripped. This allows comparing two stripped StoragePolicies with `==`, +// similar to Equivalent, but useful when a StoragePolicy is a key in a map. +func (p StoragePolicy) StripPrecision() StoragePolicy { + return StoragePolicy{ + resolution: p.resolution.StripPrecision(), + retention: p.retention, + } } // String is the string representation of a storage policy. diff --git a/src/metrics/policy/storage_policy_test.go b/src/metrics/policy/storage_policy_test.go index 3390d8a604..6c78e955e2 100644 --- a/src/metrics/policy/storage_policy_test.go +++ b/src/metrics/policy/storage_policy_test.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/x/test/testmarshal" xtime "github.com/m3db/m3/src/x/time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" yaml "gopkg.in/yaml.v2" ) @@ -574,3 +575,11 @@ func TestStoragePoliciesByRetentionAscResolutionAsc(t *testing.T) { } require.Equal(t, expected, inputs) } + +func TestStoragePolicyStripPrecision(t *testing.T) { + strSp := MustParseStoragePolicy("1m:1d") + preciseSp := NewStoragePolicy(time.Minute, xtime.Hour, 24*time.Hour) + assert.False(t, strSp == preciseSp) + assert.True(t, strSp.StripPrecision() == preciseSp.StripPrecision()) + assert.True(t, strSp.Equivalent(preciseSp)) +} From 7f95a8a506b0598d929ec45abb3cfde9f0a8c1a8 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 16 Sep 2020 14:43:44 -0400 Subject: [PATCH 06/10] paranoid test --- .../m3coordinator/server/m3msg/protobuf_handler_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go index 3196cb6566..a55a324276 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go @@ -181,6 +181,7 @@ func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) { require.NoError(t, dec.Decode(&a)) require.Equal(t, 0, w.ingested()) + // Ensure a metric with a difference policy still gets ingested. m2 := aggregated.MetricWithStoragePolicy{ Metric: aggregated.Metric{ ID: []byte{}, @@ -188,7 +189,7 @@ func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) { Value: 0, Type: metric.UnknownType, }, - StoragePolicy: precisionStoragePolicy, + StoragePolicy: policy.MustParseStoragePolicy("5m:180d"), } require.NoError(t, encoder.Encode(m2, 3000)) enc = proto.NewEncoder(opts.EncoderOptions()) @@ -198,7 +199,7 @@ func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) { _, err = conn.Write(enc.Bytes()) require.NoError(t, err) require.NoError(t, dec.Decode(&a)) - require.Equal(t, 0, w.ingested()) + require.Equal(t, 1, w.ingested()) } type mockWriter struct { From ee729b38735c5d91132a4b8af402ce8215f1153f Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 16 Sep 2020 15:16:20 -0400 Subject: [PATCH 07/10] array of blackhole policies --- .../server/m3msg/protobuf_handler.go | 22 +++++++------------ src/metrics/policy/resolution.go | 7 ------ src/metrics/policy/resolution_test.go | 22 ------------------- src/metrics/policy/storage_policy.go | 13 ++--------- src/metrics/policy/storage_policy_test.go | 9 -------- 5 files changed, 10 insertions(+), 63 deletions(-) diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go index a433052b65..159aab0bbb 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go @@ -76,7 +76,7 @@ type pbHandler struct { m handlerMetrics // Set of policies for which when we see a metric we drop it on the floor. - blackholePolicies map[policy.StoragePolicy]struct{} + blackholePolicies []policy.StoragePolicy } func newProtobufProcessor(opts Options) consumer.MessageProcessor { @@ -90,19 +90,11 @@ func newProtobufProcessor(opts Options) consumer.MessageProcessor { wg: &sync.WaitGroup{}, logger: opts.InstrumentOptions.Logger(), m: newHandlerMetrics(opts.InstrumentOptions.MetricsScope()), - blackholePolicies: make(map[policy.StoragePolicy]struct{}, len(opts.BlockholePolicies)), + blackholePolicies: opts.BlockholePolicies, } if len(opts.BlockholePolicies) > 0 { policyNames := make([]string, len(opts.BlockholePolicies)) - for i, sp := range opts.BlockholePolicies { - // We only match incoming policies by their stripped resoluton (no - // precision) and retention. - sp := sp.StripPrecision() - h.blackholePolicies[sp] = struct{}{} - policyNames[i] = sp.String() - } - h.logger.Info("m3msg handler blackholing metrics for configured policies", zap.Strings("policyNames", policyNames)) } @@ -130,10 +122,12 @@ func (h *pbHandler) Process(msg consumer.Message) { // If storage policy is blackholed, ack the message immediately and don't // bother passing down the write path. - if _, ok := h.blackholePolicies[sp.StripPrecision()]; ok { - h.m.droppedMetricBlackholePolicy.Inc(1) - r.Callback(OnSuccess) - return + for _, blackholeSp := range h.blackholePolicies { + if sp.Equivalent(blackholeSp) { + h.m.droppedMetricBlackholePolicy.Inc(1) + r.Callback(OnSuccess) + return + } } h.writeFn(h.ctx, dec.ID(), dec.TimeNanos(), dec.EncodeNanos(), dec.Value(), sp, r) diff --git a/src/metrics/policy/resolution.go b/src/metrics/policy/resolution.go index 874048a0e3..c2cf1cb829 100644 --- a/src/metrics/policy/resolution.go +++ b/src/metrics/policy/resolution.go @@ -73,13 +73,6 @@ func (r *Resolution) FromProto(pb *policypb.Resolution) error { return nil } -// StripPrecision returns a copy of the resolution without its precision. -func (r Resolution) StripPrecision() Resolution { - return Resolution{ - Window: r.Window, - } -} - // String is the string representation of a resolution. func (r Resolution) String() string { _, maxUnit := xtime.MaxUnitForDuration(r.Window) diff --git a/src/metrics/policy/resolution_test.go b/src/metrics/policy/resolution_test.go index 9b8ea04461..9767ca0b0e 100644 --- a/src/metrics/policy/resolution_test.go +++ b/src/metrics/policy/resolution_test.go @@ -208,25 +208,3 @@ func TestParseResolutionNoPrecisionErrors(t *testing.T) { require.Error(t, err) } } - -func TestResolutionStripPrecision(t *testing.T) { - inputs := []Resolution{ - {Window: time.Second, Precision: xtime.Second}, - {Window: 10 * time.Second, Precision: xtime.Second}, - {Window: time.Minute, Precision: xtime.Minute}, - {Window: 5 * time.Minute, Precision: xtime.Minute}, - {Window: 10 * time.Minute, Precision: xtime.Minute}, - } - - expected := []Resolution{ - {Window: time.Second}, - {Window: 10 * time.Second}, - {Window: time.Minute}, - {Window: 5 * time.Minute}, - {Window: 10 * time.Minute}, - } - - for i, input := range inputs { - require.Equal(t, expected[i], input.StripPrecision()) - } -} diff --git a/src/metrics/policy/storage_policy.go b/src/metrics/policy/storage_policy.go index 45b20a3c1c..1c4221a22e 100644 --- a/src/metrics/policy/storage_policy.go +++ b/src/metrics/policy/storage_policy.go @@ -77,17 +77,8 @@ func NewStoragePolicyFromProto(pb *policypb.StoragePolicy) (StoragePolicy, error // retention width and resolution. The resolution precision is ignored // for equivalency (hence why the method is not named Equal). func (p StoragePolicy) Equivalent(other StoragePolicy) bool { - return p.StripPrecision() == other.StripPrecision() -} - -// StripPrecision returns the storage policy with its resolution's precision -// stripped. This allows comparing two stripped StoragePolicies with `==`, -// similar to Equivalent, but useful when a StoragePolicy is a key in a map. -func (p StoragePolicy) StripPrecision() StoragePolicy { - return StoragePolicy{ - resolution: p.resolution.StripPrecision(), - retention: p.retention, - } + return p.resolution.Window == other.resolution.Window && + p.retention == other.retention } // String is the string representation of a storage policy. diff --git a/src/metrics/policy/storage_policy_test.go b/src/metrics/policy/storage_policy_test.go index 6c78e955e2..3390d8a604 100644 --- a/src/metrics/policy/storage_policy_test.go +++ b/src/metrics/policy/storage_policy_test.go @@ -30,7 +30,6 @@ import ( "github.com/m3db/m3/src/x/test/testmarshal" xtime "github.com/m3db/m3/src/x/time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" yaml "gopkg.in/yaml.v2" ) @@ -575,11 +574,3 @@ func TestStoragePoliciesByRetentionAscResolutionAsc(t *testing.T) { } require.Equal(t, expected, inputs) } - -func TestStoragePolicyStripPrecision(t *testing.T) { - strSp := MustParseStoragePolicy("1m:1d") - preciseSp := NewStoragePolicy(time.Minute, xtime.Hour, 24*time.Hour) - assert.False(t, strSp == preciseSp) - assert.True(t, strSp.StripPrecision() == preciseSp.StripPrecision()) - assert.True(t, strSp.Equivalent(preciseSp)) -} From 533b9996b3819f4aa9bcfcc270310879002e6ccb Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 16 Sep 2020 15:17:31 -0400 Subject: [PATCH 08/10] log policies --- .../services/m3coordinator/server/m3msg/protobuf_handler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go index 159aab0bbb..f982b24a89 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go @@ -95,6 +95,9 @@ func newProtobufProcessor(opts Options) consumer.MessageProcessor { if len(opts.BlockholePolicies) > 0 { policyNames := make([]string, len(opts.BlockholePolicies)) + for i, sp := range h.blackholePolicies { + policyNames[i] = sp.String() + } h.logger.Info("m3msg handler blackholing metrics for configured policies", zap.Strings("policyNames", policyNames)) } From f0dba2b94b786b134050d76114015a6f71807b21 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 16 Sep 2020 15:44:38 -0400 Subject: [PATCH 09/10] index -> append --- .../services/m3coordinator/server/m3msg/protobuf_handler.go | 6 +++--- .../m3coordinator/server/m3msg/protobuf_handler_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go index f982b24a89..7589b95595 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go @@ -94,9 +94,9 @@ func newProtobufProcessor(opts Options) consumer.MessageProcessor { } if len(opts.BlockholePolicies) > 0 { - policyNames := make([]string, len(opts.BlockholePolicies)) - for i, sp := range h.blackholePolicies { - policyNames[i] = sp.String() + policyNames := make([]string, 0, len(opts.BlockholePolicies)) + for _, sp := range h.blackholePolicies { + policyNames = append(policyNames, sp.String()) } h.logger.Info("m3msg handler blackholing metrics for configured policies", zap.Strings("policyNames", policyNames)) } diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go index a55a324276..187750ab49 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go @@ -43,7 +43,7 @@ import ( ) var ( - testID = "stats.sjc1.gauges.m3+some-name+dc=sjc1,env=production,service=foo,type=gauge" + testID = "stats.foo1.gauges.m3+some-name+dc=foo1,env=production,service=foo,type=gauge" // baseStoragePolicy represents what we typically define in config for SP. // precisionStoragePolicy is the same retention/resolution, but includes the @@ -181,7 +181,7 @@ func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) { require.NoError(t, dec.Decode(&a)) require.Equal(t, 0, w.ingested()) - // Ensure a metric with a difference policy still gets ingested. + // Ensure a metric with a different policy still gets ingested. m2 := aggregated.MetricWithStoragePolicy{ Metric: aggregated.Metric{ ID: []byte{}, From 0fd5198fee1be49f7db49a0adeca8407afbb24c2 Mon Sep 17 00:00:00 2001 From: Matt Schallert Date: Wed, 16 Sep 2020 16:53:17 -0400 Subject: [PATCH 10/10] test base policy --- .../server/m3msg/protobuf_handler_test.go | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go index 187750ab49..02e411749a 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go @@ -200,6 +200,27 @@ func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) { require.NoError(t, err) require.NoError(t, dec.Decode(&a)) require.Equal(t, 1, w.ingested()) + + // Ensure a metric with base policy (equivalent but default precision) is + // still ignored. + m3 := aggregated.MetricWithStoragePolicy{ + Metric: aggregated.Metric{ + ID: []byte(testID), + TimeNanos: 1000, + Value: 1, + Type: metric.GaugeType, + }, + StoragePolicy: baseStoragePolicy, + } + require.NoError(t, encoder.Encode(m3, 3000)) + enc = proto.NewEncoder(opts.EncoderOptions()) + require.NoError(t, enc.Encode(&msgpb.Message{ + Value: encoder.Buffer().Bytes(), + })) + _, err = conn.Write(enc.Bytes()) + require.NoError(t, err) + require.NoError(t, dec.Decode(&a)) + require.Equal(t, 1, w.ingested()) } type mockWriter struct {