From f666c5fdef9287e81fa908773af92fe137f8624d Mon Sep 17 00:00:00 2001 From: Kobe Yang Date: Thu, 20 Jul 2017 16:59:26 -0700 Subject: [PATCH] Add more logs when PutMessage latency is high --- services/inputhost/pubconnection.go | 30 ++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/services/inputhost/pubconnection.go b/services/inputhost/pubconnection.go index b7c4ef68..3fa79733 100644 --- a/services/inputhost/pubconnection.go +++ b/services/inputhost/pubconnection.go @@ -486,7 +486,10 @@ func (conn *pubConnection) failInflightMessages(inflightMessages map[string]resp common.TagDstPth: common.FmtDstPth(conn.destinationPath), common.TagInPutAckID: common.FmtInPutAckID(id), `duration`: d, - }).Debug(`failInflightMessages: publish message latency`) + `putMsgChanLen`: len(conn.putMsgCh), + `putMsgAckChanLen`: len(conn.ackChannel), + `replyChanLen`: len(conn.replyCh), + }).Error(`failInflightMessages: publish message latency`) } // Record the number of failed messages conn.pathCache.m3Client.IncCounter(metrics.PubConnectionStreamScope, metrics.InputhostMessageFailures) @@ -606,10 +609,14 @@ func (conn *pubConnection) updateInflightMap(inflightMessages map[string]respons conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageBeforeAckLatency, ackReceiveTime.Sub(resp.putMsgRecvTime)) if d > timeLatencyToLog { - conn.logger. - WithField(common.TagDstPth, common.FmtDstPth(conn.destinationPath)). - WithField(common.TagInPutAckID, common.FmtInPutAckID(ackID)). - WithField(`d`, d).Info(`publish message latency at updateInflightMap`) + conn.logger.WithFields(bark.Fields{ + common.TagDstPth: common.FmtDstPth(conn.destinationPath), + common.TagInPutAckID: common.FmtInPutAckID(ackID), + `d`: d, + `putMsgChanLen`: len(conn.putMsgCh), + `putMsgAckChanLen`: len(conn.ackChannel), + `replyChanLen`: len(conn.replyCh), + }).Info(`publish message latency at updateInflightMap`) } delete(inflightMessages, ackID) return ok @@ -625,10 +632,15 @@ func (conn *pubConnection) updateEarlyReplyAcks(resCh response, earlyReplyAcks m ack, _ := earlyReplyAcks[resCh.ackID] actualDuration := d - time.Since(ack.ackSentTime) if d > timeLatencyToLog { - conn.logger. - WithField(common.TagDstPth, common.FmtDstPth(conn.destinationPath)). - WithField(common.TagInPutAckID, common.FmtInPutAckID(resCh.ackID)). - WithFields(bark.Fields{`d`: d, `actualDuration`: actualDuration}).Info(`publish message latency at updateEarlyReplyAcks and actualDuration`) + conn.logger.WithFields(bark.Fields{ + common.TagDstPth: common.FmtDstPth(conn.destinationPath), + common.TagInPutAckID: common.FmtInPutAckID(resCh.ackID), + `d`: d, + `actualDuration`: actualDuration, + `putMsgChanLen`: len(conn.putMsgCh), + `putMsgAckChanLen`: len(conn.ackChannel), + `replyChanLen`: len(conn.replyCh), + }).Info(`publish message latency at updateEarlyReplyAcks and actualDuration`) } conn.pathCache.m3Client.RecordTimer(metrics.PubConnectionStreamScope, metrics.InputhostWriteMessageLatency, actualDuration) conn.pathCache.destM3Client.RecordTimer(metrics.PubConnectionScope, metrics.InputhostDestWriteMessageLatency, actualDuration)