Skip to content

Commit

Permalink
[cluster] Close etcd watches completely before recreating (#2997)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored Dec 10, 2020
1 parent d497d95 commit 232ab06
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 101 deletions.
6 changes: 1 addition & 5 deletions src/cluster/client/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,7 @@ func (c *csclient) txnGen(
if ok {
return store, nil
}
if store, err = etcdkv.NewStore(
cli.KV,
cli.Watcher,
c.newkvOptions(opts, cacheFileFn),
); err != nil {
if store, err = etcdkv.NewStore(cli, c.newkvOptions(opts, cacheFileFn)); err != nil {
return nil, err
}

Expand Down
49 changes: 31 additions & 18 deletions src/cluster/etcd/watchmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,27 +71,39 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha

ctx, cancelFn := context.WithCancel(clientv3.WithRequireLeader(context.Background()))

var watchChan clientv3.WatchChan
var (
watcher = clientv3.NewWatcher(w.opts.Client())
watchChan clientv3.WatchChan
)
go func() {
wOpts := w.opts.WatchOptions()
if rev > 0 {
wOpts = append(wOpts, clientv3.WithRev(rev))
}
watchChan = w.opts.Watcher().Watch(
watchChan = watcher.Watch(
ctx,
key,
wOpts...,
)
close(doneCh)
}()

timeout := w.opts.WatchChanInitTimeout()
var (
timeout = w.opts.WatchChanInitTimeout()
cancelWatchFn = func() {
cancelFn()
if err := watcher.Close(); err != nil {
w.logger.Info("error closing watcher", zap.Error(err))
}
}
)

select {
case <-doneCh:
return watchChan, cancelFn, nil
return watchChan, cancelWatchFn, nil
case <-time.After(timeout):
cancelFn()
return nil, nil, fmt.Errorf("etcd watch create timed out after %s for key: %s", timeout.String(), key)
err := fmt.Errorf("etcd watch create timed out after %s for key: %s", timeout.String(), key)
return nil, cancelWatchFn, err
}
}

Expand All @@ -108,6 +120,16 @@ func (w *manager) Watch(key string) {

defer ticker.Stop()

resetWatchWithSleep := func() {
w.m.etcdWatchReset.Inc(1)

cancelFn()
// set it to nil so it will be recreated
watchChan = nil
// avoid recreating watch channel too frequently
time.Sleep(w.opts.WatchChanResetInterval())
}

for {
if watchChan == nil {
w.m.etcdWatchCreate.Inc(1)
Expand All @@ -121,24 +143,16 @@ func (w *manager) Watch(key string) {
if err = w.updateFn(key, nil); err != nil {
logger.Error("failed to get value for key", zap.Error(err))
}
// avoid recreating watch channel too frequently
time.Sleep(w.opts.WatchChanResetInterval())
resetWatchWithSleep()
continue
}
}

select {
case r, ok := <-watchChan:
if !ok {
// the watch chan is closed, set it to nil so it will be recreated
cancelFn()
watchChan = nil
resetWatchWithSleep()
logger.Warn("etcd watch channel closed on key, recreating a watch channel")

// avoid recreating watch channel too frequently
time.Sleep(w.opts.WatchChanResetInterval())
w.m.etcdWatchReset.Inc(1)

continue
}

Expand All @@ -164,8 +178,7 @@ func (w *manager) Watch(key string) {
logger.Warn("recreating watch due to an error")
}

cancelFn()
watchChan = nil
resetWatchWithSleep()
} else if r.IsProgressNotify() {
// Do not call updateFn on ProgressNotify as it happens periodically with no update events
continue
Expand Down
67 changes: 47 additions & 20 deletions src/cluster/etcd/watchmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"testing"
"time"

"github.com/m3db/m3/src/cluster/mocks"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
Expand All @@ -38,9 +36,12 @@ import (
)

func TestWatchChan(t *testing.T) {
wh, ec, _, _, _, closer := testSetup(t)
t.Parallel()
wh, ecluster, _, _, _, closer := testCluster(t) //nolint:dogsled
defer closer()

ec := ecluster.RandClient()

wc, _, err := wh.watchChanWithTimeout("foo", 0)
require.NoError(t, err)
require.Equal(t, 0, len(wc))
Expand All @@ -54,16 +55,17 @@ func TestWatchChan(t *testing.T) {
require.Fail(t, "could not get notification")
}

mw := mocks.NewBlackholeWatcher(ec, 3, func() { time.Sleep(time.Minute) })
wh.opts = wh.opts.SetWatcher(mw).SetWatchChanInitTimeout(100 * time.Millisecond)
ecluster.Members[0].Stop(t)

before := time.Now()
_, _, err = wh.watchChanWithTimeout("foo", 0)
require.WithinDuration(t, time.Now(), before, 150*time.Millisecond)
require.Error(t, err)
require.NoError(t, ecluster.Members[0].Restart(t))
}

func TestWatchSimple(t *testing.T) {
t.Parallel()
wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t)
defer closer()
require.Equal(t, int32(0), atomic.LoadInt32(updateCalled))
Expand Down Expand Up @@ -111,35 +113,45 @@ func TestWatchSimple(t *testing.T) {
}

func TestWatchRecreate(t *testing.T) {
wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t)
t.Parallel()
wh, ecluster, updateCalled, shouldStop, doneCh, closer := testCluster(t)
defer closer()

ec := ecluster.RandClient()

failTotal := 2
mw := mocks.NewBlackholeWatcher(ec, failTotal, func() { time.Sleep(time.Minute) })
wh.opts = wh.opts.
SetWatcher(mw).
SetClient(ec).
SetWatchChanInitTimeout(200 * time.Millisecond).
SetWatchChanResetInterval(100 * time.Millisecond)

go wh.Watch("foo")
go func() {
ecluster.Members[0].DropConnections()
ecluster.Members[0].Blackhole()
wh.Watch("foo")
}()

time.Sleep(2 * wh.opts.WatchChanInitTimeout())

// watch will error out but updateFn will be tried
for {
if atomic.LoadInt32(updateCalled) == int32(failTotal) {
if atomic.LoadInt32(updateCalled) >= int32(failTotal) {
break
}
time.Sleep(10 * time.Millisecond)
}

ecluster.Members[0].Unblackhole()
// now we have retried failTotal times, give enough time for reset to happen
time.Sleep(3 * (wh.opts.WatchChanResetInterval()))

updatesBefore := atomic.LoadInt32(updateCalled)
// there should be a valid watch now, trigger a notification
_, err := ec.Put(context.Background(), "foo", "v")
require.NoError(t, err)

for {
if atomic.LoadInt32(updateCalled) == int32(failTotal+1) {
if atomic.LoadInt32(updateCalled) > updatesBefore {
break
}
time.Sleep(10 * time.Millisecond)
Expand All @@ -151,6 +163,7 @@ func TestWatchRecreate(t *testing.T) {
}

func TestWatchNoLeader(t *testing.T) {
t.Parallel()
const (
watchInitAndRetryDelay = 200 * time.Millisecond
watchCheckInterval = 50 * time.Millisecond
Expand All @@ -170,7 +183,7 @@ func TestWatchNoLeader(t *testing.T) {
)

opts := NewOptions().
SetWatcher(ec.Watcher).
SetClient(ec).
SetUpdateFn(
func(_ string, e []*clientv3.Event) error {
atomic.AddInt32(&updateCalled, 1)
Expand All @@ -192,6 +205,7 @@ func TestWatchNoLeader(t *testing.T) {
},
).
SetWatchChanInitTimeout(watchInitAndRetryDelay).
SetWatchChanResetInterval(watchInitAndRetryDelay).
SetWatchChanCheckInterval(watchCheckInterval)

wh, err := NewWatchManager(opts)
Expand All @@ -213,7 +227,7 @@ func TestWatchNoLeader(t *testing.T) {
if atomic.LoadInt32(&updateCalled) == int32(3) {
break
}
time.Sleep(10 * time.Millisecond)
time.Sleep(watchInitAndRetryDelay)
}

// simulate quorum loss
Expand All @@ -223,13 +237,13 @@ func TestWatchNoLeader(t *testing.T) {
// wait for election timeout, then member[0] will not have a leader.
time.Sleep(electionTimeout)

for i := 0; i < 100; i++ { // 10ms * 100 = 1s
for i := 0; i < 100; i++ {
// test that leader loss is retried - even on error, we should attempt update.
// 5 is an arbitrary number greater than the amount of actual updates
if atomic.LoadInt32(&updateCalled) >= 10 {
break
}
time.Sleep(10 * time.Millisecond)
time.Sleep(watchInitAndRetryDelay)
}

updates := atomic.LoadInt32(&updateCalled)
Expand Down Expand Up @@ -268,6 +282,7 @@ func TestWatchNoLeader(t *testing.T) {
}

func TestWatchCompactedRevision(t *testing.T) {
t.Parallel()
wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t)
defer closer()

Expand Down Expand Up @@ -309,9 +324,15 @@ func TestWatchCompactedRevision(t *testing.T) {
<-doneCh
}

func testSetup(t *testing.T) (*manager, *clientv3.Client, *int32, *int32, chan struct{}, func()) {
func testCluster(t *testing.T) (
*manager,
*integration.ClusterV3,
*int32,
*int32,
chan struct{},
func(),
) {
ecluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
ec := ecluster.RandClient()

closer := func() {
ecluster.Terminate(t)
Expand All @@ -323,7 +344,7 @@ func testSetup(t *testing.T) (*manager, *clientv3.Client, *int32, *int32, chan s
)
doneCh := make(chan struct{}, 1)
opts := NewOptions().
SetWatcher(ec.Watcher).
SetClient(ecluster.RandClient()).
SetUpdateFn(func(string, []*clientv3.Event) error {
atomic.AddInt32(&updateCalled, 1)
return nil
Expand All @@ -338,10 +359,16 @@ func testSetup(t *testing.T) (*manager, *clientv3.Client, *int32, *int32, chan s
return true
}).
SetWatchChanCheckInterval(100 * time.Millisecond).
SetWatchChanInitTimeout(100 * time.Millisecond)
SetWatchChanInitTimeout(100 * time.Millisecond).
SetWatchChanResetInterval(100 * time.Millisecond)

wh, err := NewWatchManager(opts)
require.NoError(t, err)

return wh.(*manager), ec, &updateCalled, &shouldStop, doneCh, closer
return wh.(*manager), ecluster, &updateCalled, &shouldStop, doneCh, closer
}

func testSetup(t *testing.T) (*manager, *clientv3.Client, *int32, *int32, chan struct{}, func()) {
wh, ecluster, updateCalled, shouldStop, donech, closer := testCluster(t)
return wh, ecluster.RandClient(), updateCalled, shouldStop, donech, closer
}
16 changes: 8 additions & 8 deletions src/cluster/etcd/watchmanager/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
)

var (
errNilWatch = errors.New("invalid options: nil watcher")
errNilClient = errors.New("invalid options: nil client")
errNilUpdateFn = errors.New("invalid options: nil updateFn")
errNilTickAndStopFn = errors.New("invalid options: nil tickAndStopFn")
errNilInstrumentOptions = errors.New("invalid options: nil instrument options")
Expand All @@ -54,7 +54,7 @@ func NewOptions() Options {
}

type options struct {
watcher clientv3.Watcher
client *clientv3.Client
updateFn UpdateFn
tickAndStopFn TickAndStopFn

Expand All @@ -65,13 +65,13 @@ type options struct {
iopts instrument.Options
}

func (o *options) Watcher() clientv3.Watcher {
return o.watcher
func (o *options) Client() *clientv3.Client {
return o.client
}

func (o *options) SetWatcher(w clientv3.Watcher) Options {
func (o *options) SetClient(cli *clientv3.Client) Options {
opts := *o
opts.watcher = w
opts.client = cli
return &opts
}

Expand Down Expand Up @@ -146,8 +146,8 @@ func (o *options) SetInstrumentsOptions(iopts instrument.Options) Options {
}

func (o *options) Validate() error {
if o.watcher == nil {
return errNilWatch
if o.client == nil {
return errNilClient
}

if o.updateFn == nil {
Expand Down
8 changes: 4 additions & 4 deletions src/cluster/etcd/watchmanager/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ type TickAndStopFn func(key string) bool

// Options are options for the etcd watch helper
type Options interface {
// Watcher is the etcd watcher
Watcher() clientv3.Watcher
// SetWatcher sets the Watcher
SetWatcher(w clientv3.Watcher) Options
// Client sets the etcd client
Client() *clientv3.Client
// SetClient sets the etcd client
SetClient(cli *clientv3.Client) Options

// UpdateFn is the function called when a notification on a key is received
UpdateFn() UpdateFn
Expand Down
Loading

0 comments on commit 232ab06

Please sign in to comment.