Skip to content

Commit

Permalink
go/storage: Implement E2E test for GetDiff sync
Browse files Browse the repository at this point in the history
  • Loading branch information
jberci committed Aug 21, 2019
1 parent a9c17e8 commit 701afb4
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 6 deletions.
6 changes: 6 additions & 0 deletions .buildkite/scripts/common_e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,17 @@ run_compute_node() {
# Optional named arguments:
# clear_storage - clear storage node dir (default: 1)
#
# Output environment variables:
# EKIDEN_LAST_NODE_DATA_DIR - the data directory for the last node run
#
run_storage_node() {
# Process arguments.
local id=$1
shift || true

# Optional arguments with default values.
local clear_storage=1
local extra_args=""
# Load named arguments that override defaults.
local "$@"

Expand All @@ -317,6 +321,7 @@ run_storage_node() {
if [[ $clear_storage == 1 ]]; then
rm -rf ${data_dir}
fi
EKIDEN_LAST_NODE_DATA_DIR="${data_dir}"
local log_file=${EKIDEN_COMMITTEE_DIR}/storage-$id.log
rm -rf ${log_file}

Expand Down Expand Up @@ -349,6 +354,7 @@ run_storage_node() {
--worker.runtime.id ${EKIDEN_RUNTIME_ID} \
--datadir ${data_dir} \
--debug.allow_test_keys \
$extra_args \
2>&1 | sed "s/^/[storage-node-${id}] /" &
}

Expand Down
43 changes: 43 additions & 0 deletions .buildkite/scripts/test_e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,40 @@ assert_merge_discrepancy_scenario_works() {
assert_merge_discrepancies
}

scenario_storage_sync() {
local runtime=simple-keyvalue
local client=simple-keyvalue

# Initialize compute nodes.
run_compute_node 1 ${runtime}
run_compute_node 2 ${runtime}
run_compute_node 3 ${runtime}

# Initialize storage nodes.
run_storage_node 1
run_storage_node 2 extra_args="--worker.debug.storage.ignore_apply"

# Wait for all nodes to start: 3 compute + 2 storage + key manager.
wait_nodes 6

# Advance epoch to elect a new committee.
set_epoch 1

# Wait for the client to exit without waiting on it.
while kill -0 ${EKIDEN_CLIENT_PID}; do
sleep 5
done

local log_file=${EKIDEN_COMMITTEE_DIR}/client-storage-debug.log
${EKIDEN_NODE} debug storage check-roots \
--datadir ${EKIDEN_COMMITTEE_DIR} \
--log.level debug \
--log.file ${log_file} \
--storage.debug.client.address "unix:${EKIDEN_LAST_NODE_DATA_DIR}/internal.sock" \
--address "unix:${EKIDEN_LAST_NODE_DATA_DIR}/internal.sock" \
${EKIDEN_RUNTIME_ID}
}

run_client_km_restart() {
local runtime=$1
local client=$2
Expand Down Expand Up @@ -198,6 +232,15 @@ test_suite() {
client=simple-keyvalue \
on_success_hook=assert_merge_discrepancy_scenario_works \
beacon_deterministic=1

# Storage node syncing scenario.
run_test \
scenario=scenario_storage_sync \
name="e2e-${backend_name}-storage-sync" \
backend_runner=$backend_runner \
runtime=simple-keyvalue \
client=simple-keyvalue

}

##########################################
Expand Down
2 changes: 2 additions & 0 deletions go/ekiden/cmd/debug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/oasislabs/ekiden/go/ekiden/cmd/debug/client"
"github.com/oasislabs/ekiden/go/ekiden/cmd/debug/dummy"
"github.com/oasislabs/ekiden/go/ekiden/cmd/debug/roothash"
"github.com/oasislabs/ekiden/go/ekiden/cmd/debug/storage"
"github.com/oasislabs/ekiden/go/ekiden/cmd/debug/tendermint"
)

Expand All @@ -20,6 +21,7 @@ func Register(parentCmd *cobra.Command) {
client.Register(debugCmd)
dummy.Register(debugCmd)
roothash.Register(debugCmd)
storage.Register(debugCmd)
tendermint.Register(debugCmd)

parentCmd.AddCommand(debugCmd)
Expand Down
5 changes: 3 additions & 2 deletions go/ekiden/cmd/debug/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var (
return err
}
for _, arg := range args {
if err := validateRuntimeIDStr(arg); err != nil {
if err := ValidateRuntimeIDStr(arg); err != nil {
return fmt.Errorf("malformed runtime id '%v': %v", arg, err)
}
}
Expand All @@ -52,7 +52,8 @@ var (
cfgRoothashExportFile = "output_file"
)

func validateRuntimeIDStr(idStr string) error {
// ValidateRuntimeIDStr validates that the given string is a valid runtime id.
func ValidateRuntimeIDStr(idStr string) error {
b, err := hex.DecodeString(idStr)
if err != nil {
return err
Expand Down
234 changes: 234 additions & 0 deletions go/ekiden/cmd/debug/storage/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
// Package storage implements the storage debug sub-commands.
package storage

import (
"context"
"fmt"
"math"
"os"
"time"

"github.com/spf13/cobra"

"github.com/oasislabs/ekiden/go/common/crypto/signature"
"github.com/oasislabs/ekiden/go/common/logging"
cmdGrpc "github.com/oasislabs/ekiden/go/ekiden/cmd/common/grpc"
cmdDebugClient "github.com/oasislabs/ekiden/go/ekiden/cmd/debug/client"
cmdRoothashDebug "github.com/oasislabs/ekiden/go/ekiden/cmd/debug/roothash"
clientGrpc "github.com/oasislabs/ekiden/go/grpc/client"
storageGrpc "github.com/oasislabs/ekiden/go/grpc/storage"
"github.com/oasislabs/ekiden/go/roothash/api/block"
storageApi "github.com/oasislabs/ekiden/go/storage/api"
storageClient "github.com/oasislabs/ekiden/go/storage/client"
"github.com/oasislabs/ekiden/go/storage/mkvs/urkel/node"
)

const (
// MaxSyncCheckRetries is the maximum number of waiting loops for the storage worker to get synced.
MaxSyncCheckRetries = 180
)

var (
storageCmd = &cobra.Command{
Use: "storage",
Short: "node storage interface utilities",
}

storageCheckRootsCmd = &cobra.Command{
Use: "check-roots runtime-id (hex)",
Short: "check that the given storage node has all the roots up to the current block",
Args: func(cmd *cobra.Command, args []string) error {
nrFn := cobra.ExactArgs(1)
if err := nrFn(cmd, args); err != nil {
return err
}
for _, arg := range args {
if err := cmdRoothashDebug.ValidateRuntimeIDStr(arg); err != nil {
return fmt.Errorf("malformed runtime id '%v': %v", arg, err)
}
}

return nil
},
Run: doCheckRoots,
}

logger = logging.GetLogger("cmd/storage")
)

func checkDiff(ctx context.Context, storageClient storageApi.Backend, root string, oldRoot node.Root, newRoot node.Root) {
it, err := storageClient.GetDiff(ctx, oldRoot, newRoot)
if err != nil {
logger.Error("error getting write log from the syncing node",
"err", err,
"root_type", root,
"old_root", oldRoot,
"new_root", newRoot,
)
os.Exit(1)
}
for {
more, err := it.Next()
if err != nil {
logger.Error("can't get next item from write log iterator",
"err", err,
"root_type", root,
"old_root", oldRoot,
"new_root", newRoot,
)
os.Exit(1)
}
if !more {
break
}

val, err := it.Value()
if err != nil {
logger.Error("can't get value out of write log iterator",
"err", err,
"root_type", root,
"old_root", oldRoot,
"new_root", newRoot,
)
os.Exit(1)
}
logger.Debug("write log entry", "key", val.Key, "value", val.Value)
}
logger.Debug("write log read successfully",
"root_type", root,
"old_root", oldRoot,
"new_root", newRoot,
)
}

func doCheckRoots(cmd *cobra.Command, args []string) {
ctx := context.Background()

storageClient.RegisterFlags(cmd)
cmdGrpc.RegisterClientFlags(cmd, true)

conn, client := cmdDebugClient.DoConnect(cmd)
storageWorkerClient := storageGrpc.NewStorageWorkerClient(conn)
defer conn.Close()

storageClient, err := storageClient.New(ctx, nil, nil, nil)
if err != nil {
logger.Error("error while connecting to storage client",
"err", err,
)
os.Exit(1)
}

var id signature.PublicKey
if err = id.UnmarshalHex(args[0]); err != nil {
logger.Error("failed to decode runtime id",
"err", err,
)
os.Exit(1)
}

res, err := client.GetBlock(ctx, &clientGrpc.GetBlockRequest{RuntimeId: id, Round: math.MaxUint64})
if err != nil {
logger.Error("failed to get latest block from roothash",
"err", err,
)
os.Exit(1)
}

var latestBlock block.Block
if err = latestBlock.UnmarshalCBOR(res.Block); err != nil {
logger.Error("failed to parse block",
"err", err,
"runtime_id", id,
)
os.Exit(1)
}

// Wait for the worker to sync until this last round.
var resp *storageGrpc.GetLastSyncedRoundResponse
retryCount := 0
for {
lastSyncedReq := &storageGrpc.GetLastSyncedRoundRequest{
RuntimeId: id,
}
resp, err = storageWorkerClient.GetLastSyncedRound(ctx, lastSyncedReq)
if err != nil {
logger.Error("failed to get last synced round from storage worker",
"err", err,
)
os.Exit(1)
}

if resp.GetRound() >= latestBlock.Header.Round {
break
}
logger.Debug("storage worker not synced yet, waiting",
"last_synced", resp.GetRound(),
"expected", latestBlock.Header.Round,
)
time.Sleep(5 * time.Second)

retryCount++
if retryCount > MaxSyncCheckRetries {
logger.Error("exceeded maximum wait retries, aborting")
os.Exit(1)
}
}
logger.Debug("storage worker is synced at least to the round we want",
"last_synced", resp.GetRound(),
"expected", latestBlock.Header.Round,
)

// Go through every block up to latestBlock and try getting write logs for each of them.
oldStateRoot := node.Root{}
oldStateRoot.Hash.Empty()
emptyRoot := node.Root{}
emptyRoot.Hash.Empty()
for i := uint64(0); i <= latestBlock.Header.Round; i++ {
res, err = client.GetBlock(ctx, &clientGrpc.GetBlockRequest{RuntimeId: id, Round: i})
if err != nil {
logger.Error("failed to get block from roothash",
"err", err,
"round", i,
)
os.Exit(1)
}

var blk block.Block
if err = blk.UnmarshalCBOR(res.Block); err != nil {
logger.Error("failed to parse block",
"err", err,
"runtime_id", id,
"round", i,
)
os.Exit(1)
}

stateRoot := node.Root{
Round: i,
Hash: blk.Header.StateRoot,
}
if !oldStateRoot.Hash.Equal(&stateRoot.Hash) {
checkDiff(ctx, storageClient, "state", oldStateRoot, stateRoot)
}
oldStateRoot = stateRoot

emptyRoot.Round = i
ioRoot := node.Root{
Round: i,
Hash: blk.Header.IORoot,
}
if !ioRoot.Hash.IsEmpty() {
checkDiff(ctx, storageClient, "io", emptyRoot, ioRoot)
}
}
}

// Register registers the storage sub-command and all of its children.
func Register(parentCmd *cobra.Command) {
storageClient.RegisterFlags(storageCheckRootsCmd)
cmdGrpc.RegisterClientFlags(storageCheckRootsCmd, true)

storageCmd.AddCommand(storageCheckRootsCmd)
parentCmd.AddCommand(storageCmd)
}
3 changes: 2 additions & 1 deletion go/ekiden/cmd/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (n *Node) initBackends() error {
grpcSrv := n.grpcInternal.Server()
registry.NewGRPCServer(grpcSrv, n.Registry)
staking.NewGRPCServer(grpcSrv, n.Staking)
storage.NewGRPCServer(grpcSrv, n.Storage, &grpc.AllowAllRuntimePolicyChecker{})
storage.NewGRPCServer(grpcSrv, n.Storage, &grpc.AllowAllRuntimePolicyChecker{}, false)
dummydebug.NewGRPCServer(grpcSrv, n.Epochtime, n.Registry)

cmdCommon.Logger().Debug("backends initialized")
Expand Down Expand Up @@ -232,6 +232,7 @@ func (n *Node) initAndStartWorkers(logger *logging.Logger) error {

// Initialize the storage worker.
n.StorageWorker, err = workerStorage.New(
n.grpcInternal,
n.CommonWorker,
n.WorkerRegistration,
n.Genesis,
Expand Down
14 changes: 14 additions & 0 deletions go/grpc/storage/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,20 @@ syntax = "proto3";
package storage;
option go_package = "github.com/oasislabs/ekiden/go/grpc/storage";

service StorageWorker {
rpc GetLastSyncedRound(GetLastSyncedRoundRequest) returns (GetLastSyncedRoundResponse) {}
}

message GetLastSyncedRoundRequest {
bytes runtime_id = 1;
}

message GetLastSyncedRoundResponse {
uint64 round = 1;
bytes io_root = 2;
bytes state_root = 3;
}

service Storage {
// MKVS
rpc Apply(ApplyRequest) returns (ApplyResponse) {}
Expand Down
Loading

0 comments on commit 701afb4

Please sign in to comment.