Skip to content

Commit

Permalink
go/worker/storage: Synchronize checkpoints with consensus layer
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Jun 24, 2021
1 parent 053f499 commit b73c51e
Showing 1 changed file with 53 additions and 0 deletions.
53 changes: 53 additions & 0 deletions go/worker/storage/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ func (n *Node) Name() string {
func (n *Node) Start() error {
go n.watchQuit()
go n.worker()
if n.checkpointer != nil {
go n.consensusCheckpointSyncer()
}
return nil
}

Expand Down Expand Up @@ -835,6 +838,56 @@ func (n *Node) watchQuit() {
close(n.quitCh)
}

func (n *Node) consensusCheckpointSyncer() {
// Make sure we always create a checkpoint when the consensus layer creates a checkpoint. The
// reason why we do this is to make it faster for storage nodes that use consensus state sync
// to catch up as exactly the right checkpoint will be available.
consensusCp := n.commonNode.Consensus.Checkpointer()
if consensusCp == nil {
return
}

ch, sub, err := consensusCp.WatchCheckpoints()
if err != nil {
n.logger.Error("failed to watch checkpoints",
"err", err,
)
return
}
defer sub.Close()

for {
select {
case <-n.quitCh:
return
case <-n.ctx.Done():
return
case version := <-ch:
// Lookup what runtime round corresponds to the given consensus layer version and make
// sure we checkpoint it.
blk, err := n.commonNode.Consensus.RootHash().GetLatestBlock(n.ctx, &roothashApi.RuntimeRequest{
RuntimeID: n.commonNode.Runtime.ID(),
Height: int64(version),
})
if err != nil {
n.logger.Error("failed to get runtime block corresponding to consensus checkpoint",
"err", err,
"height", version,
)
continue
}

// Force runtime storage checkpointer to create a checkpoint at this round.
n.logger.Info("consensus checkpoint, force runtime checkpoint",
"height", version,
"round", blk.Header.Round,
)

n.checkpointer.ForceCheckpoint(blk.Header.Round)
}
}
}

// This is only called from the main worker goroutine, so no locking should be necessary.
func (n *Node) nudgeAvailability(lastSynced, latest uint64) {
if lastSynced == n.undefinedRound || latest == n.undefinedRound {
Expand Down

0 comments on commit b73c51e

Please sign in to comment.