diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 7be9641d402..be38af1338f 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -267,6 +267,7 @@ func (s *watchableStore) Restore(b backend.Backend) error { } for wa := range s.synced.watchers { + wa.restore = true s.unsynced.add(wa) } s.synced = newWatcherGroup() @@ -549,6 +550,14 @@ type watcher struct { // compacted is set when the watcher is removed because of compaction compacted bool + // restore is true when the watcher is being restored from leader snapshot + // which means that this watcher has just been moved from "synced" to "unsynced" + // watcher group, possibly with a future revision when it was first added + // to the synced watcher + // "unsynced" watcher revision must always be <= current revision, + // except when the watcher were to be moved from "synced" watcher group + restore bool + // minRev is the minimum revision update the watcher will accept minRev int64 id WatchID diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index cef00d41f57..3e3c4a51c16 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -336,6 +336,62 @@ func TestWatchRestore(t *testing.T) { t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration } +// TestWatchRestoreSyncedWatcher tests such a case that: +// 1. watcher is created with a future revision "math.MaxInt64 - 2" +// 2. watcher with a future revision is added to "synced" watcher group +// 3. restore/overwrite storage with snapshot of a higher lasat revision +// 4. restore operation moves "synced" to "unsynced" watcher group +// 5. choose the watcher from step 1, without panic +func TestWatchRestoreSyncedWatcher(t *testing.T) { + b1, b1Path := backend.NewDefaultTmpBackend() + s1 := newWatchableStore(b1, &lease.FakeLessor{}, nil) + defer cleanup(s1, b1, b1Path) + + b2, b2Path := backend.NewDefaultTmpBackend() + s2 := newWatchableStore(b2, &lease.FakeLessor{}, nil) + defer cleanup(s2, b2, b2Path) + + testKey, testValue := []byte("foo"), []byte("bar") + rev := s1.Put(testKey, testValue, lease.NoLease) + startRev := rev + 2 + + // create a watcher with a future revision + // add to "synced" watcher group (startRev > s.store.currentRev) + w1 := s1.NewWatchStream() + w1.Watch(testKey, nil, startRev) + + // make "s2" ends up with a higher last revision + s2.Put(testKey, testValue, lease.NoLease) + s2.Put(testKey, testValue, lease.NoLease) + + // overwrite storage with higher revisions + if err := s1.Restore(b2); err != nil { + t.Fatal(err) + } + + // wait for next "syncWatchersLoop" iteration + // and the unsynced watcher should be chosen + time.Sleep(2 * time.Second) + + // trigger events for "startRev" + s1.Put(testKey, testValue, lease.NoLease) + + select { + case resp := <-w1.Chan(): + if resp.Revision != startRev { + t.Fatalf("resp.Revision expect %d, got %d", startRev, resp.Revision) + } + if len(resp.Events) != 1 { + t.Fatalf("len(resp.Events) expect 1, got %d", len(resp.Events)) + } + if resp.Events[0].Kv.ModRevision != startRev { + t.Fatalf("resp.Events[0].Kv.ModRevision expect %d, got %d", startRev, resp.Events[0].Kv.ModRevision) + } + case <-time.After(time.Second): + t.Fatal("failed to receive event in 1 second") + } +} + // TestWatchBatchUnsynced tests batching on unsynced watchers func TestWatchBatchUnsynced(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() diff --git a/mvcc/watcher_group.go b/mvcc/watcher_group.go index 2710c1cc940..8920656d1b8 100644 --- a/mvcc/watcher_group.go +++ b/mvcc/watcher_group.go @@ -15,6 +15,7 @@ package mvcc import ( + "fmt" "math" "github.com/coreos/etcd/mvcc/mvccpb" @@ -238,7 +239,15 @@ func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 { minRev := int64(math.MaxInt64) for w := range wg.watchers { if w.minRev > curRev { - panic("watcher current revision should not exceed current revision") + // after network partition, possibly choosing future revision watcher from restore operation + // with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2" + // do not panic when such watcher had been moved from "synced" watcher during restore operation + if !w.restore { + panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev)) + } + + // mark 'restore' done, since it's chosen + w.restore = false } if w.minRev < compactRev { select {