Skip to content

Commit

Permalink
go/storage: Fix worker root tracking, implement retries, internal grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
jberci committed Aug 22, 2019
1 parent f25b4e9 commit 74d3d4a
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 54 deletions.
167 changes: 113 additions & 54 deletions go/worker/storage/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"container/heap"
"context"
"errors"
"fmt"
"strings"
"sync"

"github.com/eapache/channels"
Expand Down Expand Up @@ -39,13 +41,37 @@ const (
defaultUndefinedRound = ^uint64(0)
)

// outstandingMask records which storage roots still need to be synced or need to be retried.
type outstandingMask uint

const (
maskNone = outstandingMask(0x0)
maskIO = outstandingMask(0x1)
maskState = outstandingMask(0x2)
maskAll = maskIO | maskState
)

func (o outstandingMask) String() string {
var represented []string
if o&maskIO != 0 {
represented = append(represented, "io")
}
if o&maskState != 0 {
represented = append(represented, "state")
}
return fmt.Sprintf("outstanding_mask{%s}", strings.Join(represented, ", "))
}

// Syncing task context and support functions for container/heap.

type fetchedDiff struct {
round uint64
prevRoot urkelNode.Root
thisRoot urkelNode.Root
writeLog storageApi.WriteLog
fetchMask outstandingMask
fetched bool
err error
round uint64
prevRoot urkelNode.Root
thisRoot urkelNode.Root
writeLog storageApi.WriteLog
}

type outOfOrderQueue []*fetchedDiff
Expand Down Expand Up @@ -282,63 +308,71 @@ func (n *Node) HandleNewEventLocked(*roothashApi.Event) {
// Watcher implementation.

// GetLastSynced returns the height, IORoot hash and StateRoot hash of the last block that was fully synced to.
func (n *Node) GetLastSynced() (uint64, hash.Hash, hash.Hash) {
func (n *Node) GetLastSynced() (uint64, urkelNode.Root, urkelNode.Root) {
n.syncedLock.RLock()
defer n.syncedLock.RUnlock()

return n.syncedState.LastBlock.Round, n.syncedState.LastBlock.IORoot.Hash, n.syncedState.LastBlock.StateRoot.Hash
return n.syncedState.LastBlock.Round, n.syncedState.LastBlock.IORoot, n.syncedState.LastBlock.StateRoot
}

func (n *Node) fetchDiff(round uint64, prevRoot *urkelNode.Root, thisRoot *urkelNode.Root) error {
var writeLog storageApi.WriteLog
func (n *Node) fetchDiff(round uint64, prevRoot *urkelNode.Root, thisRoot *urkelNode.Root, fetchMask outstandingMask) {
result := &fetchedDiff{
fetchMask: fetchMask,
fetched: false,
round: round,
prevRoot: *prevRoot,
thisRoot: *thisRoot,
}
defer func() {
n.diffCh <- result
}()
// Check if the new root doesn't already exist.
if !n.localStorage.HasRoot(*thisRoot) {
result.fetched = true
if thisRoot.Hash.Equal(&prevRoot.Hash) {
// Even if HasRoot returns false the root can still exist if it is equal
// to the previous root and the root was emitted by the consensus committee
// directly (e.g., during an epoch transition). In this case we need to
// still apply the (empty) write log.
writeLog = storageApi.WriteLog{}
result.writeLog = storageApi.WriteLog{}
} else {
// New root does not yet exist in storage and we need to fetch it from a
// remote node.
n.logger.Debug("calling GetDiff",
"previous_root", prevRoot,
"root", thisRoot,
"old_root", prevRoot,
"new_root", thisRoot,
"fetch_mask", fetchMask,
)

it, err := n.storageClient.GetDiff(n.ctx, *prevRoot, *thisRoot)
if err != nil {
return err
result.err = err
return
}
for {
more, err := it.Next()
if err != nil {
return err
result.err = err
return
}
if !more {
break
}

chunk, err := it.Value()
if err != nil {
return err
result.err = err
return
}
writeLog = append(writeLog, chunk)
result.writeLog = append(result.writeLog, chunk)
}
}
}
n.diffCh <- &fetchedDiff{
round: round,
prevRoot: *prevRoot,
thisRoot: *thisRoot,
writeLog: writeLog,
}
return nil
}

type inFlight struct {
outstanding int
outstanding outstandingMask
awaitingRetry outstandingMask
}

func (n *Node) worker() { // nolint: gocyclo
Expand Down Expand Up @@ -399,25 +433,25 @@ mainLoop:
if len(*outOfOrderDone) > 0 && cachedLastRound+1 == (*outOfOrderDone)[0].round {
lastDiff := heap.Pop(outOfOrderDone).(*fetchedDiff)
// Apply the write log if one exists.
if lastDiff.writeLog != nil {
if lastDiff.fetched {
_, err := n.localStorage.Apply(n.ctx, lastDiff.thisRoot.Namespace,
lastDiff.prevRoot.Round, lastDiff.prevRoot.Hash,
lastDiff.thisRoot.Round, lastDiff.thisRoot.Hash,
lastDiff.writeLog)
if err != nil {
n.logger.Error("can't apply write log",
"err", err,
"prev_root", lastDiff.prevRoot,
"root", lastDiff.thisRoot,
"old_root", lastDiff.prevRoot,
"new_root", lastDiff.thisRoot,
)
}
}

// Check if we have synced the given round.
syncingRounds[lastDiff.round].outstanding--
if syncingRounds[lastDiff.round].outstanding == 0 {
syncing := syncingRounds[lastDiff.round]
syncing.outstanding &= ^lastDiff.fetchMask
if syncing.outstanding == maskNone && syncing.awaitingRetry == maskNone {
n.logger.Debug("finished syncing round", "round", lastDiff.round)

delete(syncingRounds, lastDiff.round)
summary := hashCache[lastDiff.round]
delete(hashCache, lastDiff.round-1)
Expand Down Expand Up @@ -478,6 +512,7 @@ mainLoop:
)
continue mainLoop
}

case inBlk := <-n.blockCh.Out():
blk := inBlk.(*block.Block)
n.logger.Debug("incoming block",
Expand All @@ -502,7 +537,11 @@ mainLoop:
}
oldBlock, err := n.commonNode.Roothash.GetBlock(n.ctx, n.commonNode.RuntimeID, i)
if err != nil {
n.logger.Error("can't get block for round", "err", err, "round", i, "current_round", blk.Header.Round)
n.logger.Error("can't get block for round",
"err", err,
"round", i,
"current_round", blk.Header.Round,
)
panic("can't get block in storage worker")
}
hashCache[i] = summaryFromBlock(oldBlock)
Expand All @@ -512,16 +551,23 @@ mainLoop:
}

for i := cachedLastRound + 1; i <= blk.Header.Round; i++ {
if _, ok := syncingRounds[i]; ok {
syncing, ok := syncingRounds[i]
if ok && syncing.outstanding == maskAll {
continue
}

n.logger.Debug("going to sync round", "round", i)

syncingRounds[i] = &inFlight{
// We are syncing two roots, I/O root and state root.
outstanding: 2,
if !ok {
syncing = &inFlight{
outstanding: maskNone,
awaitingRetry: maskAll,
}
syncingRounds[i] = syncing
}
n.logger.Debug("preparing round sync",
"round", i,
"outstanding_mask", syncing.outstanding,
"awaiting_retry", syncing.awaitingRetry,
)

prev := hashCache[i-1] // Closures take refs, so they need new variables here.
this := hashCache[i]
Expand All @@ -530,28 +576,41 @@ mainLoop:
Round: this.IORoot.Round,
}
prevIORoot.Hash.Empty()
fetcherGroup.Add(1)
n.fetchPool.Submit(func() {
defer fetcherGroup.Done()

err := n.fetchDiff(this.Round, &prevIORoot, &this.IORoot)
if err != nil {
n.logger.Error("error getting block io difference to round", "err", err, "round", this.Round)
}
})
fetcherGroup.Add(1)
n.fetchPool.Submit(func() {
defer fetcherGroup.Done()

err := n.fetchDiff(this.Round, &prev.StateRoot, &this.StateRoot)
if err != nil {
n.logger.Error("error getting block state difference to round", "err", err, "round", this.Round)
}
})
if (syncing.outstanding&maskIO) == 0 && (syncing.awaitingRetry&maskIO) != 0 {
syncing.outstanding |= maskIO
syncing.awaitingRetry &= ^maskIO
fetcherGroup.Add(1)
n.fetchPool.Submit(func() {
defer fetcherGroup.Done()
n.fetchDiff(this.Round, &prevIORoot, &this.IORoot, maskIO)
})
}
if (syncing.outstanding&maskState) == 0 && (syncing.awaitingRetry&maskState) != 0 {
syncing.outstanding |= maskState
syncing.awaitingRetry &= ^maskState
fetcherGroup.Add(1)
n.fetchPool.Submit(func() {
defer fetcherGroup.Done()
n.fetchDiff(this.Round, &prev.StateRoot, &this.StateRoot, maskState)
})
}
}

case item := <-n.diffCh:
heap.Push(outOfOrderDone, item)
if item.err != nil {
n.logger.Error("error calling getdiff",
"err", item.err,
"round", item.round,
"old_root", item.prevRoot,
"new_root", item.thisRoot,
"fetch_mask", item.fetchMask,
)
syncingRounds[item.round].outstanding &= ^item.fetchMask
syncingRounds[item.round].awaitingRetry |= item.fetchMask
} else {
heap.Push(outOfOrderDone, item)
}

case <-n.ctx.Done():
break mainLoop
Expand Down
50 changes: 50 additions & 0 deletions go/worker/storage/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package storage

import (
"context"

"github.com/pkg/errors"

"github.com/oasislabs/ekiden/go/common/crypto/signature"
"github.com/oasislabs/ekiden/go/common/grpc"
pb "github.com/oasislabs/ekiden/go/grpc/storage"
"github.com/oasislabs/ekiden/go/worker/storage/committee"
)

var (
_ pb.StorageWorkerServer = (*grpcServer)(nil)

// ErrRuntimeNotFound is the error returned when the called references an unknown runtime.
ErrRuntimeNotFound = errors.New("worker/storage: runtime not found")
)

type grpcServer struct {
w *Worker
}

func (s *grpcServer) GetLastSyncedRound(ctx context.Context, req *pb.GetLastSyncedRoundRequest) (*pb.GetLastSyncedRoundResponse, error) {
var id signature.PublicKey
if err := id.UnmarshalBinary(req.GetRuntimeId()); err != nil {
return nil, err
}

var node *committee.Node
node, ok := s.w.runtimes[id.ToMapKey()]
if !ok {
return nil, ErrRuntimeNotFound
}

round, ioRoot, stateRoot := node.GetLastSynced()

resp := &pb.GetLastSyncedRoundResponse{
Round: round,
IoRoot: ioRoot.MarshalCBOR(),
StateRoot: stateRoot.MarshalCBOR(),
}
return resp, nil
}

func newGRPCServer(grpc *grpc.Server, w *Worker) {
s := &grpcServer{w}
pb.RegisterStorageWorkerServer(grpc.Server(), s)
}

0 comments on commit 74d3d4a

Please sign in to comment.