Skip to content

Commit

Permalink
[occ] Iterateset tracking and validation implementation (#337)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context
This implements a tracked iterator that is used to keep track of keys
that have been iterated, and to also save metadata about the iteration
for LATER validation. The iterator will be replayed and if there are any
new keys / any keys missing within the iteration range, it will fail
validation. the actual values served by the iterator are covered by
readset validation.

Additionally, the early stop behavior allows the iterateset to ONLY be
sensitive to changes to the keys available WITHIN the iteration range.
In the event that we perform iteration, and THEN write a key within the
range of iteration, this will not fail validation because we take a
snapshot of the mvkv writeset at the moment of iteration, so when we
replay the iterator, we populate that iterator with the writeset at that
time, so we appropriately replicate the iterator behavior.

In the case that we encounter an ESTIMATE, we have to terminate the
iterator validation and mark it as failed because it is impossible to
know whether that ESTIMATE represents a value change or a delete, since
the latter will affect the keys available for iteration.

This change also implements handlers that iterators receive for updating
readset and iterateset in the `mvkv`

## Testing performed to validate your change
Unit tests for various iteration scenarios
  • Loading branch information
udpatil committed Jan 8, 2024
1 parent 3cc9662 commit d6d2065
Show file tree
Hide file tree
Showing 7 changed files with 706 additions and 57 deletions.
52 changes: 46 additions & 6 deletions store/multiversion/memiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ type memIterator struct {
types.Iterator

mvStore MultiVersionStore
writeset map[string][]byte
writeset WriteSet
index int
abortChannel chan occtypes.Abort
ReadsetHandler
}

func (store *VersionIndexedStore) newMemIterator(
start, end []byte,
items *dbm.MemDB,
ascending bool,
readsetHandler ReadsetHandler,
) *memIterator {
var iter types.Iterator
var err error
Expand All @@ -41,11 +43,12 @@ func (store *VersionIndexedStore) newMemIterator(
}

return &memIterator{
Iterator: iter,
mvStore: store.multiVersionStore,
index: store.transactionIndex,
abortChannel: store.abortChannel,
writeset: store.GetWriteset(),
Iterator: iter,
mvStore: store.multiVersionStore,
index: store.transactionIndex,
abortChannel: store.abortChannel,
writeset: store.GetWriteset(),
ReadsetHandler: readsetHandler,
}
}

Expand All @@ -66,9 +69,46 @@ func (mi *memIterator) Value() []byte {
mi.abortChannel <- occtypes.NewEstimateAbort(val.Index())
}

// need to update readset
// if we have a deleted value, return nil
if val.IsDeleted() {
defer mi.ReadsetHandler.UpdateReadSet(key, nil)
return nil
}
defer mi.ReadsetHandler.UpdateReadSet(key, val.Value())
return val.Value()
}

// newMVSValidationIterator builds a memIterator over the [start, end) key
// range of the provided MemDB, used when re-running (validating) the
// iteration recorded for transaction `index`. The iterator is populated with
// the writeset snapshot captured at original iteration time, and uses a
// NoOpHandler so no readset updates are recorded during validation. An
// ESTIMATE encountered during replay is reported on abortChannel.
func (store *Store) newMVSValidationIterator(
	index int,
	start, end []byte,
	items *dbm.MemDB,
	ascending bool,
	writeset WriteSet,
	abortChannel chan occtypes.Abort,
) *memIterator {
	// select forward or reverse iteration over the collected items
	openIter := func() (types.Iterator, error) {
		if ascending {
			return items.Iterator(start, end)
		}
		return items.ReverseIterator(start, end)
	}

	iter, err := openIter()
	if err != nil {
		// release any partially-constructed iterator before bailing out
		if iter != nil {
			iter.Close()
		}
		panic(err)
	}

	return &memIterator{
		Iterator:       iter,
		mvStore:        store,
		index:          index,
		abortChannel:   abortChannel,
		ReadsetHandler: NoOpHandler{},
		writeset:       writeset,
	}
}
9 changes: 6 additions & 3 deletions store/multiversion/mergeiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@ type mvsMergeIterator struct {
parent types.Iterator
cache types.Iterator
ascending bool
ReadsetHandler
}

var _ types.Iterator = (*mvsMergeIterator)(nil)

func NewMVSMergeIterator(
parent, cache types.Iterator,
ascending bool,
readsetHandler ReadsetHandler,
) *mvsMergeIterator {
iter := &mvsMergeIterator{
parent: parent,
cache: cache,
ascending: ascending,
parent: parent,
cache: cache,
ascending: ascending,
ReadsetHandler: readsetHandler,
}

return iter
Expand Down
116 changes: 86 additions & 30 deletions store/multiversion/mvkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,70 @@ import (
dbm "github.com/tendermint/tm-db"
)

// ReadsetHandler exposes a hook for recording a read key/value pair into a
// transaction's readset; iterators use it to register each value they serve.
type ReadsetHandler interface {
	UpdateReadSet(key []byte, value []byte)
}

// NoOpHandler is a ReadsetHandler that discards all updates; it is used where
// readset tracking is not wanted (e.g. the MVS validation iterator).
type NoOpHandler struct{}

// UpdateReadSet implements ReadsetHandler as a no-op.
func (NoOpHandler) UpdateReadSet(key []byte, value []byte) {}

// IterateSetHandler exposes a hook for adding a completed iterationTracker to
// a transaction's iterateset; intended to be called upon iterator close.
type IterateSetHandler interface {
	UpdateIterateSet(iterationTracker)
}

// iterationTracker records everything needed to later replay and validate a
// single iteration performed by a transaction: the requested range, the keys
// actually served, where iteration stopped, and a snapshot of the
// transaction's writeset at the moment the iterator was created.
type iterationTracker struct {
	startKey     []byte              // start of the iteration range
	endKey       []byte              // end of the iteration range
	earlyStopKey []byte              // key that caused early stop
	iteratedKeys map[string]struct{} // TODO: is a map okay because the ordering will be enforced when we replay the iterator?
	ascending    bool                // iteration direction

	// snapshot of the transaction's writeset taken at iterator creation, so
	// replay sees the same local writes that were visible at that time
	writeset WriteSet

	// TODO: is it possible that termination is affected by keys later in iteration that weren't reached? eg. number of keys affecting iteration?
	// TODO: i believe to get the number of keys the iteration would need to be done fully, so it's not a concern?

	// TODO: maybe we need to store keys served from the writeset for the transaction? that way if there are OTHER keys within the writeset and the iteration range that were written to the writeset later, we can discriminate between the groups?
	// keysServedFromWriteset map[string]struct{}

	// NOTE: it is simpler to just store a copy of the writeset at the time of iterator creation (see writeset above)
}

// NewIterationTracker constructs a tracker for one iteration over
// [startKey, endKey) in the given direction. It snapshots the transaction's
// writeset at creation time so that later writes by the same transaction do
// not change how the iterator replays during validation.
func NewIterationTracker(startKey, endKey []byte, ascending bool, writeset WriteSet) iterationTracker {
	// shallow-copy the writeset so subsequent mutations are not observed
	snapshot := make(WriteSet, len(writeset))
	for k, v := range writeset {
		snapshot[k] = v
	}

	tracker := iterationTracker{
		startKey:     startKey,
		endKey:       endKey,
		iteratedKeys: map[string]struct{}{},
		ascending:    ascending,
		writeset:     snapshot,
	}
	return tracker
}

// AddKey records that key was served during iteration.
func (item *iterationTracker) AddKey(key []byte) {
	keyStr := string(key)
	item.iteratedKeys[keyStr] = struct{}{}
}

// SetEarlyStopKey records the key at which iteration stopped early, so that
// replay is only sensitive to changes within the range actually traversed.
func (item *iterationTracker) SetEarlyStopKey(key []byte) {
	item.earlyStopKey = key
}

// Version Indexed Store wraps the multiversion store in a way that implements the KVStore interface, but also stores the index of the transaction, and so store actions are applied to the multiversion store using that index
type VersionIndexedStore struct {
mtx sync.Mutex
// used for tracking reads and writes for eventual validation + persistence into multi-version store
readset map[string][]byte // contains the key -> value mapping for all keys read from the store (not mvkv, underlying store)
writeset map[string][]byte // contains the key -> value mapping for all keys written to the store
// TODO: does this need sync.Map?
readset map[string][]byte // contains the key -> value mapping for all keys read from the store (not mvkv, underlying store)
writeset map[string][]byte // contains the key -> value mapping for all keys written to the store
iterateset Iterateset
// TODO: need to add iterateset here as well

// dirty keys that haven't been sorted yet for iteration
Expand All @@ -36,11 +94,14 @@ type VersionIndexedStore struct {
}

var _ types.KVStore = (*VersionIndexedStore)(nil)
var _ ReadsetHandler = (*VersionIndexedStore)(nil)
var _ IterateSetHandler = (*VersionIndexedStore)(nil)

func NewVersionIndexedStore(parent types.KVStore, multiVersionStore MultiVersionStore, transactionIndex, incarnation int, abortChannel chan scheduler.Abort) *VersionIndexedStore {
return &VersionIndexedStore{
readset: make(map[string][]byte),
writeset: make(map[string][]byte),
iterateset: []iterationTracker{},
dirtySet: make(map[string]struct{}),
sortedStore: dbm.NewMemDB(),
parent: parent,
Expand Down Expand Up @@ -97,7 +158,7 @@ func (store *VersionIndexedStore) Get(key []byte) []byte {
}
// if we didn't find it in the multiversion store, then we want to check the parent store + add to readset
parentValue := store.parent.Get(key)
store.updateReadSet(key, parentValue)
store.UpdateReadSet(key, parentValue)
return parentValue
}

Expand All @@ -107,7 +168,7 @@ func (store *VersionIndexedStore) parseValueAndUpdateReadset(strKey string, mvsV
if mvsValue.IsDeleted() {
value = nil
}
store.updateReadSet([]byte(strKey), value)
store.UpdateReadSet([]byte(strKey), value)
return value
}

Expand Down Expand Up @@ -201,49 +262,36 @@ func (v *VersionIndexedStore) ReverseIterator(start []byte, end []byte) dbm.Iter
func (store *VersionIndexedStore) iterator(start []byte, end []byte, ascending bool) dbm.Iterator {
store.mtx.Lock()
defer store.mtx.Unlock()
// TODO: ideally we persist writeset keys into a sorted btree for later use
// make a set of total keys across mvkv and mvs to iterate
keysToIterate := make(map[string]struct{})
for key := range store.writeset {
keysToIterate[key] = struct{}{}
}

// get the sorted keys from MVS
// TODO: ideally we take advantage of mvs keys already being sorted
// get the multiversion store sorted keys
writesetMap := store.multiVersionStore.GetAllWritesetKeys()
for i := 0; i < store.transactionIndex; i++ {
// add all the writesets keys up until current index
for _, key := range writesetMap[i] {
keysToIterate[key] = struct{}{}
}
}
// TODO: ideally merge btree and mvs keys into a single sorted btree
memDB := store.multiVersionStore.CollectIteratorItems(store.transactionIndex)

// TODO: this is horribly inefficient, fix this
sortedKeys := make([]string, len(keysToIterate))
for key := range keysToIterate {
sortedKeys = append(sortedKeys, key)
}
sort.Strings(sortedKeys)

memDB := dbm.NewMemDB()
for _, key := range sortedKeys {
// TODO: ideally we persist writeset keys into a sorted btree for later use
// make a set of total keys across mvkv and mvs to iterate
for key := range store.writeset {
memDB.Set([]byte(key), []byte{})
}

var parent, memIterator types.Iterator

// make a memIterator
memIterator = store.newMemIterator(start, end, memDB, ascending)
memIterator = store.newMemIterator(start, end, memDB, ascending, store)

if ascending {
parent = store.parent.Iterator(start, end)
} else {
parent = store.parent.ReverseIterator(start, end)
}

mergeIterator := NewMVSMergeIterator(parent, memIterator, ascending, store)

iterationTracker := NewIterationTracker(start, end, ascending, store.writeset)
trackedIterator := NewTrackedIterator(mergeIterator, iterationTracker, store)

// mergeIterator
return NewMVSMergeIterator(parent, memIterator, ascending)
return trackedIterator

}

Expand Down Expand Up @@ -288,19 +336,27 @@ func (store *VersionIndexedStore) WriteToMultiVersionStore() {
defer store.mtx.Unlock()
defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs")
store.multiVersionStore.SetWriteset(store.transactionIndex, store.incarnation, store.writeset)
store.multiVersionStore.SetReadset(store.transactionIndex, store.readset)
store.multiVersionStore.SetIterateset(store.transactionIndex, store.iterateset)
}

// WriteEstimatesToMultiVersionStore flushes this transaction's writeset into
// the multiversion store as ESTIMATE entries for its (index, incarnation).
// Unlike WriteToMultiVersionStore, the readset and iterateset are not
// persisted here.
func (store *VersionIndexedStore) WriteEstimatesToMultiVersionStore() {
	store.mtx.Lock()
	defer store.mtx.Unlock()
	defer telemetry.MeasureSince(time.Now(), "store", "mvkv", "write_mvs")
	store.multiVersionStore.SetEstimatedWriteset(store.transactionIndex, store.incarnation, store.writeset)
	// TODO: do we need to write readset and iterateset in this case? I don't think so since if this is called it means we aren't doing validation
}

func (store *VersionIndexedStore) updateReadSet(key []byte, value []byte) {
// UpdateReadSet implements ReadsetHandler: it stores the observed value for
// key in the transaction's readset and marks the key as dirty (not yet
// sorted for iteration).
func (store *VersionIndexedStore) UpdateReadSet(key []byte, value []byte) {
	k := string(key)
	store.readset[k] = value
	store.dirtySet[k] = struct{}{}
}

// UpdateIterateSet implements IterateSetHandler by appending the finished
// tracker to this transaction's iterateset for later validation.
func (store *VersionIndexedStore) UpdateIterateSet(iterationTracker iterationTracker) {
	// append to iterateset
	store.iterateset = append(store.iterateset, iterationTracker)
}
17 changes: 17 additions & 0 deletions store/multiversion/mvkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ func TestIterator(t *testing.T) {

// iterate over the keys - exclusive on key5
iter := vis.Iterator([]byte("000"), []byte("key5"))

// verify domain is superset
start, end := iter.Domain()
require.Equal(t, []byte("000"), start)
require.Equal(t, []byte("key5"), end)

vals := []string{}
defer iter.Close()
for ; iter.Valid(); iter.Next() {
Expand Down Expand Up @@ -354,6 +360,17 @@ func TestIterator(t *testing.T) {
require.Equal(t, []string{"value1", "value2", "value3", "valueNew"}, vals3)
iter3.Close()

vis.Set([]byte("key6"), []byte("value6"))
// iterate over the keys, writeset being the last of the iteration range
iter4 := vis.Iterator([]byte("000"), []byte("key7"))
vals4 := []string{}
defer iter4.Close()
for ; iter4.Valid(); iter4.Next() {
vals4 = append(vals4, string(iter4.Value()))
}
require.Equal(t, []string{"value1", "value2", "value3", "valueNew", "value5", "value6"}, vals4)
iter4.Close()

// add an estimate to MVS
mvs.SetEstimatedWriteset(1, 1, map[string][]byte{
"key2": []byte("value1_b"),
Expand Down
Loading

0 comments on commit d6d2065

Please sign in to comment.