From c0be9cbc495231a6f1ee698a1799a444f6b2fd0d Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Fri, 20 Dec 2019 11:55:18 +0100 Subject: [PATCH 1/5] Make storage per-runtime This commit removes the global node storage backend and instead replaces it with per-runtime storage backends. To support answering queries for multiple runtimes via gRPC a runtime registry-based storage router is added. --- go/oasis-node/cmd/node/node.go | 15 +--- go/oasis-node/node_test.go | 33 ++++++-- go/runtime/client/client.go | 41 +++++----- go/runtime/registry/registry.go | 68 ++++++++++++---- go/runtime/registry/storage_router.go | 108 ++++++++++++++++++++++++++ go/worker/common/committee/node.go | 3 +- go/worker/common/worker.go | 7 -- go/worker/storage/service_external.go | 23 +++--- go/worker/storage/worker.go | 12 ++- 9 files changed, 232 insertions(+), 78 deletions(-) create mode 100644 go/runtime/registry/storage_router.go diff --git a/go/oasis-node/cmd/node/node.go b/go/oasis-node/cmd/node/node.go index 1fe9413b018..5bb1ddf0f16 100644 --- a/go/oasis-node/cmd/node/node.go +++ b/go/oasis-node/cmd/node/node.go @@ -115,7 +115,6 @@ type Node struct { Scheduler scheduler.Backend Sentry sentryAPI.Backend Staking stakingAPI.Backend - Storage storageAPI.Backend IAS iasAPI.Endpoint KeyManager keymanagerAPI.Backend @@ -171,19 +170,11 @@ func (n *Node) RegistrationStopped() { } func (n *Node) initBackends() error { - dataDir := cmdCommon.DataDir() - var err error - if n.Sentry, err = sentry.New(n.Consensus); err != nil { return err } - if n.Storage, err = storage.New(n.svcMgr.Ctx, dataDir, n.Identity, n.Scheduler, n.Registry); err != nil { - return err - } - n.svcMgr.RegisterCleanupOnly(n.Storage, "storage backend") - if supplementarysanity.Enabled() { if err = supplementarysanity.New(n.svcTmnt); err != nil { return err @@ -195,7 +186,6 @@ func (n *Node) initBackends() error { scheduler.RegisterService(grpcSrv, n.Scheduler) registryAPI.RegisterService(grpcSrv, n.Registry) stakingAPI.RegisterService(grpcSrv, n.Staking) - storageAPI.RegisterService(grpcSrv, n.Storage) consensusAPI.RegisterService(grpcSrv, n.Consensus) cmdCommon.Logger().Debug("backends initialized") @@ -237,7 +227,6 @@ func (n *Node) initWorkers(logger *logging.Logger) error { dataDir, compute.Enabled() || workerStorage.Enabled() || txnscheduler.Enabled() || merge.Enabled() || workerKeymanager.Enabled(), n.Identity, - n.Storage, n.RootHash, n.Registry, n.Scheduler, @@ -683,11 +672,12 @@ func newNode(testNode bool) (*Node, error) { logger.Info("starting Oasis node") // Initialize the node's runtime registry. - node.RuntimeRegistry, err = runtimeRegistry.New(node.svcMgr.Ctx, cmdCommon.DataDir(), node.Consensus, node.Storage) + node.RuntimeRegistry, err = runtimeRegistry.New(node.svcMgr.Ctx, cmdCommon.DataDir(), node.Consensus, node.Identity) if err != nil { return nil, err } node.svcMgr.RegisterCleanupOnly(node.RuntimeRegistry, "runtime registry") + storageAPI.RegisterService(node.grpcInternal.Server(), node.RuntimeRegistry.StorageRouter()) // Initialize the key manager client service. 
node.KeyManagerClient, err = keymanagerClient.New(node.KeyManager, node.Registry, node.Identity) @@ -703,7 +693,6 @@ func newNode(testNode bool) (*Node, error) { node.svcMgr.Ctx, cmdCommon.DataDir(), node.RootHash, - node.Storage, node.Scheduler, node.Registry, node.svcTmnt, diff --git a/go/oasis-node/node_test.go b/go/oasis-node/node_test.go index 466e3260431..e4b11d019a4 100644 --- a/go/oasis-node/node_test.go +++ b/go/oasis-node/node_test.go @@ -362,12 +362,15 @@ func testBeacon(t *testing.T, node *testNode) { } func testStorage(t *testing.T, node *testNode) { - // Determine the current round. This is required so that we can commit into - // storage at the next (non-finalized) round. - blk, err := node.RootHash.GetLatestBlock(context.Background(), testRuntimeID, consensusAPI.HeightLatest) - require.NoError(t, err, "GetLatestBlock") + dataDir, err := ioutil.TempDir("", "oasis-storage-test_") + require.NoError(t, err, "TempDir") + defer os.RemoveAll(dataDir) - storageTests.StorageImplementationTests(t, node.Storage, testNamespace, blk.Header.Round+1) + storage, err := storage.New(context.Background(), dataDir, node.Identity, node.Scheduler, node.Registry) + require.NoError(t, err, "storage.New") + defer storage.Cleanup() + + storageTests.StorageImplementationTests(t, storage, testNamespace, 0) } func testRegistry(t *testing.T, node *testNode) { @@ -401,7 +404,15 @@ func testStakingClient(t *testing.T, node *testNode) { } func testRootHash(t *testing.T, node *testNode) { - roothashTests.RootHashImplementationTests(t, node.RootHash, node.Consensus, node.Storage) + dataDir, err := ioutil.TempDir("", "oasis-storage-test_") + require.NoError(t, err, "TempDir") + defer os.RemoveAll(dataDir) + + storage, err := storage.New(context.Background(), dataDir, node.Identity, node.Scheduler, node.Registry) + require.NoError(t, err, "storage.New") + defer storage.Cleanup() + + roothashTests.RootHashImplementationTests(t, node.RootHash, node.Consensus, storage) } func testComputeWorker(t *testing.T, node *testNode) { @@ -419,7 +430,15 @@ func testTransactionSchedulerWorker(t *testing.T, node *testNode) { timeSource := (node.Epochtime).(epochtime.SetableBackend) require.NotNil(t, node.txnschedulerCommitteeNode) - txnschedulerWorkerTests.WorkerImplementationTests(t, node.TransactionSchedulerWorker, node.runtimeID, node.txnschedulerCommitteeNode, timeSource, node.RootHash, node.Storage) + txnschedulerWorkerTests.WorkerImplementationTests( + t, + node.TransactionSchedulerWorker, + node.runtimeID, + node.txnschedulerCommitteeNode, + timeSource, + node.RootHash, + node.RuntimeRegistry.StorageRouter(), + ) } func testClient(t *testing.T, node *testNode) { diff --git a/go/runtime/client/client.go b/go/runtime/client/client.go index c2a061ca5a1..4f9d847e660 100644 --- a/go/runtime/client/client.go +++ b/go/runtime/client/client.go @@ -43,12 +43,13 @@ const ( ) type clientCommon struct { - roothash roothash.Backend - storage storage.Backend - scheduler scheduler.Backend - registry registry.Backend - consensus consensus.Backend - keyManager *keymanager.Client + roothash roothash.Backend + storage storage.Backend + scheduler scheduler.Backend + registry registry.Backend + consensus consensus.Backend + keyManager *keymanager.Client + runtimeRegistry runtimeRegistry.Registry ctx context.Context } @@ -67,8 +68,7 @@ func (c *submitContext) cancel() { type runtimeClient struct { sync.Mutex - common *clientCommon - runtimeRegistry runtimeRegistry.Registry + common *clientCommon watchers 
map[signature.PublicKey]*blockWatcher @@ -76,7 +76,7 @@ type runtimeClient struct { } func (c *runtimeClient) tagIndexer(runtimeID signature.PublicKey) (tagindexer.QueryableBackend, error) { - rt, err := c.runtimeRegistry.GetRuntime(runtimeID) + rt, err := c.common.runtimeRegistry.GetRuntime(runtimeID) if err != nil { return nil, err } @@ -221,7 +221,7 @@ func (c *runtimeClient) GetBlock(ctx context.Context, request *api.GetBlockReque return c.common.roothash.GetLatestBlock(ctx, request.RuntimeID, consensus.HeightLatest) } - rt, err := c.runtimeRegistry.GetRuntime(request.RuntimeID) + rt, err := c.common.runtimeRegistry.GetRuntime(request.RuntimeID) if err != nil { return nil, err } @@ -462,7 +462,6 @@ func New( ctx context.Context, dataDir string, roothash roothash.Backend, - storage storage.Backend, scheduler scheduler.Backend, registry registry.Backend, consensus consensus.Backend, @@ -471,17 +470,17 @@ func New( ) (api.RuntimeClient, error) { c := &runtimeClient{ common: &clientCommon{ - roothash: roothash, - storage: storage, - scheduler: scheduler, - registry: registry, - consensus: consensus, - keyManager: keyManager, - ctx: ctx, + roothash: roothash, + storage: runtimeRegistry.StorageRouter(), + scheduler: scheduler, + registry: registry, + consensus: consensus, + keyManager: keyManager, + runtimeRegistry: runtimeRegistry, + ctx: ctx, }, - runtimeRegistry: runtimeRegistry, - watchers: make(map[signature.PublicKey]*blockWatcher), - logger: logging.GetLogger("runtime/client"), + watchers: make(map[signature.PublicKey]*blockWatcher), + logger: logging.GetLogger("runtime/client"), } return c, nil } diff --git a/go/runtime/registry/registry.go b/go/runtime/registry/registry.go index 8fb7b488af2..c64fe01ad0a 100644 --- a/go/runtime/registry/registry.go +++ b/go/runtime/registry/registry.go @@ -11,12 +11,14 @@ import ( "github.com/spf13/viper" "github.com/oasislabs/oasis-core/go/common/crypto/signature" + "github.com/oasislabs/oasis-core/go/common/identity" "github.com/oasislabs/oasis-core/go/common/logging" consensus "github.com/oasislabs/oasis-core/go/consensus/api" registry "github.com/oasislabs/oasis-core/go/registry/api" "github.com/oasislabs/oasis-core/go/runtime/history" "github.com/oasislabs/oasis-core/go/runtime/tagindexer" - storage "github.com/oasislabs/oasis-core/go/storage/api" + "github.com/oasislabs/oasis-core/go/storage" + storageAPI "github.com/oasislabs/oasis-core/go/storage/api" ) const ( @@ -37,6 +39,11 @@ type Registry interface { // registry. NewUnmanagedRuntime(runtimeID signature.PublicKey) Runtime + // StorageRouter returns a storage backend which routes requests to the + // correct per-runtime storage backend based on the namespace contained + // in the request. + StorageRouter() storageAPI.Backend + // Cleanup performs post-termination cleanup. Cleanup() } @@ -55,6 +62,9 @@ type Runtime interface { // TagIndexer returns the tag indexer backend. TagIndexer() tagindexer.QueryableBackend + + // Storage returns the per-runtime storage backend. 
+ Storage() storageAPI.Backend } type runtime struct { @@ -62,6 +72,7 @@ type runtime struct { descriptor *registry.Runtime consensus consensus.Backend + storage storageAPI.Backend history history.History tagIndexer *tagindexer.Service @@ -103,6 +114,10 @@ func (r *runtime) TagIndexer() tagindexer.QueryableBackend { return r.tagIndexer } +func (r *runtime) Storage() storageAPI.Backend { + return r.storage +} + type runtimeRegistry struct { sync.RWMutex @@ -110,7 +125,7 @@ type runtimeRegistry struct { dataDir string consensus consensus.Backend - storage storage.Backend + identity *identity.Identity runtimes map[signature.PublicKey]*runtime } @@ -145,11 +160,17 @@ func (r *runtimeRegistry) NewUnmanagedRuntime(runtimeID signature.PublicKey) Run } } +func (r *runtimeRegistry) StorageRouter() storageAPI.Backend { + return &storageRouter{registry: r} +} + func (r *runtimeRegistry) Cleanup() { r.Lock() defer r.Unlock() for _, rt := range r.runtimes { + // Close storage backend. + rt.storage.Cleanup() // Close tag indexer service. rt.tagIndexer.Stop() <-rt.tagIndexer.Quit() @@ -175,12 +196,21 @@ func (r *runtimeRegistry) addSupportedRuntime(ctx context.Context, id signature. return err } + // Create runtime history keeper. history, err := history.New(path, id, &cfg.History) if err != nil { return fmt.Errorf("runtime/registry: cannot create block history for runtime %s: %w", id, err) } - tagIndexer, err := tagindexer.New(path, cfg.TagIndexer, history, r.consensus.RootHash(), r.storage) + // Create runtime-specific storage backend. + // TODO: Pass runtime identifier. + storageBackend, err := storage.New(ctx, path, r.identity, r.consensus.Scheduler(), r.consensus.Registry()) + if err != nil { + return fmt.Errorf("runtime/registry: cannot create storage for runtime %s: %w", id, err) + } + + // Create runtime tag indexer. + tagIndexer, err := tagindexer.New(path, cfg.TagIndexer, history, r.consensus.RootHash(), storageBackend) if err != nil { return fmt.Errorf("runtime/registry: cannot create tag indexer for runtime %s: %w", id, err) } @@ -188,9 +218,22 @@ func (r *runtimeRegistry) addSupportedRuntime(ctx context.Context, id signature. return fmt.Errorf("runtime/registry: failed to start tag indexer for runtime %s: %w", id, err) } + // If using a storage client, it should watch the configured runtimes. + // TODO: This should be done automatically by the storage backend when we add a runtime parameter. + if storageClient, ok := storageBackend.(storageAPI.ClientBackend); ok { + if err = storageClient.WatchRuntime(id); err != nil { + r.logger.Warn("error watching storage runtime, expected if using metricswrapper with local backend", + "err", err, + ) + } + } else { + r.logger.Info("not watching storage runtime since not using a storage client backend") + } + r.runtimes[id] = &runtime{ id: id, consensus: r.consensus, + storage: storageBackend, history: history, tagIndexer: tagIndexer, } @@ -200,27 +243,16 @@ func (r *runtimeRegistry) addSupportedRuntime(ctx context.Context, id signature. return fmt.Errorf("runtime/registry: cannot track runtime %s: %w", id, err) } - // If using a storage client, it should watch the configured runtimes. 
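// --- Editorial sketch (not part of the patch) --------------------------------
// With storage made per-runtime, callers no longer hold a single global
// storage backend; they resolve a runtime's backend through the registry.
// A minimal, hypothetical caller (exampleRuntimeStorage, reg and id are
// assumptions for illustration):
func exampleRuntimeStorage(reg Registry, id signature.PublicKey) (storageAPI.Backend, error) {
	// Managed runtimes are set up by addSupportedRuntime; unknown runtimes
	// return an error here.
	rt, err := reg.GetRuntime(id)
	if err != nil {
		return nil, err
	}
	// Each supported runtime owns its own storage backend, created under the
	// runtime's state directory.
	return rt.Storage(), nil
}
// ------------------------------------------------------------------------------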
- if storageClient, ok := r.storage.(storage.ClientBackend); ok { - if err := storageClient.WatchRuntime(id); err != nil { - r.logger.Warn("error watching storage runtime, expected if using metricswrapper with local backend", - "err", err, - ) - } - } else { - r.logger.Info("not watching storage runtime since not using a storage client backend") - } - return nil } // New creates a new runtime registry. -func New(ctx context.Context, dataDir string, consensus consensus.Backend, storage storage.Backend) (Registry, error) { +func New(ctx context.Context, dataDir string, consensus consensus.Backend, identity *identity.Identity) (Registry, error) { r := &runtimeRegistry{ logger: logging.GetLogger("runtime/registry"), dataDir: dataDir, consensus: consensus, - storage: storage, + identity: identity, runtimes: make(map[signature.PublicKey]*runtime), } @@ -247,5 +279,9 @@ func New(ctx context.Context, dataDir string, consensus consensus.Backend, stora } } + if len(runtimes) == 0 { + r.logger.Info("no supported runtimes configured") + } + return r, nil } diff --git a/go/runtime/registry/storage_router.go b/go/runtime/registry/storage_router.go new file mode 100644 index 00000000000..26bc576b5c9 --- /dev/null +++ b/go/runtime/registry/storage_router.go @@ -0,0 +1,108 @@ +package registry + +import ( + "context" + + "github.com/oasislabs/oasis-core/go/common" + "github.com/oasislabs/oasis-core/go/storage/api" +) + +var _ api.Backend = (*storageRouter)(nil) + +type storageRouter struct { + registry Registry +} + +func (sr *storageRouter) getRuntime(ns common.Namespace) (Runtime, error) { + id, err := ns.ToRuntimeID() + if err != nil { + return nil, err + } + return sr.registry.GetRuntime(id) +} + +func (sr *storageRouter) SyncGet(ctx context.Context, request *api.GetRequest) (*api.ProofResponse, error) { + rt, err := sr.getRuntime(request.Tree.Root.Namespace) + if err != nil { + return nil, err + } + return rt.Storage().SyncGet(ctx, request) +} + +func (sr *storageRouter) SyncGetPrefixes(ctx context.Context, request *api.GetPrefixesRequest) (*api.ProofResponse, error) { + rt, err := sr.getRuntime(request.Tree.Root.Namespace) + if err != nil { + return nil, err + } + return rt.Storage().SyncGetPrefixes(ctx, request) +} + +func (sr *storageRouter) SyncIterate(ctx context.Context, request *api.IterateRequest) (*api.ProofResponse, error) { + rt, err := sr.getRuntime(request.Tree.Root.Namespace) + if err != nil { + return nil, err + } + return rt.Storage().SyncIterate(ctx, request) +} + +func (sr *storageRouter) Apply(ctx context.Context, request *api.ApplyRequest) ([]*api.Receipt, error) { + rt, err := sr.getRuntime(request.Namespace) + if err != nil { + return nil, err + } + return rt.Storage().Apply(ctx, request) +} + +func (sr *storageRouter) ApplyBatch(ctx context.Context, request *api.ApplyBatchRequest) ([]*api.Receipt, error) { + rt, err := sr.getRuntime(request.Namespace) + if err != nil { + return nil, err + } + return rt.Storage().ApplyBatch(ctx, request) +} + +func (sr *storageRouter) Merge(ctx context.Context, request *api.MergeRequest) ([]*api.Receipt, error) { + rt, err := sr.getRuntime(request.Namespace) + if err != nil { + return nil, err + } + return rt.Storage().Merge(ctx, request) +} + +func (sr *storageRouter) MergeBatch(ctx context.Context, request *api.MergeBatchRequest) ([]*api.Receipt, error) { + rt, err := sr.getRuntime(request.Namespace) + if err != nil { + return nil, err + } + return rt.Storage().MergeBatch(ctx, request) +} + +func (sr *storageRouter) GetDiff(ctx context.Context, 
request *api.GetDiffRequest) (api.WriteLogIterator, error) { + rt, err := sr.getRuntime(request.StartRoot.Namespace) + if err != nil { + return nil, err + } + return rt.Storage().GetDiff(ctx, request) +} + +func (sr *storageRouter) GetCheckpoint(ctx context.Context, request *api.GetCheckpointRequest) (api.WriteLogIterator, error) { + rt, err := sr.getRuntime(request.Root.Namespace) + if err != nil { + return nil, err + } + return rt.Storage().GetCheckpoint(ctx, request) +} + +func (sr *storageRouter) Cleanup() { +} + +func (sr *storageRouter) Initialized() <-chan struct{} { + ch := make(chan struct{}) + go func() { + defer close(ch) + for _, rt := range sr.registry.Runtimes() { + <-rt.Storage().Initialized() + } + }() + return ch +} diff --git a/go/worker/common/committee/node.go b/go/worker/common/committee/node.go index 0ac3f670663..4e60cc21895 100644 --- a/go/worker/common/committee/node.go +++ b/go/worker/common/committee/node.go @@ -314,7 +314,6 @@ func NewNode( keymanager keymanagerApi.Backend, keymanagerClient *keymanagerClient.Client, localStorage *host.LocalStorage, - storage storage.Backend, roothash roothash.Backend, registry registry.Backend, scheduler scheduler.Backend, @@ -333,7 +332,7 @@ func NewNode( KeyManager: keymanager, KeyManagerClient: keymanagerClient, LocalStorage: localStorage, - Storage: storage, + Storage: runtime.Storage(), Roothash: roothash, Registry: registry, Scheduler: scheduler, diff --git a/go/worker/common/worker.go b/go/worker/common/worker.go index 94a77c87920..7b958b79ea5 100644 --- a/go/worker/common/worker.go +++ b/go/worker/common/worker.go @@ -16,7 +16,6 @@ import ( roothash "github.com/oasislabs/oasis-core/go/roothash/api" runtimeRegistry "github.com/oasislabs/oasis-core/go/runtime/registry" scheduler "github.com/oasislabs/oasis-core/go/scheduler/api" - storage "github.com/oasislabs/oasis-core/go/storage/api" "github.com/oasislabs/oasis-core/go/worker/common/committee" "github.com/oasislabs/oasis-core/go/worker/common/host" "github.com/oasislabs/oasis-core/go/worker/common/p2p" @@ -31,7 +30,6 @@ type Worker struct { cfg Config Identity *identity.Identity - Storage storage.Backend Roothash roothash.Backend Registry registry.Backend Scheduler scheduler.Backend @@ -198,7 +196,6 @@ func (w *Worker) NewUnmanagedCommitteeNode(runtime runtimeRegistry.Runtime, enab w.KeyManager, w.KeyManagerClient, w.LocalStorage, - w.Storage, w.Roothash, w.Registry, w.Scheduler, @@ -230,7 +227,6 @@ func newWorker( dataDir string, enabled bool, identity *identity.Identity, - storageBackend storage.Backend, roothash roothash.Backend, registryInst registry.Backend, scheduler scheduler.Backend, @@ -248,7 +244,6 @@ func newWorker( enabled: enabled, cfg: cfg, Identity: identity, - Storage: storageBackend, Roothash: roothash, Registry: registryInst, Scheduler: scheduler, @@ -294,7 +289,6 @@ func New( dataDir string, enabled bool, identity *identity.Identity, - storage storage.Backend, roothash roothash.Backend, registry registry.Backend, scheduler scheduler.Backend, @@ -326,7 +320,6 @@ func New( dataDir, enabled, identity, - storage, roothash, registry, scheduler, diff --git a/go/worker/storage/service_external.go b/go/worker/storage/service_external.go index 9523a095f91..ecc057a0c8b 100644 --- a/go/worker/storage/service_external.go +++ b/go/worker/storage/service_external.go @@ -11,7 +11,8 @@ import ( // storageService is the service exposed to external clients via gRPC. 
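// --- Editorial sketch (not part of the patch) --------------------------------
// The storageRouter added earlier in this patch implements api.Backend, so it
// can stand in wherever a single storage backend was used before. Each request
// carries a namespace, which the router maps back to a runtime ID and then to
// that runtime's backend. A hypothetical routed read (reg and req are assumed
// inputs):
func exampleRoutedGet(ctx context.Context, reg runtimeRegistry.Registry, req *api.GetRequest) (*api.ProofResponse, error) {
	router := reg.StorageRouter()
	// req.Tree.Root.Namespace selects the per-runtime backend, exactly as in
	// storageRouter.SyncGet above.
	return router.SyncGet(ctx, req)
}
// ------------------------------------------------------------------------------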
type storageService struct { - w *Worker + w *Worker + storage api.Backend debugRejectUpdates bool } @@ -39,21 +40,21 @@ func (s *storageService) SyncGet(ctx context.Context, request *api.GetRequest) ( if err := s.ensureInitialized(ctx); err != nil { return nil, err } - return s.w.commonWorker.Storage.SyncGet(ctx, request) + return s.storage.SyncGet(ctx, request) } func (s *storageService) SyncGetPrefixes(ctx context.Context, request *api.GetPrefixesRequest) (*api.ProofResponse, error) { if err := s.ensureInitialized(ctx); err != nil { return nil, err } - return s.w.commonWorker.Storage.SyncGetPrefixes(ctx, request) + return s.storage.SyncGetPrefixes(ctx, request) } func (s *storageService) SyncIterate(ctx context.Context, request *api.IterateRequest) (*api.ProofResponse, error) { if err := s.ensureInitialized(ctx); err != nil { return nil, err } - return s.w.commonWorker.Storage.SyncIterate(ctx, request) + return s.storage.SyncIterate(ctx, request) } func (s *storageService) Apply(ctx context.Context, request *api.ApplyRequest) ([]*api.Receipt, error) { @@ -63,7 +64,7 @@ func (s *storageService) Apply(ctx context.Context, request *api.ApplyRequest) ( if err := s.ensureInitialized(ctx); err != nil { return nil, err } - return s.w.commonWorker.Storage.Apply(ctx, request) + return s.storage.Apply(ctx, request) } func (s *storageService) ApplyBatch(ctx context.Context, request *api.ApplyBatchRequest) ([]*api.Receipt, error) { @@ -73,7 +74,7 @@ func (s *storageService) ApplyBatch(ctx context.Context, request *api.ApplyBatch if err := s.ensureInitialized(ctx); err != nil { return nil, err } - return s.w.commonWorker.Storage.ApplyBatch(ctx, request) + return s.storage.ApplyBatch(ctx, request) } func (s *storageService) Merge(ctx context.Context, request *api.MergeRequest) ([]*api.Receipt, error) { @@ -83,7 +84,7 @@ func (s *storageService) Merge(ctx context.Context, request *api.MergeRequest) ( if err := s.ensureInitialized(ctx); err != nil { return nil, err } - return s.w.commonWorker.Storage.Merge(ctx, request) + return s.storage.Merge(ctx, request) } func (s *storageService) MergeBatch(ctx context.Context, request *api.MergeBatchRequest) ([]*api.Receipt, error) { @@ -93,7 +94,7 @@ func (s *storageService) MergeBatch(ctx context.Context, request *api.MergeBatch if err := s.ensureInitialized(ctx); err != nil { return nil, err } - return s.w.commonWorker.Storage.MergeBatch(ctx, request) + return s.storage.MergeBatch(ctx, request) } func (s *storageService) GetDiff(ctx context.Context, request *api.GetDiffRequest) (api.WriteLogIterator, error) { @@ -103,7 +104,7 @@ func (s *storageService) GetDiff(ctx context.Context, request *api.GetDiffReques if err := s.ensureInitialized(ctx); err != nil { return nil, err } - return s.w.commonWorker.Storage.GetDiff(ctx, request) + return s.storage.GetDiff(ctx, request) } func (s *storageService) GetCheckpoint(ctx context.Context, request *api.GetCheckpointRequest) (api.WriteLogIterator, error) { @@ -113,12 +114,12 @@ func (s *storageService) GetCheckpoint(ctx context.Context, request *api.GetChec if err := s.ensureInitialized(ctx); err != nil { return nil, err } - return s.w.commonWorker.Storage.GetCheckpoint(ctx, request) + return s.storage.GetCheckpoint(ctx, request) } func (s *storageService) Cleanup() { } func (s *storageService) Initialized() <-chan struct{} { - return s.w.commonWorker.Storage.Initialized() + return s.storage.Initialized() } diff --git a/go/worker/storage/worker.go b/go/worker/storage/worker.go index cdf6b8113c8..b074e895df4 100644 --- 
a/go/worker/storage/worker.go +++ b/go/worker/storage/worker.go @@ -118,6 +118,7 @@ func New( s.grpcPolicy = grpc.NewDynamicRuntimePolicyChecker() api.RegisterService(s.commonWorker.Grpc.Server(), &storageService{ w: s, + storage: s.commonWorker.RuntimeRegistry.StorageRouter(), debugRejectUpdates: viper.GetBool(CfgWorkerDebugIgnoreApply) && flags.DebugDontBlameOasis(), }) @@ -256,11 +257,20 @@ func (s *Worker) initGenesis(gen *genesis.Document) error { return err } + regRt, err := s.commonWorker.RuntimeRegistry.GetRuntime(rt.ID) + if err != nil { + // Skip unsupported runtimes. + s.logger.Warn("skipping unsupported runtime", + "runtime_id", rt.ID, + ) + continue + } + if rt.Genesis.State != nil { var ns common.Namespace copy(ns[:], rt.ID[:]) - _, err = s.commonWorker.Storage.Apply(ctx, &api.ApplyRequest{ + _, err = regRt.Storage().Apply(ctx, &api.ApplyRequest{ Namespace: ns, SrcRound: 0, SrcRoot: emptyRoot, From c9219dcdbd03b5b4965e427f414d5899cce35ace Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Fri, 20 Dec 2019 12:54:21 +0100 Subject: [PATCH 2/5] Make untrusted local storage per-runtime --- go/oasis-node/cmd/node/unsafe_reset.go | 17 ++++---- go/oasis-test-runner/oasis/keymanager.go | 6 --- go/runtime/history/history.go | 5 ++- .../localstorage/localstorage.go} | 40 ++++++++++++------- go/runtime/registry/registry.go | 35 ++++++++++++---- go/worker/common/committee/node.go | 4 -- go/worker/common/runtime_host.go | 13 +++--- go/worker/common/worker.go | 21 ---------- go/worker/keymanager/handler.go | 12 +++--- go/worker/keymanager/init.go | 14 ++++++- 10 files changed, 94 insertions(+), 73 deletions(-) rename go/{worker/common/host/local_storage.go => runtime/localstorage/localstorage.go} (68%) diff --git a/go/oasis-node/cmd/node/unsafe_reset.go b/go/oasis-node/cmd/node/unsafe_reset.go index 5392d70b316..23551994cc7 100644 --- a/go/oasis-node/cmd/node/unsafe_reset.go +++ b/go/oasis-node/cmd/node/unsafe_reset.go @@ -12,6 +12,7 @@ import ( "github.com/oasislabs/oasis-core/go/consensus/tendermint" cmdCommon "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common" cmdFlags "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common/flags" + "github.com/oasislabs/oasis-core/go/runtime/history" runtimeRegistry "github.com/oasislabs/oasis-core/go/runtime/registry" ) @@ -34,15 +35,17 @@ var ( Run: doUnsafeReset, } + runtimesGlob = filepath.Join(runtimeRegistry.RuntimesDir, "*") + nodeStateGlobs = []string{ "abci-mux-state.*.db", "persistent-store.*.db", tendermint.StateDir, - runtimeRegistry.RuntimesDir, + filepath.Join(runtimesGlob, history.DbFilename), } - localStorageGlob = "worker-local-storage.*.db" - mkvsDatabaseGlob = "mkvs_storage.*.db" + runtimeLocalStorageGlob = filepath.Join(runtimesGlob, "worker-local-storage.*.db") + runtimeMkvsDatabaseGlob = filepath.Join(runtimesGlob, "mkvs_storage.*.db") logger = logging.GetLogger("cmd/unsafe-reset") ) @@ -74,12 +77,12 @@ func doUnsafeReset(cmd *cobra.Command, args []string) { if viper.GetBool(CfgPreserveLocalStorage) { logger.Info("preserving untrusted local storage") } else { - globs = append(globs, localStorageGlob) + globs = append(globs, filepath.Join(runtimesGlob, runtimeLocalStorageGlob)) } if viper.GetBool(CfgPreserveMKVSDatabase) { logger.Info("preserving MKVS database") } else { - globs = append(globs, mkvsDatabaseGlob) + globs = append(globs, filepath.Join(runtimesGlob, runtimeMkvsDatabaseGlob)) } // Enumerate the locations to purge. 
@@ -126,7 +129,7 @@ func doUnsafeReset(cmd *cobra.Command, args []string) { } func init() { - unsafeResetFlags.Bool(CfgPreserveLocalStorage, false, "preserve untrusted local storage") - unsafeResetFlags.Bool(CfgPreserveMKVSDatabase, false, "preserve MKVS database") + unsafeResetFlags.Bool(CfgPreserveLocalStorage, false, "preserve per-runtime untrusted local storage") + unsafeResetFlags.Bool(CfgPreserveMKVSDatabase, false, "preserve per-runtime MKVS database") _ = viper.BindPFlags(unsafeResetFlags) } diff --git a/go/oasis-test-runner/oasis/keymanager.go b/go/oasis-test-runner/oasis/keymanager.go index 865cef0e325..39c4390b0e7 100644 --- a/go/oasis-test-runner/oasis/keymanager.go +++ b/go/oasis-test-runner/oasis/keymanager.go @@ -8,7 +8,6 @@ import ( "github.com/oasislabs/oasis-core/go/common/node" registry "github.com/oasislabs/oasis-core/go/registry/api" - workerCommon "github.com/oasislabs/oasis-core/go/worker/common" ) const ( @@ -65,11 +64,6 @@ func (km *Keymanager) ExportsPath() string { return nodeExportsPath(km.dir) } -// LocalStoragePath returns the path to the node's local storage. -func (km *Keymanager) LocalStoragePath() string { - return filepath.Join(km.dir.String(), workerCommon.LocalStorageFile) -} - func (km *Keymanager) provisionGenesis() error { // Provision status and policy. We can only provision this here as we need // a list of runtimes allowed to query the key manager. diff --git a/go/runtime/history/history.go b/go/runtime/history/history.go index e130d8d4b7f..07d9e1dd2e7 100644 --- a/go/runtime/history/history.go +++ b/go/runtime/history/history.go @@ -15,7 +15,8 @@ import ( "github.com/oasislabs/oasis-core/go/roothash/api/block" ) -const dbFilename = "history.db" +// DbFilename is the filename of the history database. +const DbFilename = "history.db" var ( errNopHistory = errors.New("runtime/history: not supported") @@ -191,7 +192,7 @@ func (h *runtimeHistory) pruneWorker() { // New creates a new runtime history keeper. func New(dataDir string, runtimeID signature.PublicKey, cfg *Config) (History, error) { - db, err := newDB(filepath.Join(dataDir, dbFilename), runtimeID) + db, err := newDB(filepath.Join(dataDir, DbFilename), runtimeID) if err != nil { return nil, err } diff --git a/go/worker/common/host/local_storage.go b/go/runtime/localstorage/localstorage.go similarity index 68% rename from go/worker/common/host/local_storage.go rename to go/runtime/localstorage/localstorage.go index 9dd5d23723b..bf8512ebfab 100644 --- a/go/worker/common/host/local_storage.go +++ b/go/runtime/localstorage/localstorage.go @@ -1,4 +1,6 @@ -package host +// Package localstorage implements untrusted local storage that is used +// by runtimes to store per-node key/value pairs. +package localstorage import ( "encoding/hex" @@ -12,31 +14,42 @@ import ( cmnBadger "github.com/oasislabs/oasis-core/go/common/badger" "github.com/oasislabs/oasis-core/go/common/cbor" "github.com/oasislabs/oasis-core/go/common/crypto/signature" - "github.com/oasislabs/oasis-core/go/common/keyformat" "github.com/oasislabs/oasis-core/go/common/logging" ) var ( errInvalidKey = errors.New("invalid local storage key") - runtimeKeyFmt = keyformat.New(0x00, &signature.PublicKey{}, []byte{}) + _ LocalStorage = (*localStorage)(nil) ) -type LocalStorage struct { +// LocalStorage is the untrusted local storage interface. +type LocalStorage interface { + // Get retrieves a previously stored value under the given key. + Get(key []byte) ([]byte, error) + + // Set sets a key to a specific value. 
+ Set(key, value []byte) error + + // Stop stops local storage. + Stop() +} + +type localStorage struct { logger *logging.Logger db *badger.DB gc *cmnBadger.GCWorker } -func (s *LocalStorage) Get(id signature.PublicKey, key []byte) ([]byte, error) { +func (s *localStorage) Get(key []byte) ([]byte, error) { if len(key) == 0 { return nil, errInvalidKey } var value []byte if err := s.db.View(func(tx *badger.Txn) error { - item, txErr := tx.Get(runtimeKeyFmt.Encode(&id, key)) + item, txErr := tx.Get(key) switch txErr { case nil: case badger.ErrKeyNotFound: @@ -52,7 +65,6 @@ func (s *LocalStorage) Get(id signature.PublicKey, key []byte) ([]byte, error) { }); err != nil { s.logger.Error("failed get", "err", err, - "id", id, "key", hex.EncodeToString(key), ) return nil, err @@ -61,17 +73,16 @@ func (s *LocalStorage) Get(id signature.PublicKey, key []byte) ([]byte, error) { return cbor.FixSliceForSerde(value), nil } -func (s *LocalStorage) Set(id signature.PublicKey, key, value []byte) error { +func (s *localStorage) Set(key, value []byte) error { if len(key) == 0 { return errInvalidKey } if err := s.db.Update(func(tx *badger.Txn) error { - return tx.Set(runtimeKeyFmt.Encode(&id, key), value) + return tx.Set(key, value) }); err != nil { s.logger.Error("failed put", "err", err, - "id", id, "key", hex.EncodeToString(key), "value", hex.EncodeToString(value), ) @@ -81,7 +92,7 @@ func (s *LocalStorage) Set(id signature.PublicKey, key, value []byte) error { return nil } -func (s *LocalStorage) Stop() { +func (s *localStorage) Stop() { s.gc.Close() if err := s.db.Close(); err != nil { s.logger.Error("failed to close local storage", @@ -91,9 +102,10 @@ func (s *LocalStorage) Stop() { s.db = nil } -func NewLocalStorage(dataDir, fn string) (*LocalStorage, error) { - s := &LocalStorage{ - logger: logging.GetLogger("worker/common/host/localStorage"), +// New creates new untrusted local storage. +func New(dataDir, fn string, runtimeID signature.PublicKey) (LocalStorage, error) { + s := &localStorage{ + logger: logging.GetLogger("runtime/localstorage").With("runtime_id", runtimeID), } opts := badger.DefaultOptions(filepath.Join(dataDir, fn)) diff --git a/go/runtime/registry/registry.go b/go/runtime/registry/registry.go index c64fe01ad0a..2285d9eb906 100644 --- a/go/runtime/registry/registry.go +++ b/go/runtime/registry/registry.go @@ -16,6 +16,7 @@ import ( consensus "github.com/oasislabs/oasis-core/go/consensus/api" registry "github.com/oasislabs/oasis-core/go/registry/api" "github.com/oasislabs/oasis-core/go/runtime/history" + "github.com/oasislabs/oasis-core/go/runtime/localstorage" "github.com/oasislabs/oasis-core/go/runtime/tagindexer" "github.com/oasislabs/oasis-core/go/storage" storageAPI "github.com/oasislabs/oasis-core/go/storage/api" @@ -25,6 +26,9 @@ const ( // MaxRuntimeCount is the maximum number of runtimes that can be supported // by a single node. MaxRuntimeCount = 64 + + // LocalStorageFile is the filename of the worker's local storage database. + LocalStorageFile = "worker-local-storage.badger.db" ) // Registry is the running node's runtime registry interface. @@ -65,14 +69,18 @@ type Runtime interface { // Storage returns the per-runtime storage backend. Storage() storageAPI.Backend + + // LocalStorage returns the per-runtime local storage. 
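// --- Editorial sketch (not part of the patch) --------------------------------
// Untrusted local storage is now opened per runtime, under that runtime's
// state directory, so Get/Set no longer take a runtime ID argument. A
// hypothetical usage (path and id assumed to come from the surrounding
// registry code):
func exampleLocalStorage(path string, id signature.PublicKey) error {
	ls, err := localstorage.New(path, LocalStorageFile, id)
	if err != nil {
		return err
	}
	defer ls.Stop()

	// Keys are scoped to this runtime's database; no runtime ID prefix needed.
	if err := ls.Set([]byte("example-key"), []byte("example-value")); err != nil {
		return err
	}
	_, err = ls.Get([]byte("example-key"))
	return err
}
// ------------------------------------------------------------------------------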
+ LocalStorage() localstorage.LocalStorage } type runtime struct { id signature.PublicKey descriptor *registry.Runtime - consensus consensus.Backend - storage storageAPI.Backend + consensus consensus.Backend + storage storageAPI.Backend + localStorage localstorage.LocalStorage history history.History tagIndexer *tagindexer.Service @@ -118,6 +126,10 @@ func (r *runtime) Storage() storageAPI.Backend { return r.storage } +func (r *runtime) LocalStorage() localstorage.LocalStorage { + return r.localStorage +} + type runtimeRegistry struct { sync.RWMutex @@ -169,6 +181,8 @@ func (r *runtimeRegistry) Cleanup() { defer r.Unlock() for _, rt := range r.runtimes { + // Close local storage backend. + rt.localStorage.Stop() // Close storage backend. rt.storage.Cleanup() // Close tag indexer service. @@ -202,6 +216,12 @@ func (r *runtimeRegistry) addSupportedRuntime(ctx context.Context, id signature. return fmt.Errorf("runtime/registry: cannot create block history for runtime %s: %w", id, err) } + // Create runtime-specific local storage backend. + localStorage, err := localstorage.New(path, LocalStorageFile, id) + if err != nil { + return fmt.Errorf("runtime/registry: cannot create local storage for runtime %s: %w", id, err) + } + // Create runtime-specific storage backend. // TODO: Pass runtime identifier. storageBackend, err := storage.New(ctx, path, r.identity, r.consensus.Scheduler(), r.consensus.Registry()) @@ -231,11 +251,12 @@ func (r *runtimeRegistry) addSupportedRuntime(ctx context.Context, id signature. } r.runtimes[id] = &runtime{ - id: id, - consensus: r.consensus, - storage: storageBackend, - history: history, - tagIndexer: tagIndexer, + id: id, + consensus: r.consensus, + storage: storageBackend, + localStorage: localStorage, + history: history, + tagIndexer: tagIndexer, } // Start tracking this runtime. 
diff --git a/go/worker/common/committee/node.go b/go/worker/common/committee/node.go index 4e60cc21895..f6d4d2af4c2 100644 --- a/go/worker/common/committee/node.go +++ b/go/worker/common/committee/node.go @@ -18,7 +18,6 @@ import ( runtimeRegistry "github.com/oasislabs/oasis-core/go/runtime/registry" scheduler "github.com/oasislabs/oasis-core/go/scheduler/api" storage "github.com/oasislabs/oasis-core/go/storage/api" - "github.com/oasislabs/oasis-core/go/worker/common/host" "github.com/oasislabs/oasis-core/go/worker/common/p2p" ) @@ -82,7 +81,6 @@ type Node struct { Identity *identity.Identity KeyManager keymanagerApi.Backend KeyManagerClient *keymanagerClient.Client - LocalStorage *host.LocalStorage Storage storage.Backend Roothash roothash.Backend Registry registry.Backend @@ -313,7 +311,6 @@ func NewNode( identity *identity.Identity, keymanager keymanagerApi.Backend, keymanagerClient *keymanagerClient.Client, - localStorage *host.LocalStorage, roothash roothash.Backend, registry registry.Backend, scheduler scheduler.Backend, @@ -331,7 +328,6 @@ func NewNode( Identity: identity, KeyManager: keymanager, KeyManagerClient: keymanagerClient, - LocalStorage: localStorage, Storage: runtime.Storage(), Roothash: roothash, Registry: registry, diff --git a/go/worker/common/runtime_host.go b/go/worker/common/runtime_host.go index 157df94eced..aaff4a55542 100644 --- a/go/worker/common/runtime_host.go +++ b/go/worker/common/runtime_host.go @@ -15,6 +15,7 @@ import ( keymanagerApi "github.com/oasislabs/oasis-core/go/keymanager/api" keymanagerClient "github.com/oasislabs/oasis-core/go/keymanager/client" registry "github.com/oasislabs/oasis-core/go/registry/api" + "github.com/oasislabs/oasis-core/go/runtime/localstorage" storage "github.com/oasislabs/oasis-core/go/storage/api" "github.com/oasislabs/oasis-core/go/worker/common/committee" "github.com/oasislabs/oasis-core/go/worker/common/host" @@ -36,7 +37,7 @@ type runtimeHostHandler struct { storage storage.Backend keyManager keymanagerApi.Backend keyManagerClient *keymanagerClient.Client - localStorage *host.LocalStorage + localStorage localstorage.LocalStorage } func (h *runtimeHostHandler) Handle(ctx context.Context, body *protocol.Body) (*protocol.Body, error) { @@ -100,14 +101,14 @@ func (h *runtimeHostHandler) Handle(ctx context.Context, body *protocol.Body) (* } // Local storage. 
if body.HostLocalStorageGetRequest != nil { - value, err := h.localStorage.Get(h.runtime.ID, body.HostLocalStorageGetRequest.Key) + value, err := h.localStorage.Get(body.HostLocalStorageGetRequest.Key) if err != nil { return nil, err } return &protocol.Body{HostLocalStorageGetResponse: &protocol.HostLocalStorageGetResponse{Value: value}}, nil } if body.HostLocalStorageSetRequest != nil { - if err := h.localStorage.Set(h.runtime.ID, body.HostLocalStorageSetRequest.Key, body.HostLocalStorageSetRequest.Value); err != nil { + if err := h.localStorage.Set(body.HostLocalStorageSetRequest.Key, body.HostLocalStorageSetRequest.Value); err != nil { return nil, err } return &protocol.Body{HostLocalStorageSetResponse: &protocol.Empty{}}, nil @@ -122,7 +123,7 @@ func NewRuntimeHostHandler( storage storage.Backend, keyManager keymanagerApi.Backend, keyManagerClient *keymanagerClient.Client, - localStorage *host.LocalStorage, + localStorage localstorage.LocalStorage, ) protocol.Handler { return &runtimeHostHandler{runtime, storage, keyManager, keyManagerClient, localStorage} } @@ -220,10 +221,10 @@ func (n *RuntimeHostNode) InitializeRuntimeWorkerHost(ctx context.Context) error TEEHardware: rt.TEEHardware, MessageHandler: NewRuntimeHostHandler( rt, - n.commonNode.Storage, + n.commonNode.Runtime.Storage(), n.commonNode.KeyManager, n.commonNode.KeyManagerClient, - n.commonNode.LocalStorage, + n.commonNode.Runtime.LocalStorage(), ), } workerHost, err := n.workerHostFactory.NewWorkerHost(cfg) diff --git a/go/worker/common/worker.go b/go/worker/common/worker.go index 7b958b79ea5..66ba23470d4 100644 --- a/go/worker/common/worker.go +++ b/go/worker/common/worker.go @@ -17,13 +17,9 @@ import ( runtimeRegistry "github.com/oasislabs/oasis-core/go/runtime/registry" scheduler "github.com/oasislabs/oasis-core/go/scheduler/api" "github.com/oasislabs/oasis-core/go/worker/common/committee" - "github.com/oasislabs/oasis-core/go/worker/common/host" "github.com/oasislabs/oasis-core/go/worker/common/p2p" ) -// LocalStorageFile is the filename of the worker's local storage database. -const LocalStorageFile = "worker-local-storage.badger.db" - // Worker is a garbage bag with lower level services and common runtime objects. type Worker struct { enabled bool @@ -39,7 +35,6 @@ type Worker struct { IAS ias.Endpoint KeyManager keymanagerApi.Backend KeyManagerClient *keymanagerClient.Client - LocalStorage *host.LocalStorage RuntimeRegistry runtimeRegistry.Registry GenesisDoc *genesis.Document @@ -117,10 +112,6 @@ func (w *Worker) Stop() { } w.Grpc.Stop() - - if w.LocalStorage != nil { - w.LocalStorage.Stop() - } } // Enabled returns if worker is enabled. @@ -195,7 +186,6 @@ func (w *Worker) NewUnmanagedCommitteeNode(runtime runtimeRegistry.Runtime, enab w.Identity, w.KeyManager, w.KeyManagerClient, - w.LocalStorage, w.Roothash, w.Registry, w.Scheduler, @@ -262,17 +252,6 @@ func newWorker( } if enabled { - // Open the local storage. - var err error - if w.LocalStorage, err = host.NewLocalStorage(dataDir, LocalStorageFile); err != nil { - w.logger.Error("failed to initialize local storage", - "err", err, - "data_dir", dataDir, - "local_storage_file", LocalStorageFile, - ) - return nil, err - } - for _, rt := range runtimeRegistry.Runtimes() { // Register all configured runtimes. 
if err := w.registerRuntime(rt); err != nil { diff --git a/go/worker/keymanager/handler.go b/go/worker/keymanager/handler.go index 90571979b98..72e9e59ce2b 100644 --- a/go/worker/keymanager/handler.go +++ b/go/worker/keymanager/handler.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "github.com/oasislabs/oasis-core/go/runtime/localstorage" "github.com/oasislabs/oasis-core/go/worker/common/host/protocol" ) @@ -14,20 +15,21 @@ var ( ) type hostHandler struct { - w *Worker + w *Worker + localStorage localstorage.LocalStorage } func (h *hostHandler) Handle(ctx context.Context, body *protocol.Body) (*protocol.Body, error) { // Local storage. if body.HostLocalStorageGetRequest != nil { - value, err := h.w.commonWorker.LocalStorage.Get(h.w.runtimeID, body.HostLocalStorageGetRequest.Key) + value, err := h.localStorage.Get(body.HostLocalStorageGetRequest.Key) if err != nil { return nil, err } return &protocol.Body{HostLocalStorageGetResponse: &protocol.HostLocalStorageGetResponse{Value: value}}, nil } if body.HostLocalStorageSetRequest != nil { - if err := h.w.commonWorker.LocalStorage.Set(h.w.runtimeID, body.HostLocalStorageSetRequest.Key, body.HostLocalStorageSetRequest.Value); err != nil { + if err := h.localStorage.Set(body.HostLocalStorageSetRequest.Key, body.HostLocalStorageSetRequest.Value); err != nil { return nil, err } return &protocol.Body{HostLocalStorageSetResponse: &protocol.Empty{}}, nil @@ -36,6 +38,6 @@ func (h *hostHandler) Handle(ctx context.Context, body *protocol.Body) (*protoco return nil, errMethodNotSupported } -func newHostHandler(w *Worker) protocol.Handler { - return &hostHandler{w} +func newHostHandler(w *Worker, localStorage localstorage.LocalStorage) protocol.Handler { + return &hostHandler{w, localStorage} } diff --git a/go/worker/keymanager/init.go b/go/worker/keymanager/init.go index 2aa380ffdb0..1109c6e3a60 100644 --- a/go/worker/keymanager/init.go +++ b/go/worker/keymanager/init.go @@ -15,6 +15,8 @@ import ( ias "github.com/oasislabs/oasis-core/go/ias/api" "github.com/oasislabs/oasis-core/go/keymanager/api" enclaverpc "github.com/oasislabs/oasis-core/go/runtime/enclaverpc/api" + "github.com/oasislabs/oasis-core/go/runtime/localstorage" + runtimeRegistry "github.com/oasislabs/oasis-core/go/runtime/registry" workerCommon "github.com/oasislabs/oasis-core/go/worker/common" "github.com/oasislabs/oasis-core/go/worker/common/host" "github.com/oasislabs/oasis-core/go/worker/registration" @@ -94,6 +96,16 @@ func New( return nil, fmt.Errorf("worker/keymanager: runtime binary not configured") } + // Create local storage for the key manager. + path, err := runtimeRegistry.EnsureRuntimeStateDir(dataDir, w.runtimeID) + if err != nil { + return nil, fmt.Errorf("worker/keymanager: failed to ensure runtime state directory: %w", err) + } + localStorage, err := localstorage.New(path, runtimeRegistry.LocalStorageFile, w.runtimeID) + if err != nil { + return nil, fmt.Errorf("worker/keymanager: cannot create local storage: %w", err) + } + if err := w.registration.RegisterRole(node.RoleKeyManager, w.onNodeRegistration); err != nil { return nil, fmt.Errorf("worker/keymanager: failed to register role: %w", err) } @@ -105,7 +117,7 @@ func New( RuntimeBinary: runtimeBinary, TEEHardware: teeHardware, IAS: ias, - MessageHandler: newHostHandler(w), + MessageHandler: newHostHandler(w, localStorage), } // Register the EnclaveRPC transport gRPC service. 
From 90f3a206d4a516a9969ef72c5d76512e41db256b Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Sun, 22 Dec 2019 15:07:58 +0100 Subject: [PATCH 3/5] go/storage: Remove namespace support --- .../cmd/storage/benchmark/benchmark.go | 6 +- go/oasis-node/node_test.go | 12 +- go/roothash/tests/tester.go | 47 ++-- go/runtime/history/db.go | 14 +- go/runtime/registry/registry.go | 7 +- go/storage/api/api.go | 6 +- go/storage/crashing_test.go | 3 +- go/storage/database/database_test.go | 1 + go/storage/init.go | 3 + go/storage/mkvs/urkel/db/api/api.go | 6 + go/storage/mkvs/urkel/db/badger/badger.go | 262 +++++++++++------- go/storage/mkvs/urkel/urkel_test.go | 18 +- 12 files changed, 240 insertions(+), 145 deletions(-) diff --git a/go/oasis-node/cmd/storage/benchmark/benchmark.go b/go/oasis-node/cmd/storage/benchmark/benchmark.go index ed9cb9b77f9..64782117760 100644 --- a/go/oasis-node/cmd/storage/benchmark/benchmark.go +++ b/go/oasis-node/cmd/storage/benchmark/benchmark.go @@ -81,7 +81,9 @@ func doBenchmark(cmd *cobra.Command, args []string) { // nolint: gocyclo // Disable expected root checks. viper.Set("storage.debug.insecure_skip_checks", true) - storage, err := storage.New(context.Background(), dataDir, ident, nil, nil) + var ns common.Namespace + + storage, err := storage.New(context.Background(), dataDir, ns, ident, nil, nil) if err != nil { logger.Error("failed to initialize storage", "err", err, @@ -112,8 +114,6 @@ func doBenchmark(cmd *cobra.Command, args []string) { // nolint: gocyclo defer pprof.StopCPUProfile() } - var ns common.Namespace - // Benchmark MKVS storage (single-insert). for _, sz := range []int{ 256, 512, 1024, 4096, 8192, 16384, 32768, diff --git a/go/oasis-node/node_test.go b/go/oasis-node/node_test.go index e4b11d019a4..930676ceec8 100644 --- a/go/oasis-node/node_test.go +++ b/go/oasis-node/node_test.go @@ -366,7 +366,7 @@ func testStorage(t *testing.T, node *testNode) { require.NoError(t, err, "TempDir") defer os.RemoveAll(dataDir) - storage, err := storage.New(context.Background(), dataDir, node.Identity, node.Scheduler, node.Registry) + storage, err := storage.New(context.Background(), dataDir, testNamespace, node.Identity, node.Scheduler, node.Registry) require.NoError(t, err, "storage.New") defer storage.Cleanup() @@ -404,15 +404,7 @@ func testStakingClient(t *testing.T, node *testNode) { } func testRootHash(t *testing.T, node *testNode) { - dataDir, err := ioutil.TempDir("", "oasis-storage-test_") - require.NoError(t, err, "TempDir") - defer os.RemoveAll(dataDir) - - storage, err := storage.New(context.Background(), dataDir, node.Identity, node.Scheduler, node.Registry) - require.NoError(t, err, "storage.New") - defer storage.Cleanup() - - roothashTests.RootHashImplementationTests(t, node.RootHash, node.Consensus, storage) + roothashTests.RootHashImplementationTests(t, node.RootHash, node.Consensus, node.Identity) } func testComputeWorker(t *testing.T, node *testNode) { diff --git a/go/roothash/tests/tester.go b/go/roothash/tests/tester.go index 18add6d1221..3990f806e46 100644 --- a/go/roothash/tests/tester.go +++ b/go/roothash/tests/tester.go @@ -3,6 +3,8 @@ package tests import ( "context" + "io/ioutil" + "os" "strconv" "testing" "time" @@ -12,6 +14,7 @@ import ( "github.com/oasislabs/oasis-core/go/common" "github.com/oasislabs/oasis-core/go/common/crypto/hash" "github.com/oasislabs/oasis-core/go/common/crypto/signature" + "github.com/oasislabs/oasis-core/go/common/identity" "github.com/oasislabs/oasis-core/go/common/pubsub" consensusAPI 
"github.com/oasislabs/oasis-core/go/consensus/api" epochtime "github.com/oasislabs/oasis-core/go/epochtime/api" @@ -22,7 +25,8 @@ import ( "github.com/oasislabs/oasis-core/go/roothash/api/commitment" "github.com/oasislabs/oasis-core/go/runtime/transaction" scheduler "github.com/oasislabs/oasis-core/go/scheduler/api" - storage "github.com/oasislabs/oasis-core/go/storage/api" + "github.com/oasislabs/oasis-core/go/storage" + storageAPI "github.com/oasislabs/oasis-core/go/storage/api" ) const ( @@ -43,7 +47,7 @@ type runtimeState struct { // RootHashImplementationTests exercises the basic functionality of a // roothash backend. -func RootHashImplementationTests(t *testing.T, backend api.Backend, consensus consensusAPI.Backend, storage storage.Backend) { +func RootHashImplementationTests(t *testing.T, backend api.Backend, consensus consensusAPI.Backend, identity *identity.Identity) { seedBase := []byte("RootHashImplementationTests") require := require.New(t) @@ -92,7 +96,7 @@ func RootHashImplementationTests(t *testing.T, backend api.Backend, consensus co // EpochTransitionBlock was successful. Otherwise this may leave the // committees set to nil and cause a crash. t.Run("SucessfulRound", func(t *testing.T) { - testSucessfulRound(t, backend, consensus, storage, rtStates) + testSucessfulRound(t, backend, consensus, identity, rtStates) }) } @@ -215,17 +219,28 @@ func (s *runtimeState) testEpochTransitionBlock(t *testing.T, scheduler schedule } } -func testSucessfulRound(t *testing.T, backend api.Backend, consensus consensusAPI.Backend, storage storage.Backend, states []*runtimeState) { +func testSucessfulRound(t *testing.T, backend api.Backend, consensus consensusAPI.Backend, identity *identity.Identity, states []*runtimeState) { for _, state := range states { - state.testSuccessfulRound(t, backend, consensus, storage) + state.testSuccessfulRound(t, backend, consensus, identity) } } -func (s *runtimeState) testSuccessfulRound(t *testing.T, backend api.Backend, consensus consensusAPI.Backend, storageBackend storage.Backend) { +func (s *runtimeState) testSuccessfulRound(t *testing.T, backend api.Backend, consensus consensusAPI.Backend, identity *identity.Identity) { require := require.New(t) rt, computeCommittee, mergeCommittee := s.rt, s.computeCommittee, s.mergeCommittee + dataDir, err := ioutil.TempDir("", "oasis-storage-test_") + require.NoError(err, "TempDir") + defer os.RemoveAll(dataDir) + + var ns common.Namespace + copy(ns[:], rt.Runtime.ID[:]) + + storageBackend, err := storage.New(context.Background(), dataDir, ns, identity, consensus.Scheduler(), consensus.Registry()) + require.NoError(err, "storage.New") + defer storageBackend.Cleanup() + child, err := backend.GetLatestBlock(context.Background(), rt.Runtime.ID, consensusAPI.HeightLatest) require.NoError(err, "GetLatestBlock") @@ -234,7 +249,7 @@ func (s *runtimeState) testSuccessfulRound(t *testing.T, backend api.Backend, co defer sub.Close() // Generate a dummy I/O root. 
- ioRoot := storage.Root{ + ioRoot := storageAPI.Root{ Namespace: child.Header.Namespace, Round: child.Header.Round + 1, } @@ -271,10 +286,10 @@ func (s *runtimeState) testSuccessfulRound(t *testing.T, backend api.Backend, co s.storageCommittee, child.Header.Namespace, child.Header.Round+1, - []storage.ApplyOp{ - storage.ApplyOp{SrcRound: child.Header.Round + 1, SrcRoot: emptyRoot, DstRoot: ioRootHash, WriteLog: ioWriteLog}, + []storageAPI.ApplyOp{ + storageAPI.ApplyOp{SrcRound: child.Header.Round + 1, SrcRoot: emptyRoot, DstRoot: ioRootHash, WriteLog: ioWriteLog}, // NOTE: Twice to get a receipt over both roots which we set to the same value. - storage.ApplyOp{SrcRound: child.Header.Round, SrcRoot: emptyRoot, DstRoot: ioRootHash, WriteLog: ioWriteLog}, + storageAPI.ApplyOp{SrcRound: child.Header.Round, SrcRoot: emptyRoot, DstRoot: ioRootHash, WriteLog: ioWriteLog}, }, ) @@ -467,15 +482,15 @@ func mustGetCommittee( func mustStore( t *testing.T, - store storage.Backend, + store storageAPI.Backend, committee *testCommittee, ns common.Namespace, round uint64, - ops []storage.ApplyOp, + ops []storageAPI.ApplyOp, ) []signature.Signature { require := require.New(t) - receipts, err := store.ApplyBatch(context.Background(), &storage.ApplyBatchRequest{ + receipts, err := store.ApplyBatch(context.Background(), &storageAPI.ApplyBatchRequest{ Namespace: ns, DstRound: round, Ops: ops, @@ -486,14 +501,14 @@ func mustStore( // We need to fake the storage signatures as the storage committee under test // does not contain the key of the actual storage backend. - var body storage.ReceiptBody + var body storageAPI.ReceiptBody err = receipts[0].Open(&body) require.NoError(err, "Open") var signatures []signature.Signature for _, node := range committee.workers { - var receipt *storage.Receipt - receipt, err = storage.SignReceipt(node.Signer, ns, round, body.Roots) + var receipt *storageAPI.Receipt + receipt, err = storageAPI.SignReceipt(node.Signer, ns, round, body.Roots) require.NoError(err, "SignReceipt") signatures = append(signatures, receipt.Signed.Signature) diff --git a/go/runtime/history/db.go b/go/runtime/history/db.go index ed14f752632..ff9a6169211 100644 --- a/go/runtime/history/db.go +++ b/go/runtime/history/db.go @@ -14,9 +14,7 @@ import ( roothash "github.com/oasislabs/oasis-core/go/roothash/api" ) -const ( - dbVersion = 1 -) +const dbVersion = 1 var ( // metadataKeyFmt is the metadata key format. @@ -30,11 +28,15 @@ var ( ) type dbMetadata struct { + // RuntimeID is the runtime ID this database is for. RuntimeID signature.PublicKey `json:"runtime_id"` - Version uint64 `json:"version"` + // Version is the database schema version. + Version uint64 `json:"version"` - LastConsensusHeight int64 `json:"last_consensus_height"` - LastRound uint64 `json:"last_round"` + // LastConsensusHeight is the last consensus height. + LastConsensusHeight int64 `json:"last_consensus_height"` + // LastRound is the last round. + LastRound uint64 `json:"last_round"` } // DB is the history database. 
diff --git a/go/runtime/registry/registry.go b/go/runtime/registry/registry.go index 2285d9eb906..cba37050703 100644 --- a/go/runtime/registry/registry.go +++ b/go/runtime/registry/registry.go @@ -10,6 +10,7 @@ import ( "github.com/spf13/viper" + "github.com/oasislabs/oasis-core/go/common" "github.com/oasislabs/oasis-core/go/common/crypto/signature" "github.com/oasislabs/oasis-core/go/common/identity" "github.com/oasislabs/oasis-core/go/common/logging" @@ -223,8 +224,10 @@ func (r *runtimeRegistry) addSupportedRuntime(ctx context.Context, id signature. } // Create runtime-specific storage backend. - // TODO: Pass runtime identifier. - storageBackend, err := storage.New(ctx, path, r.identity, r.consensus.Scheduler(), r.consensus.Registry()) + var ns common.Namespace + copy(ns[:], id[:]) + + storageBackend, err := storage.New(ctx, path, ns, r.identity, r.consensus.Scheduler(), r.consensus.Registry()) if err != nil { return fmt.Errorf("runtime/registry: cannot create storage for runtime %s: %w", id, err) } diff --git a/go/storage/api/api.go b/go/storage/api/api.go index 9624fb43b3a..f34b55f7b9e 100644 --- a/go/storage/api/api.go +++ b/go/storage/api/api.go @@ -87,12 +87,16 @@ type Config struct { // InsecureSkipChecks bypasses the known root checks. InsecureSkipChecks bool + + // Namespace is the namespace contained within the database. + Namespace common.Namespace } // ToNodeDB converts from a Config to a node DB Config. func (cfg *Config) ToNodeDB() *nodedb.Config { return &nodedb.Config{ - DB: cfg.DB, + DB: cfg.DB, + Namespace: cfg.Namespace, } } diff --git a/go/storage/crashing_test.go b/go/storage/crashing_test.go index 9955a2ad6a4..f9b8b28dbba 100644 --- a/go/storage/crashing_test.go +++ b/go/storage/crashing_test.go @@ -24,7 +24,8 @@ func TestCrashingBackendDoNotInterfere(t *testing.T) { var ( cfg = api.Config{ - Backend: database.BackendNameBadgerDB, + Backend: database.BackendNameBadgerDB, + Namespace: testNs, } err error ) diff --git a/go/storage/database/database_test.go b/go/storage/database/database_test.go index cbb00808361..7f683e0790c 100644 --- a/go/storage/database/database_test.go +++ b/go/storage/database/database_test.go @@ -35,6 +35,7 @@ func doTestImpl(t *testing.T, backend string) { cfg = api.Config{ Backend: backend, ApplyLockLRUSlots: 100, + Namespace: testNs, } err error ) diff --git a/go/storage/init.go b/go/storage/init.go index b6b34c83c72..c386d97087c 100644 --- a/go/storage/init.go +++ b/go/storage/init.go @@ -10,6 +10,7 @@ import ( flag "github.com/spf13/pflag" "github.com/spf13/viper" + "github.com/oasislabs/oasis-core/go/common" "github.com/oasislabs/oasis-core/go/common/identity" cmdFlags "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common/flags" registry "github.com/oasislabs/oasis-core/go/registry/api" @@ -35,6 +36,7 @@ var Flags = flag.NewFlagSet("", flag.ContinueOnError) func New( ctx context.Context, dataDir string, + namespace common.Namespace, identity *identity.Identity, schedulerBackend scheduler.Backend, registryBackend registry.Backend, @@ -45,6 +47,7 @@ func New( Signer: identity.NodeSigner, ApplyLockLRUSlots: uint64(viper.GetInt(CfgLRUSlots)), InsecureSkipChecks: viper.GetBool(cfgInsecureSkipChecks) && cmdFlags.DebugDontBlameOasis(), + Namespace: namespace, } var ( diff --git a/go/storage/mkvs/urkel/db/api/api.go b/go/storage/mkvs/urkel/db/api/api.go index 632fd582542..4911dcfca40 100644 --- a/go/storage/mkvs/urkel/db/api/api.go +++ b/go/storage/mkvs/urkel/db/api/api.go @@ -35,6 +35,9 @@ var ( ErrRootNotFound = errors.New("urkel: root not 
found") // ErrRootMustFollowOld indicates that the passed new root does not follow old root. ErrRootMustFollowOld = errors.New("urkel: root must follow old root") + // ErrBadNamespace indicates that the passed namespace does not match what is + // actually contained within the database. + ErrBadNamespace = errors.New("urkel: bad namespace") ) // Config is the node database backend configuration. @@ -44,6 +47,9 @@ type Config struct { // DebugNoFsync will disable fsync() where possible. DebugNoFsync bool + + // Namespace is the namespace contained within the database. + Namespace common.Namespace } // NodeDB is the persistence layer used for persisting the in-memory tree. diff --git a/go/storage/mkvs/urkel/db/badger/badger.go b/go/storage/mkvs/urkel/db/badger/badger.go index 4b0abc23c82..e03f738057b 100644 --- a/go/storage/mkvs/urkel/db/badger/badger.go +++ b/go/storage/mkvs/urkel/db/badger/badger.go @@ -3,6 +3,7 @@ package badger import ( "context" + "fmt" "sync" "github.com/dgraph-io/badger/v2" @@ -20,46 +21,50 @@ import ( "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/writelog" ) +const dbVersion = 1 + var ( _ api.NodeDB = (*badgerNodeDB)(nil) - // TODO: Storing the full namespace with each node seems quite inefficient. - - // nodeKeyFmt is the key format for nodes (namespace, node hash). + // nodeKeyFmt is the key format for nodes (node hash). // // Value is serialized node. - nodeKeyFmt = keyformat.New('N', &common.Namespace{}, &hash.Hash{}) - // writeLogKeyFmt is the key format for write logs (namespace, round, - // new root, old root). + nodeKeyFmt = keyformat.New(0x00, &hash.Hash{}) + // writeLogKeyFmt is the key format for write logs (round, new root, + // old root). // // Value is CBOR-serialized write log. - writeLogKeyFmt = keyformat.New('L', &common.Namespace{}, uint64(0), &hash.Hash{}, &hash.Hash{}) - // rootLinkKeyFmt is the key format for the root links (namespace, round, - // src root, dst root). + writeLogKeyFmt = keyformat.New(0x01, uint64(0), &hash.Hash{}, &hash.Hash{}) + // rootLinkKeyFmt is the key format for the root links (round, src root, + // dst root). // // Value is empty. - rootLinkKeyFmt = keyformat.New('M', &common.Namespace{}, uint64(0), &hash.Hash{}, &hash.Hash{}) + rootLinkKeyFmt = keyformat.New(0x02, uint64(0), &hash.Hash{}, &hash.Hash{}) // rootGcUpdatesKeyFmt is the key format for the pending garbage collection // index updates that need to be applied only in case the given root is among - // the finalized roots. The key format is (namespace, round, root). + // the finalized roots. The key format is (round, root). // // Value is CBOR-serialized list of updates for garbage collection index. - rootGcUpdatesKeyFmt = keyformat.New('I', &common.Namespace{}, uint64(0), &hash.Hash{}) + rootGcUpdatesKeyFmt = keyformat.New(0x03, uint64(0), &hash.Hash{}) // rootAddedNodesKeyFmt is the key format for the pending added nodes for the // given root that need to be removed only in case the given root is not among - // the finalized roots. They key format is (namespace, round, root). + // the finalized roots. They key format is (round, root). // // Value is CBOR-serialized list of node hashes. - rootAddedNodesKeyFmt = keyformat.New('J', &common.Namespace{}, uint64(0), &hash.Hash{}) + rootAddedNodesKeyFmt = keyformat.New(0x04, uint64(0), &hash.Hash{}) // gcIndexKeyFmt is the key format for the garbage collection index - // (namespace, end round, start round, node hash). + // (end round, start round, node hash). // // Value is empty. 
- gcIndexKeyFmt = keyformat.New('G', &common.Namespace{}, uint64(0), uint64(0), &hash.Hash{}) + gcIndexKeyFmt = keyformat.New(0x05, uint64(0), uint64(0), &hash.Hash{}) // finalizedKeyFmt is the key format for the last finalized round number. // // Value is the last finalized round number. - finalizedKeyFmt = keyformat.New('F', &common.Namespace{}) + finalizedKeyFmt = keyformat.New(0x06) + // metadataKeyFmt is the key format for metadata. + // + // Value is CBOR-serialized metadata. + metadataKeyFmt = keyformat.New(0x07) ) // rootGcIndexUpdate is an element of the rootGcUpdates list. @@ -77,31 +82,44 @@ type rootGcUpdates []rootGcUpdate // rootAddedNodes is the value of the rootAddedNodes keys. type rootAddedNodes []hash.Hash +// metadata is the database metadata. type metadata struct { - sync.RWMutex + sync.RWMutex `json:"-"` + + // Version is the database schema version. + Version uint64 `json:"version"` + // Namespace is the namespace this database is for. + Namespace common.Namespace `json:"namespace"` - lastFinalizedRound map[common.Namespace]uint64 + lastFinalizedRound *uint64 } -func (m *metadata) getLastFinalizedRound(ns common.Namespace) (uint64, bool) { +func (m *metadata) getLastFinalizedRound() (uint64, bool) { m.RLock() defer m.RUnlock() - round, ok := m.lastFinalizedRound[ns] - return round, ok + if m.lastFinalizedRound == nil { + return 0, false + } + return *m.lastFinalizedRound, true } -func (m *metadata) setLastFinalizedRound(ns common.Namespace, round uint64) { +func (m *metadata) setLastFinalizedRound(round uint64) { m.Lock() defer m.Unlock() - m.lastFinalizedRound[ns] = round + if m.lastFinalizedRound != nil && round <= *m.lastFinalizedRound { + return + } + + m.lastFinalizedRound = &round } // New creates a new BadgerDB-backed node database. func New(cfg *api.Config) (api.NodeDB, error) { db := &badgerNodeDB{ - logger: logging.GetLogger("urkel/db/badger"), + logger: logging.GetLogger("urkel/db/badger"), + namespace: cfg.Namespace, } db.CheckpointableDB = api.NewCheckpointableDB(db) @@ -133,6 +151,8 @@ type badgerNodeDB struct { logger *logging.Logger + namespace common.Namespace + db *badger.DB gc *cmnBadger.GCWorker meta metadata @@ -141,49 +161,75 @@ type badgerNodeDB struct { } func (d *badgerNodeDB) load() error { - d.meta.Lock() - defer d.meta.Unlock() - - d.meta.lastFinalizedRound = make(map[common.Namespace]uint64) - - return d.db.View(func(tx *badger.Txn) error { - // Load finalized rounds. - it := tx.NewIterator(badger.IteratorOptions{Prefix: finalizedKeyFmt.Encode()}) - defer it.Close() - - for it.Rewind(); it.Valid(); it.Next() { - item := it.Item() - - var decNs common.Namespace - - if !finalizedKeyFmt.Decode(item.Key(), &decNs) { - // This should not happen as the Badger iterator should take care of it. - panic("urkel/db/badger: bad iterator") - } - - var lastFinalizedRound uint64 - err := item.Value(func(data []byte) error { - return cbor.Unmarshal(data, &lastFinalizedRound) + return d.db.Update(func(tx *badger.Txn) error { + // Load metadata. + item, err := tx.Get(metadataKeyFmt.Encode()) + switch err { + case nil: + // Metadata already exists, just load it and verify that it is + // compatible with what we have here. 
+ err = item.Value(func(data []byte) error { + return cbor.Unmarshal(data, &d.meta) }) if err != nil { return err } - d.meta.lastFinalizedRound[decNs] = lastFinalizedRound + if d.meta.Version != dbVersion { + return fmt.Errorf("incompatible database version (expected: %d got: %d)", + dbVersion, + d.meta.Version, + ) + } + if !d.meta.Namespace.Equal(&d.namespace) { + return fmt.Errorf("incompatible namespace (expected: %s got: %s)", + d.namespace, + d.meta.Namespace, + ) + } + + // Load last finalized round. + item, err = tx.Get(finalizedKeyFmt.Encode()) + switch err { + case nil: + return item.Value(func(data []byte) error { + return cbor.Unmarshal(data, &d.meta.lastFinalizedRound) + }) + case badger.ErrKeyNotFound: + return nil + default: + return err + } + case badger.ErrKeyNotFound: + default: + return err } - return nil + // No metadata exists, create some. + d.meta.Version = dbVersion + d.meta.Namespace = d.namespace + return tx.Set(metadataKeyFmt.Encode(), cbor.Marshal(&d.meta)) }) } +func (d *badgerNodeDB) sanityCheckNamespace(ns common.Namespace) error { + if !ns.Equal(&d.namespace) { + return api.ErrBadNamespace + } + return nil +} + func (d *badgerNodeDB) GetNode(root node.Root, ptr *node.Pointer) (node.Node, error) { if ptr == nil || !ptr.IsClean() { panic("urkel/db/badger: attempted to get invalid pointer from node database") } + if err := d.sanityCheckNamespace(root.Namespace); err != nil { + return nil, err + } tx := d.db.NewTransaction(false) defer tx.Discard() - item, err := tx.Get(nodeKeyFmt.Encode(&root.Namespace, &ptr.Hash)) + item, err := tx.Get(nodeKeyFmt.Encode(&ptr.Hash)) switch err { case nil: case badger.ErrKeyNotFound: @@ -214,6 +260,9 @@ func (d *badgerNodeDB) GetWriteLog(ctx context.Context, startRoot node.Root, end if !endRoot.Follows(&startRoot) { return nil, api.ErrRootMustFollowOld } + if err := d.sanityCheckNamespace(startRoot.Namespace); err != nil { + return nil, err + } tx := d.db.NewTransaction(false) defer tx.Discard() @@ -248,7 +297,7 @@ func (d *badgerNodeDB) GetWriteLog(ctx context.Context, startRoot node.Root, end wl, err := func() (writelog.Iterator, error) { // Iterate over all write logs that result in the current item. - prefix := writeLogKeyFmt.Encode(&endRoot.Namespace, endRoot.Round, &curItem.endRootHash) + prefix := writeLogKeyFmt.Encode(endRoot.Round, &curItem.endRootHash) it := tx.NewIterator(badger.IteratorOptions{Prefix: prefix}) defer it.Close() @@ -259,12 +308,11 @@ func (d *badgerNodeDB) GetWriteLog(ctx context.Context, startRoot node.Root, end item := it.Item() - var decNs common.Namespace var decRound uint64 var decEndRootHash hash.Hash var decStartRootHash hash.Hash - if !writeLogKeyFmt.Decode(item.Key(), &decNs, &decRound, &decEndRootHash, &decStartRootHash) { + if !writeLogKeyFmt.Decode(item.Key(), &decRound, &decEndRootHash, &decStartRootHash) { // This should not happen as the Badger iterator should take care of it. panic("urkel/db/badger: bad iterator") } @@ -339,6 +387,10 @@ func (d *badgerNodeDB) GetWriteLog(ctx context.Context, startRoot node.Root, end } func (d *badgerNodeDB) HasRoot(root node.Root) bool { + if err := d.sanityCheckNamespace(root.Namespace); err != nil { + return false + } + // An empty root is always implicitly present. 
if root.Hash.IsEmpty() { return true @@ -348,7 +400,7 @@ func (d *badgerNodeDB) HasRoot(root node.Root) bool { emptyHash.Empty() err := d.db.View(func(tx *badger.Txn) error { - _, err := tx.Get(rootLinkKeyFmt.Encode(&root.Namespace, root.Round, &root.Hash, &emptyHash)) + _, err := tx.Get(rootLinkKeyFmt.Encode(root.Round, &root.Hash, &emptyHash)) return err }) switch err { @@ -362,6 +414,10 @@ func (d *badgerNodeDB) HasRoot(root node.Root) bool { } func (d *badgerNodeDB) Finalize(ctx context.Context, namespace common.Namespace, round uint64, roots []hash.Hash) error { // nolint: gocyclo + if err := d.sanityCheckNamespace(namespace); err != nil { + return err + } + // We don't need to put the operations into a write transaction as the // content of the node database is based on immutable keys, so multiple // concurrent prunes cannot cause corruption. @@ -371,7 +427,7 @@ func (d *badgerNodeDB) Finalize(ctx context.Context, namespace common.Namespace, defer tx.Discard() // Make sure that the previous round has been finalized. - lastFinalizedRound, exists := d.meta.getLastFinalizedRound(namespace) + lastFinalizedRound, exists := d.meta.getLastFinalizedRound() if round > 0 && exists && lastFinalizedRound < (round-1) { return api.ErrNotFinalized } @@ -390,7 +446,7 @@ func (d *badgerNodeDB) Finalize(ctx context.Context, namespace common.Namespace, for updated := true; updated; { updated = false - prefix := rootLinkKeyFmt.Encode(&namespace, round) + prefix := rootLinkKeyFmt.Encode(round) it := tx.NewIterator(badger.IteratorOptions{Prefix: prefix}) defer it.Close() @@ -398,12 +454,11 @@ func (d *badgerNodeDB) Finalize(ctx context.Context, namespace common.Namespace, item := it.Item() // If next root hash is among the finalized roots, add this root as well. - var decNs common.Namespace var decRound uint64 var rootHash hash.Hash var nextRoot hash.Hash - if !rootLinkKeyFmt.Decode(item.Key(), &decNs, &decRound, &rootHash, &nextRoot) { + if !rootLinkKeyFmt.Decode(item.Key(), &decRound, &rootHash, &nextRoot) { // This should not happen as the Badger iterator should take care of it. panic("urkel/db/badger: bad iterator") } @@ -420,7 +475,7 @@ func (d *badgerNodeDB) Finalize(ctx context.Context, namespace common.Namespace, // Go through all roots and either commit GC updates or prune them based on // whether they are included in the finalized roots or not. - prefix := rootLinkKeyFmt.Encode(&namespace, round) + prefix := rootLinkKeyFmt.Encode(round) it := tx.NewIterator(badger.IteratorOptions{Prefix: prefix}) defer it.Close() @@ -428,12 +483,11 @@ func (d *badgerNodeDB) Finalize(ctx context.Context, namespace common.Namespace, notLoneNodes := make(map[hash.Hash]bool) for it.Rewind(); it.Valid(); it.Next() { - var decNs common.Namespace var decRound uint64 var rootHash hash.Hash var nextRoot hash.Hash - if !rootLinkKeyFmt.Decode(it.Item().Key(), &decNs, &decRound, &rootHash, &nextRoot) { + if !rootLinkKeyFmt.Decode(it.Item().Key(), &decRound, &rootHash, &nextRoot) { // This should not happen as the Badger iterator should take care of it. 
panic("urkel/db/badger: bad iterator") } @@ -448,8 +502,8 @@ func (d *badgerNodeDB) Finalize(ctx context.Context, namespace common.Namespace, continue } - rootGcUpdatesKey := rootGcUpdatesKeyFmt.Encode(&namespace, round, &rootHash) - rootAddedNodesKey := rootAddedNodesKeyFmt.Encode(&namespace, round, &rootHash) + rootGcUpdatesKey := rootGcUpdatesKeyFmt.Encode(round, &rootHash) + rootAddedNodesKey := rootAddedNodesKeyFmt.Encode(round, &rootHash) // Load hashes of nodes added during this round for this root. item, err := tx.Get(rootAddedNodesKey) @@ -481,7 +535,7 @@ func (d *badgerNodeDB) Finalize(ctx context.Context, namespace common.Namespace, } for _, u := range gcUpdates { - key := gcIndexKeyFmt.Encode(&namespace, u.EndRound, u.StartRound, &u.Node) + key := gcIndexKeyFmt.Encode(u.EndRound, u.StartRound, &u.Node) if err = batch.Set(key, []byte("")); err != nil { return err } @@ -505,7 +559,7 @@ func (d *badgerNodeDB) Finalize(ctx context.Context, namespace common.Namespace, // Remove write logs for the non-finalized root. if err = func() error { - rootWriteLogsPrefix := writeLogKeyFmt.Encode(&namespace, round, &rootHash) + rootWriteLogsPrefix := writeLogKeyFmt.Encode(round, &rootHash) wit := tx.NewIterator(badger.IteratorOptions{Prefix: rootWriteLogsPrefix}) defer wit.Close() @@ -536,7 +590,7 @@ func (d *badgerNodeDB) Finalize(ctx context.Context, namespace common.Namespace, continue } - key := nodeKeyFmt.Encode(&namespace, &h) + key := nodeKeyFmt.Encode(&h) if err := batch.Delete(key); err != nil { return err } @@ -544,7 +598,7 @@ func (d *badgerNodeDB) Finalize(ctx context.Context, namespace common.Namespace, // Update last finalized round. This is done at the end as Badger may // split the batch into multiple transactions. - if err := batch.Set(finalizedKeyFmt.Encode(&namespace), cbor.Marshal(round)); err != nil { + if err := batch.Set(finalizedKeyFmt.Encode(), cbor.Marshal(round)); err != nil { return err } @@ -554,12 +608,16 @@ func (d *badgerNodeDB) Finalize(ctx context.Context, namespace common.Namespace, } // Update cached last finalized round. - d.meta.setLastFinalizedRound(namespace, round) + d.meta.setLastFinalizedRound(round) return nil } func (d *badgerNodeDB) Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error) { + if err := d.sanityCheckNamespace(namespace); err != nil { + return 0, err + } + // We don't need to put the operations into a write transaction as the // content of the node database is based on immutable keys, so multiple // concurrent prunes cannot cause corruption. @@ -569,12 +627,12 @@ func (d *badgerNodeDB) Prune(ctx context.Context, namespace common.Namespace, ro defer tx.Discard() // Make sure that the round that we try to prune has been finalized. - lastFinalizedRound, exists := d.meta.getLastFinalizedRound(namespace) + lastFinalizedRound, exists := d.meta.getLastFinalizedRound() if !exists || lastFinalizedRound < round { return 0, api.ErrNotFinalized } - prevRound, err := getPreviousRound(tx, namespace, round) + prevRound, err := getPreviousRound(tx, round) if err != nil { return 0, err } @@ -582,17 +640,16 @@ func (d *badgerNodeDB) Prune(ctx context.Context, namespace common.Namespace, ro pruneHashes := make(map[hash.Hash]bool) // Iterate over all lifetimes that end in the passed round. 
- prefix := gcIndexKeyFmt.Encode(&namespace, round) + prefix := gcIndexKeyFmt.Encode(round) it := tx.NewIterator(badger.IteratorOptions{Prefix: prefix}) defer it.Close() for it.Rewind(); it.Valid(); it.Next() { - var decNs common.Namespace var endRound uint64 var startRound uint64 var h hash.Hash - if !gcIndexKeyFmt.Decode(it.Item().Key(), &decNs, &endRound, &startRound, &h) { + if !gcIndexKeyFmt.Decode(it.Item().Key(), &endRound, &startRound, &h) { // This should not happen as the Badger iterator should take care of it. panic("urkel/db/badger: bad iterator") } @@ -608,14 +665,14 @@ func (d *badgerNodeDB) Prune(ctx context.Context, namespace common.Namespace, ro } else { // Since the current round is being pruned, the lifetime ends at the // previous round. - if err = batch.Set(gcIndexKeyFmt.Encode(&decNs, prevRound, startRound, &h), []byte("")); err != nil { + if err = batch.Set(gcIndexKeyFmt.Encode(prevRound, startRound, &h), []byte("")); err != nil { return 0, err } } } // Prune all roots in round. - prefix = rootLinkKeyFmt.Encode(&namespace, round) + prefix = rootLinkKeyFmt.Encode(round) it = tx.NewIterator(badger.IteratorOptions{Prefix: prefix}) defer it.Close() @@ -623,12 +680,11 @@ func (d *badgerNodeDB) Prune(ctx context.Context, namespace common.Namespace, ro for it.Rewind(); it.Valid(); it.Next() { item := it.Item() - var decNs common.Namespace var decRound uint64 var rootHash hash.Hash var nextRoot hash.Hash - if !rootLinkKeyFmt.Decode(item.Key(), &decNs, &decRound, &rootHash, &nextRoot) { + if !rootLinkKeyFmt.Decode(item.Key(), &decRound, &rootHash, &nextRoot) { // This should not happen as the iterator should take care of it. panic("urkel/db/badger: bad iterator") } @@ -666,7 +722,7 @@ func (d *badgerNodeDB) Prune(ctx context.Context, namespace common.Namespace, ro } // Prune all write logs in round. - prefix = writeLogKeyFmt.Encode(&namespace, round) + prefix = writeLogKeyFmt.Encode(round) it = tx.NewIterator(badger.IteratorOptions{Prefix: prefix}) defer it.Close() @@ -679,7 +735,7 @@ func (d *badgerNodeDB) Prune(ctx context.Context, namespace common.Namespace, ro // Prune all collected hashes. var pruned int for h := range pruneHashes { - if err = batch.Delete(nodeKeyFmt.Encode(&namespace, &h)); err != nil { + if err = batch.Delete(nodeKeyFmt.Encode(&h)); err != nil { return 0, err } pruned++ @@ -702,11 +758,10 @@ func (d *badgerNodeDB) NewBatch(namespace common.Namespace, round uint64, oldRoo // the transaction out. return &badgerBatch{ - db: d, - bat: d.db.NewWriteBatch(), - namespace: namespace, - round: round, - oldRoot: oldRoot, + db: d, + bat: d.db.NewWriteBatch(), + round: round, + oldRoot: oldRoot, } } @@ -722,29 +777,28 @@ func (d *badgerNodeDB) Close() { }) } -func getPreviousRound(tx *badger.Txn, namespace common.Namespace, round uint64) (uint64, error) { +func getPreviousRound(tx *badger.Txn, round uint64) (uint64, error) { if round == 0 { return 0, nil } it := tx.NewIterator(badger.IteratorOptions{ Reverse: true, - Prefix: rootLinkKeyFmt.Encode(&namespace), + Prefix: rootLinkKeyFmt.Encode(), }) defer it.Close() // When iterating in reverse, seek moves us to the given key or to the previous // key in case the given key does not exist. So this will give us either the // queried round or the previous round. - it.Seek(rootLinkKeyFmt.Encode(&namespace, round)) + it.Seek(rootLinkKeyFmt.Encode(round)) if !it.Valid() { // No previous round. return 0, nil } // Try to decode the current or previous round as a linkKeyFmt. 
- var decNs common.Namespace var decRound uint64 - if !rootLinkKeyFmt.Decode(it.Item().Key(), &decNs, &decRound) || !decNs.Equal(&namespace) { + if !rootLinkKeyFmt.Decode(it.Item().Key(), &decRound) { // No previous round. return 0, nil } @@ -757,7 +811,7 @@ func getPreviousRound(tx *badger.Txn, namespace common.Namespace, round uint64) return 0, nil } - if !rootLinkKeyFmt.Decode(it.Item().Key(), &decNs, &decRound) || !decNs.Equal(&namespace) { + if !rootLinkKeyFmt.Decode(it.Item().Key(), &decRound) { // No previous round. return 0, nil } @@ -772,9 +826,8 @@ type badgerBatch struct { db *badgerNodeDB bat *badger.WriteBatch - namespace common.Namespace - round uint64 - oldRoot node.Root + round uint64 + oldRoot node.Root writeLog writelog.WriteLog annotations writelog.Annotations @@ -801,6 +854,9 @@ func (ba *badgerBatch) RemoveNodes(nodes []node.Node) error { } func (ba *badgerBatch) Commit(root node.Root) error { + if err := ba.db.sanityCheckNamespace(root.Namespace); err != nil { + return err + } if !root.Follows(&ba.oldRoot) { return api.ErrRootMustFollowOld } @@ -814,13 +870,13 @@ func (ba *badgerBatch) Commit(root node.Root) error { // Make sure that the round that we try to commit into has not yet been // finalized. - lastFinalizedRound, exists := ba.db.meta.getLastFinalizedRound(root.Namespace) + lastFinalizedRound, exists := ba.db.meta.getLastFinalizedRound() if exists && lastFinalizedRound >= root.Round { return api.ErrAlreadyFinalized } // Get previous round. - prevRound, err := getPreviousRound(tx, root.Namespace, root.Round) + prevRound, err := getPreviousRound(tx, root.Round) if err != nil { return err } @@ -828,7 +884,7 @@ func (ba *badgerBatch) Commit(root node.Root) error { // Create root with an empty next link. var emptyHash hash.Hash emptyHash.Empty() - if err = ba.bat.Set(rootLinkKeyFmt.Encode(&root.Namespace, root.Round, &root.Hash, &emptyHash), []byte("")); err != nil { + if err = ba.bat.Set(rootLinkKeyFmt.Encode(root.Round, &root.Hash, &emptyHash), []byte("")); err != nil { return errors.Wrap(err, "urkel/db/badger: set returned error") } @@ -838,7 +894,7 @@ func (ba *badgerBatch) Commit(root node.Root) error { return api.ErrPreviousRoundMismatch } - key := rootLinkKeyFmt.Encode(&ba.oldRoot.Namespace, ba.oldRoot.Round, &ba.oldRoot.Hash, &emptyHash) + key := rootLinkKeyFmt.Encode(ba.oldRoot.Round, &ba.oldRoot.Hash, &emptyHash) _, err = tx.Get(key) switch err { case nil: @@ -848,7 +904,7 @@ func (ba *badgerBatch) Commit(root node.Root) error { return err } - key = rootLinkKeyFmt.Encode(&ba.oldRoot.Namespace, ba.oldRoot.Round, &ba.oldRoot.Hash, &root.Hash) + key = rootLinkKeyFmt.Encode(ba.oldRoot.Round, &ba.oldRoot.Hash, &root.Hash) if err = ba.bat.Set(key, []byte("")); err != nil { return errors.Wrap(err, "urkel/db/badger: set returned error") } @@ -876,13 +932,13 @@ func (ba *badgerBatch) Commit(root node.Root) error { Node: h, }) } - key := rootGcUpdatesKeyFmt.Encode(&root.Namespace, root.Round, &root.Hash) + key := rootGcUpdatesKeyFmt.Encode(root.Round, &root.Hash) if err = ba.bat.Set(key, cbor.Marshal(gcUpdates)); err != nil { return errors.Wrap(err, "urkel/db/badger: set returned error") } // Store added nodes (only needed until the round is finalized). 
- key = rootAddedNodesKeyFmt.Encode(&root.Namespace, root.Round, &root.Hash) + key = rootAddedNodesKeyFmt.Encode(root.Round, &root.Hash) if err = ba.bat.Set(key, cbor.Marshal(ba.addedNodes)); err != nil { return errors.Wrap(err, "urkel/db/badger: set returned error") } @@ -891,7 +947,7 @@ func (ba *badgerBatch) Commit(root node.Root) error { if ba.writeLog != nil && ba.annotations != nil { log := api.MakeHashedDBWriteLog(ba.writeLog, ba.annotations) bytes := cbor.Marshal(log) - key := writeLogKeyFmt.Encode(&root.Namespace, root.Round, &root.Hash, &ba.oldRoot.Hash) + key := writeLogKeyFmt.Encode(root.Round, &root.Hash, &ba.oldRoot.Hash) if err := ba.bat.Set(key, bytes); err != nil { return errors.Wrap(err, "urkel/db/badger: set new write log returned error") } @@ -929,7 +985,7 @@ func (s *badgerSubtree) PutNode(depth node.Depth, ptr *node.Pointer) error { h := ptr.Node.GetHash() s.batch.addedNodes = append(s.batch.addedNodes, h) - if err = s.batch.bat.Set(nodeKeyFmt.Encode(&s.batch.namespace, &h), data); err != nil { + if err = s.batch.bat.Set(nodeKeyFmt.Encode(&h), data); err != nil { return err } return nil diff --git a/go/storage/mkvs/urkel/urkel_test.go b/go/storage/mkvs/urkel/urkel_test.go index ba048da5fa9..c4b838d9c93 100644 --- a/go/storage/mkvs/urkel/urkel_test.go +++ b/go/storage/mkvs/urkel/urkel_test.go @@ -1432,7 +1432,7 @@ func testPruneLoneRootsShared2(t *testing.T, ndb db.NodeDB) { Items []item }{ { - Namespace: common.Namespace{}, + Namespace: testNs, Round: 4, SrcRoot: "xnK40e9W7Sirh8NiLFEUBpvdOte4+XN0mNDAHs7wlno=", DstRoot: "lBnLyljpBdIweInarStbMkAGn8qq2sftGfJJWsvHCTk=", @@ -1449,7 +1449,7 @@ func testPruneLoneRootsShared2(t *testing.T, ndb db.NodeDB) { }, }, { - Namespace: common.Namespace{}, + Namespace: testNs, Round: 4, SrcRoot: "lBnLyljpBdIweInarStbMkAGn8qq2sftGfJJWsvHCTk=", DstRoot: "XeNxDPHiY0PAQI5vFxFNxjwgAj++Sf0kCohpaUvImUg=", @@ -1471,7 +1471,7 @@ func testPruneLoneRootsShared2(t *testing.T, ndb db.NodeDB) { }, }, { - Namespace: common.Namespace{}, + Namespace: testNs, Round: 4, SrcRoot: "lBnLyljpBdIweInarStbMkAGn8qq2sftGfJJWsvHCTk=", DstRoot: "rgbZz2sV2QlI/XG/+GiQoYlDpmxrMbY/hFs6PhTu1hA=", @@ -1805,6 +1805,17 @@ func testErrors(t *testing.T, ndb db.NodeDB) { _, _, err = tree.Commit(ctx, testNs, 0) require.Error(t, err, "Commit should fail for already finalized round") require.Equal(t, db.ErrAlreadyFinalized, err) + + // Commit for a different namespace should fail. + var badNs common.Namespace + _ = badNs.UnmarshalText([]byte("badbadbadbadbadbadbadbadbadbadbadbadbadbadbadbadbadbadbadbadbadb")) + + tree = New(nil, ndb) + err = tree.Insert(ctx, []byte("bad namespace"), []byte("woohoo")) + require.NoError(t, err, "Insert") + _, _, err = tree.Commit(ctx, badNs, 0) + require.Error(t, err, "Commit should fail for bad namespace") + require.Equal(t, db.ErrBadNamespace, err) } func testBackend( @@ -1875,6 +1886,7 @@ func TestUrkelBadgerBackend(t *testing.T) { ndb, err := badgerDb.New(&db.Config{ DB: dir, DebugNoFsync: true, + Namespace: testNs, }) require.NoError(t, err, "New") From dbb0c7278adf0ede0bc6da07089ba1f02b39d3e9 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Mon, 23 Dec 2019 13:12:49 +0100 Subject: [PATCH 4/5] go/storage/client: Remove multi-runtime support Since the expected usage is now to create a storage client for each runtime (namespace), the client can be simplified to only support tracking a single runtime. 
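For illustration only, outside the patch itself: a minimal sketch of how a caller is expected to construct a storage client after this change. The helper name newRuntimeStorageClient is hypothetical; the storageClient.New signature and the namespace derivation follow the code changed below (see go/worker/storage/committee/node.go).

package example

import (
	"context"

	"github.com/oasislabs/oasis-core/go/common"
	"github.com/oasislabs/oasis-core/go/common/crypto/signature"
	"github.com/oasislabs/oasis-core/go/common/identity"
	registry "github.com/oasislabs/oasis-core/go/registry/api"
	scheduler "github.com/oasislabs/oasis-core/go/scheduler/api"
	storageAPI "github.com/oasislabs/oasis-core/go/storage/api"
	storageClient "github.com/oasislabs/oasis-core/go/storage/client"
)

// newRuntimeStorageClient builds a storage client scoped to a single runtime.
// There is no WatchRuntime call any more: the client only ever tracks the
// runtime whose namespace it was created with.
func newRuntimeStorageClient(
	ctx context.Context,
	runtimeID signature.PublicKey,
	ident *identity.Identity,
	sched scheduler.Backend,
	reg registry.Backend,
) (storageAPI.Backend, error) {
	// Derive the storage namespace from the runtime identifier, the same
	// way the storage committee worker does after this patch.
	var ns common.Namespace
	copy(ns[:], runtimeID[:])

	return storageClient.New(ctx, ns, ident, sched, reg)
}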
--- .changelog/2494.breaking.md | 11 +++ go/oasis-node/cmd/debug/storage/export.go | 95 ++++++++++--------- go/oasis-node/cmd/debug/storage/storage.go | 3 +- go/oasis-node/node_test.go | 2 +- go/roothash/tests/tester.go | 6 +- go/runtime/registry/registry.go | 12 --- go/storage/api/api.go | 3 - go/storage/client/client.go | 103 ++------------------- go/storage/client/init.go | 32 +++---- go/storage/client/tests/tests.go | 10 +- go/storage/init.go | 2 +- go/storage/metrics.go | 10 -- go/worker/storage/committee/node.go | 16 +--- 13 files changed, 100 insertions(+), 205 deletions(-) create mode 100644 .changelog/2494.breaking.md diff --git a/.changelog/2494.breaking.md b/.changelog/2494.breaking.md new file mode 100644 index 00000000000..bfb137e39aa --- /dev/null +++ b/.changelog/2494.breaking.md @@ -0,0 +1,11 @@ +Make storage per-runtime. + +Previously there was a single storage backend used by `oasis-node` which required that a single +database supported multiple namespaces for the case when multiple runtimes were being used in a +single node. + +This change simplifies the storage database backends by removing the need for backends to implement +multi-namespace support, reducing overhead and cleanly separating per-runtime state. + +Due to this changing the internal database format, this breaks previous (compute node) deployments +with no way to do an automatic migration. diff --git a/go/oasis-node/cmd/debug/storage/export.go b/go/oasis-node/cmd/debug/storage/export.go index e3d341f2470..c2ad051b19c 100644 --- a/go/oasis-node/cmd/debug/storage/export.go +++ b/go/oasis-node/cmd/debug/storage/export.go @@ -14,8 +14,11 @@ import ( "github.com/spf13/viper" "github.com/oasislabs/oasis-core/go/common" + "github.com/oasislabs/oasis-core/go/common/crypto/signature" cmdCommon "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common" cmdConsensus "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common/consensus" + registry "github.com/oasislabs/oasis-core/go/registry/api" + runtimeRegistry "github.com/oasislabs/oasis-core/go/runtime/registry" "github.com/oasislabs/oasis-core/go/storage" storageAPI "github.com/oasislabs/oasis-core/go/storage/api" storageClient "github.com/oasislabs/oasis-core/go/storage/client" @@ -66,61 +69,66 @@ func doExport(cmd *cobra.Command, args []string) { // Load the genesis document. genesisDoc := cmdConsensus.InitGenesis() + // For each storage root. + for runtimeID, rtg := range genesisDoc.RootHash.RuntimeStates { + logger.Info("fetching checkpoint write log", + "runtime_id", runtimeID, + ) + + if err := exportRuntime(dataDir, destDir, runtimeID, rtg); err != nil { + return + } + } + + ok = true +} + +func exportRuntime(dataDir, destDir string, id signature.PublicKey, rtg *registry.RuntimeGenesis) error { + dataDir = filepath.Join(dataDir, runtimeRegistry.RuntimesDir, id.String()) + // Initialize the storage backend. - storageBackend, err := newDirectStorageBackend(dataDir) + var ns common.Namespace + _ = ns.UnmarshalBinary(id[:]) + + storageBackend, err := newDirectStorageBackend(dataDir, ns) if err != nil { logger.Error("failed to construct storage backend", "err", err, ) - return + return err } logger.Info("waiting for storage backend initialization") <-storageBackend.Initialized() defer storageBackend.Cleanup() - // For each storage root. - for runtimeID, rtg := range genesisDoc.RootHash.RuntimeStates { - logger.Info("fetching checkpoint write log", - "runtime_id", runtimeID, - ) - - // Use RuntimeID for the Roothash namespace. 
- var ns common.Namespace - _ = ns.UnmarshalBinary(runtimeID[:]) - - // Get the checkpoint iterator. - root := storageAPI.Root{ - Namespace: ns, - Round: rtg.Round, - Hash: rtg.StateRoot, - } - it, err := storageBackend.GetCheckpoint(context.Background(), - &storageAPI.GetCheckpointRequest{ - Root: root, - }, - ) - if err != nil { - logger.Error("failed getting checkpoint", - "err", err, - "namespace", root.Namespace, - "round", root.Round, - "root", root.Hash, - ) - return - } - - fn := fmt.Sprintf("storage-dump-%v-%d.json", - runtimeID.String(), - rtg.Round, + // Get the checkpoint iterator. + root := storageAPI.Root{ + Namespace: ns, + Round: rtg.Round, + Hash: rtg.StateRoot, + } + it, err := storageBackend.GetCheckpoint(context.Background(), + &storageAPI.GetCheckpointRequest{ + Root: root, + }, + ) + if err != nil { + logger.Error("failed getting checkpoint", + "err", err, + "namespace", root.Namespace, + "round", root.Round, + "root", root.Hash, ) - fn = filepath.Join(destDir, fn) - if err = exportIterator(fn, &root, it); err != nil { - return - } + return err } - ok = true + fn := fmt.Sprintf("storage-dump-%v-%d.json", + root.Namespace.String(), + root.Round, + ) + fn = filepath.Join(destDir, fn) + return exportIterator(fn, &root, it) } func exportIterator(fn string, root *storageAPI.Root, it storageAPI.WriteLogIterator) error { @@ -179,13 +187,14 @@ func exportIterator(fn string, root *storageAPI.Root, it storageAPI.WriteLogIter } } -func newDirectStorageBackend(dataDir string) (storageAPI.Backend, error) { +func newDirectStorageBackend(dataDir string, namespace common.Namespace) (storageAPI.Backend, error) { // The right thing to do will be to use storage.New, but the backend config // assumes that identity is valid, and we don't have one. 
cfg := &storageAPI.Config{ Backend: strings.ToLower(viper.GetString(storage.CfgBackend)), DB: dataDir, ApplyLockLRUSlots: uint64(viper.GetInt(storage.CfgLRUSlots)), + Namespace: namespace, } b := strings.ToLower(viper.GetString(storage.CfgBackend)) @@ -194,7 +203,7 @@ func newDirectStorageBackend(dataDir string) (storageAPI.Backend, error) { cfg.DB = filepath.Join(cfg.DB, storageDatabase.DefaultFileName(cfg.Backend)) return storageDatabase.New(cfg) case storageClient.BackendName: - return storageClient.New(context.Background(), nil, nil, nil) + return storageClient.New(context.Background(), namespace, nil, nil, nil) default: return nil, fmt.Errorf("storage: unsupported backend: '%v'", cfg.Backend) } diff --git a/go/oasis-node/cmd/debug/storage/storage.go b/go/oasis-node/cmd/debug/storage/storage.go index 032fd22ef26..d6facba3a2b 100644 --- a/go/oasis-node/cmd/debug/storage/storage.go +++ b/go/oasis-node/cmd/debug/storage/storage.go @@ -10,6 +10,7 @@ import ( "github.com/spf13/cobra" + "github.com/oasislabs/oasis-core/go/common" "github.com/oasislabs/oasis-core/go/common/crypto/signature" "github.com/oasislabs/oasis-core/go/common/logging" cmdFlags "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common/flags" @@ -147,7 +148,7 @@ func doCheckRoots(cmd *cobra.Command, args []string) { storageWorkerClient := storageWorkerAPI.NewStorageWorkerClient(conn) defer conn.Close() - storageClient, err := storageClient.New(ctx, nil, nil, nil) + storageClient, err := storageClient.New(ctx, common.Namespace{}, nil, nil, nil) if err != nil { logger.Error("error while connecting to storage client", "err", err, diff --git a/go/oasis-node/node_test.go b/go/oasis-node/node_test.go index 930676ceec8..196db1d0dff 100644 --- a/go/oasis-node/node_test.go +++ b/go/oasis-node/node_test.go @@ -451,7 +451,7 @@ func testStorageClientWithNode(t *testing.T, node *testNode) { for _, kv := range config { viper.Set(kv.key, kv.value) } - debugClient, err := storageClient.New(ctx, node.Identity, nil, nil) + debugClient, err := storageClient.New(ctx, testNamespace, node.Identity, nil, nil) require.NoError(t, err, "NewDebugStorageClient") // Determine the current round. This is required so that we can commit into diff --git a/go/roothash/tests/tester.go b/go/roothash/tests/tester.go index 3990f806e46..db05041eab8 100644 --- a/go/roothash/tests/tester.go +++ b/go/roothash/tests/tester.go @@ -95,8 +95,8 @@ func RootHashImplementationTests(t *testing.T, backend api.Backend, consensus co // It only makes sense to run the SuccessfulRound test in case the // EpochTransitionBlock was successful. Otherwise this may leave the // committees set to nil and cause a crash. 
- t.Run("SucessfulRound", func(t *testing.T) { - testSucessfulRound(t, backend, consensus, identity, rtStates) + t.Run("SuccessfulRound", func(t *testing.T) { + testSuccessfulRound(t, backend, consensus, identity, rtStates) }) } @@ -219,7 +219,7 @@ func (s *runtimeState) testEpochTransitionBlock(t *testing.T, scheduler schedule } } -func testSucessfulRound(t *testing.T, backend api.Backend, consensus consensusAPI.Backend, identity *identity.Identity, states []*runtimeState) { +func testSuccessfulRound(t *testing.T, backend api.Backend, consensus consensusAPI.Backend, identity *identity.Identity, states []*runtimeState) { for _, state := range states { state.testSuccessfulRound(t, backend, consensus, identity) } diff --git a/go/runtime/registry/registry.go b/go/runtime/registry/registry.go index cba37050703..73750df20e6 100644 --- a/go/runtime/registry/registry.go +++ b/go/runtime/registry/registry.go @@ -241,18 +241,6 @@ func (r *runtimeRegistry) addSupportedRuntime(ctx context.Context, id signature. return fmt.Errorf("runtime/registry: failed to start tag indexer for runtime %s: %w", id, err) } - // If using a storage client, it should watch the configured runtimes. - // TODO: This should be done automatically by the storage backend when we add a runtime parameter. - if storageClient, ok := storageBackend.(storageAPI.ClientBackend); ok { - if err = storageClient.WatchRuntime(id); err != nil { - r.logger.Warn("error watching storage runtime, expected if using metricswrapper with local backend", - "err", err, - ) - } - } else { - r.logger.Info("not watching storage runtime since not using a storage client backend") - } - r.runtimes[id] = &runtime{ id: id, consensus: r.consensus, diff --git a/go/storage/api/api.go b/go/storage/api/api.go index f34b55f7b9e..6352753dfe9 100644 --- a/go/storage/api/api.go +++ b/go/storage/api/api.go @@ -350,7 +350,4 @@ type ClientBackend interface { // GetConnectedNodes returns currently connected storage nodes. GetConnectedNodes() []*node.Node - - // WatchRuntime adds a runtime for which client should keep track of scheduled storage nodes. - WatchRuntime(signature.PublicKey) error } diff --git a/go/storage/client/client.go b/go/storage/client/client.go index aa35c993758..852cb396ba0 100644 --- a/go/storage/client/client.go +++ b/go/storage/client/client.go @@ -6,7 +6,6 @@ import ( "context" cryptorand "crypto/rand" "math/rand" - "sync" "time" "github.com/cenkalti/backoff" @@ -18,11 +17,8 @@ import ( "github.com/oasislabs/oasis-core/go/common/crypto/hash" "github.com/oasislabs/oasis-core/go/common/crypto/mathrand" "github.com/oasislabs/oasis-core/go/common/crypto/signature" - "github.com/oasislabs/oasis-core/go/common/identity" "github.com/oasislabs/oasis-core/go/common/logging" "github.com/oasislabs/oasis-core/go/common/node" - registry "github.com/oasislabs/oasis-core/go/registry/api" - scheduler "github.com/oasislabs/oasis-core/go/scheduler/api" "github.com/oasislabs/oasis-core/go/storage/api" ) @@ -32,8 +28,6 @@ var ( ) var ( - // ErrNoWatcher is an error when watcher for runtime is missing. - ErrNoWatcher = errors.New("storage/client: no watcher for runtime") // ErrStorageNotAvailable is the error returned when no storage node is available. 
ErrStorageNotAvailable = errors.New("storage/client: storage not available") ) @@ -51,78 +45,18 @@ type storageClientBackend struct { logger *logging.Logger - initCh chan struct{} - signaledInit bool - debugRuntimeID *signature.PublicKey - scheduler scheduler.Backend - registry registry.Backend - - runtimeWatchersLock sync.RWMutex - runtimeWatchers map[signature.PublicKey]storageWatcher - - identity *identity.Identity + runtimeWatcher storageWatcher haltCtx context.Context cancelFn context.CancelFunc } -func (b *storageClientBackend) getStorageWatcher(runtimeID signature.PublicKey) (storageWatcher, error) { - b.runtimeWatchersLock.RLock() - defer b.runtimeWatchersLock.RUnlock() - - watcher := b.runtimeWatchers[runtimeID] - if watcher == nil { - b.logger.Error("worker/storage/client: no watcher for runtime", - "runtime_id", runtimeID, - ) - return nil, ErrNoWatcher - } - return watcher, nil -} - // GetConnectedNodes returns registry node information about all connected // storage nodes. func (b *storageClientBackend) GetConnectedNodes() []*node.Node { - b.runtimeWatchersLock.RLock() - defer b.runtimeWatchersLock.RUnlock() - - nodes := []*node.Node{} - for _, watcher := range b.runtimeWatchers { - nodes = append(nodes, watcher.getConnectedNodes()...) - } - return nodes -} - -func (b *storageClientBackend) WatchRuntime(id signature.PublicKey) error { - b.runtimeWatchersLock.Lock() - defer b.runtimeWatchersLock.Unlock() - - watcher := b.runtimeWatchers[id] - if watcher != nil { - // Already watching, nothing to do. - return nil - } - - // Watcher doesn't exist. Start new watcher. - watcher = newWatcher(b.ctx, id, b.identity, b.scheduler, b.registry) - b.runtimeWatchers[id] = watcher - - // Signal init when the first registered runtime is initialized. - if !b.signaledInit { - b.signaledInit = true - go func() { - select { - case <-watcher.initialized(): - case <-b.ctx.Done(): - return - } - close(b.initCh) - }() - } - - return nil + return b.runtimeWatcher.getConnectedNodes() } type grpcResponse struct { @@ -159,17 +93,7 @@ func (b *storageClientBackend) writeWithClient( return nil, ErrStorageNotAvailable } - // Get watcher for runtime. - watcher, err := b.getStorageWatcher(runtimeID) - if err != nil { - b.logger.Error("writeWithClient: cannot get watcher for runtime", - "runtime_id", runtimeID, - "err", err, - ) - return nil, ErrStorageNotAvailable - } - - clientStates := watcher.getClientStates() + clientStates := b.runtimeWatcher.getClientStates() n := len(clientStates) if n == 0 { b.logger.Error("writeWithClient: no connected nodes for runtime", @@ -380,17 +304,7 @@ func (b *storageClientBackend) readWithClient( return nil, ErrStorageNotAvailable } - // Get watcher for runtime. 
- watcher, err := b.getStorageWatcher(runtimeID) - if err != nil { - b.logger.Error("readWithClient: cannot get watcher for runtime", - "runtime_id", runtimeID, - "err", err, - ) - return nil, ErrStorageNotAvailable - } - - clientStates := watcher.getClientStates() + clientStates := b.runtimeWatcher.getClientStates() n := len(clientStates) if n == 0 { b.logger.Error("readWithClient: no connected nodes for runtime", @@ -497,14 +411,9 @@ func (b *storageClientBackend) GetCheckpoint(ctx context.Context, request *api.G func (b *storageClientBackend) Cleanup() { b.cancelFn() - - b.runtimeWatchersLock.Lock() - defer b.runtimeWatchersLock.Unlock() - for _, watcher := range b.runtimeWatchers { - watcher.cleanup() - } + b.runtimeWatcher.cleanup() } func (b *storageClientBackend) Initialized() <-chan struct{} { - return b.initCh + return b.runtimeWatcher.initialized() } diff --git a/go/storage/client/init.go b/go/storage/client/init.go index c471a4436aa..cbb40409ae8 100644 --- a/go/storage/client/init.go +++ b/go/storage/client/init.go @@ -9,7 +9,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" - "github.com/oasislabs/oasis-core/go/common/crypto/signature" + "github.com/oasislabs/oasis-core/go/common" memorySigner "github.com/oasislabs/oasis-core/go/common/crypto/signature/signers/memory" cmnGrpc "github.com/oasislabs/oasis-core/go/common/grpc" "github.com/oasislabs/oasis-core/go/common/identity" @@ -41,12 +41,18 @@ var Flags = flag.NewFlagSet("", flag.ContinueOnError) // New creates a new storage client. func New( ctx context.Context, + namespace common.Namespace, ident *identity.Identity, schedulerBackend scheduler.Backend, registryBackend registry.Backend, ) (api.Backend, error) { logger := logging.GetLogger("storage/client") + runtimeID, err := namespace.ToRuntimeID() + if err != nil { + return nil, err + } + if addr := viper.GetString(CfgDebugClientAddress); addr != "" && cmdFlags.DebugDontBlameOasis() { logger.Warn("Storage client in debug mode, connecting to provided client", "address", CfgDebugClientAddress, @@ -81,32 +87,22 @@ func New( testRuntimeSigner := memorySigner.NewTestSigner(debugModeFakeRuntimeSeed) debugRuntimeID := testRuntimeSigner.Public() b := &storageClientBackend{ - ctx: ctx, - initCh: make(chan struct{}), - logger: logger, - debugRuntimeID: &debugRuntimeID, - scheduler: schedulerBackend, - registry: registryBackend, - runtimeWatchers: make(map[signature.PublicKey]storageWatcher), - identity: ident, + ctx: ctx, + logger: logger, + debugRuntimeID: &debugRuntimeID, } state := &clientState{ client: client, conn: conn, } - close(b.initCh) - b.runtimeWatchers[debugRuntimeID] = newDebugWatcher(state) + b.runtimeWatcher = newDebugWatcher(state) return b, nil } b := &storageClientBackend{ - ctx: ctx, - initCh: make(chan struct{}), - logger: logger, - scheduler: schedulerBackend, - registry: registryBackend, - runtimeWatchers: make(map[signature.PublicKey]storageWatcher), - identity: ident, + ctx: ctx, + logger: logger, + runtimeWatcher: newWatcher(ctx, runtimeID, ident, schedulerBackend, registryBackend), } b.haltCtx, b.cancelFn = context.WithCancel(ctx) diff --git a/go/storage/client/tests/tests.go b/go/storage/client/tests/tests.go index d84cbd02f6c..7a8949e7e57 100644 --- a/go/storage/client/tests/tests.go +++ b/go/storage/client/tests/tests.go @@ -49,10 +49,10 @@ func ClientWorkerTests( // Populate the registry with an entity and nodes. 
nodes := rt.Populate(t, consensus.Registry(), consensus, seed) - // Initialize storage client - client, err := storageClient.New(ctx, identity, consensus.Scheduler(), consensus.Registry()) - require.NoError(err, "NewStorageClient") - err = client.(api.ClientBackend).WatchRuntime(rt.Runtime.ID) + ns := runtimeIDToNamespace(t, rt.Runtime.ID) + + // Initialize storage client. + client, err := storageClient.New(ctx, ns, identity, consensus.Scheduler(), consensus.Registry()) require.NoError(err, "NewStorageClient") // Create mock root hash. @@ -60,7 +60,7 @@ func ClientWorkerTests( rootHash.FromBytes([]byte("non-existing")) root := api.Root{ - Namespace: runtimeIDToNamespace(t, rt.Runtime.ID), + Namespace: ns, Round: 0, Hash: rootHash, } diff --git a/go/storage/init.go b/go/storage/init.go index c386d97087c..24b4e063f45 100644 --- a/go/storage/init.go +++ b/go/storage/init.go @@ -59,7 +59,7 @@ func New( cfg.DB = filepath.Join(cfg.DB, database.DefaultFileName(cfg.Backend)) impl, err = database.New(cfg) case client.BackendName: - impl, err = client.New(ctx, identity, schedulerBackend, registryBackend) + impl, err = client.New(ctx, namespace, identity, schedulerBackend, registryBackend) default: err = fmt.Errorf("storage: unsupported backend: '%v'", cfg.Backend) } diff --git a/go/storage/metrics.go b/go/storage/metrics.go index 34253709d4b..53946d5f07d 100644 --- a/go/storage/metrics.go +++ b/go/storage/metrics.go @@ -5,13 +5,10 @@ import ( "sync" "time" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/oasislabs/oasis-core/go/common" "github.com/oasislabs/oasis-core/go/common/crypto/hash" - "github.com/oasislabs/oasis-core/go/common/crypto/signature" "github.com/oasislabs/oasis-core/go/common/node" "github.com/oasislabs/oasis-core/go/storage/api" ) @@ -95,13 +92,6 @@ func (w *metricsWrapper) GetConnectedNodes() []*node.Node { return []*node.Node{} } -func (w *metricsWrapper) WatchRuntime(id signature.PublicKey) error { - if clientBackend, ok := w.Backend.(api.ClientBackend); ok { - return clientBackend.WatchRuntime(id) - } - return errors.New("storage/metricswrapper: backend not ClientBackend") -} - func (w *metricsWrapper) Apply(ctx context.Context, request *api.ApplyRequest) ([]*api.Receipt, error) { start := time.Now() receipts, err := w.Backend.Apply(ctx, request) diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index ed95edff9f9..e4a4d2d8562 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -208,24 +208,18 @@ func NewNode( node.ctx, node.ctxCancel = context.WithCancel(context.Background()) + var ns common.Namespace + runtimeID := commonNode.Runtime.ID() + copy(ns[:], runtimeID[:]) + // Create a new storage client that will be used for remote sync. - scl, err := client.New(node.ctx, node.commonNode.Identity, node.commonNode.Scheduler, node.commonNode.Registry) + scl, err := client.New(node.ctx, ns, node.commonNode.Identity, node.commonNode.Scheduler, node.commonNode.Registry) if err != nil { return nil, err } node.storageClient = scl.(storageApi.ClientBackend) - if err := node.storageClient.WatchRuntime(commonNode.Runtime.ID()); err != nil { - node.logger.Error("error watching storage runtime", - "err", err, - ) - return nil, err - } // Register prune handler. 
- var ns common.Namespace - runtimeID := commonNode.Runtime.ID() - copy(ns[:], runtimeID[:]) - commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{ logger: node.logger, node: node, From 067af88244fc99ddbdb26c98bdb89424f580bea0 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Fri, 27 Dec 2019 17:32:19 +0100 Subject: [PATCH 5/5] go/storage: Make maximum in-memory storage cache size configurable --- .changelog/2494.feature.md | 5 +++++ go/oasis-node/cmd/debug/storage/export.go | 1 + go/storage/api/api.go | 8 ++++++-- go/storage/crashing_test.go | 5 +++-- go/storage/database/database_test.go | 1 + go/storage/init.go | 13 ++++++++++--- go/storage/mkvs/urkel/db/api/api.go | 3 +++ go/storage/mkvs/urkel/db/badger/badger.go | 3 +-- .../mkvs/urkel/interop/cmd/protocol_server.go | 1 + go/storage/mkvs/urkel/urkel_test.go | 17 ++++++++++------- 10 files changed, 41 insertions(+), 16 deletions(-) create mode 100644 .changelog/2494.feature.md diff --git a/.changelog/2494.feature.md b/.changelog/2494.feature.md new file mode 100644 index 00000000000..15ceb61af98 --- /dev/null +++ b/.changelog/2494.feature.md @@ -0,0 +1,5 @@ +Make maximum in-memory cache size for runtime storage configurable. + +Previously the value of 64mb was always used as the size of the in-memory storage cache. This adds a +new configuration parameter/command-line flag `--storage.max_cache_size` which configures the +maximum size of the in-memory runtime storage cache. diff --git a/go/oasis-node/cmd/debug/storage/export.go b/go/oasis-node/cmd/debug/storage/export.go index c2ad051b19c..7aa23d6c0d3 100644 --- a/go/oasis-node/cmd/debug/storage/export.go +++ b/go/oasis-node/cmd/debug/storage/export.go @@ -195,6 +195,7 @@ func newDirectStorageBackend(dataDir string, namespace common.Namespace) (storag DB: dataDir, ApplyLockLRUSlots: uint64(viper.GetInt(storage.CfgLRUSlots)), Namespace: namespace, + MaxCacheSize: int64(viper.GetSizeInBytes(storage.CfgMaxCacheSize)), } b := strings.ToLower(viper.GetString(storage.CfgBackend)) diff --git a/go/storage/api/api.go b/go/storage/api/api.go index 6352753dfe9..cd7777d0b13 100644 --- a/go/storage/api/api.go +++ b/go/storage/api/api.go @@ -90,13 +90,17 @@ type Config struct { // Namespace is the namespace contained within the database. Namespace common.Namespace + + // MaxCacheSize is the maximum in-memory cache size for the database. + MaxCacheSize int64 } // ToNodeDB converts from a Config to a node DB Config. 
func (cfg *Config) ToNodeDB() *nodedb.Config { return &nodedb.Config{ - DB: cfg.DB, - Namespace: cfg.Namespace, + DB: cfg.DB, + Namespace: cfg.Namespace, + MaxCacheSize: cfg.MaxCacheSize, } } diff --git a/go/storage/crashing_test.go b/go/storage/crashing_test.go index f9b8b28dbba..0b7e158be5b 100644 --- a/go/storage/crashing_test.go +++ b/go/storage/crashing_test.go @@ -24,8 +24,9 @@ func TestCrashingBackendDoNotInterfere(t *testing.T) { var ( cfg = api.Config{ - Backend: database.BackendNameBadgerDB, - Namespace: testNs, + Backend: database.BackendNameBadgerDB, + Namespace: testNs, + MaxCacheSize: 16 * 1024 * 1024, } err error ) diff --git a/go/storage/database/database_test.go b/go/storage/database/database_test.go index 7f683e0790c..97dd854089f 100644 --- a/go/storage/database/database_test.go +++ b/go/storage/database/database_test.go @@ -36,6 +36,7 @@ func doTestImpl(t *testing.T, backend string) { Backend: backend, ApplyLockLRUSlots: 100, Namespace: testNs, + MaxCacheSize: 16 * 1024 * 1024, } err error ) diff --git a/go/storage/init.go b/go/storage/init.go index 24b4e063f45..ff09b1f84dc 100644 --- a/go/storage/init.go +++ b/go/storage/init.go @@ -22,10 +22,15 @@ import ( const ( // CfgBackend configures the storage backend flag. - CfgBackend = "storage.backend" - cfgCrashEnabled = "storage.crash.enabled" + CfgBackend = "storage.backend" + // CfgLRUSlots configures the LRU apply lock slots. - CfgLRUSlots = "storage.root_cache.apply_lock_lru_slots" + CfgLRUSlots = "storage.root_cache.apply_lock_lru_slots" + + // CfgMaxCacheSize configures the maximum in-memory cache size. + CfgMaxCacheSize = "storage.max_cache_size" + + cfgCrashEnabled = "storage.crash.enabled" cfgInsecureSkipChecks = "storage.debug.insecure_skip_checks" ) @@ -48,6 +53,7 @@ func New( ApplyLockLRUSlots: uint64(viper.GetInt(CfgLRUSlots)), InsecureSkipChecks: viper.GetBool(cfgInsecureSkipChecks) && cmdFlags.DebugDontBlameOasis(), Namespace: namespace, + MaxCacheSize: int64(viper.GetSizeInBytes(CfgMaxCacheSize)), } var ( @@ -80,6 +86,7 @@ func init() { Flags.String(CfgBackend, database.BackendNameBadgerDB, "Storage backend") Flags.Bool(cfgCrashEnabled, false, "Enable the crashing storage wrapper") Flags.Int(CfgLRUSlots, 1000, "How many LRU slots to use for Apply call locks in the MKVS tree root cache") + Flags.String(CfgMaxCacheSize, "64mb", "Maximum in-memory cache size") Flags.Bool(cfgInsecureSkipChecks, false, "INSECURE: Skip known root checks") diff --git a/go/storage/mkvs/urkel/db/api/api.go b/go/storage/mkvs/urkel/db/api/api.go index 4911dcfca40..06d5183dacd 100644 --- a/go/storage/mkvs/urkel/db/api/api.go +++ b/go/storage/mkvs/urkel/db/api/api.go @@ -50,6 +50,9 @@ type Config struct { // Namespace is the namespace contained within the database. Namespace common.Namespace + + // MaxCacheSize is the maximum in-memory cache size for the database. + MaxCacheSize int64 } // NodeDB is the persistence layer used for persisting the in-memory tree. diff --git a/go/storage/mkvs/urkel/db/badger/badger.go b/go/storage/mkvs/urkel/db/badger/badger.go index e03f738057b..f88afbdeb9c 100644 --- a/go/storage/mkvs/urkel/db/badger/badger.go +++ b/go/storage/mkvs/urkel/db/badger/badger.go @@ -127,8 +127,7 @@ func New(cfg *api.Config) (api.NodeDB, error) { opts = opts.WithLogger(cmnBadger.NewLogAdapter(db.logger)) opts = opts.WithSyncWrites(!cfg.DebugNoFsync) opts = opts.WithCompression(options.None) - // Reduce cache size to 64 MiB as the default is 1 GiB. 
- opts = opts.WithMaxCacheSize(64 * 1024 * 1024) + opts = opts.WithMaxCacheSize(cfg.MaxCacheSize) var err error if db.db, err = badger.Open(opts); err != nil { diff --git a/go/storage/mkvs/urkel/interop/cmd/protocol_server.go b/go/storage/mkvs/urkel/interop/cmd/protocol_server.go index cc73f31c600..5aaac37af66 100644 --- a/go/storage/mkvs/urkel/interop/cmd/protocol_server.go +++ b/go/storage/mkvs/urkel/interop/cmd/protocol_server.go @@ -76,6 +76,7 @@ func doProtoServer(cmd *cobra.Command, args []string) { Signer: ident.NodeSigner, ApplyLockLRUSlots: 1, InsecureSkipChecks: false, + MaxCacheSize: 16 * 1024 * 1024, } backend, err := database.New(&storageCfg) if err != nil { diff --git a/go/storage/mkvs/urkel/urkel_test.go b/go/storage/mkvs/urkel/urkel_test.go index c4b838d9c93..a7ee84263c7 100644 --- a/go/storage/mkvs/urkel/urkel_test.go +++ b/go/storage/mkvs/urkel/urkel_test.go @@ -1887,6 +1887,7 @@ func TestUrkelBadgerBackend(t *testing.T) { DB: dir, DebugNoFsync: true, Namespace: testNs, + MaxCacheSize: 16 * 1024 * 1024, }) require.NoError(t, err, "New") @@ -1942,7 +1943,9 @@ func benchmarkInsertBatch(b *testing.B, numValues int, commit bool) { require.NoError(b, err, "TempDir") defer os.RemoveAll(dir) ndb, err := badgerDb.New(&db.Config{ - DB: dir, + DB: dir, + Namespace: testNs, + MaxCacheSize: 16 * 1024 * 1024, }) require.NoError(b, err, "New") tree := New(nil, ndb) @@ -1975,12 +1978,6 @@ func generateKeyValuePairs() ([][]byte, [][]byte) { return generateKeyValuePairsEx("", insertItems) } -func init() { - var ns hash.Hash - ns.FromBytes([]byte("oasis urkel test ns")) - copy(testNs[:], ns[:]) -} - func generateLongKeyValuePairs() ([][]byte, [][]byte) { keys := make([][]byte, len(longKey)) values := make([][]byte, len(longKey)) @@ -2013,3 +2010,9 @@ func generatePopulatedTree(t *testing.T, ndb db.NodeDB) ([][]byte, [][]byte, nod } return keys, values, root, tree } + +func init() { + var ns hash.Hash + ns.FromBytes([]byte("oasis urkel test ns")) + copy(testNs[:], ns[:]) +}
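For illustration only, outside the patch itself: a minimal sketch showing how the new MaxCacheSize setting from PATCH 5/5 reaches a directly constructed database backend, following the pattern of newDirectStorageBackend in go/oasis-node/cmd/debug/storage/export.go. The helper name openStorage is hypothetical and the 16 MiB value is just an example matching the updated tests; on a node the value comes from the new --storage.max_cache_size flag (a size string, default "64mb") via viper.GetSizeInBytes.

package example

import (
	"path/filepath"

	"github.com/oasislabs/oasis-core/go/common"
	storageAPI "github.com/oasislabs/oasis-core/go/storage/api"
	"github.com/oasislabs/oasis-core/go/storage/database"
)

// openStorage opens a BadgerDB-backed node database for a single runtime
// namespace with an explicit in-memory cache budget.
func openStorage(dataDir string, ns common.Namespace) (storageAPI.Backend, error) {
	cfg := &storageAPI.Config{
		Backend:           database.BackendNameBadgerDB,
		ApplyLockLRUSlots: 100,
		Namespace:         ns,
		// On a node this is filled from storage.max_cache_size; the updated
		// tests use 16 MiB, so the same value is used here.
		MaxCacheSize: 16 * 1024 * 1024,
	}
	// The database backend expects the full path to the database file, as
	// the export debug command does.
	cfg.DB = filepath.Join(dataDir, database.DefaultFileName(cfg.Backend))

	return database.New(cfg)
}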