diff --git a/.buildkite/scripts/common_e2e.sh b/.buildkite/scripts/common_e2e.sh index 938c3a2b503..2605884c28d 100644 --- a/.buildkite/scripts/common_e2e.sh +++ b/.buildkite/scripts/common_e2e.sh @@ -289,6 +289,9 @@ run_compute_node() { # Optional named arguments: # clear_storage - clear storage node dir (default: 1) # +# Output environment variables: +# EKIDEN_LAST_NODE_DATA_DIR - the data directory for the last node run +# run_storage_node() { # Process arguments. local id=$1 @@ -296,6 +299,7 @@ run_storage_node() { # Optional arguments with default values. local clear_storage=1 + local extra_args="" # Load named arguments that override defaults. local "$@" @@ -309,6 +313,7 @@ run_storage_node() { if [[ $clear_storage == 1 ]]; then rm -rf ${data_dir} fi + EKIDEN_LAST_NODE_DATA_DIR="${data_dir}" local log_file=${EKIDEN_COMMITTEE_DIR}/storage-$id.log rm -rf ${log_file} @@ -341,6 +346,7 @@ run_storage_node() { --worker.runtime.id ${EKIDEN_RUNTIME_ID} \ --datadir ${data_dir} \ --debug.allow_test_keys \ + $extra_args \ 2>&1 | sed "s/^/[storage-node-${id}] /" & } diff --git a/.buildkite/scripts/test_e2e.sh b/.buildkite/scripts/test_e2e.sh index 0ef0656f198..75723607d6b 100755 --- a/.buildkite/scripts/test_e2e.sh +++ b/.buildkite/scripts/test_e2e.sh @@ -108,6 +108,40 @@ assert_merge_discrepancy_scenario_works() { assert_merge_discrepancies } +scenario_storage_sync() { + local runtime=simple-keyvalue + local client=simple-keyvalue + + # Initialize compute nodes. + run_compute_node 1 ${runtime} + run_compute_node 2 ${runtime} + run_compute_node 3 ${runtime} + + # Initialize storage nodes. + run_storage_node 1 + run_storage_node 2 extra_args="--worker.debug.storage.ignore_apply" + + # Wait for all nodes to start: 3 compute + 2 storage + key manager. + wait_nodes 6 + + # Advance epoch to elect a new committee. + set_epoch 1 + + # Wait for the client to exit without waiting on it. + while kill -0 ${EKIDEN_CLIENT_PID}; do + sleep 5 + done + + local log_file=${EKIDEN_COMMITTEE_DIR}/client-storage-debug.log + ${EKIDEN_NODE} debug storage check-roots \ + --datadir ${EKIDEN_COMMITTEE_DIR} \ + --log.level debug \ + --log.file ${log_file} \ + --storage.debug.client.address "unix:${EKIDEN_LAST_NODE_DATA_DIR}/internal.sock" \ + --address "unix:${EKIDEN_LAST_NODE_DATA_DIR}/internal.sock" \ + ${EKIDEN_RUNTIME_ID} +} + run_client_km_restart() { local runtime=$1 local client=$2 @@ -198,6 +232,15 @@ test_suite() { client=simple-keyvalue \ on_success_hook=assert_merge_discrepancy_scenario_works \ beacon_deterministic=1 + + # Storage node syncing scenario. + run_test \ + scenario=scenario_storage_sync \ + name="e2e-${backend_name}-storage-sync" \ + backend_runner=$backend_runner \ + runtime=simple-keyvalue \ + client=simple-keyvalue + } ########################################## diff --git a/go/ekiden/cmd/debug/debug.go b/go/ekiden/cmd/debug/debug.go index 6398faa0d8f..fd38b20c206 100644 --- a/go/ekiden/cmd/debug/debug.go +++ b/go/ekiden/cmd/debug/debug.go @@ -7,6 +7,7 @@ import ( "github.com/oasislabs/ekiden/go/ekiden/cmd/debug/client" "github.com/oasislabs/ekiden/go/ekiden/cmd/debug/dummy" "github.com/oasislabs/ekiden/go/ekiden/cmd/debug/roothash" + "github.com/oasislabs/ekiden/go/ekiden/cmd/debug/storage" "github.com/oasislabs/ekiden/go/ekiden/cmd/debug/tendermint" ) @@ -20,6 +21,7 @@ func Register(parentCmd *cobra.Command) { client.Register(debugCmd) dummy.Register(debugCmd) roothash.Register(debugCmd) + storage.Register(debugCmd) tendermint.Register(debugCmd) parentCmd.AddCommand(debugCmd) diff --git a/go/ekiden/cmd/debug/roothash/roothash.go b/go/ekiden/cmd/debug/roothash/roothash.go index aa130274000..37028c9b6d7 100644 --- a/go/ekiden/cmd/debug/roothash/roothash.go +++ b/go/ekiden/cmd/debug/roothash/roothash.go @@ -37,7 +37,7 @@ var ( return err } for _, arg := range args { - if err := validateRuntimeIDStr(arg); err != nil { + if err := ValidateRuntimeIDStr(arg); err != nil { return fmt.Errorf("malformed runtime id '%v': %v", arg, err) } } @@ -52,7 +52,8 @@ var ( cfgRoothashExportFile = "output_file" ) -func validateRuntimeIDStr(idStr string) error { +// ValidateRuntimeIDStr validates that the given string is a valid runtime id. +func ValidateRuntimeIDStr(idStr string) error { b, err := hex.DecodeString(idStr) if err != nil { return err diff --git a/go/ekiden/cmd/debug/storage/storage.go b/go/ekiden/cmd/debug/storage/storage.go new file mode 100644 index 00000000000..321c3379049 --- /dev/null +++ b/go/ekiden/cmd/debug/storage/storage.go @@ -0,0 +1,234 @@ +// Package storage implements the storage debug sub-commands. +package storage + +import ( + "context" + "fmt" + "math" + "os" + "time" + + "github.com/spf13/cobra" + + "github.com/oasislabs/ekiden/go/common/crypto/signature" + "github.com/oasislabs/ekiden/go/common/logging" + cmdGrpc "github.com/oasislabs/ekiden/go/ekiden/cmd/common/grpc" + cmdDebugClient "github.com/oasislabs/ekiden/go/ekiden/cmd/debug/client" + cmdRoothashDebug "github.com/oasislabs/ekiden/go/ekiden/cmd/debug/roothash" + clientGrpc "github.com/oasislabs/ekiden/go/grpc/client" + storageGrpc "github.com/oasislabs/ekiden/go/grpc/storage" + "github.com/oasislabs/ekiden/go/roothash/api/block" + storageApi "github.com/oasislabs/ekiden/go/storage/api" + storageClient "github.com/oasislabs/ekiden/go/storage/client" + "github.com/oasislabs/ekiden/go/storage/mkvs/urkel/node" +) + +const ( + // MaxSyncCheckRetries is the maximum number of waiting loops for the storage worker to get synced. + MaxSyncCheckRetries = 180 +) + +var ( + storageCmd = &cobra.Command{ + Use: "storage", + Short: "node storage interface utilities", + } + + storageCheckRootsCmd = &cobra.Command{ + Use: "check-roots runtime-id (hex)", + Short: "check that the given storage node has all the roots up to the current block", + Args: func(cmd *cobra.Command, args []string) error { + nrFn := cobra.ExactArgs(1) + if err := nrFn(cmd, args); err != nil { + return err + } + for _, arg := range args { + if err := cmdRoothashDebug.ValidateRuntimeIDStr(arg); err != nil { + return fmt.Errorf("malformed runtime id '%v': %v", arg, err) + } + } + + return nil + }, + Run: doCheckRoots, + } + + logger = logging.GetLogger("cmd/storage") +) + +func checkDiff(ctx context.Context, storageClient storageApi.Backend, root string, oldRoot node.Root, newRoot node.Root) { + it, err := storageClient.GetDiff(ctx, oldRoot, newRoot) + if err != nil { + logger.Error("error getting write log from the syncing node", + "err", err, + "root_type", root, + "old_root", oldRoot, + "new_root", newRoot, + ) + os.Exit(1) + } + for { + more, err := it.Next() + if err != nil { + logger.Error("can't get next item from write log iterator", + "err", err, + "root_type", root, + "old_root", oldRoot, + "new_root", newRoot, + ) + os.Exit(1) + } + if !more { + break + } + + val, err := it.Value() + if err != nil { + logger.Error("can't get value out of write log iterator", + "err", err, + "root_type", root, + "old_root", oldRoot, + "new_root", newRoot, + ) + os.Exit(1) + } + logger.Debug("write log entry", "key", val.Key, "value", val.Value) + } + logger.Debug("write log read successfully", + "root_type", root, + "old_root", oldRoot, + "new_root", newRoot, + ) +} + +func doCheckRoots(cmd *cobra.Command, args []string) { + ctx := context.Background() + + storageClient.RegisterFlags(cmd) + cmdGrpc.RegisterClientFlags(cmd, true) + + conn, client := cmdDebugClient.DoConnect(cmd) + storageWorkerClient := storageGrpc.NewStorageWorkerClient(conn) + defer conn.Close() + + storageClient, err := storageClient.New(ctx, nil, nil, nil) + if err != nil { + logger.Error("error while connecting to storage client", + "err", err, + ) + os.Exit(1) + } + + var id signature.PublicKey + if err = id.UnmarshalHex(args[0]); err != nil { + logger.Error("failed to decode runtime id", + "err", err, + ) + os.Exit(1) + } + + res, err := client.GetBlock(ctx, &clientGrpc.GetBlockRequest{RuntimeId: id, Round: math.MaxUint64}) + if err != nil { + logger.Error("failed to get latest block from roothash", + "err", err, + ) + os.Exit(1) + } + + var latestBlock block.Block + if err = latestBlock.UnmarshalCBOR(res.Block); err != nil { + logger.Error("failed to parse block", + "err", err, + "runtime_id", id, + ) + os.Exit(1) + } + + // Wait for the worker to sync until this last round. + var resp *storageGrpc.GetLastSyncedRoundResponse + retryCount := 0 + for { + lastSyncedReq := &storageGrpc.GetLastSyncedRoundRequest{ + RuntimeId: id, + } + resp, err = storageWorkerClient.GetLastSyncedRound(ctx, lastSyncedReq) + if err != nil { + logger.Error("failed to get last synced round from storage worker", + "err", err, + ) + os.Exit(1) + } + + if resp.GetRound() >= latestBlock.Header.Round { + break + } + logger.Debug("storage worker not synced yet, waiting", + "last_synced", resp.GetRound(), + "expected", latestBlock.Header.Round, + ) + time.Sleep(5 * time.Second) + + retryCount++ + if retryCount > MaxSyncCheckRetries { + logger.Error("exceeded maximum wait retries, aborting") + os.Exit(1) + } + } + logger.Debug("storage worker is synced at least to the round we want", + "last_synced", resp.GetRound(), + "expected", latestBlock.Header.Round, + ) + + // Go through every block up to latestBlock and try getting write logs for each of them. + oldStateRoot := node.Root{} + oldStateRoot.Hash.Empty() + emptyRoot := node.Root{} + emptyRoot.Hash.Empty() + for i := uint64(0); i <= latestBlock.Header.Round; i++ { + res, err = client.GetBlock(ctx, &clientGrpc.GetBlockRequest{RuntimeId: id, Round: i}) + if err != nil { + logger.Error("failed to get block from roothash", + "err", err, + "round", i, + ) + os.Exit(1) + } + + var blk block.Block + if err = blk.UnmarshalCBOR(res.Block); err != nil { + logger.Error("failed to parse block", + "err", err, + "runtime_id", id, + "round", i, + ) + os.Exit(1) + } + + stateRoot := node.Root{ + Round: i, + Hash: blk.Header.StateRoot, + } + if !oldStateRoot.Hash.Equal(&stateRoot.Hash) { + checkDiff(ctx, storageClient, "state", oldStateRoot, stateRoot) + } + oldStateRoot = stateRoot + + emptyRoot.Round = i + ioRoot := node.Root{ + Round: i, + Hash: blk.Header.IORoot, + } + if !ioRoot.Hash.IsEmpty() { + checkDiff(ctx, storageClient, "io", emptyRoot, ioRoot) + } + } +} + +// Register registers the storage sub-command and all of its children. +func Register(parentCmd *cobra.Command) { + storageClient.RegisterFlags(storageCheckRootsCmd) + cmdGrpc.RegisterClientFlags(storageCheckRootsCmd, true) + + storageCmd.AddCommand(storageCheckRootsCmd) + parentCmd.AddCommand(storageCmd) +} diff --git a/go/ekiden/cmd/node/node.go b/go/ekiden/cmd/node/node.go index 4f3ae95b893..3dc77e5751f 100644 --- a/go/ekiden/cmd/node/node.go +++ b/go/ekiden/cmd/node/node.go @@ -162,7 +162,7 @@ func (n *Node) initBackends() error { grpcSrv := n.grpcInternal.Server() registry.NewGRPCServer(grpcSrv, n.Registry) staking.NewGRPCServer(grpcSrv, n.Staking) - storage.NewGRPCServer(grpcSrv, n.Storage, &grpc.AllowAllRuntimePolicyChecker{}) + storage.NewGRPCServer(grpcSrv, n.Storage, &grpc.AllowAllRuntimePolicyChecker{}, false) dummydebug.NewGRPCServer(grpcSrv, n.Epochtime, n.Registry) cmdCommon.Logger().Debug("backends initialized") @@ -232,6 +232,7 @@ func (n *Node) initAndStartWorkers(logger *logging.Logger) error { // Initialize the storage worker. n.StorageWorker, err = workerStorage.New( + n.grpcInternal, n.CommonWorker, n.WorkerRegistration, n.Genesis, diff --git a/go/grpc/storage/storage.proto b/go/grpc/storage/storage.proto index 69c66dadde1..76428efc3b8 100644 --- a/go/grpc/storage/storage.proto +++ b/go/grpc/storage/storage.proto @@ -3,6 +3,20 @@ syntax = "proto3"; package storage; option go_package = "github.com/oasislabs/ekiden/go/grpc/storage"; +service StorageWorker { + rpc GetLastSyncedRound(GetLastSyncedRoundRequest) returns (GetLastSyncedRoundResponse) {} +} + +message GetLastSyncedRoundRequest { + bytes runtime_id = 1; +} + +message GetLastSyncedRoundResponse { + uint64 round = 1; + bytes io_root = 2; + bytes state_root = 3; +} + service Storage { // MKVS rpc Apply(ApplyRequest) returns (ApplyResponse) {} diff --git a/go/storage/grpc.go b/go/storage/grpc.go index 1f1692e232f..7607bb23bb5 100644 --- a/go/storage/grpc.go +++ b/go/storage/grpc.go @@ -28,9 +28,14 @@ var _ pb.StorageServer = (*GrpcServer)(nil) type GrpcServer struct { backend api.Backend commonGrpc.RuntimePolicyChecker + + ignoreApply bool } func (s *GrpcServer) Apply(ctx context.Context, req *pb.ApplyRequest) (*pb.ApplyResponse, error) { + if s.ignoreApply { + return nil, errors.New("storage: ignoring apply operation") + } var ns common.Namespace if err := ns.UnmarshalBinary(req.GetNamespace()); err != nil { return nil, errors.Wrap(err, "storage: failed to unmarshal namespace") @@ -66,6 +71,9 @@ func (s *GrpcServer) Apply(ctx context.Context, req *pb.ApplyRequest) (*pb.Apply } func (s *GrpcServer) ApplyBatch(ctx context.Context, req *pb.ApplyBatchRequest) (*pb.ApplyBatchResponse, error) { + if s.ignoreApply { + return nil, errors.New("storage: ignoring apply operation") + } var ns common.Namespace if err := ns.UnmarshalBinary(req.GetNamespace()); err != nil { return nil, errors.Wrap(err, "storage: failed to unmarshal namespace") @@ -330,10 +338,11 @@ func (s *GrpcServer) GetCheckpoint(req *pb.GetCheckpointRequest, stream pb.Stora // NewGRPCServer initializes and registers a gRPC storage server backend. // by the provided Backend. -func NewGRPCServer(srv *grpc.Server, b api.Backend, policy commonGrpc.RuntimePolicyChecker) *GrpcServer { +func NewGRPCServer(srv *grpc.Server, b api.Backend, policy commonGrpc.RuntimePolicyChecker, ignoreApply bool) *GrpcServer { s := &GrpcServer{ backend: b, RuntimePolicyChecker: policy, + ignoreApply: ignoreApply, } pb.RegisterStorageServer(srv, s) diff --git a/go/worker/storage/worker.go b/go/worker/storage/worker.go index 9968d68a035..1d7ab3efd76 100644 --- a/go/worker/storage/worker.go +++ b/go/worker/storage/worker.go @@ -28,6 +28,8 @@ import ( const ( cfgWorkerEnabled = "worker.storage.enabled" cfgWorkerFetcherCount = "worker.storage.fetcher_count" + + cfgWorkerDebugIgnoreApply = "worker.debug.storage.ignore_apply" ) var ( @@ -59,6 +61,7 @@ type Worker struct { // New constructs a new storage worker. func New( + grpcInternal *grpc.Server, commonWorker *workerCommon.Worker, registration *registration.Registration, genesis genesis.Provider, @@ -110,9 +113,9 @@ func New( } }) - // Attach storage worker to gRPC server. + // Attach storage interface to gRPC server. s.grpcPolicy = grpc.NewDynamicRuntimePolicyChecker() - storage.NewGRPCServer(s.commonWorker.Grpc.Server(), s.commonWorker.Storage, s.grpcPolicy) + storage.NewGRPCServer(s.commonWorker.Grpc.Server(), s.commonWorker.Storage, s.grpcPolicy, viper.GetBool(cfgWorkerDebugIgnoreApply)) // Register storage worker role. s.registration.RegisterRole(func(n *node.Node) error { @@ -127,6 +130,9 @@ func New( return nil, err } } + + // Attach the storage worker's internal GRPC interface. + newGRPCServer(grpcInternal, s) } return s, nil @@ -267,10 +273,13 @@ func RegisterFlags(cmd *cobra.Command) { if !cmd.Flags().Parsed() { cmd.Flags().Bool(cfgWorkerEnabled, false, "Enable storage worker") cmd.Flags().Uint(cfgWorkerFetcherCount, 4, "Number of concurrent storage diff fetchers") + cmd.Flags().Bool(cfgWorkerDebugIgnoreApply, false, "Ignore Apply operations (for debugging purposes)") + _ = cmd.Flags().MarkHidden(cfgWorkerDebugIgnoreApply) } for _, v := range []string{ cfgWorkerEnabled, cfgWorkerFetcherCount, + cfgWorkerDebugIgnoreApply, } { viper.BindPFlag(v, cmd.Flags().Lookup(v)) // nolint: errcheck }