Skip to content

Commit

Permalink
[msg] Fix metrics (#2372)
Browse files Browse the repository at this point in the history
  • Loading branch information
cw9 authored Jun 1, 2020
1 parent c415cbc commit fb31c2e
Showing 1 changed file with 7 additions and 8 deletions.
15 changes: 7 additions & 8 deletions src/msg/producer/writer/message_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,9 @@ func (w *messageWriterImpl) nextRetryNanos(writeTimes int, nowNanos int64) int64
}

func (w *messageWriterImpl) Ack(meta metadata) bool {
// acked, initNanos := w.acks.ack(meta)
acked, _ := w.acks.ack(meta)
acked, initNanos := w.acks.ack(meta)
if acked {
// w.m.messageConsumeLatency.Record(time.Duration(w.nowFn().UnixNano() - initNanos))
w.m.messageConsumeLatency.Record(time.Duration(w.nowFn().UnixNano() - initNanos))
w.m.messageAcked.Inc(1)
return true
}
Expand Down Expand Up @@ -366,24 +365,24 @@ func (w *messageWriterImpl) scanMessageQueue() {
iterationIndexes = w.iterationIndexes
w.Unlock()
if !fullScan && len(msgsToWrite) == 0 {
// w.m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
w.m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
// If this is not a full scan, abort after the iteration batch
// that no new messages were found.
break
}
if skipWrites {
// w.m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
w.m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
continue
}
if err := w.writeBatch(iterationIndexes, consumerWriters, msgsToWrite); err != nil {
// When we can't write to any consumer writer, skip the writes in this scan
// to avoid meaningless attempts but continue to clean up the queue.
skipWrites = true
}
// w.m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
w.m.scanBatchLatency.Record(w.nowFn().Sub(beforeBatch))
}
afterScan := w.nowFn()
// w.m.scanTotalLatency.Record(afterScan.Sub(beforeScan))
w.m.scanTotalLatency.Record(afterScan.Sub(beforeScan))
if fullScan {
w.nextFullScan = afterScan.Add(w.opts.MessageQueueFullScanInterval())
}
Expand All @@ -403,7 +402,7 @@ func (w *messageWriterImpl) writeBatch(
if err := w.write(iterationIndexes, consumerWriters, m); err != nil {
return err
}
// w.m.messageWriteDelay.Record(time.Duration(w.nowFn().UnixNano() - m.InitNanos()))
w.m.messageWriteDelay.Record(time.Duration(w.nowFn().UnixNano() - m.InitNanos()))
}
return nil
}
Expand Down

0 comments on commit fb31c2e

Please sign in to comment.