This repository has been archived by the owner on Dec 3, 2019. It is now read-only.

Graceful drain on client #16

Merged: 4 commits, Mar 31, 2017
82 changes: 66 additions & 16 deletions client/cherami/connection.go
@@ -31,10 +31,10 @@ import (

"time"

"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-client-go/common/metrics"
"github.com/uber/cherami-client-go/stream"
"github.com/uber/cherami-thrift/.generated/go/cherami"
)

type (
@@ -56,9 +56,10 @@ type (
logger bark.Logger
reporter metrics.Reporter

lk sync.Mutex
opened int32
closed int32
lk sync.Mutex
opened int32
closed int32
drained int32
}

// This struct is created by writePump after writing message to stream.
@@ -140,17 +141,42 @@ func (conn *connection) open() error {
return nil
}

func (conn *connection) close() {
func (conn *connection) isShuttingDown() bool {
select {
case <-conn.shuttingDownCh:
// already shutdown
return true
default:
}

return false
}

// stopWritePump should drain the pump after getting the lock
func (conn *connection) stopWritePump() {
conn.lk.Lock()
defer conn.lk.Unlock()
conn.stopWritePumpWithLock()
conn.lk.Unlock()
}

if atomic.LoadInt32(&conn.closed) == 0 {
// First shutdown the write pump to make sure we don't leave any message without ack
// Note: this needs to be called with the conn lock held!
func (conn *connection) stopWritePumpWithLock() {
if !conn.isShuttingDown() {
close(conn.shuttingDownCh)
if ok := common.AwaitWaitGroup(&conn.writeMsgPumpWG, defaultWGTimeout); !ok {
conn.logger.Warn("writeMsgPumpWG timed out")
}
conn.logger.Info("stopped write pump")
atomic.StoreInt32(&conn.drained, 1)
}
}
func (conn *connection) close() {
conn.lk.Lock()
defer conn.lk.Unlock()

if atomic.LoadInt32(&conn.closed) == 0 {
// First shutdown the write pump to make sure we don't leave any message without ack
conn.stopWritePumpWithLock()
// Now shutdown the read pump and drain all inflight messages
close(conn.closeCh)
if ok := common.AwaitWaitGroup(&conn.readAckPumpWG, defaultWGTimeout); !ok {
@@ -216,6 +242,14 @@ func (conn *connection) writeMessagesPump() {
}
}

func (conn *connection) handleReconfigCmd(reconfigInfo *cherami.ReconfigureInfo) {
select {
case conn.reconfigureCh <- reconfigureInfo{eventType: reconfigureCmdReconfigureType, reconfigureID: reconfigInfo.GetUpdateUUID()}:
default:
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Warn("Reconfigure channel is full. Drop reconfigure command.")
}
}

func (conn *connection) readAcksPump() {
defer conn.readAckPumpWG.Done()

@@ -283,14 +317,22 @@ func (conn *connection) readAcksPump() {
}
} else if cmd.GetType() == cherami.InputHostCommandType_RECONFIGURE {
conn.reporter.IncCounter(metrics.PublishReconfigureRate, nil, 1)
reconfigInfo := cmd.Reconfigure
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Info("Reconfigure command received from InputHost.")
select {
case conn.reconfigureCh <- reconfigureInfo{eventType: reconfigureCmdReconfigureType, reconfigureID: reconfigInfo.GetUpdateUUID()}:
default:
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Warn("Reconfigure channel is full. Drop reconfigure command.")
}
rInfo := cmd.Reconfigure
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(rInfo.GetUpdateUUID())).Info("Reconfigure command received from InputHost.")
conn.handleReconfigCmd(rInfo)
} else if cmd.GetType() == cherami.InputHostCommandType_DRAIN {
// drain all inflight messages
// reconfigure to pick up new extents if any
conn.reporter.IncCounter(metrics.PublishDrainRate, nil, 1)
// start draining by just stopping the write pump.
// this makes sure we don't send any new messages.
// the read pump will exit when the server completes the drain
go conn.stopWritePump()
rInfo := cmd.Reconfigure
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(rInfo.GetUpdateUUID())).Info("Drain command received from InputHost.")
conn.handleReconfigCmd(rInfo)
}

}
}
}
@@ -300,8 +342,16 @@ func (conn *connection) isOpened() bool {
return atomic.LoadInt32(&conn.opened) != 0
}

// isClosed should return true if either closed is set,
// or if "drained" is set, which means the connection has already
// stopped the write pump.
// This is needed to make sure that if a reconfigure returns the same
// inputhost, we open up a new connection object (most
// likely for a new extent). In the worst case, if for some reason
// the controller returns the same old extent which is draining,
// the server will reject the connection anyway.
func (conn *connection) isClosed() bool {
return atomic.LoadInt32(&conn.closed) != 0
return (atomic.LoadInt32(&conn.closed) != 0 || atomic.LoadInt32(&conn.drained) != 0)
Contributor:
Do you need the additional atomic var? Can you not do isShuttingDown() instead of Load(&conn.drained)?

Contributor (Author):
The only reason I added this is to avoid taking the lock in isClosed(). If I use isShuttingDown(), I need to take the lock to prevent a race.

Contributor:
Channels are thread-safe; not sure about the race. Will leave it up to you to resolve it.
}

func (e *ackChannelClosedError) Error() string {
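Aside on the isClosed() thread above: a minimal sketch of the reviewer's suggestion, assuming the connection struct from this diff (shuttingDownCh, closed); the name isClosedViaChannel is hypothetical and not part of the PR. A non-blocking receive on a closed channel is safe from any goroutine, so the check itself does not need conn.lk; the PR instead keeps the separate drained flag so isClosed() remains a plain atomic load.

func (conn *connection) isClosedViaChannel() bool {
	// hypothetical alternative to the drained flag, per the review discussion
	if atomic.LoadInt32(&conn.closed) != 0 {
		return true
	}
	select {
	case <-conn.shuttingDownCh:
		// shuttingDownCh is closed once the write pump has been told to stop;
		// receiving from a closed channel succeeds immediately, so no lock is needed here
		return true
	default:
		return false
	}
}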
43 changes: 43 additions & 0 deletions client/cherami/connection_test.go
@@ -23,12 +23,14 @@ package cherami
import (
"errors"
"io"
"sync/atomic"
"testing"
"time"

_ "fmt"
_ "strconv"

"github.com/pborman/uuid"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-client-go/common/metrics"
mc "github.com/uber/cherami-client-go/mocks/clients/cherami"
@@ -195,6 +197,39 @@ func (s *ConnectionSuite) TestAckClosedByInputHost() {
//inputHostClient.AssertExpectations(s.T())
}

func (s *ConnectionSuite) TestClientDrain() {
conn, inputHostClient, messagesCh := createConnection()

appendTicker := time.NewTicker(5 * time.Millisecond)
defer appendTicker.Stop()

// setup inputhost to send a DRAIN command and then an EOF
inputHostClient.On("Write", mock.Anything).Return(nil)
inputHostClient.On("Flush").Return(nil)
inputHostClient.On("Read").Return(wrapDrainInCommand(&cherami.ReconfigureInfo{
UpdateUUID: common.StringPtr(uuid.New()),
}), nil).WaitUntil(appendTicker.C).Once()
inputHostClient.On("Read").Return(nil, io.EOF).Once()
inputHostClient.On("Done").Return(nil)

conn.open()
s.True(conn.isOpened(), "Connection not opened.")

message := &cherami.PutMessage{
ID: common.StringPtr("1"),
Data: []byte("test"),
}

requestDone := make(chan *PublisherReceipt, 1)

messagesCh <- putMessageRequest{message, requestDone}
<-time.After(10 * time.Millisecond)
// drain must be set
s.Equal(int32(1), atomic.LoadInt32(&conn.drained))
// closed must return true as well
s.True(conn.isClosed())
}

func (s *ConnectionSuite) TestClientClosed() {
conn, inputHostClient, messagesCh := createConnection()

@@ -359,3 +394,11 @@ func wrapAckInCommand(ack *cherami.PutMessageAck) *cherami.InputHostCommand {

return cmd
}

func wrapDrainInCommand(reconfigure *cherami.ReconfigureInfo) *cherami.InputHostCommand {
cmd := cherami.NewInputHostCommand()
cmd.Type = common.CheramiInputHostCommandTypePtr(cherami.InputHostCommandType_DRAIN)
cmd.Reconfigure = reconfigure

return cmd
}
2 changes: 2 additions & 0 deletions common/metrics/names.go
@@ -49,6 +49,8 @@ const (
PublishAckRate = "cherami.publish.ack.rate"
// PublishReconfigureRate is the rate of reconfiguration happening
PublishReconfigureRate = "cherami.publish.reconfigure.rate"
// PublishDrainRate is the rate of drain happening
PublishDrainRate = "cherami.publish.drain.rate"
// PublishNumConnections is the number of connections with input
PublishNumConnections = "cherami.publish.connections"
// PublishNumInflightMessagess is the number of inflight messages held locally by the publisher
16 changes: 8 additions & 8 deletions glide.lock

Some generated files are not rendered by default.