Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve invalidate writeset #369

Merged
merged 1 commit into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions store/multiversion/mvkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,6 @@ func TestVersionIndexedStoreWrite(t *testing.T) {
"key3": []byte("value3"),
})

require.False(t, mvs.Has(3, []byte("key1")))
require.False(t, mvs.Has(3, []byte("key2")))
require.True(t, mvs.Has(3, []byte("key3")))

// write some keys
vis.Set([]byte("key1"), []byte("value1"))
vis.Set([]byte("key2"), []byte("value2"))
Expand All @@ -175,10 +171,6 @@ func TestVersionIndexedStoreWriteEstimates(t *testing.T) {
"key3": []byte("value3"),
})

require.False(t, mvs.Has(3, []byte("key1")))
require.False(t, mvs.Has(3, []byte("key2")))
require.True(t, mvs.Has(3, []byte("key3")))

// write some keys
vis.Set([]byte("key1"), []byte("value1"))
vis.Set([]byte("key2"), []byte("value2"))
Expand Down
49 changes: 22 additions & 27 deletions store/multiversion/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
type MultiVersionStore interface {
GetLatest(key []byte) (value MultiVersionValueItem)
GetLatestBeforeIndex(index int, key []byte) (value MultiVersionValueItem)
Has(index int, key []byte) bool
WriteLatestToStore()
SetWriteset(index int, incarnation int, writeset WriteSet)
InvalidateWriteset(index int, incarnation int)
Expand Down Expand Up @@ -46,6 +45,8 @@ type Store struct {
txReadSets *sync.Map // map of tx index -> readset ReadSet
txIterateSets *sync.Map // map of tx index -> iterateset Iterateset

txEstimateFlags *sync.Map

parentStore types.KVStore
}

Expand All @@ -55,6 +56,7 @@ func NewMultiVersionStore(parentStore types.KVStore) *Store {
txWritesetKeys: &sync.Map{},
txReadSets: &sync.Map{},
txIterateSets: &sync.Map{},
txEstimateFlags: &sync.Map{},
parentStore: parentStore,
}
}
Expand All @@ -76,6 +78,13 @@ func (s *Store) GetLatest(key []byte) (value MultiVersionValueItem) {
if !found {
return nil // this is possible IF there is are writeset that are then removed for that key
}
txIndex := latestVal.Index()
// check against estimate map
_, estimateFound := s.txEstimateFlags.Load(txIndex)
if estimateFound {
// it shouldnt be an issue to have a new item instead of modifying existing?
return NewEstimateItem(txIndex, latestVal.Incarnation())
}
return latestVal
}

Expand All @@ -92,23 +101,17 @@ func (s *Store) GetLatestBeforeIndex(index int, key []byte) (value MultiVersionV
if !found {
return nil
}
txIndex := val.Index()
// check against estimate map
_, estimateFound := s.txEstimateFlags.Load(txIndex)
if estimateFound {
// it shouldnt be an issue to have a new item instead of modifying existing?
return NewEstimateItem(txIndex, val.Incarnation())
}
// found a value prior to the passed in index, return that value (could be estimate OR deleted, but it is a definitive value)
return val
}

// Has implements MultiVersionStore. It checks if the key exists in the multiversion store at or before the specified index.
func (s *Store) Has(index int, key []byte) bool {

keyString := string(key)
mvVal, found := s.multiVersionMap.Load(keyString)
// if the key doesn't exist in the overall map, return nil
if !found {
return false // this is okay because the caller of this will THEN need to access the parent store to verify that the key doesnt exist there
}
_, foundVal := mvVal.(MultiVersionValue).GetLatestBeforeIndex(index)
return foundVal
}

func (s *Store) removeOldWriteset(index int, newWriteSet WriteSet) {
writeset := make(map[string][]byte)
if newWriteSet != nil {
Expand Down Expand Up @@ -157,23 +160,14 @@ func (s *Store) SetWriteset(index int, incarnation int, writeset WriteSet) {
mvVal.Set(index, incarnation, value)
}
}
sort.Strings(writeSetKeys) // TODO: if we're sorting here anyways, maybe we just put it into a btree instead of a slice
sort.Strings(writeSetKeys) // TODO: if we're sorting here anyways, maybe we just put it into a btree instead of a slice
s.txEstimateFlags.Delete(index) // remove estimate flag if it exists
s.txWritesetKeys.Store(index, writeSetKeys)
}

// InvalidateWriteset iterates over the keys for the given index and incarnation writeset and replaces with ESTIMATEs
// InvalidateWriteset updates the estimateFlags to indicate the writeset is out of date
func (s *Store) InvalidateWriteset(index int, incarnation int) {
keysAny, found := s.txWritesetKeys.Load(index)
if !found {
return
}
keys := keysAny.([]string)
for _, key := range keys {
// invalidate all of the writeset items - is this suboptimal? - we could potentially do concurrently if slow because locking is on an item specific level
val, _ := s.multiVersionMap.LoadOrStore(key, NewMultiVersionItem())
val.(MultiVersionValue).SetEstimate(index, incarnation)
}
// we leave the writeset in place because we'll need it for key removal later if/when we replace with a new writeset
s.txEstimateFlags.Store(index, struct{}{}) // set estimate flag
}

// SetEstimatedWriteset is used to directly write estimates instead of writing a writeset and later invalidating
Expand All @@ -190,6 +184,7 @@ func (s *Store) SetEstimatedWriteset(index int, incarnation int, writeset WriteS
mvVal.(MultiVersionValue).SetEstimate(index, incarnation)
}
sort.Strings(writeSetKeys)
s.txEstimateFlags.Store(index, struct{}{}) // set estimate flag
s.txWritesetKeys.Store(index, writeSetKeys)
}

Expand Down
6 changes: 0 additions & 6 deletions store/multiversion/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ func TestMultiVersionStore(t *testing.T) {
})
require.True(t, store.GetLatestBeforeIndex(5, []byte("key1")).IsEstimate())
require.Equal(t, []byte("value4"), store.GetLatestBeforeIndex(7, []byte("key1")).Value())

// Test Has
require.True(t, store.Has(2, []byte("key1")))
require.False(t, store.Has(0, []byte("key1")))
require.False(t, store.Has(5, []byte("key4")))
}

func TestMultiVersionStoreHasLaterValue(t *testing.T) {
Expand All @@ -69,7 +64,6 @@ func TestMultiVersionStoreKeyDNE(t *testing.T) {

require.Nil(t, store.GetLatest([]byte("key1")))
require.Nil(t, store.GetLatestBeforeIndex(0, []byte("key1")))
require.False(t, store.Has(0, []byte("key1")))
}

func TestMultiVersionStoreWriteToParent(t *testing.T) {
Expand Down