Skip to content

Commit

Permalink
fix state_sync according to time_lock
Browse files Browse the repository at this point in the history
  • Loading branch information
ackratos committed Jun 18, 2019
1 parent 93eb442 commit fc532fd
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions store/statesync_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ type StateSyncHelper struct {
incompleteChunks map[int64][]incompleteChunkItem // node idx -> incomplete chunk items, for caching incomplete nodes temporally
prefixNodeDBs []PrefixNodeDB

storeKeys []sdk.StoreKey

reloadingMtx sync.RWMutex // guard below fields to make sure no concurrent load snapshot and response snapshot, and they should be updated atomically

snapshotManager *snapshot.SnapshotManager
Expand All @@ -88,10 +86,23 @@ func NewStateSyncHelper(
helper.commitMS = cms
helper.cdc = cdc

kvStores := cms.GetCommitKVStores()
helper.SnapshotHeights = make(chan int64, snapshotWorkingQueueSize)
helper.HeightsToDelete = make(chan int64, snapshotToRemoveQueueSize)

return &helper
}

// not all key in cms is committed
// for example the BEP9 timelock store upgrade will not commit the newly added store until upgrade height
func (helper *StateSyncHelper) getCommitedSortedStoreKeys() []sdk.StoreKey {
kvStores := helper.commitMS.GetCommitKVStores()
names := make([]string, 0, len(kvStores))
nameToKey := make(map[string]sdk.StoreKey, len(kvStores))
for key, store := range cms.GetCommitKVStores() {
for key, store := range helper.commitMS.GetCommitKVStores() {
if !sdk.ShouldCommitStore(key.Name()) {
continue
}

switch store.(type) {
case *IavlStore:
nameToKey[key.Name()] = key
Expand All @@ -101,14 +112,11 @@ func NewStateSyncHelper(
}
}
sort.Strings(names)
storeKeys := make([]sdk.StoreKey, 0, len(names))
for _, name := range names {
helper.storeKeys = append(helper.storeKeys, nameToKey[name])
storeKeys = append(storeKeys, nameToKey[name])
}

helper.SnapshotHeights = make(chan int64, snapshotWorkingQueueSize)
helper.HeightsToDelete = make(chan int64, snapshotToRemoveQueueSize)

return &helper
return storeKeys
}

// Split Init method and NewStateSyncHelper for snapshot command
Expand All @@ -129,31 +137,33 @@ func (helper *StateSyncHelper) Init(lastBreatheBlockHeight int64) {
func (helper *StateSyncHelper) StartRecovery(manifest *abci.Manifest) error {
helper.logger.Info("start recovery")

storeKeys := helper.getCommitedSortedStoreKeys()

helper.manifest = manifest
helper.stateSyncStoreInfos = make([]StoreInfo, 0, len(helper.storeKeys))
helper.stateSyncStoreInfos = make([]StoreInfo, 0, len(storeKeys))
helper.hashesToIdx = make(map[abci.SHA256Sum]int, len(manifest.AppStateHashes))
helper.incompleteChunks = make(map[int64][]incompleteChunkItem, 0)
helper.prefixNodeDBs = make([]PrefixNodeDB, 0, len(helper.storeKeys))
helper.prefixNodeDBs = make([]PrefixNodeDB, 0, len(storeKeys))

idxOfChunk := 0
for _, h := range manifest.AppStateHashes {
helper.hashesToIdx[h] = idxOfChunk
idxOfChunk++
}

if len(manifest.NumKeys) != len(helper.storeKeys) {
return fmt.Errorf("sub store count in manifest %d does not match local %d", len(manifest.NumKeys), len(helper.storeKeys))
if len(manifest.NumKeys) != len(storeKeys) {
return fmt.Errorf("sub store count in manifest %d does not match local %d", len(manifest.NumKeys), len(storeKeys))
}

var startIdxForEachStore int64
for idx, numOfKeys := range manifest.NumKeys {
db := dbm.NewPrefixDB(helper.db, []byte("s/k:"+helper.storeKeys[idx].Name()+"/"))
db := dbm.NewPrefixDB(helper.db, []byte("s/k:"+storeKeys[idx].Name()+"/"))
nodeDB := iavl.NewNodeDB(db, 10000)
helper.prefixNodeDBs = append(helper.prefixNodeDBs,
PrefixNodeDB{
startIdxForEachStore,
startIdxForEachStore + numOfKeys,
helper.storeKeys[idx].Name(),
storeKeys[idx].Name(),
nodeDB})
startIdxForEachStore += numOfKeys
}
Expand Down Expand Up @@ -378,6 +388,8 @@ func (helper *StateSyncHelper) takeSnapshotImpl(height int64, retry int) {
return
}

storeKeys := helper.getCommitedSortedStoreKeys()

failed := true
for failed {
failed = false
Expand All @@ -395,11 +407,11 @@ func (helper *StateSyncHelper) takeSnapshotImpl(height int64, retry int) {
}

totalKeys := int64(0)
numKeys := make([]int64, 0, len(helper.storeKeys))
numKeys := make([]int64, 0, len(storeKeys))
currChunkNodes := make([][]byte, 0, 40000) // one account leaf node is around 100 bytes according to testnet experiment, non-leaf node should be less, 40000 should be a bit less than 4M
var currStartIdx int64
var currChunkTotalBytes int
for _, key := range helper.storeKeys {
for _, key := range storeKeys {
var currStoreKeys int64
store := helper.commitMS.GetKVStore(key)
// TODO: use Iterator method of store interface, no longer rely on implementation of KVStore
Expand Down

0 comments on commit fc532fd

Please sign in to comment.