Skip to content

Commit

Permalink
watchable_store_test.go: add unit test TestWatchNoEventLossOnCompact
Browse files Browse the repository at this point in the history
Signed-off-by: Chao Chen <[email protected]>
  • Loading branch information
chaochn47 committed Mar 14, 2024
1 parent 861b6a5 commit d855da4
Showing 1 changed file with 67 additions and 0 deletions.
67 changes: 67 additions & 0 deletions server/storage/mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"go.etcd.io/etcd/api/v3/mvccpb"
Expand Down Expand Up @@ -250,6 +251,72 @@ func TestWatchCompacted(t *testing.T) {
}
}

func TestWatchNoEventLossOnCompact(t *testing.T) {
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync

b, _ := betesting.NewDefaultTmpBackend(t)
lg := zaptest.NewLogger(t)
s := newWatchableStore(lg, b, &lease.FakeLessor{}, StoreConfig{})

defer func() {
cleanup(s, b)
chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
}()

chanBufLen, maxWatchersPerSync = 1, 4
testKey, testValue := []byte("foo"), []byte("bar")

maxRev := 10
compactRev := int64(5)
for i := 0; i < maxRev; i++ {
s.Put(testKey, testValue, lease.NoLease)
}
_, err := s.Compact(traceutil.TODO(), compactRev)
if err != nil {
t.Fatalf("failed to compact kv (%v)", err)
}

w := s.NewWatchStream()
defer w.Close()

// 1. create unsyncd watchers with startRev < compactRev
watchersStartRevs := []int64{1, 1}
var watchersNextRev = make(map[WatchID]int64)
for _, startRev := range watchersStartRevs {
id, err := w.Watch(0, testKey, nil, startRev)
if err != nil {
t.Fatalf("failed to create watch with startRev %d", startRev)
}
watchersNextRev[id] = startRev
}
// 2. create unsyncd watchers with compactRev < startRev < currentRev
w.Watch(0, testKey, nil, 6)
// 3. fill up w.Chan() with 1 buf via 2 compacted watch response
s.syncWatchers()

// 4. expect 2 watchers to send out compacted watch response
compactedResponseCount := 0
for compactedResponseCount < len(watchersStartRevs) {
select {
case resp := <-w.Chan():
if resp.CompactRevision != 0 {
lg.Info("got compacted watch response",
zap.Int64("watch-id", int64(resp.WatchID)),
zap.Int64("compact-revision", resp.CompactRevision))
compactedResponseCount++
}
for _, ev := range resp.Events {
if nextRev, ok := watchersNextRev[resp.WatchID]; ok {
if ev.Kv.ModRevision != nextRev {
t.Fatalf("got event revision %d but want %d for watcher with watch ID %d", ev.Kv.ModRevision, nextRev, resp.WatchID)
}
watchersNextRev[resp.WatchID]++
}
}
}
}
}

func TestWatchFutureRev(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
Expand Down

0 comments on commit d855da4

Please sign in to comment.