WIP: go/storage: Refactor checkpointing interface
Previously, the way storage checkpoints were implemented had several
drawbacks, namely:

* ...

TODO: Write better description.
kostko committed Feb 10, 2020
1 parent a5172fb commit b28b016
Showing 36 changed files with 1,362 additions and 303 deletions.
26 changes: 21 additions & 5 deletions go/common/cbor/cbor.go
@@ -5,13 +5,22 @@
// to always have the same serialization.
package cbor

import "github.com/fxamacker/cbor"
import (
"io"

"github.com/fxamacker/cbor"
)

// RawMessage is a raw encoded CBOR value. It implements Marshaler and
// Unmarshaler interfaces and can be used to delay CBOR decoding or
// precompute a CBOR encoding.
type RawMessage = cbor.RawMessage

var encOptions = cbor.EncOptions{
Canonical: true,
TimeRFC3339: false, // Second granular unix timestamps
}

// FixSliceForSerde will convert `nil` to `[]byte` to work around serde
// brain damage.
func FixSliceForSerde(b []byte) []byte {
@@ -23,10 +32,7 @@ func FixSliceForSerde(b []byte) []byte {

// Marshal serializes a given type into a CBOR byte vector.
func Marshal(src interface{}) []byte {
b, err := cbor.Marshal(src, cbor.EncOptions{
Canonical: true,
TimeRFC3339: false, // Second granular unix timestamps
})
b, err := cbor.Marshal(src, encOptions)
if err != nil {
panic("common/cbor: failed to marshal: " + err.Error())
}
@@ -49,3 +55,13 @@ func MustUnmarshal(data []byte, dst interface{}) {
panic(err)
}
}

// NewEncoder creates a new CBOR encoder.
func NewEncoder(w io.Writer) *cbor.Encoder {
return cbor.NewEncoder(w, encOptions)
}

// NewDecoder creates a new CBOR decoder.
func NewDecoder(r io.Reader) *cbor.Decoder {
return cbor.NewDecoder(r)
}
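
The new NewEncoder/NewDecoder helpers allow CBOR values to be streamed to or from an io.Writer/io.Reader instead of being marshaled in one piece. A minimal usage sketch; only the two helpers above are taken from this commit, while the entry type and wrapper program are illustrative:

package main

import (
	"bytes"
	"fmt"

	"github.com/oasislabs/oasis-core/go/common/cbor"
)

// entry is an illustrative type; any CBOR-serializable value works.
type entry struct {
	Key   []byte
	Value []byte
}

func main() {
	var buf bytes.Buffer

	// Stream-encode values one at a time using the shared canonical options.
	enc := cbor.NewEncoder(&buf)
	for i := 0; i < 3; i++ {
		if err := enc.Encode(entry{Key: []byte{byte(i)}, Value: []byte("v")}); err != nil {
			panic(err)
		}
	}

	// Stream-decode them back; Decode returns an error (io.EOF) once the
	// stream is exhausted.
	dec := cbor.NewDecoder(&buf)
	for {
		var e entry
		if err := dec.Decode(&e); err != nil {
			break
		}
		fmt.Printf("key=%x value=%s\n", e.Key, e.Value)
	}
}
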
25 changes: 25 additions & 0 deletions go/common/crypto/hash/hash.go
@@ -8,6 +8,7 @@ import (
"encoding/base64"
"encoding/hex"
"errors"
"hash"

"github.com/oasislabs/oasis-core/go/common/cbor"
)
@@ -107,3 +108,27 @@ func (h *Hash) IsEmpty() bool {
func (h Hash) String() string {
return hex.EncodeToString(h[:])
}

// Builder is a hash builder that can be used to compute hashes iteratively.
type Builder struct {
hasher hash.Hash
}

// Write adds more data to the running hash.
// It never returns an error.
func (b *Builder) Write(p []byte) (int, error) {
return b.hasher.Write(p)
}

// Build returns the current hash.
// It does not change the underlying hash state.
func (b *Builder) Build() (h Hash) {
sum := b.hasher.Sum([]byte{})
_ = h.UnmarshalBinary(sum[:])
return
}

// NewBuilder creates a new hash builder.
func NewBuilder() *Builder {
return &Builder{hasher: sha512.New512_256()}
}
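
A minimal usage sketch for the new hash Builder; only NewBuilder, Write, Build and Hash.String from this package are assumed, and the wrapper program is illustrative:

package main

import (
	"fmt"

	"github.com/oasislabs/oasis-core/go/common/crypto/hash"
)

func main() {
	b := hash.NewBuilder()
	_, _ = b.Write([]byte("chunk-1"))

	// Build does not reset the underlying state, so an intermediate hash
	// can be taken while more data is still being written.
	intermediate := b.Build()

	_, _ = b.Write([]byte("chunk-2"))
	final := b.Build()

	fmt.Println("intermediate:", intermediate.String())
	fmt.Println("final:", final.String())
}
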
1 change: 1 addition & 0 deletions go/go.mod
@@ -29,6 +29,7 @@ require (
github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31 // indirect
github.com/go-kit/kit v0.9.0
github.com/golang/protobuf v1.3.2
github.com/golang/snappy v0.0.1
github.com/google/gofuzz v1.0.0
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
10 changes: 8 additions & 2 deletions go/oasis-node/cmd/debug/byzantine/storage.go
@@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"io"

"github.com/pkg/errors"
"google.golang.org/grpc"
@@ -19,6 +20,7 @@ import (
"github.com/oasislabs/oasis-core/go/common/node"
scheduler "github.com/oasislabs/oasis-core/go/scheduler/api"
storage "github.com/oasislabs/oasis-core/go/storage/api"
"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint"
"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/syncer"
)

@@ -119,8 +121,12 @@ func (hns *honestNodeStorage) GetDiff(ctx context.Context, request *storage.GetD
return hns.client.GetDiff(ctx, request)
}

func (hns *honestNodeStorage) GetCheckpoint(ctx context.Context, request *storage.GetCheckpointRequest) (storage.WriteLogIterator, error) {
return hns.client.GetCheckpoint(ctx, request)
func (hns *honestNodeStorage) GetCheckpoints(ctx context.Context, request *checkpoint.GetCheckpointsRequest) ([]*checkpoint.Metadata, error) {
return hns.client.GetCheckpoints(ctx, request)
}

func (hns *honestNodeStorage) GetCheckpointChunk(ctx context.Context, chunk *checkpoint.ChunkMetadata, w io.Writer) error {
return hns.client.GetCheckpointChunk(ctx, chunk, w)
}

func (hns *honestNodeStorage) Cleanup() {
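
With this change the checkpoint part of the storage backend interface becomes two calls: GetCheckpoints lists available checkpoint metadata and GetCheckpointChunk streams a single chunk into a writer. A rough client-side sketch under those assumptions; how chunks are enumerated from checkpoint metadata is not shown in this diff, so chunksOf is a hypothetical placeholder:

package main

import (
	"context"
	"fmt"
	"io"

	"github.com/oasislabs/oasis-core/go/common"
	storage "github.com/oasislabs/oasis-core/go/storage/api"
	"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint"
)

// chunksOf is a hypothetical helper that enumerates the chunks belonging to
// a checkpoint; the real mapping lives in the checkpoint package.
func chunksOf(cp *checkpoint.Metadata) []*checkpoint.ChunkMetadata { return nil }

// restoreLatest streams every chunk of the first advertised checkpoint for
// the given namespace into out.
func restoreLatest(ctx context.Context, ns common.Namespace, backend storage.Backend, out io.Writer) error {
	cps, err := backend.GetCheckpoints(ctx, &checkpoint.GetCheckpointsRequest{Namespace: ns})
	if err != nil {
		return err
	}
	if len(cps) == 0 {
		return fmt.Errorf("no checkpoints available")
	}

	for _, chunk := range chunksOf(cps[0]) {
		if err := backend.GetCheckpointChunk(ctx, chunk, out); err != nil {
			return err
		}
	}
	return nil
}

func main() {}
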
50 changes: 11 additions & 39 deletions go/oasis-node/cmd/debug/storage/export.go
@@ -22,6 +22,7 @@ import (
storageAPI "github.com/oasislabs/oasis-core/go/storage/api"
storageClient "github.com/oasislabs/oasis-core/go/storage/client"
storageDatabase "github.com/oasislabs/oasis-core/go/storage/database"
"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel"
)

const cfgExportDir = "storage.export.dir"
@@ -98,26 +99,14 @@ func exportRuntime(dataDir, destDir string, id common.Namespace, rtg *registry.R
<-storageBackend.Initialized()
defer storageBackend.Cleanup()

// Get the checkpoint iterator.
root := storageAPI.Root{
Namespace: id,
Round: rtg.Round,
Hash: rtg.StateRoot,
}
it, err := storageBackend.GetCheckpoint(context.Background(),
&storageAPI.GetCheckpointRequest{
Root: root,
},
)
if err != nil {
logger.Error("failed getting checkpoint",
"err", err,
"namespace", root.Namespace,
"round", root.Round,
"root", root.Hash,
)
return err
}
tree := urkel.NewWithRoot(storageBackend, nil, root)
it := tree.NewIterator(context.Background(), urkel.IteratorPrefetch(10_000))
defer it.Close()

fn := fmt.Sprintf("storage-dump-%v-%d.json",
root.Namespace.String(),
@@ -127,7 +116,7 @@ func exportRuntime(dataDir, destDir string, id common.Namespace, rtg *registry.R
return exportIterator(fn, &root, it)
}

func exportIterator(fn string, root *storageAPI.Root, it storageAPI.WriteLogIterator) error {
func exportIterator(fn string, root *storageAPI.Root, it urkel.Iterator) error {
// Create the dump file, and initialize a JSON stream encoder.
f, err := os.Create(fn)
if err != nil {
@@ -152,35 +141,18 @@ func exportIterator(fn string, root *storageAPI.Root, it storageAPI.WriteLogIter
return err
}

// Dump the write log.
for {
more, err := it.Next()
if err != nil {
logger.Error("failed to fetch next item from write log iterator",
"err", err,
)
return err
}

if !more {
return nil
}

val, err := it.Value()
if err != nil {
logger.Error("failed to get value from write log iterator",
"err", err,
)
return err
}

if err = enc.Encode([][]byte{val.Key, val.Value}); err != nil {
// Dump the tree.
for it.Rewind(); it.Valid(); it.Next() {
key, value := it.Key(), it.Value()
if err = enc.Encode([][]byte{key, value}); err != nil {
logger.Error("failed to encode write log entry",
"err", err,
)
return err
}
}

return nil
}

func newDirectStorageBackend(dataDir string, namespace common.Namespace) (storageAPI.Backend, error) {
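
For reference, the exporter now walks the tree directly through an urkel iterator instead of requesting a write-log checkpoint from the backend. A condensed sketch of that pattern using only the calls that appear in the hunk above; the dumpTree wrapper itself is illustrative:

package main

import (
	"context"
	"fmt"

	storageAPI "github.com/oasislabs/oasis-core/go/storage/api"
	"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel"
)

// dumpTree prints every key/value pair reachable from the given root.
func dumpTree(ctx context.Context, backend storageAPI.Backend, root storageAPI.Root) {
	tree := urkel.NewWithRoot(backend, nil, root)

	it := tree.NewIterator(ctx, urkel.IteratorPrefetch(10_000))
	defer it.Close()

	for it.Rewind(); it.Valid(); it.Next() {
		fmt.Printf("%x = %x\n", it.Key(), it.Value())
	}
}

func main() {}
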
16 changes: 12 additions & 4 deletions go/oasis-node/node_test.go
@@ -37,6 +37,7 @@ import (
staking "github.com/oasislabs/oasis-core/go/staking/api"
stakingTests "github.com/oasislabs/oasis-core/go/staking/tests"
"github.com/oasislabs/oasis-core/go/storage"
storageAPI "github.com/oasislabs/oasis-core/go/storage/api"
storageClient "github.com/oasislabs/oasis-core/go/storage/client"
storageClientTests "github.com/oasislabs/oasis-core/go/storage/client/tests"
storageTests "github.com/oasislabs/oasis-core/go/storage/tests"
@@ -388,11 +389,13 @@ func testStorage(t *testing.T, node *testNode) {
require.NoError(t, err, "TempDir")
defer os.RemoveAll(dataDir)

storage, err := storage.New(context.Background(), dataDir, testRuntimeID, node.Identity, node.Scheduler, node.Registry)
backend, err := storage.New(context.Background(), dataDir, testRuntimeID, node.Identity, node.Scheduler, node.Registry)
require.NoError(t, err, "storage.New")
defer storage.Cleanup()
defer backend.Cleanup()
// We are always testing a local storage backend here.
localBackend := backend.(storageAPI.LocalBackend)

storageTests.StorageImplementationTests(t, storage, testRuntimeID, 0)
storageTests.StorageImplementationTests(t, localBackend, backend, testRuntimeID, 0)
}

func testRegistry(t *testing.T, node *testNode) {
@@ -462,6 +465,11 @@ func testClient(t *testing.T, node *testNode) {
func testStorageClientWithNode(t *testing.T, node *testNode) {
ctx := context.Background()

// Get the local storage backend (the one that the client is connecting to).
rt, err := node.RuntimeRegistry.GetRuntime(testRuntimeID)
require.NoError(t, err, "GetRuntime")
localBackend := rt.Storage().(storageAPI.LocalBackend)

client, err := storageClient.NewStatic(ctx, testRuntimeID, node.Identity, node.Registry, node.Identity.NodeSigner.Public())
require.NoError(t, err, "NewStatic")

@@ -470,7 +478,7 @@ func testStorageClientWithNode(t *testing.T, node *testNode) {
blk, err := node.RootHash.GetLatestBlock(ctx, testRuntimeID, consensusAPI.HeightLatest)
require.NoError(t, err, "GetLatestBlock")

storageTests.StorageImplementationTests(t, client, testRuntimeID, blk.Header.Round+1)
storageTests.StorageImplementationTests(t, localBackend, client, testRuntimeID, blk.Header.Round+1)
}

func testStorageClientWithoutNode(t *testing.T, node *testNode) {
21 changes: 21 additions & 0 deletions go/registry/api/api.go
@@ -1068,6 +1068,27 @@ func VerifyRegisterRuntimeArgs(
)
return nil, fmt.Errorf("%w: storage MaxMergeOps parameter too small", ErrInvalidArgument)
}
// Verify storage checkpointing configuration if enabled.
if rt.Storage.CheckpointInterval > 0 {
if rt.Storage.CheckpointInterval < 1000 {
logger.Error("RegisterRuntime: storage CheckpointInterval parameter too small",
"runtime", rt,
)
return nil, fmt.Errorf("%w: storage CheckpointInterval parameter too small", ErrInvalidArgument)
}
if rt.Storage.CheckpointNumKept == 0 {
logger.Error("RegisterRuntime: storage CheckpointNumKept parameter too small",
"runtime", rt,
)
return nil, fmt.Errorf("%w: storage CheckpointNumKept parameter too small", ErrInvalidArgument)
}
if rt.Storage.CheckpointChunkSize < 1024*1024 {
logger.Error("RegisterRuntime: storage CheckpointChunkSize parameter too small",
"runtime", rt,
)
return nil, fmt.Errorf("%w: storage CheckpointChunkSize parameter too small", ErrInvalidArgument)
}
}

if rt.ID.IsKeyManager() {
logger.Error("RegisterRuntime: runtime ID flag mismatch",
9 changes: 9 additions & 0 deletions go/registry/api/runtime.go
@@ -146,6 +146,15 @@ type StorageParameters struct {

// MaxMergeOps configures the maximum number of merge operations in a batch.
MaxMergeOps uint64 `json:"max_merge_ops"`

// CheckpointInterval is the expected runtime state checkpoint interval (in rounds).
CheckpointInterval uint64 `json:"checkpoint_interval"`

// CheckpointNumKept is the expected minimum number of checkpoints to keep.
CheckpointNumKept uint64 `json:"checkpoint_num_kept"`

// CheckpointChunkSize is the chunk size parameter for checkpoint creation.
CheckpointChunkSize uint64 `json:"checkpoint_chunk_size"`
}

// AnyNodeRuntimeAdmissionPolicy allows any node to register.
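
A sketch of checkpoint-related storage parameters that satisfy the new validation in VerifyRegisterRuntimeArgs (interval of at least 1000 rounds, at least one checkpoint kept, chunk size of at least 1 MiB). Field names come from the StorageParameters struct above; the concrete values and wrapper program are illustrative, and the remaining StorageParameters fields are omitted:

package main

import (
	"fmt"

	registry "github.com/oasislabs/oasis-core/go/registry/api"
)

func main() {
	params := registry.StorageParameters{
		CheckpointInterval:  10_000,          // checkpoint every 10k rounds
		CheckpointNumKept:   2,               // keep the two most recent checkpoints
		CheckpointChunkSize: 8 * 1024 * 1024, // split checkpoints into 8 MiB chunks
	}
	fmt.Printf("%+v\n", params)
}
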
15 changes: 15 additions & 0 deletions go/roothash/api/block/header.go
@@ -105,6 +105,21 @@ func (h *Header) EncodedHash() hash.Hash {
return hh
}

// StorageRoots returns the storage roots contained in this header.
func (h *Header) StorageRoots() (roots []storage.Root) {
for _, rootHash := range []hash.Hash{
h.IORoot,
h.StateRoot,
} {
roots = append(roots, storage.Root{
Namespace: h.Namespace,
Round: h.Round,
Hash: rootHash,
})
}
return
}

// RootsForStorageReceipt gets the merkle roots that must be part of
// a storage receipt.
func (h *Header) RootsForStorageReceipt() []hash.Hash {
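
A small usage sketch for the new StorageRoots helper; only Header.StorageRoots and the storage.Root fields it populates are taken from this commit, while the logRoots wrapper is illustrative:

package main

import (
	"fmt"

	"github.com/oasislabs/oasis-core/go/roothash/api/block"
)

// logRoots prints the I/O and state roots referenced by a block header.
func logRoots(hdr *block.Header) {
	for _, root := range hdr.StorageRoots() {
		fmt.Printf("namespace=%s round=%d hash=%s\n", root.Namespace.String(), root.Round, root.Hash.String())
	}
}

func main() {
	logRoots(&block.Header{})
}
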
15 changes: 0 additions & 15 deletions go/runtime/history/prune.go
@@ -39,10 +39,6 @@ type Pruner interface {
// Prune purges unneeded history, given the latest round.
Prune(ctx context.Context, latestRound uint64) error

// NextCheckpoint returns the round which should be fetched as a
// checkpoint, given the latest round and last fetched checkpoint.
NextCheckpoint(latestRound, lastCheckpoint uint64) (uint64, error)

// RegisterHandler registers a prune handler.
RegisterHandler(handler PruneHandler)
}
@@ -74,10 +70,6 @@ func (p *nonePruner) Prune(ctx context.Context, latestRound uint64) error {
return nil
}

func (p *nonePruner) NextCheckpoint(latestRound, lastCheckpoint uint64) (uint64, error) {
return 0, nil
}

// NewNonePruner creates a new pruner that never prunes anything.
func NewNonePruner() PrunerFactory {
return func(db *DB) (Pruner, error) {
@@ -160,13 +152,6 @@ func (p *keepLastPruner) Prune(ctx context.Context, latestRound uint64) error {
})
}

func (p *keepLastPruner) NextCheckpoint(latestRound, lastCheckpoint uint64) (uint64, error) {
if latestRound < p.numKept {
return 0, nil
}
return latestRound - p.numKept + 1, nil
}

// NewKeepLastPruner creates a pruner that keeps the last configured
// number of rounds.
func NewKeepLastPruner(numKept uint64) PrunerFactory {
16 changes: 13 additions & 3 deletions go/runtime/registry/storage_router.go
@@ -2,9 +2,11 @@ package registry

import (
"context"
"io"

"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/storage/api"
"github.com/oasislabs/oasis-core/go/storage/mkvs/urkel/checkpoint"
)

var _ api.Backend = (*storageRouter)(nil)
@@ -81,12 +83,20 @@ func (sr *storageRouter) GetDiff(ctx context.Context, request *api.GetDiffReques
return rt.Storage().GetDiff(ctx, request)
}

func (sr *storageRouter) GetCheckpoint(ctx context.Context, request *api.GetCheckpointRequest) (api.WriteLogIterator, error) {
rt, err := sr.getRuntime(request.Root.Namespace)
func (sr *storageRouter) GetCheckpoints(ctx context.Context, request *checkpoint.GetCheckpointsRequest) ([]*checkpoint.Metadata, error) {
rt, err := sr.getRuntime(request.Namespace)
if err != nil {
return nil, err
}
return rt.Storage().GetCheckpoint(ctx, request)
return rt.Storage().GetCheckpoints(ctx, request)
}

func (sr *storageRouter) GetCheckpointChunk(ctx context.Context, chunk *checkpoint.ChunkMetadata, w io.Writer) error {
rt, err := sr.getRuntime(chunk.Root.Namespace)
if err != nil {
return err
}
return rt.Storage().GetCheckpointChunk(ctx, chunk, w)
}

func (sr *storageRouter) Cleanup() {