From 163bb34b6259025b8ed851d75acbfad8d592e420 Mon Sep 17 00:00:00 2001 From: "Sangyeop.lee" Date: Mon, 24 May 2021 14:03:43 +0900 Subject: [PATCH] perf: apply rw mutex to cachekv --- store/cachekv/store.go | 85 ++++++++++++++++++------------------- store/cachekv/store_test.go | 34 +++++++++++++++ 2 files changed, 75 insertions(+), 44 deletions(-) diff --git a/store/cachekv/store.go b/store/cachekv/store.go index 52b86610b1..47dd5738b7 100644 --- a/store/cachekv/store.go +++ b/store/cachekv/store.go @@ -25,10 +25,11 @@ type cValue struct { } // Store wraps an in-memory cache around an underlying types.KVStore. +// Set, Delete and Write for the same key must be called sequentially. type Store struct { - mtx sync.Mutex - cache map[string]*cValue - unsortedCache map[string]struct{} + mtx sync.RWMutex + cache sync.Map + unsortedCache sync.Map sortedCache *list.List // always ascending sorted parent types.KVStore } @@ -38,8 +39,8 @@ var _ types.CacheKVStore = (*Store)(nil) // NewStore creates a new Store object func NewStore(parent types.KVStore) *Store { return &Store{ - cache: make(map[string]*cValue), - unsortedCache: make(map[string]struct{}), + cache: sync.Map{}, + unsortedCache: sync.Map{}, sortedCache: list.New(), parent: parent, } @@ -51,33 +52,31 @@ func (store *Store) GetStoreType() types.StoreType { } // Get implements types.KVStore. -func (store *Store) Get(key []byte) (value []byte) { - store.mtx.Lock() - defer store.mtx.Unlock() +func (store *Store) Get(key []byte) []byte { defer telemetry.MeasureSince(time.Now(), "store", "cachekv", "get") types.AssertValidKey(key) - - cacheValue, ok := store.cache[string(key)] - if !ok { - value = store.parent.Get(key) - store.setCacheValue(key, value, false, false) - } else { - value = cacheValue.value + store.mtx.RLock() + defer store.mtx.RUnlock() + cacheValue, ok := store.cache.Load(string(key)) + if ok { + return cacheValue.(*cValue).value } + value := store.parent.Get(key) + store.setCacheValue(key, value, false, false) return value } // Set implements types.KVStore. func (store *Store) Set(key []byte, value []byte) { - store.mtx.Lock() - defer store.mtx.Unlock() defer telemetry.MeasureSince(time.Now(), "store", "cachekv", "set") types.AssertValidKey(key) types.AssertValidValue(value) + store.mtx.Lock() + defer store.mtx.Unlock() store.setCacheValue(key, value, false, true) } @@ -89,11 +88,11 @@ func (store *Store) Has(key []byte) bool { // Delete implements types.KVStore. func (store *Store) Delete(key []byte) { - store.mtx.Lock() - defer store.mtx.Unlock() defer telemetry.MeasureSince(time.Now(), "store", "cachekv", "delete") types.AssertValidKey(key) + store.mtx.Lock() + defer store.mtx.Unlock() store.setCacheValue(key, nil, true, true) } @@ -105,20 +104,21 @@ func (store *Store) Write() { // We need a copy of all of the keys. // Not the best, but probably not a bottleneck depending. - keys := make([]string, 0, len(store.cache)) - - for key, dbValue := range store.cache { - if dbValue.dirty { - keys = append(keys, key) + keys := make([]string, 0) + store.cache.Range(func(key, value interface{}) bool { + if value.(*cValue).dirty { + keys = append(keys, key.(string)) } - } + return true + }) sort.Strings(keys) // TODO: Consider allowing usage of Batch, which would allow the write to // at least happen atomically. for _, key := range keys { - cacheValue := store.cache[key] + v, _ := store.cache.Load(key) + cacheValue := v.(*cValue) switch { case cacheValue.deleted: @@ -131,8 +131,8 @@ func (store *Store) Write() { } // Clear the cache - store.cache = make(map[string]*cValue) - store.unsortedCache = make(map[string]struct{}) + store.cache = sync.Map{} + store.unsortedCache = sync.Map{} store.sortedCache = list.New() } @@ -203,22 +203,19 @@ func byteSliceToStr(b []byte) string { func (store *Store) dirtyItems(start, end []byte) { unsorted := make([]*kv.Pair, 0) - n := len(store.unsortedCache) - for key := range store.unsortedCache { + store.unsortedCache.Range(func(k, _ interface{}) bool { + key := k.(string) if IsKeyInDomain(strToByte(key), start, end) { - cacheValue := store.cache[key] - unsorted = append(unsorted, &kv.Pair{Key: []byte(key), Value: cacheValue.value}) + cacheValue, ok := store.cache.Load(key) + if ok { + unsorted = append(unsorted, &kv.Pair{Key: []byte(key), Value: cacheValue.(*cValue).value}) + } } - } + return true + }) - if len(unsorted) == n { // This pattern allows the Go compiler to emit the map clearing idiom for the entire map. - for key := range store.unsortedCache { - delete(store.unsortedCache, key) - } - } else { // Otherwise, normally delete the unsorted keys from the map. - for _, kv := range unsorted { - delete(store.unsortedCache, byteSliceToStr(kv.Key)) - } + for _, kv := range unsorted { + store.unsortedCache.Delete(byteSliceToStr(kv.Key)) } sort.Slice(unsorted, func(i, j int) bool { @@ -254,12 +251,12 @@ func (store *Store) dirtyItems(start, end []byte) { // Only entrypoint to mutate store.cache. func (store *Store) setCacheValue(key, value []byte, deleted bool, dirty bool) { - store.cache[string(key)] = &cValue{ + store.cache.Store(string(key), &cValue{ value: value, deleted: deleted, dirty: dirty, - } + }) if dirty { - store.unsortedCache[string(key)] = struct{}{} + store.unsortedCache.Store(string(key), struct{}{}) } } diff --git a/store/cachekv/store_test.go b/store/cachekv/store_test.go index 4648a1303e..e9505ab1b9 100644 --- a/store/cachekv/store_test.go +++ b/store/cachekv/store_test.go @@ -2,6 +2,7 @@ package cachekv_test import ( "fmt" + "sync" "testing" "github.com/stretchr/testify/require" @@ -308,6 +309,39 @@ func TestCacheKVMergeIteratorRandom(t *testing.T) { } } +// Set, Delete and Write for the same key must be called sequentially. +func TestCacheKVConcurrency(t *testing.T) { + const NumOps = 2000 + + st := newCacheKVStore() + + wg := &sync.WaitGroup{} + wg.Add(NumOps * 3) + for i := 0; i < NumOps; i++ { + i := i + go func() { + st.Set([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i))) + st.Write() + wg.Done() + }() + go func() { + st.Get([]byte(fmt.Sprintf("key%d", i))) + wg.Done() + }() + go func() { + iter := st.Iterator([]byte("key0"), []byte(fmt.Sprintf("key%d", NumOps))) + for ; iter.Valid(); iter.Next() { + } + wg.Done() + }() + } + wg.Wait() + + for i := 0; i < NumOps; i++ { + require.Equal(t, []byte(fmt.Sprintf("value%d", i)), st.Get([]byte(fmt.Sprintf("key%d", i)))) + } +} + //------------------------------------------------------------------------------------------- // do some random ops