From b73c51e3f40a4b2379b4a51f0edb11bb0a4a7f91 Mon Sep 17 00:00:00 2001
From: Jernej Kos
Date: Thu, 24 Jun 2021 20:58:20 +0200
Subject: [PATCH] go/worker/storage: Synchronize checkpoints with consensus layer

---
 go/worker/storage/committee/node.go | 63 +++++++++++++++++++++++++++++
 1 file changed, 63 insertions(+)

diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go
index 5bae1704ceb..e06d257af71 100644
--- a/go/worker/storage/committee/node.go
+++ b/go/worker/storage/committee/node.go
@@ -363,6 +363,9 @@ func (n *Node) Name() string {
 func (n *Node) Start() error {
 	go n.watchQuit()
 	go n.worker()
+	if n.checkpointer != nil {
+		go n.consensusCheckpointSyncer()
+	}
 	return nil
 }
 
@@ -835,6 +838,66 @@ func (n *Node) watchQuit() {
 	close(n.quitCh)
 }
 
+// consensusCheckpointSyncer forces a runtime storage checkpoint every time the consensus layer
+// checkpointer reports a new checkpoint. It returns immediately when there is no consensus
+// checkpointer or watching it fails, and otherwise runs until the node quits, its context is
+// cancelled or the checkpoint notification channel is closed.
+func (n *Node) consensusCheckpointSyncer() {
+	// Make sure we always create a checkpoint when the consensus layer creates a checkpoint. The
+	// reason why we do this is to make it faster for storage nodes that use consensus state sync
+	// to catch up as exactly the right checkpoint will be available.
+	consensusCp := n.commonNode.Consensus.Checkpointer()
+	if consensusCp == nil {
+		return
+	}
+
+	ch, sub, err := consensusCp.WatchCheckpoints()
+	if err != nil {
+		n.logger.Error("failed to watch checkpoints",
+			"err", err,
+		)
+		return
+	}
+	defer sub.Close()
+
+	for {
+		select {
+		case <-n.quitCh:
+			return
+		case <-n.ctx.Done():
+			return
+		case version, ok := <-ch:
+			if !ok {
+				// A closed channel yields zero values without blocking, so bail out instead
+				// of spinning and forcing checkpoints for a bogus height.
+				return
+			}
+
+			// Look up which runtime round corresponds to the given consensus layer version and make
+			// sure we checkpoint it.
+			blk, err := n.commonNode.Consensus.RootHash().GetLatestBlock(n.ctx, &roothashApi.RuntimeRequest{
+				RuntimeID: n.commonNode.Runtime.ID(),
+				Height:    int64(version),
+			})
+			if err != nil {
+				n.logger.Error("failed to get runtime block corresponding to consensus checkpoint",
+					"err", err,
+					"height", version,
+				)
+				continue
+			}
+
+			// Force runtime storage checkpointer to create a checkpoint at this round.
+			n.logger.Info("consensus checkpoint, force runtime checkpoint",
+				"height", version,
+				"round", blk.Header.Round,
+			)
+
+			n.checkpointer.ForceCheckpoint(blk.Header.Round)
+		}
+	}
+}
+
 // This is only called from the main worker goroutine, so no locking should be necessary.
 func (n *Node) nudgeAvailability(lastSynced, latest uint64) {
 	if lastSynced == n.undefinedRound || latest == n.undefinedRound {