Skip to content

Commit

Permalink
go/storage: Integrate pruning and finalization into storage worker
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Aug 13, 2019
1 parent f45bc19 commit 84145c7
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 31 deletions.
43 changes: 40 additions & 3 deletions go/storage/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,41 @@ var (
// ErrCantProve is the error returned when the backend is incapable
// of generating proofs (unsupported, no key, etc).
ErrCantProve = errors.New("storage: unable to provide proofs")

// ErrNoRoots is the error returned when the generated receipt would
// not contain any roots.
ErrNoRoots = errors.New("storage: no roots to generate receipt for")

// ErrExpectedRootMismatch is the error returned when the expected root
// does not match the computed root.
ErrExpectedRootMismatch = errors.New("storage: expected root mismatch")
// ErrUnsupported is the error returned when the called method is not
// supported by the given backend.
ErrUnsupported = errors.New("storage: method not supported by backend")

// The following errors are reimports from NodeDB.

// ErrNodeNotFound indicates that a node with the specified hash couldn't be found
// in the database.
ErrNodeNotFound = nodedb.ErrNodeNotFound
// ErrWriteLogNotFound indicates that a write log for the specified storage hashes
// couldn't be found.
ErrWriteLogNotFound = nodedb.ErrWriteLogNotFound
// ErrNotFinalized indicates that the operation requires a round to be finalized
// but the round is not yet finalized.
ErrNotFinalized = nodedb.ErrNotFinalized
// ErrAlreadyFinalized indicates that the given round has already been finalized.
ErrAlreadyFinalized = nodedb.ErrAlreadyFinalized
// ErrRoundNotFound indicates that the given round cannot be found.
ErrRoundNotFound = nodedb.ErrRoundNotFound
// ErrPreviousRoundMismatch indicates that the round given for the old root does
// not match the previous round.
ErrPreviousRoundMismatch = nodedb.ErrPreviousRoundMismatch
// ErrRoundWentBackwards indicates that the new round is earlier than an already
// inserted round.
ErrRoundWentBackwards = nodedb.ErrRoundWentBackwards
// ErrRootNotFound indicates that the given root cannot be found.
ErrRootNotFound = nodedb.ErrRootNotFound
// ErrRootMustFollowOld indicates that the passed new root does not follow old root.
ErrRootMustFollowOld = nodedb.ErrRootMustFollowOld

// ReceiptSignatureContext is the signature context used for verifying MKVS receipts.
ReceiptSignatureContext = []byte("EkStrRct")
Expand Down Expand Up @@ -224,7 +251,17 @@ type LocalBackend interface {
Backend

// HasRoot checks if the storage backend contains the specified storage root.
HasRoot(Root) bool
HasRoot(root Root) bool

// Finalize finalizes the specified round. The passed list of roots contains
// the roots within the round that have been finalized. All non-finalized
// roots can be discarded.
Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error

// Prune removes all roots recorded under the given namespace and round.
//
// Returns the number of pruned nodes.
Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error)
}

// ClientBackend is a storage client backend implementation.
Expand Down
8 changes: 8 additions & 0 deletions go/storage/badger/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ func (ba *badgerBackend) HasRoot(root api.Root) bool {
return ba.nodedb.HasRoot(root)
}

// Finalize forwards the finalization request for the given namespace and
// round, together with the list of roots, to the backend's node database
// and returns that call's error unchanged.
func (ba *badgerBackend) Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error {
return ba.nodedb.Finalize(ctx, namespace, round, roots)
}

// Prune forwards the prune request for the given namespace and round to the
// backend's node database and returns that call's count and error unchanged.
func (ba *badgerBackend) Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error) {
return ba.nodedb.Prune(ctx, namespace, round)
}

// NewLogAdapter returns a badger.Logger backed by an ekiden logger.
func NewLogAdapter(logger *logging.Logger) badger.Logger {
return &badgerLogger{
Expand Down
8 changes: 8 additions & 0 deletions go/storage/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ func (b *leveldbBackend) HasRoot(root api.Root) bool {
return b.nodedb.HasRoot(root)
}

// Finalize forwards the finalization request for the given namespace and
// round, together with the list of roots, to the backend's node database
// and returns that call's error unchanged.
func (b *leveldbBackend) Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error {
return b.nodedb.Finalize(ctx, namespace, round, roots)
}

// Prune forwards the prune request for the given namespace and round to the
// backend's node database and returns that call's count and error unchanged.
func (b *leveldbBackend) Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error) {
return b.nodedb.Prune(ctx, namespace, round)
}

func (b *leveldbBackend) Cleanup() {
b.closeOnce.Do(func() {
b.nodedb.Close()
Expand Down
8 changes: 8 additions & 0 deletions go/storage/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ func (b *memoryBackend) HasRoot(root api.Root) bool {
return b.nodedb.HasRoot(root)
}

// Finalize forwards the finalization request for the given namespace and
// round, together with the list of roots, to the backend's node database
// and returns that call's error unchanged.
func (b *memoryBackend) Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error {
return b.nodedb.Finalize(ctx, namespace, round, roots)
}

// Prune forwards the prune request for the given namespace and round to the
// backend's node database and returns that call's count and error unchanged.
func (b *memoryBackend) Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error) {
return b.nodedb.Prune(ctx, namespace, round)
}

func (b *memoryBackend) Cleanup() {
b.nodedb.Close()
}
Expand Down
44 changes: 44 additions & 0 deletions go/storage/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,26 @@ var (
},
[]string{"call"},
)
storagePrunedCount = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "ekiden_storage_pruned",
Help: "Number of pruned nodes.",
},
)
storageFinalizedCount = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "ekiden_storage_finalized",
Help: "Number of finalized rounds.",
},
)

storageCollectors = []prometheus.Collector{
storageFailures,
storageCalls,
storageLatency,
storageValueSize,
storagePrunedCount,
storageFinalizedCount,
}

labelApply = prometheus.Labels{"call": "apply"}
Expand All @@ -59,6 +73,8 @@ var (
labelGetPath = prometheus.Labels{"call": "get_path"}
labelGetNode = prometheus.Labels{"call": "get_node"}
labelHasRoot = prometheus.Labels{"call": "has_root"}
labelFinalize = prometheus.Labels{"call": "finalize"}
labelPrune = prometheus.Labels{"call": "prune"}

_ api.LocalBackend = (*metricsWrapper)(nil)
_ api.ClientBackend = (*metricsWrapper)(nil)
Expand Down Expand Up @@ -188,6 +204,34 @@ func (w *metricsWrapper) HasRoot(root api.Root) bool {
return flag
}

// Finalize finalizes the given round through the wrapped backend while
// recording metrics for the call.
//
// The wrapped backend must implement api.LocalBackend; if it does not,
// api.ErrUnsupported is returned and no metrics are touched. Otherwise the
// call latency and call counter are updated regardless of the outcome, and
// the finalized-rounds counter is bumped only when the backend reports
// success. The backend's error is returned unchanged.
func (w *metricsWrapper) Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error {
	local, isLocal := w.Backend.(api.LocalBackend)
	if !isLocal {
		return api.ErrUnsupported
	}

	begin := time.Now()
	finalizeErr := local.Finalize(ctx, namespace, round, roots)
	elapsed := time.Since(begin).Seconds()

	storageLatency.With(labelFinalize).Observe(elapsed)
	storageCalls.With(labelFinalize).Inc()

	if finalizeErr != nil {
		return finalizeErr
	}
	storageFinalizedCount.Inc()
	return nil
}

// Prune prunes the given namespace and round through the wrapped backend
// while recording metrics for the call.
//
// The wrapped backend must implement api.LocalBackend; if it does not,
// (0, api.ErrUnsupported) is returned and no metrics are touched. Otherwise
// the call latency and call counter are updated regardless of the outcome,
// and the pruned-nodes counter is increased by the reported count. The
// backend's count and error are returned unchanged.
func (w *metricsWrapper) Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error) {
	localBackend, ok := w.Backend.(api.LocalBackend)
	if !ok {
		return 0, api.ErrUnsupported
	}

	start := time.Now()
	pruned, err := localBackend.Prune(ctx, namespace, round)
	storageLatency.With(labelPrune).Observe(time.Since(start).Seconds())
	storageCalls.With(labelPrune).Inc()

	// A Prometheus counter panics when asked to add a negative value, so
	// only account for strictly positive counts. This keeps a backend that
	// returns a negative sentinel alongside an error from crashing the
	// caller, while still recording any partial progress it reports.
	if pruned > 0 {
		storagePrunedCount.Add(float64(pruned))
	}
	return pruned, err
}

func newMetricsWrapper(base api.Backend) api.Backend {
metricsOnce.Do(func() {
prometheus.MustRegister(storageCollectors...)
Expand Down
2 changes: 1 addition & 1 deletion go/worker/common/host/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (h *mockHost) MakeRequest(ctx context.Context, body *protocol.Body) (<-chan
_ = tree.Insert(ctx, block.IoKeyInputs, rq.Inputs.MarshalCBOR())
_ = tree.Insert(ctx, block.IoKeyOutputs, rq.Inputs.MarshalCBOR())
_ = tree.Insert(ctx, block.IoKeyTags, cbor.Marshal(tags))
ioWriteLog, ioRoot, err := tree.Commit(ctx, rq.Block.Header.Namespace, rq.Block.Header.Round)
ioWriteLog, ioRoot, err := tree.Commit(ctx, rq.Block.Header.Namespace, rq.Block.Header.Round+1)
if err != nil {
ch <- &protocol.Body{Error: &protocol.Error{Message: "(mock) failed to create I/O tree"}}
break
Expand Down
115 changes: 95 additions & 20 deletions go/worker/storage/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/oasislabs/ekiden/go/common/grpc"
"github.com/oasislabs/ekiden/go/common/logging"
"github.com/oasislabs/ekiden/go/common/node"
"github.com/oasislabs/ekiden/go/common/pubsub"
"github.com/oasislabs/ekiden/go/common/workerpool"
roothashApi "github.com/oasislabs/ekiden/go/roothash/api"
"github.com/oasislabs/ekiden/go/roothash/api/block"
Expand Down Expand Up @@ -176,11 +177,18 @@ func NewNode(

node.ctx, node.ctxCancel = context.WithCancel(context.Background())

// Create a new storage client that will be used for remote sync.
scl, err := client.New(node.ctx, node.commonNode.Identity.TLSCertificate, node.commonNode.Scheduler, node.commonNode.Registry)
if err != nil {
return nil, err
}
node.storageClient = scl.(storageApi.ClientBackend)
if err := node.storageClient.WatchRuntime(commonNode.RuntimeID); err != nil {
node.logger.Error("error watching storage runtime",
"err", err,
)
return nil, err
}

return node, nil
}
Expand Down Expand Up @@ -283,26 +291,41 @@ func (n *Node) GetLastSynced() (uint64, hash.Hash, hash.Hash) {

func (n *Node) fetchDiff(round uint64, prevRoot *urkelNode.Root, thisRoot *urkelNode.Root) error {
var writeLog storageApi.WriteLog
// Check if the new root doesn't already exist.
if !n.localStorage.HasRoot(*thisRoot) {
n.logger.Debug("calling GetDiff", "previous_root", prevRoot, "root", thisRoot)
it, err := n.storageClient.GetDiff(n.ctx, *prevRoot, *thisRoot)
if err != nil {
return err
}
for {
more, err := it.Next()
if thisRoot.Hash.Equal(&prevRoot.Hash) {
// Even if HasRoot returns false the root can still exist if it is equal
// to the previous root and the root was emitted by the consensus committee
// directly (e.g., during an epoch transition). In this case we need to
// still apply the (empty) write log.
writeLog = storageApi.WriteLog{}
} else {
// New root does not yet exist in storage and we need to fetch it from a
// remote node.
n.logger.Debug("calling GetDiff",
"previous_root", prevRoot,
"root", thisRoot,
)

it, err := n.storageClient.GetDiff(n.ctx, *prevRoot, *thisRoot)
if err != nil {
return err
}
if !more {
break
}
for {
more, err := it.Next()
if err != nil {
return err
}
if !more {
break
}

chunk, err := it.Value()
if err != nil {
return err
chunk, err := it.Value()
if err != nil {
return err
}
writeLog = append(writeLog, chunk)
}
writeLog = append(writeLog, chunk)
}
}
n.diffCh <- &fetchedDiff{
Expand All @@ -318,7 +341,7 @@ type inFlight struct {
outstanding int
}

func (n *Node) worker() {
func (n *Node) worker() { // nolint: gocyclo
defer close(n.quitCh)
defer close(n.diffCh)

Expand All @@ -338,10 +361,20 @@ func (n *Node) worker() {
genesisBlock, err := n.commonNode.Roothash.GetGenesisBlock(n.ctx, n.commonNode.RuntimeID)
if err != nil {
n.logger.Error("can't retrieve genesis block", "err", err)
panic("can't retrieve genesis block")
return
}
n.undefinedRound = genesisBlock.Header.Round - 1

// Subscribe to pruned roothash blocks.
var pruneCh <-chan *roothashApi.PrunedBlock
var pruneSub *pubsub.Subscription
pruneCh, pruneSub, err = n.commonNode.Roothash.WatchPrunedBlocks()
if err != nil {
n.logger.Error("failed to watch pruned blocks", "err", err)
return
}
defer pruneSub.Close()

var fetcherGroup sync.WaitGroup

n.syncedLock.RLock()
Expand All @@ -368,14 +401,18 @@ mainLoop:
for {
if len(*outOfOrderDone) > 0 && cachedLastRound+1 == (*outOfOrderDone)[0].round {
lastDiff := heap.Pop(outOfOrderDone).(*fetchedDiff)
// Check if we already had the writelog and apply it if not.
// Apply the write log if one exists.
if lastDiff.writeLog != nil {
_, err := n.localStorage.Apply(n.ctx, lastDiff.thisRoot.Namespace,
lastDiff.prevRoot.Round, lastDiff.prevRoot.Hash,
lastDiff.thisRoot.Round, lastDiff.thisRoot.Hash,
lastDiff.writeLog)
if err != nil {
n.logger.Error("can't apply write log", "err", err)
n.logger.Error("can't apply write log",
"err", err,
"prev_root", lastDiff.prevRoot,
"root", lastDiff.thisRoot,
)
}
}

Expand All @@ -388,11 +425,34 @@ mainLoop:
summary := hashCache[lastDiff.round]
delete(hashCache, lastDiff.round-1)

// Finalize storage for this round.
err := n.localStorage.Finalize(n.ctx, lastDiff.thisRoot.Namespace, lastDiff.round, []hash.Hash{
summary.IORoot.Hash,
summary.StateRoot.Hash,
})
switch err {
case nil:
n.logger.Debug("storage round finalized",
"round", lastDiff.round,
)
case storageApi.ErrAlreadyFinalized:
// This can happen if we are restoring after a roothash migration or if
// we crashed before updating the sync state.
n.logger.Warn("storage round already finalized",
"round", lastDiff.round,
)
default:
n.logger.Error("failed to finalize storage round",
"err", err,
"round", lastDiff.round,
)
}

n.syncedLock.Lock()
n.syncedState.LastBlock.Round = lastDiff.round
n.syncedState.LastBlock.IORoot = summary.IORoot
n.syncedState.LastBlock.StateRoot = summary.StateRoot
err := n.stateStore.Update(func(tx *bolt.Tx) error {
err = n.stateStore.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(n.bucketName)
bytes := cbor.Marshal(&n.syncedState)
return bkt.Put(n.commonNode.RuntimeID[:], bytes)
Expand All @@ -408,6 +468,19 @@ mainLoop:
}

select {
case prunedBlk := <-pruneCh:
n.logger.Debug("pruning storage for round", "round", prunedBlk.Round)

// Prune given block.
var ns common.Namespace
copy(ns[:], prunedBlk.RuntimeID[:])

if _, err := n.localStorage.Prune(n.ctx, ns, prunedBlk.Round); err != nil {
n.logger.Error("failed to prune block",
"err", err,
)
continue mainLoop
}
case inBlk := <-n.blockCh.Out():
blk := inBlk.(*block.Block)
n.logger.Debug("incoming block",
Expand All @@ -418,10 +491,12 @@ mainLoop:
if _, ok := hashCache[cachedLastRound]; !ok && cachedLastRound == n.undefinedRound {
dummy := blockSummary{
Namespace: blk.Header.Namespace,
Round: cachedLastRound,
Round: cachedLastRound + 1,
}
dummy.IORoot.Empty()
dummy.IORoot.Round = cachedLastRound + 1
dummy.StateRoot.Empty()
dummy.StateRoot.Round = cachedLastRound + 1
hashCache[cachedLastRound] = &dummy
}
for i := cachedLastRound; i < blk.Header.Round; i++ {
Expand Down
Loading

0 comments on commit 84145c7

Please sign in to comment.