Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix storage sync from non-empty genesis and p2p multi call dispatch #5005

Merged
merged 4 commits into from
Oct 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changelog/5005.bugfix.1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
go/worker/common: Reorder state determination checks

Otherwise the shown state would be misleading, e.g. showing that it is
waiting for the runtime host to be provisioned while it is actually
blocked in initialization, such as storage sync.
7 changes: 7 additions & 0 deletions .changelog/5005.bugfix.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
go/worker/storage: Fix case when checkpoint sync disabled but forced

If checkpoint sync is disabled but sync has been forced (e.g. because
the state at genesis is non-empty), we must request to sync the
checkpoint at genesis as otherwise we will jump to a later state which
may not be desired given that checkpoint sync has been explicitly
disabled via config.
1 change: 1 addition & 0 deletions .changelog/5005.bugfix.3.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/storage/mkvs/checkpoint: Exclude initial version when pruning
1 change: 1 addition & 0 deletions .changelog/5005.bugfix.4.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/p2p/rpc: Fix multi call dispatch to different peers
1 change: 1 addition & 0 deletions go/p2p/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ func (c *client) CallMulti(
}
var resultChs []channels.SimpleOutChannel
for _, peer := range c.GetBestPeers() {
peer := peer // Make sure goroutine below operates on the right instance.
if !c.isPeerAcceptable(peer) {
continue
}
Expand Down
2 changes: 2 additions & 0 deletions go/p2p/rpc/peermgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (mgr *peerManager) AddPeer(peerID core.PeerID) {

mgr.logger.Debug("added new peer",
"peer_id", peerID,
"num_peers", len(mgr.peers),
)
}

Expand All @@ -134,6 +135,7 @@ func (mgr *peerManager) RemovePeer(peerID core.PeerID) {

mgr.logger.Debug("removed peer",
"peer_id", peerID,
"num_peers", len(mgr.peers),
)
}

Expand Down
5 changes: 4 additions & 1 deletion go/storage/mkvs/checkpoint/checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,10 @@ func (c *checkpointer) maybeCheckpoint(ctx context.Context, version uint64, para
}
}

// Garbage collect old checkpoints.
// Garbage collect old checkpoints, first making sure that the checkpoint at the initial version is excluded from pruning.
if len(cpVersions) > 0 && cpVersions[0] == params.InitialVersion {
cpVersions = cpVersions[1:]
}
if int(params.NumKept) < len(cpVersions) {
c.logger.Info("performing checkpoint garbage collection",
"num_checkpoints", len(cpVersions),
Expand Down
6 changes: 3 additions & 3 deletions go/worker/common/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,15 @@ func (n *Node) getStatusStateLocked() api.StatusState {
if atomic.LoadUint32(&n.keymanagerAvailable) == 0 {
return api.StatusStateWaitingKeymanager
}
if atomic.LoadUint32(&n.hostedRuntimeProvisioned) == 0 {
return api.StatusStateWaitingHostedRuntime
}
if atomic.LoadUint32(&n.historyReindexingDone) == 0 {
return api.StatusStateWaitingHistoryReindex
}
if atomic.LoadUint32(&n.workersInitialized) == 0 {
return api.StatusStateWaitingWorkersInit
}
if atomic.LoadUint32(&n.hostedRuntimeProvisioned) == 0 {
return api.StatusStateWaitingHostedRuntime
}
// If resumeCh exists the runtime is suspended (safe to check since the cross node lock should be held).
if n.resumeCh != nil {
return api.StatusStateRuntimeSuspended
Expand Down
13 changes: 12 additions & 1 deletion go/worker/storage/committee/checkpoint_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (n *Node) checkCheckpointUsable(cp *storageSync.Checkpoint, remainingMask o
return false
}

func (n *Node) syncCheckpoints(genesisRound uint64) (*blockSummary, error) {
func (n *Node) syncCheckpoints(genesisRound uint64, wantOnlyGenesis bool) (*blockSummary, error) {
// Store roots and round info for checkpoints that finished syncing.
// Round and namespace info will get overwritten as rounds are skipped
// for errors, driven by remainingRoots.
Expand All @@ -343,6 +343,17 @@ func (n *Node) syncCheckpoints(genesisRound uint64) (*blockSummary, error) {
return nil, fmt.Errorf("can't get checkpoint list from peers: %w", err)
}

// If we only want the genesis checkpoint, keep only the checkpoints at the genesis round.
if wantOnlyGenesis && len(cps) > 0 {
var filteredCps []*storageSync.Checkpoint
for _, cp := range cps {
if cp.Root.Version == genesisRound {
filteredCps = append(filteredCps, cp)
}
}
cps = filteredCps
}

// Try all the checkpoints now, from most recent backwards.
var (
prevVersion = ^uint64(0)
Expand Down
7 changes: 6 additions & 1 deletion go/worker/storage/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,14 +911,19 @@ func (n *Node) worker() { // nolint: gocyclo
//
// - We haven't synced anything yet and checkpoint sync is not disabled.
//
// If checkpoint sync is disabled but sync has been forced (e.g. because the state at genesis
// is non-empty), we must request to sync the checkpoint at genesis as otherwise we will jump
// to a later state which may not be desired given that checkpoint sync has been explicitly
// disabled via config.
//
if (isInitialStartup && !n.checkpointSyncCfg.Disabled) || n.checkpointSyncForced {
var (
summary *blockSummary
attempt int
)
CheckpointSyncRetry:
for {
summary, err = n.syncCheckpoints(genesisBlock.Header.Round)
summary, err = n.syncCheckpoints(genesisBlock.Header.Round, n.checkpointSyncCfg.Disabled)
if err == nil {
break
}
Expand Down