From d855da4b55f91964de98532afe73a829c536e486 Mon Sep 17 00:00:00 2001 From: Chao Chen Date: Wed, 13 Mar 2024 22:30:05 -0700 Subject: [PATCH] watchable_store_test.go: add unit test TestWatchNoEventLossOnCompact Signed-off-by: Chao Chen --- server/storage/mvcc/watchable_store_test.go | 67 +++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/server/storage/mvcc/watchable_store_test.go b/server/storage/mvcc/watchable_store_test.go index 86e35697f326..96dc717e7b3d 100644 --- a/server/storage/mvcc/watchable_store_test.go +++ b/server/storage/mvcc/watchable_store_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "go.uber.org/zap" "go.uber.org/zap/zaptest" "go.etcd.io/etcd/api/v3/mvccpb" @@ -250,6 +251,72 @@ func TestWatchCompacted(t *testing.T) { } } +func TestWatchNoEventLossOnCompact(t *testing.T) { + oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync + + b, _ := betesting.NewDefaultTmpBackend(t) + lg := zaptest.NewLogger(t) + s := newWatchableStore(lg, b, &lease.FakeLessor{}, StoreConfig{}) + + defer func() { + cleanup(s, b) + chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync + }() + + chanBufLen, maxWatchersPerSync = 1, 4 + testKey, testValue := []byte("foo"), []byte("bar") + + maxRev := 10 + compactRev := int64(5) + for i := 0; i < maxRev; i++ { + s.Put(testKey, testValue, lease.NoLease) + } + _, err := s.Compact(traceutil.TODO(), compactRev) + if err != nil { + t.Fatalf("failed to compact kv (%v)", err) + } + + w := s.NewWatchStream() + defer w.Close() + + // 1. create unsyncd watchers with startRev < compactRev + watchersStartRevs := []int64{1, 1} + var watchersNextRev = make(map[WatchID]int64) + for _, startRev := range watchersStartRevs { + id, err := w.Watch(0, testKey, nil, startRev) + if err != nil { + t.Fatalf("failed to create watch with startRev %d", startRev) + } + watchersNextRev[id] = startRev + } + // 2. create unsyncd watchers with compactRev < startRev < currentRev + w.Watch(0, testKey, nil, 6) + // 3. fill up w.Chan() with 1 buf via 2 compacted watch response + s.syncWatchers() + + // 4. expect 2 watchers to send out compacted watch response + compactedResponseCount := 0 + for compactedResponseCount < len(watchersStartRevs) { + select { + case resp := <-w.Chan(): + if resp.CompactRevision != 0 { + lg.Info("got compacted watch response", + zap.Int64("watch-id", int64(resp.WatchID)), + zap.Int64("compact-revision", resp.CompactRevision)) + compactedResponseCount++ + } + for _, ev := range resp.Events { + if nextRev, ok := watchersNextRev[resp.WatchID]; ok { + if ev.Kv.ModRevision != nextRev { + t.Fatalf("got event revision %d but want %d for watcher with watch ID %d", ev.Kv.ModRevision, nextRev, resp.WatchID) + } + watchersNextRev[resp.WatchID]++ + } + } + } + } +} + func TestWatchFutureRev(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})