diff --git a/.changelog/2659.internal.md b/.changelog/2659.internal.md new file mode 100644 index 00000000000..ed8a291fb3c --- /dev/null +++ b/.changelog/2659.internal.md @@ -0,0 +1,16 @@ +go/storage: Refactor checkpointing interface + +Previously the way storage checkpoints were implemented had several +drawbacks, namely: + +- Since the checkpoint only streamed key/value pairs this prevented + correct tree reconstruction as tree nodes also include a Round field + which specifies the round at which a given tree node was created. + +- While old checkpoints were streamed in chunks and thus could be + resumed or streamed in parallel from multiple nodes, there was no + support for verifying the integrity of a single chunk. + +This change introduces an explicit checkpointing mechanism with a simple +file-based backend reference implementation. The same mechanism could +also be used in the future with Tendermint's app state sync proposal. diff --git a/go/common/cbor/cbor.go b/go/common/cbor/cbor.go index 03e17807c34..fa853f488e4 100644 --- a/go/common/cbor/cbor.go +++ b/go/common/cbor/cbor.go @@ -5,13 +5,22 @@ // to always have the same serialization. package cbor -import "github.com/fxamacker/cbor" +import ( + "io" + + "github.com/fxamacker/cbor" +) // RawMessage is a raw encoded CBOR value. It implements Marshaler and // Unmarshaler interfaces and can be used to delay CBOR decoding or // precompute a CBOR encoding. type RawMessage = cbor.RawMessage +var encOptions = cbor.EncOptions{ + Canonical: true, + TimeRFC3339: false, // Second granular unix timestamps +} + // FixSliceForSerde will convert `nil` to `[]byte` to work around serde // brain damage. func FixSliceForSerde(b []byte) []byte { @@ -23,10 +32,7 @@ func FixSliceForSerde(b []byte) []byte { // Marshal serializes a given type into a CBOR byte vector. func Marshal(src interface{}) []byte { - b, err := cbor.Marshal(src, cbor.EncOptions{ - Canonical: true, - TimeRFC3339: false, // Second granular unix timestamps - }) + b, err := cbor.Marshal(src, encOptions) if err != nil { panic("common/cbor: failed to marshal: " + err.Error()) } @@ -49,3 +55,13 @@ func MustUnmarshal(data []byte, dst interface{}) { panic(err) } } + +// NewEncoder creates a new CBOR encoder. +func NewEncoder(w io.Writer) *cbor.Encoder { + return cbor.NewEncoder(w, encOptions) +} + +// NewDecoder creates a new CBOR decoder. +func NewDecoder(r io.Reader) *cbor.Decoder { + return cbor.NewDecoder(r) +} diff --git a/go/common/crypto/hash/hash.go b/go/common/crypto/hash/hash.go index a9331f4ab1c..e91914c7a02 100644 --- a/go/common/crypto/hash/hash.go +++ b/go/common/crypto/hash/hash.go @@ -8,6 +8,7 @@ import ( "encoding/base64" "encoding/hex" "errors" + "hash" "github.com/oasislabs/oasis-core/go/common/cbor" ) @@ -107,3 +108,27 @@ func (h *Hash) IsEmpty() bool { func (h Hash) String() string { return hex.EncodeToString(h[:]) } + +// Builder is a hash builder that can be used to compute hashes iteratively. +type Builder struct { + hasher hash.Hash +} + +// Write adds more data to the running hash. +// It never returns an error. +func (b *Builder) Write(p []byte) (int, error) { + return b.hasher.Write(p) +} + +// Build returns the current hash. +// It does not change the underlying hash state. +func (b *Builder) Build() (h Hash) { + sum := b.hasher.Sum([]byte{}) + _ = h.UnmarshalBinary(sum[:]) + return +} + +// NewBuilder creates a new hash builder. 
+func NewBuilder() *Builder { + return &Builder{hasher: sha512.New512_256()} +} diff --git a/go/go.mod b/go/go.mod index 76c8a423e4c..fe414cc239d 100644 --- a/go/go.mod +++ b/go/go.mod @@ -29,6 +29,7 @@ require ( github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31 // indirect github.com/go-kit/kit v0.9.0 github.com/golang/protobuf v1.3.2 + github.com/golang/snappy v0.0.1 github.com/google/gofuzz v1.0.0 github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 diff --git a/go/oasis-node/cmd/debug/byzantine/storage.go b/go/oasis-node/cmd/debug/byzantine/storage.go index 6e09472f49a..c5ec75c0677 100644 --- a/go/oasis-node/cmd/debug/byzantine/storage.go +++ b/go/oasis-node/cmd/debug/byzantine/storage.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "crypto/x509" + "io" "github.com/pkg/errors" "google.golang.org/grpc" @@ -19,6 +20,7 @@ import ( "github.com/oasislabs/oasis-core/go/common/node" scheduler "github.com/oasislabs/oasis-core/go/scheduler/api" storage "github.com/oasislabs/oasis-core/go/storage/api" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint" "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/syncer" ) @@ -119,8 +121,12 @@ func (hns *honestNodeStorage) GetDiff(ctx context.Context, request *storage.GetD return hns.client.GetDiff(ctx, request) } -func (hns *honestNodeStorage) GetCheckpoint(ctx context.Context, request *storage.GetCheckpointRequest) (storage.WriteLogIterator, error) { - return hns.client.GetCheckpoint(ctx, request) +func (hns *honestNodeStorage) GetCheckpoints(ctx context.Context, request *checkpoint.GetCheckpointsRequest) ([]*checkpoint.Metadata, error) { + return hns.client.GetCheckpoints(ctx, request) +} + +func (hns *honestNodeStorage) GetCheckpointChunk(ctx context.Context, chunk *checkpoint.ChunkMetadata, w io.Writer) error { + return hns.client.GetCheckpointChunk(ctx, chunk, w) } func (hns *honestNodeStorage) Cleanup() { diff --git a/go/oasis-node/cmd/debug/storage/export.go b/go/oasis-node/cmd/debug/storage/export.go index b582f71b94f..e945fc53fcc 100644 --- a/go/oasis-node/cmd/debug/storage/export.go +++ b/go/oasis-node/cmd/debug/storage/export.go @@ -22,6 +22,7 @@ import ( storageAPI "github.com/oasislabs/oasis-core/go/storage/api" storageClient "github.com/oasislabs/oasis-core/go/storage/client" storageDatabase "github.com/oasislabs/oasis-core/go/storage/database" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel" ) const cfgExportDir = "storage.export.dir" @@ -98,26 +99,14 @@ func exportRuntime(dataDir, destDir string, id common.Namespace, rtg *registry.R <-storageBackend.Initialized() defer storageBackend.Cleanup() - // Get the checkpoint iterator. 
root := storageAPI.Root{ Namespace: id, Round: rtg.Round, Hash: rtg.StateRoot, } - it, err := storageBackend.GetCheckpoint(context.Background(), - &storageAPI.GetCheckpointRequest{ - Root: root, - }, - ) - if err != nil { - logger.Error("failed getting checkpoint", - "err", err, - "namespace", root.Namespace, - "round", root.Round, - "root", root.Hash, - ) - return err - } + tree := urkel.NewWithRoot(storageBackend, nil, root) + it := tree.NewIterator(context.Background(), urkel.IteratorPrefetch(10_000)) + defer it.Close() fn := fmt.Sprintf("storage-dump-%v-%d.json", root.Namespace.String(), @@ -127,7 +116,7 @@ func exportRuntime(dataDir, destDir string, id common.Namespace, rtg *registry.R return exportIterator(fn, &root, it) } -func exportIterator(fn string, root *storageAPI.Root, it storageAPI.WriteLogIterator) error { +func exportIterator(fn string, root *storageAPI.Root, it urkel.Iterator) error { // Create the dump file, and initialize a JSON stream encoder. f, err := os.Create(fn) if err != nil { @@ -152,35 +141,18 @@ func exportIterator(fn string, root *storageAPI.Root, it storageAPI.WriteLogIter return err } - // Dump the write log. - for { - more, err := it.Next() - if err != nil { - logger.Error("failed to fetch next item from write log iterator", - "err", err, - ) - return err - } - - if !more { - return nil - } - - val, err := it.Value() - if err != nil { - logger.Error("failed to get value from write log iterator", - "err", err, - ) - return err - } - - if err = enc.Encode([][]byte{val.Key, val.Value}); err != nil { + // Dump the tree. + for it.Rewind(); it.Valid(); it.Next() { + key, value := it.Key(), it.Value() + if err = enc.Encode([][]byte{key, value}); err != nil { logger.Error("failed to encode write log entry", "err", err, ) return err } } + + return nil } func newDirectStorageBackend(dataDir string, namespace common.Namespace) (storageAPI.Backend, error) { diff --git a/go/oasis-node/cmd/registry/runtime/runtime.go b/go/oasis-node/cmd/registry/runtime/runtime.go index 5ba473c45f7..5d17d3e3053 100644 --- a/go/oasis-node/cmd/registry/runtime/runtime.go +++ b/go/oasis-node/cmd/registry/runtime/runtime.go @@ -62,6 +62,9 @@ const ( CfgStorageMaxApplyOps = "runtime.storage.max_apply_ops" CfgStorageMaxMergeRoots = "runtime.storage.max_merge_roots" CfgStorageMaxMergeOps = "runtime.storage.max_merge_ops" + CfgStorageCheckpointInterval = "runtime.storage.checkpoint_interval" + CfgStorageCheckpointNumKept = "runtime.storage.checkpoint_num_kept" + CfgStorageCheckpointChunkSize = "runtime.storage.checkpoint_chunk_size" // Transaction scheduler flags. 
CfgTxnSchedulerGroupSize = "runtime.txn_scheduler.group_size" @@ -385,6 +388,9 @@ func runtimeFromFlags() (*registry.Runtime, signature.Signer, error) { MaxApplyOps: viper.GetUint64(CfgStorageMaxApplyOps), MaxMergeRoots: viper.GetUint64(CfgStorageMaxMergeRoots), MaxMergeOps: viper.GetUint64(CfgStorageMaxMergeOps), + CheckpointInterval: viper.GetUint64(CfgStorageCheckpointInterval), + CheckpointNumKept: viper.GetUint64(CfgStorageCheckpointNumKept), + CheckpointChunkSize: viper.GetUint64(CfgStorageCheckpointChunkSize), }, } if teeHardware == node.TEEHardwareIntelSGX { @@ -518,6 +524,9 @@ func init() { runtimeFlags.Uint64(CfgStorageMaxApplyOps, 2, "Maximum number of apply operations in a batch") runtimeFlags.Uint64(CfgStorageMaxMergeRoots, 8, "Maximum number of merge roots") runtimeFlags.Uint64(CfgStorageMaxMergeOps, 2, "Maximum number of merge operations in a batch") + runtimeFlags.Uint64(CfgStorageCheckpointInterval, 0, "Storage checkpoint interval (in rounds)") + runtimeFlags.Uint64(CfgStorageCheckpointNumKept, 0, "Number of storage checkpoints to keep") + runtimeFlags.Uint64(CfgStorageCheckpointChunkSize, 0, "Storage checkpoint chunk size") // Init Admission policy flags. runtimeFlags.String(CfgAdmissionPolicy, "", "What type of node admission policy to have") diff --git a/go/oasis-node/node_test.go b/go/oasis-node/node_test.go index 9e289a1dce2..543fcfac688 100644 --- a/go/oasis-node/node_test.go +++ b/go/oasis-node/node_test.go @@ -37,6 +37,7 @@ import ( staking "github.com/oasislabs/oasis-core/go/staking/api" stakingTests "github.com/oasislabs/oasis-core/go/staking/tests" "github.com/oasislabs/oasis-core/go/storage" + storageAPI "github.com/oasislabs/oasis-core/go/storage/api" storageClient "github.com/oasislabs/oasis-core/go/storage/client" storageClientTests "github.com/oasislabs/oasis-core/go/storage/client/tests" storageTests "github.com/oasislabs/oasis-core/go/storage/tests" @@ -388,11 +389,13 @@ func testStorage(t *testing.T, node *testNode) { require.NoError(t, err, "TempDir") defer os.RemoveAll(dataDir) - storage, err := storage.New(context.Background(), dataDir, testRuntimeID, node.Identity, node.Scheduler, node.Registry) + backend, err := storage.New(context.Background(), dataDir, testRuntimeID, node.Identity, node.Scheduler, node.Registry) require.NoError(t, err, "storage.New") - defer storage.Cleanup() + defer backend.Cleanup() + // We are always testing a local storage backend here. + localBackend := backend.(storageAPI.LocalBackend) - storageTests.StorageImplementationTests(t, storage, testRuntimeID, 0) + storageTests.StorageImplementationTests(t, localBackend, backend, testRuntimeID, 0) } func testRegistry(t *testing.T, node *testNode) { @@ -462,6 +465,11 @@ func testClient(t *testing.T, node *testNode) { func testStorageClientWithNode(t *testing.T, node *testNode) { ctx := context.Background() + // Get the local storage backend (the one that the client is connecting to). 
+ rt, err := node.RuntimeRegistry.GetRuntime(testRuntimeID) + require.NoError(t, err, "GetRuntime") + localBackend := rt.Storage().(storageAPI.LocalBackend) + client, err := storageClient.NewStatic(ctx, testRuntimeID, node.Identity, node.Registry, node.Identity.NodeSigner.Public()) require.NoError(t, err, "NewStatic") @@ -470,7 +478,7 @@ func testStorageClientWithNode(t *testing.T, node *testNode) { blk, err := node.RootHash.GetLatestBlock(ctx, testRuntimeID, consensusAPI.HeightLatest) require.NoError(t, err, "GetLatestBlock") - storageTests.StorageImplementationTests(t, client, testRuntimeID, blk.Header.Round+1) + storageTests.StorageImplementationTests(t, localBackend, client, testRuntimeID, blk.Header.Round+1) } func testStorageClientWithoutNode(t *testing.T, node *testNode) { diff --git a/go/oasis-test-runner/oasis/args.go b/go/oasis-test-runner/oasis/args.go index ecffba34cb6..00842b5f816 100644 --- a/go/oasis-test-runner/oasis/args.go +++ b/go/oasis-test-runner/oasis/args.go @@ -5,6 +5,7 @@ import ( "fmt" "path/filepath" "strconv" + "time" "github.com/oasislabs/oasis-core/go/common" commonGrpc "github.com/oasislabs/oasis-core/go/common/grpc" @@ -320,6 +321,13 @@ func (args *argBuilder) workerStorageDebugIgnoreApplies(ignore bool) *argBuilder return args } +func (args *argBuilder) workerStorageCheckpointCheckInterval(interval time.Duration) *argBuilder { + if interval > 0 { + args.vec = append(args.vec, "--"+workerStorage.CfgWorkerCheckpointCheckInterval, interval.String()) + } + return args +} + func (args *argBuilder) workerTxnschedulerCheckTxEnabled() *argBuilder { args.vec = append(args.vec, "--"+txnscheduler.CfgCheckTxEnabled) return args diff --git a/go/oasis-test-runner/oasis/cli/registry.go b/go/oasis-test-runner/oasis/cli/registry.go index 2737fa9c6b9..0f3acf37a3e 100644 --- a/go/oasis-test-runner/oasis/cli/registry.go +++ b/go/oasis-test-runner/oasis/cli/registry.go @@ -77,6 +77,9 @@ func (r *RegistryHelpers) GenerateRegisterRuntimeTx( "--"+cmdRegRt.CfgStorageMaxApplyOps, strconv.FormatUint(runtime.Storage.MaxApplyOps, 10), "--"+cmdRegRt.CfgStorageMaxMergeRoots, strconv.FormatUint(runtime.Storage.MaxMergeRoots, 10), "--"+cmdRegRt.CfgStorageMaxMergeOps, strconv.FormatUint(runtime.Storage.MaxMergeOps, 10), + "--"+cmdRegRt.CfgStorageCheckpointInterval, strconv.FormatUint(runtime.Storage.CheckpointInterval, 10), + "--"+cmdRegRt.CfgStorageCheckpointNumKept, strconv.FormatUint(runtime.Storage.CheckpointNumKept, 10), + "--"+cmdRegRt.CfgStorageCheckpointChunkSize, strconv.FormatUint(runtime.Storage.CheckpointChunkSize, 10), "--"+cmdRegRt.CfgTxnSchedulerGroupSize, strconv.FormatUint(runtime.TxnScheduler.GroupSize, 10), "--"+cmdRegRt.CfgTxnSchedulerAlgorithm, runtime.TxnScheduler.Algorithm, "--"+cmdRegRt.CfgTxnSchedulerBatchFlushTimeout, runtime.TxnScheduler.BatchFlushTimeout.String(), diff --git a/go/oasis-test-runner/oasis/controller.go b/go/oasis-test-runner/oasis/controller.go index ff837058ed5..b9bbbbf0703 100644 --- a/go/oasis-test-runner/oasis/controller.go +++ b/go/oasis-test-runner/oasis/controller.go @@ -9,6 +9,7 @@ import ( registry "github.com/oasislabs/oasis-core/go/registry/api" runtimeClient "github.com/oasislabs/oasis-core/go/runtime/client/api" staking "github.com/oasislabs/oasis-core/go/staking/api" + storage "github.com/oasislabs/oasis-core/go/storage/api" ) // Controller is a network controller that connects to one of the @@ -21,6 +22,7 @@ type Controller struct { Staking staking.Backend Registry registry.Backend RuntimeClient runtimeClient.RuntimeClient + Storage 
storage.Backend conn *grpc.ClientConn } @@ -49,6 +51,7 @@ func NewController(socketPath string) (*Controller, error) { Staking: staking.NewStakingClient(conn), Registry: registry.NewRegistryClient(conn), RuntimeClient: runtimeClient.NewRuntimeClient(conn), + Storage: storage.NewStorageClient(conn), conn: conn, }, nil diff --git a/go/oasis-test-runner/oasis/fixture.go b/go/oasis-test-runner/oasis/fixture.go index 31372bc8389..6f4c07c04d1 100644 --- a/go/oasis-test-runner/oasis/fixture.go +++ b/go/oasis-test-runner/oasis/fixture.go @@ -2,6 +2,7 @@ package oasis import ( "fmt" + "time" "github.com/oasislabs/oasis-core/go/common" "github.com/oasislabs/oasis-core/go/common/node" @@ -264,7 +265,8 @@ type StorageWorkerFixture struct { // nolint: maligned Sentries []int `json:"sentries,omitempty"` - IgnoreApplies bool `json:"ignore_applies,omitempty"` + CheckpointCheckInterval time.Duration `json:"checkpoint_check_interval,omitempty"` + IgnoreApplies bool `json:"ignore_applies,omitempty"` } // Create instantiates the storage worker described by the fixture. @@ -280,10 +282,11 @@ func (f *StorageWorkerFixture) Create(net *Network) (*Storage, error) { LogWatcherHandlerFactories: f.LogWatcherHandlerFactories, SubmissionGasPrice: f.SubmissionGasPrice, }, - Backend: f.Backend, - Entity: entity, - SentryIndices: f.Sentries, - IgnoreApplies: f.IgnoreApplies, + Backend: f.Backend, + Entity: entity, + SentryIndices: f.Sentries, + CheckpointCheckInterval: f.CheckpointCheckInterval, + IgnoreApplies: f.IgnoreApplies, }) } diff --git a/go/oasis-test-runner/oasis/runtime.go b/go/oasis-test-runner/oasis/runtime.go index a389add6296..6acea7711d4 100644 --- a/go/oasis-test-runner/oasis/runtime.go +++ b/go/oasis-test-runner/oasis/runtime.go @@ -139,6 +139,9 @@ func (net *Network) NewRuntime(cfg *RuntimeCfg) (*Runtime, error) { "--" + cmdRegRt.CfgStorageMaxApplyOps, strconv.FormatUint(cfg.Storage.MaxApplyOps, 10), "--" + cmdRegRt.CfgStorageMaxMergeRoots, strconv.FormatUint(cfg.Storage.MaxMergeRoots, 10), "--" + cmdRegRt.CfgStorageMaxMergeOps, strconv.FormatUint(cfg.Storage.MaxMergeOps, 10), + "--" + cmdRegRt.CfgStorageCheckpointInterval, strconv.FormatUint(cfg.Storage.CheckpointInterval, 10), + "--" + cmdRegRt.CfgStorageCheckpointNumKept, strconv.FormatUint(cfg.Storage.CheckpointNumKept, 10), + "--" + cmdRegRt.CfgStorageCheckpointChunkSize, strconv.FormatUint(cfg.Storage.CheckpointChunkSize, 10), }...) if cfg.GenesisState != "" { diff --git a/go/oasis-test-runner/oasis/storage.go b/go/oasis-test-runner/oasis/storage.go index a2d2d02ced6..ca1b4a324a9 100644 --- a/go/oasis-test-runner/oasis/storage.go +++ b/go/oasis-test-runner/oasis/storage.go @@ -3,6 +3,7 @@ package oasis import ( "fmt" "path/filepath" + "time" "github.com/pkg/errors" @@ -19,9 +20,11 @@ type Storage struct { // nolint: maligned sentryIndices []int - backend string - entity *Entity - ignoreApplies bool + backend string + entity *Entity + + ignoreApplies bool + checkpointCheckInterval time.Duration tmAddress string consensusPort uint16 @@ -36,7 +39,9 @@ type StorageCfg struct { // nolint: maligned SentryIndices []int Backend string Entity *Entity - IgnoreApplies bool + + IgnoreApplies bool + CheckpointCheckInterval time.Duration } // IdentityKeyPath returns the path to the node's identity key. @@ -97,6 +102,7 @@ func (worker *Storage) startNode() error { workerP2pPort(worker.p2pPort). workerStorageEnabled(). workerStorageDebugIgnoreApplies(worker.ignoreApplies). + workerStorageCheckpointCheckInterval(worker.checkpointCheckInterval). 
appendNetwork(worker.net). appendSeedNodes(worker.net). appendEntity(worker.entity) @@ -161,14 +167,15 @@ func (net *Network) NewStorage(cfg *StorageCfg) (*Storage, error) { logWatcherHandlerFactories: cfg.LogWatcherHandlerFactories, submissionGasPrice: cfg.SubmissionGasPrice, }, - backend: cfg.Backend, - entity: cfg.Entity, - sentryIndices: cfg.SentryIndices, - ignoreApplies: cfg.IgnoreApplies, - tmAddress: crypto.PublicKeyToTendermint(&publicKey).Address().String(), - consensusPort: net.nextNodePort, - clientPort: net.nextNodePort + 1, - p2pPort: net.nextNodePort + 2, + backend: cfg.Backend, + entity: cfg.Entity, + sentryIndices: cfg.SentryIndices, + ignoreApplies: cfg.IgnoreApplies, + checkpointCheckInterval: cfg.CheckpointCheckInterval, + tmAddress: crypto.PublicKeyToTendermint(&publicKey).Address().String(), + consensusPort: net.nextNodePort, + clientPort: net.nextNodePort + 1, + p2pPort: net.nextNodePort + 2, } worker.doStartNode = worker.startNode diff --git a/go/oasis-test-runner/scenario/e2e/storage_sync.go b/go/oasis-test-runner/scenario/e2e/storage_sync.go index 913848f1a00..20fe5e7263f 100644 --- a/go/oasis-test-runner/scenario/e2e/storage_sync.go +++ b/go/oasis-test-runner/scenario/e2e/storage_sync.go @@ -1,13 +1,17 @@ package e2e import ( - "github.com/pkg/errors" + "context" + "fmt" + "time" "github.com/oasislabs/oasis-core/go/oasis-test-runner/env" "github.com/oasislabs/oasis-core/go/oasis-test-runner/oasis" "github.com/oasislabs/oasis-core/go/oasis-test-runner/oasis/cli" "github.com/oasislabs/oasis-core/go/oasis-test-runner/scenario" + runtimeClient "github.com/oasislabs/oasis-core/go/runtime/client/api" "github.com/oasislabs/oasis-core/go/storage/database" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint" ) var ( @@ -31,6 +35,12 @@ func (sc *storageSyncImpl) Fixture() (*oasis.NetworkFixture, error) { return nil, err } + // Make the first storage worker check for checkpoints more often. + f.StorageWorkers[0].CheckpointCheckInterval = 1 * time.Second + // Configure runtime for storage checkpointing. + f.Runtimes[1].Storage.CheckpointInterval = 10 + f.Runtimes[1].Storage.CheckpointNumKept = 1 + f.Runtimes[1].Storage.CheckpointChunkSize = 1024 * 1024 // Provision another storage node and make it ignore all applies. f.StorageWorkers = append(f.StorageWorkers, oasis.StorageWorkerFixture{ Backend: database.BackendNameBadgerDB, @@ -62,7 +72,57 @@ func (sc *storageSyncImpl) Run(childEnv *env.Env) error { sc.basicImpl.net.Runtimes()[1].ID().String(), } if err = cli.RunSubCommand(childEnv, sc.logger, "storage-check-roots", sc.basicImpl.net.Config().NodeBinary, args); err != nil { - return errors.Wrap(err, "scenario/e2e/storage_sync: root check failed after sync") + return fmt.Errorf("root check failed after sync: %w", err) + } + + // Generate some more rounds to trigger checkpointing. Up to this point there have been ~9 + // rounds, we create 15 more rounds to bring this up to ~24. Checkpoints are every 10 rounds so + // this leaves some space for any unintended epoch transitions. + ctx := context.Background() + for i := 0; i < 15; i++ { + sc.logger.Info("submitting transaction to runtime", + "seq", i, + ) + if err = sc.submitRuntimeTx(ctx, runtimeID, "checkpoint", fmt.Sprintf("my cp %d", i)); err != nil { + return err + } + } + + // Make sure that the first storage node created checkpoints. 
+ ctrl, err := oasis.NewController(sc.net.StorageWorkers()[0].SocketPath()) + if err != nil { + return fmt.Errorf("failed to connect with the first storage node: %w", err) + } + + cps, err := ctrl.Storage.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{Version: 1, Namespace: runtimeID}) + if err != nil { + return fmt.Errorf("failed to get checkpoints: %w", err) + } + + blk, err := ctrl.RuntimeClient.GetBlock(ctx, &runtimeClient.GetBlockRequest{RuntimeID: runtimeID, Round: 20}) + if err != nil { + return fmt.Errorf("failed to get block for round 20: %w", err) + } + + // There should be two checkpoints (for the two roots at round 20), due to garbage collection + // pruning checkpoints at round 10. + if len(cps) != 2 { + return fmt.Errorf("incorrect number of checkpoints (expected: 2 got: %d)", len(cps)) + } + for _, cp := range cps { + if cp.Root.Round != blk.Header.Round { + return fmt.Errorf("checkpoint at incorrect round (expected: %d got: %d)", blk.Header.Round, cp.Root.Round) + } + var found bool + for _, root := range blk.Header.StorageRoots() { + if root.Equal(&cp.Root) { + found = true + break + } + } + if !found { + return fmt.Errorf("checkpoint for unexpected root %s", cp.Root) + } } return nil diff --git a/go/registry/api/api.go b/go/registry/api/api.go index 35b41203b9e..3aa6c88b4c7 100644 --- a/go/registry/api/api.go +++ b/go/registry/api/api.go @@ -1038,36 +1038,9 @@ func VerifyRegisterRuntimeArgs( return nil, fmt.Errorf("%w: transaction scheduler group to small", ErrInvalidArgument) } - // Ensure there is at least one member of the storage group. - if rt.Storage.GroupSize == 0 { - logger.Error("RegisterRuntime: storage group too small", - "runtime", rt, - ) - return nil, fmt.Errorf("%w: storage group too small", ErrInvalidArgument) - } - if rt.Storage.MaxApplyWriteLogEntries < 10 { - logger.Error("RegisterRuntime: storage MaxApplyWriteLogEntries parameter too small", - "runtime", rt, - ) - return nil, fmt.Errorf("%w: storage MaxApplyWriteLogEntries parameter too small", ErrInvalidArgument) - } - if rt.Storage.MaxApplyOps < 2 { - logger.Error("RegisterRuntime: storage MaxApplyOps parameter too small", - "runtime", rt, - ) - return nil, fmt.Errorf("%w: storage MaxApplyOps parameter too small", ErrInvalidArgument) - } - if rt.Storage.MaxMergeRoots == 0 { - logger.Error("RegisterRuntime: storage MaxMergeRoots parameter too small", - "runtime", rt, - ) - return nil, fmt.Errorf("%w: storage MaxMergeRoots parameter too small", ErrInvalidArgument) - } - if rt.Storage.MaxMergeOps < 2 { - logger.Error("RegisterRuntime: storage MaxMergeOps parameter too small", - "runtime", rt, - ) - return nil, fmt.Errorf("%w: storage MaxMergeOps parameter too small", ErrInvalidArgument) + // Ensure storage parameters have sensible values. + if err := VerifyRegisterRuntimeStorageArgs(&rt, logger); err != nil { + return nil, err } if rt.ID.IsKeyManager() { @@ -1121,6 +1094,68 @@ func VerifyRegisterRuntimeArgs( return &rt, nil } +// VerifyRegisterRuntimeStorageArgs verifies the runtime's storage parameters +func VerifyRegisterRuntimeStorageArgs(rt *Runtime, logger *logging.Logger) error { + params := rt.Storage + + // Ensure there is at least one member of the storage group. + if params.GroupSize == 0 { + logger.Error("RegisterRuntime: storage group too small", + "runtime", rt, + ) + return fmt.Errorf("%w: storage group too small", ErrInvalidArgument) + } + + // Ensure limit parameters have sensible values. 
+ if params.MaxApplyWriteLogEntries < 10 { + logger.Error("RegisterRuntime: storage MaxApplyWriteLogEntries parameter too small", + "runtime", rt, + ) + return fmt.Errorf("%w: storage MaxApplyWriteLogEntries parameter too small", ErrInvalidArgument) + } + if params.MaxApplyOps < 2 { + logger.Error("RegisterRuntime: storage MaxApplyOps parameter too small", + "runtime", rt, + ) + return fmt.Errorf("%w: storage MaxApplyOps parameter too small", ErrInvalidArgument) + } + if params.MaxMergeRoots == 0 { + logger.Error("RegisterRuntime: storage MaxMergeRoots parameter too small", + "runtime", rt, + ) + return fmt.Errorf("%w: storage MaxMergeRoots parameter too small", ErrInvalidArgument) + } + if params.MaxMergeOps < 2 { + logger.Error("RegisterRuntime: storage MaxMergeOps parameter too small", + "runtime", rt, + ) + return fmt.Errorf("%w: storage MaxMergeOps parameter too small", ErrInvalidArgument) + } + + // Verify storage checkpointing configuration if enabled. + if params.CheckpointInterval > 0 { + if params.CheckpointInterval < 10 { + logger.Error("RegisterRuntime: storage CheckpointInterval parameter too small", + "runtime", rt, + ) + return fmt.Errorf("%w: storage CheckpointInterval parameter too small", ErrInvalidArgument) + } + if params.CheckpointNumKept == 0 { + logger.Error("RegisterRuntime: storage CheckpointNumKept parameter too small", + "runtime", rt, + ) + return fmt.Errorf("%w: storage CheckpointNumKept parameter too small", ErrInvalidArgument) + } + if params.CheckpointChunkSize < 1024*1024 { + logger.Error("RegisterRuntime: storage CheckpointChunkSize parameter too small", + "runtime", rt, + ) + return fmt.Errorf("%w: storage CheckpointChunkSize parameter too small", ErrInvalidArgument) + } + } + return nil +} + // VerifyRegisterComputeRuntimeArgs verifies compute runtime-specific arguments for RegisterRuntime. func VerifyRegisterComputeRuntimeArgs(logger *logging.Logger, rt *Runtime, runtimeLookup RuntimeLookup) error { // Check runtime's key manager, if key manager ID is set. diff --git a/go/registry/api/runtime.go b/go/registry/api/runtime.go index 5ef93891906..750397aec9c 100644 --- a/go/registry/api/runtime.go +++ b/go/registry/api/runtime.go @@ -146,6 +146,15 @@ type StorageParameters struct { // MaxApplyOps configures the maximum number of merge operations in a batch. MaxMergeOps uint64 `json:"max_merge_ops"` + + // CheckpointInterval is the expected runtime state checkpoint interval (in rounds). + CheckpointInterval uint64 `json:"checkpoint_interval"` + + // CheckpointNumKept is the expected minimum number of checkpoints to keep. + CheckpointNumKept uint64 `json:"checkpoint_num_kept"` + + // CheckpointChunkSize is the chunk size parameter for checkpoint creation. + CheckpointChunkSize uint64 `json:"checkpoint_chunk_size"` } // AnyNodeRuntimeAdmissionPolicy allows any node to register. diff --git a/go/roothash/api/block/header.go b/go/roothash/api/block/header.go index 37a26a01239..e1bd1689a19 100644 --- a/go/roothash/api/block/header.go +++ b/go/roothash/api/block/header.go @@ -105,6 +105,21 @@ func (h *Header) EncodedHash() hash.Hash { return hh } +// StorageRoots returns the storage roots contained in this header. +func (h *Header) StorageRoots() (roots []storage.Root) { + for _, rootHash := range []hash.Hash{ + h.IORoot, + h.StateRoot, + } { + roots = append(roots, storage.Root{ + Namespace: h.Namespace, + Round: h.Round, + Hash: rootHash, + }) + } + return +} + // RootsForStorageReceipt gets the merkle roots that must be part of // a storage receipt. 
func (h *Header) RootsForStorageReceipt() []hash.Hash { diff --git a/go/runtime/history/prune.go b/go/runtime/history/prune.go index 1bbae5ec7b7..ab2bfd558eb 100644 --- a/go/runtime/history/prune.go +++ b/go/runtime/history/prune.go @@ -39,10 +39,6 @@ type Pruner interface { // Prune purges unneeded history, given the latest round. Prune(ctx context.Context, latestRound uint64) error - // NextCheckpoint returns the round which should be fetched as a - // checkpoint, given the latest round and last fetched checkpoint. - NextCheckpoint(latestRound, lastCheckpoint uint64) (uint64, error) - // RegisterHandler registers a prune handler. RegisterHandler(handler PruneHandler) } @@ -74,10 +70,6 @@ func (p *nonePruner) Prune(ctx context.Context, latestRound uint64) error { return nil } -func (p *nonePruner) NextCheckpoint(latestRound, lastCheckpoint uint64) (uint64, error) { - return 0, nil -} - // NewNonePruner creates a new pruner that never prunes anything. func NewNonePruner() PrunerFactory { return func(db *DB) (Pruner, error) { @@ -160,13 +152,6 @@ func (p *keepLastPruner) Prune(ctx context.Context, latestRound uint64) error { }) } -func (p *keepLastPruner) NextCheckpoint(latestRound, lastCheckpoint uint64) (uint64, error) { - if latestRound < p.numKept { - return 0, nil - } - return latestRound - p.numKept + 1, nil -} - // NewKeepLastPruner creates a pruner that keeps the last configured // number of rounds. func NewKeepLastPruner(numKept uint64) PrunerFactory { diff --git a/go/runtime/registry/storage_router.go b/go/runtime/registry/storage_router.go index bc2fa2e8881..a8d10d1b83c 100644 --- a/go/runtime/registry/storage_router.go +++ b/go/runtime/registry/storage_router.go @@ -2,9 +2,11 @@ package registry import ( "context" + "io" "github.com/oasislabs/oasis-core/go/common" "github.com/oasislabs/oasis-core/go/storage/api" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint" ) var _ api.Backend = (*storageRouter)(nil) @@ -81,12 +83,20 @@ func (sr *storageRouter) GetDiff(ctx context.Context, request *api.GetDiffReques return rt.Storage().GetDiff(ctx, request) } -func (sr *storageRouter) GetCheckpoint(ctx context.Context, request *api.GetCheckpointRequest) (api.WriteLogIterator, error) { - rt, err := sr.getRuntime(request.Root.Namespace) +func (sr *storageRouter) GetCheckpoints(ctx context.Context, request *checkpoint.GetCheckpointsRequest) ([]*checkpoint.Metadata, error) { + rt, err := sr.getRuntime(request.Namespace) if err != nil { return nil, err } - return rt.Storage().GetCheckpoint(ctx, request) + return rt.Storage().GetCheckpoints(ctx, request) +} + +func (sr *storageRouter) GetCheckpointChunk(ctx context.Context, chunk *checkpoint.ChunkMetadata, w io.Writer) error { + rt, err := sr.getRuntime(chunk.Root.Namespace) + if err != nil { + return err + } + return rt.Storage().GetCheckpointChunk(ctx, chunk, w) } func (sr *storageRouter) Cleanup() { diff --git a/go/storage/api/api.go b/go/storage/api/api.go index 97b416ec4ed..4e04012695c 100644 --- a/go/storage/api/api.go +++ b/go/storage/api/api.go @@ -9,6 +9,7 @@ import ( "github.com/oasislabs/oasis-core/go/common/crypto/signature" "github.com/oasislabs/oasis-core/go/common/errors" "github.com/oasislabs/oasis-core/go/common/node" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint" nodedb "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/db/api" urkelNode "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/node" "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/syncer" @@ -20,7 +21,7 @@ const ( 
ModuleName = "storage" // WriteLogIteratorChunkSize defines the chunk size of write log entries - // for GetCheckpoint and GetDiff methods. + // for the GetDiff method. WriteLogIteratorChunkSize = 10 ) @@ -264,8 +265,7 @@ type SyncOptions struct { Limit uint64 `json:"limit"` } -// SyncChunk is a chunk of write log entries sent during GetDiff and -// GetCheckpoint operations. +// SyncChunk is a chunk of write log entries sent during GetDiff operation. type SyncChunk struct { Final bool `json:"final"` WriteLog WriteLog `json:"writelog"` @@ -278,15 +278,10 @@ type GetDiffRequest struct { Options SyncOptions `json:"options"` } -// GetCheckpointRequest is a GetCheckpoint request. -type GetCheckpointRequest struct { - Root Root `json:"root"` - Options SyncOptions `json:"options"` -} - // Backend is a storage backend implementation. type Backend interface { syncer.ReadSyncer + checkpoint.ChunkProvider // Apply applies a set of operations against the MKVS. The root may refer // to a nil node, in which case a new root will be created. @@ -320,10 +315,6 @@ type Backend interface { // to get from the first given root to the second one. GetDiff(ctx context.Context, request *GetDiffRequest) (WriteLogIterator, error) - // GetCheckpoint returns an iterator of write log entries in the provided - // root. - GetCheckpoint(ctx context.Context, request *GetCheckpointRequest) (WriteLogIterator, error) - // Cleanup closes/cleans up the storage backend. Cleanup() @@ -348,6 +339,9 @@ type LocalBackend interface { // // Returns the number of pruned nodes. Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error) + + // Checkpointer returns the checkpoint creator/restorer for this storage backend. + Checkpointer() checkpoint.CreateRestorer } // ClientBackend is a storage client backend implementation. diff --git a/go/storage/api/grpc.go b/go/storage/api/grpc.go index 1787ec16ca9..d78f05c7f32 100644 --- a/go/storage/api/grpc.go +++ b/go/storage/api/grpc.go @@ -10,6 +10,7 @@ import ( "github.com/oasislabs/oasis-core/go/common" cmnGrpc "github.com/oasislabs/oasis-core/go/common/grpc" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint" "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/writelog" ) @@ -90,14 +91,27 @@ var ( return true }) - // MethodGetCheckpoint is the GetCheckpoint method. - MethodGetCheckpoint = ServiceName.NewMethod("GetCheckpoint", GetCheckpointRequest{}). + // MethodGetCheckpoints is the GetCheckpoints method. + MethodGetCheckpoints = ServiceName.NewMethod("GetCheckpoints", checkpoint.GetCheckpointsRequest{}). WithNamespaceExtractor(func(req interface{}) (common.Namespace, error) { - r, ok := req.(*GetCheckpointRequest) + r, ok := req.(*checkpoint.GetCheckpointsRequest) if !ok { return common.Namespace{}, errInvalidRequestType } - return r.Root.Namespace, nil + return r.Namespace, nil + }). + WithAccessControl(func(req interface{}) bool { + return true + }) + + // MethodGetCheckpointChunk is the GetCheckpointChunk method. + MethodGetCheckpointChunk = ServiceName.NewMethod("GetCheckpointChunk", checkpoint.ChunkMetadata{}). + WithNamespaceExtractor(func(req interface{}) (common.Namespace, error) { + cm, ok := req.(*checkpoint.ChunkMetadata) + if !ok { + return common.Namespace{}, errInvalidRequestType + } + return cm.Root.Namespace, nil }). 
WithAccessControl(func(req interface{}) bool { return true @@ -136,6 +150,10 @@ var ( MethodName: MethodMergeBatch.ShortName(), Handler: handlerMergeBatch, }, + { + MethodName: MethodGetCheckpoints.ShortName(), + Handler: handlerGetCheckpoints, + }, }, Streams: []grpc.StreamDesc{ { @@ -144,8 +162,8 @@ var ( ServerStreams: true, }, { - StreamName: MethodGetCheckpoint.ShortName(), - Handler: handlerGetCheckpoint, + StreamName: MethodGetCheckpointChunk.ShortName(), + Handler: handlerGetCheckpointChunk, ServerStreams: true, }, }, @@ -313,6 +331,29 @@ func handlerMergeBatch( // nolint: golint return interceptor(ctx, &req, info, handler) } +func handlerGetCheckpoints( // nolint: golint + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + var req checkpoint.GetCheckpointsRequest + if err := dec(&req); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(Backend).GetCheckpoints(ctx, &req) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: MethodGetCheckpoints.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(Backend).GetCheckpoints(ctx, req.(*checkpoint.GetCheckpointsRequest)) + } + return interceptor(ctx, &req, info, handler) +} + func sendWriteLogIterator(it WriteLogIterator, opts *SyncOptions, stream grpc.ServerStream) error { var totalSent uint64 skipping := true @@ -390,19 +431,25 @@ func handlerGetDiff(srv interface{}, stream grpc.ServerStream) error { return sendWriteLogIterator(it, &req.Options, stream) } -func handlerGetCheckpoint(srv interface{}, stream grpc.ServerStream) error { - var req GetCheckpointRequest - if err := stream.RecvMsg(&req); err != nil { - return err - } +type grpcStreamWriter struct { + grpc.ServerStream +} - ctx := stream.Context() - it, err := srv.(Backend).GetCheckpoint(ctx, &req) +func (c *grpcStreamWriter) Write(p []byte) (int, error) { + err := c.ServerStream.SendMsg(p) if err != nil { + return 0, err + } + return len(p), nil +} + +func handlerGetCheckpointChunk(srv interface{}, stream grpc.ServerStream) error { + var md checkpoint.ChunkMetadata + if err := stream.RecvMsg(&md); err != nil { return err } - return sendWriteLogIterator(it, &req.Options, stream) + return srv.(Backend).GetCheckpointChunk(stream.Context(), &md, &grpcStreamWriter{stream}) } // RegisterService registers a new sentry service with the given gRPC server. 
@@ -470,6 +517,14 @@ func (c *storageClient) MergeBatch(ctx context.Context, request *MergeBatchReque return rsp, nil } +func (c *storageClient) GetCheckpoints(ctx context.Context, request *checkpoint.GetCheckpointsRequest) ([]*checkpoint.Metadata, error) { + var rsp []*checkpoint.Metadata + if err := c.conn.Invoke(ctx, MethodGetCheckpoints.FullName(), request, &rsp); err != nil { + return nil, err + } + return rsp, nil +} + func receiveWriteLogIterator(ctx context.Context, stream grpc.ClientStream) WriteLogIterator { pipe := writelog.NewPipeIterator(ctx) @@ -517,19 +572,32 @@ func (c *storageClient) GetDiff(ctx context.Context, request *GetDiffRequest) (W return receiveWriteLogIterator(ctx, stream), nil } -func (c *storageClient) GetCheckpoint(ctx context.Context, request *GetCheckpointRequest) (WriteLogIterator, error) { - stream, err := c.conn.NewStream(ctx, &serviceDesc.Streams[1], MethodGetCheckpoint.FullName()) +func (c *storageClient) GetCheckpointChunk(ctx context.Context, chunk *checkpoint.ChunkMetadata, w io.Writer) error { + stream, err := c.conn.NewStream(ctx, &serviceDesc.Streams[1], MethodGetCheckpointChunk.FullName()) if err != nil { - return nil, err + return err } - if err = stream.SendMsg(request); err != nil { - return nil, err + if err = stream.SendMsg(chunk); err != nil { + return err } if err = stream.CloseSend(); err != nil { - return nil, err + return err } - return receiveWriteLogIterator(ctx, stream), nil + for { + var part []byte + switch stream.RecvMsg(&part) { + case nil: + case io.EOF: + return nil + default: + return err + } + + if _, err = w.Write(part); err != nil { + return err + } + } } func (c *storageClient) Cleanup() { diff --git a/go/storage/client/client.go b/go/storage/client/client.go index 109970a6ba9..87c5348149a 100644 --- a/go/storage/client/client.go +++ b/go/storage/client/client.go @@ -6,6 +6,7 @@ import ( "context" cryptorand "crypto/rand" "errors" + "io" "math/rand" "time" @@ -20,6 +21,7 @@ import ( "github.com/oasislabs/oasis-core/go/common/node" "github.com/oasislabs/oasis-core/go/runtime/committee" "github.com/oasislabs/oasis-core/go/storage/api" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint" ) var ( @@ -394,18 +396,29 @@ func (b *storageClientBackend) GetDiff(ctx context.Context, request *api.GetDiff return rsp.(api.WriteLogIterator), nil } -func (b *storageClientBackend) GetCheckpoint(ctx context.Context, request *api.GetCheckpointRequest) (api.WriteLogIterator, error) { +func (b *storageClientBackend) GetCheckpoints(ctx context.Context, request *checkpoint.GetCheckpointsRequest) ([]*checkpoint.Metadata, error) { rsp, err := b.readWithClient( ctx, - request.Root.Namespace, + request.Namespace, func(ctx context.Context, c api.Backend) (interface{}, error) { - return c.GetCheckpoint(ctx, request) + return c.GetCheckpoints(ctx, request) }, ) if err != nil { return nil, err } - return rsp.(api.WriteLogIterator), nil + return rsp.([]*checkpoint.Metadata), nil +} + +func (b *storageClientBackend) GetCheckpointChunk(ctx context.Context, chunk *checkpoint.ChunkMetadata, w io.Writer) error { + _, err := b.readWithClient( + ctx, + chunk.Root.Namespace, + func(ctx context.Context, c api.Backend) (interface{}, error) { + return nil, c.GetCheckpointChunk(ctx, chunk, w) + }, + ) + return err } func (b *storageClientBackend) Cleanup() { diff --git a/go/storage/crashing_test.go b/go/storage/crashing_test.go index 93376efac95..24c2975ee61 100644 --- a/go/storage/crashing_test.go +++ b/go/storage/crashing_test.go @@ -40,6 +40,7 @@ 
func TestCrashingBackendDoNotInterfere(t *testing.T) { realBackend, err := database.New(&cfg) require.NoError(err, "database.New") backend := newCrashingWrapper(realBackend) + localBackend := realBackend.(api.LocalBackend) crash.Config(map[string]float64{ "storage.write.before": 0.0, @@ -48,5 +49,5 @@ func TestCrashingBackendDoNotInterfere(t *testing.T) { "storage.read.after": 0.0, }) - tests.StorageImplementationTests(t, backend, testNs, 0) + tests.StorageImplementationTests(t, localBackend, backend, testNs, 0) } diff --git a/go/storage/database/database.go b/go/storage/database/database.go index 674759fae45..f215422c828 100644 --- a/go/storage/database/database.go +++ b/go/storage/database/database.go @@ -5,11 +5,14 @@ import ( "context" "errors" "fmt" + "io" + "path/filepath" "github.com/oasislabs/oasis-core/go/common" "github.com/oasislabs/oasis-core/go/common/crypto/hash" "github.com/oasislabs/oasis-core/go/common/crypto/signature" "github.com/oasislabs/oasis-core/go/storage/api" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint" nodedb "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/db/api" badgerNodedb "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/db/badger" ) @@ -20,6 +23,8 @@ const ( // DBFileBadgerDB is the default BadgerDB backing store filename. DBFileBadgerDB = "mkvs_storage.badger.db" + + checkpointDir = "checkpoints" ) // DefaultFileName returns the default database filename for the specified @@ -34,8 +39,9 @@ func DefaultFileName(backend string) string { } type databaseBackend struct { - nodedb nodedb.NodeDB - rootCache *api.RootCache + nodedb nodedb.NodeDB + checkpointer checkpoint.CreateRestorer + rootCache *api.RootCache signer signature.Signer initCh chan struct{} @@ -65,15 +71,28 @@ func New(cfg *api.Config) (api.Backend, error) { return nil, fmt.Errorf("storage/database: failed to create root cache: %w", err) } - // Satisfy the interface.... + // Satisfy the interface. initCh := make(chan struct{}) close(initCh) + // Create the checkpointer. 
+ creator, err := checkpoint.NewFileCreator(filepath.Join(cfg.DB, checkpointDir), ndb) + if err != nil { + ndb.Close() + return nil, fmt.Errorf("storage/database: failed to create checkpoint creator: %w", err) + } + restorer, err := checkpoint.NewRestorer(ndb) + if err != nil { + ndb.Close() + return nil, fmt.Errorf("storage/database: failed to create checkpoint restorer: %w", err) + } + return &databaseBackend{ - nodedb: ndb, - rootCache: rootCache, - signer: cfg.Signer, - initCh: initCh, + nodedb: ndb, + checkpointer: checkpoint.NewCreateRestorer(creator, restorer), + rootCache: rootCache, + signer: cfg.Signer, + initCh: initCh, }, nil } @@ -175,8 +194,12 @@ func (ba *databaseBackend) GetDiff(ctx context.Context, request *api.GetDiffRequ return ba.nodedb.GetWriteLog(ctx, request.StartRoot, request.EndRoot) } -func (ba *databaseBackend) GetCheckpoint(ctx context.Context, request *api.GetCheckpointRequest) (api.WriteLogIterator, error) { - return ba.nodedb.GetCheckpoint(ctx, request.Root) +func (ba *databaseBackend) GetCheckpoints(ctx context.Context, request *checkpoint.GetCheckpointsRequest) ([]*checkpoint.Metadata, error) { + return ba.checkpointer.GetCheckpoints(ctx, request) +} + +func (ba *databaseBackend) GetCheckpointChunk(ctx context.Context, chunk *checkpoint.ChunkMetadata, w io.Writer) error { + return ba.checkpointer.GetCheckpointChunk(ctx, chunk, w) } func (ba *databaseBackend) HasRoot(root api.Root) bool { @@ -190,3 +213,7 @@ func (ba *databaseBackend) Finalize(ctx context.Context, namespace common.Namesp func (ba *databaseBackend) Prune(ctx context.Context, namespace common.Namespace, round uint64) (int, error) { return ba.nodedb.Prune(ctx, namespace, round) } + +func (ba *databaseBackend) Checkpointer() checkpoint.CreateRestorer { + return ba.checkpointer +} diff --git a/go/storage/database/database_test.go b/go/storage/database/database_test.go index b8e76c39d90..4798e00fa64 100644 --- a/go/storage/database/database_test.go +++ b/go/storage/database/database_test.go @@ -51,6 +51,7 @@ func doTestImpl(t *testing.T, backend string) { impl, err := New(&cfg) require.NoError(err, "New()") defer impl.Cleanup() + localBackend := impl.(api.LocalBackend) - tests.StorageImplementationTests(t, impl, testNs, 0) + tests.StorageImplementationTests(t, localBackend, impl, testNs, 0) } diff --git a/go/storage/metrics.go b/go/storage/metrics.go index 53946d5f07d..a5d14c66609 100644 --- a/go/storage/metrics.go +++ b/go/storage/metrics.go @@ -11,6 +11,7 @@ import ( "github.com/oasislabs/oasis-core/go/common/crypto/hash" "github.com/oasislabs/oasis-core/go/common/node" "github.com/oasislabs/oasis-core/go/storage/api" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint" ) var ( @@ -237,6 +238,14 @@ func (w *metricsWrapper) Prune(ctx context.Context, namespace common.Namespace, return pruned, err } +func (w *metricsWrapper) Checkpointer() checkpoint.CreateRestorer { + localBackend, ok := w.Backend.(api.LocalBackend) + if !ok { + return nil + } + return localBackend.Checkpointer() +} + func newMetricsWrapper(base api.Backend) api.Backend { metricsOnce.Do(func() { prometheus.MustRegister(storageCollectors...) 
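The files that follow introduce the new checkpoint package that the database backend and metrics wrapper above wire together. As a minimal sketch of the intended consumer flow (an editor's illustration, not part of this change; restoreFromPeer, remote and restorer are hypothetical names), a node syncing state from a peer could combine the ChunkProvider and Restorer interfaces roughly as follows:

package example

import (
	"bytes"
	"context"
	"fmt"

	"github.com/oasislabs/oasis-core/go/common"
	"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint"
)

// restoreFromPeer lists a peer's checkpoints, then fetches and restores every
// chunk of the first one. A real caller would pick a checkpoint matching the
// desired root and finalize the node database round afterwards (as done in
// checkpoint_test.go below).
func restoreFromPeer(
	ctx context.Context,
	remote checkpoint.ChunkProvider, // e.g. a remote storage client
	restorer checkpoint.Restorer, // wraps the local node database
	ns common.Namespace,
) error {
	cps, err := remote.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{Version: 1, Namespace: ns})
	if err != nil {
		return fmt.Errorf("restore: failed to list checkpoints: %w", err)
	}
	if len(cps) == 0 {
		return fmt.Errorf("restore: no checkpoints available")
	}

	cp := cps[0]
	for idx := range cp.Chunks {
		var cm *checkpoint.ChunkMetadata
		if cm, err = cp.GetChunkMetadata(uint64(idx)); err != nil {
			return err
		}

		// Stream the chunk from the peer, then verify and import it locally;
		// the restore path in chunk.go below checks the chunk digest and proof
		// before committing any nodes.
		var buf bytes.Buffer
		if err = remote.GetCheckpointChunk(ctx, cm, &buf); err != nil {
			return err
		}
		if err = restorer.RestoreChunk(ctx, cm, &buf); err != nil {
			return err
		}
	}
	return nil
}

Keeping ChunkProvider (listing and fetching chunks) separate from Creator and Restorer mirrors the split above between ClientBackend and LocalBackend: remote callers can only read checkpoints, while creating, deleting and restoring them stays with the local backend exposed via Checkpointer().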
diff --git a/go/storage/mkvs/urkel/cache.go b/go/storage/mkvs/urkel/cache.go index 696c433a921..d3e3e4ae991 100644 --- a/go/storage/mkvs/urkel/cache.go +++ b/go/storage/mkvs/urkel/cache.go @@ -419,7 +419,7 @@ func (c *cache) remoteSync(ctx context.Context, ptr *node.Pointer, fetcher readS if c.persistEverythingFromSyncer { // NOTE: This is a dummy batch, we assume that the node database backend is a // cache-only backend and does not care about correct values. - batch = c.db.NewBatch(c.syncRoot.Namespace, c.syncRoot.Round, c.syncRoot) + batch = c.db.NewBatch(c.syncRoot, false) dbSubtree = batch.MaybeStartSubtree(nil, 0, subtree) } diff --git a/go/storage/mkvs/urkel/checkpoint/checkpoint.go b/go/storage/mkvs/urkel/checkpoint/checkpoint.go new file mode 100644 index 00000000000..e0fb274b4d1 --- /dev/null +++ b/go/storage/mkvs/urkel/checkpoint/checkpoint.go @@ -0,0 +1,117 @@ +// Package checkpoint provides methods for creating MKVS checkpoints. +package checkpoint + +import ( + "context" + "io" + + "github.com/oasislabs/oasis-core/go/common" + "github.com/oasislabs/oasis-core/go/common/crypto/hash" + "github.com/oasislabs/oasis-core/go/common/errors" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/node" +) + +const moduleName = "storage/mkvs/checkpoint" + +var ( + // ErrCheckpointNotFound is the error when a checkpoint is not found. + ErrCheckpointNotFound = errors.New(moduleName, 1, "checkpoint: not found") + + // ErrChunkNotFound is the error when a chunk is not found. + ErrChunkNotFound = errors.New(moduleName, 2, "checkpoint: chunk not found") +) + +// ChunkProvider is a chunk provider. +type ChunkProvider interface { + // GetCheckpoints returns a list of checkpoint metadata for all known checkpoints. + GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Metadata, error) + + // GetCheckpointChunk fetches a specific chunk from an existing checkpoint. + GetCheckpointChunk(ctx context.Context, chunk *ChunkMetadata, w io.Writer) error +} + +// GetCheckpointsRequest is a GetCheckpoints request. +type GetCheckpointsRequest struct { + Version uint16 `json:"version"` + Namespace common.Namespace `json:"namespace"` +} + +// Creator is a checkpoint creator. +type Creator interface { + ChunkProvider + + // CreateCheckpoint creates a new checkpoint at the given root. + CreateCheckpoint(ctx context.Context, root node.Root, chunkSize uint64) (*Metadata, error) + + // GetCheckpoint retrieves checkpoint metadata for a specific checkpoint. + GetCheckpoint(ctx context.Context, request *GetCheckpointRequest) (*Metadata, error) + + // DeleteCheckpoint deletes a specific checkpoint. + DeleteCheckpoint(ctx context.Context, request *DeleteCheckpointRequest) error +} + +// GetCheckpointRequest is a GetCheckpoint request. +type GetCheckpointRequest struct { + Version uint16 `json:"version"` + Root node.Root `json:"root"` +} + +// DeleteCheckpointRequest is a DeleteCheckpoint request. +type DeleteCheckpointRequest struct { + Version uint16 `json:"version"` + Root node.Root `json:"root"` +} + +// Restorer is a checkpoint restorer. +type Restorer interface { + // RestoreChunk restores the given chunk into the underlying node database. + RestoreChunk(ctx context.Context, chunk *ChunkMetadata, r io.Reader) error +} + +// CreateRestorer is an interface that combines the checkpoint creator and restorer. +type CreateRestorer interface { + Creator + Restorer +} + +type createRestorer struct { + Creator + Restorer +} + +// NewCreateRestorer combines a checkpoint creator and a restorer.
+func NewCreateRestorer(creator Creator, restorer Restorer) CreateRestorer { + return &createRestorer{ + Creator: creator, + Restorer: restorer, + } +} + +// ChunkMetadata is chunk metadata. +type ChunkMetadata struct { + Version uint16 `json:"version"` + Root node.Root `json:"root"` + Index uint64 `json:"index"` + Digest hash.Hash `json:"digest"` +} + +// Metadata is checkpoint metadata. +type Metadata struct { + Version uint16 `json:"version"` + Root node.Root `json:"root"` + Chunks []hash.Hash `json:"chunks"` +} + +// GetChunkMetadata returns the chunk metadata for the corresponding chunk. +func (c Metadata) GetChunkMetadata(idx uint64) (*ChunkMetadata, error) { + if idx >= uint64(len(c.Chunks)) { + return nil, ErrChunkNotFound + } + + return &ChunkMetadata{ + Version: c.Version, + Root: c.Root, + Index: idx, + Digest: c.Chunks[int(idx)], + }, nil +} diff --git a/go/storage/mkvs/urkel/checkpoint/checkpoint_test.go b/go/storage/mkvs/urkel/checkpoint/checkpoint_test.go new file mode 100644 index 00000000000..e317a02f2f6 --- /dev/null +++ b/go/storage/mkvs/urkel/checkpoint/checkpoint_test.go @@ -0,0 +1,175 @@ +package checkpoint + +import ( + "bytes" + "context" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/oasislabs/oasis-core/go/common" + "github.com/oasislabs/oasis-core/go/common/crypto/hash" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel" + db "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/db/api" + badgerDb "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/db/badger" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/node" +) + +var testNs = common.NewTestNamespaceFromSeed([]byte("oasis urkel checkpoint test ns"), 0) + +func TestFileCheckpointCreator(t *testing.T) { + require := require.New(t) + + // Generate some data. + dir, err := ioutil.TempDir("", "mkvs.checkpoint") + require.NoError(err, "TempDir") + defer os.RemoveAll(dir) + + ndb, err := badgerDb.New(&db.Config{ + DB: filepath.Join(dir, "db"), + Namespace: testNs, + MaxCacheSize: 16 * 1024 * 1024, + }) + require.NoError(err, "New") + + ctx := context.Background() + tree := urkel.New(nil, ndb) + for i := 0; i < 1000; i++ { + err = tree.Insert(ctx, []byte(strconv.Itoa(i)), []byte(strconv.Itoa(i))) + require.NoError(err, "Insert") + } + + _, rootHash, err := tree.Commit(ctx, testNs, 0) + require.NoError(err, "Commit") + root := node.Root{ + Namespace: testNs, + Round: 0, + Hash: rootHash, + } + + // Create a file-based checkpoint creator. + fc, err := NewFileCreator(filepath.Join(dir, "checkpoints"), ndb) + require.NoError(err, "NewFileCreator") + + // There should be no checkpoints before one is created. + cps, err := fc.GetCheckpoints(ctx, &GetCheckpointsRequest{}) + require.NoError(err, "GetCheckpoints") + require.Len(cps, 0) + + _, err = fc.GetCheckpoint(ctx, &GetCheckpointRequest{Root: root}) + require.Error(err, "GetCheckpoint should fail with non-existent checkpoint") + + // Create a checkpoint and check that it has been created correctly. 
+ cp, err := fc.CreateCheckpoint(ctx, root, 16*1024) + require.NoError(err, "CreateCheckpoint") + require.EqualValues(1, cp.Version, "version should be correct") + require.EqualValues(root, cp.Root, "checkpoint root should be correct") + require.Len(cp.Chunks, 3, "there should be the correct number of chunks") + + var expectedChunks []hash.Hash + for _, hh := range []string{ + "620f318c245858b351602a8b21e708663b03cd2befd982210ffbaa3c56bf9358", + "37d38a95492038df4afee65b5b91bf8499f522dea346c0e553e03d3333bff394", + "df608e9821dc8d248a0f0d0ff4cb998d51f329767dfc8a7520c15e38c47be1e9", + } { + var h hash.Hash + _ = h.UnmarshalHex(hh) + expectedChunks = append(expectedChunks, h) + } + require.EqualValues(expectedChunks, cp.Chunks, "chunk hashes should be correct") + + // There should now be one checkpoint. + cps, err = fc.GetCheckpoints(ctx, &GetCheckpointsRequest{Version: 1}) + require.NoError(err, "GetCheckpoints") + require.Len(cps, 1, "there should be one checkpoint") + require.Equal(cp, cps[0], "checkpoint returned by GetCheckpoint should be correct") + + gcp, err := fc.GetCheckpoint(ctx, &GetCheckpointRequest{Version: 1, Root: root}) + require.NoError(err, "GetCheckpoint") + require.Equal(cp, gcp) + + // Try re-creating the same checkpoint again and make sure we get the same metadata. + existingCp, err := fc.CreateCheckpoint(ctx, root, 16*1024) + require.NoError(err, "CreateCheckpoint on an existing root should work") + require.Equal(cp, existingCp, "created checkpoint should be correct") + + // We should be able to retrieve chunks. + _, err = cp.GetChunkMetadata(999) + require.Error(err, "GetChunkMetadata should fail for unknown chunk") + chunk0, err := cp.GetChunkMetadata(0) + require.NoError(err, "GetChunkMetadata") + + var buf bytes.Buffer + err = fc.GetCheckpointChunk(ctx, chunk0, &buf) + require.NoError(err, "GetChunk should work") + + // Fetching a non-existent chunk should fail. + invalidChunk := *chunk0 + invalidChunk.Index = 999 + err = fc.GetCheckpointChunk(ctx, &invalidChunk, &buf) + require.Error(err, "GetChunk on a non-existent chunk should fail") + + // Create a fresh node database to restore into. + ndb2, err := badgerDb.New(&db.Config{ + DB: filepath.Join(dir, "db2"), + Namespace: testNs, + MaxCacheSize: 16 * 1024 * 1024, + }) + require.NoError(err, "New") + + // Try to restore some chunks. + rs, err := NewRestorer(ndb2) + require.NoError(err, "NewRestorer") + for i := 0; i < len(cp.Chunks); i++ { + var cm *ChunkMetadata + cm, err = cp.GetChunkMetadata(uint64(i)) + require.NoError(err, "GetChunkMetadata") + + buf.Reset() + err = fc.GetCheckpointChunk(ctx, cm, &buf) + require.NoError(err, "GetChunk") + err = rs.RestoreChunk(ctx, cm, &buf) + require.NoError(err, "RestoreChunk") + } + err = ndb2.Finalize(ctx, root.Namespace, root.Round, []hash.Hash{root.Hash}) + require.NoError(err, "Finalize") + + // Verify that everything has been restored. + tree = urkel.NewWithRoot(nil, ndb2, root) + for i := 0; i < 1000; i++ { + var value []byte + value, err = tree.Get(ctx, []byte(strconv.Itoa(i))) + require.NoError(err, "Get") + require.Equal([]byte(strconv.Itoa(i)), value) + } + + // Deleting a checkpoint should work. + err = fc.DeleteCheckpoint(ctx, &DeleteCheckpointRequest{Version: 1, Root: root}) + require.NoError(err, "DeleteCheckpoint") + + // There should now be no checkpoints. 
+ cps, err = fc.GetCheckpoints(ctx, &GetCheckpointsRequest{Version: 1}) + require.NoError(err, "GetCheckpoints") + require.Len(cps, 0, "there should be no checkpoints") + + _, err = fc.GetCheckpoint(ctx, &GetCheckpointRequest{Version: 1, Root: root}) + require.Error(err, "GetCheckpoint should fail with non-existent checkpoint") + + // Deleting a non-existent checkpoint should fail. + err = fc.DeleteCheckpoint(ctx, &DeleteCheckpointRequest{Version: 1, Root: root}) + require.Error(err, "DeleteCheckpoint on a non-existent checkpoint should fail") + + // Fetching a non-existent chunk should fail. + err = fc.GetCheckpointChunk(ctx, chunk0, &buf) + require.Error(err, "GetChunk on a non-existent chunk should fail") + + // Create a checkpoint with unknown root. + invalidRoot := root + invalidRoot.Hash.FromBytes([]byte("mkvs checkpoint test invalid root")) + _, err = fc.CreateCheckpoint(ctx, invalidRoot, 16*1024) + require.Error(err, "CreateCheckpoint should fail for invalid root") +} diff --git a/go/storage/mkvs/urkel/checkpoint/chunk.go b/go/storage/mkvs/urkel/checkpoint/chunk.go new file mode 100644 index 00000000000..a897e59d356 --- /dev/null +++ b/go/storage/mkvs/urkel/checkpoint/chunk.go @@ -0,0 +1,185 @@ +package checkpoint + +import ( + "context" + "errors" + "fmt" + "io" + + "github.com/golang/snappy" + + "github.com/oasislabs/oasis-core/go/common/cbor" + "github.com/oasislabs/oasis-core/go/common/crypto/hash" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel" + db "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/db/api" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/node" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/syncer" +) + +func createChunk( + ctx context.Context, + tree *urkel.Tree, + root node.Root, + offset node.Key, + chunkSize uint64, + w io.Writer, +) ( + chunkHash hash.Hash, + nextOffset node.Key, + err error, +) { + it := tree.NewIterator(ctx, urkel.WithProof(root.Hash)) + defer it.Close() + + // We build the chunk until the proof becomes too large or we have reached the end. + for it.Seek(offset); it.Valid() && it.GetProofBuilder().Size() < chunkSize; it.Next() { + // Check if context got cancelled while iterating to abort early. + if ctx.Err() != nil { + err = ctx.Err() + return + } + + nextOffset = it.Key() + } + if it.Err() != nil { + err = fmt.Errorf("chunk: failed to iterate: %w", it.Err()) + return + } + if !it.Valid() { + // We have finished iterating. + nextOffset = nil + } + + // Build our chunk. + proof, err := it.GetProof() + if err != nil { + err = fmt.Errorf("chunk: failed to build proof: %w", err) + return + } + + hb := hash.NewBuilder() + sw := snappy.NewBufferedWriter(io.MultiWriter(w, hb)) + enc := cbor.NewEncoder(sw) + for _, entry := range proof.Entries { + if err = enc.Encode(entry); err != nil { + err = fmt.Errorf("chunk: failed to encode chunk part: %w", err) + return + } + } + if err = sw.Close(); err != nil { + err = fmt.Errorf("chunk: failed to close chunk: %w", err) + return + } + + chunkHash = hb.Build() + return +} + +func restoreChunk(ctx context.Context, ndb db.NodeDB, chunk *ChunkMetadata, r io.Reader) error { + hb := hash.NewBuilder() + sr := snappy.NewReader(io.TeeReader(r, hb)) + dec := cbor.NewDecoder(sr) + + // Reconstruct the proof. 
+ var p syncer.Proof + for { + if ctx.Err() != nil { + return ctx.Err() + } + + var entry []byte + if err := dec.Decode(&entry); err != nil { + if errors.Is(err, io.EOF) { + break + } + return fmt.Errorf("chunk: failed to decode chunk: %w", err) + } + + p.Entries = append(p.Entries, entry) + } + p.UntrustedRoot = chunk.Root.Hash + + // Verify overall chunk integrity. + chunkHash := hb.Build() + if !chunk.Digest.Equal(&chunkHash) { + return fmt.Errorf("chunk: digest incorrect (expected: %s got: %s)", + chunk.Digest, + chunkHash, + ) + } + + // Verify the proof. + var pv syncer.ProofVerifier + ptr, err := pv.VerifyProof(ctx, chunk.Root.Hash, &p) + if err != nil { + return fmt.Errorf("chunk: chunk proof verification failed: %w", err) + } + + // Import chunk into the node database. + emptyRoot := node.Root{ + Namespace: chunk.Root.Namespace, + Round: chunk.Root.Round, + } + emptyRoot.Hash.Empty() + + batch := ndb.NewBatch(emptyRoot, true) + defer batch.Reset() + + subtree := batch.MaybeStartSubtree(nil, 0, ptr) + if err = doRestoreChunk(ctx, batch, subtree, 0, ptr); err != nil { + return fmt.Errorf("chunk: node import failed: %w", err) + } + if err = subtree.Commit(); err != nil { + return fmt.Errorf("chunk: node import failed: %w", err) + } + if err = batch.Commit(chunk.Root); err != nil { + return fmt.Errorf("chunk: node import failed: %w", err) + } + + return nil +} + +func doRestoreChunk( + ctx context.Context, + batch db.Batch, + subtree db.Subtree, + depth node.Depth, + ptr *node.Pointer, +) (err error) { + if ptr == nil { + return + } + + switch n := ptr.Node.(type) { + case nil: + case *node.InternalNode: + // Commit internal leaf (considered to be on the same depth as the internal node). + if err = doRestoreChunk(ctx, batch, subtree, depth, n.LeafNode); err != nil { + return + } + + for _, subNode := range []*node.Pointer{n.Left, n.Right} { + newSubtree := batch.MaybeStartSubtree(subtree, depth+1, subNode) + if err = doRestoreChunk(ctx, batch, newSubtree, depth+1, subNode); err != nil { + return + } + if newSubtree != subtree { + if err = newSubtree.Commit(); err != nil { + return + } + } + } + + // Store the node. + if err = subtree.PutNode(depth, ptr); err != nil { + return + } + case *node.LeafNode: + // Leaf node -- store the node. + if err = subtree.PutNode(depth, ptr); err != nil { + return + } + } + + return +} diff --git a/go/storage/mkvs/urkel/checkpoint/file.go b/go/storage/mkvs/urkel/checkpoint/file.go new file mode 100644 index 00000000000..afd4ae23ad1 --- /dev/null +++ b/go/storage/mkvs/urkel/checkpoint/file.go @@ -0,0 +1,209 @@ +package checkpoint + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "strconv" + + "github.com/oasislabs/oasis-core/go/common" + "github.com/oasislabs/oasis-core/go/common/cbor" + "github.com/oasislabs/oasis-core/go/common/crypto/hash" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel" + db "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/db/api" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/node" +) + +const ( + chunksDir = "chunks" + checkpointMetadataFile = "meta" + checkpointVersion = 1 +) + +type fileCreator struct { + dataDir string + ndb db.NodeDB +} + +func (fc *fileCreator) CreateCheckpoint(ctx context.Context, root node.Root, chunkSize uint64) (meta *Metadata, err error) { + tree := urkel.NewWithRoot(nil, fc.ndb, root) + defer tree.Close() + + // Create checkpoint directory. 
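createChunk and restoreChunk agree on a simple wire format: a snappy-compressed stream of CBOR-encoded proof entries, with the chunk digest taken over the compressed bytes. A standalone sketch of just the verification half of that format (the helper name is an assumption, not code from this change):

package example // illustrative only

import (
	"errors"
	"fmt"
	"io"

	"github.com/golang/snappy"

	"github.com/oasislabs/oasis-core/go/common/cbor"
	"github.com/oasislabs/oasis-core/go/common/crypto/hash"
	"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint"
)

// verifyChunkDigest reads a chunk stream to the end and checks its digest
// against the chunk metadata, without importing any nodes.
func verifyChunkDigest(r io.Reader, cm *checkpoint.ChunkMetadata) error {
	hb := hash.NewBuilder()
	// Hash the raw compressed bytes while decoding the contained proof entries.
	dec := cbor.NewDecoder(snappy.NewReader(io.TeeReader(r, hb)))
	for {
		var entry []byte
		if err := dec.Decode(&entry); err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return fmt.Errorf("malformed chunk: %w", err)
		}
	}

	h := hb.Build()
	if !cm.Digest.Equal(&h) {
		return fmt.Errorf("chunk digest mismatch (expected: %s got: %s)", cm.Digest, h)
	}
	return nil
}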
+ checkpointDir := filepath.Join( + fc.dataDir, + strconv.FormatUint(root.Round, 10), + root.Hash.String(), + ) + if err = common.Mkdir(checkpointDir); err != nil { + return nil, fmt.Errorf("checkpoint: failed to create checkpoint directory: %w", err) + } + defer func() { + if err != nil { + // In case we have failed to create a checkpoint, make sure to clean up after ourselves. + _ = os.RemoveAll(checkpointDir) + } + }() + + // Check if the checkpoint already exists and just return the existing metadata in this case. + data, err := ioutil.ReadFile(filepath.Join(checkpointDir, checkpointMetadataFile)) + if err == nil { + var existing Metadata + if err = cbor.Unmarshal(data, &existing); err != nil { + return nil, fmt.Errorf("checkpoint: corrupted checkpoint metadata: %w", err) + } + return &existing, nil + } + + // Create chunks directory. + chunksDir := filepath.Join(checkpointDir, chunksDir) + if err = common.Mkdir(chunksDir); err != nil { + return nil, fmt.Errorf("checkpoint: failed to create chunk directory: %w", err) + } + + // Create chunks until we are done. + var chunks []hash.Hash + var nextOffset node.Key + for chunkIndex := 0; ; chunkIndex++ { + dataFilename := filepath.Join(chunksDir, strconv.Itoa(chunkIndex)) + + // Generate chunk. + var f *os.File + if f, err = os.Create(dataFilename); err != nil { + return nil, fmt.Errorf("checkpoint: failed to create chunk file for chunk %d: %w", chunkIndex, err) + } + + var chunkHash hash.Hash + chunkHash, nextOffset, err = createChunk(ctx, tree, root, nextOffset, chunkSize, f) + f.Close() + if err != nil { + return nil, fmt.Errorf("checkpoint: failed to create chunk %d: %w", chunkIndex, err) + } + + chunks = append(chunks, chunkHash) + + // Check if we are finished. + if nextOffset == nil { + break + } + } + + // Generate and write checkpoint metadata. + meta = &Metadata{ + Version: checkpointVersion, + Root: root, + Chunks: chunks, + } + + if err = ioutil.WriteFile(filepath.Join(checkpointDir, checkpointMetadataFile), cbor.Marshal(meta), 0600); err != nil { + return nil, fmt.Errorf("checkpoint: failed to create checkpoint metadata: %w", err) + } + return meta, nil +} + +func (fc *fileCreator) GetCheckpoints(ctx context.Context, request *GetCheckpointsRequest) ([]*Metadata, error) { + // Currently we only support a single version so we report no checkpoints for other versions. + if request.Version != checkpointVersion { + return []*Metadata{}, nil + } + + matches, err := filepath.Glob(filepath.Join(fc.dataDir, "*", "*", checkpointMetadataFile)) + if err != nil { + return nil, fmt.Errorf("checkpoint: failed to enumerate checkpoints: %w", err) + } + + var cps []*Metadata + for _, m := range matches { + data, err := ioutil.ReadFile(m) + if err != nil { + return nil, fmt.Errorf("checkpoint: failed to read checkpoint metadata at %s: %w", m, err) + } + + var cp Metadata + if err = cbor.Unmarshal(data, &cp); err != nil { + return nil, fmt.Errorf("checkpoint: corrupted checkpoint metadata at %s: %w", m, err) + } + + cps = append(cps, &cp) + } + return cps, nil +} + +func (fc *fileCreator) GetCheckpoint(ctx context.Context, request *GetCheckpointRequest) (*Metadata, error) { + // Currently we only support a single version. 
+ if request.Version != checkpointVersion { + return nil, ErrCheckpointNotFound + } + + checkpointFilename := filepath.Join( + fc.dataDir, + strconv.FormatUint(request.Root.Round, 10), + request.Root.Hash.String(), + checkpointMetadataFile, + ) + data, err := ioutil.ReadFile(checkpointFilename) + if err != nil { + return nil, ErrCheckpointNotFound + } + + var cp Metadata + if err = cbor.Unmarshal(data, &cp); err != nil { + return nil, fmt.Errorf("checkpoint: corrupted checkpoint metadata: %w", err) + } + return &cp, nil +} + +func (fc *fileCreator) DeleteCheckpoint(ctx context.Context, request *DeleteCheckpointRequest) error { + // Currently we only support a single version. + if request.Version != checkpointVersion { + return ErrCheckpointNotFound + } + + checkpointDir := filepath.Join( + fc.dataDir, + strconv.FormatUint(request.Root.Round, 10), + request.Root.Hash.String(), + ) + if _, err := os.Stat(checkpointDir); err != nil { + return ErrCheckpointNotFound + } + + return os.RemoveAll(checkpointDir) +} + +func (fc *fileCreator) GetCheckpointChunk(ctx context.Context, chunk *ChunkMetadata, w io.Writer) error { + // Currently we only support a single version. + if chunk.Version != checkpointVersion { + return ErrChunkNotFound + } + + chunkFilename := filepath.Join( + fc.dataDir, + strconv.FormatUint(chunk.Root.Round, 10), + chunk.Root.Hash.String(), + chunksDir, + strconv.FormatUint(chunk.Index, 10), + ) + + f, err := os.Open(chunkFilename) + if err != nil { + return ErrChunkNotFound + } + defer f.Close() + + if _, err = io.Copy(w, f); err != nil { + return fmt.Errorf("checkpoint: failed to read chunk: %w", err) + } + return nil +} + +// NewFileCreator creates a new checkpoint creator that writes created chunks into the filesystem. +func NewFileCreator(dataDir string, ndb db.NodeDB) (Creator, error) { + return &fileCreator{ + dataDir: dataDir, + ndb: ndb, + }, nil +} diff --git a/go/storage/mkvs/urkel/checkpoint/restorer.go b/go/storage/mkvs/urkel/checkpoint/restorer.go new file mode 100644 index 00000000000..235f1da5594 --- /dev/null +++ b/go/storage/mkvs/urkel/checkpoint/restorer.go @@ -0,0 +1,22 @@ +package checkpoint + +import ( + "context" + "io" + + db "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/db/api" +) + +// restorer is a checkpoint restorer. +type restorer struct { + ndb db.NodeDB +} + +func (rs *restorer) RestoreChunk(ctx context.Context, chunk *ChunkMetadata, r io.Reader) error { + return restoreChunk(ctx, rs.ndb, chunk, r) +} + +// NewRestorer creates a new checkpoint restorer. +func NewRestorer(ndb db.NodeDB) (Restorer, error) { + return &restorer{ndb: ndb}, nil +} diff --git a/go/storage/mkvs/urkel/commit.go b/go/storage/mkvs/urkel/commit.go index 76b5968d522..39d8fbf22ee 100644 --- a/go/storage/mkvs/urkel/commit.go +++ b/go/storage/mkvs/urkel/commit.go @@ -52,7 +52,7 @@ func (t *Tree) commitWithHooks( oldRoot.Round = round } - batch := t.cache.db.NewBatch(namespace, round, oldRoot) + batch := t.cache.db.NewBatch(oldRoot, false) defer batch.Reset() subtree := batch.MaybeStartSubtree(nil, 0, t.cache.pendingRoot) diff --git a/go/storage/mkvs/urkel/db/api/api.go b/go/storage/mkvs/urkel/db/api/api.go index c6409dd00c5..23371c6ef6e 100644 --- a/go/storage/mkvs/urkel/db/api/api.go +++ b/go/storage/mkvs/urkel/db/api/api.go @@ -66,11 +66,13 @@ type NodeDB interface { // GetWriteLog retrieves a write log between two storage instances from the database. 
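The file-based creator lays checkpoints out on disk by round and root hash, using the "meta" and "chunks" names from the constants at the top of file.go. A small sketch of that layout; the helper functions themselves are illustrative and not part of this change:

package example // illustrative only

import (
	"path/filepath"
	"strconv"

	"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint"
	"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/node"
)

// Layout used by the file-based creator:
//
//   <dataDir>/<round>/<rootHash>/meta             CBOR-encoded checkpoint Metadata
//   <dataDir>/<round>/<rootHash>/chunks/<index>   snappy-compressed chunk data
func checkpointMetaPath(dataDir string, root node.Root) string {
	return filepath.Join(dataDir, strconv.FormatUint(root.Round, 10), root.Hash.String(), "meta")
}

func chunkPath(dataDir string, cm *checkpoint.ChunkMetadata) string {
	return filepath.Join(
		dataDir,
		strconv.FormatUint(cm.Root.Round, 10),
		cm.Root.Hash.String(),
		"chunks",
		strconv.FormatUint(cm.Index, 10),
	)
}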
GetWriteLog(ctx context.Context, startRoot node.Root, endRoot node.Root) (writelog.Iterator, error) - // GetCheckpoint retrieves a write log of entries in root. - GetCheckpoint(ctx context.Context, root node.Root) (writelog.Iterator, error) - // NewBatch starts a new batch. - NewBatch(namespace common.Namespace, round uint64, oldRoot node.Root) Batch + // + // The chunk argument specifies whether the given batch is being used to import a chunk of an + // existing root. Chunks may contain unresolved pointers (e.g., pointers that point to hashes + // which are not present in the database). Committing a chunk batch will prevent the round from + // being finalized. + NewBatch(oldRoot node.Root, chunk bool) Batch // HasRoot checks whether the given root exists. HasRoot(root node.Root) bool @@ -166,10 +168,6 @@ func (d *nopNodeDB) GetWriteLog(ctx context.Context, startRoot node.Root, endRoo return nil, ErrWriteLogNotFound } -func (d *nopNodeDB) GetCheckpoint(ctx context.Context, root node.Root) (writelog.Iterator, error) { - return nil, ErrWriteLogNotFound -} - func (d *nopNodeDB) HasRoot(root node.Root) bool { return false } @@ -191,7 +189,7 @@ type nopBatch struct { BaseBatch } -func (d *nopNodeDB) NewBatch(namespace common.Namespace, round uint64, oldRoot node.Root) Batch { +func (d *nopNodeDB) NewBatch(oldRoot node.Root, chunk bool) Batch { return &nopBatch{} } @@ -224,68 +222,3 @@ func (s *nopSubtree) VisitCleanNode(depth node.Depth, ptr *node.Pointer) error { func (s *nopSubtree) Commit() error { return nil } - -// CheckpointableDB encapsulates functionality of getting a checkpoint. -type CheckpointableDB struct { - db NodeDB -} - -// NewCheckpointableDB creates a new instance of CheckpoitableDb. -func NewCheckpointableDB(db NodeDB) CheckpointableDB { - return CheckpointableDB{db: db} -} - -// GetCheckpoint returns an iterator of write log entries in the provided -func (b *CheckpointableDB) GetCheckpoint(ctx context.Context, root node.Root) (writelog.Iterator, error) { - if !b.db.HasRoot(root) { - return nil, ErrNodeNotFound - } - ptr := &node.Pointer{ - Clean: true, - Hash: root.Hash, - } - pipe := writelog.NewPipeIterator(ctx) - go func() { - defer pipe.Close() - - b.getNodeWriteLog(ctx, &pipe, root, ptr) - }() - - return &pipe, nil -} - -func (b *CheckpointableDB) getNodeWriteLog(ctx context.Context, pipe *writelog.PipeIterator, root node.Root, ptr *node.Pointer) { - select { - case <-ctx.Done(): - return - default: - } - - nod, err := b.db.GetNode(root, ptr) - if err != nil { - _ = pipe.PutError(err) - return - } - switch n := nod.(type) { - case *node.LeafNode: - entry := writelog.LogEntry{ - Key: n.Key[:], - Value: n.Value[:], - } - if err := pipe.Put(&entry); err != nil { - _ = pipe.PutError(err) - } - case *node.InternalNode: - if n.LeafNode != nil { - b.getNodeWriteLog(ctx, pipe, root, n.LeafNode) - } - if n.Left != nil { - b.getNodeWriteLog(ctx, pipe, root, n.Left) - } - if n.Right != nil { - b.getNodeWriteLog(ctx, pipe, root, n.Right) - } - default: - panic("urkel/db/CheckpoitableDB: invalid root node type") - } -} diff --git a/go/storage/mkvs/urkel/db/badger/badger.go b/go/storage/mkvs/urkel/db/badger/badger.go index f88afbdeb9c..a5a473563ee 100644 --- a/go/storage/mkvs/urkel/db/badger/badger.go +++ b/go/storage/mkvs/urkel/db/badger/badger.go @@ -121,7 +121,6 @@ func New(cfg *api.Config) (api.NodeDB, error) { logger: logging.GetLogger("urkel/db/badger"), namespace: cfg.Namespace, } - db.CheckpointableDB = api.NewCheckpointableDB(db) opts := badger.DefaultOptions(cfg.DB) opts = 
opts.WithLogger(cmnBadger.NewLogAdapter(db.logger)) @@ -146,8 +145,6 @@ func New(cfg *api.Config) (api.NodeDB, error) { } type badgerNodeDB struct { - api.CheckpointableDB - logger *logging.Logger namespace common.Namespace @@ -748,7 +745,7 @@ func (d *badgerNodeDB) Prune(ctx context.Context, namespace common.Namespace, ro return pruned, nil } -func (d *badgerNodeDB) NewBatch(namespace common.Namespace, round uint64, oldRoot node.Root) api.Batch { +func (d *badgerNodeDB) NewBatch(oldRoot node.Root, chunk bool) api.Batch { // WARNING: There is a maximum batch size and maximum batch entry count. // Both of these things are derived from the MaxTableSize option. // @@ -759,8 +756,8 @@ func (d *badgerNodeDB) NewBatch(namespace common.Namespace, round uint64, oldRoo return &badgerBatch{ db: d, bat: d.db.NewWriteBatch(), - round: round, oldRoot: oldRoot, + chunk: chunk, } } @@ -825,8 +822,8 @@ type badgerBatch struct { db *badgerNodeDB bat *badger.WriteBatch - round uint64 oldRoot node.Root + chunk bool writeLog writelog.WriteLog annotations writelog.Annotations @@ -842,12 +839,20 @@ func (ba *badgerBatch) MaybeStartSubtree(subtree api.Subtree, depth node.Depth, } func (ba *badgerBatch) PutWriteLog(writeLog writelog.WriteLog, annotations writelog.Annotations) error { + if ba.chunk { + return fmt.Errorf("urkel/db/badger: cannot put write log in chunk mode") + } + ba.writeLog = writeLog ba.annotations = annotations return nil } func (ba *badgerBatch) RemoveNodes(nodes []node.Node) error { + if ba.chunk { + return fmt.Errorf("urkel/db/badger: cannot remove nodes in chunk mode") + } + ba.removedNodes = nodes return nil } @@ -887,68 +892,80 @@ func (ba *badgerBatch) Commit(root node.Root) error { return errors.Wrap(err, "urkel/db/badger: set returned error") } - // Update the root link for the old root. - if !ba.oldRoot.Hash.IsEmpty() { - if prevRound != ba.oldRoot.Round && ba.oldRoot.Round != root.Round { - return api.ErrPreviousRoundMismatch + if ba.chunk { + // Skip most of metadata updates if we are just importing chunks. + key := rootGcUpdatesKeyFmt.Encode(root.Round, &root.Hash) + if err = ba.bat.Set(key, cbor.Marshal(rootGcUpdates{})); err != nil { + return errors.Wrap(err, "urkel/db/badger: set returned error") + } + key = rootAddedNodesKeyFmt.Encode(root.Round, &root.Hash) + if err = ba.bat.Set(key, cbor.Marshal(rootAddedNodes{})); err != nil { + return errors.Wrap(err, "urkel/db/badger: set returned error") } + } else { + // Update the root link for the old root. + if !ba.oldRoot.Hash.IsEmpty() { + if prevRound != ba.oldRoot.Round && ba.oldRoot.Round != root.Round { + return api.ErrPreviousRoundMismatch + } - key := rootLinkKeyFmt.Encode(ba.oldRoot.Round, &ba.oldRoot.Hash, &emptyHash) - _, err = tx.Get(key) - switch err { - case nil: - case badger.ErrKeyNotFound: - return api.ErrRootNotFound - default: - return err + key := rootLinkKeyFmt.Encode(ba.oldRoot.Round, &ba.oldRoot.Hash, &emptyHash) + _, err = tx.Get(key) + switch err { + case nil: + case badger.ErrKeyNotFound: + return api.ErrRootNotFound + default: + return err + } + + key = rootLinkKeyFmt.Encode(ba.oldRoot.Round, &ba.oldRoot.Hash, &root.Hash) + if err = ba.bat.Set(key, []byte("")); err != nil { + return errors.Wrap(err, "urkel/db/badger: set returned error") + } } - key = rootLinkKeyFmt.Encode(ba.oldRoot.Round, &ba.oldRoot.Hash, &root.Hash) - if err = ba.bat.Set(key, []byte("")); err != nil { + // Mark removed nodes for garbage collection. Updates against the GC index + // are only applied in case this root is finalized. 
+ var gcUpdates rootGcUpdates + for _, n := range ba.removedNodes { + // Node lives from the round it was created in up to the previous round. + // + // NOTE: The node will never be resurrected as the round number is part + // of the node hash. + endRound := prevRound + if ba.oldRoot.Round == root.Round { + // If the previous root is in the same round, the node needs to end + // in the same round instead. + endRound = root.Round + } + + h := n.GetHash() + gcUpdates = append(gcUpdates, rootGcUpdate{ + EndRound: endRound, + StartRound: n.GetCreatedRound(), + Node: h, + }) + } + key := rootGcUpdatesKeyFmt.Encode(root.Round, &root.Hash) + if err = ba.bat.Set(key, cbor.Marshal(gcUpdates)); err != nil { return errors.Wrap(err, "urkel/db/badger: set returned error") } - } - // Mark removed nodes for garbage collection. Updates against the GC index - // are only applied in case this root is finalized. - var gcUpdates rootGcUpdates - for _, n := range ba.removedNodes { - // Node lives from the round it was created in up to the previous round. - // - // NOTE: The node will never be resurrected as the round number is part - // of the node hash. - endRound := prevRound - if ba.oldRoot.Round == root.Round { - // If the previous root is in the same round, the node needs to end - // in the same round instead. - endRound = root.Round + // Store added nodes (only needed until the round is finalized). + key = rootAddedNodesKeyFmt.Encode(root.Round, &root.Hash) + if err = ba.bat.Set(key, cbor.Marshal(ba.addedNodes)); err != nil { + return errors.Wrap(err, "urkel/db/badger: set returned error") } - h := n.GetHash() - gcUpdates = append(gcUpdates, rootGcUpdate{ - EndRound: endRound, - StartRound: n.GetCreatedRound(), - Node: h, - }) - } - key := rootGcUpdatesKeyFmt.Encode(root.Round, &root.Hash) - if err = ba.bat.Set(key, cbor.Marshal(gcUpdates)); err != nil { - return errors.Wrap(err, "urkel/db/badger: set returned error") - } - - // Store added nodes (only needed until the round is finalized). - key = rootAddedNodesKeyFmt.Encode(root.Round, &root.Hash) - if err = ba.bat.Set(key, cbor.Marshal(ba.addedNodes)); err != nil { - return errors.Wrap(err, "urkel/db/badger: set returned error") - } - - // Store write log. - if ba.writeLog != nil && ba.annotations != nil { - log := api.MakeHashedDBWriteLog(ba.writeLog, ba.annotations) - bytes := cbor.Marshal(log) - key := writeLogKeyFmt.Encode(root.Round, &root.Hash, &ba.oldRoot.Hash) - if err := ba.bat.Set(key, bytes); err != nil { - return errors.Wrap(err, "urkel/db/badger: set new write log returned error") + // Store write log. + if ba.writeLog != nil && ba.annotations != nil { + log := api.MakeHashedDBWriteLog(ba.writeLog, ba.annotations) + bytes := cbor.Marshal(log) + key := writeLogKeyFmt.Encode(root.Round, &root.Hash, &ba.oldRoot.Hash) + if err := ba.bat.Set(key, bytes); err != nil { + return errors.Wrap(err, "urkel/db/badger: set new write log returned error") + } } } diff --git a/go/storage/mkvs/urkel/iterator.go b/go/storage/mkvs/urkel/iterator.go index 36d01ba668e..f30f6b3c310 100644 --- a/go/storage/mkvs/urkel/iterator.go +++ b/go/storage/mkvs/urkel/iterator.go @@ -99,6 +99,8 @@ type Iterator interface { // You must initialize the iterator with a WithProof option, otherwise // calling this method will panic. GetProof() (*syncer.Proof, error) + // GetProofBuilder returns the proof builder associated with this iterator. + GetProofBuilder() *syncer.ProofBuilder // Close releases resources associated with the iterator. 
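The chunk flag passed to NewBatch is what lets restored chunks bypass the usual write-log and garbage-collection bookkeeping: PutWriteLog and RemoveNodes refuse to operate on a chunk batch, and Commit only records minimal per-root metadata (see the NewBatch documentation above for the finalization caveats). A rough sketch of the import-side call pattern, assuming ptr is the already-verified root pointer for root; the helper name is illustrative, and regular commits keep using NewBatch(oldRoot, false) as in commit.go above:

package example // illustrative only

import (
	db "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/db/api"
	"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/node"
)

// importChunkNode opens a batch in chunk mode and stores a single restored
// node under root, mirroring the import path used by restoreChunk.
func importChunkNode(ndb db.NodeDB, root node.Root, ptr *node.Pointer) error {
	emptyRoot := node.Root{Namespace: root.Namespace, Round: root.Round}
	emptyRoot.Hash.Empty()

	batch := ndb.NewBatch(emptyRoot, true)
	defer batch.Reset()

	subtree := batch.MaybeStartSubtree(nil, 0, ptr)
	if err := subtree.PutNode(0, ptr); err != nil {
		return err
	}
	if err := subtree.Commit(); err != nil {
		return err
	}
	return batch.Commit(root)
}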
// // Not calling this method leads to memory leaks. @@ -342,6 +344,10 @@ func (it *treeIterator) GetProof() (*syncer.Proof, error) { return it.proofBuilder.Build(it.ctx) } +func (it *treeIterator) GetProofBuilder() *syncer.ProofBuilder { + return it.proofBuilder +} + func (it *treeIterator) Close() { it.reset() it.ctx = nil diff --git a/go/storage/mkvs/urkel/syncer/proof.go b/go/storage/mkvs/urkel/syncer/proof.go index 733fdf28788..3ca9677ca97 100644 --- a/go/storage/mkvs/urkel/syncer/proof.go +++ b/go/storage/mkvs/urkel/syncer/proof.go @@ -35,6 +35,7 @@ type proofNode struct { type ProofBuilder struct { root hash.Hash included map[hash.Hash]*proofNode + size uint64 } // NewProofBuilder creates a new Merkle proof builder for the given root. @@ -56,6 +57,12 @@ func (b *ProofBuilder) Include(n node.Node) { panic("proof: attempted to add a dirty node") } + // If node is already included, skip it. + nh := n.GetHash() + if _, ok := b.included[nh]; ok { + return + } + // Node is available, serialize it. var err error var pn proofNode @@ -83,7 +90,8 @@ func (b *ProofBuilder) Include(n node.Node) { } } - b.included[n.GetHash()] = &pn + b.included[nh] = &pn + b.size += 1 + uint64(len(pn.serialized)) } // HasRoot returns true if the root node has already been included. @@ -96,6 +104,11 @@ func (b *ProofBuilder) GetRoot() hash.Hash { return b.root } +// Size returns the current size of this proof. +func (b *ProofBuilder) Size() uint64 { + return b.size +} + // Build tries to build the proof. func (b *ProofBuilder) Build(ctx context.Context) (*Proof, error) { proof := Proof{ diff --git a/go/storage/mkvs/urkel/urkel_test.go b/go/storage/mkvs/urkel/urkel_test.go index c454e5a7526..fa1e0a1a008 100644 --- a/go/storage/mkvs/urkel/urkel_test.go +++ b/go/storage/mkvs/urkel/urkel_test.go @@ -939,7 +939,7 @@ func testOnCommitHooks(t *testing.T, ndb db.NodeDB) { Hash: emptyRoot, } - batch := ndb.NewBatch(testNs, 0, root) + batch := ndb.NewBatch(root, false) defer batch.Reset() var calls []int diff --git a/go/storage/tests/tester.go b/go/storage/tests/tester.go index 39309969b4f..b3633930a03 100644 --- a/go/storage/tests/tester.go +++ b/go/storage/tests/tester.go @@ -4,6 +4,7 @@ package tests import ( "bytes" "context" + "io" "sort" "strconv" "testing" @@ -15,6 +16,7 @@ import ( genesisTestHelpers "github.com/oasislabs/oasis-core/go/genesis/tests" "github.com/oasislabs/oasis-core/go/storage/api" "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint" "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/writelog" ) @@ -82,20 +84,20 @@ func foldWriteLogIterator(t *testing.T, w api.WriteLogIterator) api.WriteLog { // StorageImplementationTests exercises the basic functionality of a storage // backend. 
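Include is now idempotent per node hash and the builder tracks a running size: each newly included node contributes one marker byte plus its serialized length, which is what createChunk polls via GetProofBuilder().Size() to decide where to cut a chunk. A tiny helper mirroring that accounting (illustrative, not from this change):

package example // illustrative only

import "github.com/oasislabs/oasis-core/go/common/crypto/hash"

// estimateProofSize mirrors ProofBuilder's bookkeeping: one byte per distinct
// included node plus the node's serialized form. Because Include skips nodes
// that are already present, shared interior nodes are only counted once.
func estimateProofSize(serializedNodes map[hash.Hash][]byte) uint64 {
	var size uint64
	for _, b := range serializedNodes {
		size += 1 + uint64(len(b))
	}
	return size
}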
-func StorageImplementationTests(t *testing.T, backend api.Backend, namespace common.Namespace, round uint64) { +func StorageImplementationTests(t *testing.T, localBackend api.LocalBackend, backend api.Backend, namespace common.Namespace, round uint64) { genesisTestHelpers.SetTestChainContext() <-backend.Initialized() t.Run("Basic", func(t *testing.T) { - testBasic(t, backend, namespace, round) + testBasic(t, localBackend, backend, namespace, round) }) t.Run("Merge", func(t *testing.T) { testMerge(t, backend, namespace, round) }) } -func testBasic(t *testing.T, backend api.Backend, namespace common.Namespace, round uint64) { +func testBasic(t *testing.T, localBackend api.LocalBackend, backend api.Backend, namespace common.Namespace, round uint64) { ctx := context.Background() var rootHash hash.Hash @@ -237,47 +239,29 @@ func testBasic(t *testing.T, backend api.Backend, namespace common.Namespace, ro require.EqualValues(t, expectedNewRoot, receiptBody.Roots[0], "receiptBody root should equal the expected new root") } - // Test GetCheckpoint. - logsIter, err := backend.GetCheckpoint(ctx, &api.GetCheckpointRequest{Root: newRoot}) - require.NoError(t, err, "GetCheckpoint()") - logs := foldWriteLogIterator(t, logsIter) - // Applying the writeLog should return same root. - logsRootHash := CalculateExpectedNewRoot(t, logs, namespace, round) - require.EqualValues(t, logsRootHash, receiptBody.Roots[0]) - - // Single node tree. - root.Empty() - wl3 := prepareWriteLog([][]byte{testValues[0]}) - expectedNewRoot3 := CalculateExpectedNewRoot(t, wl3, namespace, round) - - receipts, err = backend.Apply(ctx, &api.ApplyRequest{ - Namespace: namespace, - SrcRound: round, - SrcRoot: rootHash, - DstRound: round, - DstRoot: expectedNewRoot3, - WriteLog: wl3, + // Test checkpoints. + t.Run("Checkpoints", func(t *testing.T) { + // Create a new checkpoint with the local backend. 
+ cp, err := localBackend.Checkpointer().CreateCheckpoint(ctx, newRoot, 16*1024) + require.NoError(t, err, "CreateCheckpoint") + + cps, err := backend.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{Version: 1, Namespace: namespace}) + require.NoError(t, err, "GetCheckpoints") + require.Len(t, cps, 1, "GetCheckpoints should return one checkpoint") + require.Equal(t, cp, cps[0], "GetCheckpoints should return correct checkpoint metadata") + require.Len(t, cps[0].Chunks, 1, "checkpoint should have a single chunk") + + var buf bytes.Buffer + chunk, err := cps[0].GetChunkMetadata(0) + require.NoError(t, err, "GetChunkMetadata") + err = backend.GetCheckpointChunk(ctx, chunk, &buf) + require.NoError(t, err, "GetCheckpointChunk") + + hb := hash.NewBuilder() + _, err = io.Copy(hb, &buf) + require.NoError(t, err, "Copy") + require.Equal(t, cp.Chunks[0], hb.Build(), "GetCheckpointChunk must return correct chunk") }) - require.NoError(t, err, "Apply() should not return an error") - require.NotNil(t, receipts, "Apply() should return receipts") - - for i, receipt := range receipts { - err = receipt.Open(&receiptBody) - require.NoError(t, err, "receipt.Open() should not return an error") - require.Equal(t, uint16(1), receiptBody.Version, "mkvs receipt version should be 1") - require.Equal(t, 1, len(receiptBody.Roots), "mkvs receipt should contain 1 root") - require.EqualValues(t, expectedNewRoot3, receiptBody.Roots[0], "mkvs receipt root should equal the expected new root") - if i == 0 { - newRoot.Hash = receiptBody.Roots[0] - } - } - - logsIter, err = backend.GetCheckpoint(ctx, &api.GetCheckpointRequest{Root: newRoot}) - require.NoError(t, err, "GetCheckpoint()") - logs = foldWriteLogIterator(t, logsIter) - // Applying the writeLog should return same root. - logsRootHash = CalculateExpectedNewRoot(t, logs, namespace, round) - require.EqualValues(t, logsRootHash, newRoot.Hash) } func testMerge(t *testing.T, backend api.Backend, namespace common.Namespace, round uint64) { diff --git a/go/worker/storage/committee/checkpointer.go b/go/worker/storage/committee/checkpointer.go new file mode 100644 index 00000000000..bab8c9e6ad1 --- /dev/null +++ b/go/worker/storage/committee/checkpointer.go @@ -0,0 +1,206 @@ +package committee + +import ( + "context" + "fmt" + "sort" + "time" + + "github.com/eapache/channels" + + "github.com/oasislabs/oasis-core/go/common/logging" + registry "github.com/oasislabs/oasis-core/go/registry/api" + "github.com/oasislabs/oasis-core/go/runtime/history" + runtimeRegistry "github.com/oasislabs/oasis-core/go/runtime/registry" + storage "github.com/oasislabs/oasis-core/go/storage/api" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint" +) + +// CheckpointerConfig is a checkpointer configuration. +type CheckpointerConfig struct { + CheckInterval time.Duration +} + +type checkpointer struct { + cfg CheckpointerConfig + + creator checkpoint.Creator + notifyBlockCh *channels.RingChannel + + logger *logging.Logger +} + +func (c *checkpointer) notifyNewBlock(round uint64) { + c.notifyBlockCh.In() <- round +} + +func (c *checkpointer) checkpoint(ctx context.Context, rt *registry.Runtime, round uint64, history history.History) (err error) { + blk, err := history.GetBlock(ctx, round) + if err != nil { + return fmt.Errorf("checkpointer: failed to get block: %w", err) + } + + roots := blk.Header.StorageRoots() + defer func() { + if err == nil { + return + } + + // If there is an error, make sure to remove any created checkpoints. 
+ for _, root := range roots { + _ = c.creator.DeleteCheckpoint(ctx, &checkpoint.DeleteCheckpointRequest{Version: 1, Root: root}) + } + }() + + for _, root := range roots { + c.logger.Info("creating new checkpoint", + "root", root, + "chunk_size", rt.Storage.CheckpointChunkSize, + ) + + _, err = c.creator.CreateCheckpoint(ctx, root, rt.Storage.CheckpointChunkSize) + if err != nil { + c.logger.Error("failed to create checkpoint", + "root", root, + "err", err, + ) + return fmt.Errorf("checkpointer: failed to create checkpoint: %w", err) + } + } + return nil +} + +func (c *checkpointer) maybeCheckpoint(ctx context.Context, rt *registry.Runtime, round uint64, history history.History) error { + // Get a list of all current checkpoints. + cps, err := c.creator.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{ + Version: 1, + Namespace: rt.ID, + }) + if err != nil { + return fmt.Errorf("checkpointer: failed to get existing checkpoints: %w", err) + } + + // Check if we need to create a new checkpoint based on the list of existing checkpoints, the + // current round and the runtime configuration. Note that for each round we create two + // checkpoints, one for the state root and another one for the IO root. + var lastCheckpointRound uint64 + var cpRounds []uint64 + cpsByRound := make(map[uint64][]storage.Root) + for _, cp := range cps { + if cpsByRound[cp.Root.Round] == nil { + cpRounds = append(cpRounds, cp.Root.Round) + } + cpsByRound[cp.Root.Round] = append(cpsByRound[cp.Root.Round], cp.Root) + if len(cpsByRound[cp.Root.Round]) == 2 && cp.Root.Round > lastCheckpointRound { + lastCheckpointRound = cp.Root.Round + } + } + sort.Slice(cpRounds, func(i, j int) bool { return cpRounds[i] < cpRounds[j] }) + + // Checkpoint any missing rounds. + cpInterval := rt.Storage.CheckpointInterval + for cpRound := lastCheckpointRound + cpInterval; cpRound < round; cpRound = cpRound + cpInterval { + c.logger.Info("checkpointing round", + "round", cpRound, + ) + + if err = c.checkpoint(ctx, rt, cpRound, history); err != nil { + c.logger.Error("failed to checkpoint round", + "round", cpRound, + "err", err, + ) + return fmt.Errorf("checkpointer: failed to checkpoint round: %w", err) + } + } + + // Garbage collect old checkpoints. + if int(rt.Storage.CheckpointNumKept) < len(cpRounds) { + c.logger.Info("performing checkpoint garbage collection", + "num_checkpoints", len(cpRounds), + "num_kept", rt.Storage.CheckpointNumKept, + ) + + for _, round := range cpRounds[:len(cpRounds)-int(rt.Storage.CheckpointNumKept)] { + for _, root := range cpsByRound[round] { + if err = c.creator.DeleteCheckpoint(ctx, &checkpoint.DeleteCheckpointRequest{ + Version: 1, + Root: root, + }); err != nil { + c.logger.Warn("failed to garbage collect checkpoint", + "root", root, + "err", err, + ) + continue + } + } + } + } + + return nil +} + +func (c *checkpointer) worker(ctx context.Context, runtime runtimeRegistry.Runtime) { + c.logger.Debug("storage checkpointer started", + "check_interval", c.cfg.CheckInterval, + ) + defer func() { + c.logger.Debug("storage checkpointer terminating") + }() + + // Use a ticker to avoid checking for checkpoints too often. 
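maybeCheckpoint derives its schedule purely from existing checkpoint metadata and the runtime descriptor: a round counts as checkpointed once both of its storage roots (state and IO) have checkpoints, and every CheckpointInterval-th round after the last such round, up to but excluding the current round, is due. The arithmetic in isolation, as an illustrative helper (names are not from this change):

package example // illustrative only

// roundsDue mirrors the scheduling loop in maybeCheckpoint: starting one
// interval after the last fully checkpointed round, every interval-th round
// strictly below the current round still needs a checkpoint. The caller is
// expected to skip checkpointing entirely when interval is zero, as the
// worker loop above does.
func roundsDue(lastCheckpointRound, interval, currentRound uint64) []uint64 {
	var due []uint64
	for r := lastCheckpointRound + interval; r < currentRound; r += interval {
		due = append(due, r)
	}
	return due
}

For example, with lastCheckpointRound=200, interval=100 and currentRound=450 the rounds 300 and 400 are due, and each of them then receives two checkpoints, one per storage root.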
+ ticker := time.NewTicker(c.cfg.CheckInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + var round uint64 + select { + case <-ctx.Done(): + return + case r := <-c.notifyBlockCh.Out(): + round = r.(uint64) + } + + rt, err := runtime.RegistryDescriptor(ctx) + if err != nil { + c.logger.Warn("failed to get runtime descriptor", + "round", round, + "err", err, + ) + continue + } + + // Don't checkpoint if checkpoints are disabled. + if rt.Storage.CheckpointInterval == 0 { + continue + } + + if err := c.maybeCheckpoint(ctx, rt, round, runtime.History()); err != nil { + c.logger.Error("failed to checkpoint", + "round", round, + "err", err, + ) + continue + } + } + } +} + +func newCheckpointer( + ctx context.Context, + runtime runtimeRegistry.Runtime, + creator checkpoint.Creator, + cfg CheckpointerConfig, +) (*checkpointer, error) { + c := &checkpointer{ + cfg: cfg, + creator: creator, + notifyBlockCh: channels.NewRingChannel(1), + logger: logging.GetLogger("worker/storage/committee/checkpointer").With("runtime_id", runtime.ID()), + } + go c.worker(ctx, runtime) + return c, nil +} diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index 942043d4698..4595cd19883 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -161,6 +161,8 @@ type Node struct { workerCommonCfg workerCommon.Config + checkpointer *checkpointer + syncedLock sync.RWMutex syncedState watcherState @@ -182,6 +184,7 @@ func NewNode( store *persistent.ServiceStore, roleProvider registration.RoleProvider, workerCommonCfg workerCommon.Config, + checkpointerCfg CheckpointerConfig, ) (*Node, error) { localStorage, ok := commonNode.Storage.(storageApi.LocalBackend) if !ok { @@ -216,7 +219,7 @@ func NewNode( rtID := commonNode.Runtime.ID() err := store.GetCBOR(rtID[:], &node.syncedState) if err != nil && err != persistent.ErrNotFound { - return nil, err + return nil, fmt.Errorf("storage worker: failed to restore sync state: %w", err) } node.ctx, node.ctxCancel = context.WithCancel(context.Background()) @@ -228,10 +231,16 @@ func NewNode( // Create a new storage client that will be used for remote sync. scl, err := client.New(node.ctx, ns, node.commonNode.Identity, node.commonNode.Scheduler, node.commonNode.Registry) if err != nil { - return nil, err + return nil, fmt.Errorf("storage worker: failed to create client: %w", err) } node.storageClient = scl.(storageApi.ClientBackend) + // Create a new checkpointer. + node.checkpointer, err = newCheckpointer(node.ctx, commonNode.Runtime, localStorage.Checkpointer(), checkpointerCfg) + if err != nil { + return nil, fmt.Errorf("storage worker: failed to create checkpointer: %w", err) + } + // Register prune handler. commonNode.Runtime.History().Pruner().RegisterHandler(&pruneHandler{ logger: node.logger, @@ -328,10 +337,8 @@ func (n *Node) HandleNewBlockEarlyLocked(*block.Block) { // Guarded by CrossNode. func (n *Node) HandleNewBlockLocked(blk *block.Block) { - select { - case n.blockCh.In() <- blk: - case <-n.ctx.Done(): - } + // Notify the state syncer that there is a new block. + n.blockCh.In() <- blk } // Guarded by CrossNode. @@ -710,6 +717,9 @@ mainLoop: n.logger.Error("can't store watcher state to database", "err", err) } + // Notify the checkpointer that there is a new round. 
+ n.checkpointer.notifyNewBlock(finalized.Round) + case <-n.ctx.Done(): break mainLoop } @@ -738,6 +748,8 @@ func (p *pruneHandler) Prune(ctx context.Context, rounds []uint64) error { ) } + // TODO: Make sure we don't prune rounds that need to be checkpointed but haven't been yet. + p.logger.Debug("pruning storage for round", "round", round) // Prune given block. diff --git a/go/worker/storage/committee/policy.go b/go/worker/storage/committee/policy.go index 95644989bdf..16a6d33974b 100644 --- a/go/worker/storage/committee/policy.go +++ b/go/worker/storage/committee/policy.go @@ -27,13 +27,14 @@ var ( accessctl.Action(api.MethodMergeBatch.FullName()), }, } - // NOTE: GetDiff/GetCheckpoint need to be accessible to all storage nodes, + // NOTE: GetDiff/GetCheckpoint* need to be accessible to all storage nodes, // not just the ones in the current storage committee so that new nodes can // sync-up. storageNodesPolicy = &committee.AccessPolicy{ Actions: []accessctl.Action{ accessctl.Action(api.MethodGetDiff.FullName()), - accessctl.Action(api.MethodGetCheckpoint.FullName()), + accessctl.Action(api.MethodGetCheckpoints.FullName()), + accessctl.Action(api.MethodGetCheckpointChunk.FullName()), }, } sentryNodesPolicy = &committee.AccessPolicy{ diff --git a/go/worker/storage/service_external.go b/go/worker/storage/service_external.go index f7123e20bca..0cc8120fb08 100644 --- a/go/worker/storage/service_external.go +++ b/go/worker/storage/service_external.go @@ -4,12 +4,14 @@ import ( "context" "errors" "fmt" + "io" "github.com/oasislabs/oasis-core/go/common" "github.com/oasislabs/oasis-core/go/common/grpc/auth" "github.com/oasislabs/oasis-core/go/common/grpc/policy" registry "github.com/oasislabs/oasis-core/go/registry/api" "github.com/oasislabs/oasis-core/go/storage/api" + "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint" ) var ( @@ -173,11 +175,18 @@ func (s *storageService) GetDiff(ctx context.Context, request *api.GetDiffReques return s.storage.GetDiff(ctx, request) } -func (s *storageService) GetCheckpoint(ctx context.Context, request *api.GetCheckpointRequest) (api.WriteLogIterator, error) { +func (s *storageService) GetCheckpoints(ctx context.Context, request *checkpoint.GetCheckpointsRequest) ([]*checkpoint.Metadata, error) { if err := s.ensureInitialized(ctx); err != nil { return nil, err } - return s.storage.GetCheckpoint(ctx, request) + return s.storage.GetCheckpoints(ctx, request) +} + +func (s *storageService) GetCheckpointChunk(ctx context.Context, chunk *checkpoint.ChunkMetadata, w io.Writer) error { + if err := s.ensureInitialized(ctx); err != nil { + return err + } + return s.storage.GetCheckpointChunk(ctx, chunk, w) } func (s *storageService) Cleanup() { diff --git a/go/worker/storage/worker.go b/go/worker/storage/worker.go index dae61eab87b..0343c7b7de6 100644 --- a/go/worker/storage/worker.go +++ b/go/worker/storage/worker.go @@ -3,6 +3,7 @@ package storage import ( "context" "fmt" + "time" flag "github.com/spf13/pflag" "github.com/spf13/viper" @@ -31,6 +32,9 @@ const ( CfgWorkerEnabled = "worker.storage.enabled" cfgWorkerFetcherCount = "worker.storage.fetcher_count" + // CfgWorkerCheckpointCheckInterval configures the checkpointer check interval. + CfgWorkerCheckpointCheckInterval = "worker.storage.checkpointer.check_interval" + // CfgWorkerDebugIgnoreApply is a debug option that makes the worker ignore // all apply operations. 
CfgWorkerDebugIgnoreApply = "worker.debug.storage.ignore_apply" @@ -122,9 +126,13 @@ func New( debugRejectUpdates: viper.GetBool(CfgWorkerDebugIgnoreApply) && flags.DebugDontBlameOasis(), }) + checkpointerCfg := committee.CheckpointerConfig{ + CheckInterval: viper.GetDuration(CfgWorkerCheckpointCheckInterval), + } + // Start storage node for every runtime. for _, rt := range s.commonWorker.GetRuntimes() { - if err := s.registerRuntime(rt); err != nil { + if err := s.registerRuntime(rt, checkpointerCfg); err != nil { return nil, err } } @@ -136,7 +144,7 @@ func New( return s, nil } -func (s *Worker) registerRuntime(commonNode *committeeCommon.Node) error { +func (s *Worker) registerRuntime(commonNode *committeeCommon.Node, checkpointerCfg committee.CheckpointerConfig) error { id := commonNode.Runtime.ID() s.logger.Info("registering new runtime", "runtime_id", id, @@ -147,7 +155,7 @@ func (s *Worker) registerRuntime(commonNode *committeeCommon.Node) error { return fmt.Errorf("failed to create role provider: %w", err) } - node, err := committee.NewNode(commonNode, s.grpcPolicy, s.fetchPool, s.watchState, rp, s.commonWorker.GetConfig()) + node, err := committee.NewNode(commonNode, s.grpcPolicy, s.fetchPool, s.watchState, rp, s.commonWorker.GetConfig(), checkpointerCfg) if err != nil { return err } @@ -302,6 +310,7 @@ func (s *Worker) initGenesis(gen *genesis.Document) error { func init() { Flags.Bool(CfgWorkerEnabled, false, "Enable storage worker") Flags.Uint(cfgWorkerFetcherCount, 4, "Number of concurrent storage diff fetchers") + Flags.Duration(CfgWorkerCheckpointCheckInterval, 1*time.Minute, "Storage checkpointer check interval") Flags.Bool(CfgWorkerDebugIgnoreApply, false, "Ignore Apply operations (for debugging purposes)") _ = Flags.MarkHidden(CfgWorkerDebugIgnoreApply)
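Combined with the gRPC methods exposed in service_external.go and the relaxed access policy in policy.go, the pieces above are enough for a node to bootstrap state from a peer: list checkpoints, stream and verify each chunk, restore it into a local node database and finalize the round. A sketch of that client-side flow under stated assumptions (the helper name, error messages and in-memory buffering are illustrative; a real implementation would retry and could fetch chunks from several nodes in parallel):

package example // illustrative only

import (
	"bytes"
	"context"
	"fmt"

	"github.com/oasislabs/oasis-core/go/common/crypto/hash"
	storage "github.com/oasislabs/oasis-core/go/storage/api"
	"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint"
	db "github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/db/api"
	"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/node"
)

// restoreFromRemote restores the checkpoint for root from a remote storage
// backend into the local node database.
func restoreFromRemote(ctx context.Context, remote storage.Backend, ndb db.NodeDB, root node.Root) error {
	cps, err := remote.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{
		Version:   1,
		Namespace: root.Namespace,
	})
	if err != nil {
		return fmt.Errorf("failed to list checkpoints: %w", err)
	}

	var cp *checkpoint.Metadata
	for _, c := range cps {
		if c.Root.Round == root.Round && c.Root.Hash.Equal(&root.Hash) {
			cp = c
			break
		}
	}
	if cp == nil {
		return fmt.Errorf("no checkpoint available for root %s", root.Hash)
	}

	rs, err := checkpoint.NewRestorer(ndb)
	if err != nil {
		return err
	}

	var buf bytes.Buffer
	for idx := range cp.Chunks {
		var cm *checkpoint.ChunkMetadata
		if cm, err = cp.GetChunkMetadata(uint64(idx)); err != nil {
			return err
		}

		buf.Reset()
		if err = remote.GetCheckpointChunk(ctx, cm, &buf); err != nil {
			return err
		}
		// RestoreChunk verifies the chunk digest and proof before importing nodes.
		if err = rs.RestoreChunk(ctx, cm, &buf); err != nil {
			return err
		}
	}

	// Once all chunks are in place the restored root can be finalized.
	return ndb.Finalize(ctx, root.Namespace, root.Round, []hash.Hash{root.Hash})
}

This is the kind of sync-up the policy change is meant to allow for storage nodes that are not yet part of the current committee.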