Skip to content

Commit

Permalink
go/storage: Refactor checkpointing interface
Browse files Browse the repository at this point in the history
Previously the way storage checkpoints were implemented had several
drawbacks, namely:

- Since the checkpoint only streamed key/value pairs this prevented
  correct tree reconstruction as tree nodes also include a Round field
  which specifies the round at which a given tree node was created.

- While old checkpoints were streamed in chunks and thus could be
  resumed or streamed in parallel from multiple nodes, there was no
  support for verifying the integrity of a single chunk.

This change introduces an explicit checkpointing mechanism with a simple
file-based backend reference implementation. The same mechanism could
also be used in the future with Tendermint's app state sync proposal.
  • Loading branch information
kostko committed Feb 14, 2020
1 parent 7c800a8 commit 46f06ef
Show file tree
Hide file tree
Showing 46 changed files with 1,591 additions and 361 deletions.
16 changes: 16 additions & 0 deletions .changelog/2659.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
go/storage: Refactor checkpointing interface

Previously the way storage checkpoints were implemented had several
drawbacks, namely:

- Since the checkpoint only streamed key/value pairs this prevented
correct tree reconstruction as tree nodes also include a Round field
which specifies the round at which a given tree node was created.

- While old checkpoints were streamed in chunks and thus could be
resumed or streamed in parallel from multiple nodes, there was no
support for verifying the integrity of a single chunk.

This change introduces an explicit checkpointing mechanism with a simple
file-based backend reference implementation. The same mechanism could
also be used in the future with Tendermint's app state sync proposal.
26 changes: 21 additions & 5 deletions go/common/cbor/cbor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,22 @@
// to always have the same serialization.
package cbor

import "github.com/fxamacker/cbor"
import (
"io"

"github.com/fxamacker/cbor"
)

// RawMessage is a raw encoded CBOR value. It implements Marshaler and
// Unmarshaler interfaces and can be used to delay CBOR decoding or
// precompute a CBOR encoding.
type RawMessage = cbor.RawMessage

// encOptions are the package-wide CBOR encoding options. Canonical mode
// guarantees deterministic serialization (required so the same value always
// encodes to the same bytes); RFC3339 timestamps are disabled in favor of
// second-granularity Unix timestamps.
var encOptions = cbor.EncOptions{
	Canonical:   true,
	TimeRFC3339: false, // Second granular unix timestamps
}

// FixSliceForSerde will convert `nil` to `[]byte` to work around serde
// brain damage.
func FixSliceForSerde(b []byte) []byte {
Expand All @@ -23,10 +32,7 @@ func FixSliceForSerde(b []byte) []byte {

// Marshal serializes a given type into a CBOR byte vector.
func Marshal(src interface{}) []byte {
b, err := cbor.Marshal(src, cbor.EncOptions{
Canonical: true,
TimeRFC3339: false, // Second granular unix timestamps
})
b, err := cbor.Marshal(src, encOptions)
if err != nil {
panic("common/cbor: failed to marshal: " + err.Error())
}
Expand All @@ -49,3 +55,13 @@ func MustUnmarshal(data []byte, dst interface{}) {
panic(err)
}
}

// NewEncoder creates a new CBOR encoder that writes to w, using the
// package-wide canonical encoding options so output stays deterministic.
func NewEncoder(w io.Writer) *cbor.Encoder {
	enc := cbor.NewEncoder(w, encOptions)
	return enc
}

// NewDecoder creates a new CBOR decoder that reads from r.
func NewDecoder(r io.Reader) *cbor.Decoder {
	dec := cbor.NewDecoder(r)
	return dec
}
25 changes: 25 additions & 0 deletions go/common/crypto/hash/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/base64"
"encoding/hex"
"errors"
"hash"

"github.com/oasislabs/oasis-core/go/common/cbor"
)
Expand Down Expand Up @@ -107,3 +108,27 @@ func (h *Hash) IsEmpty() bool {
func (h Hash) String() string {
return hex.EncodeToString(h[:])
}

// Builder is a hash builder that can be used to compute hashes iteratively.
// It wraps an underlying hash.Hash and implements io.Writer; obtain one via
// NewBuilder (the zero value has a nil hasher and is not usable).
type Builder struct {
	// hasher accumulates the running digest state.
	hasher hash.Hash
}

// Write adds more data to the running hash, implementing io.Writer.
// It never returns an error (hash.Hash.Write is documented never to fail).
func (b *Builder) Write(p []byte) (int, error) {
	n, err := b.hasher.Write(p)
	return n, err
}

// Build returns the hash of all data written so far.
// It does not change the underlying hash state, so further writes followed
// by another Build are valid.
func (b *Builder) Build() (h Hash) {
	// Sum(nil) appends the digest to a nil slice; the previous
	// Sum([]byte{}) plus a redundant sum[:] re-slice did the same work
	// less idiomatically. The 32-byte SHA-512/256 digest always matches
	// the Hash size, so the unmarshal error is impossible here.
	_ = h.UnmarshalBinary(b.hasher.Sum(nil))
	return
}

// NewBuilder creates a new hash builder backed by SHA-512/256, the same
// function used for the package's Hash values.
func NewBuilder() *Builder {
	b := new(Builder)
	b.hasher = sha512.New512_256()
	return b
}
27 changes: 27 additions & 0 deletions go/common/grpc/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package grpc

import (
"io"

"google.golang.org/grpc"
)

// streamWriter adapts a server-side gRPC stream to io.Writer: each Write
// call sends exactly one message on the embedded stream.
type streamWriter struct {
	grpc.ServerStream
}

// Write implements io.Writer by sending p as a single message on the
// underlying stream; on success the whole slice is reported as written.
func (w *streamWriter) Write(p []byte) (int, error) {
	if err := w.SendMsg(p); err != nil {
		return 0, err
	}
	return len(p), nil
}

// NewStreamWriter wraps a server-side gRPC stream into an io.Writer interface so that a stream can
// be used as a writer. Each Write into such a stream will cause a message to be sent, encoded as a
// raw byte slice.
func NewStreamWriter(stream grpc.ServerStream) io.Writer {
	return &streamWriter{stream}
}
1 change: 1 addition & 0 deletions go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31 // indirect
github.com/go-kit/kit v0.9.0
github.com/golang/protobuf v1.3.2
github.com/golang/snappy v0.0.1
github.com/google/gofuzz v1.0.0
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
Expand Down
10 changes: 8 additions & 2 deletions go/oasis-node/cmd/debug/byzantine/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"io"

"github.com/pkg/errors"
"google.golang.org/grpc"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/oasislabs/oasis-core/go/common/node"
scheduler "github.com/oasislabs/oasis-core/go/scheduler/api"
storage "github.com/oasislabs/oasis-core/go/storage/api"
"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint"
"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/syncer"
)

Expand Down Expand Up @@ -119,8 +121,12 @@ func (hns *honestNodeStorage) GetDiff(ctx context.Context, request *storage.GetD
return hns.client.GetDiff(ctx, request)
}

func (hns *honestNodeStorage) GetCheckpoint(ctx context.Context, request *storage.GetCheckpointRequest) (storage.WriteLogIterator, error) {
return hns.client.GetCheckpoint(ctx, request)
// GetCheckpoints forwards the checkpoint-metadata query to the wrapped
// storage client of the honest node.
func (hns *honestNodeStorage) GetCheckpoints(ctx context.Context, request *checkpoint.GetCheckpointsRequest) ([]*checkpoint.Metadata, error) {
	return hns.client.GetCheckpoints(ctx, request)
}

// GetCheckpointChunk fetches a single checkpoint chunk via the wrapped
// storage client, streaming the chunk contents into w.
func (hns *honestNodeStorage) GetCheckpointChunk(ctx context.Context, chunk *checkpoint.ChunkMetadata, w io.Writer) error {
	return hns.client.GetCheckpointChunk(ctx, chunk, w)
}

func (hns *honestNodeStorage) Cleanup() {
Expand Down
50 changes: 11 additions & 39 deletions go/oasis-node/cmd/debug/storage/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
storageAPI "github.com/oasislabs/oasis-core/go/storage/api"
storageClient "github.com/oasislabs/oasis-core/go/storage/client"
storageDatabase "github.com/oasislabs/oasis-core/go/storage/database"
"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel"
)

const cfgExportDir = "storage.export.dir"
Expand Down Expand Up @@ -98,26 +99,14 @@ func exportRuntime(dataDir, destDir string, id common.Namespace, rtg *registry.R
<-storageBackend.Initialized()
defer storageBackend.Cleanup()

// Get the checkpoint iterator.
root := storageAPI.Root{
Namespace: id,
Round: rtg.Round,
Hash: rtg.StateRoot,
}
it, err := storageBackend.GetCheckpoint(context.Background(),
&storageAPI.GetCheckpointRequest{
Root: root,
},
)
if err != nil {
logger.Error("failed getting checkpoint",
"err", err,
"namespace", root.Namespace,
"round", root.Round,
"root", root.Hash,
)
return err
}
tree := urkel.NewWithRoot(storageBackend, nil, root)
it := tree.NewIterator(context.Background(), urkel.IteratorPrefetch(10_000))
defer it.Close()

fn := fmt.Sprintf("storage-dump-%v-%d.json",
root.Namespace.String(),
Expand All @@ -127,7 +116,7 @@ func exportRuntime(dataDir, destDir string, id common.Namespace, rtg *registry.R
return exportIterator(fn, &root, it)
}

func exportIterator(fn string, root *storageAPI.Root, it storageAPI.WriteLogIterator) error {
func exportIterator(fn string, root *storageAPI.Root, it urkel.Iterator) error {
// Create the dump file, and initialize a JSON stream encoder.
f, err := os.Create(fn)
if err != nil {
Expand All @@ -152,35 +141,18 @@ func exportIterator(fn string, root *storageAPI.Root, it storageAPI.WriteLogIter
return err
}

// Dump the write log.
for {
more, err := it.Next()
if err != nil {
logger.Error("failed to fetch next item from write log iterator",
"err", err,
)
return err
}

if !more {
return nil
}

val, err := it.Value()
if err != nil {
logger.Error("failed to get value from write log iterator",
"err", err,
)
return err
}

if err = enc.Encode([][]byte{val.Key, val.Value}); err != nil {
// Dump the tree.
for it.Rewind(); it.Valid(); it.Next() {
key, value := it.Key(), it.Value()
if err = enc.Encode([][]byte{key, value}); err != nil {
logger.Error("failed to encode write log entry",
"err", err,
)
return err
}
}

return nil
}

func newDirectStorageBackend(dataDir string, namespace common.Namespace) (storageAPI.Backend, error) {
Expand Down
14 changes: 14 additions & 0 deletions go/oasis-node/cmd/registry/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ const (
CfgStorageMaxApplyOps = "runtime.storage.max_apply_ops"
CfgStorageMaxMergeRoots = "runtime.storage.max_merge_roots"
CfgStorageMaxMergeOps = "runtime.storage.max_merge_ops"
CfgStorageCheckpointInterval = "runtime.storage.checkpoint_interval"
CfgStorageCheckpointNumKept = "runtime.storage.checkpoint_num_kept"
CfgStorageCheckpointChunkSize = "runtime.storage.checkpoint_chunk_size"

// Transaction scheduler flags.
CfgTxnSchedulerGroupSize = "runtime.txn_scheduler.group_size"
Expand Down Expand Up @@ -385,6 +388,9 @@ func runtimeFromFlags() (*registry.Runtime, signature.Signer, error) {
MaxApplyOps: viper.GetUint64(CfgStorageMaxApplyOps),
MaxMergeRoots: viper.GetUint64(CfgStorageMaxMergeRoots),
MaxMergeOps: viper.GetUint64(CfgStorageMaxMergeOps),
CheckpointInterval: viper.GetUint64(CfgStorageCheckpointInterval),
CheckpointNumKept: viper.GetUint64(CfgStorageCheckpointNumKept),
CheckpointChunkSize: viper.GetUint64(CfgStorageCheckpointChunkSize),
},
}
if teeHardware == node.TEEHardwareIntelSGX {
Expand Down Expand Up @@ -427,6 +433,11 @@ func runtimeFromFlags() (*registry.Runtime, signature.Signer, error) {
return nil, nil, fmt.Errorf("invalid runtime admission policy")
}

// Validate storage configuration.
if err = registry.VerifyRegisterRuntimeStorageArgs(rt, logger); err != nil {
return nil, nil, fmt.Errorf("invalid runtime storage configuration: %w", err)
}

return rt, signer, nil
}

Expand Down Expand Up @@ -518,6 +529,9 @@ func init() {
runtimeFlags.Uint64(CfgStorageMaxApplyOps, 2, "Maximum number of apply operations in a batch")
runtimeFlags.Uint64(CfgStorageMaxMergeRoots, 8, "Maximum number of merge roots")
runtimeFlags.Uint64(CfgStorageMaxMergeOps, 2, "Maximum number of merge operations in a batch")
runtimeFlags.Uint64(CfgStorageCheckpointInterval, 0, "Storage checkpoint interval (in rounds)")
runtimeFlags.Uint64(CfgStorageCheckpointNumKept, 0, "Number of storage checkpoints to keep")
runtimeFlags.Uint64(CfgStorageCheckpointChunkSize, 0, "Storage checkpoint chunk size")

// Init Admission policy flags.
runtimeFlags.String(CfgAdmissionPolicy, "", "What type of node admission policy to have")
Expand Down
16 changes: 12 additions & 4 deletions go/oasis-node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
staking "github.com/oasislabs/oasis-core/go/staking/api"
stakingTests "github.com/oasislabs/oasis-core/go/staking/tests"
"github.com/oasislabs/oasis-core/go/storage"
storageAPI "github.com/oasislabs/oasis-core/go/storage/api"
storageClient "github.com/oasislabs/oasis-core/go/storage/client"
storageClientTests "github.com/oasislabs/oasis-core/go/storage/client/tests"
storageTests "github.com/oasislabs/oasis-core/go/storage/tests"
Expand Down Expand Up @@ -388,11 +389,13 @@ func testStorage(t *testing.T, node *testNode) {
require.NoError(t, err, "TempDir")
defer os.RemoveAll(dataDir)

storage, err := storage.New(context.Background(), dataDir, testRuntimeID, node.Identity, node.Consensus.Scheduler(), node.Consensus.Registry())
backend, err := storage.New(context.Background(), dataDir, testRuntimeID, node.Identity, node.Consensus.Scheduler(), node.Consensus.Registry())
require.NoError(t, err, "storage.New")
defer storage.Cleanup()
defer backend.Cleanup()
// We are always testing a local storage backend here.
localBackend := backend.(storageAPI.LocalBackend)

storageTests.StorageImplementationTests(t, storage, testRuntimeID, 0)
storageTests.StorageImplementationTests(t, localBackend, backend, testRuntimeID, 0)
}

func testRegistry(t *testing.T, node *testNode) {
Expand Down Expand Up @@ -462,6 +465,11 @@ func testClient(t *testing.T, node *testNode) {
func testStorageClientWithNode(t *testing.T, node *testNode) {
ctx := context.Background()

// Get the local storage backend (the one that the client is connecting to).
rt, err := node.RuntimeRegistry.GetRuntime(testRuntimeID)
require.NoError(t, err, "GetRuntime")
localBackend := rt.Storage().(storageAPI.LocalBackend)

client, err := storageClient.NewStatic(ctx, testRuntimeID, node.Identity, node.Consensus.Registry(), node.Identity.NodeSigner.Public())
require.NoError(t, err, "NewStatic")

Expand All @@ -470,7 +478,7 @@ func testStorageClientWithNode(t *testing.T, node *testNode) {
blk, err := node.Consensus.RootHash().GetLatestBlock(ctx, testRuntimeID, consensusAPI.HeightLatest)
require.NoError(t, err, "GetLatestBlock")

storageTests.StorageImplementationTests(t, client, testRuntimeID, blk.Header.Round+1)
storageTests.StorageImplementationTests(t, localBackend, client, testRuntimeID, blk.Header.Round+1)
}

func testStorageClientWithoutNode(t *testing.T, node *testNode) {
Expand Down
8 changes: 8 additions & 0 deletions go/oasis-test-runner/oasis/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"path/filepath"
"strconv"
"time"

"github.com/oasislabs/oasis-core/go/common"
commonGrpc "github.com/oasislabs/oasis-core/go/common/grpc"
Expand Down Expand Up @@ -320,6 +321,13 @@ func (args *argBuilder) workerStorageDebugIgnoreApplies(ignore bool) *argBuilder
return args
}

// workerStorageCheckpointCheckInterval appends the storage worker's
// checkpoint check-interval flag when a positive interval is given;
// a non-positive interval leaves the argument vector untouched.
func (args *argBuilder) workerStorageCheckpointCheckInterval(interval time.Duration) *argBuilder {
	if interval <= 0 {
		return args
	}
	flag := "--" + workerStorage.CfgWorkerCheckpointCheckInterval
	args.vec = append(args.vec, flag, interval.String())
	return args
}

func (args *argBuilder) workerTxnschedulerCheckTxEnabled() *argBuilder {
args.vec = append(args.vec, "--"+txnscheduler.CfgCheckTxEnabled)
return args
Expand Down
3 changes: 3 additions & 0 deletions go/oasis-test-runner/oasis/cli/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ func (r *RegistryHelpers) GenerateRegisterRuntimeTx(
"--"+cmdRegRt.CfgStorageMaxApplyOps, strconv.FormatUint(runtime.Storage.MaxApplyOps, 10),
"--"+cmdRegRt.CfgStorageMaxMergeRoots, strconv.FormatUint(runtime.Storage.MaxMergeRoots, 10),
"--"+cmdRegRt.CfgStorageMaxMergeOps, strconv.FormatUint(runtime.Storage.MaxMergeOps, 10),
"--"+cmdRegRt.CfgStorageCheckpointInterval, strconv.FormatUint(runtime.Storage.CheckpointInterval, 10),
"--"+cmdRegRt.CfgStorageCheckpointNumKept, strconv.FormatUint(runtime.Storage.CheckpointNumKept, 10),
"--"+cmdRegRt.CfgStorageCheckpointChunkSize, strconv.FormatUint(runtime.Storage.CheckpointChunkSize, 10),
"--"+cmdRegRt.CfgTxnSchedulerGroupSize, strconv.FormatUint(runtime.TxnScheduler.GroupSize, 10),
"--"+cmdRegRt.CfgTxnSchedulerAlgorithm, runtime.TxnScheduler.Algorithm,
"--"+cmdRegRt.CfgTxnSchedulerBatchFlushTimeout, runtime.TxnScheduler.BatchFlushTimeout.String(),
Expand Down
Loading

0 comments on commit 46f06ef

Please sign in to comment.