diff --git a/client/cherami/connection.go b/client/cherami/connection.go index c929c67..b951cda 100644 --- a/client/cherami/connection.go +++ b/client/cherami/connection.go @@ -56,10 +56,15 @@ type ( logger bark.Logger reporter metrics.Reporter - lk sync.Mutex - opened int32 - closed int32 - drained int32 + lk sync.Mutex + opened int32 + + sentMsgs int64 // no: of messages sent to the inputhost + sentAcks int64 // no: of messages successfully acked + sentNacks int64 // no: of messages nacked (STATUS_FAILED) + sentThrottled int64 // no: of messages throttled + failedMsgs int64 // no: of failed messages *after* we successfully wrote to the stream + writeFailedMsgs int64 // no: of messages failed to even write to the stream } // This struct is created by writePump after writing message to stream. @@ -80,6 +85,8 @@ type ( const ( defaultMaxInflightMessages = 1000 defaultWGTimeout = time.Minute + defaultDrainTimeout = time.Minute + defaultCheckDrainTimeout = 5 * time.Second ) func newConnection(client cherami.TChanBIn, wsConnector WSConnector, path string, messages <-chan putMessageRequest, @@ -153,55 +160,32 @@ func (conn *connection) isShuttingDown() bool { } // stopWritePump should drain the pump after getting the lock -func (conn *connection) stopWritePump() { +func (conn *connection) stopWritePumpAndClose() { conn.lk.Lock() - conn.stopWritePumpWithLock() + conn.stopWritePumpWithLockAndClose() conn.lk.Unlock() } // Note: this needs to be called with the conn lock held! 
-func (conn *connection) stopWritePumpWithLock() { +func (conn *connection) stopWritePumpWithLockAndClose() { if !conn.isShuttingDown() { close(conn.shuttingDownCh) if ok := common.AwaitWaitGroup(&conn.writeMsgPumpWG, defaultWGTimeout); !ok { conn.logger.Warn("writeMsgPumpWG timed out") } conn.logger.Info("stopped write pump") - atomic.StoreInt32(&conn.drained, 1) + // now make sure we spawn a routine to wait for drain and + // close the connection if needed + go conn.waitForDrainAndClose() } } -func (conn *connection) close() { - conn.lk.Lock() - defer conn.lk.Unlock() - - if atomic.LoadInt32(&conn.closed) == 0 { - // First shutdown the write pump to make sure we don't leave any message without ack - conn.stopWritePumpWithLock() - // Now shutdown the read pump and drain all inflight messages - close(conn.closeCh) - if ok := common.AwaitWaitGroup(&conn.readAckPumpWG, defaultWGTimeout); !ok { - conn.logger.Warn("readAckPumpWG timed out") - } - - // Both pumps are shutdown. Close the underlying stream. - if conn.cancel != nil { - conn.cancel() - } - - if conn.inputHostStream != nil { - conn.inputHostStream.Done() - } - atomic.StoreInt32(&conn.closed, 1) - conn.logger.Info("Input host connection closed.") - - // trigger a reconfiguration due to connection closed - select { - case conn.reconfigureCh <- reconfigureInfo{eventType: connClosedReconfigureType, reconfigureID: conn.connKey}: - default: - conn.logger.Info("Reconfigure channel is full. 
Drop reconfigure command due to connection close.") - } - } +// close always stops the write pump first and then +// waits for some time to make sure we give a chance to +// drain inflight messages and then close the read pump +func (conn *connection) close() { + // First shutdown the write pump to make sure we don't leave any message without ack + conn.stopWritePumpAndClose() } func (conn *connection) writeMessagesPump() { @@ -222,6 +206,7 @@ func (conn *connection) writeMessagesPump() { if err == nil { conn.replyCh <- &messageResponse{pr.message.GetID(), pr.messageAck, pr.message.GetUserContext()} + atomic.AddInt64(&conn.sentMsgs, 1) } else { conn.reporter.IncCounter(metrics.PublishMessageFailedRate, nil, 1) conn.logger.WithField(common.TagMsgID, common.FmtMsgID(pr.message.GetID())).Infof("Error writing message to stream: %v", err) @@ -231,6 +216,7 @@ func (conn *connection) writeMessagesPump() { Error: err, UserContext: pr.message.GetUserContext(), } + conn.writeFailedMsgs++ // Write failed, rebuild connection go conn.close() @@ -256,7 +242,7 @@ func (conn *connection) readAcksPump() { inflightMessages := make(map[string]*messageResponse) // This map is needed when we receive a reply out of order before the inflightMessages is populated earlyReplyAcks := make(map[string]*PublisherReceipt) - defer failInflightMessages(inflightMessages) + defer conn.failInflightMessages(inflightMessages) // Flag which is set when AckStream is closed by InputHost isEOF := false @@ -310,10 +296,10 @@ func (conn *connection) readAcksPump() { // Probably we received the ack even before the inflightMessages map is populated. // Let's put it in the earlyReplyAcks map so we can immediately complete the response when seen by this pump. 
//conn.logger.WithField(common.TagAckID, common.FmtAckID(ack.GetID())).Debug("Received Ack before populating inflight messages.") - earlyReplyAcks[ack.GetID()] = processMessageAck(ack) + earlyReplyAcks[ack.GetID()] = conn.processMessageAck(ack) } else { delete(inflightMessages, ack.GetID()) - messageResp.completion <- processMessageAck(ack) + messageResp.completion <- conn.processMessageAck(ack) } } else if cmd.GetType() == cherami.InputHostCommandType_RECONFIGURE { conn.reporter.IncCounter(metrics.PublishReconfigureRate, nil, 1) @@ -327,7 +313,7 @@ func (conn *connection) readAcksPump() { // start draining by just stopping the write pump. // this makes sure, we don't send any new messages. // the read pump will exit when the server completes the drain - go conn.stopWritePump() + go conn.stopWritePumpAndClose() rInfo := cmd.Reconfigure conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(rInfo.GetUpdateUUID())).Info("Drain command received from InputHost.") conn.handleReconfigCmd(rInfo) @@ -351,28 +337,112 @@ func (conn *connection) isOpened() bool { // the controller returns the same old extent which is draining, // the server will anyway reject the connection func (conn *connection) isClosed() bool { - return (atomic.LoadInt32(&conn.closed) != 0 || atomic.LoadInt32(&conn.drained) != 0) + return conn.isShuttingDown() } func (e *ackChannelClosedError) Error() string { return "Ack channel closed." 
} -func processMessageAck(messageAck *cherami.PutMessageAck) *PublisherReceipt { +func (conn *connection) processMessageAck(messageAck *cherami.PutMessageAck) *PublisherReceipt { ret := &PublisherReceipt{ ID: messageAck.GetID(), UserContext: messageAck.GetUserContext(), } - if messageAck.GetStatus() != cherami.Status_OK { + stat := messageAck.GetStatus() + if stat != cherami.Status_OK { + if stat == cherami.Status_THROTTLED { + atomic.AddInt64(&conn.sentThrottled, 1) + } else { + atomic.AddInt64(&conn.sentNacks, 1) + } ret.Error = errors.New(messageAck.GetMessage()) } else { + atomic.AddInt64(&conn.sentAcks, 1) ret.Receipt = messageAck.GetReceipt() } return ret } +// isAlreadyDrained returns true when we have sent response for all the sent messages +// response should include acks, nacks and throttled messages +// Note: failed messages and writeFailed messages should not be counted +func (conn *connection) isAlreadyDrained() bool { + respSent := atomic.LoadInt64(&conn.sentAcks) + atomic.LoadInt64(&conn.sentNacks) + atomic.LoadInt64(&conn.sentThrottled) + if respSent == atomic.LoadInt64(&conn.sentMsgs) { + return true + } + + return false +} + +func (conn *connection) finalClose() { + conn.lk.Lock() + defer conn.lk.Unlock() + + // Now shutdown the read pump and drain all inflight messages + close(conn.closeCh) + if ok := common.AwaitWaitGroup(&conn.readAckPumpWG, defaultWGTimeout); !ok { + conn.logger.Warn("readAckPumpWG timed out") + } + + // Both pumps are shutdown. Close the underlying stream. 
+ if conn.cancel != nil { + conn.cancel() + } + + if conn.inputHostStream != nil { + conn.inputHostStream.Done() + } + + conn.logger.WithFields(bark.Fields{ + `sentMsgs`: conn.sentMsgs, + `sentAcks`: conn.sentAcks, + `sentNacks`: conn.sentNacks, + `sentThrottled`: conn.sentThrottled, + `failedMsgs`: conn.failedMsgs, + `writeFailedMsgs`: conn.writeFailedMsgs, + }).Info("Input host connection closed.") + + // trigger a reconfiguration due to connection closed + select { + case conn.reconfigureCh <- reconfigureInfo{eventType: connClosedReconfigureType, reconfigureID: conn.connKey}: + default: + conn.logger.Info("Reconfigure channel is full. Drop reconfigure command due to connection close.") + } +} + +func (conn *connection) waitForDrainAndClose() { + defer conn.finalClose() + // wait for some timeout period and close the connection + // this is needed because even though server closes the + // connection, we never call "stream.Read" until we have some + // messages in-flight, which will not be the case when we have already drained + drainTimer := time.NewTimer(defaultDrainTimeout) + defer drainTimer.Stop() + + checkDrainTimer := time.NewTimer(defaultCheckDrainTimeout) + defer checkDrainTimer.Stop() + for { + checkDrainTimer.Reset(defaultCheckDrainTimeout) + select { + case <-conn.closeCh: + // already closed + return + case <-checkDrainTimer.C: + // check if we got the acks for all sent messages + if conn.isAlreadyDrained() { + conn.logger.Infof("Inputhost connection drained completely") + return + } + case <-drainTimer.C: + return + } + } +} + // populateInflightMapUtil is used to populate the inflightMessages Map, // based on the acks we received as well. 
func populateInflightMapUtil(inflightMessages map[string]*messageResponse, earlyReplyAcks map[string]*PublisherReceipt, resCh *messageResponse) { @@ -388,12 +458,13 @@ func populateInflightMapUtil(inflightMessages map[string]*messageResponse, early } } -func failInflightMessages(inflightMessages map[string]*messageResponse) { +func (conn *connection) failInflightMessages(inflightMessages map[string]*messageResponse) { for id, messageResp := range inflightMessages { messageResp.completion <- &PublisherReceipt{ ID: id, Error: &ackChannelClosedError{}, UserContext: messageResp.userContext, } + conn.failedMsgs++ } } diff --git a/client/cherami/connection_test.go b/client/cherami/connection_test.go index 271985e..31b53e9 100644 --- a/client/cherami/connection_test.go +++ b/client/cherami/connection_test.go @@ -23,7 +23,6 @@ package cherami import ( "errors" "io" - "sync/atomic" "testing" "time" @@ -224,9 +223,7 @@ func (s *ConnectionSuite) TestClientDrain() { messagesCh <- putMessageRequest{message, requestDone} <-time.After(10 * time.Millisecond) - // drain must be set - s.Equal(int32(1), atomic.LoadInt32(&conn.drained)) - // closed must return true as well + // we must have drained and so closed must return true s.True(conn.isClosed()) } diff --git a/client/cherami/publisher.go b/client/cherami/publisher.go index 91c8b80..5e906da 100644 --- a/client/cherami/publisher.go +++ b/client/cherami/publisher.go @@ -26,10 +26,10 @@ import ( "sync/atomic" "time" - "github.com/uber/cherami-thrift/.generated/go/cherami" "github.com/uber/cherami-client-go/common" "github.com/uber/cherami-client-go/common/backoff" "github.com/uber/cherami-client-go/common/metrics" + "github.com/uber/cherami-thrift/.generated/go/cherami" "github.com/uber/tchannel-go" "github.com/uber/tchannel-go/thrift" @@ -70,12 +70,12 @@ var _ Publisher = (*publisherImpl)(nil) // NewPublisher constructs a new Publisher object func NewPublisher(client *clientImpl, path string, maxInflightMessagesPerConnection int) 
Publisher { base := basePublisher{
-		client:      client,
-		retryPolicy: createDefaultPublisherRetryPolicy(),
-		path:        path,
-		logger:      client.options.Logger.WithField(common.TagDstPth, common.FmtDstPth(path)),
-		reporter:    client.options.MetricsReporter,
-		reconfigurationPollingInterval: client.options.ReconfigurationPollingInterval,
+		client:                         client,
+		retryPolicy:                    createDefaultPublisherRetryPolicy(),
+		path:                           path,
+		logger:                         client.options.Logger.WithField(common.TagDstPth, common.FmtDstPth(path)),
+		reporter:                       client.options.MetricsReporter,
+		reconfigurationPollingInterval: client.options.ReconfigurationPollingInterval,
 	}
 	publisher := &publisherImpl{
 		basePublisher: base,
@@ -182,8 +182,12 @@ func (s *publisherImpl) Publish(message *PublisherMessage) *PublisherReceipt {
 
 	select {
 	case receipt = <-srCh:
+		// a receipt with a nil Error is a successful publish; only count real failures
+		if receipt.Error != nil {
+			s.reporter.IncCounter(metrics.PublisherMessageFailed, nil, 1)
+		}
 		return receipt.Error
 	case <-timeoutTimer.C:
+		s.reporter.IncCounter(metrics.PublisherMessageTimedout, nil, 1)
 		return ErrMessageTimedout
 	}
 }
diff --git a/common/metrics/names.go b/common/metrics/names.go
index 6ac7916..6907fa3 100644
--- a/common/metrics/names.go
+++ b/common/metrics/names.go
@@ -55,6 +55,10 @@ const (
 	PublishNumConnections = "cherami.publish.connections"
 	// PublishNumInflightMessagess is the number of inflight messages hold locally by publisher
 	PublishNumInflightMessagess = "cherami.publish.message.inflights"
+	// PublisherMessageFailed is the number of failed messages on the publisher
+	PublisherMessageFailed = "cherami.publisher.message.failed"
+	// PublisherMessageTimedout is the number of messages timed out on the publisher
+	PublisherMessageTimedout = "cherami.publisher.message.timedout"
 
 	// ConsumeMessageRate is the rate of message got from output
 	ConsumeMessageRate = "cherami.consume.message.rate"
@@ -91,8 +95,11 @@ var MetricDefs = map[MetricName]MetricType{
 	PublishMessageLatency:       Timer,
 	PublishAckRate:              Counter,
 	PublishReconfigureRate:      Counter,
+	PublishDrainRate:            Counter,
PublishNumConnections: Gauge, PublishNumInflightMessagess: Gauge, + PublisherMessageFailed: Counter, + PublisherMessageTimedout: Counter, ConsumeMessageRate: Counter, ConsumeCreditRate: Counter,