Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R] fix state_sync #150

Merged
merged 2 commits into from
Jun 19, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 31 additions & 18 deletions store/statesync_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ type StateSyncHelper struct {
incompleteChunks map[int64][]incompleteChunkItem // node idx -> incomplete chunk items, for caching incomplete nodes temporally
prefixNodeDBs []PrefixNodeDB

storeKeys []sdk.StoreKey

reloadingMtx sync.RWMutex // guard below fields to make sure no concurrent load snapshot and response snapshot, and they should be updated atomically

snapshotManager *snapshot.SnapshotManager
Expand All @@ -88,10 +86,23 @@ func NewStateSyncHelper(
helper.commitMS = cms
helper.cdc = cdc

kvStores := cms.GetCommitKVStores()
helper.SnapshotHeights = make(chan int64, snapshotWorkingQueueSize)
helper.HeightsToDelete = make(chan int64, snapshotToRemoveQueueSize)

return &helper
}

// not all key in cms is committed
// for example the BEP9 timelock store upgrade will not commit the newly added store until upgrade height
func (helper *StateSyncHelper) getCommitedSortedStoreKeys() []sdk.StoreKey {
kvStores := helper.commitMS.GetCommitKVStores()
names := make([]string, 0, len(kvStores))
nameToKey := make(map[string]sdk.StoreKey, len(kvStores))
for key, store := range cms.GetCommitKVStores() {
for key, store := range helper.commitMS.GetCommitKVStores() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use kvStores above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my brain was dead

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about replacing helper.commitMS.GetCommitKVStores() with kvStores? Here you call helper.commitMS.GetCommitKVStores() twice.

if !sdk.ShouldCommitStore(key.Name()) {
continue
}

switch store.(type) {
case *IavlStore:
nameToKey[key.Name()] = key
Expand All @@ -101,14 +112,11 @@ func NewStateSyncHelper(
}
}
sort.Strings(names)
storeKeys := make([]sdk.StoreKey, 0, len(names))
for _, name := range names {
helper.storeKeys = append(helper.storeKeys, nameToKey[name])
storeKeys = append(storeKeys, nameToKey[name])
}

helper.SnapshotHeights = make(chan int64, snapshotWorkingQueueSize)
helper.HeightsToDelete = make(chan int64, snapshotToRemoveQueueSize)

return &helper
return storeKeys
}

// Split Init method and NewStateSyncHelper for snapshot command
Expand All @@ -129,31 +137,34 @@ func (helper *StateSyncHelper) Init(lastBreatheBlockHeight int64) {
func (helper *StateSyncHelper) StartRecovery(manifest *abci.Manifest) error {
helper.logger.Info("start recovery")

sdk.UpgradeMgr.SetHeight(manifest.Height)
storeKeys := helper.getCommitedSortedStoreKeys()

helper.manifest = manifest
helper.stateSyncStoreInfos = make([]StoreInfo, 0, len(helper.storeKeys))
helper.stateSyncStoreInfos = make([]StoreInfo, 0, len(storeKeys))
helper.hashesToIdx = make(map[abci.SHA256Sum]int, len(manifest.AppStateHashes))
helper.incompleteChunks = make(map[int64][]incompleteChunkItem, 0)
helper.prefixNodeDBs = make([]PrefixNodeDB, 0, len(helper.storeKeys))
helper.prefixNodeDBs = make([]PrefixNodeDB, 0, len(storeKeys))

idxOfChunk := 0
for _, h := range manifest.AppStateHashes {
helper.hashesToIdx[h] = idxOfChunk
idxOfChunk++
}

if len(manifest.NumKeys) != len(helper.storeKeys) {
return fmt.Errorf("sub store count in manifest %d does not match local %d", len(manifest.NumKeys), len(helper.storeKeys))
if len(manifest.NumKeys) != len(storeKeys) {
return fmt.Errorf("sub store count in manifest %d does not match local %d", len(manifest.NumKeys), len(storeKeys))
}

var startIdxForEachStore int64
for idx, numOfKeys := range manifest.NumKeys {
db := dbm.NewPrefixDB(helper.db, []byte("s/k:"+helper.storeKeys[idx].Name()+"/"))
db := dbm.NewPrefixDB(helper.db, []byte("s/k:"+storeKeys[idx].Name()+"/"))
nodeDB := iavl.NewNodeDB(db, 10000)
helper.prefixNodeDBs = append(helper.prefixNodeDBs,
PrefixNodeDB{
startIdxForEachStore,
startIdxForEachStore + numOfKeys,
helper.storeKeys[idx].Name(),
storeKeys[idx].Name(),
nodeDB})
startIdxForEachStore += numOfKeys
}
Expand Down Expand Up @@ -378,6 +389,8 @@ func (helper *StateSyncHelper) takeSnapshotImpl(height int64, retry int) {
return
}

storeKeys := helper.getCommitedSortedStoreKeys()

failed := true
for failed {
failed = false
Expand All @@ -395,11 +408,11 @@ func (helper *StateSyncHelper) takeSnapshotImpl(height int64, retry int) {
}

totalKeys := int64(0)
numKeys := make([]int64, 0, len(helper.storeKeys))
numKeys := make([]int64, 0, len(storeKeys))
currChunkNodes := make([][]byte, 0, 40000) // one account leaf node is around 100 bytes according to testnet experiment, non-leaf node should be less, 40000 should be a bit less than 4M
var currStartIdx int64
var currChunkTotalBytes int
for _, key := range helper.storeKeys {
for _, key := range storeKeys {
var currStoreKeys int64
store := helper.commitMS.GetKVStore(key)
// TODO: use Iterator method of store interface, no longer rely on implementation of KVStore
Expand Down