Skip to content

Commit

Permalink
Merge pull request #2494 from oasislabs/kostko/feature/storage-per-rt
Browse files Browse the repository at this point in the history
Make storage per-runtime
  • Loading branch information
kostko authored Dec 28, 2019
2 parents 609047e + 067af88 commit 95370a2
Show file tree
Hide file tree
Showing 36 changed files with 675 additions and 485 deletions.
11 changes: 11 additions & 0 deletions .changelog/2494.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Make storage per-runtime.

Previously there was a single storage backend used by `oasis-node` which required that a single
database supported multiple namespaces for the case when multiple runtimes were being used in a
single node.

This change simplifies the storage database backends by removing the need for backends to implement
multi-namespace support, reducing overhead and cleanly separating per-runtime state.

Due to this changing the internal database format, this breaks previous (compute node) deployments
with no way to do an automatic migration.
5 changes: 5 additions & 0 deletions .changelog/2494.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Make maximum in-memory cache size for runtime storage configurable.

Previously the value of 64mb was always used as the size of the in-memory storage cache. This adds a
new configuration parameter/command-line flag `--storage.max_cache_size` which configures the
maximum size of the in-memory runtime storage cache.
96 changes: 53 additions & 43 deletions go/oasis-node/cmd/debug/storage/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ import (
"github.com/spf13/viper"

"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/crypto/signature"
cmdCommon "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common"
cmdConsensus "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common/consensus"
registry "github.com/oasislabs/oasis-core/go/registry/api"
runtimeRegistry "github.com/oasislabs/oasis-core/go/runtime/registry"
"github.com/oasislabs/oasis-core/go/storage"
storageAPI "github.com/oasislabs/oasis-core/go/storage/api"
storageClient "github.com/oasislabs/oasis-core/go/storage/client"
Expand Down Expand Up @@ -66,61 +69,66 @@ func doExport(cmd *cobra.Command, args []string) {
// Load the genesis document.
genesisDoc := cmdConsensus.InitGenesis()

// For each storage root.
for runtimeID, rtg := range genesisDoc.RootHash.RuntimeStates {
logger.Info("fetching checkpoint write log",
"runtime_id", runtimeID,
)

if err := exportRuntime(dataDir, destDir, runtimeID, rtg); err != nil {
return
}
}

ok = true
}

func exportRuntime(dataDir, destDir string, id signature.PublicKey, rtg *registry.RuntimeGenesis) error {
dataDir = filepath.Join(dataDir, runtimeRegistry.RuntimesDir, id.String())

// Initialize the storage backend.
storageBackend, err := newDirectStorageBackend(dataDir)
var ns common.Namespace
_ = ns.UnmarshalBinary(id[:])

storageBackend, err := newDirectStorageBackend(dataDir, ns)
if err != nil {
logger.Error("failed to construct storage backend",
"err", err,
)
return
return err
}

logger.Info("waiting for storage backend initialization")
<-storageBackend.Initialized()
defer storageBackend.Cleanup()

// For each storage root.
for runtimeID, rtg := range genesisDoc.RootHash.RuntimeStates {
logger.Info("fetching checkpoint write log",
"runtime_id", runtimeID,
)

// Use RuntimeID for the Roothash namespace.
var ns common.Namespace
_ = ns.UnmarshalBinary(runtimeID[:])

// Get the checkpoint iterator.
root := storageAPI.Root{
Namespace: ns,
Round: rtg.Round,
Hash: rtg.StateRoot,
}
it, err := storageBackend.GetCheckpoint(context.Background(),
&storageAPI.GetCheckpointRequest{
Root: root,
},
)
if err != nil {
logger.Error("failed getting checkpoint",
"err", err,
"namespace", root.Namespace,
"round", root.Round,
"root", root.Hash,
)
return
}

fn := fmt.Sprintf("storage-dump-%v-%d.json",
runtimeID.String(),
rtg.Round,
// Get the checkpoint iterator.
root := storageAPI.Root{
Namespace: ns,
Round: rtg.Round,
Hash: rtg.StateRoot,
}
it, err := storageBackend.GetCheckpoint(context.Background(),
&storageAPI.GetCheckpointRequest{
Root: root,
},
)
if err != nil {
logger.Error("failed getting checkpoint",
"err", err,
"namespace", root.Namespace,
"round", root.Round,
"root", root.Hash,
)
fn = filepath.Join(destDir, fn)
if err = exportIterator(fn, &root, it); err != nil {
return
}
return err
}

ok = true
fn := fmt.Sprintf("storage-dump-%v-%d.json",
root.Namespace.String(),
root.Round,
)
fn = filepath.Join(destDir, fn)
return exportIterator(fn, &root, it)
}

func exportIterator(fn string, root *storageAPI.Root, it storageAPI.WriteLogIterator) error {
Expand Down Expand Up @@ -179,13 +187,15 @@ func exportIterator(fn string, root *storageAPI.Root, it storageAPI.WriteLogIter
}
}

func newDirectStorageBackend(dataDir string) (storageAPI.Backend, error) {
func newDirectStorageBackend(dataDir string, namespace common.Namespace) (storageAPI.Backend, error) {
// The right thing to do will be to use storage.New, but the backend config
// assumes that identity is valid, and we don't have one.
cfg := &storageAPI.Config{
Backend: strings.ToLower(viper.GetString(storage.CfgBackend)),
DB: dataDir,
ApplyLockLRUSlots: uint64(viper.GetInt(storage.CfgLRUSlots)),
Namespace: namespace,
MaxCacheSize: int64(viper.GetSizeInBytes(storage.CfgMaxCacheSize)),
}

b := strings.ToLower(viper.GetString(storage.CfgBackend))
Expand All @@ -194,7 +204,7 @@ func newDirectStorageBackend(dataDir string) (storageAPI.Backend, error) {
cfg.DB = filepath.Join(cfg.DB, storageDatabase.DefaultFileName(cfg.Backend))
return storageDatabase.New(cfg)
case storageClient.BackendName:
return storageClient.New(context.Background(), nil, nil, nil)
return storageClient.New(context.Background(), namespace, nil, nil, nil)
default:
return nil, fmt.Errorf("storage: unsupported backend: '%v'", cfg.Backend)
}
Expand Down
3 changes: 2 additions & 1 deletion go/oasis-node/cmd/debug/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/spf13/cobra"

"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/crypto/signature"
"github.com/oasislabs/oasis-core/go/common/logging"
cmdFlags "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common/flags"
Expand Down Expand Up @@ -147,7 +148,7 @@ func doCheckRoots(cmd *cobra.Command, args []string) {
storageWorkerClient := storageWorkerAPI.NewStorageWorkerClient(conn)
defer conn.Close()

storageClient, err := storageClient.New(ctx, nil, nil, nil)
storageClient, err := storageClient.New(ctx, common.Namespace{}, nil, nil, nil)
if err != nil {
logger.Error("error while connecting to storage client",
"err", err,
Expand Down
15 changes: 2 additions & 13 deletions go/oasis-node/cmd/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ type Node struct {
Scheduler scheduler.Backend
Sentry sentryAPI.Backend
Staking stakingAPI.Backend
Storage storageAPI.Backend
IAS iasAPI.Endpoint

KeyManager keymanagerAPI.Backend
Expand Down Expand Up @@ -171,19 +170,11 @@ func (n *Node) RegistrationStopped() {
}

func (n *Node) initBackends() error {
dataDir := cmdCommon.DataDir()

var err error

if n.Sentry, err = sentry.New(n.Consensus); err != nil {
return err
}

if n.Storage, err = storage.New(n.svcMgr.Ctx, dataDir, n.Identity, n.Scheduler, n.Registry); err != nil {
return err
}
n.svcMgr.RegisterCleanupOnly(n.Storage, "storage backend")

if supplementarysanity.Enabled() {
if err = supplementarysanity.New(n.svcTmnt); err != nil {
return err
Expand All @@ -195,7 +186,6 @@ func (n *Node) initBackends() error {
scheduler.RegisterService(grpcSrv, n.Scheduler)
registryAPI.RegisterService(grpcSrv, n.Registry)
stakingAPI.RegisterService(grpcSrv, n.Staking)
storageAPI.RegisterService(grpcSrv, n.Storage)
consensusAPI.RegisterService(grpcSrv, n.Consensus)

cmdCommon.Logger().Debug("backends initialized")
Expand Down Expand Up @@ -237,7 +227,6 @@ func (n *Node) initWorkers(logger *logging.Logger) error {
dataDir,
compute.Enabled() || workerStorage.Enabled() || txnscheduler.Enabled() || merge.Enabled() || workerKeymanager.Enabled(),
n.Identity,
n.Storage,
n.RootHash,
n.Registry,
n.Scheduler,
Expand Down Expand Up @@ -683,11 +672,12 @@ func newNode(testNode bool) (*Node, error) {
logger.Info("starting Oasis node")

// Initialize the node's runtime registry.
node.RuntimeRegistry, err = runtimeRegistry.New(node.svcMgr.Ctx, cmdCommon.DataDir(), node.Consensus, node.Storage)
node.RuntimeRegistry, err = runtimeRegistry.New(node.svcMgr.Ctx, cmdCommon.DataDir(), node.Consensus, node.Identity)
if err != nil {
return nil, err
}
node.svcMgr.RegisterCleanupOnly(node.RuntimeRegistry, "runtime registry")
storageAPI.RegisterService(node.grpcInternal.Server(), node.RuntimeRegistry.StorageRouter())

// Initialize the key manager client service.
node.KeyManagerClient, err = keymanagerClient.New(node.KeyManager, node.Registry, node.Identity)
Expand All @@ -703,7 +693,6 @@ func newNode(testNode bool) (*Node, error) {
node.svcMgr.Ctx,
cmdCommon.DataDir(),
node.RootHash,
node.Storage,
node.Scheduler,
node.Registry,
node.svcTmnt,
Expand Down
17 changes: 10 additions & 7 deletions go/oasis-node/cmd/node/unsafe_reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/oasislabs/oasis-core/go/consensus/tendermint"
cmdCommon "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common"
cmdFlags "github.com/oasislabs/oasis-core/go/oasis-node/cmd/common/flags"
"github.com/oasislabs/oasis-core/go/runtime/history"
runtimeRegistry "github.com/oasislabs/oasis-core/go/runtime/registry"
)

Expand All @@ -34,15 +35,17 @@ var (
Run: doUnsafeReset,
}

runtimesGlob = filepath.Join(runtimeRegistry.RuntimesDir, "*")

nodeStateGlobs = []string{
"abci-mux-state.*.db",
"persistent-store.*.db",
tendermint.StateDir,
runtimeRegistry.RuntimesDir,
filepath.Join(runtimesGlob, history.DbFilename),
}

localStorageGlob = "worker-local-storage.*.db"
mkvsDatabaseGlob = "mkvs_storage.*.db"
runtimeLocalStorageGlob = filepath.Join(runtimesGlob, "worker-local-storage.*.db")
runtimeMkvsDatabaseGlob = filepath.Join(runtimesGlob, "mkvs_storage.*.db")

logger = logging.GetLogger("cmd/unsafe-reset")
)
Expand Down Expand Up @@ -74,12 +77,12 @@ func doUnsafeReset(cmd *cobra.Command, args []string) {
if viper.GetBool(CfgPreserveLocalStorage) {
logger.Info("preserving untrusted local storage")
} else {
globs = append(globs, localStorageGlob)
globs = append(globs, filepath.Join(runtimesGlob, runtimeLocalStorageGlob))
}
if viper.GetBool(CfgPreserveMKVSDatabase) {
logger.Info("preserving MKVS database")
} else {
globs = append(globs, mkvsDatabaseGlob)
globs = append(globs, filepath.Join(runtimesGlob, runtimeMkvsDatabaseGlob))
}

// Enumerate the locations to purge.
Expand Down Expand Up @@ -126,7 +129,7 @@ func doUnsafeReset(cmd *cobra.Command, args []string) {
}

func init() {
unsafeResetFlags.Bool(CfgPreserveLocalStorage, false, "preserve untrusted local storage")
unsafeResetFlags.Bool(CfgPreserveMKVSDatabase, false, "preserve MKVS database")
unsafeResetFlags.Bool(CfgPreserveLocalStorage, false, "preserve per-runtime untrusted local storage")
unsafeResetFlags.Bool(CfgPreserveMKVSDatabase, false, "preserve per-runtime MKVS database")
_ = viper.BindPFlags(unsafeResetFlags)
}
6 changes: 3 additions & 3 deletions go/oasis-node/cmd/storage/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func doBenchmark(cmd *cobra.Command, args []string) { // nolint: gocyclo
// Disable expected root checks.
viper.Set("storage.debug.insecure_skip_checks", true)

storage, err := storage.New(context.Background(), dataDir, ident, nil, nil)
var ns common.Namespace

storage, err := storage.New(context.Background(), dataDir, ns, ident, nil, nil)
if err != nil {
logger.Error("failed to initialize storage",
"err", err,
Expand Down Expand Up @@ -112,8 +114,6 @@ func doBenchmark(cmd *cobra.Command, args []string) { // nolint: gocyclo
defer pprof.StopCPUProfile()
}

var ns common.Namespace

// Benchmark MKVS storage (single-insert).
for _, sz := range []int{
256, 512, 1024, 4096, 8192, 16384, 32768,
Expand Down
27 changes: 19 additions & 8 deletions go/oasis-node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,15 @@ func testBeacon(t *testing.T, node *testNode) {
}

func testStorage(t *testing.T, node *testNode) {
// Determine the current round. This is required so that we can commit into
// storage at the next (non-finalized) round.
blk, err := node.RootHash.GetLatestBlock(context.Background(), testRuntimeID, consensusAPI.HeightLatest)
require.NoError(t, err, "GetLatestBlock")
dataDir, err := ioutil.TempDir("", "oasis-storage-test_")
require.NoError(t, err, "TempDir")
defer os.RemoveAll(dataDir)

storage, err := storage.New(context.Background(), dataDir, testNamespace, node.Identity, node.Scheduler, node.Registry)
require.NoError(t, err, "storage.New")
defer storage.Cleanup()

storageTests.StorageImplementationTests(t, node.Storage, testNamespace, blk.Header.Round+1)
storageTests.StorageImplementationTests(t, storage, testNamespace, 0)
}

func testRegistry(t *testing.T, node *testNode) {
Expand Down Expand Up @@ -401,7 +404,7 @@ func testStakingClient(t *testing.T, node *testNode) {
}

func testRootHash(t *testing.T, node *testNode) {
roothashTests.RootHashImplementationTests(t, node.RootHash, node.Consensus, node.Storage)
roothashTests.RootHashImplementationTests(t, node.RootHash, node.Consensus, node.Identity)
}

func testComputeWorker(t *testing.T, node *testNode) {
Expand All @@ -419,7 +422,15 @@ func testTransactionSchedulerWorker(t *testing.T, node *testNode) {
timeSource := (node.Epochtime).(epochtime.SetableBackend)

require.NotNil(t, node.txnschedulerCommitteeNode)
txnschedulerWorkerTests.WorkerImplementationTests(t, node.TransactionSchedulerWorker, node.runtimeID, node.txnschedulerCommitteeNode, timeSource, node.RootHash, node.Storage)
txnschedulerWorkerTests.WorkerImplementationTests(
t,
node.TransactionSchedulerWorker,
node.runtimeID,
node.txnschedulerCommitteeNode,
timeSource,
node.RootHash,
node.RuntimeRegistry.StorageRouter(),
)
}

func testClient(t *testing.T, node *testNode) {
Expand All @@ -440,7 +451,7 @@ func testStorageClientWithNode(t *testing.T, node *testNode) {
for _, kv := range config {
viper.Set(kv.key, kv.value)
}
debugClient, err := storageClient.New(ctx, node.Identity, nil, nil)
debugClient, err := storageClient.New(ctx, testNamespace, node.Identity, nil, nil)
require.NoError(t, err, "NewDebugStorageClient")

// Determine the current round. This is required so that we can commit into
Expand Down
6 changes: 0 additions & 6 deletions go/oasis-test-runner/oasis/keymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/oasislabs/oasis-core/go/common/node"
registry "github.com/oasislabs/oasis-core/go/registry/api"
workerCommon "github.com/oasislabs/oasis-core/go/worker/common"
)

const (
Expand Down Expand Up @@ -65,11 +64,6 @@ func (km *Keymanager) ExportsPath() string {
return nodeExportsPath(km.dir)
}

// LocalStoragePath returns the path to the node's local storage.
func (km *Keymanager) LocalStoragePath() string {
return filepath.Join(km.dir.String(), workerCommon.LocalStorageFile)
}

func (km *Keymanager) provisionGenesis() error {
// Provision status and policy. We can only provision this here as we need
// a list of runtimes allowed to query the key manager.
Expand Down
Loading

0 comments on commit 95370a2

Please sign in to comment.