diff --git a/src/msg/consumer/consumer.go b/src/msg/consumer/consumer.go index f880b67ec7..8703f0b6fa 100644 --- a/src/msg/consumer/consumer.go +++ b/src/msg/consumer/consumer.go @@ -29,6 +29,7 @@ import ( "github.com/m3db/m3/src/msg/protocol/proto" "github.com/m3db/m3/src/x/clock" xio "github.com/m3db/m3/src/x/io" + xtime "github.com/m3db/m3/src/x/time" "github.com/uber-go/tally" ) @@ -75,6 +76,10 @@ type metrics struct { ackSent tally.Counter ackEncodeError tally.Counter ackWriteError tally.Counter + // the duration between the producer sending the message and the consumer reading the message. + receiveLatency tally.Histogram + // the duration between the consumer reading the message and sending an ack to the producer. + handleLatency tally.Histogram } func newConsumerMetrics(scope tally.Scope) metrics { @@ -84,6 +89,12 @@ func newConsumerMetrics(scope tally.Scope) metrics { ackSent: scope.Counter("ack-sent"), ackEncodeError: scope.Counter("ack-encode-error"), ackWriteError: scope.Counter("ack-write-error"), + receiveLatency: scope.Histogram("receive-latency", + // 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.2s, 2.4s, 4.8s, 9.6s + tally.MustMakeExponentialDurationBuckets(time.Millisecond*10, 2, 11)), + handleLatency: scope.Histogram("handle-latency", + // 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.2s, 2.4s, 4.8s, 9.6s + tally.MustMakeExponentialDurationBuckets(time.Millisecond*10, 2, 11)), } } @@ -156,6 +167,9 @@ func (c *consumer) Message() (Message, error) { c.m.messageDecodeError.Inc(1) return nil, err } + if m.Metadata.SentAtNanos > 0 { + c.m.receiveLatency.RecordDuration(xtime.Since(xtime.UnixNano(m.Metadata.SentAtNanos))) + } c.m.messageReceived.Inc(1) return m, nil } @@ -270,6 +284,10 @@ func (m *message) ShardID() uint64 { return m.Metadata.Shard } +func (m *message) SentAtNanos() uint64 { + return m.Metadata.SentAtNanos +} + func resetProto(m *msgpb.Message) { m.Metadata.Id = 0 m.Metadata.Shard = 0 diff --git a/src/msg/consumer/consumer_mock.go b/src/msg/consumer/consumer_mock.go index 3b024f45d9..e1f081405e 100644 --- a/src/msg/consumer/consumer_mock.go +++ b/src/msg/consumer/consumer_mock.go @@ -79,6 +79,20 @@ func (mr *MockMessageMockRecorder) Bytes() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bytes", reflect.TypeOf((*MockMessage)(nil).Bytes)) } +// SentAtNanos mocks base method. +func (m *MockMessage) SentAtNanos() uint64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SentAtNanos") + ret0, _ := ret[0].(uint64) + return ret0 +} + +// SentAtNanos indicates an expected call of SentAtNanos. +func (mr *MockMessageMockRecorder) SentAtNanos() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SentAtNanos", reflect.TypeOf((*MockMessage)(nil).SentAtNanos)) +} + // ShardID mocks base method. func (m *MockMessage) ShardID() uint64 { m.ctrl.T.Helper() diff --git a/src/msg/consumer/handlers.go b/src/msg/consumer/handlers.go index 572767a078..84b0f4ad6e 100644 --- a/src/msg/consumer/handlers.go +++ b/src/msg/consumer/handlers.go @@ -23,6 +23,7 @@ package consumer import ( "io" "net" + "time" "github.com/m3db/m3/src/x/server" @@ -60,7 +61,9 @@ func (h *messageHandler) Handle(conn net.Conn) { if msgErr != nil { break } + start := time.Now() c.process(msg) + h.m.handleLatency.RecordDuration(time.Since(start)) } if msgErr != nil && msgErr != io.EOF { h.opts.InstrumentOptions().Logger().With(zap.Error(msgErr)).Error("could not read message from consumer") diff --git a/src/msg/consumer/types.go b/src/msg/consumer/types.go index bdd1ea3f06..cf556eb678 100644 --- a/src/msg/consumer/types.go +++ b/src/msg/consumer/types.go @@ -38,6 +38,9 @@ type Message interface { // ShardID returns shard ID of the Message. ShardID() uint64 + + // SentAtNanos returns when the producer sent the Message. + SentAtNanos() uint64 } // Consumer receives messages from a connection. diff --git a/src/msg/generated/proto/msgpb/msg.pb.go b/src/msg/generated/proto/msgpb/msg.pb.go index 732b0a4500..30395d7060 100644 --- a/src/msg/generated/proto/msgpb/msg.pb.go +++ b/src/msg/generated/proto/msgpb/msg.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. // source: github.com/m3db/m3/src/msg/generated/proto/msgpb/msg.proto -// Copyright (c) 2018 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -53,8 +53,9 @@ var _ = math.Inf const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package type Metadata struct { - Shard uint64 `protobuf:"varint,1,opt,name=shard,proto3" json:"shard,omitempty"` - Id uint64 `protobuf:"varint,2,opt,name=id,proto3" json:"id,omitempty"` + Shard uint64 `protobuf:"varint,1,opt,name=shard,proto3" json:"shard,omitempty"` + Id uint64 `protobuf:"varint,2,opt,name=id,proto3" json:"id,omitempty"` + SentAtNanos uint64 `protobuf:"varint,3,opt,name=sentAtNanos,proto3" json:"sentAtNanos,omitempty"` } func (m *Metadata) Reset() { *m = Metadata{} } @@ -76,6 +77,13 @@ func (m *Metadata) GetId() uint64 { return 0 } +func (m *Metadata) GetSentAtNanos() uint64 { + if m != nil { + return m.SentAtNanos + } + return 0 +} + type Message struct { Metadata Metadata `protobuf:"bytes,1,opt,name=metadata" json:"metadata"` Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` @@ -146,6 +154,11 @@ func (m *Metadata) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintMsg(dAtA, i, uint64(m.Id)) } + if m.SentAtNanos != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintMsg(dAtA, i, uint64(m.SentAtNanos)) + } return i, nil } @@ -229,6 +242,9 @@ func (m *Metadata) Size() (n int) { if m.Id != 0 { n += 1 + sovMsg(uint64(m.Id)) } + if m.SentAtNanos != 0 { + n += 1 + sovMsg(uint64(m.SentAtNanos)) + } return n } @@ -336,6 +352,25 @@ func (m *Metadata) Unmarshal(dAtA []byte) error { break } } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SentAtNanos", wireType) + } + m.SentAtNanos = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMsg + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.SentAtNanos |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipMsg(dAtA[iNdEx:]) @@ -659,20 +694,21 @@ func init() { } var fileDescriptorMsg = []byte{ - // 232 bytes of a gzipped FileDescriptorProto + // 253 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xb2, 0x4a, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0xcf, 0x35, 0x4e, 0x49, 0xd2, 0xcf, 0x35, 0xd6, 0x2f, 0x2e, 0x4a, 0xd6, 0xcf, 0x2d, 0x4e, 0xd7, 0x4f, 0x4f, 0xcd, 0x4b, 0x2d, 0x4a, 0x2c, 0x49, 0x4d, 0xd1, 0x2f, 0x28, 0xca, 0x2f, 0xc9, 0x07, 0x89, 0x15, 0x24, 0x81, 0x48, 0x3d, 0x30, 0x5f, 0x88, 0x15, 0x2c, 0x20, 0xa5, 0x8b, 0x64, 0x44, 0x7a, 0x7e, 0x7a, 0x3e, 0x44, 0x75, 0x52, 0x69, 0x1a, - 0x98, 0x07, 0xd1, 0x0a, 0x62, 0x41, 0x74, 0x29, 0x19, 0x70, 0x71, 0xf8, 0xa6, 0x96, 0x24, 0xa6, + 0x98, 0x07, 0xd1, 0x0a, 0x62, 0x41, 0x74, 0x29, 0x05, 0x71, 0x71, 0xf8, 0xa6, 0x96, 0x24, 0xa6, 0x24, 0x96, 0x24, 0x0a, 0x89, 0x70, 0xb1, 0x16, 0x67, 0x24, 0x16, 0xa5, 0x48, 0x30, 0x2a, 0x30, 0x6a, 0xb0, 0x04, 0x41, 0x38, 0x42, 0x7c, 0x5c, 0x4c, 0x99, 0x29, 0x12, 0x4c, 0x60, 0x21, 0xa6, - 0xcc, 0x14, 0xa5, 0x20, 0x2e, 0x76, 0xdf, 0xd4, 0xe2, 0xe2, 0xc4, 0xf4, 0x54, 0x21, 0x43, 0x2e, - 0x8e, 0x5c, 0xa8, 0x66, 0xb0, 0x1e, 0x6e, 0x23, 0x7e, 0x3d, 0xb0, 0x2b, 0xf4, 0x60, 0x66, 0x3a, - 0xb1, 0x9c, 0xb8, 0x27, 0xcf, 0x10, 0x04, 0x57, 0x06, 0xb2, 0xa3, 0x2c, 0x31, 0xa7, 0x34, 0x15, - 0x6c, 0x20, 0x4f, 0x10, 0x84, 0xa3, 0x64, 0xc1, 0xc5, 0xec, 0x98, 0x9c, 0x8d, 0x66, 0x1e, 0x33, - 0x11, 0xe6, 0x39, 0x09, 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47, 0x72, - 0x8c, 0x13, 0x1e, 0xcb, 0x31, 0x24, 0xb1, 0x81, 0x3d, 0x66, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, - 0x31, 0xd7, 0x86, 0x7e, 0x4c, 0x01, 0x00, 0x00, + 0xcc, 0x14, 0x21, 0x05, 0x2e, 0xee, 0xe2, 0xd4, 0xbc, 0x12, 0xc7, 0x12, 0xbf, 0xc4, 0xbc, 0xfc, + 0x62, 0x09, 0x66, 0xb0, 0x04, 0xb2, 0x90, 0x52, 0x10, 0x17, 0xbb, 0x6f, 0x6a, 0x71, 0x71, 0x62, + 0x7a, 0xaa, 0x90, 0x21, 0x17, 0x47, 0x2e, 0xd4, 0x78, 0xb0, 0xa9, 0xdc, 0x46, 0xfc, 0x7a, 0x60, + 0x77, 0xea, 0xc1, 0x6c, 0x75, 0x62, 0x39, 0x71, 0x4f, 0x9e, 0x21, 0x08, 0xae, 0x0c, 0xe4, 0x8a, + 0xb2, 0xc4, 0x9c, 0xd2, 0x54, 0xb0, 0x95, 0x3c, 0x41, 0x10, 0x8e, 0x92, 0x05, 0x17, 0xb3, 0x63, + 0x72, 0x36, 0x9a, 0x79, 0xcc, 0x44, 0x98, 0xe7, 0x24, 0x70, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, + 0x72, 0x8c, 0x0f, 0x1e, 0xc9, 0x31, 0x4e, 0x78, 0x2c, 0xc7, 0x90, 0xc4, 0x06, 0xf6, 0xba, 0x31, + 0x20, 0x00, 0x00, 0xff, 0xff, 0x60, 0x29, 0xbd, 0x49, 0x6e, 0x01, 0x00, 0x00, } diff --git a/src/msg/generated/proto/msgpb/msg.proto b/src/msg/generated/proto/msgpb/msg.proto index 8ebdace164..69431f5214 100644 --- a/src/msg/generated/proto/msgpb/msg.proto +++ b/src/msg/generated/proto/msgpb/msg.proto @@ -7,6 +7,7 @@ import "github.com/gogo/protobuf/gogoproto/gogo.proto"; message Metadata { uint64 shard = 1; uint64 id = 2; + uint64 sentAtNanos = 3; } message Message { diff --git a/src/msg/integration/setup.go b/src/msg/integration/setup.go index 0c377a2f46..c1e616a7ed 100644 --- a/src/msg/integration/setup.go +++ b/src/msg/integration/setup.go @@ -46,7 +46,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/zap" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" ) const ( @@ -512,7 +512,9 @@ func (c *testConsumer) consumeAndAck(totalConsumed *atomic.Int64) { consumer.Close() return } - + if msg.SentAtNanos() <= 0 { + panic("sentAtNanos not set") + } wp.Go( func() { c.Lock() diff --git a/src/msg/producer/writer/message.go b/src/msg/producer/writer/message.go index 219adb258d..60914707c8 100644 --- a/src/msg/producer/writer/message.go +++ b/src/msg/producer/writer/message.go @@ -122,6 +122,11 @@ func (m *message) Metadata() metadata { return m.meta } +// SetSentAt sets the sentAtNanos on the metadata proto. +func (m *message) SetSentAt(nanos int64) { + m.pb.Metadata.SentAtNanos = uint64(nanos) +} + // Marshaler returns the marshaler and a bool to indicate whether the marshaler is valid. func (m *message) Marshaler() (proto.Marshaler, bool) { return &m.pb, !m.RefCountedMessage.IsDroppedOrConsumed() diff --git a/src/msg/producer/writer/message_writer.go b/src/msg/producer/writer/message_writer.go index 21f15f3e5a..c00a3b6dc2 100644 --- a/src/msg/producer/writer/message_writer.go +++ b/src/msg/producer/writer/message_writer.go @@ -279,8 +279,10 @@ func (w *messageWriterImpl) Write(rm *producer.RefCountedMessage) { rm.IncRef() w.msgID++ meta := metadata{ - shard: w.replicatedShardID, - id: w.msgID, + metadataKey: metadataKey{ + shard: w.replicatedShardID, + id: w.msgID, + }, } msg.Set(meta, rm, nowNanos) w.acks.add(meta, msg) @@ -318,6 +320,7 @@ func (w *messageWriterImpl) write( m *message, ) error { m.IncReads() + m.SetSentAt(w.nowFn().UnixNano()) msg, isValid := m.Marshaler() if !isValid { m.DecReads() @@ -726,25 +729,25 @@ func (w *messageWriterImpl) close(m *message) { type acks struct { sync.Mutex - ackMap map[metadata]*message + ackMap map[metadataKey]*message } // nolint: unparam func newAckHelper(size int) *acks { return &acks{ - ackMap: make(map[metadata]*message, size), + ackMap: make(map[metadataKey]*message, size), } } func (a *acks) add(meta metadata, m *message) { a.Lock() - a.ackMap[meta] = m + a.ackMap[meta.metadataKey] = m a.Unlock() } func (a *acks) remove(meta metadata) { a.Lock() - delete(a.ackMap, meta) + delete(a.ackMap, meta.metadataKey) a.Unlock() } @@ -752,13 +755,13 @@ func (a *acks) remove(meta metadata) { // processing time for lag calculations. func (a *acks) ack(meta metadata) (bool, int64) { a.Lock() - m, ok := a.ackMap[meta] + m, ok := a.ackMap[meta.metadataKey] if !ok { a.Unlock() // Acking a message that is already acked, which is ok. return false, 0 } - delete(a.ackMap, meta) + delete(a.ackMap, meta.metadataKey) a.Unlock() expectedProcessAtNanos := m.ExpectedProcessAtNanos() m.Ack() diff --git a/src/msg/producer/writer/message_writer_test.go b/src/msg/producer/writer/message_writer_test.go index c35c172640..64b65c759e 100644 --- a/src/msg/producer/writer/message_writer_test.go +++ b/src/msg/producer/writer/message_writer_test.go @@ -160,7 +160,7 @@ func TestMessageWriterWithPooling(t *testing.T) { require.Equal(t, 1, w.queue.Len()) mm2.EXPECT().Finalize(producer.Consumed) - w.Ack(metadata{shard: 200, id: 2}) + w.Ack(metadata{metadataKey: metadataKey{shard: 200, id: 2}}) require.True(t, isEmptyWithLock(w.acks)) for { w.RLock() @@ -243,7 +243,7 @@ func TestMessageWriterWithoutPooling(t *testing.T) { require.Equal(t, 1, w.queue.Len()) mm2.EXPECT().Finalize(producer.Consumed) - w.Ack(metadata{shard: 200, id: 2}) + w.Ack(metadata{metadataKey: metadataKey{shard: 200, id: 2}}) require.True(t, isEmptyWithLock(w.acks)) for { w.RLock() @@ -295,7 +295,7 @@ func TestMessageWriterRetryWithoutPooling(t *testing.T) { time.Sleep(100 * time.Millisecond) } - _, ok := w.acks.ackMap[metadata{shard: 200, id: 1}] + _, ok := w.acks.ackMap[metadataKey{shard: 200, id: 1}] require.True(t, ok) cw := newConsumerWriter(addr, a, opts, testConsumerWriterMetrics()) @@ -355,7 +355,7 @@ func TestMessageWriterRetryWithPooling(t *testing.T) { time.Sleep(100 * time.Millisecond) } - m1, ok := w.acks.ackMap[metadata{shard: 200, id: 1}] + m1, ok := w.acks.ackMap[metadataKey{shard: 200, id: 1}] require.True(t, ok) cw := newConsumerWriter(addr, a, opts, testConsumerWriterMetrics()) @@ -453,8 +453,10 @@ func TestMessageWriterCleanupAckedMessage(t *testing.T) { } acks := w.acks meta := metadata{ - id: 1, - shard: 200, + metadataKey: metadataKey{ + id: 1, + shard: 200, + }, } // The message will not be finalized because it's still being hold by another message writer. acks.ack(meta) diff --git a/src/msg/producer/writer/metadata.go b/src/msg/producer/writer/metadata.go index 500bc41239..e7d5ae6456 100644 --- a/src/msg/producer/writer/metadata.go +++ b/src/msg/producer/writer/metadata.go @@ -24,6 +24,12 @@ import "github.com/m3db/m3/src/msg/generated/proto/msgpb" // metadata is the metadata for a message. type metadata struct { + metadataKey + sentAtNanos uint64 +} + +// metadataKey uniquely identifies a metadata. +type metadataKey struct { shard uint64 id uint64 } @@ -31,11 +37,13 @@ type metadata struct { func (m metadata) ToProto(pb *msgpb.Metadata) { pb.Shard = m.shard pb.Id = m.id + pb.SentAtNanos = m.sentAtNanos } func (m *metadata) FromProto(pb msgpb.Metadata) { m.shard = pb.Shard m.id = pb.Id + m.sentAtNanos = pb.SentAtNanos } func newMetadataFromProto(pb msgpb.Metadata) metadata { diff --git a/src/x/time/time.go b/src/x/time/time.go index 2e7cbb323c..d56f710059 100644 --- a/src/x/time/time.go +++ b/src/x/time/time.go @@ -35,6 +35,12 @@ func Now() UnixNano { return ToUnixNano(time.Now()) } +// Since returns the time elapsed since t. +// It is shorthand for time.Now().Sub(t). +func Since(t UnixNano) time.Duration { + return Now().Sub(t) +} + // ToNormalizedTime returns the normalized units of time given a time unit. func ToNormalizedTime(t time.Time, u time.Duration) int64 { return t.UnixNano() / u.Nanoseconds()