Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/storage: Refactor checkpointing interface #2659

Merged
merged 1 commit into from
Feb 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .changelog/2659.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
go/storage: Refactor checkpointing interface

Previously the way storage checkpoints were implemented had several
drawbacks, namely:

- Since the checkpoint only streamed key/value pairs, the tree could not
be reconstructed correctly, as tree nodes also include a Round field
which specifies the round at which a given tree node was created.

- While old checkpoints were streamed in chunks and thus could be
resumed or streamed in parallel from multiple nodes, there was no
support for verifying the integrity of a single chunk.

This change introduces an explicit checkpointing mechanism with a simple
file-based backend reference implementation. The same mechanism could
also be used in the future with Tendermint's app state sync proposal.
26 changes: 21 additions & 5 deletions go/common/cbor/cbor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,22 @@
// to always have the same serialization.
package cbor

import "github.com/fxamacker/cbor"
import (
"io"

"github.com/fxamacker/cbor"
)

// RawMessage is a raw encoded CBOR value. It implements Marshaler and
// Unmarshaler interfaces and can be used to delay CBOR decoding or
// precompute a CBOR encoding.
type RawMessage = cbor.RawMessage

// encOptions are the encoding options shared by Marshal and NewEncoder, so
// that one-shot and streaming encoding are configured identically.
var encOptions = cbor.EncOptions{
Canonical: true,
TimeRFC3339: false, // Second granular unix timestamps
}

// FixSliceForSerde will convert `nil` to `[]byte` to work around serde
// brain damage.
func FixSliceForSerde(b []byte) []byte {
Expand All @@ -23,10 +32,7 @@ func FixSliceForSerde(b []byte) []byte {

// Marshal serializes a given type into a CBOR byte vector.
func Marshal(src interface{}) []byte {
b, err := cbor.Marshal(src, cbor.EncOptions{
Canonical: true,
TimeRFC3339: false, // Second granular unix timestamps
})
b, err := cbor.Marshal(src, encOptions)
if err != nil {
panic("common/cbor: failed to marshal: " + err.Error())
}
Expand All @@ -49,3 +55,13 @@ func MustUnmarshal(data []byte, dst interface{}) {
panic(err)
}
}

// NewEncoder creates a new CBOR encoder that writes to w, configured with the
// same package-level encOptions that Marshal uses.
func NewEncoder(w io.Writer) *cbor.Encoder {
return cbor.NewEncoder(w, encOptions)
}

// NewDecoder creates a new CBOR decoder that reads from r.
func NewDecoder(r io.Reader) *cbor.Decoder {
return cbor.NewDecoder(r)
}
25 changes: 25 additions & 0 deletions go/common/crypto/hash/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/base64"
"encoding/hex"
"errors"
"hash"

"github.com/oasislabs/oasis-core/go/common/cbor"
)
Expand Down Expand Up @@ -107,3 +108,27 @@ func (h *Hash) IsEmpty() bool {
func (h Hash) String() string {
return hex.EncodeToString(h[:])
}

// Builder is a hash builder that can be used to compute hashes iteratively.
//
// Use NewBuilder to construct one: the zero value has a nil hasher, which the
// methods call without checking.
type Builder struct {
// hasher holds the running hash state.
hasher hash.Hash
}

// Write adds more data to the running hash by passing p to the underlying
// hasher.
// It never returns an error.
func (b *Builder) Write(p []byte) (int, error) {
return b.hasher.Write(p)
}

// Build returns the hash of all data written so far.
// The underlying hash state is left untouched, so more data may be written
// afterwards and Build called again.
func (b *Builder) Build() (h Hash) {
	digest := b.hasher.Sum(nil)
	// The error is deliberately ignored.
	// NOTE(review): presumably UnmarshalBinary can only fail on a length
	// mismatch, which should not occur for the digest produced by the hasher
	// installed in NewBuilder -- confirm against Hash.UnmarshalBinary.
	_ = h.UnmarshalBinary(digest)
	return h
}

// NewBuilder creates a new hash builder backed by a fresh SHA-512/256 hasher.
func NewBuilder() *Builder {
return &Builder{hasher: sha512.New512_256()}
}
27 changes: 27 additions & 0 deletions go/common/grpc/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package grpc

import (
"io"

"google.golang.org/grpc"
)

// streamWriter wraps a server-side gRPC stream so that it can be written to
// through its Write method.
type streamWriter struct {
// The embedded stream supplies SendMsg, which Write uses to emit data.
grpc.ServerStream
}

// Write implements io.Writer by sending p as a single message on the stream.
// On success the whole slice is reported as written; if sending fails, zero
// bytes are reported together with the error from SendMsg.
func (c *streamWriter) Write(p []byte) (int, error) {
	if err := c.SendMsg(p); err != nil {
		return 0, err
	}
	return len(p), nil
}

// NewStreamWriter adapts a server-side gRPC stream to the io.Writer interface
// so that the stream can be used wherever a writer is expected. Each Write
// into such a stream causes one message to be sent, encoded as a raw byte
// slice.
func NewStreamWriter(stream grpc.ServerStream) io.Writer {
	return &streamWriter{ServerStream: stream}
}
1 change: 1 addition & 0 deletions go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31 // indirect
github.com/go-kit/kit v0.9.0
github.com/golang/protobuf v1.3.2
github.com/golang/snappy v0.0.1
github.com/google/gofuzz v1.0.0
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
Expand Down
10 changes: 8 additions & 2 deletions go/oasis-node/cmd/debug/byzantine/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"io"

"github.com/pkg/errors"
"google.golang.org/grpc"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/oasislabs/oasis-core/go/common/node"
scheduler "github.com/oasislabs/oasis-core/go/scheduler/api"
storage "github.com/oasislabs/oasis-core/go/storage/api"
"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint"
"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/syncer"
)

Expand Down Expand Up @@ -119,8 +121,12 @@ func (hns *honestNodeStorage) GetDiff(ctx context.Context, request *storage.GetD
return hns.client.GetDiff(ctx, request)
}

func (hns *honestNodeStorage) GetCheckpoint(ctx context.Context, request *storage.GetCheckpointRequest) (storage.WriteLogIterator, error) {
return hns.client.GetCheckpoint(ctx, request)
// GetCheckpoints delegates the request to hns.client and returns its result
// unchanged.
func (hns *honestNodeStorage) GetCheckpoints(ctx context.Context, request *checkpoint.GetCheckpointsRequest) ([]*checkpoint.Metadata, error) {
return hns.client.GetCheckpoints(ctx, request)
}

// GetCheckpointChunk delegates to hns.client, handing it the chunk metadata
// and the writer w, and returns its error unchanged.
// NOTE(review): presumably the chunk contents are written to w -- confirm
// against the storage backend API.
func (hns *honestNodeStorage) GetCheckpointChunk(ctx context.Context, chunk *checkpoint.ChunkMetadata, w io.Writer) error {
return hns.client.GetCheckpointChunk(ctx, chunk, w)
}

func (hns *honestNodeStorage) Cleanup() {
Expand Down
50 changes: 11 additions & 39 deletions go/oasis-node/cmd/debug/storage/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
storageAPI "github.com/oasislabs/oasis-core/go/storage/api"
storageClient "github.com/oasislabs/oasis-core/go/storage/client"
storageDatabase "github.com/oasislabs/oasis-core/go/storage/database"
"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel"
)

const cfgExportDir = "storage.export.dir"
Expand Down Expand Up @@ -98,26 +99,14 @@ func exportRuntime(dataDir, destDir string, id common.Namespace, rtg *registry.R
<-storageBackend.Initialized()
defer storageBackend.Cleanup()

// Get the checkpoint iterator.
root := storageAPI.Root{
Namespace: id,
Round: rtg.Round,
Hash: rtg.StateRoot,
}
it, err := storageBackend.GetCheckpoint(context.Background(),
&storageAPI.GetCheckpointRequest{
Root: root,
},
)
if err != nil {
logger.Error("failed getting checkpoint",
"err", err,
"namespace", root.Namespace,
"round", root.Round,
"root", root.Hash,
)
return err
}
tree := urkel.NewWithRoot(storageBackend, nil, root)
it := tree.NewIterator(context.Background(), urkel.IteratorPrefetch(10_000))
defer it.Close()

fn := fmt.Sprintf("storage-dump-%v-%d.json",
root.Namespace.String(),
Expand All @@ -127,7 +116,7 @@ func exportRuntime(dataDir, destDir string, id common.Namespace, rtg *registry.R
return exportIterator(fn, &root, it)
}

func exportIterator(fn string, root *storageAPI.Root, it storageAPI.WriteLogIterator) error {
func exportIterator(fn string, root *storageAPI.Root, it urkel.Iterator) error {
// Create the dump file, and initialize a JSON stream encoder.
f, err := os.Create(fn)
if err != nil {
Expand All @@ -152,35 +141,18 @@ func exportIterator(fn string, root *storageAPI.Root, it storageAPI.WriteLogIter
return err
}

// Dump the write log.
for {
more, err := it.Next()
if err != nil {
logger.Error("failed to fetch next item from write log iterator",
"err", err,
)
return err
}

if !more {
return nil
}

val, err := it.Value()
if err != nil {
logger.Error("failed to get value from write log iterator",
"err", err,
)
return err
}

if err = enc.Encode([][]byte{val.Key, val.Value}); err != nil {
// Dump the tree.
for it.Rewind(); it.Valid(); it.Next() {
key, value := it.Key(), it.Value()
if err = enc.Encode([][]byte{key, value}); err != nil {
logger.Error("failed to encode write log entry",
"err", err,
)
return err
}
}

return nil
}

func newDirectStorageBackend(dataDir string, namespace common.Namespace) (storageAPI.Backend, error) {
Expand Down
14 changes: 14 additions & 0 deletions go/oasis-node/cmd/registry/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ const (
CfgStorageMaxApplyOps = "runtime.storage.max_apply_ops"
CfgStorageMaxMergeRoots = "runtime.storage.max_merge_roots"
CfgStorageMaxMergeOps = "runtime.storage.max_merge_ops"
CfgStorageCheckpointInterval = "runtime.storage.checkpoint_interval"
CfgStorageCheckpointNumKept = "runtime.storage.checkpoint_num_kept"
CfgStorageCheckpointChunkSize = "runtime.storage.checkpoint_chunk_size"

// Transaction scheduler flags.
CfgTxnSchedulerGroupSize = "runtime.txn_scheduler.group_size"
Expand Down Expand Up @@ -385,6 +388,9 @@ func runtimeFromFlags() (*registry.Runtime, signature.Signer, error) {
MaxApplyOps: viper.GetUint64(CfgStorageMaxApplyOps),
MaxMergeRoots: viper.GetUint64(CfgStorageMaxMergeRoots),
MaxMergeOps: viper.GetUint64(CfgStorageMaxMergeOps),
CheckpointInterval: viper.GetUint64(CfgStorageCheckpointInterval),
CheckpointNumKept: viper.GetUint64(CfgStorageCheckpointNumKept),
CheckpointChunkSize: viper.GetUint64(CfgStorageCheckpointChunkSize),
kostko marked this conversation as resolved.
Show resolved Hide resolved
},
}
if teeHardware == node.TEEHardwareIntelSGX {
Expand Down Expand Up @@ -427,6 +433,11 @@ func runtimeFromFlags() (*registry.Runtime, signature.Signer, error) {
return nil, nil, fmt.Errorf("invalid runtime admission policy")
}

// Validate storage configuration.
if err = registry.VerifyRegisterRuntimeStorageArgs(rt, logger); err != nil {
return nil, nil, fmt.Errorf("invalid runtime storage configuration: %w", err)
}

return rt, signer, nil
}

Expand Down Expand Up @@ -518,6 +529,9 @@ func init() {
runtimeFlags.Uint64(CfgStorageMaxApplyOps, 2, "Maximum number of apply operations in a batch")
runtimeFlags.Uint64(CfgStorageMaxMergeRoots, 8, "Maximum number of merge roots")
runtimeFlags.Uint64(CfgStorageMaxMergeOps, 2, "Maximum number of merge operations in a batch")
runtimeFlags.Uint64(CfgStorageCheckpointInterval, 0, "Storage checkpoint interval (in rounds)")
runtimeFlags.Uint64(CfgStorageCheckpointNumKept, 0, "Number of storage checkpoints to keep")
runtimeFlags.Uint64(CfgStorageCheckpointChunkSize, 0, "Storage checkpoint chunk size")

// Init Admission policy flags.
runtimeFlags.String(CfgAdmissionPolicy, "", "What type of node admission policy to have")
Expand Down
16 changes: 12 additions & 4 deletions go/oasis-node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
staking "github.com/oasislabs/oasis-core/go/staking/api"
stakingTests "github.com/oasislabs/oasis-core/go/staking/tests"
"github.com/oasislabs/oasis-core/go/storage"
storageAPI "github.com/oasislabs/oasis-core/go/storage/api"
storageClient "github.com/oasislabs/oasis-core/go/storage/client"
storageClientTests "github.com/oasislabs/oasis-core/go/storage/client/tests"
storageTests "github.com/oasislabs/oasis-core/go/storage/tests"
Expand Down Expand Up @@ -388,11 +389,13 @@ func testStorage(t *testing.T, node *testNode) {
require.NoError(t, err, "TempDir")
defer os.RemoveAll(dataDir)

storage, err := storage.New(context.Background(), dataDir, testRuntimeID, node.Identity, node.Consensus.Scheduler(), node.Consensus.Registry())
backend, err := storage.New(context.Background(), dataDir, testRuntimeID, node.Identity, node.Consensus.Scheduler(), node.Consensus.Registry())
require.NoError(t, err, "storage.New")
defer storage.Cleanup()
defer backend.Cleanup()
// We are always testing a local storage backend here.
localBackend := backend.(storageAPI.LocalBackend)

storageTests.StorageImplementationTests(t, storage, testRuntimeID, 0)
storageTests.StorageImplementationTests(t, localBackend, backend, testRuntimeID, 0)
}

func testRegistry(t *testing.T, node *testNode) {
Expand Down Expand Up @@ -462,6 +465,11 @@ func testClient(t *testing.T, node *testNode) {
func testStorageClientWithNode(t *testing.T, node *testNode) {
ctx := context.Background()

// Get the local storage backend (the one that the client is connecting to).
rt, err := node.RuntimeRegistry.GetRuntime(testRuntimeID)
require.NoError(t, err, "GetRuntime")
localBackend := rt.Storage().(storageAPI.LocalBackend)

client, err := storageClient.NewStatic(ctx, testRuntimeID, node.Identity, node.Consensus.Registry(), node.Identity.NodeSigner.Public())
require.NoError(t, err, "NewStatic")

Expand All @@ -470,7 +478,7 @@ func testStorageClientWithNode(t *testing.T, node *testNode) {
blk, err := node.Consensus.RootHash().GetLatestBlock(ctx, testRuntimeID, consensusAPI.HeightLatest)
require.NoError(t, err, "GetLatestBlock")

storageTests.StorageImplementationTests(t, client, testRuntimeID, blk.Header.Round+1)
storageTests.StorageImplementationTests(t, localBackend, client, testRuntimeID, blk.Header.Round+1)
}

func testStorageClientWithoutNode(t *testing.T, node *testNode) {
Expand Down
8 changes: 8 additions & 0 deletions go/oasis-test-runner/oasis/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"path/filepath"
"strconv"
"time"

"github.com/oasislabs/oasis-core/go/common"
commonGrpc "github.com/oasislabs/oasis-core/go/common/grpc"
Expand Down Expand Up @@ -320,6 +321,13 @@ func (args *argBuilder) workerStorageDebugIgnoreApplies(ignore bool) *argBuilder
return args
}

// workerStorageCheckpointCheckInterval appends the storage worker's checkpoint
// check interval flag, followed by the interval's string form, to the argument
// vector. Zero and negative intervals add nothing. The builder itself is
// returned so that calls can be chained.
func (args *argBuilder) workerStorageCheckpointCheckInterval(interval time.Duration) *argBuilder {
	if interval <= 0 {
		return args
	}

	flag := "--" + workerStorage.CfgWorkerCheckpointCheckInterval
	args.vec = append(args.vec, flag, interval.String())
	return args
}

func (args *argBuilder) workerTxnschedulerCheckTxEnabled() *argBuilder {
args.vec = append(args.vec, "--"+txnscheduler.CfgCheckTxEnabled)
return args
Expand Down
3 changes: 3 additions & 0 deletions go/oasis-test-runner/oasis/cli/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ func (r *RegistryHelpers) GenerateRegisterRuntimeTx(
"--"+cmdRegRt.CfgStorageMaxApplyOps, strconv.FormatUint(runtime.Storage.MaxApplyOps, 10),
"--"+cmdRegRt.CfgStorageMaxMergeRoots, strconv.FormatUint(runtime.Storage.MaxMergeRoots, 10),
"--"+cmdRegRt.CfgStorageMaxMergeOps, strconv.FormatUint(runtime.Storage.MaxMergeOps, 10),
"--"+cmdRegRt.CfgStorageCheckpointInterval, strconv.FormatUint(runtime.Storage.CheckpointInterval, 10),
"--"+cmdRegRt.CfgStorageCheckpointNumKept, strconv.FormatUint(runtime.Storage.CheckpointNumKept, 10),
"--"+cmdRegRt.CfgStorageCheckpointChunkSize, strconv.FormatUint(runtime.Storage.CheckpointChunkSize, 10),
"--"+cmdRegRt.CfgTxnSchedulerGroupSize, strconv.FormatUint(runtime.TxnScheduler.GroupSize, 10),
"--"+cmdRegRt.CfgTxnSchedulerAlgorithm, runtime.TxnScheduler.Algorithm,
"--"+cmdRegRt.CfgTxnSchedulerBatchFlushTimeout, runtime.TxnScheduler.BatchFlushTimeout.String(),
Expand Down
Loading