From d21e701577d202bb262a3b6b98ec1d263a01dd46 Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Fri, 4 Dec 2020 20:45:22 -0500 Subject: [PATCH] [cluster] Fix watch resource leak/hang (#2984) --- src/cluster/etcd/watchmanager/manager.go | 10 +- src/cluster/etcd/watchmanager/manager_test.go | 95 ++++++++++++++----- 2 files changed, 76 insertions(+), 29 deletions(-) diff --git a/src/cluster/etcd/watchmanager/manager.go b/src/cluster/etcd/watchmanager/manager.go index 518f0a2168..cef548c8fa 100644 --- a/src/cluster/etcd/watchmanager/manager.go +++ b/src/cluster/etcd/watchmanager/manager.go @@ -131,7 +131,6 @@ func (w *manager) Watch(key string) { case r, ok := <-watchChan: if !ok { // the watch chan is closed, set it to nil so it will be recreated - // this is unlikely to happen but just to be defensive cancelFn() watchChan = nil logger.Warn("etcd watch channel closed on key, recreating a watch channel") @@ -161,14 +160,17 @@ func (w *manager) Watch(key string) { if err == rpctypes.ErrCompacted { logger.Warn("recreating watch at revision", zap.Int64("revision", r.CompactRevision)) revOverride = r.CompactRevision - watchChan = nil + } else { + logger.Warn("recreating watch due to an error") } - } - if r.IsProgressNotify() { + cancelFn() + watchChan = nil + } else if r.IsProgressNotify() { // Do not call updateFn on ProgressNotify as it happens periodically with no update events continue } + if err = w.updateFn(key, r.Events); err != nil { logger.Error("received notification for key, but failed to get value", zap.Error(err)) } diff --git a/src/cluster/etcd/watchmanager/manager_test.go b/src/cluster/etcd/watchmanager/manager_test.go index 4b60bf5ace..1b8b96c70f 100644 --- a/src/cluster/etcd/watchmanager/manager_test.go +++ b/src/cluster/etcd/watchmanager/manager_test.go @@ -22,17 +22,18 @@ package watchmanager import ( "fmt" + "runtime" "sync/atomic" "testing" "time" "github.com/m3db/m3/src/cluster/mocks" - "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/integration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/integration" "golang.org/x/net/context" ) @@ -150,21 +151,32 @@ func TestWatchRecreate(t *testing.T) { } func TestWatchNoLeader(t *testing.T) { + const ( + watchInitAndRetryDelay = 200 * time.Millisecond + watchCheckInterval = 50 * time.Millisecond + ) + ecluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer ecluster.Terminate(t) - ec := ecluster.Client(0) - var ( - updateCalled int32 - shouldStop int32 + ec = ecluster.Client(0) + tickDuration = 10 * time.Millisecond + electionTimeout = time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration + doneCh = make(chan struct{}, 1) + eventLog = []*clientv3.Event{} + updateCalled int32 + shouldStop int32 ) - doneCh := make(chan struct{}, 1) + opts := NewOptions(). SetWatcher(ec.Watcher). SetUpdateFn( - func(string, []*clientv3.Event) error { + func(_ string, e []*clientv3.Event) error { atomic.AddInt32(&updateCalled, 1) + if len(e) > 0 { + eventLog = append(eventLog, e...) + } return nil }, ). @@ -176,50 +188,83 @@ func TestWatchNoLeader(t *testing.T) { close(doneCh) - // stopped = true return true }, ). - SetWatchChanInitTimeout(200 * time.Millisecond) + SetWatchChanInitTimeout(watchInitAndRetryDelay). + SetWatchChanCheckInterval(watchCheckInterval) wh, err := NewWatchManager(opts) require.NoError(t, err) go wh.Watch("foo") - time.Sleep(2 * time.Second) + runtime.Gosched() + time.Sleep(10 * time.Millisecond) + // there should be a valid watch now, trigger a notification - _, err = ec.Put(context.Background(), "foo", "v") + _, err = ec.Put(context.Background(), "foo", "bar") require.NoError(t, err) - for { - if atomic.LoadInt32(&updateCalled) == int32(1) { + leaderIdx := ecluster.WaitLeader(t) + require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader") + + for i := 0; i < 10; i++ { + if atomic.LoadInt32(&updateCalled) == int32(3) { break } - time.Sleep(100 * time.Millisecond) + time.Sleep(10 * time.Millisecond) } + // simulate quorum loss ecluster.Members[1].Stop(t) ecluster.Members[2].Stop(t) - ecluster.Client(1).Close() - ecluster.Client(2).Close() - ecluster.TakeClient(1) - ecluster.TakeClient(2) // wait for election timeout, then member[0] will not have a leader. - tickDuration := 10 * time.Millisecond - time.Sleep(time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration) + time.Sleep(electionTimeout) - for { - if atomic.LoadInt32(&updateCalled) == 2 { + for i := 0; i < 100; i++ { // 10ms * 100 = 1s + // test that leader loss is retried - even on error, we should attempt update. + // 5 is an arbitraty number greater than amount of actual updates + if atomic.LoadInt32(&updateCalled) >= 10 { break } - time.Sleep(50 * time.Millisecond) + time.Sleep(10 * time.Millisecond) } - // clean up the background go routine + updates := atomic.LoadInt32(&updateCalled) + if updates < 10 { + require.Fail(t, + "insufficient update calls", + "expected at least 10 update attempts, got %d during a partition", + updates) + } + + require.NoError(t, ecluster.Members[1].Restart(t)) + require.NoError(t, ecluster.Members[2].Restart(t)) + // wait for leader + election delay just in case + time.Sleep(time.Duration(3*ecluster.Members[0].ElectionTicks) * tickDuration) + + leaderIdx = ecluster.WaitLeader(t) + require.True(t, leaderIdx >= 0 && leaderIdx < len(ecluster.Members), "got invalid leader") + + _, err = ec.Put(context.Background(), "foo", "baz") + require.NoError(t, err) + + // give some time for watch to be updated + runtime.Gosched() + time.Sleep(watchInitAndRetryDelay) + atomic.AddInt32(&shouldStop, 1) <-doneCh + + require.Len(t, eventLog, 2) + require.NotNil(t, eventLog[0]) + require.Equal(t, eventLog[0].Kv.Key, []byte("foo")) + require.Equal(t, eventLog[0].Kv.Value, []byte("bar")) + require.NotNil(t, eventLog[1]) + require.Equal(t, eventLog[1].Kv.Key, []byte("foo")) + require.Equal(t, eventLog[1].Kv.Value, []byte("baz")) } func TestWatchCompactedRevision(t *testing.T) {