Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Outputhost replica stream: write-pump error should not close read-pump #190

Merged
merged 2 commits into from
May 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions services/outputhost/extcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import (
"sync"
"time"

"golang.org/x/net/context"

"github.com/uber-common/bark"
"github.com/uber/tchannel-go/thrift"

Expand Down Expand Up @@ -202,7 +200,6 @@ func (extCache *extentCache) load(

func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence common.SequenceNumber, startIndex int) (repl *replicaConnection, pickedIndex int, err error) {
var call serverStream.BStoreOpenReadStreamOutCall
var cancel context.CancelFunc
extUUID := extCache.extUUID

startIndex--
Expand Down Expand Up @@ -309,7 +306,6 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence
logger.WithField(common.TagErr, err).Error(`outputhost: Websocket dial store replica: failed`)
return
}
cancel = nil

// successfully opened read stream on the replica; save this index
if startSequence != 0 {
Expand All @@ -319,7 +315,7 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence
logger.WithField(`startIndex`, startIndex).Debug(`opened read stream`)
pickedIndex = startIndex
replicaConnectionName := fmt.Sprintf(`replicaConnection{Extent: %s, Store: %s}`, extUUID, storeUUID)
repl = newReplicaConnection(call, extCache, cancel, replicaConnectionName, logger, startSequence)
repl = newReplicaConnection(call, extCache, replicaConnectionName, logger, startSequence)
// all open connections should be closed before shutdown
extCache.shutdownWG.Add(1)
repl.open()
Expand Down Expand Up @@ -393,7 +389,7 @@ func (extCache *extentCache) loadKafkaStream(

// Setup the replicaConnection
replicaConnectionName := fmt.Sprintf(`replicaConnection{Extent: %s, kafkaCluster: %s}`, extCache.extUUID, kafkaCluster)
repl = newReplicaConnection(call, extCache, nil, replicaConnectionName, extCache.logger, 0)
repl = newReplicaConnection(call, extCache, replicaConnectionName, extCache.logger, 0)
extCache.shutdownWG.Add(1)
repl.open()
return
Expand Down
94 changes: 44 additions & 50 deletions services/outputhost/replicaconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"sync"
"time"

"golang.org/x/net/context"

"github.com/uber-common/bark"

"github.com/uber/cherami-server/common"
Expand Down Expand Up @@ -53,7 +51,6 @@ type (
call storeStream.BStoreOpenReadStreamOutCall
msgsCh chan<- *cherami.ConsumerMessage
initialCredits int32
cancel context.CancelFunc
closeChannel chan struct{}
connectionsClosedCh chan<- error
readMsgsCh chan int32
Expand All @@ -74,22 +71,21 @@ type (
// one message at a time, and read on close after the pumps
// are done; therefore, these do not require to be protected
// for concurrent access
sentCreds int64 // total credits sent
recvMsgs int64 // total messages received
sentCreds int32 // total credits sent
recvMsgs int32 // total messages received

// consumerM3Client for metrics per consumer group
consumerM3Client metrics.Client
}
)

func newReplicaConnection(stream storeStream.BStoreOpenReadStreamOutCall, extCache *extentCache, cancel context.CancelFunc,
func newReplicaConnection(stream storeStream.BStoreOpenReadStreamOutCall, extCache *extentCache,
replicaConnectionName string, logger bark.Logger, startingSequence common.SequenceNumber) *replicaConnection {

conn := &replicaConnection{
call: stream,
msgsCh: extCache.msgsCh,
initialCredits: extCache.initialCredits,
cancel: cancel,
shutdownWG: extCache.shutdownWG,
closeChannel: make(chan struct{}),
readMsgsCh: make(chan int32, replicaConnectionCreditsChBuffer),
Expand Down Expand Up @@ -143,20 +139,6 @@ func (conn *replicaConnection) close(err error) {
conn.lk.Unlock()
}

func (conn *replicaConnection) sendCreditsToStore(credits int32) error {
cFlow := cherami.NewControlFlow()
cFlow.Credits = common.Int32Ptr(credits)
//conn.logger.WithField(`Credits`,cFlow.GetCredits()).Debug(`Sending credits!!`)
err := conn.call.Write(cFlow)
if err == nil {
err = conn.call.Flush()
}

conn.sentCreds += int64(credits)

return err
}

// updateSuccessfulSendToMsgCh updates the local counter and records metric as well
func (conn *replicaConnection) updateSuccessfulSendToMsgsCh(localMsgs *int32, length int64) {
*localMsgs++
Expand Down Expand Up @@ -335,32 +317,39 @@ func (conn *replicaConnection) readMessagesPump() {
}
}

func (conn *replicaConnection) utilSendCredits(credits int32, numMsgsRead *int32, totalCreditsSent *int32) {
// conn.logger.WithField(`credits`, credits).Debug(`Sending credits to store.`)
if err := conn.sendCreditsToStore(credits); err != nil {
conn.logger.WithField(common.TagErr, err).Error(`error writing credits to store`)

go conn.close(nil)
}
*numMsgsRead = *numMsgsRead - credits
*totalCreditsSent = *totalCreditsSent + credits
}

func (conn *replicaConnection) writeCreditsPump() {
defer conn.waitWG.Done()
defer conn.call.Done()
if conn.cancel != nil {
defer conn.cancel()

var localCredits int32 // credits accumulated
var creditRefresh time.Time

sendCreditsToStore := func(credits int32) (err error) {

//conn.logger.WithField(`Credits`,cFlow.GetCredits()).Debug(`Sending credits!!`)

cFlow := cherami.NewControlFlow()
cFlow.Credits = common.Int32Ptr(credits)
if err = conn.call.Write(cFlow); err != nil {
conn.logger.WithField(common.TagErr, err).Error(`error writing credits to store`)
return
}

err = conn.call.Flush()
conn.sentCreds += credits
localCredits -= credits
creditRefresh = time.Now()
return
}

totalCreditsSent := conn.initialCredits
localCredits = conn.initialCredits

// send initial credits to the store.. so that we can
// actually start reading
// send initial credits to the store.. so that we can actually start reading
conn.logger.WithField(`initialCredits`, conn.initialCredits).Info(`writeCreditsPump: sending initial credits to store`)
if err := conn.sendCreditsToStore(conn.initialCredits); err != nil {
conn.logger.WithField(common.TagErr, err).Error(`error writing initial credits to store`)
if err := sendCreditsToStore(conn.initialCredits); err != nil {
conn.logger.WithField(common.TagErr, err).Error(`writeCreditsPump: error writing initial credits to store`)

// if sending of initial credits failed, then close this connection
go conn.close(nil)
return
}
Expand All @@ -372,42 +361,47 @@ func (conn *replicaConnection) writeCreditsPump() {
creditBatchSize = minCreditBatchSize
}

var numMsgsRead int32
creditRequestTicker := time.NewTicker(creditRequestTimeout)
defer creditRequestTicker.Stop()

// Start the write pump
for {
// listen for credits only if we satisfy the batch size
if numMsgsRead > creditBatchSize {
if localCredits > creditBatchSize {
select {
case credits := <-conn.creditNotifyCh: // this is the common credit channel from redelivery cache
conn.utilSendCredits(credits, &numMsgsRead, &totalCreditsSent)
if err := sendCreditsToStore(credits); err != nil {
return
}
case credits := <-conn.localCreditCh: // this is the local credits channel only for this connection
conn.utilSendCredits(credits, &numMsgsRead, &totalCreditsSent)
if err := sendCreditsToStore(credits); err != nil {
return
}
case msgsRead := <-conn.readMsgsCh:
numMsgsRead += msgsRead
localCredits += msgsRead
case <-creditRequestTicker.C:
// if we didn't get any credits for a long time and we are starving for
// credits, then request some credits
if numMsgsRead >= totalCreditsSent {
conn.logger.Warn("WritecreditsPump starving for credits: requesting more")
if time.Since(creditRefresh) > creditRequestTimeout {
conn.logger.Warn("writeCreditsPump: starving for credits, requesting more")
conn.extCache.requestCredits()
}
case <-conn.closeChannel:
conn.logger.Info("WriteCreditsPump closing due to connection closed.")
conn.logger.Info("writeCreditsPump: connection closed")
return
}
} else {
select {
case msgsRead := <-conn.readMsgsCh:
numMsgsRead += msgsRead
localCredits += msgsRead
// we should listen on the local credits ch, just in case we requested for some credits
// above and we are just getting it now
case credits := <-conn.localCreditCh:
conn.utilSendCredits(credits, &numMsgsRead, &totalCreditsSent)
if err := sendCreditsToStore(credits); err != nil {
return
}
case <-conn.closeChannel:
conn.logger.Info("WriteCreditsPump closing due to connection closed.")
conn.logger.Info("writeCreditsPump: connection closed")
return
}
}
Expand Down