Skip to content

Commit

Permalink
go/storage: Make Finalize in storage sync asynchronous
Browse files Browse the repository at this point in the history
  • Loading branch information
jberci committed Sep 13, 2019
1 parent 8fdb720 commit 2fdfd28
Showing 1 changed file with 124 additions and 68 deletions.
192 changes: 124 additions & 68 deletions go/worker/storage/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,48 +67,59 @@ func (o outstandingMask) String() string {
return fmt.Sprintf("outstanding_mask{%s}", strings.Join(represented, ", "))
}

// Syncing task context and support functions for container/heap.

type fetchedDiff struct {
fetchMask outstandingMask
fetched bool
err error
round uint64
prevRoot urkelNode.Root
thisRoot urkelNode.Root
writeLog storageApi.WriteLog
type roundItem interface {
GetRound() uint64
}

type outOfOrderQueue []*fetchedDiff
// outOfOrderRoundQueue is a Round()-based min priority queue.
type outOfOrderRoundQueue []roundItem

// Sorting interface.
func (q outOfOrderQueue) Len() int { return len(q) }
func (q outOfOrderQueue) Less(i, j int) bool { return q[i].round < q[j].round }
func (q outOfOrderQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
func (q outOfOrderRoundQueue) Len() int { return len(q) }
func (q outOfOrderRoundQueue) Less(i, j int) bool { return q[i].GetRound() < q[j].GetRound() }
func (q outOfOrderRoundQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }

// Push appends x as the last element in the heap's array.
func (q *outOfOrderQueue) Push(x interface{}) {
*q = append(*q, x.(*fetchedDiff))
func (q *outOfOrderRoundQueue) Push(x interface{}) {
*q = append(*q, x.(roundItem))
}

// Pop removes and returns the last element in the heap's array.
func (q *outOfOrderQueue) Pop() interface{} {
func (q *outOfOrderRoundQueue) Pop() interface{} {
old := *q
n := len(old)
x := old[n-1]
*q = old[0 : n-1]
return x
}

// Small block metadata cache.
// fetchedDiff has all the context needed for a single GetDiff operation.
type fetchedDiff struct {
fetchMask outstandingMask
fetched bool
err error
round uint64
prevRoot urkelNode.Root
thisRoot urkelNode.Root
writeLog storageApi.WriteLog
}

func (d *fetchedDiff) GetRound() uint64 {
return d.round
}

// blockSummary is a short summary of a single block.Block.
type blockSummary struct {
Namespace common.Namespace `codec:"namespace"`
Round uint64 `codec:"round"`
IORoot urkelNode.Root `codec:"io_root"`
StateRoot urkelNode.Root `codec:"state_root"`
}

func (s *blockSummary) GetRound() uint64 {
return s.Round
}

func summaryFromBlock(blk *block.Block) *blockSummary {
return &blockSummary{
Namespace: blk.Header.Namespace,
Expand Down Expand Up @@ -150,8 +161,9 @@ type Node struct {
syncedLock sync.RWMutex
syncedState watcherState

blockCh *channels.InfiniteChannel
diffCh chan *fetchedDiff
blockCh *channels.InfiniteChannel
diffCh chan *fetchedDiff
finalizeCh chan *blockSummary

ctx context.Context
ctxCancel context.CancelFunc
Expand Down Expand Up @@ -185,8 +197,9 @@ func NewNode(
stateStore: db,
bucketName: bucket,

blockCh: channels.NewInfiniteChannel(),
diffCh: make(chan *fetchedDiff),
blockCh: channels.NewInfiniteChannel(),
diffCh: make(chan *fetchedDiff),
finalizeCh: make(chan *blockSummary),

quitCh: make(chan struct{}),
initCh: make(chan struct{}),
Expand Down Expand Up @@ -400,6 +413,32 @@ func (n *Node) fetchDiff(round uint64, prevRoot *urkelNode.Root, thisRoot *urkel
}
}

func (n *Node) finalize(summary *blockSummary) {
err := n.localStorage.Finalize(n.ctx, summary.Namespace, summary.Round, []hash.Hash{
summary.IORoot.Hash,
summary.StateRoot.Hash,
})
switch err {
case nil:
n.logger.Debug("storage round finalized",
"round", summary.Round,
)
case storageApi.ErrAlreadyFinalized:
// This can happen if we are restoring after a roothash migration or if
// we crashed before updating the sync state.
n.logger.Warn("storage round already finalized",
"round", summary.Round,
)
default:
n.logger.Error("failed to finalize storage round",
"err", err,
"round", summary.Round,
)
}

n.finalizeCh <- summary
}

type inFlight struct {
outstanding outstandingMask
awaitingRetry outstandingMask
Expand Down Expand Up @@ -450,21 +489,36 @@ func (n *Node) worker() { // nolint: gocyclo
"last_synced", cachedLastRound,
)

outOfOrderDone := &outOfOrderQueue{}
outOfOrderDiffs := &outOfOrderRoundQueue{}
outOfOrderApplieds := &outOfOrderRoundQueue{}
syncingRounds := make(map[uint64]*inFlight)
hashCache := make(map[uint64]*blockSummary)
lastFullyAppliedRound := cachedLastRound

heap.Init(outOfOrderDone)
heap.Init(outOfOrderDiffs)

close(n.initCh)

// Main processing loop. When a new block comes in, its state and io roots are inspected and their
// writelogs fetched from remote storage nodes in case we don't have them locally yet. Fetches are
// asynchronous and, once complete, trigger local Apply operations. These are serialized
// per round (all applies for a given round have to be complete before applying anyting for following
// rounds) using the outOfOrderDiffs priority queue and outOfOrderApplieds. Once a round has all its write
// logs applied, a Finalize for it is triggered, again serialized by round but otherwise asynchronous
// (outOfOrderApplieds and cachedLastRound).
mainLoop:
for {
if len(*outOfOrderDone) > 0 && cachedLastRound+1 == (*outOfOrderDone)[0].round {
lastDiff := heap.Pop(outOfOrderDone).(*fetchedDiff)
// Drain the Apply and Finalize queues first, before waiting for new events in the select
// below. Applies are drained first, followed by finalizations (which are asynchronous
// but serialized, i.e. only one Finalize can be in progress at a time).

// Apply any writelogs that came in through fetchDiff, but only if they are for the round
// after the last fully applied one (lastFullyAppliedRound).
if len(*outOfOrderDiffs) > 0 && lastFullyAppliedRound+1 == (*outOfOrderDiffs)[0].GetRound() {
lastDiff := heap.Pop(outOfOrderDiffs).(*fetchedDiff)
// Apply the write log if one exists.
if lastDiff.fetched {
_, err := n.localStorage.Apply(n.ctx, lastDiff.thisRoot.Namespace,
_, err = n.localStorage.Apply(n.ctx, lastDiff.thisRoot.Namespace,
lastDiff.prevRoot.Round, lastDiff.prevRoot.Hash,
lastDiff.thisRoot.Round, lastDiff.thisRoot.Hash,
lastDiff.writeLog)
Expand All @@ -477,7 +531,8 @@ mainLoop:
}
}

// Check if we have synced the given round.
// Check if we have fully synced the given round. If we have, we can proceed
// with the Finalize operation.
syncing := syncingRounds[lastDiff.round]
syncing.outstanding &= ^lastDiff.fetchMask
if syncing.outstanding == maskNone && syncing.awaitingRetry == maskNone {
Expand All @@ -486,48 +541,30 @@ mainLoop:
summary := hashCache[lastDiff.round]
delete(hashCache, lastDiff.round-1)

// Finalize storage for this round.
err := n.localStorage.Finalize(n.ctx, lastDiff.thisRoot.Namespace, lastDiff.round, []hash.Hash{
summary.IORoot.Hash,
summary.StateRoot.Hash,
})
switch err {
case nil:
n.logger.Debug("storage round finalized",
"round", lastDiff.round,
)
case storageApi.ErrAlreadyFinalized:
// This can happen if we are restoring after a roothash migration or if
// we crashed before updating the sync state.
n.logger.Warn("storage round already finalized",
"round", lastDiff.round,
)
default:
n.logger.Error("failed to finalize storage round",
"err", err,
"round", lastDiff.round,
)
}

n.syncedLock.Lock()
n.syncedState.LastBlock.Round = lastDiff.round
n.syncedState.LastBlock.IORoot = summary.IORoot
n.syncedState.LastBlock.StateRoot = summary.StateRoot
err = n.stateStore.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(n.bucketName)
bytes := cbor.Marshal(&n.syncedState)
return bkt.Put(n.commonNode.RuntimeID[:], bytes)
})
n.syncedLock.Unlock()
cachedLastRound = lastDiff.round
if err != nil {
n.logger.Error("can't store watcher state to database", "err", err)
}
// Finalize storage for this round. This happens asynchronously
// with respect to Apply operations for subsequent rounds.
lastFullyAppliedRound = lastDiff.round
heap.Push(outOfOrderApplieds, summary)
}

continue
}

// Check if any new rounds were fully applied and need to be finalized. Only finalize
// if it's the round after the one that was finalized last (cachedLastRound).
// The finalization happens asynchronously with respect to this worker loop and any
// applies that happen for subsequent rounds (which can proceed while earlier rounds are
// still finalizing).
if len(*outOfOrderApplieds) > 0 && cachedLastRound+1 == (*outOfOrderApplieds)[0].GetRound() {
lastSummary := heap.Pop(outOfOrderApplieds).(*blockSummary)
fetcherGroup.Add(1)
go func() {
defer fetcherGroup.Done()
n.finalize(lastSummary)
}()
continue
}

select {
case prunedBlk := <-pruneCh:
n.logger.Debug("pruning storage for round", "round", prunedBlk.Round)
Expand All @@ -536,7 +573,7 @@ mainLoop:
var ns common.Namespace
copy(ns[:], prunedBlk.RuntimeID[:])

if _, err := n.localStorage.Prune(n.ctx, ns, prunedBlk.Round); err != nil {
if _, err = n.localStorage.Prune(n.ctx, ns, prunedBlk.Round); err != nil {
n.logger.Error("failed to prune block",
"err", err,
)
Expand Down Expand Up @@ -573,7 +610,8 @@ mainLoop:
if _, ok := hashCache[i]; ok {
continue
}
oldBlock, err := n.commonNode.Roothash.GetBlock(n.ctx, n.commonNode.RuntimeID, i)
var oldBlock *block.Block
oldBlock, err = n.commonNode.Roothash.GetBlock(n.ctx, n.commonNode.RuntimeID, i)
if err != nil {
n.logger.Error("can't get block for round",
"err", err,
Expand Down Expand Up @@ -647,7 +685,25 @@ mainLoop:
syncingRounds[item.round].outstanding &= ^item.fetchMask
syncingRounds[item.round].awaitingRetry |= item.fetchMask
} else {
heap.Push(outOfOrderDone, item)
heap.Push(outOfOrderDiffs, item)
}

case finalized := <-n.finalizeCh:
// No further sync or out of order handling needed here, since
// only one finalize at a time is triggered (for round cachedLastRound+1)
n.syncedLock.Lock()
n.syncedState.LastBlock.Round = finalized.Round
n.syncedState.LastBlock.IORoot = finalized.IORoot
n.syncedState.LastBlock.StateRoot = finalized.StateRoot
err = n.stateStore.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(n.bucketName)
bytes := cbor.Marshal(&n.syncedState)
return bkt.Put(n.commonNode.RuntimeID[:], bytes)
})
n.syncedLock.Unlock()
cachedLastRound = finalized.Round
if err != nil {
n.logger.Error("can't store watcher state to database", "err", err)
}

case <-n.ctx.Done():
Expand Down

0 comments on commit 2fdfd28

Please sign in to comment.