Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
write-pump error should not close read-pump
Browse files Browse the repository at this point in the history
  • Loading branch information
kiranrg committed May 4, 2017
1 parent 0486887 commit 62fa669
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 53 deletions.
8 changes: 2 additions & 6 deletions services/outputhost/extcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import (
"sync"
"time"

"golang.org/x/net/context"

"github.com/uber-common/bark"
"github.com/uber/tchannel-go/thrift"

Expand Down Expand Up @@ -202,7 +200,6 @@ func (extCache *extentCache) load(

func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence common.SequenceNumber, startIndex int) (repl *replicaConnection, pickedIndex int, err error) {
var call serverStream.BStoreOpenReadStreamOutCall
var cancel context.CancelFunc
extUUID := extCache.extUUID

startIndex--
Expand Down Expand Up @@ -309,7 +306,6 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence
logger.WithField(common.TagErr, err).Error(`outputhost: Websocket dial store replica: failed`)
return
}
cancel = nil

// successfully opened read stream on the replica; save this index
if startSequence != 0 {
Expand All @@ -319,7 +315,7 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence
logger.WithField(`startIndex`, startIndex).Debug(`opened read stream`)
pickedIndex = startIndex
replicaConnectionName := fmt.Sprintf(`replicaConnection{Extent: %s, Store: %s}`, extUUID, storeUUID)
repl = newReplicaConnection(call, extCache, cancel, replicaConnectionName, logger, startSequence)
repl = newReplicaConnection(call, extCache, replicaConnectionName, logger, startSequence)
// all open connections should be closed before shutdown
extCache.shutdownWG.Add(1)
repl.open()
Expand Down Expand Up @@ -393,7 +389,7 @@ func (extCache *extentCache) loadKafkaStream(

// Setup the replicaConnection
replicaConnectionName := fmt.Sprintf(`replicaConnection{Extent: %s, kafkaCluster: %s}`, extCache.extUUID, kafkaCluster)
repl = newReplicaConnection(call, extCache, nil, replicaConnectionName, extCache.logger, 0)
repl = newReplicaConnection(call, extCache, replicaConnectionName, extCache.logger, 0)
extCache.shutdownWG.Add(1)
repl.open()
return
Expand Down
85 changes: 38 additions & 47 deletions services/outputhost/replicaconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"sync"
"time"

"golang.org/x/net/context"

"github.com/uber-common/bark"

"github.com/uber/cherami-server/common"
Expand Down Expand Up @@ -53,7 +51,6 @@ type (
call storeStream.BStoreOpenReadStreamOutCall
msgsCh chan<- *cherami.ConsumerMessage
initialCredits int32
cancel context.CancelFunc
closeChannel chan struct{}
connectionsClosedCh chan<- error
readMsgsCh chan int32
Expand All @@ -74,22 +71,21 @@ type (
// one message at a time, and read on close after the pumps
// are done; therefore, these do not require to be protected
// for concurrent access
sentCreds int64 // total credits sent
recvMsgs int64 // total messages received
sentCreds int32 // total credits sent
recvMsgs int32 // total messages received

// consumerM3Client for metrics per consumer group
consumerM3Client metrics.Client
}
)

func newReplicaConnection(stream storeStream.BStoreOpenReadStreamOutCall, extCache *extentCache, cancel context.CancelFunc,
func newReplicaConnection(stream storeStream.BStoreOpenReadStreamOutCall, extCache *extentCache,
replicaConnectionName string, logger bark.Logger, startingSequence common.SequenceNumber) *replicaConnection {

conn := &replicaConnection{
call: stream,
msgsCh: extCache.msgsCh,
initialCredits: extCache.initialCredits,
cancel: cancel,
shutdownWG: extCache.shutdownWG,
closeChannel: make(chan struct{}),
readMsgsCh: make(chan int32, replicaConnectionCreditsChBuffer),
Expand Down Expand Up @@ -143,20 +139,6 @@ func (conn *replicaConnection) close(err error) {
conn.lk.Unlock()
}

// sendCreditsToStore hands a batch of read credits to the store replica by
// writing a ControlFlow frame on the open read-stream call and flushing it.
// The sentCreds counter tracks every credit grant attempted, independent of
// whether the write/flush ultimately succeeded; it is read only after the
// pumps are done, so no locking is needed (see field comment on the struct).
func (conn *replicaConnection) sendCreditsToStore(credits int32) error {
	flow := cherami.NewControlFlow()
	flow.Credits = common.Int32Ptr(credits)
	//conn.logger.WithField(`Credits`,flow.GetCredits()).Debug(`Sending credits!!`)

	conn.sentCreds += int64(credits)

	if err := conn.call.Write(flow); err != nil {
		return err
	}
	return conn.call.Flush()
}

// updateSuccessfulSendToMsgCh updates the local counter and records metric as well
func (conn *replicaConnection) updateSuccessfulSendToMsgsCh(localMsgs *int32, length int64) {
*localMsgs++
Expand Down Expand Up @@ -335,32 +317,32 @@ func (conn *replicaConnection) readMessagesPump() {
}
}

// utilSendCredits sends `credits` to the store and, on success, adjusts the
// caller's bookkeeping: numMsgsRead drops by the credits granted and
// totalCreditsSent grows by the same amount. On a send failure the connection
// is closed asynchronously and the counters are left untouched — the credits
// never reached the store, so counting them would skew the starvation check
// in writeCreditsPump (which compares numMsgsRead against credits sent).
func (conn *replicaConnection) utilSendCredits(credits int32, numMsgsRead *int32, totalCreditsSent *int32) {
	// conn.logger.WithField(`credits`, credits).Debug(`Sending credits to store.`)
	if err := conn.sendCreditsToStore(credits); err != nil {
		conn.logger.WithField(common.TagErr, err).Error(`error writing credits to store`)

		// close asynchronously (close blocks on internal locking); keep the
		// counters as-is since this grant never made it to the store
		go conn.close(nil)
		return
	}
	*numMsgsRead = *numMsgsRead - credits
	*totalCreditsSent = *totalCreditsSent + credits
}

func (conn *replicaConnection) writeCreditsPump() {
defer conn.waitWG.Done()
defer conn.call.Done()
if conn.cancel != nil {
defer conn.cancel()
}

totalCreditsSent := conn.initialCredits
sendCreditsToStore := func(credits int32) (err error) {

//conn.logger.WithField(`Credits`,cFlow.GetCredits()).Debug(`Sending credits!!`)

cFlow := cherami.NewControlFlow()
cFlow.Credits = common.Int32Ptr(credits)
if err = conn.call.Write(cFlow); err != nil {
conn.logger.WithField(common.TagErr, err).Error(`error writing credits to store`)
return
}

// send initial credits to the store.. so that we can
// actually start reading
err = conn.call.Flush()
conn.sentCreds += credits
return
}

// send initial credits to the store.. so that we can actually start reading
conn.logger.WithField(`initialCredits`, conn.initialCredits).Info(`writeCreditsPump: sending initial credits to store`)
if err := conn.sendCreditsToStore(conn.initialCredits); err != nil {
conn.logger.WithField(common.TagErr, err).Error(`error writing initial credits to store`)
if err := sendCreditsToStore(conn.initialCredits); err != nil {
conn.logger.WithField(common.TagErr, err).Error(`writeCreditsPump: error writing initial credits to store`)

// if sending of initial credits failed, then close this connection
go conn.close(nil)
return
}
Expand All @@ -382,20 +364,26 @@ func (conn *replicaConnection) writeCreditsPump() {
if numMsgsRead > creditBatchSize {
select {
case credits := <-conn.creditNotifyCh: // this is the common credit channel from redelivery cache
conn.utilSendCredits(credits, &numMsgsRead, &totalCreditsSent)
if err := sendCreditsToStore(credits); err != nil {
return
}
numMsgsRead -= credits
case credits := <-conn.localCreditCh: // this is the local credits channel only for this connection
conn.utilSendCredits(credits, &numMsgsRead, &totalCreditsSent)
if err := sendCreditsToStore(credits); err != nil {
return
}
numMsgsRead -= credits
case msgsRead := <-conn.readMsgsCh:
numMsgsRead += msgsRead
case <-creditRequestTicker.C:
// if we didn't get any credits for a long time and we are starving for
// credits, then request some credits
if numMsgsRead >= totalCreditsSent {
conn.logger.Warn("WritecreditsPump starving for credits: requesting more")
if numMsgsRead >= conn.sentCreds {
conn.logger.Warn("writeCreditsPump: starving for credits, requesting more")
conn.extCache.requestCredits()
}
case <-conn.closeChannel:
conn.logger.Info("WriteCreditsPump closing due to connection closed.")
conn.logger.Info("writeCreditsPump: connection closed")
return
}
} else {
Expand All @@ -405,9 +393,12 @@ func (conn *replicaConnection) writeCreditsPump() {
// we should listen on the local credits ch, just in case we requested for some credits
// above and we are just getting it now
case credits := <-conn.localCreditCh:
conn.utilSendCredits(credits, &numMsgsRead, &totalCreditsSent)
if err := sendCreditsToStore(credits); err != nil {
return
}
numMsgsRead -= credits
case <-conn.closeChannel:
conn.logger.Info("WriteCreditsPump closing due to connection closed.")
conn.logger.Info("writeCreditsPump: connection closed")
return
}
}
Expand Down

0 comments on commit 62fa669

Please sign in to comment.