From 62fa6695c1063b3be5ef51bb5e115a310e0acced Mon Sep 17 00:00:00 2001 From: kiranrg Date: Thu, 4 May 2017 14:07:12 -0700 Subject: [PATCH 1/2] write-pump error should not close read-pump --- services/outputhost/extcache.go | 8 +-- services/outputhost/replicaconnection.go | 85 +++++++++++------------- 2 files changed, 40 insertions(+), 53 deletions(-) diff --git a/services/outputhost/extcache.go b/services/outputhost/extcache.go index be607d32..d4624968 100644 --- a/services/outputhost/extcache.go +++ b/services/outputhost/extcache.go @@ -31,8 +31,6 @@ import ( "sync" "time" - "golang.org/x/net/context" - "github.com/uber-common/bark" "github.com/uber/tchannel-go/thrift" @@ -202,7 +200,6 @@ func (extCache *extentCache) load( func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence common.SequenceNumber, startIndex int) (repl *replicaConnection, pickedIndex int, err error) { var call serverStream.BStoreOpenReadStreamOutCall - var cancel context.CancelFunc extUUID := extCache.extUUID startIndex-- @@ -309,7 +306,6 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence logger.WithField(common.TagErr, err).Error(`outputhost: Websocket dial store replica: failed`) return } - cancel = nil // successfully opened read stream on the replica; save this index if startSequence != 0 { @@ -319,7 +315,7 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence logger.WithField(`startIndex`, startIndex).Debug(`opened read stream`) pickedIndex = startIndex replicaConnectionName := fmt.Sprintf(`replicaConnection{Extent: %s, Store: %s}`, extUUID, storeUUID) - repl = newReplicaConnection(call, extCache, cancel, replicaConnectionName, logger, startSequence) + repl = newReplicaConnection(call, extCache, replicaConnectionName, logger, startSequence) // all open connections should be closed before shutdown extCache.shutdownWG.Add(1) repl.open() @@ -393,7 +389,7 @@ func (extCache *extentCache) loadKafkaStream( // Setup the replicaConnection replicaConnectionName := fmt.Sprintf(`replicaConnection{Extent: %s, kafkaCluster: %s}`, extCache.extUUID, kafkaCluster) - repl = newReplicaConnection(call, extCache, nil, replicaConnectionName, extCache.logger, 0) + repl = newReplicaConnection(call, extCache, replicaConnectionName, extCache.logger, 0) extCache.shutdownWG.Add(1) repl.open() return diff --git a/services/outputhost/replicaconnection.go b/services/outputhost/replicaconnection.go index 275d88c7..99df60c9 100644 --- a/services/outputhost/replicaconnection.go +++ b/services/outputhost/replicaconnection.go @@ -24,8 +24,6 @@ import ( "sync" "time" - "golang.org/x/net/context" - "github.com/uber-common/bark" "github.com/uber/cherami-server/common" @@ -53,7 +51,6 @@ type ( call storeStream.BStoreOpenReadStreamOutCall msgsCh chan<- *cherami.ConsumerMessage initialCredits int32 - cancel context.CancelFunc closeChannel chan struct{} connectionsClosedCh chan<- error readMsgsCh chan int32 @@ -74,22 +71,21 @@ type ( // one message at a time, and read on close after the pumps // are done; therefore, these do not require to be protected // for concurrent access - sentCreds int64 // total credits sent - recvMsgs int64 // total messages received + sentCreds int32 // total credits sent + recvMsgs int32 // total messages received // consumerM3Client for metrics per consumer group consumerM3Client metrics.Client } ) -func newReplicaConnection(stream storeStream.BStoreOpenReadStreamOutCall, extCache *extentCache, cancel context.CancelFunc, +func newReplicaConnection(stream storeStream.BStoreOpenReadStreamOutCall, extCache *extentCache, replicaConnectionName string, logger bark.Logger, startingSequence common.SequenceNumber) *replicaConnection { conn := &replicaConnection{ call: stream, msgsCh: extCache.msgsCh, initialCredits: extCache.initialCredits, - cancel: cancel, shutdownWG: extCache.shutdownWG, closeChannel: make(chan struct{}), readMsgsCh: make(chan int32, replicaConnectionCreditsChBuffer), @@ -143,20 +139,6 @@ func (conn *replicaConnection) close(err error) { conn.lk.Unlock() } -func (conn *replicaConnection) sendCreditsToStore(credits int32) error { - cFlow := cherami.NewControlFlow() - cFlow.Credits = common.Int32Ptr(credits) - //conn.logger.WithField(`Credits`,cFlow.GetCredits()).Debug(`Sending credits!!`) - err := conn.call.Write(cFlow) - if err == nil { - err = conn.call.Flush() - } - - conn.sentCreds += int64(credits) - - return err -} - // updateSuccessfulSendToMsgCh updates the local counter and records metric as well func (conn *replicaConnection) updateSuccessfulSendToMsgsCh(localMsgs *int32, length int64) { *localMsgs++ @@ -335,32 +317,32 @@ func (conn *replicaConnection) readMessagesPump() { } } -func (conn *replicaConnection) utilSendCredits(credits int32, numMsgsRead *int32, totalCreditsSent *int32) { - // conn.logger.WithField(`credits`, credits).Debug(`Sending credits to store.`) - if err := conn.sendCreditsToStore(credits); err != nil { - conn.logger.WithField(common.TagErr, err).Error(`error writing credits to store`) - - go conn.close(nil) - } - *numMsgsRead = *numMsgsRead - credits - *totalCreditsSent = *totalCreditsSent + credits -} - func (conn *replicaConnection) writeCreditsPump() { defer conn.waitWG.Done() defer conn.call.Done() - if conn.cancel != nil { - defer conn.cancel() - } - totalCreditsSent := conn.initialCredits + sendCreditsToStore := func(credits int32) (err error) { + + //conn.logger.WithField(`Credits`,cFlow.GetCredits()).Debug(`Sending credits!!`) + + cFlow := cherami.NewControlFlow() + cFlow.Credits = common.Int32Ptr(credits) + if err = conn.call.Write(cFlow); err != nil { + conn.logger.WithField(common.TagErr, err).Error(`error writing credits to store`) + return + } - // send initial credits to the store.. so that we can - // actually start reading + err = conn.call.Flush() + conn.sentCreds += credits + return + } + + // send initial credits to the store.. so that we can actually start reading conn.logger.WithField(`initialCredits`, conn.initialCredits).Info(`writeCreditsPump: sending initial credits to store`) - if err := conn.sendCreditsToStore(conn.initialCredits); err != nil { - conn.logger.WithField(common.TagErr, err).Error(`error writing initial credits to store`) + if err := sendCreditsToStore(conn.initialCredits); err != nil { + conn.logger.WithField(common.TagErr, err).Error(`writeCreditsPump: error writing initial credits to store`) + // if sending of initial credits failed, then close this connection go conn.close(nil) return } @@ -382,20 +364,26 @@ func (conn *replicaConnection) writeCreditsPump() { if numMsgsRead > creditBatchSize { select { case credits := <-conn.creditNotifyCh: // this is the common credit channel from redelivery cache - conn.utilSendCredits(credits, &numMsgsRead, &totalCreditsSent) + if err := sendCreditsToStore(credits); err != nil { + return + } + numMsgsRead -= credits case credits := <-conn.localCreditCh: // this is the local credits channel only for this connection - conn.utilSendCredits(credits, &numMsgsRead, &totalCreditsSent) + if err := sendCreditsToStore(credits); err != nil { + return + } + numMsgsRead -= credits case msgsRead := <-conn.readMsgsCh: numMsgsRead += msgsRead case <-creditRequestTicker.C: // if we didn't get any credits for a long time and we are starving for // credits, then request some credits - if numMsgsRead >= totalCreditsSent { - conn.logger.Warn("WritecreditsPump starving for credits: requesting more") + if numMsgsRead >= conn.sentCreds { + conn.logger.Warn("writeCreditsPump: starving for credits, requesting more") conn.extCache.requestCredits() } case <-conn.closeChannel: - conn.logger.Info("WriteCreditsPump closing due to connection closed.") + conn.logger.Info("writeCreditsPump: connection closed") return } } else { @@ -405,9 +393,12 @@ func (conn *replicaConnection) writeCreditsPump() { // we should listen on the local credits ch, just in case we requested for some credits // above and we are just getting it now case credits := <-conn.localCreditCh: - conn.utilSendCredits(credits, &numMsgsRead, &totalCreditsSent) + if err := sendCreditsToStore(credits); err != nil { + return + } + numMsgsRead -= credits case <-conn.closeChannel: - conn.logger.Info("WriteCreditsPump closing due to connection closed.") + conn.logger.Info("writeCreditsPump: connection closed") return } } From febd42b357e540aaf7161d0aae0b27c997bfb4e1 Mon Sep 17 00:00:00 2001 From: kiranrg Date: Fri, 5 May 2017 13:11:36 -0700 Subject: [PATCH 2/2] credit refresh --- services/outputhost/replicaconnection.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/services/outputhost/replicaconnection.go b/services/outputhost/replicaconnection.go index 99df60c9..b086343e 100644 --- a/services/outputhost/replicaconnection.go +++ b/services/outputhost/replicaconnection.go @@ -321,6 +321,9 @@ func (conn *replicaConnection) writeCreditsPump() { defer conn.waitWG.Done() defer conn.call.Done() + var localCredits int32 // credits accumulated + var creditRefresh time.Time + sendCreditsToStore := func(credits int32) (err error) { //conn.logger.WithField(`Credits`,cFlow.GetCredits()).Debug(`Sending credits!!`) @@ -334,9 +337,13 @@ func (conn *replicaConnection) writeCreditsPump() { err = conn.call.Flush() conn.sentCreds += credits + localCredits -= credits + creditRefresh = time.Now() return } + localCredits = conn.initialCredits + // send initial credits to the store.. so that we can actually start reading conn.logger.WithField(`initialCredits`, conn.initialCredits).Info(`writeCreditsPump: sending initial credits to store`) if err := sendCreditsToStore(conn.initialCredits); err != nil { @@ -354,31 +361,28 @@ func (conn *replicaConnection) writeCreditsPump() { creditBatchSize = minCreditBatchSize } - var numMsgsRead int32 creditRequestTicker := time.NewTicker(creditRequestTimeout) defer creditRequestTicker.Stop() // Start the write pump for { // listen for credits only if we satisfy the batch size - if numMsgsRead > creditBatchSize { + if localCredits > creditBatchSize { select { case credits := <-conn.creditNotifyCh: // this is the common credit channel from redelivery cache if err := sendCreditsToStore(credits); err != nil { return } - numMsgsRead -= credits case credits := <-conn.localCreditCh: // this is the local credits channel only for this connection if err := sendCreditsToStore(credits); err != nil { return } - numMsgsRead -= credits case msgsRead := <-conn.readMsgsCh: - numMsgsRead += msgsRead + localCredits += msgsRead case <-creditRequestTicker.C: // if we didn't get any credits for a long time and we are starving for // credits, then request some credits - if numMsgsRead >= conn.sentCreds { + if time.Since(creditRefresh) > creditRequestTimeout { conn.logger.Warn("writeCreditsPump: starving for credits, requesting more") conn.extCache.requestCredits() } @@ -389,14 +393,13 @@ func (conn *replicaConnection) writeCreditsPump() { } else { select { case msgsRead := <-conn.readMsgsCh: - numMsgsRead += msgsRead + localCredits += msgsRead // we should listen on the local credits ch, just in case we requested for some credits // above and we are just getting it now case credits := <-conn.localCreditCh: if err := sendCreditsToStore(credits); err != nil { return } - numMsgsRead -= credits case <-conn.closeChannel: conn.logger.Info("writeCreditsPump: connection closed") return