From 80c9f9ea9b3a8918f260c01ba899834ddd2bb0eb Mon Sep 17 00:00:00 2001
From: Ryan Hall
Date: Mon, 28 Sep 2020 09:39:19 -0700
Subject: [PATCH] Add queue processing metrics (#2671)

* Add queue processing metrics

I found these metrics useful when tracking down an issue with the queue
processor. Additionally, this adds an optional consumer label to the
message writer metrics so you can easily tell which downstream consumer
is causing issues.

Unfortunately, setting the consumer label is a little awkward since it
only makes sense for the replicated shard writer, which has only a
single consumer instance at a time. For the shared shard writer, the
label defaults to "unknown".
---
 src/msg/producer/writer/message_writer.go   | 133 ++++++++++++++----
 .../producer/writer/message_writer_test.go  |  13 +-
 src/msg/producer/writer/shard_writer.go     |   4 +
 3 files changed, 122 insertions(+), 28 deletions(-)

diff --git a/src/msg/producer/writer/message_writer.go b/src/msg/producer/writer/message_writer.go
index 37ee1168a9..8e09b9a8b2 100644
--- a/src/msg/producer/writer/message_writer.go
+++ b/src/msg/producer/writer/message_writer.go
@@ -62,6 +62,14 @@ type messageWriter interface {
 	// RemoveConsumerWriter removes the consumer writer for the given address.
 	RemoveConsumerWriter(addr string)
 
+	// Metrics returns the metrics.
+	Metrics() messageWriterMetrics
+
+	// SetMetrics sets the metrics.
+	//
+	// This allows changing the labels of the metrics when the downstream consumer instance changes.
+	SetMetrics(m messageWriterMetrics)
+
 	// ReplicatedShardID returns the replicated shard id.
 	ReplicatedShardID() uint64
 
@@ -88,6 +96,8 @@ type messageWriter interface {
 }
 
 type messageWriterMetrics struct {
+	scope                  tally.Scope
+	opts                   instrument.TimerOptions
 	writeSuccess           tally.Counter
 	oneConsumerWriteError  tally.Counter
 	allConsumersWriteError tally.Counter
@@ -103,40 +113,83 @@ type messageWriterMetrics struct {
 	messageWriteDelay tally.Timer
 	scanBatchLatency  tally.Timer
 	scanTotalLatency  tally.Timer
+	enqueuedMessages  tally.Counter
+	dequeuedMessages  tally.Counter
+	processedWrite    tally.Counter
+	processedClosed   tally.Counter
+	processedNotReady tally.Counter
+	processedTTL      tally.Counter
+	processedAck      tally.Counter
+	processedDrop     tally.Counter
+}
+
+func (m messageWriterMetrics) withConsumer(consumer string) messageWriterMetrics {
+	return newMessageWriterMetricsWithConsumer(m.scope, m.opts, consumer)
 }
 
 func newMessageWriterMetrics(
 	scope tally.Scope,
 	opts instrument.TimerOptions,
 ) messageWriterMetrics {
+	return newMessageWriterMetricsWithConsumer(scope, opts, "unknown")
+}
+
+func newMessageWriterMetricsWithConsumer(
+	scope tally.Scope,
+	opts instrument.TimerOptions,
+	consumer string,
+) messageWriterMetrics {
+	consumerScope := scope.Tagged(map[string]string{"consumer": consumer})
 	return messageWriterMetrics{
-		writeSuccess: scope.Counter("write-success"),
+		scope:        scope,
+		opts:         opts,
+		writeSuccess: consumerScope.Counter("write-success"),
 		oneConsumerWriteError: scope.Counter("write-error-one-consumer"),
-		allConsumersWriteError: scope.
+		allConsumersWriteError: consumerScope.
 			Tagged(map[string]string{"error-type": "all-consumers"}).
 			Counter("write-error"),
-		noWritersError: scope.
+		noWritersError: consumerScope.
 			Tagged(map[string]string{"error-type": "no-writers"}).
 			Counter("write-error"),
-		writeAfterCutoff: scope.
+		writeAfterCutoff: consumerScope.
 			Tagged(map[string]string{"reason": "after-cutoff"}).
 			Counter("invalid-write"),
-		writeBeforeCutover: scope.
+		writeBeforeCutover: consumerScope.
Tagged(map[string]string{"reason": "before-cutover"}). Counter("invalid-write"), - messageAcked: scope.Counter("message-acked"), - messageClosed: scope.Counter("message-closed"), - messageDroppedBufferFull: scope.Tagged( + messageAcked: consumerScope.Counter("message-acked"), + messageClosed: consumerScope.Counter("message-closed"), + messageDroppedBufferFull: consumerScope.Tagged( map[string]string{"reason": "buffer-full"}, ).Counter("message-dropped"), - messageDroppedTTLExpire: scope.Tagged( + messageDroppedTTLExpire: consumerScope.Tagged( map[string]string{"reason": "ttl-expire"}, ).Counter("message-dropped"), - messageRetry: scope.Counter("message-retry"), - messageConsumeLatency: instrument.NewTimer(scope, "message-consume-latency", opts), - messageWriteDelay: instrument.NewTimer(scope, "message-write-delay", opts), - scanBatchLatency: instrument.NewTimer(scope, "scan-batch-latency", opts), - scanTotalLatency: instrument.NewTimer(scope, "scan-total-latency", opts), + messageRetry: consumerScope.Counter("message-retry"), + messageConsumeLatency: instrument.NewTimer(consumerScope, "message-consume-latency", opts), + messageWriteDelay: instrument.NewTimer(consumerScope, "message-write-delay", opts), + scanBatchLatency: instrument.NewTimer(consumerScope, "scan-batch-latency", opts), + scanTotalLatency: instrument.NewTimer(consumerScope, "scan-total-latency", opts), + enqueuedMessages: consumerScope.Counter("message-enqueue"), + dequeuedMessages: consumerScope.Counter("message-dequeue"), + processedWrite: consumerScope. + Tagged(map[string]string{"result": "write"}). + Counter("message-processed"), + processedClosed: consumerScope. + Tagged(map[string]string{"result": "closed"}). + Counter("message-processed"), + processedNotReady: consumerScope. + Tagged(map[string]string{"result": "retry"}). + Counter("message-processed"), + processedTTL: consumerScope. + Tagged(map[string]string{"result": "ttl"}). + Counter("message-processed"), + processedAck: consumerScope. + Tagged(map[string]string{"result": "ack"}). + Counter("message-processed"), + processedDrop: consumerScope. + Tagged(map[string]string{"result": "drop"}). + Counter("message-processed"), } } @@ -163,7 +216,8 @@ type messageWriterImpl struct { isClosed bool doneCh chan struct{} wg sync.WaitGroup - m messageWriterMetrics + // metrics can be updated when a consumer instance changes, so must be guarded with RLock + m *messageWriterMetrics nextFullScan time.Time lastNewWrite *list.Element @@ -196,7 +250,7 @@ func newMessageWriter( msgsToWrite: make([]*message, 0, opts.MessageQueueScanBatchSize()), isClosed: false, doneCh: make(chan struct{}), - m: m, + m: &m, nowFn: nowFn, } } @@ -221,6 +275,7 @@ func (w *messageWriterImpl) Write(rm *producer.RefCountedMessage) { msg.Set(meta, rm, nowNanos) w.acks.add(meta, msg) // Make sure all the new writes are ordered in queue. 
+	w.m.enqueuedMessages.Inc(1)
 	if w.lastNewWrite != nil {
 		w.lastNewWrite = w.queue.InsertAfter(msg, w.lastNewWrite)
 	} else {
@@ -244,6 +299,7 @@ func (w *messageWriterImpl) isValidWriteWithLock(nowNanos int64) bool {
 func (w *messageWriterImpl) write(
 	iterationIndexes []int,
 	consumerWriters []consumerWriter,
+	metrics *messageWriterMetrics,
 	m *message,
 ) error {
 	m.IncReads()
@@ -267,18 +323,18 @@ func (w *messageWriterImpl) write(
 	for i := len(iterationIndexes) - 1; i >= 0; i-- {
 		consumerWriter := consumerWriters[randIndex(iterationIndexes, i)]
 		if err := consumerWriter.Write(connIndex, w.encoder.Bytes()); err != nil {
-			w.m.oneConsumerWriteError.Inc(1)
+			metrics.oneConsumerWriteError.Inc(1)
 			continue
 		}
 		written = true
-		w.m.writeSuccess.Inc(1)
+		metrics.writeSuccess.Inc(1)
 		break
 	}
 	if written {
 		return nil
 	}
 	// Could not be written to any consumer, will retry later.
-	w.m.allConsumersWriteError.Inc(1)
+	metrics.allConsumersWriteError.Inc(1)
 	return errFailAllConsumers
 }
 
@@ -305,6 +361,8 @@ func (w *messageWriterImpl) nextRetryNanos(writeTimes int, nowNanos int64) int64
 func (w *messageWriterImpl) Ack(meta metadata) bool {
 	acked, initNanos := w.acks.ack(meta)
 	if acked {
+		w.RLock()
+		defer w.RUnlock()
 		w.m.messageConsumeLatency.Record(time.Duration(w.nowFn().UnixNano() - initNanos))
 		w.m.messageAcked.Inc(1)
 		return true
@@ -346,6 +404,7 @@ func (w *messageWriterImpl) scanMessageQueue() {
 	e := w.queue.Front()
 	w.lastNewWrite = nil
 	isClosed := w.isClosed
+	m := w.m
 	w.RUnlock()
 	var (
 		msgsToWrite []*message
@@ -365,24 +424,24 @@ func (w *messageWriterImpl) scanMessageQueue() {
 		iterationIndexes = w.iterationIndexes
 		w.Unlock()
 		if !fullScan && len(msgsToWrite) == 0 {
-			w.m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
+			m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
 			// If this is not a full scan, abort after the iteration batch
 			// that no new messages were found.
 			break
 		}
 		if skipWrites {
-			w.m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
+			m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
 			continue
 		}
-		if err := w.writeBatch(iterationIndexes, consumerWriters, msgsToWrite); err != nil {
+		if err := w.writeBatch(iterationIndexes, consumerWriters, m, msgsToWrite); err != nil {
 			// When we can't write to any consumer writer, skip the writes in this scan
 			// to avoid meaningless attempts but continue to clean up the queue.
 			skipWrites = true
 		}
-		w.m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
+		m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
 	}
 	afterScan := w.nowFn()
-	w.m.scanTotalLatency.Record(afterScan.Sub(beforeScan))
+	m.scanTotalLatency.Record(afterScan.Sub(beforeScan))
 	if fullScan {
 		w.nextFullScan = afterScan.Add(w.opts.MessageQueueFullScanInterval())
 	}
@@ -391,18 +450,19 @@ func (w *messageWriterImpl) scanMessageQueue() {
 func (w *messageWriterImpl) writeBatch(
 	iterationIndexes []int,
 	consumerWriters []consumerWriter,
+	metrics *messageWriterMetrics,
 	messages []*message,
 ) error {
 	if len(consumerWriters) == 0 {
 		// Not expected in a healthy/valid placement.
-		w.m.noWritersError.Inc(int64(len(messages)))
+		metrics.noWritersError.Inc(int64(len(messages)))
 		return errNoWriters
 	}
 	for _, m := range messages {
-		if err := w.write(iterationIndexes, consumerWriters, m); err != nil {
+		if err := w.write(iterationIndexes, consumerWriters, metrics, m); err != nil {
 			return err
 		}
-		w.m.messageWriteDelay.Record(time.Duration(w.nowFn().UnixNano() - m.InitNanos()))
+		metrics.messageWriteDelay.Record(time.Duration(w.nowFn().UnixNano() - m.InitNanos()))
 	}
 	return nil
 }
 
@@ -429,6 +489,7 @@ func (w *messageWriterImpl) scanBatchWithLock(
 		next = e.Next()
 		m := e.Value.(*message)
 		if w.isClosed {
+			w.m.processedClosed.Inc(1)
 			// Simply ack the messages here to mark them as consumed for this
 			// message writer, this is useful when user removes a consumer service
 			// during runtime that may be unhealthy to consume the messages.
@@ -441,6 +502,7 @@ func (w *messageWriterImpl) scanBatchWithLock(
 			continue
 		}
 		if m.RetryAtNanos() >= nowNanos {
+			w.m.processedNotReady.Inc(1)
 			if !fullScan {
 				// If this is not a full scan, bail after the first element that
 				// is not a new write.
@@ -451,6 +513,7 @@ func (w *messageWriterImpl) scanBatchWithLock(
 		// If the message exceeded its allowed ttl of the consumer service,
 		// remove it from the buffer.
 		if w.messageTTLNanos > 0 && m.InitNanos()+w.messageTTLNanos <= nowNanos {
+			w.m.processedTTL.Inc(1)
 			// There is a chance the message was acked right before the ack is
 			// called, in which case just remove it from the queue.
 			if acked, _ := w.acks.ack(m.Metadata()); acked {
@@ -460,10 +523,12 @@ func (w *messageWriterImpl) scanBatchWithLock(
 			continue
 		}
 		if m.IsAcked() {
+			w.m.processedAck.Inc(1)
 			w.removeFromQueueWithLock(e, m)
 			continue
 		}
 		if m.IsDroppedOrConsumed() {
+			w.m.processedDrop.Inc(1)
 			// There is a chance the message could be acked between m.Acked()
 			// and m.IsDroppedOrConsumed() check, in which case we should not
 			// mark it as dropped, just continue and next tick will remove it
@@ -482,6 +547,7 @@ func (w *messageWriterImpl) scanBatchWithLock(
 		if writeTimes > 1 {
 			w.m.messageRetry.Inc(1)
 		}
+		w.m.processedWrite.Inc(1)
 		w.msgsToWrite = append(w.msgsToWrite, m)
 	}
 	return next, w.msgsToWrite
@@ -599,6 +665,18 @@ func (w *messageWriterImpl) RemoveConsumerWriter(addr string) {
 	w.Unlock()
 }
 
+func (w *messageWriterImpl) Metrics() messageWriterMetrics {
+	w.RLock()
+	defer w.RUnlock()
+	return *w.m
+}
+
+func (w *messageWriterImpl) SetMetrics(m messageWriterMetrics) {
+	w.Lock()
+	w.m = &m
+	w.Unlock()
+}
+
 func (w *messageWriterImpl) QueueSize() int {
 	return w.acks.size()
 }
@@ -612,6 +690,7 @@ func (w *messageWriterImpl) newMessage() *message {
 
 func (w *messageWriterImpl) removeFromQueueWithLock(e *list.Element, m *message) {
 	w.queue.Remove(e)
+	w.m.dequeuedMessages.Inc(1)
 	w.close(m)
 }
 
diff --git a/src/msg/producer/writer/message_writer_test.go b/src/msg/producer/writer/message_writer_test.go
index 053fc4652b..05d62f2cda 100644
--- a/src/msg/producer/writer/message_writer_test.go
+++ b/src/msg/producer/writer/message_writer_test.go
@@ -799,7 +799,9 @@ func TestMessageWriterQueueFullScanOnWriteErrors(t *testing.T) {
 	defer ctrl.Finish()
 
 	opts := testOptions().SetMessageQueueScanBatchSize(1)
-	w := newMessageWriter(200, nil, opts, testMessageWriterMetrics()).(*messageWriterImpl)
+	scope := tally.NewTestScope("", nil)
+	metrics := testMessageWriterMetricsWithScope(scope).withConsumer("c1")
+	w := newMessageWriter(200, nil, opts, metrics).(*messageWriterImpl)
 	w.AddConsumerWriter(newConsumerWriter("bad", nil, opts, testConsumerWriterMetrics()))
 
 	mm1 := producer.NewMockMessage(ctrl)
@@ -820,6 +822,11 @@ func TestMessageWriterQueueFullScanOnWriteErrors(t *testing.T) {
 	rm1.Drop()
 	w.scanMessageQueue()
 	require.Equal(t, 1, w.queue.Len())
+
+	snapshot := scope.Snapshot()
+	counters := snapshot.Counters()
+	require.Equal(t, int64(1), counters["message-processed+consumer=c1,result=write"].Value())
+	require.Equal(t, int64(1), counters["message-processed+consumer=c1,result=drop"].Value())
 }
 
 func isEmptyWithLock(h *acks) bool {
@@ -838,6 +845,10 @@ func testMessageWriterMetrics() messageWriterMetrics {
 	return newMessageWriterMetrics(tally.NoopScope, instrument.TimerOptions{})
 }
 
+func testMessageWriterMetricsWithScope(scope tally.TestScope) messageWriterMetrics {
+	return newMessageWriterMetrics(scope, instrument.TimerOptions{})
+}
+
 func validateMessages(t *testing.T, msgs []*producer.RefCountedMessage, w *messageWriterImpl) {
 	w.RLock()
 	idx := 0
diff --git a/src/msg/producer/writer/shard_writer.go b/src/msg/producer/writer/shard_writer.go
index 62734b3a00..77384a32cc 100644
--- a/src/msg/producer/writer/shard_writer.go
+++ b/src/msg/producer/writer/shard_writer.go
@@ -203,6 +203,9 @@ func (w *replicatedShardWriter) UpdateInstances(
 		if instance, cw, ok := anyKeyValueInMap(toBeAdded); ok {
 			mw.AddConsumerWriter(cw)
 			mw.RemoveConsumerWriter(id)
+			// A replicated writer only has a single downstream consumer instance at a time,
+			// so we can update the metrics with a useful consumer label.
+			mw.SetMetrics(mw.Metrics().withConsumer(instance.ID()))
 			w.updateCutoverCutoffNanos(mw, instance)
 			newMessageWriters[instance.Endpoint()] = mw
 			delete(toBeAdded, instance)
@@ -218,6 +221,7 @@ func (w *replicatedShardWriter) UpdateInstances(
 		w.replicaID++
 		mw := newMessageWriter(replicatedShardID, w.mPool, w.opts, w.m)
 		mw.AddConsumerWriter(cw)
+		mw.SetMetrics(mw.Metrics().withConsumer(instance.ID()))
 		w.updateCutoverCutoffNanos(mw, instance)
 		mw.Init()
 		w.ackRouter.Register(replicatedShardID, mw)
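
The consumer label in this patch works purely through tally's tagged sub-scopes: newMessageWriterMetricsWithConsumer derives every metric from a scope tagged with the consumer instance ID, so one metric name fans out into a separate series per consumer. The sketch below shows that tagging pattern in isolation; the local withConsumer helper and the consumer IDs "unknown" and "c1" are illustrative stand-ins, not the unexported m3msg APIs.

package main

import (
	"fmt"

	"github.com/uber-go/tally"
)

// withConsumer mirrors the pattern in the patch: derive a sub-scope tagged
// with the downstream consumer instance so every metric carries the label.
func withConsumer(scope tally.Scope, consumer string) tally.Scope {
	return scope.Tagged(map[string]string{"consumer": consumer})
}

func main() {
	scope := tally.NewTestScope("", nil)

	// Before the consumer instance is known, the patch tags with "unknown".
	withConsumer(scope, "unknown").
		Tagged(map[string]string{"result": "write"}).
		Counter("message-processed").Inc(1)

	// Once the replicated shard writer learns its consumer instance ("c1"
	// here), freshly derived metrics carry the real instance ID.
	withConsumer(scope, "c1").
		Tagged(map[string]string{"result": "drop"}).
		Counter("message-processed").Inc(1)

	// The test scope renders each series as "name+tag=value,...".
	for key, c := range scope.Snapshot().Counters() {
		fmt.Println(key, c.Value())
	}
	// Output includes:
	//   message-processed+consumer=unknown,result=write 1
	//   message-processed+consumer=c1,result=drop 1
}

The snapshot keys printed here use the same "name+tag=value,..." format that the new assertions in message_writer_test.go rely on.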
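SetMetrics is why messageWriterImpl.m becomes a pointer guarded by the writer's RWMutex: the replicated shard writer swaps in consumer-tagged metrics at runtime while scans and acks may still be reading the old value. Below is a minimal, self-contained sketch of that swap pattern, using stand-in types (metrics, writer) rather than the real m3msg ones.

package main

import (
	"fmt"
	"sync"
)

// metrics stands in for messageWriterMetrics: a value struct that is
// built once and never mutated, so copies are always safe to read.
type metrics struct {
	consumer string
}

// writer mirrors messageWriterImpl's approach: the current metrics value
// sits behind a pointer guarded by the writer's RWMutex.
type writer struct {
	sync.RWMutex
	m *metrics
}

// Metrics returns a copy of the current metrics under the read lock.
func (w *writer) Metrics() metrics {
	w.RLock()
	defer w.RUnlock()
	return *w.m
}

// SetMetrics swaps in a new metrics value under the write lock. Readers
// that already grabbed the old pointer keep using it safely.
func (w *writer) SetMetrics(m metrics) {
	w.Lock()
	w.m = &m
	w.Unlock()
}

func main() {
	w := &writer{m: &metrics{consumer: "unknown"}}

	// A scan snapshots the pointer once under RLock and records against
	// that snapshot for the whole pass, like scanMessageQueue does.
	w.RLock()
	snapshot := w.m
	w.RUnlock()

	// Meanwhile the shard writer learns its consumer instance and swaps.
	w.SetMetrics(metrics{consumer: "instance-01"})

	fmt.Println(snapshot.consumer)    // unknown: the old snapshot stays valid
	fmt.Println(w.Metrics().consumer) // instance-01
}

Because the metrics struct is an immutable value, readers that snapshot the pointer once (as scanMessageQueue does with m := w.m) can keep recording against the old labels for the rest of the pass without holding the lock.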