Skip to content

Commit

Permalink
Merge pull request etcd-io#18976 from serathius/watch-sync-refactor
Browse files Browse the repository at this point in the history
Watch sync refactor
  • Loading branch information
serathius authored Dec 2, 2024
2 parents 2269d60 + 3741000 commit 78885f6
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 166 deletions.
2 changes: 1 addition & 1 deletion server/storage/mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ func TestKVSnapshot(t *testing.T) {

func TestWatchableKVWatch(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, b)

w := s.NewWatchStream()
Expand Down
13 changes: 8 additions & 5 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,18 @@ type watchableStore struct {
wg sync.WaitGroup
}

var _ WatchableKV = (*watchableStore)(nil)

// cancelFunc updates unsynced and synced maps when running
// cancel operations.
type cancelFunc func()

func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
return newWatchableStore(lg, b, le, cfg)
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
s := newWatchableStore(lg, b, le, cfg)
s.wg.Add(2)
go s.syncWatchersLoop()
go s.syncVictimsLoop()
return s
}

func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
Expand All @@ -95,9 +101,6 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg S
// use this store as the deleter so revokes trigger watch events
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
}
s.wg.Add(2)
go s.syncWatchersLoop()
go s.syncVictimsLoop()
return s
}

Expand Down
22 changes: 4 additions & 18 deletions server/storage/mvcc/watchable_store_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) {

func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
be, _ := betesting.NewDefaultTmpBackend(b)
s := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
defer cleanup(s, be)

k := []byte("testkey")
Expand Down Expand Up @@ -122,21 +122,7 @@ func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
// we should put to simulate the real-world use cases.
func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
be, _ := betesting.NewDefaultTmpBackend(b)
s := NewStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})

// manually create watchableStore instead of newWatchableStore
// because newWatchableStore periodically calls syncWatchersLoop
// method to sync watchers in unsynced map. We want to keep watchers
// in unsynced for this benchmark.
ws := &watchableStore{
store: s,
unsynced: newWatcherGroup(),

// to make the test not crash from assigning to nil map.
// 'synced' doesn't get populated in this test.
synced: newWatcherGroup(),
stopc: make(chan struct{}),
}
ws := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})

defer cleanup(ws, be)

Expand All @@ -146,7 +132,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
// and force watchers to be in unsynced.
testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue, lease.NoLease)
ws.Put(testKey, testValue, lease.NoLease)

w := ws.NewWatchStream()
defer w.Close()
Expand Down Expand Up @@ -178,7 +164,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {

func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
be, _ := betesting.NewDefaultTmpBackend(b)
s := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})

defer cleanup(s, be)

Expand Down
Loading

0 comments on commit 78885f6

Please sign in to comment.