Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[occ] Iterateset tracking and validation implementation #337

Merged
merged 9 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 46 additions & 6 deletions store/multiversion/memiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ type memIterator struct {
types.Iterator

mvStore MultiVersionStore
writeset map[string][]byte
writeset WriteSet
index int
abortChannel chan occtypes.Abort
ReadsetHandler
}

func (store *VersionIndexedStore) newMemIterator(
start, end []byte,
items *dbm.MemDB,
ascending bool,
readsetHandler ReadsetHandler,
) *memIterator {
var iter types.Iterator
var err error
Expand All @@ -41,11 +43,12 @@ func (store *VersionIndexedStore) newMemIterator(
}

return &memIterator{
Iterator: iter,
mvStore: store.multiVersionStore,
index: store.transactionIndex,
abortChannel: store.abortChannel,
writeset: store.GetWriteset(),
Iterator: iter,
mvStore: store.multiVersionStore,
index: store.transactionIndex,
abortChannel: store.abortChannel,
writeset: store.GetWriteset(),
ReadsetHandler: readsetHandler,
}
}

Expand All @@ -66,9 +69,46 @@ func (mi *memIterator) Value() []byte {
mi.abortChannel <- occtypes.NewEstimateAbort(val.Index())
}

// need to update readset
// if we have a deleted value, return nil
if val.IsDeleted() {
defer mi.ReadsetHandler.UpdateReadSet(key, nil)
return nil
}
defer mi.ReadsetHandler.UpdateReadSet(key, val.Value())
return val.Value()
}

func (store *Store) newMVSValidationIterator(
index int,
start, end []byte,
items *dbm.MemDB,
ascending bool,
writeset WriteSet,
abortChannel chan occtypes.Abort,
) *memIterator {
var iter types.Iterator
var err error

if ascending {
iter, err = items.Iterator(start, end)
} else {
iter, err = items.ReverseIterator(start, end)
}

if err != nil {
if iter != nil {
iter.Close()
}
panic(err)
}

return &memIterator{
Iterator: iter,
mvStore: store,
index: index,
abortChannel: abortChannel,
ReadsetHandler: NoOpHandler{},
writeset: writeset,
}
}
9 changes: 6 additions & 3 deletions store/multiversion/mergeiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@ type mvsMergeIterator struct {
parent types.Iterator
cache types.Iterator
ascending bool
ReadsetHandler
}

var _ types.Iterator = (*mvsMergeIterator)(nil)

func NewMVSMergeIterator(
parent, cache types.Iterator,
ascending bool,
readsetHandler ReadsetHandler,
) *mvsMergeIterator {
iter := &mvsMergeIterator{
parent: parent,
cache: cache,
ascending: ascending,
parent: parent,
cache: cache,
ascending: ascending,
ReadsetHandler: readsetHandler,
}

return iter
Expand Down
116 changes: 86 additions & 30 deletions store/multiversion/mvkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,70 @@ import (
dbm "github.com/tendermint/tm-db"
)

// exposes a handler for adding items to readset, useful for iterators
type ReadsetHandler interface {
UpdateReadSet(key []byte, value []byte)
}

type NoOpHandler struct{}

func (NoOpHandler) UpdateReadSet(key []byte, value []byte) {}

// exposes a handler for adding items to iterateset, to be called upon iterator close
type IterateSetHandler interface {
UpdateIterateSet(iterationTracker)
}

type iterationTracker struct {
startKey []byte // start of the iteration range
endKey []byte // end of the iteration range
earlyStopKey []byte // key that caused early stop
iteratedKeys map[string]struct{} // TODO: is a map okay because the ordering will be enforced when we replay the iterator?
ascending bool

writeset WriteSet

// TODO: is it possible that terimation is affected by keys later in iteration that weren't reached? eg. number of keys affecting iteration?
// TODO: i believe to get number of keys the iteration would need to be done fully so its not a concern?

// TODO: maybe we need to store keys served from writeset for the transaction? that way if theres OTHER keys within the writeset and the iteration range, and were written to the writeset later, we can discriminate between the groups?
// keysServedFromWriteset map[string]struct{}

// actually its simpler to just store a copy of the writeset at the time of iterator creation
}

func NewIterationTracker(startKey, endKey []byte, ascending bool, writeset WriteSet) iterationTracker {
copyWriteset := make(WriteSet, len(writeset))

for key, value := range writeset {
copyWriteset[key] = value
}

return iterationTracker{
startKey: startKey,
endKey: endKey,
iteratedKeys: make(map[string]struct{}),
ascending: ascending,
writeset: copyWriteset,
}
}

func (item *iterationTracker) AddKey(key []byte) {
item.iteratedKeys[string(key)] = struct{}{}
}

func (item *iterationTracker) SetEarlyStopKey(key []byte) {
item.earlyStopKey = key
}

// Version Indexed Store wraps the multiversion store in a way that implements the KVStore interface, but also stores the index of the transaction, and so store actions are applied to the multiversion store using that index
type VersionIndexedStore struct {
mtx sync.Mutex
// used for tracking reads and writes for eventual validation + persistence into multi-version store
readset map[string][]byte // contains the key -> value mapping for all keys read from the store (not mvkv, underlying store)
writeset map[string][]byte // contains the key -> value mapping for all keys written to the store
// TODO: does this need sync.Map?
readset map[string][]byte // contains the key -> value mapping for all keys read from the store (not mvkv, underlying store)
writeset map[string][]byte // contains the key -> value mapping for all keys written to the store
iterateset Iterateset
// TODO: need to add iterateset here as well

// dirty keys that haven't been sorted yet for iteration
Expand All @@ -36,11 +94,14 @@ type VersionIndexedStore struct {
}

var _ types.KVStore = (*VersionIndexedStore)(nil)
var _ ReadsetHandler = (*VersionIndexedStore)(nil)
var _ IterateSetHandler = (*VersionIndexedStore)(nil)

func NewVersionIndexedStore(parent types.KVStore, multiVersionStore MultiVersionStore, transactionIndex, incarnation int, abortChannel chan scheduler.Abort) *VersionIndexedStore {
return &VersionIndexedStore{
readset: make(map[string][]byte),
writeset: make(map[string][]byte),
iterateset: []iterationTracker{},
dirtySet: make(map[string]struct{}),
sortedStore: dbm.NewMemDB(),
parent: parent,
Expand Down Expand Up @@ -97,7 +158,7 @@ func (store *VersionIndexedStore) Get(key []byte) []byte {
}
// if we didn't find it in the multiversion store, then we want to check the parent store + add to readset
parentValue := store.parent.Get(key)
store.updateReadSet(key, parentValue)
store.UpdateReadSet(key, parentValue)
return parentValue
}

Expand All @@ -107,7 +168,7 @@ func (store *VersionIndexedStore) parseValueAndUpdateReadset(strKey string, mvsV
if mvsValue.IsDeleted() {
value = nil
}
store.updateReadSet([]byte(strKey), value)
store.UpdateReadSet([]byte(strKey), value)
return value
}

Expand Down Expand Up @@ -201,49 +262,36 @@ func (v *VersionIndexedStore) ReverseIterator(start []byte, end []byte) dbm.Iter
func (store *VersionIndexedStore) iterator(start []byte, end []byte, ascending bool) dbm.Iterator {
store.mtx.Lock()
defer store.mtx.Unlock()
// TODO: ideally we persist writeset keys into a sorted btree for later use
// make a set of total keys across mvkv and mvs to iterate
keysToIterate := make(map[string]struct{})
for key := range store.writeset {
keysToIterate[key] = struct{}{}
}

// get the sorted keys from MVS
// TODO: ideally we take advantage of mvs keys already being sorted
// get the multiversion store sorted keys
writesetMap := store.multiVersionStore.GetAllWritesetKeys()
for i := 0; i < store.transactionIndex; i++ {
// add all the writesets keys up until current index
for _, key := range writesetMap[i] {
keysToIterate[key] = struct{}{}
}
}
// TODO: ideally merge btree and mvs keys into a single sorted btree
memDB := store.multiVersionStore.CollectIteratorItems(store.transactionIndex)

// TODO: this is horribly inefficient, fix this
sortedKeys := make([]string, len(keysToIterate))
for key := range keysToIterate {
sortedKeys = append(sortedKeys, key)
}
sort.Strings(sortedKeys)

memDB := dbm.NewMemDB()
for _, key := range sortedKeys {
// TODO: ideally we persist writeset keys into a sorted btree for later use
// make a set of total keys across mvkv and mvs to iterate
for key := range store.writeset {
memDB.Set([]byte(key), []byte{})
}

var parent, memIterator types.Iterator

// make a memIterator
memIterator = store.newMemIterator(start, end, memDB, ascending)
memIterator = store.newMemIterator(start, end, memDB, ascending, store)

if ascending {
parent = store.parent.Iterator(start, end)
} else {
parent = store.parent.ReverseIterator(start, end)
}

mergeIterator := NewMVSMergeIterator(parent, memIterator, ascending, store)

iterationTracker := NewIterationTracker(start, end, ascending, store.writeset)
trackedIterator := NewTrackedIterator(mergeIterator, iterationTracker, store)

// mergeIterator
return NewMVSMergeIterator(parent, memIterator, ascending)
return trackedIterator

}

Expand Down Expand Up @@ -288,19 +336,27 @@ func (store *VersionIndexedStore) WriteToMultiVersionStore() {
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs")
store.multiVersionStore.SetWriteset(store.transactionIndex, store.incarnation, store.writeset)
store.multiVersionStore.SetReadset(store.transactionIndex, store.readset)
store.multiVersionStore.SetIterateset(store.transactionIndex, store.iterateset)
}

func (store *VersionIndexedStore) WriteEstimatesToMultiVersionStore() {
store.mtx.Lock()
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs")
store.multiVersionStore.SetEstimatedWriteset(store.transactionIndex, store.incarnation, store.writeset)
// TODO: do we need to write readset and iterateset in this case? I don't think so since if this is called it means we aren't doing validation
}

func (store *VersionIndexedStore) updateReadSet(key []byte, value []byte) {
func (store *VersionIndexedStore) UpdateReadSet(key []byte, value []byte) {
// add to readset
keyStr := string(key)
store.readset[keyStr] = value
// add to dirty set
store.dirtySet[keyStr] = struct{}{}
}

func (store *VersionIndexedStore) UpdateIterateSet(iterationTracker iterationTracker) {
// append to iterateset
store.iterateset = append(store.iterateset, iterationTracker)
}
17 changes: 17 additions & 0 deletions store/multiversion/mvkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ func TestIterator(t *testing.T) {

// iterate over the keys - exclusive on key5
iter := vis.Iterator([]byte("000"), []byte("key5"))

// verify domain is superset
start, end := iter.Domain()
require.Equal(t, []byte("000"), start)
require.Equal(t, []byte("key5"), end)

vals := []string{}
defer iter.Close()
for ; iter.Valid(); iter.Next() {
Expand Down Expand Up @@ -354,6 +360,17 @@ func TestIterator(t *testing.T) {
require.Equal(t, []string{"value1", "value2", "value3", "valueNew"}, vals3)
iter3.Close()

vis.Set([]byte("key6"), []byte("value6"))
// iterate over the keys, writeset being the last of the iteration range
iter4 := vis.Iterator([]byte("000"), []byte("key7"))
vals4 := []string{}
defer iter4.Close()
for ; iter4.Valid(); iter4.Next() {
vals4 = append(vals4, string(iter4.Value()))
}
require.Equal(t, []string{"value1", "value2", "value3", "valueNew", "value5", "value6"}, vals4)
iter4.Close()

// add an estimate to MVS
mvs.SetEstimatedWriteset(1, 1, map[string][]byte{
"key2": []byte("value1_b"),
Expand Down
Loading
Loading