From 232ab06ea6766288abfd71e19a7a94f8553141cf Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Thu, 10 Dec 2020 14:04:04 -0500 Subject: [PATCH] [cluster] Close etcd watches completely before recreating (#2997) --- src/cluster/client/etcd/client.go | 6 +- src/cluster/etcd/watchmanager/manager.go | 49 ++++++----- src/cluster/etcd/watchmanager/manager_test.go | 67 ++++++++++----- src/cluster/etcd/watchmanager/options.go | 16 ++-- src/cluster/etcd/watchmanager/types.go | 8 +- src/cluster/kv/etcd/store.go | 10 +-- src/cluster/kv/etcd/store_test.go | 81 ++++++++++--------- src/cluster/services/heartbeat/etcd/store.go | 4 +- 8 files changed, 140 insertions(+), 101 deletions(-) diff --git a/src/cluster/client/etcd/client.go b/src/cluster/client/etcd/client.go index 927e384b9f..381b41935d 100644 --- a/src/cluster/client/etcd/client.go +++ b/src/cluster/client/etcd/client.go @@ -219,11 +219,7 @@ func (c *csclient) txnGen( if ok { return store, nil } - if store, err = etcdkv.NewStore( - cli.KV, - cli.Watcher, - c.newkvOptions(opts, cacheFileFn), - ); err != nil { + if store, err = etcdkv.NewStore(cli, c.newkvOptions(opts, cacheFileFn)); err != nil { return nil, err } diff --git a/src/cluster/etcd/watchmanager/manager.go b/src/cluster/etcd/watchmanager/manager.go index cef548c8fa..02fefd320e 100644 --- a/src/cluster/etcd/watchmanager/manager.go +++ b/src/cluster/etcd/watchmanager/manager.go @@ -71,13 +71,16 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha ctx, cancelFn := context.WithCancel(clientv3.WithRequireLeader(context.Background())) - var watchChan clientv3.WatchChan + var ( + watcher = clientv3.NewWatcher(w.opts.Client()) + watchChan clientv3.WatchChan + ) go func() { wOpts := w.opts.WatchOptions() if rev > 0 { wOpts = append(wOpts, clientv3.WithRev(rev)) } - watchChan = w.opts.Watcher().Watch( + watchChan = watcher.Watch( ctx, key, wOpts..., @@ -85,13 +88,22 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha close(doneCh) }() - timeout := w.opts.WatchChanInitTimeout() + var ( + timeout = w.opts.WatchChanInitTimeout() + cancelWatchFn = func() { + cancelFn() + if err := watcher.Close(); err != nil { + w.logger.Info("error closing watcher", zap.Error(err)) + } + } + ) + select { case <-doneCh: - return watchChan, cancelFn, nil + return watchChan, cancelWatchFn, nil case <-time.After(timeout): - cancelFn() - return nil, nil, fmt.Errorf("etcd watch create timed out after %s for key: %s", timeout.String(), key) + err := fmt.Errorf("etcd watch create timed out after %s for key: %s", timeout.String(), key) + return nil, cancelWatchFn, err } } @@ -108,6 +120,16 @@ func (w *manager) Watch(key string) { defer ticker.Stop() + resetWatchWithSleep := func() { + w.m.etcdWatchReset.Inc(1) + + cancelFn() + // set it to nil so it will be recreated + watchChan = nil + // avoid recreating watch channel too frequently + time.Sleep(w.opts.WatchChanResetInterval()) + } + for { if watchChan == nil { w.m.etcdWatchCreate.Inc(1) @@ -121,8 +143,7 @@ func (w *manager) Watch(key string) { if err = w.updateFn(key, nil); err != nil { logger.Error("failed to get value for key", zap.Error(err)) } - // avoid recreating watch channel too frequently - time.Sleep(w.opts.WatchChanResetInterval()) + resetWatchWithSleep() continue } } @@ -130,15 +151,8 @@ func (w *manager) Watch(key string) { select { case r, ok := <-watchChan: if !ok { - // the watch chan is closed, set it to nil so it will be recreated - cancelFn() - watchChan = nil + resetWatchWithSleep() 
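+				// fully tear down the closed watch; the top of the loop will recreate it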
logger.Warn("etcd watch channel closed on key, recreating a watch channel") - - // avoid recreating watch channel too frequently - time.Sleep(w.opts.WatchChanResetInterval()) - w.m.etcdWatchReset.Inc(1) - continue } @@ -164,8 +178,7 @@ func (w *manager) Watch(key string) { logger.Warn("recreating watch due to an error") } - cancelFn() - watchChan = nil + resetWatchWithSleep() } else if r.IsProgressNotify() { // Do not call updateFn on ProgressNotify as it happens periodically with no update events continue diff --git a/src/cluster/etcd/watchmanager/manager_test.go b/src/cluster/etcd/watchmanager/manager_test.go index 1b8b96c70f..ad1eda0d33 100644 --- a/src/cluster/etcd/watchmanager/manager_test.go +++ b/src/cluster/etcd/watchmanager/manager_test.go @@ -27,8 +27,6 @@ import ( "testing" "time" - "github.com/m3db/m3/src/cluster/mocks" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" @@ -38,9 +36,12 @@ import ( ) func TestWatchChan(t *testing.T) { - wh, ec, _, _, _, closer := testSetup(t) + t.Parallel() + wh, ecluster, _, _, _, closer := testCluster(t) //nolint:dogsled defer closer() + ec := ecluster.RandClient() + wc, _, err := wh.watchChanWithTimeout("foo", 0) require.NoError(t, err) require.Equal(t, 0, len(wc)) @@ -54,16 +55,17 @@ func TestWatchChan(t *testing.T) { require.Fail(t, "could not get notification") } - mw := mocks.NewBlackholeWatcher(ec, 3, func() { time.Sleep(time.Minute) }) - wh.opts = wh.opts.SetWatcher(mw).SetWatchChanInitTimeout(100 * time.Millisecond) + ecluster.Members[0].Stop(t) before := time.Now() _, _, err = wh.watchChanWithTimeout("foo", 0) require.WithinDuration(t, time.Now(), before, 150*time.Millisecond) require.Error(t, err) + require.NoError(t, ecluster.Members[0].Restart(t)) } func TestWatchSimple(t *testing.T) { + t.Parallel() wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t) defer closer() require.Equal(t, int32(0), atomic.LoadInt32(updateCalled)) @@ -111,35 +113,45 @@ func TestWatchSimple(t *testing.T) { } func TestWatchRecreate(t *testing.T) { - wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t) + t.Parallel() + wh, ecluster, updateCalled, shouldStop, doneCh, closer := testCluster(t) defer closer() + ec := ecluster.RandClient() + failTotal := 2 - mw := mocks.NewBlackholeWatcher(ec, failTotal, func() { time.Sleep(time.Minute) }) wh.opts = wh.opts. - SetWatcher(mw). + SetClient(ec). SetWatchChanInitTimeout(200 * time.Millisecond). 
		SetWatchChanResetInterval(100 * time.Millisecond)
 
-	go wh.Watch("foo")
+	go func() {
+		ecluster.Members[0].DropConnections()
+		ecluster.Members[0].Blackhole()
+		wh.Watch("foo")
+	}()
+
+	time.Sleep(2 * wh.opts.WatchChanInitTimeout())
 
 	// watch will error out but updateFn will be tried
 	for {
-		if atomic.LoadInt32(updateCalled) == int32(failTotal) {
+		if atomic.LoadInt32(updateCalled) >= int32(failTotal) {
 			break
 		}
 		time.Sleep(10 * time.Millisecond)
 	}
 
+	ecluster.Members[0].Unblackhole()
 	// now we have retried failTotal times, give enough time for reset to happen
 	time.Sleep(3 * (wh.opts.WatchChanResetInterval()))
 
+	updatesBefore := atomic.LoadInt32(updateCalled)
 	// there should be a valid watch now, trigger a notification
 	_, err := ec.Put(context.Background(), "foo", "v")
 	require.NoError(t, err)
 
 	for {
-		if atomic.LoadInt32(updateCalled) == int32(failTotal+1) {
+		if atomic.LoadInt32(updateCalled) > updatesBefore {
 			break
 		}
 		time.Sleep(10 * time.Millisecond)
@@ -151,6 +163,7 @@ func TestWatchRecreate(t *testing.T) {
 }
 
 func TestWatchNoLeader(t *testing.T) {
+	t.Parallel()
 	const (
 		watchInitAndRetryDelay = 200 * time.Millisecond
 		watchCheckInterval     = 50 * time.Millisecond
@@ -170,7 +183,7 @@ func TestWatchNoLeader(t *testing.T) {
 	)
 
 	opts := NewOptions().
-		SetWatcher(ec.Watcher).
+		SetClient(ec).
 		SetUpdateFn(
 			func(_ string, e []*clientv3.Event) error {
 				atomic.AddInt32(&updateCalled, 1)
@@ -192,6 +205,7 @@ func TestWatchNoLeader(t *testing.T) {
 			},
 		).
 		SetWatchChanInitTimeout(watchInitAndRetryDelay).
+		SetWatchChanResetInterval(watchInitAndRetryDelay).
 		SetWatchChanCheckInterval(watchCheckInterval)
 
 	wh, err := NewWatchManager(opts)
@@ -213,7 +227,7 @@ func TestWatchNoLeader(t *testing.T) {
 		if atomic.LoadInt32(&updateCalled) == int32(3) {
 			break
 		}
-		time.Sleep(10 * time.Millisecond)
+		time.Sleep(watchInitAndRetryDelay)
 	}
 
 	// simulate quorum loss
@@ -223,13 +237,13 @@ func TestWatchNoLeader(t *testing.T) {
 	// wait for election timeout, then member[0] will not have a leader.
 	time.Sleep(electionTimeout)
 
-	for i := 0; i < 100; i++ { // 10ms * 100 = 1s
+	for i := 0; i < 100; i++ {
 		// test that leader loss is retried - even on error, we should attempt update.
 		// 10 is an arbitrary threshold, greater than the expected number of updates
 		if atomic.LoadInt32(&updateCalled) >= 10 {
 			break
 		}
-		time.Sleep(10 * time.Millisecond)
+		time.Sleep(watchInitAndRetryDelay)
 	}
 
 	updates := atomic.LoadInt32(&updateCalled)
@@ -268,6 +282,7 @@ func TestWatchNoLeader(t *testing.T) {
 }
 
 func TestWatchCompactedRevision(t *testing.T) {
+	t.Parallel()
 	wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t)
 	defer closer()
 
@@ -309,9 +324,15 @@ func TestWatchCompactedRevision(t *testing.T) {
 	<-doneCh
 }
 
-func testSetup(t *testing.T) (*manager, *clientv3.Client, *int32, *int32, chan struct{}, func()) {
+func testCluster(t *testing.T) (
+	*manager,
+	*integration.ClusterV3,
+	*int32,
+	*int32,
+	chan struct{},
+	func(),
+) {
 	ecluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
-	ec := ecluster.RandClient()
 
 	closer := func() {
 		ecluster.Terminate(t)
@@ -323,7 +344,7 @@ func testSetup(t *testing.T) (*manager, *clientv3.Client, *int32, *int32, chan s
 	)
 	doneCh := make(chan struct{}, 1)
 	opts := NewOptions().
-		SetWatcher(ec.Watcher).
+		SetClient(ecluster.RandClient()).
 		SetUpdateFn(func(string, []*clientv3.Event) error {
 			atomic.AddInt32(&updateCalled, 1)
 			return nil
 		}).
 		SetTickAndStopFn(func(string) bool {
 			if atomic.LoadInt32(&shouldStop) == 0 {
 				return false
 			}
 			doneCh <- struct{}{}
 
 			return true
 		}).
 		SetWatchChanCheckInterval(100 * time.Millisecond).
-		SetWatchChanInitTimeout(100 * time.Millisecond)
+		SetWatchChanInitTimeout(100 * time.Millisecond).
+		SetWatchChanResetInterval(100 * time.Millisecond)
 
 	wh, err := NewWatchManager(opts)
 	require.NoError(t, err)
 
-	return wh.(*manager), ec, &updateCalled, &shouldStop, doneCh, closer
+	return wh.(*manager), ecluster, &updateCalled, &shouldStop, doneCh, closer
+}
+
+func testSetup(t *testing.T) (*manager, *clientv3.Client, *int32, *int32, chan struct{}, func()) {
+	wh, ecluster, updateCalled, shouldStop, donech, closer := testCluster(t)
+	return wh, ecluster.RandClient(), updateCalled, shouldStop, donech, closer
 }
 
diff --git a/src/cluster/etcd/watchmanager/options.go b/src/cluster/etcd/watchmanager/options.go
index 7156c27314..d2d642c8e8 100644
--- a/src/cluster/etcd/watchmanager/options.go
+++ b/src/cluster/etcd/watchmanager/options.go
@@ -36,7 +36,7 @@ const (
 )
 
 var (
-	errNilWatch             = errors.New("invalid options: nil watcher")
+	errNilClient            = errors.New("invalid options: nil client")
 	errNilUpdateFn          = errors.New("invalid options: nil updateFn")
 	errNilTickAndStopFn     = errors.New("invalid options: nil tickAndStopFn")
 	errNilInstrumentOptions = errors.New("invalid options: nil instrument options")
@@ -54,7 +54,7 @@ func NewOptions() Options {
 }
 
 type options struct {
-	watcher clientv3.Watcher
+	client *clientv3.Client
 
 	updateFn      UpdateFn
 	tickAndStopFn TickAndStopFn
@@ -65,13 +65,13 @@ type options struct {
 	iopts instrument.Options
 }
 
-func (o *options) Watcher() clientv3.Watcher {
-	return o.watcher
+func (o *options) Client() *clientv3.Client {
+	return o.client
 }
 
-func (o *options) SetWatcher(w clientv3.Watcher) Options {
+func (o *options) SetClient(cli *clientv3.Client) Options {
 	opts := *o
-	opts.watcher = w
+	opts.client = cli
 	return &opts
 }
 
@@ -146,8 +146,8 @@ func (o *options) SetInstrumentsOptions(iopts instrument.Options) Options {
 }
 
 func (o *options) Validate() error {
-	if o.watcher == nil {
-		return errNilWatch
+	if o.client == nil {
+		return errNilClient
 	}
 
 	if o.updateFn == nil {
diff --git a/src/cluster/etcd/watchmanager/types.go b/src/cluster/etcd/watchmanager/types.go
index 753ac14184..0efad25bc0 100644
--- a/src/cluster/etcd/watchmanager/types.go
+++ b/src/cluster/etcd/watchmanager/types.go
@@ -43,10 +43,10 @@ type TickAndStopFn func(key string) bool
 
 // Options are options for the etcd watch helper
 type Options interface {
-	// Watcher is the etcd watcher
-	Watcher() clientv3.Watcher
-	// SetWatcher sets the Watcher
-	SetWatcher(w clientv3.Watcher) Options
+	// Client returns the etcd client
+	Client() *clientv3.Client
+	// SetClient sets the etcd client
+	SetClient(cli *clientv3.Client) Options
 
 	// UpdateFn is the function called when a notification on a key is received
 	UpdateFn() UpdateFn
diff --git a/src/cluster/kv/etcd/store.go b/src/cluster/kv/etcd/store.go
index a144dcc541..efbd3643e6 100644
--- a/src/cluster/kv/etcd/store.go
+++ b/src/cluster/kv/etcd/store.go
@@ -33,9 +33,9 @@ import (
 	xerrors "github.com/m3db/m3/src/x/errors"
 	"github.com/m3db/m3/src/x/retry"
 
-	"go.etcd.io/etcd/clientv3"
 	"github.com/golang/protobuf/proto"
 	"github.com/uber-go/tally"
+	"go.etcd.io/etcd/clientv3"
 	"go.uber.org/zap"
 	"golang.org/x/net/context"
 )
@@ -51,13 +51,12 @@ var (
 )
 
 // NewStore creates a kv store based on etcd
-func NewStore(etcdKV clientv3.KV, etcdWatcher clientv3.Watcher, opts Options) (kv.TxnStore, error) {
+func NewStore(etcdKV *clientv3.Client, opts Options) (kv.TxnStore, error) {
 	scope := opts.InstrumentsOptions().MetricsScope()
 
 	store := &client{
 		opts:       opts,
 		kv:         etcdKV,
-		watcher:    etcdWatcher,
 		watchables: map[string]kv.ValueWatchable{},
 		retrier:    retry.NewRetrier(opts.RetryOptions()),
 		logger:     opts.InstrumentsOptions().Logger(),
@@ -86,7 +85,7 @@ func NewStore(etcdKV clientv3.KV, etcdWatcher clientv3.Watcher, opts Options) (k
 	}
 
 	wOpts := watchmanager.NewOptions().
-		SetWatcher(etcdWatcher).
+		SetClient(etcdKV).
 		SetUpdateFn(store.update).
 		SetTickAndStopFn(store.tickAndStop).
 		SetWatchOptions(clientWatchOpts).
@@ -124,8 +123,7 @@ type client struct {
 	sync.RWMutex
 
 	opts       Options
-	kv         clientv3.KV
-	watcher    clientv3.Watcher
+	kv         *clientv3.Client
 	watchables map[string]kv.ValueWatchable
 	retrier    retry.Retrier
 	logger     *zap.Logger
diff --git a/src/cluster/kv/etcd/store_test.go b/src/cluster/kv/etcd/store_test.go
index af6d33b7be..8924095d3f 100644
--- a/src/cluster/kv/etcd/store_test.go
+++ b/src/cluster/kv/etcd/store_test.go
@@ -33,7 +33,6 @@ import (
 
 	"github.com/m3db/m3/src/cluster/generated/proto/kvtest"
 	"github.com/m3db/m3/src/cluster/kv"
-	"github.com/m3db/m3/src/cluster/mocks"
 	xclock "github.com/m3db/m3/src/x/clock"
 	"github.com/m3db/m3/src/x/retry"
 
@@ -60,7 +59,7 @@ func TestGetAndSet(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	value, err := store.Get("foo")
@@ -89,7 +88,7 @@ func TestNoCache(t *testing.T) {
 
 	ec, opts, closeFn := testStore(t)
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 	require.Equal(t, 0, len(store.(*client).cacheUpdatedCh))
 
@@ -112,7 +111,7 @@ func TestNoCache(t *testing.T) {
 	verifyValue(t, value, "bar1", 1)
 
 	// new store but no cache file set
-	store, err = NewStore(ec, ec, opts)
+	store, err = NewStore(ec, opts)
 	require.NoError(t, err)
 
 	_, err = store.Set("foo", genProto("bar1"))
@@ -136,7 +135,7 @@ func TestCacheDirCreation(t *testing.T) {
 		return path.Join(cdir, opts.Prefix())
 	})
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	info, err := os.Stat(cdir)
@@ -164,7 +163,7 @@ func TestCache(t *testing.T) {
 		return f.Name()
 	})
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 	require.Equal(t, 0, len(store.(*client).cacheUpdatedCh))
 
@@ -190,7 +189,7 @@ func TestCache(t *testing.T) {
 	require.Equal(t, 0, len(store.(*client).cacheUpdatedCh))
 
 	// new store but with cache file
-	store, err = NewStore(ec, ec, opts)
+	store, err = NewStore(ec, opts)
 	require.NoError(t, err)
 
 	_, err = store.Set("key", genProto("bar1"))
@@ -212,7 +211,7 @@ func TestSetIfNotExist(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	version, err := store.SetIfNotExists("foo", genProto("bar"))
@@ -233,7 +232,7 @@ func TestCheckAndSet(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	_, err = store.CheckAndSet("foo", 1, genProto("bar"))
@@ -261,7 +260,7 @@ func TestWatchClose(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	_, err = store.Set("foo", genProto("bar1"))
@@ -312,7 +311,7 @@ func TestWatchLastVersion(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	w, err := store.Watch("foo")
@@ -348,7 +347,7 @@ func TestWatchFromExist(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	_, err = store.Set("foo", genProto("bar1"))
@@ -387,7 +386,7 @@ func TestWatchFromNotExist(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	w, err := store.Watch("foo")
@@ -415,7 +414,7 @@ func TestWatchFromNotExist(t *testing.T) {
 func TestGetFromKvNotFound(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 	c := store.(*client)
 	_, err = c.Set("foo", genProto("bar1"))
@@ -432,7 +431,7 @@ func TestMultipleWatchesFromExist(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	_, err = store.Set("foo", genProto("bar1"))
@@ -484,7 +483,7 @@ func TestMultipleWatchesFromNotExist(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 	w1, err := store.Watch("foo")
 	require.NoError(t, err)
@@ -525,14 +524,14 @@ func TestMultipleWatchesFromNotExist(t *testing.T) {
 func TestWatchNonBlocking(t *testing.T) {
 	t.Parallel()
 
-	ec, opts, closeFn := testStore(t)
+	ecluster, opts, closeFn := testCluster(t)
 	defer closeFn()
 
+	ec := ecluster.Client(0)
+
 	opts = opts.SetWatchChanResetInterval(200 * time.Millisecond).SetWatchChanInitTimeout(500 * time.Millisecond)
 
-	failTotal := 3
-	mw := mocks.NewBlackholeWatcher(ec, failTotal, func() { time.Sleep(time.Minute) })
-	store, err := NewStore(ec, mw, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 	c := store.(*client)
 
 	_, err = c.Set("foo", genProto("bar1"))
 	require.NoError(t, err)
 
 	before := time.Now()
+	ecluster.Members[0].Blackhole()
 	w1, err := c.Watch("foo")
 	require.WithinDuration(t, time.Now(), before, 100*time.Millisecond)
 	require.NoError(t, err)
 	require.Equal(t, 0, len(w1.C()))
+	ecluster.Members[0].Unblackhole()
+	ecluster.Members[0].DropConnections()
 
 	// watch channel will error out, but Get() will be tried
 	<-w1.C()
@@ -556,7 +558,7 @@ func TestHistory(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	_, err = store.History("k1", 10, 5)
@@ -614,7 +616,7 @@ func TestDelete(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	_, err = store.Delete("foo")
@@ -651,7 +653,7 @@ func TestDelete_UpdateCache(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	c, err := NewStore(ec, ec, opts)
+	c, err := NewStore(ec, opts)
 	require.NoError(t, err)
 	store := c.(*client)
 
@@ -680,7 +682,7 @@ func TestDelete_UpdateWatcherCache(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	setStore, err := NewStore(ec, ec, opts)
+	setStore, err := NewStore(ec, opts)
 	require.NoError(t, err)
 	setClient := setStore.(*client)
 
@@ -702,7 +704,7 @@ func TestDelete_UpdateWatcherCache(t *testing.T) {
 	require.NoError(t, err)
 	defer os.RemoveAll(clientCachePath)
 
-	getStore, err := NewStore(ec, ec, opts.SetCacheFileFn(func(ns string) string {
+	getStore, err := NewStore(ec, opts.SetCacheFileFn(func(ns string) string {
 		nsFile := path.Join(clientCachePath, fmt.Sprintf("%s.json", ns))
 		return nsFile
 	}))
@@ -757,7 +759,7 @@ func TestDelete_TriggerWatch(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	vw, err := store.Watch("foo")
@@ -799,7 +801,7 @@ func TestStaleDelete__FromGet(t *testing.T) {
 	defer os.RemoveAll(serverCachePath)
 
 	ec, opts, closeFn := testStore(t)
-	setStore, err := NewStore(ec, ec, opts.SetCacheFileFn(func(ns string) string {
+	setStore, err := NewStore(ec, opts.SetCacheFileFn(func(ns string) string {
 		return path.Join(serverCachePath, fmt.Sprintf("%s.json", ns))
 	}))
 	require.NoError(t, err)
@@ -847,7 +849,7 @@ func TestStaleDelete__FromGet(t *testing.T) {
 	// create new etcd cluster
 	ec2, opts, closeFn2 := testStore(t)
 	defer closeFn2()
-	getStore, err := NewStore(ec2, ec2, opts.SetCacheFileFn(func(ns string) string {
+	getStore, err := NewStore(ec2, opts.SetCacheFileFn(func(ns string) string {
 		nsFile := path.Join(newServerCachePath, fmt.Sprintf("%s.json", ns))
 		return nsFile
 	}))
@@ -882,7 +884,7 @@ func TestStaleDelete__FromWatch(t *testing.T) {
 	defer os.RemoveAll(serverCachePath)
 
 	ec, opts, closeFn := testStore(t)
-	setStore, err := NewStore(ec, ec, opts.SetCacheFileFn(func(ns string) string {
+	setStore, err := NewStore(ec, opts.SetCacheFileFn(func(ns string) string {
 		return path.Join(serverCachePath, fmt.Sprintf("%s.json", ns))
 	}))
 	require.NoError(t, err)
@@ -932,7 +934,7 @@ func TestStaleDelete__FromWatch(t *testing.T) {
 	// create new etcd cluster
 	ec2, opts, closeFn2 := testStore(t)
 	defer closeFn2()
-	getStore, err := NewStore(ec2, ec2, opts.SetCacheFileFn(func(ns string) string {
+	getStore, err := NewStore(ec2, opts.SetCacheFileFn(func(ns string) string {
 		nsFile := path.Join(newServerCachePath, fmt.Sprintf("%s.json", ns))
 		return nsFile
 	}))
@@ -969,7 +971,7 @@ func TestTxn(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	r, err := store.Commit(
@@ -1040,7 +1042,7 @@ func TestTxn_ConditionFail(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	_, err = store.Commit(
@@ -1079,7 +1081,7 @@ func TestTxn_UnknownType(t *testing.T) {
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	_, err = store.Commit(
@@ -1109,7 +1111,7 @@ func TestWatchWithStartRevision(t *testing.T) {
 
 	opts = opts.SetWatchWithRevision(rev)
 
-	store, err := NewStore(ec, ec, opts)
+	store, err := NewStore(ec, opts)
 	require.NoError(t, err)
 
 	for i := 1; i <= 50; i++ {
@@ -1147,10 +1149,8 @@ func genProto(msg string) proto.Message {
 	return &kvtest.Foo{Msg: msg}
 }
 
-func testStore(t *testing.T) (*clientv3.Client, Options, func()) {
+func testCluster(t *testing.T) (*integration.ClusterV3, Options, func()) {
 	ecluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
-	ec := ecluster.RandClient()
 	closer := func() {
 		ecluster.Terminate(t)
 	}
@@ -1163,7 +1163,12 @@ func testStore(t *testing.T) (*clientv3.Client, Options, func()) {
 		SetRetryOptions(retry.NewOptions().SetMaxRetries(1).SetMaxBackoff(0)).
SetPrefix("test") - return ec, opts, closer + return ecluster, opts, closer +} + +func testStore(t *testing.T) (*clientv3.Client, Options, func()) { + ecluster, opts, closer := testCluster(t) + return ecluster.RandClient(), opts, closer } func readCacheJSONAndFilename(dirPath string) (string, []byte, error) { diff --git a/src/cluster/services/heartbeat/etcd/store.go b/src/cluster/services/heartbeat/etcd/store.go index 35fcdcb48c..b1ca844e83 100644 --- a/src/cluster/services/heartbeat/etcd/store.go +++ b/src/cluster/services/heartbeat/etcd/store.go @@ -35,9 +35,9 @@ import ( "github.com/m3db/m3/src/x/retry" "github.com/m3db/m3/src/x/watch" - "go.etcd.io/etcd/clientv3" "github.com/golang/protobuf/proto" "github.com/uber-go/tally" + "go.etcd.io/etcd/clientv3" "go.uber.org/zap" "golang.org/x/net/context" ) @@ -80,7 +80,7 @@ func NewStore(c *clientv3.Client, opts Options) (services.HeartbeatService, erro } wOpts := watchmanager.NewOptions(). - SetWatcher(c.Watcher). + SetClient(c). SetUpdateFn(store.update). SetTickAndStopFn(store.tickAndStop). SetWatchOptions([]clientv3.OpOption{