Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[m3msg] Add receive and handle latency to consumers #3920

Merged
merged 4 commits into from
Nov 13, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/msg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/m3db/m3/src/msg/protocol/proto"
"github.com/m3db/m3/src/x/clock"
xio "github.com/m3db/m3/src/x/io"
xtime "github.com/m3db/m3/src/x/time"

"github.com/uber-go/tally"
)
Expand Down Expand Up @@ -75,6 +76,10 @@ type metrics struct {
ackSent tally.Counter
ackEncodeError tally.Counter
ackWriteError tally.Counter
// the duration between the producer sending the message and the consumer reading the message.
receiveLatency tally.Histogram
// the duration between the consumer reading the message and sending an ack to the producer.
handleLatency tally.Histogram
}

func newConsumerMetrics(scope tally.Scope) metrics {
Expand All @@ -84,6 +89,12 @@ func newConsumerMetrics(scope tally.Scope) metrics {
ackSent: scope.Counter("ack-sent"),
ackEncodeError: scope.Counter("ack-encode-error"),
ackWriteError: scope.Counter("ack-write-error"),
receiveLatency: scope.Histogram("receive-latency",
// 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.2s, 2.4s, 4.8s, 9.6s
tally.MustMakeExponentialDurationBuckets(time.Millisecond*10, 2, 11)),
handleLatency: scope.Histogram("handle-latency",
// 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.2s, 2.4s, 4.8s, 9.6s
tally.MustMakeExponentialDurationBuckets(time.Millisecond*10, 2, 11)),
}
}

Expand Down Expand Up @@ -156,6 +167,9 @@ func (c *consumer) Message() (Message, error) {
c.m.messageDecodeError.Inc(1)
return nil, err
}
if m.Metadata.SentAtNanos > 0 {
c.m.receiveLatency.RecordDuration(xtime.Since(xtime.UnixNano(m.Metadata.SentAtNanos)))
}
c.m.messageReceived.Inc(1)
return m, nil
}
Expand Down
3 changes: 3 additions & 0 deletions src/msg/consumer/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package consumer
import (
"io"
"net"
"time"

"github.com/m3db/m3/src/x/server"

Expand Down Expand Up @@ -60,7 +61,9 @@ func (h *messageHandler) Handle(conn net.Conn) {
if msgErr != nil {
break
}
start := time.Now()
c.process(msg)
h.m.handleLatency.RecordDuration(time.Since(start))
}
if msgErr != nil && msgErr != io.EOF {
h.opts.InstrumentOptions().Logger().With(zap.Error(msgErr)).Error("could not read message from consumer")
Expand Down
60 changes: 48 additions & 12 deletions src/msg/generated/proto/msgpb/msg.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/msg/generated/proto/msgpb/msg.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "github.com/gogo/protobuf/gogoproto/gogo.proto";
message Metadata {
uint64 shard = 1;
uint64 id = 2;
uint64 sentAtNanos = 3;
}

message Message {
Expand Down
5 changes: 5 additions & 0 deletions src/msg/producer/writer/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ func (m *message) Metadata() metadata {
return m.meta
}

// SetSentAt sets the sentAtNanos on the metadata.
func (m *message) SetSentAt(nanos int64) {
m.meta.sentAtNanos = uint64(nanos)
}

// Marshaler returns the marshaler and a bool to indicate whether the marshaler is valid.
func (m *message) Marshaler() (proto.Marshaler, bool) {
return &m.pb, !m.RefCountedMessage.IsDroppedOrConsumed()
Expand Down
1 change: 1 addition & 0 deletions src/msg/producer/writer/message_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func (w *messageWriterImpl) write(
m *message,
) error {
m.IncReads()
m.SetSentAt(w.nowFn().UnixNano())
msg, isValid := m.Marshaler()
if !isValid {
m.DecReads()
Expand Down
7 changes: 5 additions & 2 deletions src/msg/producer/writer/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,21 @@ import "github.com/m3db/m3/src/msg/generated/proto/msgpb"

// metadata is the metadata for a message.
type metadata struct {
shard uint64
id uint64
shard uint64
id uint64
sentAtNanos uint64
}

func (m metadata) ToProto(pb *msgpb.Metadata) {
pb.Shard = m.shard
pb.Id = m.id
pb.SentAtNanos = m.sentAtNanos
}

func (m *metadata) FromProto(pb msgpb.Metadata) {
m.shard = pb.Shard
m.id = pb.Id
m.sentAtNanos = pb.SentAtNanos
}

func newMetadataFromProto(pb msgpb.Metadata) metadata {
Expand Down
6 changes: 6 additions & 0 deletions src/x/time/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func Now() UnixNano {
return ToUnixNano(time.Now())
}

// Since returns the time elapsed since t.
// It is shorthand for time.Now().Sub(t).
func Since(t UnixNano) time.Duration {
return Now().Sub(t)
}

// ToNormalizedTime returns the normalized units of time given a time unit.
func ToNormalizedTime(t time.Time, u time.Duration) int64 {
return t.UnixNano() / u.Nanoseconds()
Expand Down