diff --git a/src/cmd/services/m3coordinator/server/m3msg/config.go b/src/cmd/services/m3coordinator/server/m3msg/config.go index f135b3ae39..953f62a2d0 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/config.go +++ b/src/cmd/services/m3coordinator/server/m3msg/config.go @@ -21,6 +21,7 @@ package m3msg import ( + "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/msg/consumer" "github.com/m3db/m3/src/x/instrument" xio "github.com/m3db/m3/src/x/io" @@ -67,6 +68,7 @@ func (c Configuration) NewServer( type handlerConfiguration struct { // ProtobufDecoderPool configs the protobuf decoder pool. ProtobufDecoderPool pool.ObjectPoolConfiguration `yaml:"protobufDecoderPool"` + BlackholePolicies []policy.StoragePolicy `yaml:"blackholePolicies"` } func (c handlerConfiguration) newHandler( @@ -82,6 +84,7 @@ func (c handlerConfiguration) newHandler( }), ), ProtobufDecoderPoolOptions: c.ProtobufDecoderPool.NewObjectPoolOptions(iOpts), + BlockholePolicies: c.BlackholePolicies, }) return consumer.NewMessageHandler(p, cOpts), nil } diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go index 4c3d78b242..7589b95595 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go @@ -25,6 +25,7 @@ import ( "sync" "github.com/m3db/m3/src/metrics/encoding/protobuf" + "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/msg/consumer" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" @@ -38,11 +39,13 @@ type Options struct { InstrumentOptions instrument.Options WriteFn WriteFn ProtobufDecoderPoolOptions pool.ObjectPoolOptions + BlockholePolicies []policy.StoragePolicy } type handlerMetrics struct { messageReadError tally.Counter metricAccepted tally.Counter + droppedMetricBlackholePolicy tally.Counter droppedMetricDecodeError tally.Counter droppedMetricDecodeMalformed tally.Counter } @@ -55,6 +58,9 @@ func newHandlerMetrics(scope tally.Scope) handlerMetrics { droppedMetricDecodeError: messageScope.Tagged(map[string]string{ "reason": "decode-error", }).Counter("dropped"), + droppedMetricBlackholePolicy: messageScope.Tagged(map[string]string{ + "reason": "blackhole-policy", + }).Counter("dropped"), droppedMetricDecodeMalformed: messageScope.Tagged(map[string]string{ "reason": "decode-malformed", }).Counter("dropped"), @@ -68,19 +74,34 @@ type pbHandler struct { wg *sync.WaitGroup logger *zap.Logger m handlerMetrics + + // Set of policies for which when we see a metric we drop it on the floor. + blackholePolicies []policy.StoragePolicy } func newProtobufProcessor(opts Options) consumer.MessageProcessor { p := protobuf.NewAggregatedDecoderPool(opts.ProtobufDecoderPoolOptions) p.Init() - return &pbHandler{ - ctx: context.Background(), - writeFn: opts.WriteFn, - pool: p, - wg: &sync.WaitGroup{}, - logger: opts.InstrumentOptions.Logger(), - m: newHandlerMetrics(opts.InstrumentOptions.MetricsScope()), + + h := &pbHandler{ + ctx: context.Background(), + writeFn: opts.WriteFn, + pool: p, + wg: &sync.WaitGroup{}, + logger: opts.InstrumentOptions.Logger(), + m: newHandlerMetrics(opts.InstrumentOptions.MetricsScope()), + blackholePolicies: opts.BlockholePolicies, + } + + if len(opts.BlockholePolicies) > 0 { + policyNames := make([]string, 0, len(opts.BlockholePolicies)) + for _, sp := range h.blackholePolicies { + policyNames = append(policyNames, sp.String()) + } + h.logger.Info("m3msg handler blackholing metrics for configured policies", zap.Strings("policyNames", policyNames)) } + + return h } func (h *pbHandler) Process(msg consumer.Message) { @@ -96,10 +117,22 @@ func (h *pbHandler) Process(msg consumer.Message) { h.m.droppedMetricDecodeMalformed.Inc(1) return } + h.m.metricAccepted.Inc(1) h.wg.Add(1) r := NewProtobufCallback(msg, dec, h.wg) + + // If storage policy is blackholed, ack the message immediately and don't + // bother passing down the write path. + for _, blackholeSp := range h.blackholePolicies { + if sp.Equivalent(blackholeSp) { + h.m.droppedMetricBlackholePolicy.Inc(1) + r.Callback(OnSuccess) + return + } + } + h.writeFn(h.ctx, dec.ID(), dec.TimeNanos(), dec.EncodeNanos(), dec.Value(), sp, r) } diff --git a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go index b67319836f..02e411749a 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go +++ b/src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go @@ -26,6 +26,7 @@ import ( "net" "sync" "testing" + "time" "github.com/m3db/m3/src/metrics/encoding/protobuf" "github.com/m3db/m3/src/metrics/metric" @@ -36,13 +37,19 @@ import ( "github.com/m3db/m3/src/msg/protocol/proto" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/server" + xtime "github.com/m3db/m3/src/x/time" "github.com/stretchr/testify/require" ) var ( - testID = "stats.sjc1.gauges.m3+some-name+dc=sjc1,env=production,service=foo,type=gauge" - validStoragePolicy = policy.MustParseStoragePolicy("1m:40d") + testID = "stats.foo1.gauges.m3+some-name+dc=foo1,env=production,service=foo,type=gauge" + + // baseStoragePolicy represents what we typically define in config for SP. + // precisionStoragePolicy is the same retention/resolution, but includes the + // precision (which is often included with incoming writes). + baseStoragePolicy = policy.MustParseStoragePolicy("1m:40d") + precisionStoragePolicy = policy.NewStoragePolicy(time.Minute, xtime.Second, 40*24*time.Hour) ) func TestM3MsgServerWithProtobufHandler(t *testing.T) { @@ -74,7 +81,7 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) { Value: 1, Type: metric.GaugeType, }, - StoragePolicy: validStoragePolicy, + StoragePolicy: precisionStoragePolicy, } encoder := protobuf.NewAggregatedEncoder(nil) @@ -98,7 +105,7 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) { Value: 0, Type: metric.UnknownType, }, - StoragePolicy: validStoragePolicy, + StoragePolicy: precisionStoragePolicy, } require.NoError(t, encoder.Encode(m2, 3000)) enc = proto.NewEncoder(opts.EncoderOptions()) @@ -127,6 +134,95 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) { require.Equal(t, m2.StoragePolicy, payload.sp) } +func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + w := &mockWriter{m: make(map[string]payload)} + hOpts := Options{ + WriteFn: w.write, + InstrumentOptions: instrument.NewOptions(), + BlockholePolicies: []policy.StoragePolicy{baseStoragePolicy}, + } + opts := consumer.NewOptions(). + SetAckBufferSize(1). + SetConnectionWriteBufferSize(1) + + s := server.NewServer( + "a", + consumer.NewMessageHandler(newProtobufProcessor(hOpts), opts), + server.NewOptions(), + ) + s.Serve(l) + + conn, err := net.Dial("tcp", l.Addr().String()) + require.NoError(t, err) + m1 := aggregated.MetricWithStoragePolicy{ + Metric: aggregated.Metric{ + ID: []byte(testID), + TimeNanos: 1000, + Value: 1, + Type: metric.GaugeType, + }, + StoragePolicy: precisionStoragePolicy, + } + + encoder := protobuf.NewAggregatedEncoder(nil) + require.NoError(t, encoder.Encode(m1, 2000)) + enc := proto.NewEncoder(opts.EncoderOptions()) + require.NoError(t, enc.Encode(&msgpb.Message{ + Value: encoder.Buffer().Bytes(), + })) + _, err = conn.Write(enc.Bytes()) + require.NoError(t, err) + + var a msgpb.Ack + dec := proto.NewDecoder(conn, opts.DecoderOptions(), 10) + require.NoError(t, dec.Decode(&a)) + require.Equal(t, 0, w.ingested()) + + // Ensure a metric with a different policy still gets ingested. + m2 := aggregated.MetricWithStoragePolicy{ + Metric: aggregated.Metric{ + ID: []byte{}, + TimeNanos: 0, + Value: 0, + Type: metric.UnknownType, + }, + StoragePolicy: policy.MustParseStoragePolicy("5m:180d"), + } + require.NoError(t, encoder.Encode(m2, 3000)) + enc = proto.NewEncoder(opts.EncoderOptions()) + require.NoError(t, enc.Encode(&msgpb.Message{ + Value: encoder.Buffer().Bytes(), + })) + _, err = conn.Write(enc.Bytes()) + require.NoError(t, err) + require.NoError(t, dec.Decode(&a)) + require.Equal(t, 1, w.ingested()) + + // Ensure a metric with base policy (equivalent but default precision) is + // still ignored. + m3 := aggregated.MetricWithStoragePolicy{ + Metric: aggregated.Metric{ + ID: []byte(testID), + TimeNanos: 1000, + Value: 1, + Type: metric.GaugeType, + }, + StoragePolicy: baseStoragePolicy, + } + require.NoError(t, encoder.Encode(m3, 3000)) + enc = proto.NewEncoder(opts.EncoderOptions()) + require.NoError(t, enc.Encode(&msgpb.Message{ + Value: encoder.Buffer().Bytes(), + })) + _, err = conn.Write(enc.Bytes()) + require.NoError(t, err) + require.NoError(t, dec.Decode(&a)) + require.Equal(t, 1, w.ingested()) +} + type mockWriter struct { sync.Mutex