From 19c64006ab4f4435617d156d5f0ccf1438c8c6de Mon Sep 17 00:00:00 2001 From: Aravind Srinivasan Date: Wed, 29 Mar 2017 13:28:09 -0700 Subject: [PATCH 1/4] Add support for DRAIN command on the client This patch makes sure the client can handle the DRAIN command from the inputhost. As soon as it receives the command, it will stop it's write pump and will mark itself as "closed" so that if we get the same inputhost as a result of reconfiguration, we open a new connection object. Add a unit test to test the same. --- client/cherami/connection.go | 70 +++++++++++++++++++++++++++---- client/cherami/connection_test.go | 43 +++++++++++++++++++ common/metrics/names.go | 2 + glide.lock | 18 ++++---- glide.yaml | 1 + 5 files changed, 116 insertions(+), 18 deletions(-) diff --git a/client/cherami/connection.go b/client/cherami/connection.go index df0c295..35144e8 100644 --- a/client/cherami/connection.go +++ b/client/cherami/connection.go @@ -31,10 +31,10 @@ import ( "time" - "github.com/uber/cherami-thrift/.generated/go/cherami" "github.com/uber/cherami-client-go/common" "github.com/uber/cherami-client-go/common/metrics" "github.com/uber/cherami-client-go/stream" + "github.com/uber/cherami-thrift/.generated/go/cherami" ) type ( @@ -56,9 +56,10 @@ type ( logger bark.Logger reporter metrics.Reporter - lk sync.Mutex - opened int32 - closed int32 + lk sync.Mutex + opened int32 + closed int32 + drained int32 } // This struct is created by writePump after writing message to stream. @@ -140,17 +141,42 @@ func (conn *connection) open() error { return nil } -func (conn *connection) close() { +func (conn *connection) isStopped() bool { + select { + case <-conn.shuttingDownCh: + // already shutdown + return true + default: + } + + return false +} + +// stopWritePump should drain the pump after getting the lock +func (conn *connection) stopWritePump() { conn.lk.Lock() - defer conn.lk.Unlock() + conn.stopWritePumpWithLock() + conn.lk.Unlock() +} - if atomic.LoadInt32(&conn.closed) == 0 { - // First shutdown the write pump to make sure we don't leave any message without ack +// Note: this needs to be called with the conn lock held! +func (conn *connection) stopWritePumpWithLock() { + if !conn.isStopped() { close(conn.shuttingDownCh) if ok := common.AwaitWaitGroup(&conn.writeMsgPumpWG, defaultWGTimeout); !ok { conn.logger.Warn("writeMsgPumpWG timed out") } + conn.logger.Info("stopped write pump") + atomic.StoreInt32(&conn.drained, 1) + } +} +func (conn *connection) close() { + conn.lk.Lock() + defer conn.lk.Unlock() + if atomic.LoadInt32(&conn.closed) == 0 { + // First shutdown the write pump to make sure we don't leave any message without ack + conn.stopWritePumpWithLock() // Now shutdown the read pump and drain all inflight messages close(conn.closeCh) if ok := common.AwaitWaitGroup(&conn.readAckPumpWG, defaultWGTimeout); !ok { @@ -290,7 +316,25 @@ func (conn *connection) readAcksPump() { default: conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Warn("Reconfigure channel is full. Drop reconfigure command.") } + } else if cmd.GetType() == cherami.InputHostCommandType_DRAIN { + // drain all inflight messages + // reconfigure to pick up new extents if any + conn.reporter.IncCounter(metrics.PublishDrainRate, nil, 1) + // start draining by just stopping the write pump. + // this makes sure, we don't send any new messages. + // the read pump will exit when the server completes the drain + go conn.stopWritePump() + + reconfigInfo := cmd.Reconfigure + conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Info("Drain command received from InputHost.") + // reconfigure to pick up new extents + select { + case conn.reconfigureCh <- reconfigureInfo{eventType: reconfigureCmdReconfigureType, reconfigureID: reconfigInfo.GetUpdateUUID()}: + default: + conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Warn("Reconfigure channel is full. Drop reconfigure command.") + } } + } } } @@ -300,8 +344,16 @@ func (conn *connection) isOpened() bool { return atomic.LoadInt32(&conn.opened) != 0 } +// isClosed should return true if either closed it set +// or if "drain" is set, which means the connection has already +// stopped the write pump. +// This is needed to make sure if reconfigure gives the same +// inputhost, we should open up a new connection object (most +// likely for a new extent). In the worst case, if for some reason +// the controller returns the same old extent which is draining, +// the server will anyway reject the connection func (conn *connection) isClosed() bool { - return atomic.LoadInt32(&conn.closed) != 0 + return (atomic.LoadInt32(&conn.closed) != 0 || atomic.LoadInt32(&conn.drained) != 0) } func (e *ackChannelClosedError) Error() string { diff --git a/client/cherami/connection_test.go b/client/cherami/connection_test.go index 9179287..43b2415 100644 --- a/client/cherami/connection_test.go +++ b/client/cherami/connection_test.go @@ -23,12 +23,14 @@ package cherami import ( "errors" "io" + "sync/atomic" "testing" "time" _ "fmt" _ "strconv" + "github.com/pborman/uuid" "github.com/uber/cherami-client-go/common" "github.com/uber/cherami-client-go/common/metrics" mc "github.com/uber/cherami-client-go/mocks/clients/cherami" @@ -195,6 +197,39 @@ func (s *ConnectionSuite) TestAckClosedByInputHost() { //inputHostClient.AssertExpectations(s.T()) } +func (s *ConnectionSuite) TestClientDrain() { + conn, inputHostClient, messagesCh := createConnection() + + appendTicker := time.NewTicker(5 * time.Millisecond) + defer appendTicker.Stop() + + // setup inputhost to send a DRAIN command and then an EOF + inputHostClient.On("Write", mock.Anything).Return(nil) + inputHostClient.On("Flush").Return(nil) + inputHostClient.On("Read").Return(wrapDrainInCommand(&cherami.ReconfigureInfo{ + UpdateUUID: common.StringPtr(uuid.New()), + }), nil).WaitUntil(appendTicker.C).Once() + inputHostClient.On("Read").Return(nil, io.EOF).Once() + inputHostClient.On("Done").Return(nil) + + conn.open() + s.True(conn.isOpened(), "Connection not opened.") + + message := &cherami.PutMessage{ + ID: common.StringPtr("1"), + Data: []byte("test"), + } + + requestDone := make(chan *PublisherReceipt, 1) + + messagesCh <- putMessageRequest{message, requestDone} + <-time.After(6 * time.Millisecond) + // drain must be set + s.Equal(int32(1), atomic.LoadInt32(&conn.drained)) + // closed must return true as well + s.True(conn.isClosed()) +} + func (s *ConnectionSuite) TestClientClosed() { conn, inputHostClient, messagesCh := createConnection() @@ -359,3 +394,11 @@ func wrapAckInCommand(ack *cherami.PutMessageAck) *cherami.InputHostCommand { return cmd } + +func wrapDrainInCommand(reconfigure *cherami.ReconfigureInfo) *cherami.InputHostCommand { + cmd := cherami.NewInputHostCommand() + cmd.Type = common.CheramiInputHostCommandTypePtr(cherami.InputHostCommandType_DRAIN) + cmd.Reconfigure = reconfigure + + return cmd +} diff --git a/common/metrics/names.go b/common/metrics/names.go index 19286ab..6ac7916 100644 --- a/common/metrics/names.go +++ b/common/metrics/names.go @@ -49,6 +49,8 @@ const ( PublishAckRate = "cherami.publish.ack.rate" // PublishReconfigureRate is the rate of reconfiguration happening PublishReconfigureRate = "cherami.publish.reconfigure.rate" + // PublishDrainRate is the rate of drain happening + PublishDrainRate = "cherami.publish.drain.rate" // PublishNumConnections is the number of connections with input PublishNumConnections = "cherami.publish.connections" // PublishNumInflightMessagess is the number of inflight messages hold locally by publisher diff --git a/glide.lock b/glide.lock index c37638d..c317946 100644 --- a/glide.lock +++ b/glide.lock @@ -1,8 +1,8 @@ -hash: 57d13a71d768c6f1054eaf65aeedd138d1f50b98b3a4b9722bdee5acf5839f07 -updated: 2017-02-20T14:20:30.465282917-08:00 +hash: 44ce44fd568ec4273e53f7064b68ae4164cb1d6f75703301f1d986fb6769f1c4 +updated: 2017-03-28T09:22:53.814521025-07:00 imports: - name: github.com/apache/thrift - version: 2d6060d882069ed3e3d6302aa63ea7eb4bb155ad + version: b2a4d4ae21c789b689dd162deb819665567f481c subpackages: - lib/go/thrift - name: github.com/cactus/go-statsd-client @@ -21,13 +21,13 @@ imports: - ext - log - name: github.com/pborman/uuid - version: 1b00554d822231195d1babd97ff4a781231955c9 + version: a97ce2ca70fa5a848076093f05e639a89ca34d06 - name: github.com/pmezard/go-difflib version: 792786c7400a136282c1664665ae0a8db921c6c2 subpackages: - difflib - name: github.com/Sirupsen/logrus - version: 61e43dc76f7ee59a82bdf3d71033dc12bea4c77d + version: ba1b36c82c5e05c4f912a88eab0dcd91a171688f - name: github.com/stretchr/objx version: 1a9d0bb9f541897e62256577b352fdbc1fb4fd94 - name: github.com/stretchr/testify @@ -42,11 +42,11 @@ imports: - name: github.com/uber-go/atomic version: 3b8db5e93c4c02efbc313e17b2e796b0914a01fb - name: github.com/uber/cherami-thrift - version: 2e84419711eb57a3be7412c18de80f9fe7a0cc60 + version: 0ede83064ff6495044ab7901d0d215d713d76940 subpackages: - .generated/go/cherami - name: github.com/uber/tchannel-go - version: 79387824978f91318be3bfb43ae12e04c38cfe97 + version: 0b7f160817553b0bacb5b108dd84a5022dbdd1c4 subpackages: - hyperbahn - hyperbahn/gen-go/hyperbahn @@ -59,11 +59,11 @@ imports: - trand - typed - name: golang.org/x/net - version: 6b27048ae5e6ad1ef927e72e437531493de612fe + version: a6577fac2d73be281a500b310739095313165611 subpackages: - context - name: golang.org/x/sys - version: d75a52659825e75fff6158388dddc6a5b04f9ba5 + version: d4feaf1a7e61e1d9e79e6c4e76c6349e9cab0a03 subpackages: - unix testImports: [] diff --git a/glide.yaml b/glide.yaml index e513282..39d7590 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,6 +1,7 @@ package: github.com/uber/cherami-client-go import: - package: github.com/uber/cherami-thrift + version: graceful_drain subpackages: - .generated/go/cherami - package: github.com/Sirupsen/logrus From 16fed98a2d81e61565d4b37071bc0f84cd752986 Mon Sep 17 00:00:00 2001 From: Aravind Srinivasan Date: Wed, 29 Mar 2017 14:40:28 -0700 Subject: [PATCH 2/4] Give enough time for the drain to finish --- client/cherami/connection_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/cherami/connection_test.go b/client/cherami/connection_test.go index 43b2415..271985e 100644 --- a/client/cherami/connection_test.go +++ b/client/cherami/connection_test.go @@ -223,7 +223,7 @@ func (s *ConnectionSuite) TestClientDrain() { requestDone := make(chan *PublisherReceipt, 1) messagesCh <- putMessageRequest{message, requestDone} - <-time.After(6 * time.Millisecond) + <-time.After(10 * time.Millisecond) // drain must be set s.Equal(int32(1), atomic.LoadInt32(&conn.drained)) // closed must return true as well From 6bec9ac255257686faad840616d99de30b749116 Mon Sep 17 00:00:00 2001 From: Aravind Srinivasan Date: Thu, 30 Mar 2017 13:10:53 -0700 Subject: [PATCH 3/4] Update glide --- glide.lock | 6 +++--- glide.yaml | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/glide.lock b/glide.lock index c317946..9f68efc 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 44ce44fd568ec4273e53f7064b68ae4164cb1d6f75703301f1d986fb6769f1c4 -updated: 2017-03-28T09:22:53.814521025-07:00 +hash: 57d13a71d768c6f1054eaf65aeedd138d1f50b98b3a4b9722bdee5acf5839f07 +updated: 2017-03-30T13:09:37.238077112-07:00 imports: - name: github.com/apache/thrift version: b2a4d4ae21c789b689dd162deb819665567f481c @@ -42,7 +42,7 @@ imports: - name: github.com/uber-go/atomic version: 3b8db5e93c4c02efbc313e17b2e796b0914a01fb - name: github.com/uber/cherami-thrift - version: 0ede83064ff6495044ab7901d0d215d713d76940 + version: 8cf6af068f1f1afd930c6a43ecdd9d0dca2289d5 subpackages: - .generated/go/cherami - name: github.com/uber/tchannel-go diff --git a/glide.yaml b/glide.yaml index 39d7590..e513282 100644 --- a/glide.yaml +++ b/glide.yaml @@ -1,7 +1,6 @@ package: github.com/uber/cherami-client-go import: - package: github.com/uber/cherami-thrift - version: graceful_drain subpackages: - .generated/go/cherami - package: github.com/Sirupsen/logrus From 1ede1e54487714d5b9c040d7f4c70fbf9f04abcd Mon Sep 17 00:00:00 2001 From: Aravind Srinivasan Date: Fri, 31 Mar 2017 14:42:05 -0700 Subject: [PATCH 4/4] CR comments * s/isStopped/isShuttingDown * move handle reconfig into its own utility --- client/cherami/connection.go | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/client/cherami/connection.go b/client/cherami/connection.go index 35144e8..c929c67 100644 --- a/client/cherami/connection.go +++ b/client/cherami/connection.go @@ -141,7 +141,7 @@ func (conn *connection) open() error { return nil } -func (conn *connection) isStopped() bool { +func (conn *connection) isShuttingDown() bool { select { case <-conn.shuttingDownCh: // already shutdown @@ -161,7 +161,7 @@ func (conn *connection) stopWritePump() { // Note: this needs to be called with the conn lock held! func (conn *connection) stopWritePumpWithLock() { - if !conn.isStopped() { + if !conn.isShuttingDown() { close(conn.shuttingDownCh) if ok := common.AwaitWaitGroup(&conn.writeMsgPumpWG, defaultWGTimeout); !ok { conn.logger.Warn("writeMsgPumpWG timed out") @@ -242,6 +242,14 @@ func (conn *connection) writeMessagesPump() { } } +func (conn *connection) handleReconfigCmd(reconfigInfo *cherami.ReconfigureInfo) { + select { + case conn.reconfigureCh <- reconfigureInfo{eventType: reconfigureCmdReconfigureType, reconfigureID: reconfigInfo.GetUpdateUUID()}: + default: + conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Warn("Reconfigure channel is full. Drop reconfigure command.") + } +} + func (conn *connection) readAcksPump() { defer conn.readAckPumpWG.Done() @@ -309,13 +317,9 @@ func (conn *connection) readAcksPump() { } } else if cmd.GetType() == cherami.InputHostCommandType_RECONFIGURE { conn.reporter.IncCounter(metrics.PublishReconfigureRate, nil, 1) - reconfigInfo := cmd.Reconfigure - conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Info("Reconfigure command received from InputHost.") - select { - case conn.reconfigureCh <- reconfigureInfo{eventType: reconfigureCmdReconfigureType, reconfigureID: reconfigInfo.GetUpdateUUID()}: - default: - conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Warn("Reconfigure channel is full. Drop reconfigure command.") - } + rInfo := cmd.Reconfigure + conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(rInfo.GetUpdateUUID())).Info("Reconfigure command received from InputHost.") + conn.handleReconfigCmd(rInfo) } else if cmd.GetType() == cherami.InputHostCommandType_DRAIN { // drain all inflight messages // reconfigure to pick up new extents if any @@ -324,15 +328,9 @@ func (conn *connection) readAcksPump() { // this makes sure, we don't send any new messages. // the read pump will exit when the server completes the drain go conn.stopWritePump() - - reconfigInfo := cmd.Reconfigure - conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Info("Drain command received from InputHost.") - // reconfigure to pick up new extents - select { - case conn.reconfigureCh <- reconfigureInfo{eventType: reconfigureCmdReconfigureType, reconfigureID: reconfigInfo.GetUpdateUUID()}: - default: - conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(reconfigInfo.GetUpdateUUID())).Warn("Reconfigure channel is full. Drop reconfigure command.") - } + rInfo := cmd.Reconfigure + conn.logger.WithField(common.TagReconfigureID, common.FmtReconfigureID(rInfo.GetUpdateUUID())).Info("Drain command received from InputHost.") + conn.handleReconfigCmd(rInfo) } }