Skip to content

Commit

Permalink
[coord] Configurable blackholed SP's back from agg (#2641)
Browse files Browse the repository at this point in the history
There are cases where we want to ignore metrics for a given storage
policy at the coordinator as they come back from the aggregator over
m3msg. Specifically, the coordinators may still receive aggregated
metrics with storage policies that they no longer have namespace
configuration for. This PR allows dropping those metrics as they come
back from the aggregator based on their storage policy.
  • Loading branch information
schallert authored Sep 16, 2020
1 parent 273b2e3 commit 2f568ee
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 11 deletions.
3 changes: 3 additions & 0 deletions src/cmd/services/m3coordinator/server/m3msg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package m3msg

import (
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/msg/consumer"
"github.com/m3db/m3/src/x/instrument"
xio "github.com/m3db/m3/src/x/io"
Expand Down Expand Up @@ -67,6 +68,7 @@ func (c Configuration) NewServer(
// handlerConfiguration configures the m3msg handler that consumes
// aggregated metrics coming back from the aggregator.
type handlerConfiguration struct {
	// ProtobufDecoderPool configs the protobuf decoder pool.
	ProtobufDecoderPool pool.ObjectPoolConfiguration `yaml:"protobufDecoderPool"`

	// BlackholePolicies is the set of storage policies for which incoming
	// aggregated metrics are dropped (acked but never written) — used when
	// the coordinator no longer has namespace configuration for a policy.
	BlackholePolicies []policy.StoragePolicy `yaml:"blackholePolicies"`
}

func (c handlerConfiguration) newHandler(
Expand All @@ -82,6 +84,7 @@ func (c handlerConfiguration) newHandler(
}),
),
ProtobufDecoderPoolOptions: c.ProtobufDecoderPool.NewObjectPoolOptions(iOpts),
BlockholePolicies: c.BlackholePolicies,
})
return consumer.NewMessageHandler(p, cOpts), nil
}
Expand Down
47 changes: 40 additions & 7 deletions src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"

"github.com/m3db/m3/src/metrics/encoding/protobuf"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/msg/consumer"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
Expand All @@ -38,11 +39,13 @@ type Options struct {
InstrumentOptions instrument.Options
WriteFn WriteFn
ProtobufDecoderPoolOptions pool.ObjectPoolOptions
BlockholePolicies []policy.StoragePolicy
}

// handlerMetrics holds the counters emitted by the m3msg protobuf handler.
// The dropped* counters share one "dropped" metric differentiated by a
// "reason" tag.
type handlerMetrics struct {
	// messageReadError counts message read failures.
	messageReadError tally.Counter
	// metricAccepted counts metrics that decoded successfully.
	metricAccepted tally.Counter
	// droppedMetricBlackholePolicy counts metrics dropped because their
	// storage policy is configured as blackholed (reason: blackhole-policy).
	droppedMetricBlackholePolicy tally.Counter
	// droppedMetricDecodeError counts drops due to decode errors
	// (reason: decode-error).
	droppedMetricDecodeError tally.Counter
	// droppedMetricDecodeMalformed counts drops due to malformed payloads
	// (reason: decode-malformed).
	droppedMetricDecodeMalformed tally.Counter
}
Expand All @@ -55,6 +58,9 @@ func newHandlerMetrics(scope tally.Scope) handlerMetrics {
droppedMetricDecodeError: messageScope.Tagged(map[string]string{
"reason": "decode-error",
}).Counter("dropped"),
droppedMetricBlackholePolicy: messageScope.Tagged(map[string]string{
"reason": "blackhole-policy",
}).Counter("dropped"),
droppedMetricDecodeMalformed: messageScope.Tagged(map[string]string{
"reason": "decode-malformed",
}).Counter("dropped"),
Expand All @@ -68,19 +74,34 @@ type pbHandler struct {
wg *sync.WaitGroup
logger *zap.Logger
m handlerMetrics

// Set of policies for which when we see a metric we drop it on the floor.
blackholePolicies []policy.StoragePolicy
}

func newProtobufProcessor(opts Options) consumer.MessageProcessor {
p := protobuf.NewAggregatedDecoderPool(opts.ProtobufDecoderPoolOptions)
p.Init()
return &pbHandler{
ctx: context.Background(),
writeFn: opts.WriteFn,
pool: p,
wg: &sync.WaitGroup{},
logger: opts.InstrumentOptions.Logger(),
m: newHandlerMetrics(opts.InstrumentOptions.MetricsScope()),

h := &pbHandler{
ctx: context.Background(),
writeFn: opts.WriteFn,
pool: p,
wg: &sync.WaitGroup{},
logger: opts.InstrumentOptions.Logger(),
m: newHandlerMetrics(opts.InstrumentOptions.MetricsScope()),
blackholePolicies: opts.BlockholePolicies,
}

if len(opts.BlockholePolicies) > 0 {
policyNames := make([]string, 0, len(opts.BlockholePolicies))
for _, sp := range h.blackholePolicies {
policyNames = append(policyNames, sp.String())
}
h.logger.Info("m3msg handler blackholing metrics for configured policies", zap.Strings("policyNames", policyNames))
}

return h
}

func (h *pbHandler) Process(msg consumer.Message) {
Expand All @@ -96,10 +117,22 @@ func (h *pbHandler) Process(msg consumer.Message) {
h.m.droppedMetricDecodeMalformed.Inc(1)
return
}

h.m.metricAccepted.Inc(1)

h.wg.Add(1)
r := NewProtobufCallback(msg, dec, h.wg)

// If storage policy is blackholed, ack the message immediately and don't
// bother passing down the write path.
for _, blackholeSp := range h.blackholePolicies {
if sp.Equivalent(blackholeSp) {
h.m.droppedMetricBlackholePolicy.Inc(1)
r.Callback(OnSuccess)
return
}
}

h.writeFn(h.ctx, dec.ID(), dec.TimeNanos(), dec.EncodeNanos(), dec.Value(), sp, r)
}

Expand Down
104 changes: 100 additions & 4 deletions src/cmd/services/m3coordinator/server/m3msg/protobuf_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net"
"sync"
"testing"
"time"

"github.com/m3db/m3/src/metrics/encoding/protobuf"
"github.com/m3db/m3/src/metrics/metric"
Expand All @@ -36,13 +37,19 @@ import (
"github.com/m3db/m3/src/msg/protocol/proto"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/server"
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/require"
)

var (
testID = "stats.sjc1.gauges.m3+some-name+dc=sjc1,env=production,service=foo,type=gauge"
validStoragePolicy = policy.MustParseStoragePolicy("1m:40d")
testID = "stats.foo1.gauges.m3+some-name+dc=foo1,env=production,service=foo,type=gauge"

// baseStoragePolicy represents what we typically define in config for SP.
// precisionStoragePolicy is the same retention/resolution, but includes the
// precision (which is often included with incoming writes).
baseStoragePolicy = policy.MustParseStoragePolicy("1m:40d")
precisionStoragePolicy = policy.NewStoragePolicy(time.Minute, xtime.Second, 40*24*time.Hour)
)

func TestM3MsgServerWithProtobufHandler(t *testing.T) {
Expand Down Expand Up @@ -74,7 +81,7 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) {
Value: 1,
Type: metric.GaugeType,
},
StoragePolicy: validStoragePolicy,
StoragePolicy: precisionStoragePolicy,
}

encoder := protobuf.NewAggregatedEncoder(nil)
Expand All @@ -98,7 +105,7 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) {
Value: 0,
Type: metric.UnknownType,
},
StoragePolicy: validStoragePolicy,
StoragePolicy: precisionStoragePolicy,
}
require.NoError(t, encoder.Encode(m2, 3000))
enc = proto.NewEncoder(opts.EncoderOptions())
Expand Down Expand Up @@ -127,6 +134,95 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) {
require.Equal(t, m2.StoragePolicy, payload.sp)
}

// TestM3MsgServerWithProtobufHandler_Blackhole verifies that metrics whose
// storage policy matches a configured blackhole policy are acked but never
// passed to the write path, while metrics with other policies are ingested.
// It also checks that an equivalent-but-differently-precise policy is still
// blackholed.
func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)

	w := &mockWriter{m: make(map[string]payload)}
	hOpts := Options{
		WriteFn:           w.write,
		InstrumentOptions: instrument.NewOptions(),
		// Field name carries a pre-existing typo ("Blockhole") in Options.
		BlockholePolicies: []policy.StoragePolicy{baseStoragePolicy},
	}
	opts := consumer.NewOptions().
		SetAckBufferSize(1).
		SetConnectionWriteBufferSize(1)

	s := server.NewServer(
		"a",
		consumer.NewMessageHandler(newProtobufProcessor(hOpts), opts),
		server.NewOptions(),
	)
	// Check the Serve error (previously ignored) and make sure the server
	// and connection are torn down when the test finishes.
	require.NoError(t, s.Serve(l))
	defer s.Close()

	conn, err := net.Dial("tcp", l.Addr().String())
	require.NoError(t, err)
	defer conn.Close()

	encoder := protobuf.NewAggregatedEncoder(nil)
	dec := proto.NewDecoder(conn, opts.DecoderOptions(), 10)
	var a msgpb.Ack

	// sendAndAwaitAck encodes the metric, writes it to the server, and blocks
	// until the corresponding ack is read back.
	sendAndAwaitAck := func(m aggregated.MetricWithStoragePolicy, encodedNanos int64) {
		require.NoError(t, encoder.Encode(m, encodedNanos))
		enc := proto.NewEncoder(opts.EncoderOptions())
		require.NoError(t, enc.Encode(&msgpb.Message{
			Value: encoder.Buffer().Bytes(),
		}))
		_, err := conn.Write(enc.Bytes())
		require.NoError(t, err)
		require.NoError(t, dec.Decode(&a))
	}

	// A metric with the precision variant of the blackholed policy must be
	// acked without being ingested.
	m1 := aggregated.MetricWithStoragePolicy{
		Metric: aggregated.Metric{
			ID:        []byte(testID),
			TimeNanos: 1000,
			Value:     1,
			Type:      metric.GaugeType,
		},
		StoragePolicy: precisionStoragePolicy,
	}
	sendAndAwaitAck(m1, 2000)
	require.Equal(t, 0, w.ingested())

	// Ensure a metric with a different policy still gets ingested.
	m2 := aggregated.MetricWithStoragePolicy{
		Metric: aggregated.Metric{
			ID:        []byte{},
			TimeNanos: 0,
			Value:     0,
			Type:      metric.UnknownType,
		},
		StoragePolicy: policy.MustParseStoragePolicy("5m:180d"),
	}
	sendAndAwaitAck(m2, 3000)
	require.Equal(t, 1, w.ingested())

	// Ensure a metric with base policy (equivalent but default precision) is
	// still ignored.
	m3 := aggregated.MetricWithStoragePolicy{
		Metric: aggregated.Metric{
			ID:        []byte(testID),
			TimeNanos: 1000,
			Value:     1,
			Type:      metric.GaugeType,
		},
		StoragePolicy: baseStoragePolicy,
	}
	sendAndAwaitAck(m3, 3000)
	require.Equal(t, 1, w.ingested())
}

type mockWriter struct {
sync.Mutex

Expand Down

0 comments on commit 2f568ee

Please sign in to comment.