This repository has been archived by the owner on Dec 3, 2019. It is now read-only.

Unblock read acks pump from inputhost (#17)
* Unblock read acks pump from inputhost

If the server is draining gracefully, it sends a DRAIN command. The
DRAIN command used to simply stop the write pump and wait for the
server to close the stream.

Even though the server will successfully close the stream after
finishing the DRAIN, the readAcks pump will never see the EOF, because
we don't do a stream.Read() unless we have some messages in flight.
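
A minimal sketch of the stuck shape (the names here are illustrative,
not the real readAcksPump): stream.Read() is only reached while
something is in flight, so once the drain empties the in-flight map,
the pump blocks on the reply channel and the server's EOF goes unseen.

    func readPumpSketch(stream interface{ Read() (string, error) },
        replyCh <-chan string, closeCh <-chan struct{}) {
        inflight := make(map[string]bool)
        for {
            if len(inflight) == 0 {
                // after a drain nothing new arrives, so we block here forever
                select {
                case id := <-replyCh:
                    inflight[id] = true
                case <-closeCh:
                    return
                }
                continue
            }
            id, err := stream.Read() // io.EOF is only observable on this path
            if err != nil {
                return
            }
            delete(inflight, id)
        }
    }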

This patch solves that by waiting for a default of one minute and then
explicitly closing the connection. In addition, this patch adds some
logs and metrics to make sure we can track retries and failures on
publish.

1. We need to make sure that if we are already closing/draining, we
   don't close the connection again. This is critical because during a
   reconfig we could decide to close the connection while, in the
   meantime, we have received a DRAIN command and started draining. In
   that case we should just let the drain process take care of the rest.
2. Instead of waiting for the entire timeout period, we can simply
   check the number of responses we have received and bail out
   immediately once they cover all sent messages (see the sketch below).
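
A sketch of that check, mirroring the isAlreadyDrained helper this
commit adds to connection.go (uses sync/atomic): acks, nacks and
throttled responses all count toward the drain, but locally failed
messages do not.

    func drainedSketch(sentMsgs, sentAcks, sentNacks, sentThrottled *int64) bool {
        // every response the inputhost sent back, regardless of status
        responses := atomic.LoadInt64(sentAcks) +
            atomic.LoadInt64(sentNacks) +
            atomic.LoadInt64(sentThrottled)
        return responses == atomic.LoadInt64(sentMsgs)
    }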

Fix connection close to make sure we always wait for some time to give
in-flight messages a chance to drain.

We can avoid the atomic variables for closed and drained, and instead
rely on the shutting-down channel to make sure close is idempotent.
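
A sketch of that channel-based guard (shutDownOnce is a hypothetical
name; the real check lives in isShuttingDown): closing a channel twice
panics, so the select under the lock ensures shuttingDownCh is closed
exactly once.

    func (conn *connection) shutDownOnce() {
        conn.lk.Lock()
        defer conn.lk.Unlock()
        select {
        case <-conn.shuttingDownCh:
            // already shutting down; a second close would panic
        default:
            close(conn.shuttingDownCh)
        }
    }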

* Make sure we wait for the final drain timeout

If the checkDrainTimer fires, we exit immediately even if the drain has
not finished yet; we should keep waiting for the bigger drain timeout
as well.
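
A standalone sketch of the fixed wait loop (modeled on
waitForDrainAndClose in the diff below; uses the time package): the
short check timer is reset on every pass so the drained check runs
periodically, while the long drain timer bounds the total wait.

    func waitForDrainSketch(done <-chan struct{}, drained func() bool) {
        drainTimer := time.NewTimer(time.Minute) // hard cap on the whole wait
        defer drainTimer.Stop()
        checkTimer := time.NewTimer(5 * time.Second)
        defer checkTimer.Stop()
        for {
            checkTimer.Reset(5 * time.Second)
            select {
            case <-done:
                return // connection is being closed elsewhere
            case <-checkTimer.C:
                if drained() {
                    return // every sent message has a response: bail early
                }
            case <-drainTimer.C:
                return // drain never finished: give up after the full timeout
            }
        }
    }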
aravindvs authored Apr 13, 2017
1 parent de07d3a commit 342225e
Showing 4 changed files with 135 additions and 58 deletions.
165 changes: 118 additions & 47 deletions client/cherami/connection.go
@@ -56,10 +56,15 @@ type (
logger bark.Logger
reporter metrics.Reporter

lk sync.Mutex
opened int32
closed int32
drained int32
lk sync.Mutex
opened int32

sentMsgs int64 // number of messages sent to the inputhost
sentAcks int64 // number of messages successfully acked
sentNacks int64 // number of messages nacked (STATUS_FAILED)
sentThrottled int64 // number of messages throttled
failedMsgs int64 // number of failed messages *after* we successfully wrote to the stream
writeFailedMsgs int64 // number of messages that failed to even write to the stream
}

// This struct is created by writePump after writing message to stream.
@@ -80,6 +85,8 @@ type (
const (
defaultMaxInflightMessages = 1000
defaultWGTimeout = time.Minute
defaultDrainTimeout = time.Minute
defaultCheckDrainTimeout = 5 * time.Second
)

func newConnection(client cherami.TChanBIn, wsConnector WSConnector, path string, messages <-chan putMessageRequest,
@@ -153,55 +160,32 @@ func (conn *connection) isShuttingDown() bool {
}

// stopWritePump should drain the pump after getting the lock
func (conn *connection) stopWritePump() {
func (conn *connection) stopWritePumpAndClose() {
conn.lk.Lock()
conn.stopWritePumpWithLock()
conn.stopWritePumpWithLockAndClose()
conn.lk.Unlock()
}

// Note: this needs to be called with the conn lock held!
func (conn *connection) stopWritePumpWithLock() {
func (conn *connection) stopWritePumpWithLockAndClose() {
if !conn.isShuttingDown() {
close(conn.shuttingDownCh)
if ok := common.AwaitWaitGroup(&conn.writeMsgPumpWG, defaultWGTimeout); !ok {
conn.logger.Warn("writeMsgPumpWG timed out")
}
conn.logger.Info("stopped write pump")
atomic.StoreInt32(&conn.drained, 1)
// now make sure we spawn a routine to wait for drain and
// close the connection if needed
go conn.waitForDrainAndClose()
}
}
func (conn *connection) close() {
conn.lk.Lock()
defer conn.lk.Unlock()

if atomic.LoadInt32(&conn.closed) == 0 {
// First shutdown the write pump to make sure we don't leave any message without ack
conn.stopWritePumpWithLock()
// Now shutdown the read pump and drain all inflight messages
close(conn.closeCh)
if ok := common.AwaitWaitGroup(&conn.readAckPumpWG, defaultWGTimeout); !ok {
conn.logger.Warn("readAckPumpWG timed out")
}

// Both pumps are shutdown. Close the underlying stream.
if conn.cancel != nil {
conn.cancel()
}

if conn.inputHostStream != nil {
conn.inputHostStream.Done()
}

atomic.StoreInt32(&conn.closed, 1)
conn.logger.Info("Input host connection closed.")

// trigger a reconfiguration due to connection closed
select {
case conn.reconfigureCh <- reconfigureInfo{eventType: connClosedReconfigureType, reconfigureID: conn.connKey}:
default:
conn.logger.Info("Reconfigure channel is full. Drop reconfigure command due to connection close.")
}
}
// close always stops the write pump first, then waits for
// some time to give in-flight messages a chance to drain,
// and then closes the read pump
func (conn *connection) close() {
// First shutdown the write pump to make sure we don't leave any message without ack
conn.stopWritePumpAndClose()
}

func (conn *connection) writeMessagesPump() {
@@ -222,6 +206,7 @@ func (conn *connection) writeMessagesPump() {

if err == nil {
conn.replyCh <- &messageResponse{pr.message.GetID(), pr.messageAck, pr.message.GetUserContext()}
atomic.AddInt64(&conn.sentMsgs, 1)
} else {
conn.reporter.IncCounter(metrics.PublishMessageFailedRate, nil, 1)
conn.logger.WithField(common.TagMsgID, common.FmtMsgID(pr.message.GetID())).Infof("Error writing message to stream: %v", err)
@@ -231,6 +216,7 @@
Error: err,
UserContext: pr.message.GetUserContext(),
}
conn.writeFailedMsgs++

// Write failed, rebuild connection
go conn.close()
@@ -256,7 +242,7 @@ func (conn *connection) readAcksPump() {
inflightMessages := make(map[string]*messageResponse)
// This map is needed when we receive a reply out of order before the inflightMessages is populated
earlyReplyAcks := make(map[string]*PublisherReceipt)
defer failInflightMessages(inflightMessages)
defer conn.failInflightMessages(inflightMessages)

// Flag which is set when AckStream is closed by InputHost
isEOF := false
@@ -310,10 +296,10 @@ func (conn *connection) readAcksPump() {
// Probably we received the ack even before the inflightMessages map is populated.
// Let's put it in the earlyReplyAcks map so we can immediately complete the response when seen by this pump.
//conn.logger.WithField(common.TagAckID, common.FmtAckID(ack.GetID())).Debug("Received Ack before populating inflight messages.")
earlyReplyAcks[ack.GetID()] = processMessageAck(ack)
earlyReplyAcks[ack.GetID()] = conn.processMessageAck(ack)
} else {
delete(inflightMessages, ack.GetID())
messageResp.completion <- processMessageAck(ack)
messageResp.completion <- conn.processMessageAck(ack)
}
} else if cmd.GetType() == cherami.InputHostCommandType_RECONFIGURE {
conn.reporter.IncCounter(metrics.PublishReconfigureRate, nil, 1)
@@ -327,7 +313,7 @@
// start draining by just stopping the write pump.
// this makes sure, we don't send any new messages.
// the read pump will exit when the server completes the drain
go conn.stopWritePump()
go conn.stopWritePumpAndClose()
rInfo := cmd.Reconfigure
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(rInfo.GetUpdateUUID())).Info("Drain command received from InputHost.")
conn.handleReconfigCmd(rInfo)
@@ -351,28 +337,112 @@ func (conn *connection) isOpened() bool {
// the controller returns the same old extent which is draining,
// the server will anyway reject the connection
func (conn *connection) isClosed() bool {
return (atomic.LoadInt32(&conn.closed) != 0 || atomic.LoadInt32(&conn.drained) != 0)
return conn.isShuttingDown()
}

func (e *ackChannelClosedError) Error() string {
return "Ack channel closed."
}

func processMessageAck(messageAck *cherami.PutMessageAck) *PublisherReceipt {
func (conn *connection) processMessageAck(messageAck *cherami.PutMessageAck) *PublisherReceipt {
ret := &PublisherReceipt{
ID: messageAck.GetID(),
UserContext: messageAck.GetUserContext(),
}

if messageAck.GetStatus() != cherami.Status_OK {
stat := messageAck.GetStatus()
if stat != cherami.Status_OK {
if stat == cherami.Status_THROTTLED {
atomic.AddInt64(&conn.sentThrottled, 1)
} else {
atomic.AddInt64(&conn.sentNacks, 1)
}
ret.Error = errors.New(messageAck.GetMessage())
} else {
atomic.AddInt64(&conn.sentAcks, 1)
ret.Receipt = messageAck.GetReceipt()
}

return ret
}

// isAlreadyDrained returns true once we have received a response for every sent message;
// responses include acks, nacks and throttled messages.
// Note: failed messages and writeFailed messages are not counted
func (conn *connection) isAlreadyDrained() bool {
respSent := atomic.LoadInt64(&conn.sentAcks) + atomic.LoadInt64(&conn.sentNacks) + atomic.LoadInt64(&conn.sentThrottled)
if respSent == atomic.LoadInt64(&conn.sentMsgs) {
return true
}

return false
}

func (conn *connection) finalClose() {
conn.lk.Lock()
defer conn.lk.Unlock()

// Now shutdown the read pump and drain all inflight messages
close(conn.closeCh)
if ok := common.AwaitWaitGroup(&conn.readAckPumpWG, defaultWGTimeout); !ok {
conn.logger.Warn("readAckPumpWG timed out")
}

// Both pumps are shutdown. Close the underlying stream.
if conn.cancel != nil {
conn.cancel()
}

if conn.inputHostStream != nil {
conn.inputHostStream.Done()
}

conn.logger.WithFields(bark.Fields{
`sentMsgs`: conn.sentMsgs,
`sentAcks`: conn.sentAcks,
`sentNacks`: conn.sentNacks,
`sentThrottled`: conn.sentThrottled,
`failedMsgs`: conn.failedMsgs,
`writeFailedMsgs`: conn.writeFailedMsgs,
}).Info("Input host connection closed.")

// trigger a reconfiguration due to connection closed
select {
case conn.reconfigureCh <- reconfigureInfo{eventType: connClosedReconfigureType, reconfigureID: conn.connKey}:
default:
conn.logger.Info("Reconfigure channel is full. Drop reconfigure command due to connection close.")
}
}

func (conn *connection) waitForDrainAndClose() {
defer conn.finalClose()
// wait for some timeout period and then close the connection.
// this is needed because even though the server closes the
// connection, we never call "stream.Read" unless we have some
// messages in flight, which will not be the case once we have already drained
drainTimer := time.NewTimer(defaultDrainTimeout)
defer drainTimer.Stop()

checkDrainTimer := time.NewTimer(defaultCheckDrainTimeout)
defer checkDrainTimer.Stop()
for {
checkDrainTimer.Reset(defaultCheckDrainTimeout)
select {
case <-conn.closeCh:
// already closed
return
case <-checkDrainTimer.C:
// check if we got the acks for all sent messages
if conn.isAlreadyDrained() {
conn.logger.Infof("Inputhost connection drained completely")
return
}
case <-drainTimer.C:
return
}
}
}

// populateInflightMapUtil is used to populate the inflightMessages Map,
// based on the acks we received as well.
func populateInflightMapUtil(inflightMessages map[string]*messageResponse, earlyReplyAcks map[string]*PublisherReceipt, resCh *messageResponse) {
@@ -388,12 +458,13 @@ func populateInflightMapUtil(inflightMessages map[string]*messageResponse, early
}
}

func failInflightMessages(inflightMessages map[string]*messageResponse) {
func (conn *connection) failInflightMessages(inflightMessages map[string]*messageResponse) {
for id, messageResp := range inflightMessages {
messageResp.completion <- &PublisherReceipt{
ID: id,
Error: &ackChannelClosedError{},
UserContext: messageResp.userContext,
}
conn.failedMsgs++
}
}
5 changes: 1 addition & 4 deletions client/cherami/connection_test.go
@@ -23,7 +23,6 @@ package cherami
import (
"errors"
"io"
"sync/atomic"
"testing"
"time"

@@ -224,9 +223,7 @@ func (s *ConnectionSuite) TestClientDrain() {

messagesCh <- putMessageRequest{message, requestDone}
<-time.After(10 * time.Millisecond)
// drain must be set
s.Equal(int32(1), atomic.LoadInt32(&conn.drained))
// closed must return true as well
// we must have drained and so closed must return true
s.True(conn.isClosed())
}

16 changes: 9 additions & 7 deletions client/cherami/publisher.go
@@ -26,10 +26,10 @@ import (
"sync/atomic"
"time"

"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-client-go/common/backoff"
"github.com/uber/cherami-client-go/common/metrics"
"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"

@@ -70,12 +70,12 @@ var _ Publisher = (*publisherImpl)(nil)
// NewPublisher constructs a new Publisher object
func NewPublisher(client *clientImpl, path string, maxInflightMessagesPerConnection int) Publisher {
base := basePublisher{
client: client,
retryPolicy: createDefaultPublisherRetryPolicy(),
path: path,
logger: client.options.Logger.WithField(common.TagDstPth, common.FmtDstPth(path)),
reporter: client.options.MetricsReporter,
reconfigurationPollingInterval: client.options.ReconfigurationPollingInterval,
client: client,
retryPolicy: createDefaultPublisherRetryPolicy(),
path: path,
logger: client.options.Logger.WithField(common.TagDstPth, common.FmtDstPth(path)),
reporter: client.options.MetricsReporter,
reconfigurationPollingInterval: client.options.ReconfigurationPollingInterval,
}
publisher := &publisherImpl{
basePublisher: base,
Expand Down Expand Up @@ -182,8 +182,10 @@ func (s *publisherImpl) Publish(message *PublisherMessage) *PublisherReceipt {

select {
case receipt = <-srCh:
s.reporter.IncCounter(metrics.PublisherMessageFailed, nil, 1)
return receipt.Error
case <-timeoutTimer.C:
s.reporter.IncCounter(metrics.PublisherMessageTimedout, nil, 1)
return ErrMessageTimedout
}
}
7 changes: 7 additions & 0 deletions common/metrics/names.go
@@ -55,6 +55,10 @@ const (
PublishNumConnections = "cherami.publish.connections"
// PublishNumInflightMessagess is the number of inflight messages hold locally by publisher
PublishNumInflightMessagess = "cherami.publish.message.inflights"
// PublisherMessageFailed is the number of failed messages on the publisher
PublisherMessageFailed = "cherami.publisher.message.failed"
// PublisherMessageTimedout is the number of messages timed out on the publisher
PublisherMessageTimedout = "cherami.publisher.message.timedout"

// ConsumeMessageRate is the rate of message got from output
ConsumeMessageRate = "cherami.consume.message.rate"
@@ -91,8 +95,11 @@ var MetricDefs = map[MetricName]MetricType{
PublishMessageLatency: Timer,
PublishAckRate: Counter,
PublishReconfigureRate: Counter,
PublishDrainRate: Counter,
PublishNumConnections: Gauge,
PublishNumInflightMessagess: Gauge,
PublisherMessageFailed: Counter,
PublisherMessageTimedout: Counter,

ConsumeMessageRate: Counter,
ConsumeCreditRate: Counter,
