From 948b445e6e2198c6af2fe2cc1dad1fb139c52ed1 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 29 Nov 2024 16:52:59 +0100 Subject: [PATCH] Extract rangeEvents function Signed-off-by: Marek Siarkowicz --- server/storage/mvcc/watchable_store.go | 42 +++++--- server/storage/mvcc/watchable_store_test.go | 106 ++++++++++++++++++++ 2 files changed, 132 insertions(+), 16 deletions(-) diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go index 22d46e5048d1..7d57f50ae58a 100644 --- a/server/storage/mvcc/watchable_store.go +++ b/server/storage/mvcc/watchable_store.go @@ -357,20 +357,7 @@ func (s *watchableStore) syncWatchers() int { compactionRev := s.store.compactMainRev wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) - minBytes, maxBytes := NewRevBytes(), NewRevBytes() - minBytes = RevToBytes(Revision{Main: minRev}, minBytes) - maxBytes = RevToBytes(Revision{Main: curRev + 1}, maxBytes) - - // UnsafeRange returns keys and values. And in boltdb, keys are revisions. - // values are actual key-value pairs in backend. - tx := s.store.b.ReadTx() - tx.RLock() - revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0) - evs := kvsToEvents(s.store.lg, wg, revs, vs) - // Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy. - // We can only unlock after Unmarshal, which will do deep copy. - // Otherwise we will trigger SIGSEGV during boltdb re-mmap. - tx.RUnlock() + evs := rangeEvents(s.store.lg, s.store.b, minRev, curRev+1, wg) victims := make(watcherBatch) wb := newWatcherBatch(wg, evs) @@ -422,15 +409,38 @@ func (s *watchableStore) syncWatchers() int { return s.unsynced.size() } +// rangeEvents returns events in range [minRev, maxRev). +func rangeEvents(lg *zap.Logger, b backend.Backend, minRev, maxRev int64, c contains) []mvccpb.Event { + minBytes, maxBytes := NewRevBytes(), NewRevBytes() + minBytes = RevToBytes(Revision{Main: minRev}, minBytes) + maxBytes = RevToBytes(Revision{Main: maxRev}, maxBytes) + + // UnsafeRange returns keys and values. And in boltdb, keys are revisions. + // values are actual key-value pairs in backend. + tx := b.ReadTx() + tx.RLock() + revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0) + evs := kvsToEvents(lg, c, revs, vs) + // Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy. + // We can only unlock after Unmarshal, which will do deep copy. + // Otherwise we will trigger SIGSEGV during boltdb re-mmap. + tx.RUnlock() + return evs +} + +type contains interface { + contains(string) bool +} + // kvsToEvents gets all events for the watchers from all key-value pairs -func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) { +func kvsToEvents(lg *zap.Logger, c contains, revs, vals [][]byte) (evs []mvccpb.Event) { for i, v := range vals { var kv mvccpb.KeyValue if err := kv.Unmarshal(v); err != nil { lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err)) } - if !wg.contains(string(kv.Key)) { + if !c.contains(string(kv.Key)) { continue } diff --git a/server/storage/mvcc/watchable_store_test.go b/server/storage/mvcc/watchable_store_test.go index c15bd12326f7..98f93bdbed2c 100644 --- a/server/storage/mvcc/watchable_store_test.go +++ b/server/storage/mvcc/watchable_store_test.go @@ -164,6 +164,112 @@ func TestSyncWatchers(t *testing.T) { } } +func TestRangeEvents(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + lg := zaptest.NewLogger(t) + s := NewStore(lg, b, &lease.FakeLessor{}, StoreConfig{}) + + defer cleanup(s, b) + + foo1 := []byte("foo1") + foo2 := []byte("foo2") + foo3 := []byte("foo3") + value := []byte("bar") + s.Put(foo1, value, lease.NoLease) + s.Put(foo2, value, lease.NoLease) + s.Put(foo3, value, lease.NoLease) + s.DeleteRange(foo1, foo3) // Deletes "foo1" and "foo2" generating 2 events + + expectEvents := []mvccpb.Event{ + { + Type: mvccpb.PUT, + Kv: &mvccpb.KeyValue{ + Key: foo1, + CreateRevision: 2, + ModRevision: 2, + Version: 1, + Value: value, + }, + }, + { + Type: mvccpb.PUT, + Kv: &mvccpb.KeyValue{ + Key: foo2, + CreateRevision: 3, + ModRevision: 3, + Version: 1, + Value: value, + }, + }, + { + Type: mvccpb.PUT, + Kv: &mvccpb.KeyValue{ + Key: foo3, + CreateRevision: 4, + ModRevision: 4, + Version: 1, + Value: value, + }, + }, + { + Type: mvccpb.DELETE, + Kv: &mvccpb.KeyValue{ + Key: foo1, + ModRevision: 5, + }, + }, + { + Type: mvccpb.DELETE, + Kv: &mvccpb.KeyValue{ + Key: foo2, + ModRevision: 5, + }, + }, + } + + tcs := []struct { + minRev int64 + maxRev int64 + expectEvents []mvccpb.Event + }{ + // maxRev, top to bottom + {minRev: 2, maxRev: 6, expectEvents: expectEvents[0:5]}, + {minRev: 2, maxRev: 5, expectEvents: expectEvents[0:3]}, + {minRev: 2, maxRev: 4, expectEvents: expectEvents[0:2]}, + {minRev: 2, maxRev: 3, expectEvents: expectEvents[0:1]}, + {minRev: 2, maxRev: 2, expectEvents: expectEvents[0:0]}, + + // minRev, bottom to top + {minRev: 2, maxRev: 6, expectEvents: expectEvents[0:5]}, + {minRev: 3, maxRev: 6, expectEvents: expectEvents[1:5]}, + {minRev: 4, maxRev: 6, expectEvents: expectEvents[2:5]}, + {minRev: 5, maxRev: 6, expectEvents: expectEvents[3:5]}, + {minRev: 6, maxRev: 6, expectEvents: expectEvents[0:0]}, + + // Moving window algorithm, first increase maxRev, then increase minRev, repeat. + {minRev: 2, maxRev: 2, expectEvents: expectEvents[0:0]}, + {minRev: 2, maxRev: 3, expectEvents: expectEvents[0:1]}, + {minRev: 2, maxRev: 4, expectEvents: expectEvents[0:2]}, + {minRev: 3, maxRev: 4, expectEvents: expectEvents[1:2]}, + {minRev: 3, maxRev: 5, expectEvents: expectEvents[1:3]}, + {minRev: 4, maxRev: 5, expectEvents: expectEvents[2:3]}, + {minRev: 4, maxRev: 6, expectEvents: expectEvents[2:5]}, + {minRev: 5, maxRev: 6, expectEvents: expectEvents[3:5]}, + {minRev: 6, maxRev: 6, expectEvents: expectEvents[5:5]}, + } + for _, tc := range tcs { + t.Run(fmt.Sprintf("%d-%d", tc.minRev, tc.maxRev), func(t *testing.T) { + assert.ElementsMatch(t, tc.expectEvents, rangeEvents(lg, b, tc.minRev, tc.maxRev, fakeContains{})) + }) + } +} + +type fakeContains struct{} + +func (f fakeContains) contains(string) bool { + return true +} + // TestWatchCompacted tests a watcher that watches on a compacted revision. func TestWatchCompacted(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t)