diff --git a/src/msg/producer/writer/message_writer.go b/src/msg/producer/writer/message_writer.go
index 37ee1168a9..8e09b9a8b2 100644
--- a/src/msg/producer/writer/message_writer.go
+++ b/src/msg/producer/writer/message_writer.go
@@ -62,6 +62,14 @@ type messageWriter interface {
     // RemoveConsumerWriter removes the consumer writer for the given address.
     RemoveConsumerWriter(addr string)
 
+    // Metrics returns the current metrics.
+    Metrics() messageWriterMetrics
+
+    // SetMetrics sets the metrics.
+    //
+    // This allows changing the labels of the metrics when the downstream consumer instance changes.
+    SetMetrics(m messageWriterMetrics)
+
     // ReplicatedShardID returns the replicated shard id.
     ReplicatedShardID() uint64
 
@@ -88,6 +96,8 @@ type messageWriter interface {
 }
 
 type messageWriterMetrics struct {
+    scope                  tally.Scope
+    opts                   instrument.TimerOptions
     writeSuccess           tally.Counter
     oneConsumerWriteError  tally.Counter
     allConsumersWriteError tally.Counter
@@ -103,40 +113,83 @@ type messageWriterMetrics struct {
     messageWriteDelay tally.Timer
     scanBatchLatency  tally.Timer
     scanTotalLatency  tally.Timer
+    enqueuedMessages  tally.Counter
+    dequeuedMessages  tally.Counter
+    processedWrite    tally.Counter
+    processedClosed   tally.Counter
+    processedNotReady tally.Counter
+    processedTTL      tally.Counter
+    processedAck      tally.Counter
+    processedDrop     tally.Counter
+}
+
+func (m messageWriterMetrics) withConsumer(consumer string) messageWriterMetrics {
+    return newMessageWriterMetricsWithConsumer(m.scope, m.opts, consumer)
 }
 
 func newMessageWriterMetrics(
     scope tally.Scope,
     opts instrument.TimerOptions,
 ) messageWriterMetrics {
+    return newMessageWriterMetricsWithConsumer(scope, opts, "unknown")
+}
+
+func newMessageWriterMetricsWithConsumer(
+    scope tally.Scope,
+    opts instrument.TimerOptions,
+    consumer string,
+) messageWriterMetrics {
+    consumerScope := scope.Tagged(map[string]string{"consumer": consumer})
     return messageWriterMetrics{
-        writeSuccess: scope.Counter("write-success"),
+        scope:        scope,
+        opts:         opts,
+        writeSuccess: consumerScope.Counter("write-success"),
         oneConsumerWriteError: scope.Counter("write-error-one-consumer"),
-        allConsumersWriteError: scope.
+        allConsumersWriteError: consumerScope.
             Tagged(map[string]string{"error-type": "all-consumers"}).
             Counter("write-error"),
-        noWritersError: scope.
+        noWritersError: consumerScope.
             Tagged(map[string]string{"error-type": "no-writers"}).
             Counter("write-error"),
-        writeAfterCutoff: scope.
+        writeAfterCutoff: consumerScope.
             Tagged(map[string]string{"reason": "after-cutoff"}).
             Counter("invalid-write"),
-        writeBeforeCutover: scope.
+        writeBeforeCutover: consumerScope.
             Tagged(map[string]string{"reason": "before-cutover"}).
             Counter("invalid-write"),
-        messageAcked:  scope.Counter("message-acked"),
-        messageClosed: scope.Counter("message-closed"),
-        messageDroppedBufferFull: scope.Tagged(
+        messageAcked:  consumerScope.Counter("message-acked"),
+        messageClosed: consumerScope.Counter("message-closed"),
+        messageDroppedBufferFull: consumerScope.Tagged(
             map[string]string{"reason": "buffer-full"},
         ).Counter("message-dropped"),
-        messageDroppedTTLExpire: scope.Tagged(
+        messageDroppedTTLExpire: consumerScope.Tagged(
             map[string]string{"reason": "ttl-expire"},
         ).Counter("message-dropped"),
-        messageRetry:          scope.Counter("message-retry"),
-        messageConsumeLatency: instrument.NewTimer(scope, "message-consume-latency", opts),
-        messageWriteDelay:     instrument.NewTimer(scope, "message-write-delay", opts),
-        scanBatchLatency:      instrument.NewTimer(scope, "scan-batch-latency", opts),
-        scanTotalLatency:      instrument.NewTimer(scope, "scan-total-latency", opts),
+        messageRetry:          consumerScope.Counter("message-retry"),
+        messageConsumeLatency: instrument.NewTimer(consumerScope, "message-consume-latency", opts),
+        messageWriteDelay:     instrument.NewTimer(consumerScope, "message-write-delay", opts),
+        scanBatchLatency:      instrument.NewTimer(consumerScope, "scan-batch-latency", opts),
+        scanTotalLatency:      instrument.NewTimer(consumerScope, "scan-total-latency", opts),
+        enqueuedMessages:      consumerScope.Counter("message-enqueue"),
+        dequeuedMessages:      consumerScope.Counter("message-dequeue"),
+        processedWrite: consumerScope.
+            Tagged(map[string]string{"result": "write"}).
+            Counter("message-processed"),
+        processedClosed: consumerScope.
+            Tagged(map[string]string{"result": "closed"}).
+            Counter("message-processed"),
+        processedNotReady: consumerScope.
+            Tagged(map[string]string{"result": "retry"}).
+            Counter("message-processed"),
+        processedTTL: consumerScope.
+            Tagged(map[string]string{"result": "ttl"}).
+            Counter("message-processed"),
+        processedAck: consumerScope.
+            Tagged(map[string]string{"result": "ack"}).
+            Counter("message-processed"),
+        processedDrop: consumerScope.
+            Tagged(map[string]string{"result": "drop"}).
+            Counter("message-processed"),
     }
 }
@@ -163,7 +216,8 @@ type messageWriterImpl struct {
     isClosed    bool
     doneCh      chan struct{}
     wg          sync.WaitGroup
-    m           messageWriterMetrics
+    // metrics can be updated when a consumer instance changes, so it must be guarded with RLock.
+    m *messageWriterMetrics
 
     nextFullScan time.Time
     lastNewWrite *list.Element
@@ -196,7 +250,7 @@ func newMessageWriter(
         msgsToWrite: make([]*message, 0, opts.MessageQueueScanBatchSize()),
         isClosed:    false,
         doneCh:      make(chan struct{}),
-        m:           m,
+        m:           &m,
         nowFn:       nowFn,
     }
 }
@@ -221,6 +275,7 @@ func (w *messageWriterImpl) Write(rm *producer.RefCountedMessage) {
     msg.Set(meta, rm, nowNanos)
     w.acks.add(meta, msg)
+    w.m.enqueuedMessages.Inc(1)
     // Make sure all the new writes are ordered in queue.
     if w.lastNewWrite != nil {
         w.lastNewWrite = w.queue.InsertAfter(msg, w.lastNewWrite)
     } else {
@@ -244,6 +299,7 @@ func (w *messageWriterImpl) isValidWriteWithLock(nowNanos int64) bool {
 func (w *messageWriterImpl) write(
     iterationIndexes []int,
     consumerWriters []consumerWriter,
+    metrics *messageWriterMetrics,
     m *message,
 ) error {
     m.IncReads()
@@ -267,18 +323,18 @@ func (w *messageWriterImpl) write(
     for i := len(iterationIndexes) - 1; i >= 0; i-- {
         consumerWriter := consumerWriters[randIndex(iterationIndexes, i)]
         if err := consumerWriter.Write(connIndex, w.encoder.Bytes()); err != nil {
-            w.m.oneConsumerWriteError.Inc(1)
+            metrics.oneConsumerWriteError.Inc(1)
             continue
         }
         written = true
-        w.m.writeSuccess.Inc(1)
+        metrics.writeSuccess.Inc(1)
         break
     }
     if written {
         return nil
     }
     // Could not be written to any consumer, will retry later.
-    w.m.allConsumersWriteError.Inc(1)
+    metrics.allConsumersWriteError.Inc(1)
     return errFailAllConsumers
 }
@@ -305,6 +361,8 @@ func (w *messageWriterImpl) nextRetryNanos(writeTimes int, nowNanos int64) int64 {
 func (w *messageWriterImpl) Ack(meta metadata) bool {
     acked, initNanos := w.acks.ack(meta)
     if acked {
+        w.RLock()
+        defer w.RUnlock()
         w.m.messageConsumeLatency.Record(time.Duration(w.nowFn().UnixNano() - initNanos))
         w.m.messageAcked.Inc(1)
         return true
@@ -346,6 +404,7 @@ func (w *messageWriterImpl) scanMessageQueue() {
     e := w.queue.Front()
     w.lastNewWrite = nil
     isClosed := w.isClosed
+    m := w.m
     w.RUnlock()
     var (
         msgsToWrite []*message
@@ -365,24 +424,24 @@ func (w *messageWriterImpl) scanMessageQueue() {
         iterationIndexes = w.iterationIndexes
         w.Unlock()
         if !fullScan && len(msgsToWrite) == 0 {
-            w.m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
+            m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
             // If this is not a full scan, abort after the iteration batch
             // that no new messages were found.
             break
         }
         if skipWrites {
-            w.m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
+            m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
             continue
         }
-        if err := w.writeBatch(iterationIndexes, consumerWriters, msgsToWrite); err != nil {
+        if err := w.writeBatch(iterationIndexes, consumerWriters, m, msgsToWrite); err != nil {
             // When we can't write to any consumer writer, skip the writes in this scan
             // to avoid meaningless attempts but continue to clean up the queue.
             skipWrites = true
         }
-        w.m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
+        m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
     }
     afterScan := w.nowFn()
-    w.m.scanTotalLatency.Record(afterScan.Sub(beforeScan))
+    m.scanTotalLatency.Record(afterScan.Sub(beforeScan))
     if fullScan {
         w.nextFullScan = afterScan.Add(w.opts.MessageQueueFullScanInterval())
     }
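Reviewer note on the `m := w.m` line above (illustration only): the scan loop copies the metrics pointer once while holding the read lock, then records against that snapshot for the rest of the scan, so a concurrent `SetMetrics` never has to contend with batch writes. The same pattern in isolation, with hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
)

// holder is a hypothetical stand-in for messageWriterImpl: an RWMutex-guarded
// pointer that readers snapshot once and writers swap wholesale.
type metrics struct{ consumer string }

type holder struct {
	mu sync.RWMutex
	m  *metrics
}

func (h *holder) set(m metrics) {
	h.mu.Lock()
	h.m = &m // swap the pointer; existing snapshots keep the old value
	h.mu.Unlock()
}

func (h *holder) scan() {
	h.mu.RLock()
	m := h.m // snapshot the pointer once per scan
	h.mu.RUnlock()
	// Work outside the lock; a concurrent set() cannot race with this read.
	fmt.Println("recording against consumer:", m.consumer)
}

func main() {
	h := &holder{m: &metrics{consumer: "unknown"}}
	h.scan()                       // records against consumer=unknown
	h.set(metrics{consumer: "c2"}) // swap in re-labeled metrics
	h.scan()                       // the next scan picks up consumer=c2
}
```

This is also why `write` and `writeBatch` now take the metrics as an explicit parameter instead of re-reading `w.m` on every message.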
@@ -391,18 +450,19 @@ func (w *messageWriterImpl) scanMessageQueue() {
 func (w *messageWriterImpl) writeBatch(
     iterationIndexes []int,
     consumerWriters []consumerWriter,
+    metrics *messageWriterMetrics,
     messages []*message,
 ) error {
     if len(consumerWriters) == 0 {
         // Not expected in a healthy/valid placement.
-        w.m.noWritersError.Inc(int64(len(messages)))
+        metrics.noWritersError.Inc(int64(len(messages)))
         return errNoWriters
     }
     for _, m := range messages {
-        if err := w.write(iterationIndexes, consumerWriters, m); err != nil {
+        if err := w.write(iterationIndexes, consumerWriters, metrics, m); err != nil {
             return err
         }
-        w.m.messageWriteDelay.Record(time.Duration(w.nowFn().UnixNano() - m.InitNanos()))
+        metrics.messageWriteDelay.Record(time.Duration(w.nowFn().UnixNano() - m.InitNanos()))
     }
     return nil
 }
@@ -429,6 +489,7 @@ func (w *messageWriterImpl) scanBatchWithLock(
         next = e.Next()
         m := e.Value.(*message)
         if w.isClosed {
+            w.m.processedClosed.Inc(1)
             // Simply ack the messages here to mark them as consumed for this
             // message writer, this is useful when user removes a consumer service
             // during runtime that may be unhealthy to consume the messages.
@@ -441,6 +502,7 @@ func (w *messageWriterImpl) scanBatchWithLock(
             continue
         }
         if m.RetryAtNanos() >= nowNanos {
+            w.m.processedNotReady.Inc(1)
             if !fullScan {
                 // If this is not a full scan, bail after the first element that
                 // is not a new write.
@@ -451,6 +513,7 @@ func (w *messageWriterImpl) scanBatchWithLock(
         // If the message exceeded its allowed ttl of the consumer service,
         // remove it from the buffer.
         if w.messageTTLNanos > 0 && m.InitNanos()+w.messageTTLNanos <= nowNanos {
+            w.m.processedTTL.Inc(1)
             // There is a chance the message was acked right before the ack is
             // called, in which case just remove it from the queue.
             if acked, _ := w.acks.ack(m.Metadata()); acked {
@@ -460,10 +523,12 @@ func (w *messageWriterImpl) scanBatchWithLock(
             continue
         }
         if m.IsAcked() {
+            w.m.processedAck.Inc(1)
             w.removeFromQueueWithLock(e, m)
             continue
         }
         if m.IsDroppedOrConsumed() {
+            w.m.processedDrop.Inc(1)
             // There is a chance the message could be acked between m.Acked()
             // and m.IsDroppedOrConsumed() check, in which case we should not
             // mark it as dropped, just continue and next tick will remove it
@@ -482,6 +547,7 @@ func (w *messageWriterImpl) scanBatchWithLock(
         if writeTimes > 1 {
             w.m.messageRetry.Inc(1)
         }
+        w.m.processedWrite.Inc(1)
         w.msgsToWrite = append(w.msgsToWrite, m)
     }
     return next, w.msgsToWrite
@@ -599,6 +665,18 @@ func (w *messageWriterImpl) RemoveConsumerWriter(addr string) {
     w.Unlock()
 }
 
+func (w *messageWriterImpl) Metrics() messageWriterMetrics {
+    w.RLock()
+    defer w.RUnlock()
+    return *w.m
+}
+
+func (w *messageWriterImpl) SetMetrics(m messageWriterMetrics) {
+    w.Lock()
+    w.m = &m
+    w.Unlock()
+}
+
 func (w *messageWriterImpl) QueueSize() int {
     return w.acks.size()
 }
@@ -612,6 +690,7 @@ func (w *messageWriterImpl) newMessage() *message {
 
 func (w *messageWriterImpl) removeFromQueueWithLock(e *list.Element, m *message) {
     w.queue.Remove(e)
+    w.m.dequeuedMessages.Inc(1)
     w.close(m)
 }
 
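A note on the `Metrics`/`SetMetrics` pair above: both are deliberately by-value. `Metrics` returns a copy of the current struct, `withConsumer` derives a re-tagged copy from the stored base scope, and `SetMetrics` publishes a pointer to that fresh copy under the write lock, so readers still holding the old pointer keep a consistent view until their next scan. The intended update idiom, sketched as an in-package test-style fragment (the instance ID is a placeholder; `shard_writer.go` below does exactly this):

```go
// In-package sketch: mw is any messageWriter; "instance-42" is a placeholder ID.
mw := newMessageWriter(200, nil, testOptions(), testMessageWriterMetrics())
mw.SetMetrics(mw.Metrics().withConsumer("instance-42"))
```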
diff --git a/src/msg/producer/writer/message_writer_test.go b/src/msg/producer/writer/message_writer_test.go
index 053fc4652b..05d62f2cda 100644
--- a/src/msg/producer/writer/message_writer_test.go
+++ b/src/msg/producer/writer/message_writer_test.go
@@ -799,7 +799,9 @@ func TestMessageWriterQueueFullScanOnWriteErrors(t *testing.T) {
     defer ctrl.Finish()
 
     opts := testOptions().SetMessageQueueScanBatchSize(1)
-    w := newMessageWriter(200, nil, opts, testMessageWriterMetrics()).(*messageWriterImpl)
+    scope := tally.NewTestScope("", nil)
+    metrics := testMessageWriterMetricsWithScope(scope).withConsumer("c1")
+    w := newMessageWriter(200, nil, opts, metrics).(*messageWriterImpl)
     w.AddConsumerWriter(newConsumerWriter("bad", nil, opts, testConsumerWriterMetrics()))
 
     mm1 := producer.NewMockMessage(ctrl)
@@ -820,6 +822,11 @@ func TestMessageWriterQueueFullScanOnWriteErrors(t *testing.T) {
     rm1.Drop()
     w.scanMessageQueue()
     require.Equal(t, 1, w.queue.Len())
+
+    snapshot := scope.Snapshot()
+    counters := snapshot.Counters()
+    require.Equal(t, int64(1), counters["message-processed+consumer=c1,result=write"].Value())
+    require.Equal(t, int64(1), counters["message-processed+consumer=c1,result=drop"].Value())
 }
 
 func isEmptyWithLock(h *acks) bool {
@@ -838,6 +845,10 @@ func isEmptyWithLock(h *acks) bool {
     return newMessageWriterMetrics(tally.NoopScope, instrument.TimerOptions{})
 }
 
+func testMessageWriterMetricsWithScope(scope tally.TestScope) messageWriterMetrics {
+    return newMessageWriterMetrics(scope, instrument.TimerOptions{})
+}
+
 func validateMessages(t *testing.T, msgs []*producer.RefCountedMessage, w *messageWriterImpl) {
     w.RLock()
     idx := 0
diff --git a/src/msg/producer/writer/shard_writer.go b/src/msg/producer/writer/shard_writer.go
index 62734b3a00..77384a32cc 100644
--- a/src/msg/producer/writer/shard_writer.go
+++ b/src/msg/producer/writer/shard_writer.go
@@ -203,6 +203,9 @@ func (w *replicatedShardWriter) UpdateInstances(
     if instance, cw, ok := anyKeyValueInMap(toBeAdded); ok {
         mw.AddConsumerWriter(cw)
         mw.RemoveConsumerWriter(id)
+        // A replicated writer only has a single downstream consumer instance at a time, so we can
+        // update the metrics with a useful consumer label.
+        mw.SetMetrics(mw.Metrics().withConsumer(instance.ID()))
         w.updateCutoverCutoffNanos(mw, instance)
         newMessageWriters[instance.Endpoint()] = mw
         delete(toBeAdded, instance)
@@ -218,6 +221,7 @@ func (w *replicatedShardWriter) UpdateInstances(
         w.replicaID++
         mw := newMessageWriter(replicatedShardID, w.mPool, w.opts, w.m)
         mw.AddConsumerWriter(cw)
+        mw.SetMetrics(mw.Metrics().withConsumer(instance.ID()))
         w.updateCutoverCutoffNanos(mw, instance)
         mw.Init()
         w.ackRouter.Register(replicatedShardID, mw)
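A closing property worth noting: because `messageWriterMetrics` remembers the original untagged `scope`, `withConsumer` always re-derives from that base scope, so successive consumer changes replace the `consumer` label rather than stacking tags. A runnable sketch using hypothetical mirror types (not the package's own, just the same shape) to demonstrate, assuming `github.com/uber-go/tally`:

```go
package main

import "github.com/uber-go/tally"

// metrics mirrors the withConsumer design: remember the base scope,
// re-derive tagged metrics from it on every consumer change.
type metrics struct {
	base         tally.Scope
	writeSuccess tally.Counter
}

func newMetrics(base tally.Scope, consumer string) metrics {
	sub := base.Tagged(map[string]string{"consumer": consumer})
	return metrics{base: base, writeSuccess: sub.Counter("write-success")}
}

func (m metrics) withConsumer(consumer string) metrics {
	return newMetrics(m.base, consumer) // start from base: labels replace, never accumulate
}

func main() {
	scope := tally.NewTestScope("", nil)
	m := newMetrics(scope, "unknown")
	m = m.withConsumer("c1")
	m = m.withConsumer("c2")
	m.writeSuccess.Inc(1) // emits only "write-success+consumer=c2"
}
```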