Skip to content
This repository has been archived by the owner on Dec 3, 2019. It is now read-only.

Commit

Permalink
Graceful drain on client (#16)
Browse files Browse the repository at this point in the history
* Add support for DRAIN command on the client

This patch makes sure the client can handle the DRAIN command from
the inputhost.

As soon as it receives the command, it will stop it's write pump and
will mark itself as "closed" so that if we get the same inputhost as a
result of reconfiguration, we open a new connection object.

Add a unit test to test the same.
  • Loading branch information
aravindvs authored Mar 31, 2017
1 parent 5b9071f commit de07d3a
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 24 deletions.
82 changes: 66 additions & 16 deletions client/cherami/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import (

"time"

"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-client-go/common/metrics"
"github.com/uber/cherami-client-go/stream"
"github.com/uber/cherami-thrift/.generated/go/cherami"
)

type (
Expand All @@ -56,9 +56,10 @@ type (
logger bark.Logger
reporter metrics.Reporter

lk sync.Mutex
opened int32
closed int32
lk sync.Mutex
opened int32
closed int32
drained int32
}

// This struct is created by writePump after writing message to stream.
Expand Down Expand Up @@ -140,17 +141,42 @@ func (conn *connection) open() error {
return nil
}

func (conn *connection) close() {
func (conn *connection) isShuttingDown() bool {
select {
case <-conn.shuttingDownCh:
// already shutdown
return true
default:
}

return false
}

// stopWritePump should drain the pump after getting the lock
func (conn *connection) stopWritePump() {
conn.lk.Lock()
defer conn.lk.Unlock()
conn.stopWritePumpWithLock()
conn.lk.Unlock()
}

if atomic.LoadInt32(&conn.closed) == 0 {
// First shutdown the write pump to make sure we don't leave any message without ack
// Note: this needs to be called with the conn lock held!
func (conn *connection) stopWritePumpWithLock() {
if !conn.isShuttingDown() {
close(conn.shuttingDownCh)
if ok := common.AwaitWaitGroup(&conn.writeMsgPumpWG, defaultWGTimeout); !ok {
conn.logger.Warn("writeMsgPumpWG timed out")
}
conn.logger.Info("stopped write pump")
atomic.StoreInt32(&conn.drained, 1)
}
}
func (conn *connection) close() {
conn.lk.Lock()
defer conn.lk.Unlock()

if atomic.LoadInt32(&conn.closed) == 0 {
// First shutdown the write pump to make sure we don't leave any message without ack
conn.stopWritePumpWithLock()
// Now shutdown the read pump and drain all inflight messages
close(conn.closeCh)
if ok := common.AwaitWaitGroup(&conn.readAckPumpWG, defaultWGTimeout); !ok {
Expand Down Expand Up @@ -216,6 +242,14 @@ func (conn *connection) writeMessagesPump() {
}
}

func (conn *connection) handleReconfigCmd(reconfigInfo *cherami.ReconfigureInfo) {
select {
case conn.reconfigureCh <- reconfigureInfo{eventType: reconfigureCmdReconfigureType, reconfigureID: reconfigInfo.GetUpdateUUID()}:
default:
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Warn("Reconfigure channel is full. Drop reconfigure command.")
}
}

func (conn *connection) readAcksPump() {
defer conn.readAckPumpWG.Done()

Expand Down Expand Up @@ -283,14 +317,22 @@ func (conn *connection) readAcksPump() {
}
} else if cmd.GetType() == cherami.InputHostCommandType_RECONFIGURE {
conn.reporter.IncCounter(metrics.PublishReconfigureRate, nil, 1)
reconfigInfo := cmd.Reconfigure
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Info("Reconfigure command received from InputHost.")
select {
case conn.reconfigureCh <- reconfigureInfo{eventType: reconfigureCmdReconfigureType, reconfigureID: reconfigInfo.GetUpdateUUID()}:
default:
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Warn("Reconfigure channel is full. Drop reconfigure command.")
}
rInfo := cmd.Reconfigure
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(rInfo.GetUpdateUUID())).Info("Reconfigure command received from InputHost.")
conn.handleReconfigCmd(rInfo)
} else if cmd.GetType() == cherami.InputHostCommandType_DRAIN {
// drain all inflight messages
// reconfigure to pick up new extents if any
conn.reporter.IncCounter(metrics.PublishDrainRate, nil, 1)
// start draining by just stopping the write pump.
// this makes sure, we don't send any new messages.
// the read pump will exit when the server completes the drain
go conn.stopWritePump()
rInfo := cmd.Reconfigure
conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(rInfo.GetUpdateUUID())).Info("Drain command received from InputHost.")
conn.handleReconfigCmd(rInfo)
}

}
}
}
Expand All @@ -300,8 +342,16 @@ func (conn *connection) isOpened() bool {
return atomic.LoadInt32(&conn.opened) != 0
}

// isClosed should return true if either closed it set
// or if "drain" is set, which means the connection has already
// stopped the write pump.
// This is needed to make sure if reconfigure gives the same
// inputhost, we should open up a new connection object (most
// likely for a new extent). In the worst case, if for some reason
// the controller returns the same old extent which is draining,
// the server will anyway reject the connection
func (conn *connection) isClosed() bool {
return atomic.LoadInt32(&conn.closed) != 0
return (atomic.LoadInt32(&conn.closed) != 0 || atomic.LoadInt32(&conn.drained) != 0)
}

func (e *ackChannelClosedError) Error() string {
Expand Down
43 changes: 43 additions & 0 deletions client/cherami/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ package cherami
import (
"errors"
"io"
"sync/atomic"
"testing"
"time"

_ "fmt"
_ "strconv"

"github.com/pborman/uuid"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-client-go/common/metrics"
mc "github.com/uber/cherami-client-go/mocks/clients/cherami"
Expand Down Expand Up @@ -195,6 +197,39 @@ func (s *ConnectionSuite) TestAckClosedByInputHost() {
//inputHostClient.AssertExpectations(s.T())
}

func (s *ConnectionSuite) TestClientDrain() {
conn, inputHostClient, messagesCh := createConnection()

appendTicker := time.NewTicker(5 * time.Millisecond)
defer appendTicker.Stop()

// setup inputhost to send a DRAIN command and then an EOF
inputHostClient.On("Write", mock.Anything).Return(nil)
inputHostClient.On("Flush").Return(nil)
inputHostClient.On("Read").Return(wrapDrainInCommand(&cherami.ReconfigureInfo{
UpdateUUID: common.StringPtr(uuid.New()),
}), nil).WaitUntil(appendTicker.C).Once()
inputHostClient.On("Read").Return(nil, io.EOF).Once()
inputHostClient.On("Done").Return(nil)

conn.open()
s.True(conn.isOpened(), "Connection not opened.")

message := &cherami.PutMessage{
ID: common.StringPtr("1"),
Data: []byte("test"),
}

requestDone := make(chan *PublisherReceipt, 1)

messagesCh <- putMessageRequest{message, requestDone}
<-time.After(10 * time.Millisecond)
// drain must be set
s.Equal(int32(1), atomic.LoadInt32(&conn.drained))
// closed must return true as well
s.True(conn.isClosed())
}

func (s *ConnectionSuite) TestClientClosed() {
conn, inputHostClient, messagesCh := createConnection()

Expand Down Expand Up @@ -359,3 +394,11 @@ func wrapAckInCommand(ack *cherami.PutMessageAck) *cherami.InputHostCommand {

return cmd
}

func wrapDrainInCommand(reconfigure *cherami.ReconfigureInfo) *cherami.InputHostCommand {
cmd := cherami.NewInputHostCommand()
cmd.Type = common.CheramiInputHostCommandTypePtr(cherami.InputHostCommandType_DRAIN)
cmd.Reconfigure = reconfigure

return cmd
}
2 changes: 2 additions & 0 deletions common/metrics/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
PublishAckRate = "cherami.publish.ack.rate"
// PublishReconfigureRate is the rate of reconfiguration happening
PublishReconfigureRate = "cherami.publish.reconfigure.rate"
// PublishDrainRate is the rate of drain happening
PublishDrainRate = "cherami.publish.drain.rate"
// PublishNumConnections is the number of connections with input
PublishNumConnections = "cherami.publish.connections"
// PublishNumInflightMessagess is the number of inflight messages hold locally by publisher
Expand Down
16 changes: 8 additions & 8 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit de07d3a

Please sign in to comment.