Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[m3msg] Add receive and handle latency to consumers #3920

Merged
merged 4 commits into from
Nov 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/msg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/m3db/m3/src/msg/protocol/proto"
"github.com/m3db/m3/src/x/clock"
xio "github.com/m3db/m3/src/x/io"
xtime "github.com/m3db/m3/src/x/time"

"github.com/uber-go/tally"
)
Expand Down Expand Up @@ -75,6 +76,10 @@ type metrics struct {
ackSent tally.Counter
ackEncodeError tally.Counter
ackWriteError tally.Counter
// the duration between the producer sending the message and the consumer reading the message.
receiveLatency tally.Histogram
// the duration between the consumer reading the message and sending an ack to the producer.
handleLatency tally.Histogram
}

func newConsumerMetrics(scope tally.Scope) metrics {
Expand All @@ -84,6 +89,12 @@ func newConsumerMetrics(scope tally.Scope) metrics {
ackSent: scope.Counter("ack-sent"),
ackEncodeError: scope.Counter("ack-encode-error"),
ackWriteError: scope.Counter("ack-write-error"),
receiveLatency: scope.Histogram("receive-latency",
// 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.2s, 2.4s, 4.8s, 9.6s
tally.MustMakeExponentialDurationBuckets(time.Millisecond*10, 2, 11)),
handleLatency: scope.Histogram("handle-latency",
// 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.2s, 2.4s, 4.8s, 9.6s
tally.MustMakeExponentialDurationBuckets(time.Millisecond*10, 2, 11)),
}
}

Expand Down Expand Up @@ -156,6 +167,9 @@ func (c *consumer) Message() (Message, error) {
c.m.messageDecodeError.Inc(1)
return nil, err
}
if m.Metadata.SentAtNanos > 0 {
c.m.receiveLatency.RecordDuration(xtime.Since(xtime.UnixNano(m.Metadata.SentAtNanos)))
}
c.m.messageReceived.Inc(1)
return m, nil
}
Expand Down Expand Up @@ -270,6 +284,10 @@ func (m *message) ShardID() uint64 {
return m.Metadata.Shard
}

func (m *message) SentAtNanos() uint64 {
return m.Metadata.SentAtNanos
}

func resetProto(m *msgpb.Message) {
m.Metadata.Id = 0
m.Metadata.Shard = 0
Expand Down
14 changes: 14 additions & 0 deletions src/msg/consumer/consumer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/msg/consumer/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package consumer
import (
"io"
"net"
"time"

"github.com/m3db/m3/src/x/server"

Expand Down Expand Up @@ -60,7 +61,9 @@ func (h *messageHandler) Handle(conn net.Conn) {
if msgErr != nil {
break
}
start := time.Now()
c.process(msg)
h.m.handleLatency.RecordDuration(time.Since(start))
}
if msgErr != nil && msgErr != io.EOF {
h.opts.InstrumentOptions().Logger().With(zap.Error(msgErr)).Error("could not read message from consumer")
Expand Down
3 changes: 3 additions & 0 deletions src/msg/consumer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type Message interface {

// ShardID returns shard ID of the Message.
ShardID() uint64

// SentAtNanos returns when the producer sent the Message.
SentAtNanos() uint64
}

// Consumer receives messages from a connection.
Expand Down
60 changes: 48 additions & 12 deletions src/msg/generated/proto/msgpb/msg.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/msg/generated/proto/msgpb/msg.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "github.com/gogo/protobuf/gogoproto/gogo.proto";
message Metadata {
uint64 shard = 1;
uint64 id = 2;
uint64 sentAtNanos = 3;
}

message Message {
Expand Down
6 changes: 4 additions & 2 deletions src/msg/integration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap"
yaml "gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"
)

const (
Expand Down Expand Up @@ -512,7 +512,9 @@ func (c *testConsumer) consumeAndAck(totalConsumed *atomic.Int64) {
consumer.Close()
return
}

if msg.SentAtNanos() <= 0 {
panic("sentAtNanos not set")
}
wp.Go(
func() {
c.Lock()
Expand Down
5 changes: 5 additions & 0 deletions src/msg/producer/writer/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ func (m *message) Metadata() metadata {
return m.meta
}

// SetSentAt sets the sentAtNanos on the metadata proto.
func (m *message) SetSentAt(nanos int64) {
m.pb.Metadata.SentAtNanos = uint64(nanos)
}

// Marshaler returns the marshaler and a bool to indicate whether the marshaler is valid.
func (m *message) Marshaler() (proto.Marshaler, bool) {
return &m.pb, !m.RefCountedMessage.IsDroppedOrConsumed()
Expand Down
19 changes: 11 additions & 8 deletions src/msg/producer/writer/message_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,10 @@ func (w *messageWriterImpl) Write(rm *producer.RefCountedMessage) {
rm.IncRef()
w.msgID++
meta := metadata{
shard: w.replicatedShardID,
id: w.msgID,
metadataKey: metadataKey{
shard: w.replicatedShardID,
id: w.msgID,
},
}
msg.Set(meta, rm, nowNanos)
w.acks.add(meta, msg)
Expand Down Expand Up @@ -318,6 +320,7 @@ func (w *messageWriterImpl) write(
m *message,
) error {
m.IncReads()
m.SetSentAt(w.nowFn().UnixNano())
msg, isValid := m.Marshaler()
if !isValid {
m.DecReads()
Expand Down Expand Up @@ -726,39 +729,39 @@ func (w *messageWriterImpl) close(m *message) {
type acks struct {
sync.Mutex

ackMap map[metadata]*message
ackMap map[metadataKey]*message
}

// nolint: unparam
func newAckHelper(size int) *acks {
return &acks{
ackMap: make(map[metadata]*message, size),
ackMap: make(map[metadataKey]*message, size),
}
}

func (a *acks) add(meta metadata, m *message) {
a.Lock()
a.ackMap[meta] = m
a.ackMap[meta.metadataKey] = m
a.Unlock()
}

func (a *acks) remove(meta metadata) {
a.Lock()
delete(a.ackMap, meta)
delete(a.ackMap, meta.metadataKey)
a.Unlock()
}

// ack processes the ack. returns true if the message was not already acked. additionally returns the expected
// processing time for lag calculations.
func (a *acks) ack(meta metadata) (bool, int64) {
a.Lock()
m, ok := a.ackMap[meta]
m, ok := a.ackMap[meta.metadataKey]
if !ok {
a.Unlock()
// Acking a message that is already acked, which is ok.
return false, 0
}
delete(a.ackMap, meta)
delete(a.ackMap, meta.metadataKey)
a.Unlock()
expectedProcessAtNanos := m.ExpectedProcessAtNanos()
m.Ack()
Expand Down
14 changes: 8 additions & 6 deletions src/msg/producer/writer/message_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestMessageWriterWithPooling(t *testing.T) {
require.Equal(t, 1, w.queue.Len())

mm2.EXPECT().Finalize(producer.Consumed)
w.Ack(metadata{shard: 200, id: 2})
w.Ack(metadata{metadataKey: metadataKey{shard: 200, id: 2}})
require.True(t, isEmptyWithLock(w.acks))
for {
w.RLock()
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestMessageWriterWithoutPooling(t *testing.T) {
require.Equal(t, 1, w.queue.Len())

mm2.EXPECT().Finalize(producer.Consumed)
w.Ack(metadata{shard: 200, id: 2})
w.Ack(metadata{metadataKey: metadataKey{shard: 200, id: 2}})
require.True(t, isEmptyWithLock(w.acks))
for {
w.RLock()
Expand Down Expand Up @@ -295,7 +295,7 @@ func TestMessageWriterRetryWithoutPooling(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}

_, ok := w.acks.ackMap[metadata{shard: 200, id: 1}]
_, ok := w.acks.ackMap[metadataKey{shard: 200, id: 1}]
require.True(t, ok)

cw := newConsumerWriter(addr, a, opts, testConsumerWriterMetrics())
Expand Down Expand Up @@ -355,7 +355,7 @@ func TestMessageWriterRetryWithPooling(t *testing.T) {
time.Sleep(100 * time.Millisecond)
}

m1, ok := w.acks.ackMap[metadata{shard: 200, id: 1}]
m1, ok := w.acks.ackMap[metadataKey{shard: 200, id: 1}]
require.True(t, ok)

cw := newConsumerWriter(addr, a, opts, testConsumerWriterMetrics())
Expand Down Expand Up @@ -453,8 +453,10 @@ func TestMessageWriterCleanupAckedMessage(t *testing.T) {
}
acks := w.acks
meta := metadata{
id: 1,
shard: 200,
metadataKey: metadataKey{
id: 1,
shard: 200,
},
}
// The message will not be finalized because it's still being hold by another message writer.
acks.ack(meta)
Expand Down
8 changes: 8 additions & 0 deletions src/msg/producer/writer/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,26 @@ import "github.com/m3db/m3/src/msg/generated/proto/msgpb"

// metadata is the metadata for a message.
type metadata struct {
metadataKey
sentAtNanos uint64
}

// metadataKey uniquely identifies a metadata.
type metadataKey struct {
shard uint64
id uint64
}

func (m metadata) ToProto(pb *msgpb.Metadata) {
pb.Shard = m.shard
pb.Id = m.id
pb.SentAtNanos = m.sentAtNanos
}

func (m *metadata) FromProto(pb msgpb.Metadata) {
m.shard = pb.Shard
m.id = pb.Id
m.sentAtNanos = pb.SentAtNanos
}

func newMetadataFromProto(pb msgpb.Metadata) metadata {
Expand Down
Loading