Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: apply rw mutex to cachekv #204

Merged
merged 3 commits into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 41 additions & 44 deletions store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ type cValue struct {
}

// Store wraps an in-memory cache around an underlying types.KVStore.
// Set, Delete and Write for the same key must be called sequentially.
type Store struct {
mtx sync.Mutex
cache map[string]*cValue
unsortedCache map[string]struct{}
mtx sync.RWMutex
cache sync.Map
unsortedCache sync.Map
sortedCache *list.List // always ascending sorted
parent types.KVStore
}
Expand All @@ -38,8 +39,8 @@ var _ types.CacheKVStore = (*Store)(nil)
// NewStore creates a new Store object
func NewStore(parent types.KVStore) *Store {
return &Store{
cache: make(map[string]*cValue),
unsortedCache: make(map[string]struct{}),
cache: sync.Map{},
unsortedCache: sync.Map{},
sortedCache: list.New(),
parent: parent,
}
Expand All @@ -51,33 +52,31 @@ func (store *Store) GetStoreType() types.StoreType {
}

// Get implements types.KVStore.
func (store *Store) Get(key []byte) (value []byte) {
store.mtx.Lock()
defer store.mtx.Unlock()
func (store *Store) Get(key []byte) []byte {
defer telemetry.MeasureSince(time.Now(), "store", "cachekv", "get")

types.AssertValidKey(key)

cacheValue, ok := store.cache[string(key)]
if !ok {
value = store.parent.Get(key)
store.setCacheValue(key, value, false, false)
} else {
value = cacheValue.value
store.mtx.RLock()
defer store.mtx.RUnlock()
cacheValue, ok := store.cache.Load(string(key))
if ok {
return cacheValue.(*cValue).value
}

value := store.parent.Get(key)
store.setCacheValue(key, value, false, false)
return value
}

// Set implements types.KVStore.
func (store *Store) Set(key []byte, value []byte) {
store.mtx.Lock()
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "cachekv", "set")

types.AssertValidKey(key)
types.AssertValidValue(value)

store.mtx.Lock()
defer store.mtx.Unlock()
store.setCacheValue(key, value, false, true)
}

Expand All @@ -89,11 +88,11 @@ func (store *Store) Has(key []byte) bool {

// Delete implements types.KVStore.
func (store *Store) Delete(key []byte) {
store.mtx.Lock()
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "cachekv", "delete")

types.AssertValidKey(key)
store.mtx.Lock()
defer store.mtx.Unlock()
store.setCacheValue(key, nil, true, true)
}

Expand All @@ -105,20 +104,21 @@ func (store *Store) Write() {

// We need a copy of all of the keys.
// Not the best, but probably not a bottleneck depending.
keys := make([]string, 0, len(store.cache))

for key, dbValue := range store.cache {
if dbValue.dirty {
keys = append(keys, key)
keys := make([]string, 0)
store.cache.Range(func(key, value interface{}) bool {
if value.(*cValue).dirty {
keys = append(keys, key.(string))
}
}
return true
})

sort.Strings(keys)

// TODO: Consider allowing usage of Batch, which would allow the write to
// at least happen atomically.
for _, key := range keys {
cacheValue := store.cache[key]
v, _ := store.cache.Load(key)
cacheValue := v.(*cValue)

switch {
case cacheValue.deleted:
Expand All @@ -131,8 +131,8 @@ func (store *Store) Write() {
}

// Clear the cache
store.cache = make(map[string]*cValue)
store.unsortedCache = make(map[string]struct{})
store.cache = sync.Map{}
store.unsortedCache = sync.Map{}
store.sortedCache = list.New()
}

Expand Down Expand Up @@ -203,22 +203,19 @@ func byteSliceToStr(b []byte) string {
func (store *Store) dirtyItems(start, end []byte) {
unsorted := make([]*kv.Pair, 0)

n := len(store.unsortedCache)
for key := range store.unsortedCache {
store.unsortedCache.Range(func(k, _ interface{}) bool {
key := k.(string)
if IsKeyInDomain(strToByte(key), start, end) {
cacheValue := store.cache[key]
unsorted = append(unsorted, &kv.Pair{Key: []byte(key), Value: cacheValue.value})
cacheValue, ok := store.cache.Load(key)
if ok {
unsorted = append(unsorted, &kv.Pair{Key: []byte(key), Value: cacheValue.(*cValue).value})
}
}
}
return true
})

if len(unsorted) == n { // This pattern allows the Go compiler to emit the map clearing idiom for the entire map.
for key := range store.unsortedCache {
delete(store.unsortedCache, key)
}
} else { // Otherwise, normally delete the unsorted keys from the map.
for _, kv := range unsorted {
delete(store.unsortedCache, byteSliceToStr(kv.Key))
}
for _, kv := range unsorted {
store.unsortedCache.Delete(byteSliceToStr(kv.Key))
}

sort.Slice(unsorted, func(i, j int) bool {
Expand Down Expand Up @@ -254,12 +251,12 @@ func (store *Store) dirtyItems(start, end []byte) {

// Only entrypoint to mutate store.cache.
func (store *Store) setCacheValue(key, value []byte, deleted bool, dirty bool) {
store.cache[string(key)] = &cValue{
store.cache.Store(string(key), &cValue{
value: value,
deleted: deleted,
dirty: dirty,
}
})
if dirty {
store.unsortedCache[string(key)] = struct{}{}
store.unsortedCache.Store(string(key), struct{}{})
}
}
34 changes: 34 additions & 0 deletions store/cachekv/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cachekv_test

import (
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -308,6 +309,39 @@ func TestCacheKVMergeIteratorRandom(t *testing.T) {
}
}

// Set, Delete and Write for the same key must be called sequentially.
func TestCacheKVConcurrency(t *testing.T) {
const NumOps = 2000

st := newCacheKVStore()

wg := &sync.WaitGroup{}
wg.Add(NumOps * 3)
for i := 0; i < NumOps; i++ {
i := i
go func() {
st.Set([]byte(fmt.Sprintf("key%d", i)), []byte(fmt.Sprintf("value%d", i)))
st.Write()
wg.Done()
}()
go func() {
st.Get([]byte(fmt.Sprintf("key%d", i)))
wg.Done()
}()
go func() {
iter := st.Iterator([]byte("key0"), []byte(fmt.Sprintf("key%d", NumOps)))
for ; iter.Valid(); iter.Next() {
}
wg.Done()
}()
}
wg.Wait()

for i := 0; i < NumOps; i++ {
require.Equal(t, []byte(fmt.Sprintf("value%d", i)), st.Get([]byte(fmt.Sprintf("key%d", i))))
}
}

//-------------------------------------------------------------------------------------------
// do some random ops

Expand Down