[cluster] Close etcd watches completely before recreating #2997

Merged 4 commits on Dec 10, 2020

Changes shown below are from 1 commit.
6 changes: 1 addition & 5 deletions src/cluster/client/etcd/client.go
@@ -219,11 +219,7 @@ func (c *csclient) txnGen(
 	if ok {
 		return store, nil
 	}
-	if store, err = etcdkv.NewStore(
-		cli.KV,
-		cli.Watcher,
-		c.newkvOptions(opts, cacheFileFn),
-	); err != nil {
+	if store, err = etcdkv.NewStore(cli, c.newkvOptions(opts, cacheFileFn)); err != nil {
 		return nil, err
 	}

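This is the only change in client.go: etcdkv.NewStore now takes the whole etcd client rather than separate KV and Watcher handles, so the store's watch manager can create and fully close its own watchers. A minimal sketch of the new call shape, assuming an already-built etcdkv.Options value named opts (the cacheFileFn plumbing from the real call site is omitted, and the endpoint is a placeholder):

	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"}, // placeholder endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		return nil, err
	}
	// Pass the full *clientv3.Client; cli.KV and cli.Watcher are no
	// longer passed separately.
	store, err := etcdkv.NewStore(cli, opts)
	if err != nil {
		return nil, err
	}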
49 changes: 31 additions & 18 deletions src/cluster/etcd/watchmanager/manager.go
@@ -71,27 +71,39 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha
 
 	ctx, cancelFn := context.WithCancel(clientv3.WithRequireLeader(context.Background()))
 
-	var watchChan clientv3.WatchChan
+	var (
+		watcher   = clientv3.NewWatcher(w.opts.Client())
+		watchChan clientv3.WatchChan
+	)
 	go func() {
 		wOpts := w.opts.WatchOptions()
 		if rev > 0 {
 			wOpts = append(wOpts, clientv3.WithRev(rev))
 		}
-		watchChan = w.opts.Watcher().Watch(
[Review comment from Collaborator] nit: should we add a comment to the definition of Watcher() to not rely on it?
+		watchChan = watcher.Watch(
 			ctx,
 			key,
 			wOpts...,
 		)
 		close(doneCh)
 	}()
 
-	timeout := w.opts.WatchChanInitTimeout()
+	var (
+		timeout       = w.opts.WatchChanInitTimeout()
+		cancelWatchFn = func() {
+			cancelFn()
+			if err := watcher.Close(); err != nil {
+				w.logger.Info("error closing watcher", zap.Error(err))
[Review comment from @cw9, Dec 10, 2020] I'd add a comment for why this error is ok to ignore.
+			}
+		}
+	)
 
 	select {
 	case <-doneCh:
-		return watchChan, cancelFn, nil
+		return watchChan, cancelWatchFn, nil
 	case <-time.After(timeout):
-		cancelFn()
-		return nil, nil, fmt.Errorf("etcd watch create timed out after %s for key: %s", timeout.String(), key)
+		err := fmt.Errorf("etcd watch create timed out after %s for key: %s", timeout.String(), key)
[Review comment from Collaborator] can you call cancelWatchFn() here instead of having the user call it on err != nil?
+		return nil, cancelWatchFn, err
 	}
 }
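This hunk is the heart of the fix. Each watch channel now gets a dedicated clientv3.Watcher built from the client, and the cancel function handed back to the caller closes that watcher in addition to cancelling the context. Cancelling the context alone stops delivery on the channel, but only Watcher.Close tears down the underlying gRPC watch stream, so the old shared-watcher approach could leave dead streams behind across recreates. A self-contained sketch of the pattern, assuming only a dialed *clientv3.Client; the function and variable names here are illustrative, not the PR's:

	package watchexample

	import (
		"context"

		"go.etcd.io/etcd/clientv3"
	)

	// openWatch is an illustrative stand-in for watchChanWithTimeout's core:
	// a dedicated watcher per channel, closed together with the context.
	func openWatch(cli *clientv3.Client, key string, rev int64) (clientv3.WatchChan, func()) {
		ctx, cancel := context.WithCancel(clientv3.WithRequireLeader(context.Background()))

		// One watcher per channel: closing it cannot disturb other
		// watches that share the same client.
		watcher := clientv3.NewWatcher(cli)

		var wOpts []clientv3.OpOption
		if rev > 0 {
			wOpts = append(wOpts, clientv3.WithRev(rev))
		}
		ch := watcher.Watch(ctx, key, wOpts...)

		cleanup := func() {
			cancel()
			// Close tears down the watcher's gRPC stream and goroutines;
			// cancelling the context alone does not. The error is safe to
			// log and ignore here since the watch is being discarded.
			_ = watcher.Close()
		}
		return ch, cleanup
	}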

@@ -108,6 +120,16 @@ func (w *manager) Watch(key string) {
 
 	defer ticker.Stop()
 
+	resetWatch := func() {
[Review comment from Collaborator] ultra-nit: perhaps naming it resetWatchAndSleep would be more accurate.
+		w.m.etcdWatchReset.Inc(1)
+
+		cancelFn()
+		// set it to nil so it will be recreated
+		watchChan = nil
+		// avoid recreating watch channel too frequently
+		time.Sleep(w.opts.WatchChanResetInterval())
+	}
+
 	for {
 		if watchChan == nil {
 			w.m.etcdWatchCreate.Inc(1)
@@ -121,24 +143,16 @@
 				if err = w.updateFn(key, nil); err != nil {
 					logger.Error("failed to get value for key", zap.Error(err))
 				}
-				// avoid recreating watch channel too frequently
-				time.Sleep(w.opts.WatchChanResetInterval())
+				resetWatch()
 				continue
 			}
 		}
 
 		select {
 		case r, ok := <-watchChan:
 			if !ok {
-				// the watch chan is closed, set it to nil so it will be recreated
-				cancelFn()
-				watchChan = nil
+				resetWatch()
 				logger.Warn("etcd watch channel closed on key, recreating a watch channel")
-
-				// avoid recreating watch channel too frequently
-				time.Sleep(w.opts.WatchChanResetInterval())
-				w.m.etcdWatchReset.Inc(1)
 
 				continue
 			}
 
@@ -164,8 +178,7 @@ func (w *manager) Watch(key string) {
 				logger.Warn("recreating watch due to an error")
 			}
 
-			cancelFn()
-			watchChan = nil
+			resetWatch()
 		} else if r.IsProgressNotify() {
 			// Do not call updateFn on ProgressNotify as it happens periodically with no update events
 			continue
62 changes: 42 additions & 20 deletions src/cluster/etcd/watchmanager/manager_test.go
@@ -27,8 +27,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/m3db/m3/src/cluster/mocks"
-
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/uber-go/tally"
@@ -38,9 +36,11 @@
 )

 func TestWatchChan(t *testing.T) {
-	wh, ec, _, _, _, closer := testSetup(t)
+	wh, ecluster, _, _, _, closer := testCluster(t) //nolint:dogsled
 	defer closer()
 
+	ec := ecluster.RandClient()
+
 	wc, _, err := wh.watchChanWithTimeout("foo", 0)
 	require.NoError(t, err)
 	require.Equal(t, 0, len(wc))
@@ -54,13 +54,13 @@ func TestWatchChan(t *testing.T) {
 		require.Fail(t, "could not get notification")
 	}
 
-	mw := mocks.NewBlackholeWatcher(ec, 3, func() { time.Sleep(time.Minute) })
-	wh.opts = wh.opts.SetWatcher(mw).SetWatchChanInitTimeout(100 * time.Millisecond)
+	ecluster.Members[0].Stop(t)
 
 	before := time.Now()
 	_, _, err = wh.watchChanWithTimeout("foo", 0)
 	require.WithinDuration(t, time.Now(), before, 150*time.Millisecond)
 	require.Error(t, err)
+	require.NoError(t, ecluster.Members[0].Restart(t))
 }

 func TestWatchSimple(t *testing.T) {
@@ -111,35 +111,44 @@ func TestWatchSimple(t *testing.T) {
 }

 func TestWatchRecreate(t *testing.T) {
-	wh, ec, updateCalled, shouldStop, doneCh, closer := testSetup(t)
+	wh, ecluster, updateCalled, shouldStop, doneCh, closer := testCluster(t)
 	defer closer()
 
+	ec := ecluster.RandClient()
+
 	failTotal := 2
-	mw := mocks.NewBlackholeWatcher(ec, failTotal, func() { time.Sleep(time.Minute) })
 	wh.opts = wh.opts.
-		SetWatcher(mw).
+		SetClient(ec).
 		SetWatchChanInitTimeout(200 * time.Millisecond).
 		SetWatchChanResetInterval(100 * time.Millisecond)
 
-	go wh.Watch("foo")
+	go func() {
+		ecluster.Members[0].DropConnections()
+		ecluster.Members[0].Blackhole()
+		wh.Watch("foo")
+	}()
 
 	time.Sleep(2 * wh.opts.WatchChanInitTimeout())
 
 	// watch will error out but updateFn will be tried
 	for {
-		if atomic.LoadInt32(updateCalled) == int32(failTotal) {
+		if atomic.LoadInt32(updateCalled) >= int32(failTotal) {
 			break
 		}
 		time.Sleep(10 * time.Millisecond)
 	}
 
+	ecluster.Members[0].Unblackhole()
 	// now we have retried failTotal times, give enough time for reset to happen
 	time.Sleep(3 * (wh.opts.WatchChanResetInterval()))
 
+	updatesBefore := atomic.LoadInt32(updateCalled)
 	// there should be a valid watch now, trigger a notification
 	_, err := ec.Put(context.Background(), "foo", "v")
 	require.NoError(t, err)
 
 	for {
-		if atomic.LoadInt32(updateCalled) == int32(failTotal+1) {
+		if atomic.LoadInt32(updateCalled) > updatesBefore {
 			break
 		}
 		time.Sleep(10 * time.Millisecond)
@@ -170,7 +179,7 @@ func TestWatchNoLeader(t *testing.T) {
 	)
 
 	opts := NewOptions().
-		SetWatcher(ec.Watcher).
+		SetClient(ec).
 		SetUpdateFn(
 			func(_ string, e []*clientv3.Event) error {
 				atomic.AddInt32(&updateCalled, 1)
@@ -192,6 +201,7 @@
 			},
 		).
 		SetWatchChanInitTimeout(watchInitAndRetryDelay).
+		SetWatchChanResetInterval(watchInitAndRetryDelay).
 		SetWatchChanCheckInterval(watchCheckInterval)
 
 	wh, err := NewWatchManager(opts)
@@ -213,7 +223,7 @@
 		if atomic.LoadInt32(&updateCalled) == int32(3) {
 			break
 		}
-		time.Sleep(10 * time.Millisecond)
+		time.Sleep(watchInitAndRetryDelay)
 	}
 
 	// simulate quorum loss
@@ -223,13 +233,13 @@
 	// wait for election timeout, then member[0] will not have a leader.
 	time.Sleep(electionTimeout)
 
-	for i := 0; i < 100; i++ { // 10ms * 100 = 1s
+	for i := 0; i < 100; i++ {
 		// test that leader loss is retried - even on error, we should attempt update.
 		// 10 is an arbitrary number greater than the expected number of actual updates
 		if atomic.LoadInt32(&updateCalled) >= 10 {
 			break
 		}
-		time.Sleep(10 * time.Millisecond)
+		time.Sleep(watchInitAndRetryDelay)
 	}
 
 	updates := atomic.LoadInt32(&updateCalled)
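The member-stopping code under the "simulate quorum loss" comment is collapsed in this diff view. A hedged sketch of what such a step plausibly looks like with the integration package, assuming a 3-member cluster named ecluster: stopping two of three members costs the cluster quorum, so the surviving member cannot elect a leader and watches opened with WithRequireLeader start reporting errors.

	// Hypothetical illustration only; the actual collapsed test body may differ.
	ecluster.Members[1].Stop(t)
	ecluster.Members[2].Stop(t)
	// after electionTimeout elapses, member 0 has no leader to report
	time.Sleep(electionTimeout)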
@@ -309,9 +319,15 @@ func TestWatchCompactedRevision(t *testing.T) {
 	<-doneCh
 }
 
-func testSetup(t *testing.T) (*manager, *clientv3.Client, *int32, *int32, chan struct{}, func()) {
+func testCluster(t *testing.T) (
+	*manager,
+	*integration.ClusterV3,
+	*int32,
+	*int32,
+	chan struct{},
+	func(),
+) {
 	ecluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
-	ec := ecluster.RandClient()
 
 	closer := func() {
 		ecluster.Terminate(t)
@@ -323,7 +339,7 @@ func testSetup(t *testing.T) (*manager, *clientv3.Client, *int32, *int32, chan s
 	)
 	doneCh := make(chan struct{}, 1)
 	opts := NewOptions().
-		SetWatcher(ec.Watcher).
+		SetClient(ecluster.RandClient()).
 		SetUpdateFn(func(string, []*clientv3.Event) error {
 			atomic.AddInt32(&updateCalled, 1)
 			return nil
@@ -338,10 +354,16 @@ func testSetup(t *testing.T) (*manager, *clientv3.Client, *int32, *int32, chan s
 			return true
 		}).
 		SetWatchChanCheckInterval(100 * time.Millisecond).
-		SetWatchChanInitTimeout(100 * time.Millisecond)
+		SetWatchChanInitTimeout(100 * time.Millisecond).
+		SetWatchChanResetInterval(100 * time.Millisecond)
 
 	wh, err := NewWatchManager(opts)
 	require.NoError(t, err)
 
-	return wh.(*manager), ec, &updateCalled, &shouldStop, doneCh, closer
+	return wh.(*manager), ecluster, &updateCalled, &shouldStop, doneCh, closer
 }
+
+func testSetup(t *testing.T) (*manager, *clientv3.Client, *int32, *int32, chan struct{}, func()) {
+	wh, ecluster, updateCalled, shouldStop, donech, closer := testCluster(t)
+	return wh, ecluster.RandClient(), updateCalled, shouldStop, donech, closer
+}
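With mocks.NewBlackholeWatcher gone, the tests exercise failure paths through etcd's own integration package instead: a real embedded cluster whose members can be stopped, blackholed, and restored. A condensed sketch of the pattern the tests above rely on, assuming it runs inside a test function with go.etcd.io/etcd/integration imported (assertions and timing omitted):

	ecluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	defer ecluster.Terminate(t)

	// Sever the member: kill live connections, then drop all new traffic.
	ecluster.Members[0].DropConnections()
	ecluster.Members[0].Blackhole()

	// ... drive watch-create timeouts and updateFn retries here ...

	// Restore connectivity; the watch manager should recreate its watch.
	ecluster.Members[0].Unblackhole()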
16 changes: 8 additions & 8 deletions src/cluster/etcd/watchmanager/options.go
@@ -36,7 +36,7 @@ const (
 )
 
 var (
-	errNilWatch             = errors.New("invalid options: nil watcher")
+	errNilClient            = errors.New("invalid options: nil client")
 	errNilUpdateFn          = errors.New("invalid options: nil updateFn")
 	errNilTickAndStopFn     = errors.New("invalid options: nil tickAndStopFn")
 	errNilInstrumentOptions = errors.New("invalid options: nil instrument options")
@@ -54,7 +54,7 @@ func NewOptions() Options {
 }
 
 type options struct {
-	watcher       clientv3.Watcher
+	client        *clientv3.Client
 	updateFn      UpdateFn
 	tickAndStopFn TickAndStopFn
@@ -65,13 +65,13 @@ type options struct {
 	iopts instrument.Options
 }
 
-func (o *options) Watcher() clientv3.Watcher {
-	return o.watcher
+func (o *options) Client() *clientv3.Client {
+	return o.client
 }
 
-func (o *options) SetWatcher(w clientv3.Watcher) Options {
+func (o *options) SetClient(cli *clientv3.Client) Options {
 	opts := *o
-	opts.watcher = w
+	opts.client = cli
 	return &opts
 }

@@ -146,8 +146,8 @@ func (o *options) SetInstrumentsOptions(iopts instrument.Options) Options {
 }
 
 func (o *options) Validate() error {
-	if o.watcher == nil {
-		return errNilWatch
+	if o.client == nil {
+		return errNilClient
 	}
 
 	if o.updateFn == nil {
8 changes: 4 additions & 4 deletions src/cluster/etcd/watchmanager/types.go
@@ -43,10 +43,10 @@ type TickAndStopFn func(key string) bool
 
 // Options are options for the etcd watch helper
 type Options interface {
-	// Watcher is the etcd watcher
-	Watcher() clientv3.Watcher
-	// SetWatcher sets the Watcher
-	SetWatcher(w clientv3.Watcher) Options
+	// Client returns the etcd client
+	Client() *clientv3.Client
+	// SetClient sets the etcd client
+	SetClient(cli *clientv3.Client) Options
 
 	// UpdateFn is the function called when a notification on a key is received
 	UpdateFn() UpdateFn
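Together with the options.go change, this swaps the watch manager's single Watcher dependency for the full client. A hedged usage sketch against the new surface, where cli is assumed to be a dialed *clientv3.Client and the durations, key, and callback bodies are placeholders:

	opts := watchmanager.NewOptions().
		SetClient(cli). // nil now fails Validate with errNilClient
		SetUpdateFn(func(key string, events []*clientv3.Event) error {
			return nil // react to notifications for key
		}).
		SetTickAndStopFn(func(key string) bool {
			return false // keep watching
		}).
		SetWatchChanInitTimeout(10 * time.Second).
		SetWatchChanResetInterval(10 * time.Second)

	wm, err := watchmanager.NewWatchManager(opts)
	if err != nil {
		return err
	}
	go wm.Watch("some/key")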
10 changes: 4 additions & 6 deletions src/cluster/kv/etcd/store.go
@@ -33,9 +33,9 @@ import (
 	xerrors "github.com/m3db/m3/src/x/errors"
 	"github.com/m3db/m3/src/x/retry"
 
-	"go.etcd.io/etcd/clientv3"
 	"github.com/golang/protobuf/proto"
 	"github.com/uber-go/tally"
+	"go.etcd.io/etcd/clientv3"
 	"go.uber.org/zap"
 	"golang.org/x/net/context"
 )
@@ -51,13 +51,12 @@
 )
 
 // NewStore creates a kv store based on etcd
-func NewStore(etcdKV clientv3.KV, etcdWatcher clientv3.Watcher, opts Options) (kv.TxnStore, error) {
+func NewStore(etcdKV *clientv3.Client, opts Options) (kv.TxnStore, error) {
 	scope := opts.InstrumentsOptions().MetricsScope()
 
 	store := &client{
 		opts:       opts,
 		kv:         etcdKV,
-		watcher:    etcdWatcher,
 		watchables: map[string]kv.ValueWatchable{},
 		retrier:    retry.NewRetrier(opts.RetryOptions()),
 		logger:     opts.InstrumentsOptions().Logger(),
@@ -86,7 +85,7 @@ func NewStore(etcdKV clientv3.KV, etcdWatcher clientv3.Watcher, opts Options) (k
 	}
 
 	wOpts := watchmanager.NewOptions().
-		SetWatcher(etcdWatcher).
+		SetClient(etcdKV).
 		SetUpdateFn(store.update).
 		SetTickAndStopFn(store.tickAndStop).
 		SetWatchOptions(clientWatchOpts).
@@ -124,8 +123,7 @@ type client struct {
 	sync.RWMutex
 
 	opts       Options
-	kv         clientv3.KV
-	watcher    clientv3.Watcher
+	kv         *clientv3.Client
 	watchables map[string]kv.ValueWatchable
 	retrier    retry.Retrier
 	logger     *zap.Logger