Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coord] Configurable blackholed SP's back from agg #2641

Merged
merged 12 commits into from
Sep 16, 2020
3 changes: 3 additions & 0 deletions src/cmd/services/m3coordinator/server/m3msg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package m3msg

import (
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/msg/consumer"
"github.com/m3db/m3/src/x/instrument"
xio "github.com/m3db/m3/src/x/io"
Expand Down Expand Up @@ -67,6 +68,7 @@ func (c Configuration) NewServer(
type handlerConfiguration struct {
// ProtobufDecoderPool configs the protobuf decoder pool.
ProtobufDecoderPool pool.ObjectPoolConfiguration `yaml:"protobufDecoderPool"`
BlackholePolicies []policy.StoragePolicy `yaml:"blackholePolicies"`
}

func (c handlerConfiguration) newHandler(
Expand All @@ -82,6 +84,7 @@ func (c handlerConfiguration) newHandler(
}),
),
ProtobufDecoderPoolOptions: c.ProtobufDecoderPool.NewObjectPoolOptions(iOpts),
BlockholePolicies: c.BlackholePolicies,
})
return consumer.NewMessageHandler(p, cOpts), nil
}
Expand Down
47 changes: 40 additions & 7 deletions src/cmd/services/m3coordinator/server/m3msg/protobuf_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"

"github.com/m3db/m3/src/metrics/encoding/protobuf"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/msg/consumer"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
Expand All @@ -38,11 +39,13 @@ type Options struct {
InstrumentOptions instrument.Options
WriteFn WriteFn
ProtobufDecoderPoolOptions pool.ObjectPoolOptions
BlockholePolicies []policy.StoragePolicy
}

type handlerMetrics struct {
messageReadError tally.Counter
metricAccepted tally.Counter
droppedMetricBlackholePolicy tally.Counter
droppedMetricDecodeError tally.Counter
droppedMetricDecodeMalformed tally.Counter
}
Expand All @@ -55,6 +58,9 @@ func newHandlerMetrics(scope tally.Scope) handlerMetrics {
droppedMetricDecodeError: messageScope.Tagged(map[string]string{
"reason": "decode-error",
}).Counter("dropped"),
droppedMetricBlackholePolicy: messageScope.Tagged(map[string]string{
"reason": "blackhole-policy",
}).Counter("dropped"),
droppedMetricDecodeMalformed: messageScope.Tagged(map[string]string{
"reason": "decode-malformed",
}).Counter("dropped"),
Expand All @@ -68,19 +74,34 @@ type pbHandler struct {
wg *sync.WaitGroup
logger *zap.Logger
m handlerMetrics

// Set of policies for which when we see a metric we drop it on the floor.
blackholePolicies []policy.StoragePolicy
}

func newProtobufProcessor(opts Options) consumer.MessageProcessor {
p := protobuf.NewAggregatedDecoderPool(opts.ProtobufDecoderPoolOptions)
p.Init()
return &pbHandler{
ctx: context.Background(),
writeFn: opts.WriteFn,
pool: p,
wg: &sync.WaitGroup{},
logger: opts.InstrumentOptions.Logger(),
m: newHandlerMetrics(opts.InstrumentOptions.MetricsScope()),

h := &pbHandler{
ctx: context.Background(),
writeFn: opts.WriteFn,
pool: p,
wg: &sync.WaitGroup{},
logger: opts.InstrumentOptions.Logger(),
m: newHandlerMetrics(opts.InstrumentOptions.MetricsScope()),
blackholePolicies: opts.BlockholePolicies,
}

if len(opts.BlockholePolicies) > 0 {
policyNames := make([]string, len(opts.BlockholePolicies))
for i, sp := range h.blackholePolicies {
policyNames[i] = sp.String()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: using append instead of indexed insert is a little more future proof

}
h.logger.Info("m3msg handler blackholing metrics for configured policies", zap.Strings("policyNames", policyNames))
}

return h
}

func (h *pbHandler) Process(msg consumer.Message) {
Expand All @@ -96,10 +117,22 @@ func (h *pbHandler) Process(msg consumer.Message) {
h.m.droppedMetricDecodeMalformed.Inc(1)
return
}

h.m.metricAccepted.Inc(1)

h.wg.Add(1)
r := NewProtobufCallback(msg, dec, h.wg)

// If storage policy is blackholed, ack the message immediately and don't
// bother passing down the write path.
for _, blackholeSp := range h.blackholePolicies {
if sp.Equivalent(blackholeSp) {
h.m.droppedMetricBlackholePolicy.Inc(1)
r.Callback(OnSuccess)
return
}
}

h.writeFn(h.ctx, dec.ID(), dec.TimeNanos(), dec.EncodeNanos(), dec.Value(), sp, r)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net"
"sync"
"testing"
"time"

"github.com/m3db/m3/src/metrics/encoding/protobuf"
"github.com/m3db/m3/src/metrics/metric"
Expand All @@ -36,13 +37,19 @@ import (
"github.com/m3db/m3/src/msg/protocol/proto"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/server"
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/require"
)

var (
testID = "stats.sjc1.gauges.m3+some-name+dc=sjc1,env=production,service=foo,type=gauge"
validStoragePolicy = policy.MustParseStoragePolicy("1m:40d")
testID = "stats.sjc1.gauges.m3+some-name+dc=sjc1,env=production,service=foo,type=gauge"

// baseStoragePolicy represents what we typically define in config for SP.
// precisionStoragePolicy is the same retention/resolution, but includes the
// precision (which is often included with incoming writes).
baseStoragePolicy = policy.MustParseStoragePolicy("1m:40d")
precisionStoragePolicy = policy.NewStoragePolicy(time.Minute, xtime.Second, 40*24*time.Hour)
)

func TestM3MsgServerWithProtobufHandler(t *testing.T) {
Expand Down Expand Up @@ -74,7 +81,7 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) {
Value: 1,
Type: metric.GaugeType,
},
StoragePolicy: validStoragePolicy,
StoragePolicy: precisionStoragePolicy,
}

encoder := protobuf.NewAggregatedEncoder(nil)
Expand All @@ -98,7 +105,7 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) {
Value: 0,
Type: metric.UnknownType,
},
StoragePolicy: validStoragePolicy,
StoragePolicy: precisionStoragePolicy,
}
require.NoError(t, encoder.Encode(m2, 3000))
enc = proto.NewEncoder(opts.EncoderOptions())
Expand Down Expand Up @@ -127,6 +134,74 @@ func TestM3MsgServerWithProtobufHandler(t *testing.T) {
require.Equal(t, m2.StoragePolicy, payload.sp)
}

func TestM3MsgServerWithProtobufHandler_Blackhole(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

w := &mockWriter{m: make(map[string]payload)}
hOpts := Options{
WriteFn: w.write,
InstrumentOptions: instrument.NewOptions(),
BlockholePolicies: []policy.StoragePolicy{baseStoragePolicy},
}
opts := consumer.NewOptions().
SetAckBufferSize(1).
SetConnectionWriteBufferSize(1)

s := server.NewServer(
"a",
consumer.NewMessageHandler(newProtobufProcessor(hOpts), opts),
server.NewOptions(),
)
s.Serve(l)

conn, err := net.Dial("tcp", l.Addr().String())
require.NoError(t, err)
m1 := aggregated.MetricWithStoragePolicy{
Metric: aggregated.Metric{
ID: []byte(testID),
TimeNanos: 1000,
Value: 1,
Type: metric.GaugeType,
},
StoragePolicy: precisionStoragePolicy,
}

encoder := protobuf.NewAggregatedEncoder(nil)
require.NoError(t, encoder.Encode(m1, 2000))
enc := proto.NewEncoder(opts.EncoderOptions())
require.NoError(t, enc.Encode(&msgpb.Message{
Value: encoder.Buffer().Bytes(),
}))
_, err = conn.Write(enc.Bytes())
require.NoError(t, err)

var a msgpb.Ack
dec := proto.NewDecoder(conn, opts.DecoderOptions(), 10)
require.NoError(t, dec.Decode(&a))
require.Equal(t, 0, w.ingested())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth adding a test with baseStoragePolicy StoragePolicy being denied as well?


// Ensure a metric with a difference policy still gets ingested.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: different policy

m2 := aggregated.MetricWithStoragePolicy{
Metric: aggregated.Metric{
ID: []byte{},
TimeNanos: 0,
Value: 0,
Type: metric.UnknownType,
},
StoragePolicy: policy.MustParseStoragePolicy("5m:180d"),
}
require.NoError(t, encoder.Encode(m2, 3000))
enc = proto.NewEncoder(opts.EncoderOptions())
require.NoError(t, enc.Encode(&msgpb.Message{
Value: encoder.Buffer().Bytes(),
}))
_, err = conn.Write(enc.Bytes())
require.NoError(t, err)
require.NoError(t, dec.Decode(&a))
require.Equal(t, 1, w.ingested())
}

type mockWriter struct {
sync.Mutex

Expand Down