Skip to content

Commit

Permalink
Add ingester server for metrics encoded protobuf (#1252)
Browse files Browse the repository at this point in the history
  • Loading branch information
cw9 authored Dec 18, 2018
1 parent 7dd6c34 commit a948b5a
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 18 deletions.
4 changes: 2 additions & 2 deletions src/cmd/services/m3coordinator/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (i *Ingester) Ingest(
metricTime time.Time,
value float64,
sp policy.StoragePolicy,
callback *m3msg.RefCountedCallback,
callback m3msg.Callbackable,
) {
op := i.p.Get().(*ingestOp)
op.c = ctx
Expand Down Expand Up @@ -139,7 +139,7 @@ type ingestOp struct {
metricTime time.Time
value float64
sp policy.StoragePolicy
callback *m3msg.RefCountedCallback
callback m3msg.Callbackable
q storage.WriteQuery
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,33 @@ import (
"github.com/m3db/m3/src/msg/consumer"
"github.com/m3db/m3x/instrument"
"github.com/m3db/m3x/log"
"github.com/m3db/m3x/pool"

"github.com/uber-go/tally"
)

// Options for the ingest handler.
type Options struct {
InstrumentOptions instrument.Options
WriteFn WriteFn
AggregatedIteratorOptions msgpack.AggregatedIteratorOptions
InstrumentOptions instrument.Options
WriteFn WriteFn
AggregatedIteratorOptions msgpack.AggregatedIteratorOptions
ProtobufDecoderPoolOptions pool.ObjectPoolOptions
}

type handlerMetrics struct {
messageReadError tally.Counter
metricAccepted tally.Counter
droppedMetricMsgpackDecodeError tally.Counter
droppedMetricDecodeMalformed tally.Counter
messageReadError tally.Counter
metricAccepted tally.Counter
droppedMetricDecodeError tally.Counter
droppedMetricDecodeMalformed tally.Counter
}

func newHandlerMetrics(scope tally.Scope) handlerMetrics {
messageScope := scope.SubScope("metric")
return handlerMetrics{
messageReadError: scope.Counter("message-read-error"),
metricAccepted: messageScope.Counter("accepted"),
droppedMetricMsgpackDecodeError: messageScope.Tagged(map[string]string{
"reason": "msgpack-decode-error",
droppedMetricDecodeError: messageScope.Tagged(map[string]string{
"reason": "decode-error",
}).Counter("dropped"),
droppedMetricDecodeMalformed: messageScope.Tagged(map[string]string{
"reason": "decode-malformed",
Expand Down Expand Up @@ -154,6 +156,6 @@ func (h *perConsumerHandler) processMessage(
r.decRef()
if err := h.it.Err(); err != nil && err != io.EOF {
h.logger.WithFields(log.NewErrField(h.it.Err())).Errorf("could not decode msg %s", msg.Bytes())
h.m.droppedMetricMsgpackDecodeError.Inc(1)
h.m.droppedMetricDecodeError.Inc(1)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (m *mockWriter) write(
metricTime time.Time,
value float64,
sp policy.StoragePolicy,
callbackable *RefCountedCallback,
callbackable Callbackable,
) {
m.Lock()
m.n++
Expand Down
92 changes: 92 additions & 0 deletions src/cmd/services/m3coordinator/server/m3msg/pb_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package m3msg

import (
"context"
"time"

"github.com/m3db/m3/src/metrics/encoding/protobuf"
"github.com/m3db/m3/src/msg/consumer"
"github.com/m3db/m3x/log"
)

type pbHandler struct {
ctx context.Context
writeFn WriteFn
pool protobuf.AggregatedDecoderPool
logger log.Logger
m handlerMetrics
}

func newProtobufHandler(opts Options) *pbHandler {
p := protobuf.NewAggregatedDecoderPool(opts.ProtobufDecoderPoolOptions)
p.Init()
return &pbHandler{
ctx: context.Background(),
writeFn: opts.WriteFn,
pool: p,
logger: opts.InstrumentOptions.Logger(),
m: newHandlerMetrics(opts.InstrumentOptions.MetricsScope()),
}
}

func (h *pbHandler) message(msg consumer.Message) {
dec := h.pool.Get()
if err := dec.Decode(msg.Bytes()); err != nil {
h.logger.WithFields(log.NewErrField(err)).Error("invalid raw metric")
h.m.droppedMetricDecodeError.Inc(1)
return
}
sp, err := dec.StoragePolicy()
if err != nil {
h.logger.WithFields(log.NewErrField(err)).Error("invalid storage policy")
h.m.droppedMetricDecodeMalformed.Inc(1)
return
}
h.m.metricAccepted.Inc(1)

r := newProtobufCallback(msg, dec)
h.writeFn(h.ctx, dec.ID(), time.Unix(0, dec.TimeNanos()), dec.Value(), sp, r)
}

type protobufCallback struct {
msg consumer.Message
dec *protobuf.AggregatedDecoder
}

func newProtobufCallback(
msg consumer.Message,
dec *protobuf.AggregatedDecoder,
) *protobufCallback {
return &protobufCallback{
msg: msg,
dec: dec,
}
}

func (c *protobufCallback) Callback(t CallbackType) {
switch t {
case OnSuccess, OnNonRetriableError:
c.msg.Ack()
}
c.dec.Close()
}
96 changes: 96 additions & 0 deletions src/cmd/services/m3coordinator/server/m3msg/pb_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package m3msg

import (
"net"
"testing"

"github.com/m3db/m3/src/metrics/encoding/protobuf"
"github.com/m3db/m3/src/metrics/metric"
"github.com/m3db/m3/src/metrics/metric/aggregated"
"github.com/m3db/m3/src/msg/consumer"
"github.com/m3db/m3/src/msg/generated/proto/msgpb"
"github.com/m3db/m3/src/msg/protocol/proto"
"github.com/m3db/m3x/instrument"
"github.com/m3db/m3x/server"

"github.com/stretchr/testify/require"
)

var testID = "stats.sjc1.gauges.m3+some-name+dc=sjc1,env=production,service=foo,type=gauge"

func TestM3msgServerWithProtobufHandler(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

w := &mockWriter{m: make(map[string]payload)}
hOpts := Options{
WriteFn: w.write,
InstrumentOptions: instrument.NewOptions(),
}
handler := newProtobufHandler(hOpts)
require.NoError(t, err)

opts := consumer.NewOptions().
SetAckBufferSize(1).
SetConnectionWriteBufferSize(1)

s := server.NewServer(
"a",
consumer.NewMessageHandler(handler.message, opts),
server.NewOptions(),
)
s.Serve(l)

conn, err := net.Dial("tcp", l.Addr().String())
require.NoError(t, err)
m := aggregated.MetricWithStoragePolicy{
Metric: aggregated.Metric{
ID: []byte(testID),
TimeNanos: 1000,
Value: 1,
Type: metric.GaugeType,
},
StoragePolicy: validStoragePolicy,
}

encoder := protobuf.NewAggregatedEncoder(nil)
require.NoError(t, encoder.Encode(m, 2000))
enc := proto.NewEncoder(opts.EncoderOptions())
require.NoError(t, enc.Encode(&msgpb.Message{
Value: encoder.Buffer().Bytes(),
}))
_, err = conn.Write(enc.Bytes())
require.NoError(t, err)

var a msgpb.Ack
dec := proto.NewDecoder(conn, opts.DecoderOptions())
require.NoError(t, dec.Decode(&a))
require.Equal(t, 1, w.ingested())

payload, ok := w.m[string(m.ID)]
require.True(t, ok)
require.Equal(t, string(m.ID), payload.id)
require.Equal(t, m.TimeNanos, payload.metricTime.UnixNano())
require.Equal(t, m.Value, payload.value)
require.Equal(t, m.StoragePolicy, payload.sp)
}
7 changes: 6 additions & 1 deletion src/cmd/services/m3coordinator/server/m3msg/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type WriteFn func(
metricTime time.Time,
value float64,
sp policy.StoragePolicy,
callback *RefCountedCallback,
callback Callbackable,
)

// CallbackType defines the type for the callback.
Expand All @@ -52,6 +52,11 @@ const (
OnRetriableError
)

// Callbackable can be called back.
type Callbackable interface {
Callback(t CallbackType)
}

// RefCountedCallback wraps a message with a reference count, the message will
// be acked once the reference count decrements to zero.
type RefCountedCallback struct {
Expand Down
8 changes: 4 additions & 4 deletions src/metrics/metric/aggregated/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var (
Value: 21.99,
}
testBadMetric = Metric{
Type: metric.UnknownType,
Type: 999,
}
testMetricWithStoragePolicy = MetricWithStoragePolicy{
Metric: testMetric1,
Expand All @@ -74,7 +74,7 @@ var (
Values: []float64{1.34, -26.57},
}
testBadForwardedMetric = ForwardedMetric{
Type: metric.UnknownType,
Type: 999,
}
testForwardMetadata1 = metadata.ForwardMetadata{
AggregationID: aggregation.DefaultID,
Expand Down Expand Up @@ -208,10 +208,10 @@ var (
NumForwardedTimes: 2,
}
testBadMetricProto = metricpb.TimedMetric{
Type: metricpb.MetricType_UNKNOWN,
Type: 999,
}
testBadForwardedMetricProto = metricpb.ForwardedMetric{
Type: metricpb.MetricType_UNKNOWN,
Type: 999,
}
)

Expand Down

0 comments on commit a948b5a

Please sign in to comment.