From 84145c7e4656c6b70e5a216aebb963070207c9c4 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Tue, 16 Jul 2019 18:28:34 +0200 Subject: [PATCH] go/storage: Integrate pruning and finalization into storage worker --- go/storage/api/api.go | 43 ++++++++++- go/storage/badger/badger.go | 8 ++ go/storage/leveldb/leveldb.go | 8 ++ go/storage/memory/memory.go | 8 ++ go/storage/metrics.go | 44 +++++++++++ go/worker/common/host/mock.go | 2 +- go/worker/storage/committee/node.go | 115 +++++++++++++++++++++++----- go/worker/storage/worker.go | 19 +++-- 8 files changed, 216 insertions(+), 31 deletions(-) diff --git a/go/storage/api/api.go b/go/storage/api/api.go index cf48af2a9a0..09f0493671c 100644 --- a/go/storage/api/api.go +++ b/go/storage/api/api.go @@ -20,14 +20,41 @@ var ( // ErrCantProve is the error returned when the backend is incapable // of generating proofs (unsupported, no key, etc). ErrCantProve = errors.New("storage: unable to provide proofs") - // ErrNoRoots is the error returned when the generated receipt would // not contain any roots. ErrNoRoots = errors.New("storage: no roots to generate receipt for") - // ErrExpectedRootMismatch is the error returned when the expected root // does not match the computed root. ErrExpectedRootMismatch = errors.New("storage: expected root mismatch") + // ErrUnsupported is the error returned when the called method is not + // supported by the given backend. + ErrUnsupported = errors.New("storage: method not supported by backend") + + // The following errors are reimports from NodeDB. + + // ErrNodeNotFound indicates that a node with the specified hash couldn't be found + // in the database. + ErrNodeNotFound = nodedb.ErrNodeNotFound + // ErrWriteLogNotFound indicates that a write log for the specified storage hashes + // couldn't be found. + ErrWriteLogNotFound = nodedb.ErrWriteLogNotFound + // ErrNotFinalized indicates that the operation requires a round to be finalized + // but the round is not yet finalized. + ErrNotFinalized = nodedb.ErrNotFinalized + // ErrAlreadyFinalized indicates that the given round has already been finalized. + ErrAlreadyFinalized = nodedb.ErrAlreadyFinalized + // ErrRoundNotFound indicates that the given round cannot be found. + ErrRoundNotFound = nodedb.ErrRoundNotFound + // ErrPreviousRoundMismatch indicates that the round given for the old root does + // not match the previous round. + ErrPreviousRoundMismatch = nodedb.ErrPreviousRoundMismatch + // ErrRoundWentBackwards indicates that the new round is earlier than an already + // inserted round. + ErrRoundWentBackwards = nodedb.ErrRoundWentBackwards + // ErrRootNotFound indicates that the given root cannot be found. + ErrRootNotFound = nodedb.ErrRootNotFound + // ErrRootMustFollowOld indicates that the passed new root does not follow old root. + ErrRootMustFollowOld = nodedb.ErrRootMustFollowOld // ReceiptSignatureContext is the signature context used for verifying MKVS receipts. ReceiptSignatureContext = []byte("EkStrRct") @@ -224,7 +251,17 @@ type LocalBackend interface { Backend // HasRoot checks if the storage backend contains the specified storage root. - HasRoot(Root) bool + HasRoot(root Root) bool + + // Finalize finalizes the specified round. The passed list of roots are the + // roots within the round that have been finalized. All non-finalized roots + // can be discarded. + Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error + + // Prune removes all roots recorded under the given namespace and round. 
+ // + // Returns the number of pruned nodes. + Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error) } // ClientBackend is a storage client backend implementation. diff --git a/go/storage/badger/badger.go b/go/storage/badger/badger.go index 429f0c974a2..4712af74217 100644 --- a/go/storage/badger/badger.go +++ b/go/storage/badger/badger.go @@ -156,6 +156,14 @@ func (ba *badgerBackend) HasRoot(root api.Root) bool { return ba.nodedb.HasRoot(root) } +func (ba *badgerBackend) Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error { + return ba.nodedb.Finalize(ctx, namespace, round, roots) +} + +func (ba *badgerBackend) Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error) { + return ba.nodedb.Prune(ctx, namespace, round) +} + // NewLogAdapter returns a badger.Logger backed by an ekiden logger. func NewLogAdapter(logger *logging.Logger) badger.Logger { return &badgerLogger{ diff --git a/go/storage/leveldb/leveldb.go b/go/storage/leveldb/leveldb.go index e7dcd3e4bf7..0063537844f 100644 --- a/go/storage/leveldb/leveldb.go +++ b/go/storage/leveldb/leveldb.go @@ -113,6 +113,14 @@ func (b *leveldbBackend) HasRoot(root api.Root) bool { return b.nodedb.HasRoot(root) } +func (b *leveldbBackend) Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error { + return b.nodedb.Finalize(ctx, namespace, round, roots) +} + +func (b *leveldbBackend) Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error) { + return b.nodedb.Prune(ctx, namespace, round) +} + func (b *leveldbBackend) Cleanup() { b.closeOnce.Do(func() { b.nodedb.Close() diff --git a/go/storage/memory/memory.go b/go/storage/memory/memory.go index cfb3af337e7..1d2c51bbdc8 100644 --- a/go/storage/memory/memory.go +++ b/go/storage/memory/memory.go @@ -180,6 +180,14 @@ func (b *memoryBackend) HasRoot(root api.Root) bool { return b.nodedb.HasRoot(root) } +func (b *memoryBackend) Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error { + return b.nodedb.Finalize(ctx, namespace, round, roots) +} + +func (b *memoryBackend) Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error) { + return b.nodedb.Prune(ctx, namespace, round) +} + func (b *memoryBackend) Cleanup() { b.nodedb.Close() } diff --git a/go/storage/metrics.go b/go/storage/metrics.go index 24d9aff77c9..22c27663ba3 100644 --- a/go/storage/metrics.go +++ b/go/storage/metrics.go @@ -45,12 +45,26 @@ var ( }, []string{"call"}, ) + storagePrunedCount = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "ekiden_storage_pruned", + Help: "Number of pruned nodes.", + }, + ) + storageFinalizedCount = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "ekiden_storage_finalized", + Help: "Number of finalized rounds.", + }, + ) storageCollectors = []prometheus.Collector{ storageFailures, storageCalls, storageLatency, storageValueSize, + storagePrunedCount, + storageFinalizedCount, } labelApply = prometheus.Labels{"call": "apply"} @@ -59,6 +73,8 @@ var ( labelGetPath = prometheus.Labels{"call": "get_path"} labelGetNode = prometheus.Labels{"call": "get_node"} labelHasRoot = prometheus.Labels{"call": "has_root"} + labelFinalize = prometheus.Labels{"call": "finalize"} + labelPrune = prometheus.Labels{"call": "prune"} _ api.LocalBackend = (*metricsWrapper)(nil) _ api.ClientBackend = (*metricsWrapper)(nil) @@ -188,6 +204,34 @@ func (w *metricsWrapper) HasRoot(root api.Root) bool 
{ return flag } +func (w *metricsWrapper) Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error { + localBackend, ok := w.Backend.(api.LocalBackend) + if !ok { + return api.ErrUnsupported + } + start := time.Now() + err := localBackend.Finalize(ctx, namespace, round, roots) + storageLatency.With(labelFinalize).Observe(time.Since(start).Seconds()) + storageCalls.With(labelFinalize).Inc() + if err == nil { + storageFinalizedCount.Inc() + } + return err +} + +func (w *metricsWrapper) Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error) { + localBackend, ok := w.Backend.(api.LocalBackend) + if !ok { + return 0, api.ErrUnsupported + } + start := time.Now() + pruned, err := localBackend.Prune(ctx, namespace, round) + storageLatency.With(labelPrune).Observe(time.Since(start).Seconds()) + storageCalls.With(labelPrune).Inc() + storagePrunedCount.Add(float64(pruned)) + return pruned, err +} + func newMetricsWrapper(base api.Backend) api.Backend { metricsOnce.Do(func() { prometheus.MustRegister(storageCollectors...) diff --git a/go/worker/common/host/mock.go b/go/worker/common/host/mock.go index 29db4983255..37c6e29735c 100644 --- a/go/worker/common/host/mock.go +++ b/go/worker/common/host/mock.go @@ -64,7 +64,7 @@ func (h *mockHost) MakeRequest(ctx context.Context, body *protocol.Body) (<-chan _ = tree.Insert(ctx, block.IoKeyInputs, rq.Inputs.MarshalCBOR()) _ = tree.Insert(ctx, block.IoKeyOutputs, rq.Inputs.MarshalCBOR()) _ = tree.Insert(ctx, block.IoKeyTags, cbor.Marshal(tags)) - ioWriteLog, ioRoot, err := tree.Commit(ctx, rq.Block.Header.Namespace, rq.Block.Header.Round) + ioWriteLog, ioRoot, err := tree.Commit(ctx, rq.Block.Header.Namespace, rq.Block.Header.Round+1) if err != nil { ch <- &protocol.Body{Error: &protocol.Error{Message: "(mock) failed to create I/O tree"}} break diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index 7f624d64d25..4941b1c899a 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -17,6 +17,7 @@ import ( "github.com/oasislabs/ekiden/go/common/grpc" "github.com/oasislabs/ekiden/go/common/logging" "github.com/oasislabs/ekiden/go/common/node" + "github.com/oasislabs/ekiden/go/common/pubsub" "github.com/oasislabs/ekiden/go/common/workerpool" roothashApi "github.com/oasislabs/ekiden/go/roothash/api" "github.com/oasislabs/ekiden/go/roothash/api/block" @@ -176,11 +177,18 @@ func NewNode( node.ctx, node.ctxCancel = context.WithCancel(context.Background()) + // Create a new storage client that will be used for remote sync. scl, err := client.New(node.ctx, node.commonNode.Identity.TLSCertificate, node.commonNode.Scheduler, node.commonNode.Registry) if err != nil { return nil, err } node.storageClient = scl.(storageApi.ClientBackend) + if err := node.storageClient.WatchRuntime(commonNode.RuntimeID); err != nil { + node.logger.Error("error watching storage runtime", + "err", err, + ) + return nil, err + } return node, nil } @@ -283,26 +291,41 @@ func (n *Node) GetLastSynced() (uint64, hash.Hash, hash.Hash) { func (n *Node) fetchDiff(round uint64, prevRoot *urkelNode.Root, thisRoot *urkelNode.Root) error { var writeLog storageApi.WriteLog + // Check if the new root doesn't already exist. 
if !n.localStorage.HasRoot(*thisRoot) { - n.logger.Debug("calling GetDiff", "previous_root", prevRoot, "root", thisRoot) - it, err := n.storageClient.GetDiff(n.ctx, *prevRoot, *thisRoot) - if err != nil { - return err - } - for { - more, err := it.Next() + if thisRoot.Hash.Equal(&prevRoot.Hash) { + // Even if HasRoot returns false the root can still exist if it is equal + // to the previous root and the root was emitted by the consensus committee + // directly (e.g., during an epoch transition). In this case we need to + // still apply the (empty) write log. + writeLog = storageApi.WriteLog{} + } else { + // New root does not yet exist in storage and we need to fetch it from a + // remote node. + n.logger.Debug("calling GetDiff", + "previous_root", prevRoot, + "root", thisRoot, + ) + + it, err := n.storageClient.GetDiff(n.ctx, *prevRoot, *thisRoot) if err != nil { return err } - if !more { - break - } + for { + more, err := it.Next() + if err != nil { + return err + } + if !more { + break + } - chunk, err := it.Value() - if err != nil { - return err + chunk, err := it.Value() + if err != nil { + return err + } + writeLog = append(writeLog, chunk) } - writeLog = append(writeLog, chunk) } } n.diffCh <- &fetchedDiff{ @@ -318,7 +341,7 @@ type inFlight struct { outstanding int } -func (n *Node) worker() { +func (n *Node) worker() { // nolint: gocyclo defer close(n.quitCh) defer close(n.diffCh) @@ -338,10 +361,20 @@ func (n *Node) worker() { genesisBlock, err := n.commonNode.Roothash.GetGenesisBlock(n.ctx, n.commonNode.RuntimeID) if err != nil { n.logger.Error("can't retrieve genesis block", "err", err) - panic("can't retrieve genesis block") + return } n.undefinedRound = genesisBlock.Header.Round - 1 + // Subscribe to pruned roothash blocks. + var pruneCh <-chan *roothashApi.PrunedBlock + var pruneSub *pubsub.Subscription + pruneCh, pruneSub, err = n.commonNode.Roothash.WatchPrunedBlocks() + if err != nil { + n.logger.Error("failed to watch pruned blocks", "err", err) + return + } + defer pruneSub.Close() + var fetcherGroup sync.WaitGroup n.syncedLock.RLock() @@ -368,14 +401,18 @@ mainLoop: for { if len(*outOfOrderDone) > 0 && cachedLastRound+1 == (*outOfOrderDone)[0].round { lastDiff := heap.Pop(outOfOrderDone).(*fetchedDiff) - // Check if we already had the writelog and apply it if not. + // Apply the write log if one exists. if lastDiff.writeLog != nil { _, err := n.localStorage.Apply(n.ctx, lastDiff.thisRoot.Namespace, lastDiff.prevRoot.Round, lastDiff.prevRoot.Hash, lastDiff.thisRoot.Round, lastDiff.thisRoot.Hash, lastDiff.writeLog) if err != nil { - n.logger.Error("can't apply write log", "err", err) + n.logger.Error("can't apply write log", + "err", err, + "prev_root", lastDiff.prevRoot, + "root", lastDiff.thisRoot, + ) } } @@ -388,11 +425,34 @@ mainLoop: summary := hashCache[lastDiff.round] delete(hashCache, lastDiff.round-1) + // Finalize storage for this round. + err := n.localStorage.Finalize(n.ctx, lastDiff.thisRoot.Namespace, lastDiff.round, []hash.Hash{ + summary.IORoot.Hash, + summary.StateRoot.Hash, + }) + switch err { + case nil: + n.logger.Debug("storage round finalized", + "round", lastDiff.round, + ) + case storageApi.ErrAlreadyFinalized: + // This can happen if we are restoring after a roothash migration or if + // we crashed before updating the sync state. 
+ n.logger.Warn("storage round already finalized", + "round", lastDiff.round, + ) + default: + n.logger.Error("failed to finalize storage round", + "err", err, + "round", lastDiff.round, + ) + } + n.syncedLock.Lock() n.syncedState.LastBlock.Round = lastDiff.round n.syncedState.LastBlock.IORoot = summary.IORoot n.syncedState.LastBlock.StateRoot = summary.StateRoot - err := n.stateStore.Update(func(tx *bolt.Tx) error { + err = n.stateStore.Update(func(tx *bolt.Tx) error { bkt := tx.Bucket(n.bucketName) bytes := cbor.Marshal(&n.syncedState) return bkt.Put(n.commonNode.RuntimeID[:], bytes) @@ -408,6 +468,19 @@ mainLoop: } select { + case prunedBlk := <-pruneCh: + n.logger.Debug("pruning storage for round", "round", prunedBlk.Round) + + // Prune given block. + var ns common.Namespace + copy(ns[:], prunedBlk.RuntimeID[:]) + + if _, err := n.localStorage.Prune(n.ctx, ns, prunedBlk.Round); err != nil { + n.logger.Error("failed to prune block", + "err", err, + ) + continue mainLoop + } case inBlk := <-n.blockCh.Out(): blk := inBlk.(*block.Block) n.logger.Debug("incoming block", @@ -418,10 +491,12 @@ mainLoop: if _, ok := hashCache[cachedLastRound]; !ok && cachedLastRound == n.undefinedRound { dummy := blockSummary{ Namespace: blk.Header.Namespace, - Round: cachedLastRound, + Round: cachedLastRound + 1, } dummy.IORoot.Empty() + dummy.IORoot.Round = cachedLastRound + 1 dummy.StateRoot.Empty() + dummy.StateRoot.Round = cachedLastRound + 1 hashCache[cachedLastRound] = &dummy } for i := cachedLastRound; i < blk.Header.Round; i++ { diff --git a/go/worker/storage/worker.go b/go/worker/storage/worker.go index f42f531e88f..9968d68a035 100644 --- a/go/worker/storage/worker.go +++ b/go/worker/storage/worker.go @@ -170,6 +170,18 @@ func (s *Worker) Start() error { return nil } + // Wait for all runtimes to terminate. + go func() { + defer close(s.quitCh) + + for _, r := range s.runtimes { + <-r.Quit() + } + if s.fetchPool != nil { + <-s.fetchPool.Quit() + } + }() + // Wait for the node to be registered for the current epoch. go func() { s.logger.Info("starting storage worker, waiting for registration") @@ -194,19 +206,12 @@ func (s *Worker) Start() error { // Stop halts the service. func (s *Worker) Stop() { go func() { - defer close(s.quitCh) for _, r := range s.runtimes { r.Stop() } if s.fetchPool != nil { s.fetchPool.Stop() } - for _, r := range s.runtimes { - <-r.Quit() - } - if s.fetchPool != nil { - <-s.fetchPool.Quit() - } if s.watchState != nil { s.watchState.Close() }
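
A note on how the new LocalBackend methods fit together. The sketch below is
illustrative only: the helper name and the values passed in are hypothetical,
the import paths are assumed to match the rest of the tree, and only the
Finalize/Prune signatures themselves come from this patch.

    package example

    import (
    	"context"
    	"fmt"

    	"github.com/oasislabs/ekiden/go/common"
    	"github.com/oasislabs/ekiden/go/common/crypto/hash"
    	storageApi "github.com/oasislabs/ekiden/go/storage/api"
    )

    // finalizeAndPrune finalizes a round, keeping only the given roots, and then
    // prunes it, as the storage worker does once the roothash service discards
    // the corresponding block.
    func finalizeAndPrune(ctx context.Context, backend storageApi.LocalBackend, ns common.Namespace, round uint64, roots []hash.Hash) error {
    	if err := backend.Finalize(ctx, ns, round, roots); err != nil && err != storageApi.ErrAlreadyFinalized {
    		return err
    	}
    	pruned, err := backend.Prune(ctx, ns, round)
    	if err != nil {
    		return err
    	}
    	fmt.Printf("pruned %d nodes for round %d\n", pruned, round)
    	return nil
    }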
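
The metrics wrapper only forwards Finalize and Prune when the wrapped backend
is a LocalBackend and returns ErrUnsupported otherwise. Code that only holds a
plain api.Backend can follow the same pattern; a minimal sketch, reusing the
imports from the example above (the maybeFinalize helper is hypothetical):

    // maybeFinalize finalizes a round only if the backend is in fact local.
    func maybeFinalize(ctx context.Context, b storageApi.Backend, ns common.Namespace, round uint64, roots []hash.Hash) error {
    	lb, ok := b.(storageApi.LocalBackend)
    	if !ok {
    		// Remote (client) backends cannot finalize; mirror the wrapper's behaviour.
    		return storageApi.ErrUnsupported
    	}
    	return lb.Finalize(ctx, ns, round, roots)
    }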
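
The fetchDiff change distinguishes three cases: the new root is already
present locally (nothing to apply), the new root hash equals the previous
root's hash (apply an empty, non-nil write log so the round still advances,
e.g. after an epoch transition), or the diff has to be fetched from a remote
node via GetDiff. A compact way to summarise the decision, assuming api.Root
exposes the same Hash field as the urkel node root used in node.go (the
decideWriteLog helper is hypothetical):

    // decideWriteLog mirrors the fetchDiff logic: a nil write log means the root
    // is already present, an empty non-nil write log means "same root, new round",
    // and needRemote signals that GetDiff must be called on a remote node.
    func decideWriteLog(local storageApi.LocalBackend, prevRoot, thisRoot storageApi.Root) (writeLog storageApi.WriteLog, needRemote bool) {
    	if local.HasRoot(thisRoot) {
    		return nil, false
    	}
    	if thisRoot.Hash.Equal(&prevRoot.Hash) {
    		return storageApi.WriteLog{}, false
    	}
    	return nil, true
    }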
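
Pruning is driven by the roothash service: each pruned-block notification from
WatchPrunedBlocks carries a runtime ID and a round, which the worker maps onto
a storage namespace before calling Prune. Stripped of the surrounding select
loop, the handling reduces to roughly the following (the helper wrapper is
hypothetical; the field and method names, and the roothashApi and logging
imports, are those already used in node.go):

    // handlePrunedBlock prunes local storage for a single pruned roothash block.
    func handlePrunedBlock(ctx context.Context, local storageApi.LocalBackend, blk *roothashApi.PrunedBlock, logger *logging.Logger) {
    	var ns common.Namespace
    	copy(ns[:], blk.RuntimeID[:])

    	if _, err := local.Prune(ctx, ns, blk.Round); err != nil {
    		logger.Error("failed to prune block",
    			"err", err,
    		)
    	}
    }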
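
Finally, the worker.go change moves the wait-for-termination logic from Stop
into Start: Start spawns a goroutine that closes the worker's quit channel
only after every per-runtime node and the fetch pool have terminated, while
Stop merely signals them to stop. In isolation the pattern looks roughly like
this (generic names, not the actual Worker type):

    type stoppable interface {
    	Stop()
    	Quit() <-chan struct{}
    }

    type supervisor struct {
    	quitCh   chan struct{}
    	children []stoppable
    }

    func (s *supervisor) Start() {
    	go func() {
    		// Close quitCh only once every child has fully terminated.
    		defer close(s.quitCh)
    		for _, c := range s.children {
    			<-c.Quit()
    		}
    	}()
    }

    func (s *supervisor) Stop() {
    	// Only signal termination; Start's goroutine does the waiting.
    	for _, c := range s.children {
    		c.Stop()
    	}
    }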