-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf: Make CacheKV store interleaved iterator and insertion not O(n^2) #10026
Changes from 3 commits
b9e6389
4567473
3a4d278
f0aa753
8de7613
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,107 +1,50 @@ | ||
package cachekv | ||
|
||
import ( | ||
"errors" | ||
|
||
dbm "github.com/tendermint/tm-db" | ||
|
||
"github.com/cosmos/cosmos-sdk/types/kv" | ||
"github.com/cosmos/cosmos-sdk/store/types" | ||
) | ||
|
||
// Iterates over iterKVCache items. | ||
// if key is nil, means it was deleted. | ||
// Implements Iterator. | ||
type memIterator struct { | ||
start, end []byte | ||
items []*kv.Pair | ||
ascending bool | ||
} | ||
|
||
func newMemIterator(start, end []byte, items *kv.List, ascending bool) *memIterator { | ||
itemsInDomain := make([]*kv.Pair, 0, items.Len()) | ||
|
||
var entered bool | ||
|
||
for e := items.Front(); e != nil; e = e.Next() { | ||
item := e.Value | ||
if !dbm.IsKeyInDomain(item.Key, start, end) { | ||
if entered { | ||
break | ||
} | ||
|
||
continue | ||
} | ||
|
||
itemsInDomain = append(itemsInDomain, item) | ||
entered = true | ||
} | ||
types.Iterator | ||
|
||
return &memIterator{ | ||
start: start, | ||
end: end, | ||
items: itemsInDomain, | ||
ascending: ascending, | ||
} | ||
deleted map[string]struct{} | ||
} | ||
|
||
func (mi *memIterator) Domain() ([]byte, []byte) { | ||
return mi.start, mi.end | ||
} | ||
func newMemIterator(start, end []byte, items *dbm.MemDB, deleted map[string]struct{}, ascending bool) *memIterator { | ||
var iter types.Iterator | ||
var err error | ||
|
||
func (mi *memIterator) Valid() bool { | ||
return len(mi.items) > 0 | ||
} | ||
if ascending { | ||
iter, err = items.Iterator(start, end) | ||
} else { | ||
iter, err = items.ReverseIterator(start, end) | ||
} | ||
|
||
func (mi *memIterator) assertValid() { | ||
if err := mi.Error(); err != nil { | ||
if err != nil { | ||
panic(err) | ||
} | ||
} | ||
|
||
func (mi *memIterator) Next() { | ||
mi.assertValid() | ||
|
||
if mi.ascending { | ||
mi.items = mi.items[1:] | ||
} else { | ||
mi.items = mi.items[:len(mi.items)-1] | ||
newDeleted := make(map[string]struct{}) | ||
for k, v := range deleted { | ||
newDeleted[k] = v | ||
} | ||
} | ||
|
||
func (mi *memIterator) Key() []byte { | ||
mi.assertValid() | ||
return &memIterator{ | ||
Iterator: iter, | ||
|
||
if mi.ascending { | ||
return mi.items[0].Key | ||
deleted: newDeleted, | ||
} | ||
|
||
return mi.items[len(mi.items)-1].Key | ||
} | ||
|
||
func (mi *memIterator) Value() []byte { | ||
mi.assertValid() | ||
|
||
if mi.ascending { | ||
return mi.items[0].Value | ||
} | ||
|
||
return mi.items[len(mi.items)-1].Value | ||
} | ||
|
||
func (mi *memIterator) Close() error { | ||
mi.start = nil | ||
mi.end = nil | ||
mi.items = nil | ||
|
||
return nil | ||
} | ||
|
||
// Error returns an error if the memIterator is invalid defined by the Valid | ||
// method. | ||
func (mi *memIterator) Error() error { | ||
if !mi.Valid() { | ||
return errors.New("invalid memIterator") | ||
key := mi.Iterator.Key() | ||
if _, ok := mi.deleted[string(key)]; ok { | ||
return nil | ||
} | ||
|
||
return nil | ||
return mi.Iterator.Value() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,9 +3,11 @@ package cachekv | |
import ( | ||
"bytes" | ||
"io" | ||
"reflect" | ||
"sort" | ||
"sync" | ||
"time" | ||
"unsafe" | ||
|
||
dbm "github.com/tendermint/tm-db" | ||
|
||
|
@@ -20,17 +22,17 @@ import ( | |
// If value is nil but deleted is false, it means the parent doesn't have the | ||
// key. (No need to delete upon Write()) | ||
type cValue struct { | ||
value []byte | ||
deleted bool | ||
dirty bool | ||
value []byte | ||
dirty bool | ||
} | ||
|
||
// Store wraps an in-memory cache around an underlying types.KVStore. | ||
type Store struct { | ||
mtx sync.Mutex | ||
cache map[string]*cValue | ||
deleted map[string]struct{} | ||
unsortedCache map[string]struct{} | ||
sortedCache *kv.List // always ascending sorted | ||
sortedCache *dbm.MemDB // always ascending sorted | ||
parent types.KVStore | ||
} | ||
|
||
|
@@ -40,8 +42,9 @@ var _ types.CacheKVStore = (*Store)(nil) | |
func NewStore(parent types.KVStore) *Store { | ||
return &Store{ | ||
cache: make(map[string]*cValue), | ||
deleted: make(map[string]struct{}), | ||
unsortedCache: make(map[string]struct{}), | ||
sortedCache: kv.NewList(), | ||
sortedCache: dbm.NewMemDB(), | ||
parent: parent, | ||
} | ||
} | ||
|
@@ -122,7 +125,7 @@ func (store *Store) Write() { | |
cacheValue := store.cache[key] | ||
|
||
switch { | ||
case cacheValue.deleted: | ||
case store.isDeleted(key): | ||
store.parent.Delete([]byte(key)) | ||
case cacheValue.value == nil: | ||
// Skip, it already doesn't exist in parent. | ||
|
@@ -133,8 +136,9 @@ func (store *Store) Write() { | |
|
||
// Clear the cache | ||
store.cache = make(map[string]*cValue) | ||
store.deleted = make(map[string]struct{}) | ||
store.unsortedCache = make(map[string]struct{}) | ||
store.sortedCache = kv.NewList() | ||
store.sortedCache = dbm.NewMemDB() | ||
} | ||
|
||
// CacheWrap implements CacheWrapper. | ||
|
@@ -178,23 +182,53 @@ func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator { | |
} | ||
|
||
store.dirtyItems(start, end) | ||
cache = newMemIterator(start, end, store.sortedCache, ascending) | ||
cache = newMemIterator(start, end, store.sortedCache, store.deleted, ascending) | ||
|
||
return newCacheMergeIterator(parent, cache, ascending) | ||
} | ||
|
||
// strToByte is meant to make a zero allocation conversion | ||
// from string -> []byte to speed up operations, it is not meant | ||
// to be used generally, but for a specific pattern to check for available | ||
// keys within a domain. | ||
func strToByte(s string) []byte { | ||
var b []byte | ||
hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b)) | ||
hdr.Cap = len(s) | ||
hdr.Len = len(s) | ||
hdr.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data | ||
return b | ||
} | ||
|
||
// Constructs a slice of dirty items, to use w/ memIterator. | ||
func (store *Store) dirtyItems(start, end []byte) { | ||
unsorted := make([]*kv.Pair, 0) | ||
|
||
n := len(store.unsortedCache) | ||
for key := range store.unsortedCache { | ||
if dbm.IsKeyInDomain(conv.UnsafeStrToBytes(key), start, end) { | ||
unsorted := make([]*kv.Pair, 0) | ||
// If the unsortedCache is too big, it costs too much to determine | ||
// what's in the subset we are concerned about. | ||
// If you are interleaving iterator calls with writes, this can easily become an | ||
// O(N^2) overhead. | ||
// Even without that, too many range checks eventually become more expensive | ||
// than just not having the cache. | ||
if n >= 1024 { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is really a magic number. It's intended to capture that doing a direct scan is actually a good idea for small `n`. I imagine an optimal |
||
for key := range store.unsortedCache { | ||
cacheValue := store.cache[key] | ||
unsorted = append(unsorted, &kv.Pair{Key: []byte(key), Value: cacheValue.value}) | ||
} | ||
} else { | ||
// else do a linear scan to determine if the unsorted pairs are in the pool. | ||
for key := range store.unsortedCache { | ||
if dbm.IsKeyInDomain(strToByte(key), start, end) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mailed a PR that'll speed this up. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah perfect, I had a similar branch to remove the duplicated checks here as well. Happy to review your PR if you can link it (I didn't see it in the open PR list) |
||
cacheValue := store.cache[key] | ||
unsorted = append(unsorted, &kv.Pair{Key: []byte(key), Value: cacheValue.value}) | ||
} | ||
} | ||
} | ||
store.clearUnsortedCacheSubset(unsorted) | ||
} | ||
|
||
func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair) { | ||
n := len(store.unsortedCache) | ||
if len(unsorted) == n { // This pattern allows the Go compiler to emit the map clearing idiom for the entire map. | ||
for key := range store.unsortedCache { | ||
delete(store.unsortedCache, key) | ||
|
@@ -204,32 +238,21 @@ func (store *Store) dirtyItems(start, end []byte) { | |
delete(store.unsortedCache, conv.UnsafeBytesToStr(kv.Key)) | ||
} | ||
} | ||
|
||
sort.Slice(unsorted, func(i, j int) bool { | ||
return bytes.Compare(unsorted[i].Key, unsorted[j].Key) < 0 | ||
}) | ||
|
||
for e := store.sortedCache.Front(); e != nil && len(unsorted) != 0; { | ||
uitem := unsorted[0] | ||
sitem := e.Value | ||
comp := bytes.Compare(uitem.Key, sitem.Key) | ||
|
||
switch comp { | ||
case -1: | ||
unsorted = unsorted[1:] | ||
|
||
store.sortedCache.InsertBefore(uitem, e) | ||
case 1: | ||
e = e.Next() | ||
case 0: | ||
unsorted = unsorted[1:] | ||
e.Value = uitem | ||
e = e.Next() | ||
for _, item := range unsorted { | ||
if item.Value == nil { | ||
// deleted element, tracked by store.deleted | ||
// setting arbitrary value | ||
store.sortedCache.Set(item.Key, []byte{}) | ||
continue | ||
} | ||
err := store.sortedCache.Set(item.Key, item.Value) | ||
if err != nil { | ||
panic(err) | ||
} | ||
} | ||
|
||
for _, kvp := range unsorted { | ||
store.sortedCache.PushBack(kvp) | ||
} | ||
} | ||
|
||
|
@@ -240,11 +263,20 @@ func (store *Store) dirtyItems(start, end []byte) { | |
func (store *Store) setCacheValue(key, value []byte, deleted bool, dirty bool) { | ||
keyStr := conv.UnsafeBytesToStr(key) | ||
store.cache[keyStr] = &cValue{ | ||
value: value, | ||
deleted: deleted, | ||
dirty: dirty, | ||
value: value, | ||
dirty: dirty, | ||
} | ||
if deleted { | ||
store.deleted[keyStr] = struct{}{} | ||
} else { | ||
delete(store.deleted, keyStr) | ||
} | ||
if dirty { | ||
store.unsortedCache[keyStr] = struct{}{} | ||
} | ||
} | ||
|
||
func (store *Store) isDeleted(key string) bool { | ||
_, ok := store.deleted[key] | ||
return ok | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this is not new code, and actually was written before? (I don't think that either I or joon wrote it)
Maybe it just got deleted at some point?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please check internal/conv, we have an exported function for this code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah perfect, thanks