diff --git a/src/cmd/services/m3coordinator/ingest/ingest.go b/src/cmd/services/m3coordinator/ingest/ingest.go index 47b22a5ca4..e41df426a7 100644 --- a/src/cmd/services/m3coordinator/ingest/ingest.go +++ b/src/cmd/services/m3coordinator/ingest/ingest.go @@ -111,7 +111,7 @@ func (i *Ingester) Ingest( metricTime time.Time, value float64, sp policy.StoragePolicy, - callback *m3msg.RefCountedCallback, + callback m3msg.Callbackable, ) { op := i.p.Get().(*ingestOp) op.c = ctx @@ -139,7 +139,7 @@ type ingestOp struct { metricTime time.Time value float64 sp policy.StoragePolicy - callback *m3msg.RefCountedCallback + callback m3msg.Callbackable q storage.WriteQuery } diff --git a/src/cmd/services/m3coordinator/server/m3msg/handler.go b/src/cmd/services/m3coordinator/server/m3msg/msgpack_handler.go similarity index 88% rename from src/cmd/services/m3coordinator/server/m3msg/handler.go rename to src/cmd/services/m3coordinator/server/m3msg/msgpack_handler.go index 49db8ad707..ee1b1cc4c9 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/handler.go +++ b/src/cmd/services/m3coordinator/server/m3msg/msgpack_handler.go @@ -30,22 +30,24 @@ import ( "github.com/m3db/m3/src/msg/consumer" "github.com/m3db/m3x/instrument" "github.com/m3db/m3x/log" + "github.com/m3db/m3x/pool" "github.com/uber-go/tally" ) // Options for the ingest handler. type Options struct { - InstrumentOptions instrument.Options - WriteFn WriteFn - AggregatedIteratorOptions msgpack.AggregatedIteratorOptions + InstrumentOptions instrument.Options + WriteFn WriteFn + AggregatedIteratorOptions msgpack.AggregatedIteratorOptions + ProtobufDecoderPoolOptions pool.ObjectPoolOptions } type handlerMetrics struct { - messageReadError tally.Counter - metricAccepted tally.Counter - droppedMetricMsgpackDecodeError tally.Counter - droppedMetricDecodeMalformed tally.Counter + messageReadError tally.Counter + metricAccepted tally.Counter + droppedMetricDecodeError tally.Counter + droppedMetricDecodeMalformed tally.Counter } func newHandlerMetrics(scope tally.Scope) handlerMetrics { @@ -53,8 +55,8 @@ func newHandlerMetrics(scope tally.Scope) handlerMetrics { return handlerMetrics{ messageReadError: scope.Counter("message-read-error"), metricAccepted: messageScope.Counter("accepted"), - droppedMetricMsgpackDecodeError: messageScope.Tagged(map[string]string{ - "reason": "msgpack-decode-error", + droppedMetricDecodeError: messageScope.Tagged(map[string]string{ + "reason": "decode-error", }).Counter("dropped"), droppedMetricDecodeMalformed: messageScope.Tagged(map[string]string{ "reason": "decode-malformed", @@ -154,6 +156,6 @@ func (h *perConsumerHandler) processMessage( r.decRef() if err := h.it.Err(); err != nil && err != io.EOF { h.logger.WithFields(log.NewErrField(h.it.Err())).Errorf("could not decode msg %s", msg.Bytes()) - h.m.droppedMetricMsgpackDecodeError.Inc(1) + h.m.droppedMetricDecodeError.Inc(1) } } diff --git a/src/cmd/services/m3coordinator/server/m3msg/handler_test.go b/src/cmd/services/m3coordinator/server/m3msg/msgpack_handler_test.go similarity index 99% rename from src/cmd/services/m3coordinator/server/m3msg/handler_test.go rename to src/cmd/services/m3coordinator/server/m3msg/msgpack_handler_test.go index 5974a9c579..017fe9210e 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/handler_test.go +++ b/src/cmd/services/m3coordinator/server/m3msg/msgpack_handler_test.go @@ -109,7 +109,7 @@ func (m *mockWriter) write( metricTime time.Time, value float64, sp policy.StoragePolicy, - callbackable *RefCountedCallback, + callbackable Callbackable, ) { m.Lock() m.n++ diff --git a/src/cmd/services/m3coordinator/server/m3msg/pb_handler.go b/src/cmd/services/m3coordinator/server/m3msg/pb_handler.go new file mode 100644 index 0000000000..2e8762e905 --- /dev/null +++ b/src/cmd/services/m3coordinator/server/m3msg/pb_handler.go @@ -0,0 +1,92 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package m3msg + +import ( + "context" + "time" + + "github.com/m3db/m3/src/metrics/encoding/protobuf" + "github.com/m3db/m3/src/msg/consumer" + "github.com/m3db/m3x/log" +) + +type pbHandler struct { + ctx context.Context + writeFn WriteFn + pool protobuf.AggregatedDecoderPool + logger log.Logger + m handlerMetrics +} + +func newProtobufHandler(opts Options) *pbHandler { + p := protobuf.NewAggregatedDecoderPool(opts.ProtobufDecoderPoolOptions) + p.Init() + return &pbHandler{ + ctx: context.Background(), + writeFn: opts.WriteFn, + pool: p, + logger: opts.InstrumentOptions.Logger(), + m: newHandlerMetrics(opts.InstrumentOptions.MetricsScope()), + } +} + +func (h *pbHandler) message(msg consumer.Message) { + dec := h.pool.Get() + if err := dec.Decode(msg.Bytes()); err != nil { + h.logger.WithFields(log.NewErrField(err)).Error("invalid raw metric") + h.m.droppedMetricDecodeError.Inc(1) + return + } + sp, err := dec.StoragePolicy() + if err != nil { + h.logger.WithFields(log.NewErrField(err)).Error("invalid storage policy") + h.m.droppedMetricDecodeMalformed.Inc(1) + return + } + h.m.metricAccepted.Inc(1) + + r := newProtobufCallback(msg, dec) + h.writeFn(h.ctx, dec.ID(), time.Unix(0, dec.TimeNanos()), dec.Value(), sp, r) +} + +type protobufCallback struct { + msg consumer.Message + dec *protobuf.AggregatedDecoder +} + +func newProtobufCallback( + msg consumer.Message, + dec *protobuf.AggregatedDecoder, +) *protobufCallback { + return &protobufCallback{ + msg: msg, + dec: dec, + } +} + +func (c *protobufCallback) Callback(t CallbackType) { + switch t { + case OnSuccess, OnNonRetriableError: + c.msg.Ack() + } + c.dec.Close() +} diff --git a/src/cmd/services/m3coordinator/server/m3msg/pb_handler_test.go b/src/cmd/services/m3coordinator/server/m3msg/pb_handler_test.go new file mode 100644 index 0000000000..7b0db3cd23 --- /dev/null +++ b/src/cmd/services/m3coordinator/server/m3msg/pb_handler_test.go @@ -0,0 +1,96 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package m3msg + +import ( + "net" + "testing" + + "github.com/m3db/m3/src/metrics/encoding/protobuf" + "github.com/m3db/m3/src/metrics/metric" + "github.com/m3db/m3/src/metrics/metric/aggregated" + "github.com/m3db/m3/src/msg/consumer" + "github.com/m3db/m3/src/msg/generated/proto/msgpb" + "github.com/m3db/m3/src/msg/protocol/proto" + "github.com/m3db/m3x/instrument" + "github.com/m3db/m3x/server" + + "github.com/stretchr/testify/require" +) + +var testID = "stats.sjc1.gauges.m3+some-name+dc=sjc1,env=production,service=foo,type=gauge" + +func TestM3msgServerWithProtobufHandler(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + w := &mockWriter{m: make(map[string]payload)} + hOpts := Options{ + WriteFn: w.write, + InstrumentOptions: instrument.NewOptions(), + } + handler := newProtobufHandler(hOpts) + require.NoError(t, err) + + opts := consumer.NewOptions(). + SetAckBufferSize(1). + SetConnectionWriteBufferSize(1) + + s := server.NewServer( + "a", + consumer.NewMessageHandler(handler.message, opts), + server.NewOptions(), + ) + s.Serve(l) + + conn, err := net.Dial("tcp", l.Addr().String()) + require.NoError(t, err) + m := aggregated.MetricWithStoragePolicy{ + Metric: aggregated.Metric{ + ID: []byte(testID), + TimeNanos: 1000, + Value: 1, + Type: metric.GaugeType, + }, + StoragePolicy: validStoragePolicy, + } + + encoder := protobuf.NewAggregatedEncoder(nil) + require.NoError(t, encoder.Encode(m, 2000)) + enc := proto.NewEncoder(opts.EncoderOptions()) + require.NoError(t, enc.Encode(&msgpb.Message{ + Value: encoder.Buffer().Bytes(), + })) + _, err = conn.Write(enc.Bytes()) + require.NoError(t, err) + + var a msgpb.Ack + dec := proto.NewDecoder(conn, opts.DecoderOptions()) + require.NoError(t, dec.Decode(&a)) + require.Equal(t, 1, w.ingested()) + + payload, ok := w.m[string(m.ID)] + require.True(t, ok) + require.Equal(t, string(m.ID), payload.id) + require.Equal(t, m.TimeNanos, payload.metricTime.UnixNano()) + require.Equal(t, m.Value, payload.value) + require.Equal(t, m.StoragePolicy, payload.sp) +} diff --git a/src/cmd/services/m3coordinator/server/m3msg/types.go b/src/cmd/services/m3coordinator/server/m3msg/types.go index 04e96c0fcd..b3d5d270bc 100644 --- a/src/cmd/services/m3coordinator/server/m3msg/types.go +++ b/src/cmd/services/m3coordinator/server/m3msg/types.go @@ -36,7 +36,7 @@ type WriteFn func( metricTime time.Time, value float64, sp policy.StoragePolicy, - callback *RefCountedCallback, + callback Callbackable, ) // CallbackType defines the type for the callback. @@ -52,6 +52,11 @@ const ( OnRetriableError ) +// Callbackable can be called back. +type Callbackable interface { + Callback(t CallbackType) +} + // RefCountedCallback wraps a message with a reference count, the message will // be acked once the reference count decrements to zero. type RefCountedCallback struct { diff --git a/src/metrics/metric/aggregated/types_test.go b/src/metrics/metric/aggregated/types_test.go index 560e3530a3..e00e7f50b4 100644 --- a/src/metrics/metric/aggregated/types_test.go +++ b/src/metrics/metric/aggregated/types_test.go @@ -55,7 +55,7 @@ var ( Value: 21.99, } testBadMetric = Metric{ - Type: metric.UnknownType, + Type: 999, } testMetricWithStoragePolicy = MetricWithStoragePolicy{ Metric: testMetric1, @@ -74,7 +74,7 @@ var ( Values: []float64{1.34, -26.57}, } testBadForwardedMetric = ForwardedMetric{ - Type: metric.UnknownType, + Type: 999, } testForwardMetadata1 = metadata.ForwardMetadata{ AggregationID: aggregation.DefaultID, @@ -208,10 +208,10 @@ var ( NumForwardedTimes: 2, } testBadMetricProto = metricpb.TimedMetric{ - Type: metricpb.MetricType_UNKNOWN, + Type: 999, } testBadForwardedMetricProto = metricpb.ForwardedMetric{ - Type: metricpb.MetricType_UNKNOWN, + Type: 999, } )