forked from crypto-org-chain/go-block-stm
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmvmemory.go
110 lines (95 loc) · 2.81 KB
/
mvmemory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package block_stm
import (
"sync/atomic"
storetypes "cosmossdk.io/store/types"
)
type (
// keys are sorted
Locations []Key
MultiLocations map[int]Locations
)
// MVMemory implements `Algorithm 2 The MVMemory module`
type MVMemory struct {
stores map[storetypes.StoreKey]int
data []MVStore
lastWrittenLocations []atomic.Pointer[MultiLocations]
lastReadSet []atomic.Pointer[MultiReadSet]
}
func NewMVMemory(block_size int, stores map[storetypes.StoreKey]int) *MVMemory {
data := make([]MVStore, len(stores))
for key, i := range stores {
data[i] = NewMVStore(key)
}
return &MVMemory{
stores: stores,
data: data,
lastWrittenLocations: make([]atomic.Pointer[MultiLocations], block_size),
lastReadSet: make([]atomic.Pointer[MultiReadSet], block_size),
}
}
func (mv *MVMemory) Record(version TxnVersion, view *MultiMVMemoryView) bool {
newLocations := view.ApplyWriteSet(version)
wroteNewLocation := mv.rcuUpdateWrittenLocations(version.Index, newLocations)
mv.lastReadSet[version.Index].Store(view.ReadSet())
return wroteNewLocation
}
// newLocations are sorted
func (mv *MVMemory) rcuUpdateWrittenLocations(txn TxnIndex, newLocations MultiLocations) bool {
var wroteNewLocation bool
prevLocations := mv.readLastWrittenLocations(txn)
for i, newLoc := range newLocations {
prevLoc, ok := prevLocations[i]
if !ok {
if len(newLocations[i]) > 0 {
wroteNewLocation = true
}
continue
}
DiffOrderedList(prevLoc, newLoc, func(key Key, is_new bool) bool {
if is_new {
wroteNewLocation = true
} else {
mv.data[i].Delete(key, txn)
}
return true
})
}
mv.lastWrittenLocations[txn].Store(&newLocations)
return wroteNewLocation
}
func (mv *MVMemory) ConvertWritesToEstimates(txn TxnIndex) {
for i, locations := range mv.readLastWrittenLocations(txn) {
for _, key := range locations {
mv.data[i].WriteEstimate(key, txn)
}
}
}
func (mv *MVMemory) ValidateReadSet(txn TxnIndex) bool {
// Invariant: at least one `Record` call has been made for `txn`
rs := *mv.lastReadSet[txn].Load()
for store, readSet := range rs {
if !mv.data[store].ValidateReadSet(txn, readSet) {
return false
}
}
return true
}
func (mv *MVMemory) readLastWrittenLocations(txn TxnIndex) MultiLocations {
p := mv.lastWrittenLocations[txn].Load()
if p != nil {
return *p
}
return nil
}
func (mv *MVMemory) WriteSnapshot(storage MultiStore) {
for name, i := range mv.stores {
mv.data[i].SnapshotToStore(storage.GetStore(name))
}
}
// View creates a view for a particular transaction.
func (mv *MVMemory) View(txn TxnIndex, storage MultiStore, scheduler *Scheduler) *MultiMVMemoryView {
return NewMultiMVMemoryView(mv.stores, storage, mv, scheduler, txn)
}
func (mv *MVMemory) GetMVStore(i int) MVStore {
return mv.data[i]
}