Skip to content

Commit

Permalink
perf: apply rw mutex to cachekv
Browse files Browse the repository at this point in the history
  • Loading branch information
Sangyeop.lee committed May 24, 2021
1 parent 9546267 commit 163bb34
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 44 deletions.
85 changes: 41 additions & 44 deletions store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ type cValue struct {
}

// Store wraps an in-memory cache around an underlying types.KVStore.
// Set, Delete and Write for the same key must be called sequentially.
type Store struct {
mtx sync.Mutex
cache map[string]*cValue
unsortedCache map[string]struct{}
mtx sync.RWMutex
cache sync.Map
unsortedCache sync.Map
sortedCache *list.List // always ascending sorted
parent types.KVStore
}
Expand All @@ -38,8 +39,8 @@ var _ types.CacheKVStore = (*Store)(nil)
// NewStore creates a new Store object
func NewStore(parent types.KVStore) *Store {
return &Store{
cache: make(map[string]*cValue),
unsortedCache: make(map[string]struct{}),
cache: sync.Map{},
unsortedCache: sync.Map{},
sortedCache: list.New(),
parent: parent,
}
Expand All @@ -51,33 +52,31 @@ func (store *Store) GetStoreType() types.StoreType {
}

// Get implements types.KVStore.
func (store *Store) Get(key []byte) (value []byte) {
store.mtx.Lock()
defer store.mtx.Unlock()
func (store *Store) Get(key []byte) []byte {
defer telemetry.MeasureSince(time.Now(), "store", "cachekv", "get")

types.AssertValidKey(key)

cacheValue, ok := store.cache[string(key)]
if !ok {
value = store.parent.Get(key)
store.setCacheValue(key, value, false, false)
} else {
value = cacheValue.value
store.mtx.RLock()
defer store.mtx.RUnlock()
cacheValue, ok := store.cache.Load(string(key))
if ok {
return cacheValue.(*cValue).value
}

value := store.parent.Get(key)
store.setCacheValue(key, value, false, false)
return value
}

// Set implements types.KVStore.
func (store *Store) Set(key []byte, value []byte) {
store.mtx.Lock()
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "cachekv", "set")

types.AssertValidKey(key)
types.AssertValidValue(value)

store.mtx.Lock()
defer store.mtx.Unlock()
store.setCacheValue(key, value, false, true)
}

Expand All @@ -89,11 +88,11 @@ func (store *Store) Has(key []byte) bool {

// Delete implements types.KVStore.
func (store *Store) Delete(key []byte) {
store.mtx.Lock()
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "cachekv", "delete")

types.AssertValidKey(key)
store.mtx.Lock()
defer store.mtx.Unlock()
store.setCacheValue(key, nil, true, true)
}

Expand All @@ -105,20 +104,21 @@ func (store *Store) Write() {

// We need a copy of all of the keys.
// Not the best, but probably not a bottleneck depending.
keys := make([]string, 0, len(store.cache))

for key, dbValue := range store.cache {
if dbValue.dirty {
keys = append(keys, key)
keys := make([]string, 0)
store.cache.Range(func(key, value interface{}) bool {
if value.(*cValue).dirty {
keys = append(keys, key.(string))
}
}
return true
})

sort.Strings(keys)

// TODO: Consider allowing usage of Batch, which would allow the write to
// at least happen atomically.
for _, key := range keys {
cacheValue := store.cache[key]
v, _ := store.cache.Load(key)
cacheValue := v.(*cValue)

switch {
case cacheValue.deleted:
Expand All @@ -131,8 +131,8 @@ func (store *Store) Write() {
}

// Clear the cache
store.cache = make(map[string]*cValue)
store.unsortedCache = make(map[string]struct{})
store.cache = sync.Map{}
store.unsortedCache = sync.Map{}
store.sortedCache = list.New()
}

Expand Down Expand Up @@ -203,22 +203,19 @@ func byteSliceToStr(b []byte) string {
func (store *Store) dirtyItems(start, end []byte) {
unsorted := make([]*kv.Pair, 0)

n := len(store.unsortedCache)
for key := range store.unsortedCache {
store.unsortedCache.Range(func(k, _ interface{}) bool {
key := k.(string)
if IsKeyInDomain(strToByte(key), start, end) {
cacheValue := store.cache[key]
unsorted = append(unsorted, &kv.Pair{Key: []byte(key), Value: cacheValue.value})
cacheValue, ok := store.cache.Load(key)
if ok {
unsorted = append(unsorted, &kv.Pair{Key: []byte(key), Value: cacheValue.(*cValue).value})
}
}
}
return true
})

if len(unsorted) == n { // This pattern allows the Go compiler to emit the map clearing idiom for the entire map.
for key := range store.unsortedCache {
delete(store.unsortedCache, key)
}
} else { // Otherwise, normally delete the unsorted keys from the map.
for _, kv := range unsorted {
delete(store.unsortedCache, byteSliceToStr(kv.Key))
}
for _, kv := range unsorted {
store.unsortedCache.Delete(byteSliceToStr(kv.Key))
}

sort.Slice(unsorted, func(i, j int) bool {
Expand Down Expand Up @@ -254,12 +251,12 @@ func (store *Store) dirtyItems(start, end []byte) {

// Only entrypoint to mutate store.cache.
func (store *Store) setCacheValue(key, value []byte, deleted bool, dirty bool) {
store.cache[string(key)] = &cValue{
store.cache.Store(string(key), &cValue{
value: value,
deleted: deleted,
dirty: dirty,
}
})
if dirty {
store.unsortedCache[string(key)] = struct{}{}
store.unsortedCache.Store(string(key), struct{}{})
}
}
34 changes: 34 additions & 0 deletions store/cachekv/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cachekv_test

import (
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -308,6 +309,39 @@ func TestCacheKVMergeIteratorRandom(t *testing.T) {
}
}

// Set, Delete and Write for the same key must be called sequentially.
func TestCacheKVConcurrency(t *testing.T) {
const NumOps = 2000

st := newCacheKVStore()

wg := &sync.WaitGroup{}
wg.Add(NumOps * 3)
for i := 0; i < NumOps; i++ {
i := i
go func() {
st.Set([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i)))
st.Write()
wg.Done()
}()
go func() {
st.Get([]byte(fmt.Sprintf("key%d", i)))
wg.Done()
}()
go func() {
iter := st.Iterator([]byte("key0"), []byte(fmt.Sprintf("key%d", NumOps)))
for ; iter.Valid(); iter.Next() {
}
wg.Done()
}()
}
wg.Wait()

for i := 0; i < NumOps; i++ {
require.Equal(t, []byte(fmt.Sprintf("value%d", i)), st.Get([]byte(fmt.Sprintf("key%d", i))))
}
}

//-------------------------------------------------------------------------------------------
// do some random ops

Expand Down

0 comments on commit 163bb34

Please sign in to comment.