Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Outputhost replica stream: write-pump error should not close read-pump #190

Merged
merged 2 commits into from
May 5, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions services/outputhost/extcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import (
"sync"
"time"

"golang.org/x/net/context"

"github.com/uber-common/bark"
"github.com/uber/tchannel-go/thrift"

Expand Down Expand Up @@ -202,7 +200,6 @@ func (extCache *extentCache) load(

func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence common.SequenceNumber, startIndex int) (repl *replicaConnection, pickedIndex int, err error) {
var call serverStream.BStoreOpenReadStreamOutCall
var cancel context.CancelFunc
extUUID := extCache.extUUID

startIndex--
Expand Down Expand Up @@ -309,7 +306,6 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence
logger.WithField(common.TagErr, err).Error(`outputhost: Websocket dial store replica: failed`)
return
}
cancel = nil

// successfully opened read stream on the replica; save this index
if startSequence != 0 {
Expand All @@ -319,7 +315,7 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence
logger.WithField(`startIndex`, startIndex).Debug(`opened read stream`)
pickedIndex = startIndex
replicaConnectionName := fmt.Sprintf(`replicaConnection{Extent: %s, Store: %s}`, extUUID, storeUUID)
repl = newReplicaConnection(call, extCache, cancel, replicaConnectionName, logger, startSequence)
repl = newReplicaConnection(call, extCache, replicaConnectionName, logger, startSequence)
// all open connections should be closed before shutdown
extCache.shutdownWG.Add(1)
repl.open()
Expand Down Expand Up @@ -393,7 +389,7 @@ func (extCache *extentCache) loadKafkaStream(

// Setup the replicaConnection
replicaConnectionName := fmt.Sprintf(`replicaConnection{Extent: %s, kafkaCluster: %s}`, extCache.extUUID, kafkaCluster)
repl = newReplicaConnection(call, extCache, nil, replicaConnectionName, extCache.logger, 0)
repl = newReplicaConnection(call, extCache, replicaConnectionName, extCache.logger, 0)
extCache.shutdownWG.Add(1)
repl.open()
return
Expand Down
85 changes: 38 additions & 47 deletions services/outputhost/replicaconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"sync"
"time"

"golang.org/x/net/context"

"github.com/uber-common/bark"

"github.com/uber/cherami-server/common"
Expand Down Expand Up @@ -53,7 +51,6 @@ type (
call storeStream.BStoreOpenReadStreamOutCall
msgsCh chan<- *cherami.ConsumerMessage
initialCredits int32
cancel context.CancelFunc
closeChannel chan struct{}
connectionsClosedCh chan<- error
readMsgsCh chan int32
Expand All @@ -74,22 +71,21 @@ type (
// one message at a time, and read on close after the pumps
// are done; therefore, these do not require to be protected
// for concurrent access
sentCreds int64 // total credits sent
recvMsgs int64 // total messages received
sentCreds int32 // total credits sent
recvMsgs int32 // total messages received

// consumerM3Client for metrics per consumer group
consumerM3Client metrics.Client
}
)

func newReplicaConnection(stream storeStream.BStoreOpenReadStreamOutCall, extCache *extentCache, cancel context.CancelFunc,
func newReplicaConnection(stream storeStream.BStoreOpenReadStreamOutCall, extCache *extentCache,
replicaConnectionName string, logger bark.Logger, startingSequence common.SequenceNumber) *replicaConnection {

conn := &replicaConnection{
call: stream,
msgsCh: extCache.msgsCh,
initialCredits: extCache.initialCredits,
cancel: cancel,
shutdownWG: extCache.shutdownWG,
closeChannel: make(chan struct{}),
readMsgsCh: make(chan int32, replicaConnectionCreditsChBuffer),
Expand Down Expand Up @@ -143,20 +139,6 @@ func (conn *replicaConnection) close(err error) {
conn.lk.Unlock()
}

func (conn *replicaConnection) sendCreditsToStore(credits int32) error {
cFlow := cherami.NewControlFlow()
cFlow.Credits = common.Int32Ptr(credits)
//conn.logger.WithField(`Credits`,cFlow.GetCredits()).Debug(`Sending credits!!`)
err := conn.call.Write(cFlow)
if err == nil {
err = conn.call.Flush()
}

conn.sentCreds += int64(credits)

return err
}

// updateSuccessfulSendToMsgCh updates the local counter and records metric as well
func (conn *replicaConnection) updateSuccessfulSendToMsgsCh(localMsgs *int32, length int64) {
*localMsgs++
Expand Down Expand Up @@ -335,32 +317,32 @@ func (conn *replicaConnection) readMessagesPump() {
}
}

func (conn *replicaConnection) utilSendCredits(credits int32, numMsgsRead *int32, totalCreditsSent *int32) {
// conn.logger.WithField(`credits`, credits).Debug(`Sending credits to store.`)
if err := conn.sendCreditsToStore(credits); err != nil {
conn.logger.WithField(common.TagErr, err).Error(`error writing credits to store`)

go conn.close(nil)
}
*numMsgsRead = *numMsgsRead - credits
*totalCreditsSent = *totalCreditsSent + credits
}

func (conn *replicaConnection) writeCreditsPump() {
defer conn.waitWG.Done()
defer conn.call.Done()
if conn.cancel != nil {
defer conn.cancel()
}

totalCreditsSent := conn.initialCredits
sendCreditsToStore := func(credits int32) (err error) {

//conn.logger.WithField(`Credits`,cFlow.GetCredits()).Debug(`Sending credits!!`)

cFlow := cherami.NewControlFlow()
cFlow.Credits = common.Int32Ptr(credits)
if err = conn.call.Write(cFlow); err != nil {
conn.logger.WithField(common.TagErr, err).Error(`error writing credits to store`)
return
}

// send initial credits to the store.. so that we can
// actually start reading
err = conn.call.Flush()
conn.sentCreds += credits
return
}

// send initial credits to the store.. so that we can actually start reading
conn.logger.WithField(`initialCredits`, conn.initialCredits).Info(`writeCreditsPump: sending initial credits to store`)
if err := conn.sendCreditsToStore(conn.initialCredits); err != nil {
conn.logger.WithField(common.TagErr, err).Error(`error writing initial credits to store`)
if err := sendCreditsToStore(conn.initialCredits); err != nil {
conn.logger.WithField(common.TagErr, err).Error(`writeCreditsPump: error writing initial credits to store`)

// if sending of initial credits failed, then close this connection
go conn.close(nil)
return
}
Expand All @@ -382,20 +364,26 @@ func (conn *replicaConnection) writeCreditsPump() {
if numMsgsRead > creditBatchSize {
select {
case credits := <-conn.creditNotifyCh: // this is the common credit channel from redelivery cache
conn.utilSendCredits(credits, &numMsgsRead, &totalCreditsSent)
if err := sendCreditsToStore(credits); err != nil {
return
}
numMsgsRead -= credits
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this within the sendCreditsToStore() so that we don't need to decrement this every single time. You can rename numMsgsRead as well.

case credits := <-conn.localCreditCh: // this is the local credits channel only for this connection
conn.utilSendCredits(credits, &numMsgsRead, &totalCreditsSent)
if err := sendCreditsToStore(credits); err != nil {
return
}
numMsgsRead -= credits
case msgsRead := <-conn.readMsgsCh:
numMsgsRead += msgsRead
case <-creditRequestTicker.C:
// if we didn't get any credits for a long time and we are starving for
// credits, then request some credits
if numMsgsRead >= totalCreditsSent {
conn.logger.Warn("WritecreditsPump starving for credits: requesting more")
if numMsgsRead >= conn.sentCreds {
conn.logger.Warn("writeCreditsPump: starving for credits, requesting more")
conn.extCache.requestCredits()
}
case <-conn.closeChannel:
conn.logger.Info("WriteCreditsPump closing due to connection closed.")
conn.logger.Info("writeCreditsPump: connection closed")
return
}
} else {
Expand All @@ -405,9 +393,12 @@ func (conn *replicaConnection) writeCreditsPump() {
// we should listen on the local credits ch, just in case we requested for some credits
// above and we are just getting it now
case credits := <-conn.localCreditCh:
conn.utilSendCredits(credits, &numMsgsRead, &totalCreditsSent)
if err := sendCreditsToStore(credits); err != nil {
return
}
numMsgsRead -= credits
case <-conn.closeChannel:
conn.logger.Info("WriteCreditsPump closing due to connection closed.")
conn.logger.Info("writeCreditsPump: connection closed")
return
}
}
Expand Down