diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index c181ee7fa70..1987936efd4 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -15,16 +15,17 @@ package mvcc import ( - "go.etcd.io/etcd/auth" - "go.etcd.io/etcd/clientv3" "sync" "time" + "go.uber.org/zap" + + "go.etcd.io/etcd/auth" + "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/traceutil" - "go.uber.org/zap" ) // non-const so modifiable by tests @@ -370,6 +371,11 @@ func (s *watchableStore) syncWatchers() int { var victims watcherBatch wb := newWatcherBatch(wg, evs) for w := range wg.watchers { + if w.minRev < compactionRev { + // Skip the watcher that failed to send compacted watch response due to w.ch is full. + // Next retry of syncWatchers would try to resend the compacted watch response to w.ch + continue + } w.minRev = curRev + 1 eb, ok := wb[w] diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index 1bfd7b81955..382f044782e 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -23,11 +23,14 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/traceutil" - "go.uber.org/zap" ) func TestWatch(t *testing.T) { @@ -259,6 +262,63 @@ func TestWatchCompacted(t *testing.T) { } } +func TestWatchNoEventLossOnCompact(t *testing.T) { + oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync + b, tmpPath := backend.NewDefaultTmpBackend() + lg := zaptest.NewLogger(t) + s := newWatchableStore(lg, b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + + defer func() { + s.store.Close() + os.Remove(tmpPath) + chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync + }() + + chanBufLen, maxWatchersPerSync = 1, 4 + testKey, testValue := []byte("foo"), []byte("bar") + + maxRev := 10 + compactRev := int64(5) + for i := 0; i < maxRev; i++ { + s.Put(testKey, testValue, lease.NoLease) + } + _, err := s.Compact(traceutil.TODO(), compactRev) + require.NoErrorf(t, err, "failed to compact kv (%v)", err) + + w := s.NewWatchStream() + defer w.Close() + + watchers := map[WatchID]int64{ + 0: 1, + 1: 1, // create unsyncd watchers with startRev < compactRev + 2: 6, // create unsyncd watchers with compactRev < startRev < currentRev + } + for id, startRev := range watchers { + _, err := w.Watch(id, testKey, nil, startRev) + require.NoError(t, err) + } + // fill up w.Chan() with 1 buf via 2 compacted watch response + s.syncWatchers() + + for len(watchers) > 0 { + resp := <-w.Chan() + if resp.CompactRevision != 0 { + require.Equal(t, resp.CompactRevision, compactRev) + require.Contains(t, watchers, resp.WatchID) + delete(watchers, resp.WatchID) + continue + } + nextRev := watchers[resp.WatchID] + for _, ev := range resp.Events { + require.Equalf(t, nextRev, ev.Kv.ModRevision, "got event revision %d but want %d for watcher with watch ID %d", ev.Kv.ModRevision, nextRev, resp.WatchID) + nextRev++ + } + if nextRev == s.rev()+1 { + delete(watchers, resp.WatchID) + } + } +} + func TestWatchFutureRev(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})