Implement GetDiff E2E tests #1975

Merged 4 commits on Aug 23, 2019

6 changes: 6 additions & 0 deletions .buildkite/scripts/common_e2e.sh
@@ -297,13 +297,17 @@ run_compute_node() {
# Optional named arguments:
# clear_storage - clear storage node dir (default: 1)
#
# Output environment variables:
# EKIDEN_LAST_NODE_DATA_DIR - the data directory for the last node run
#
run_storage_node() {
# Process arguments.
local id=$1
shift || true

# Optional arguments with default values.
local clear_storage=1
local extra_args=""
# Load named arguments that override defaults.
local "$@"

@@ -317,6 +321,7 @@ run_storage_node() {
if [[ $clear_storage == 1 ]]; then
rm -rf ${data_dir}
fi
EKIDEN_LAST_NODE_DATA_DIR="${data_dir}"
local log_file=${EKIDEN_COMMITTEE_DIR}/storage-$id.log
rm -rf ${log_file}

@@ -349,6 +354,7 @@ run_storage_node() {
--worker.runtime.id ${EKIDEN_RUNTIME_ID} \
--datadir ${data_dir} \
--debug.allow_test_keys \
$extra_args \
2>&1 | sed "s/^/[storage-node-${id}] /" &
}
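
The new `extra_args` and `EKIDEN_LAST_NODE_DATA_DIR` hooks are what the storage-sync scenario below relies on: a caller can pass additional node flags and then reach the last-started storage node over its internal socket. A minimal usage sketch, mirroring the scenario added in test_e2e.sh (`node_socket` is just an illustrative variable name, not part of the helpers):

```bash
# Sketch: start a storage node with an extra debug flag, then locate its
# internal gRPC socket via the data directory exported by the helper.
run_storage_node 2 extra_args="--worker.debug.storage.ignore_apply"

# EKIDEN_LAST_NODE_DATA_DIR now points at storage node 2's datadir, so
# other tools can be pointed at its internal socket:
node_socket="unix:${EKIDEN_LAST_NODE_DATA_DIR}/internal.sock"
```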

43 changes: 43 additions & 0 deletions .buildkite/scripts/test_e2e.sh
@@ -108,6 +108,40 @@ assert_merge_discrepancy_scenario_works() {
assert_merge_discrepancies
}

scenario_storage_sync() {
local runtime=simple-keyvalue
local client=simple-keyvalue

# Initialize compute nodes.
run_compute_node 1 ${runtime}
run_compute_node 2 ${runtime}
run_compute_node 3 ${runtime}

# Initialize storage nodes.
run_storage_node 1
run_storage_node 2 extra_args="--worker.debug.storage.ignore_apply"

# Wait for all nodes to start: 3 compute + 2 storage + key manager.
wait_nodes 6

# Advance epoch to elect a new committee.
set_epoch 1

# Wait for the client to exit without waiting on it.
while kill -0 ${EKIDEN_CLIENT_PID}; do
sleep 5
done

local log_file=${EKIDEN_COMMITTEE_DIR}/client-storage-debug.log
${EKIDEN_NODE} debug storage check-roots \
--datadir ${EKIDEN_COMMITTEE_DIR} \
--log.level debug \
--log.file ${log_file} \
--storage.debug.client.address "unix:${EKIDEN_LAST_NODE_DATA_DIR}/internal.sock" \
--address "unix:${EKIDEN_LAST_NODE_DATA_DIR}/internal.sock" \
${EKIDEN_RUNTIME_ID}
}

run_client_km_restart() {
local runtime=$1
local client=$2
@@ -198,6 +232,15 @@ test_suite() {
client=simple-keyvalue \
on_success_hook=assert_merge_discrepancy_scenario_works \
beacon_deterministic=1

# Storage node syncing scenario.
run_test \
scenario=scenario_storage_sync \
name="e2e-${backend_name}-storage-sync" \
backend_runner=$backend_runner \
runtime=simple-keyvalue \
client=simple-keyvalue

}

##########################################
2 changes: 2 additions & 0 deletions go/ekiden/cmd/debug/debug.go
@@ -8,6 +8,7 @@ import (
"github.com/oasislabs/ekiden/go/ekiden/cmd/debug/client"
"github.com/oasislabs/ekiden/go/ekiden/cmd/debug/dummy"
"github.com/oasislabs/ekiden/go/ekiden/cmd/debug/roothash"
"github.com/oasislabs/ekiden/go/ekiden/cmd/debug/storage"
"github.com/oasislabs/ekiden/go/ekiden/cmd/debug/tendermint"
)

@@ -21,6 +22,7 @@ func Register(parentCmd *cobra.Command) {
client.Register(debugCmd)
dummy.Register(debugCmd)
roothash.Register(debugCmd)
storage.Register(debugCmd)
tendermint.Register(debugCmd)
byzantine.Register(debugCmd)

5 changes: 3 additions & 2 deletions go/ekiden/cmd/debug/roothash/roothash.go
@@ -37,7 +37,7 @@ var (
return err
}
for _, arg := range args {
if err := validateRuntimeIDStr(arg); err != nil {
if err := ValidateRuntimeIDStr(arg); err != nil {
return fmt.Errorf("malformed runtime id '%v': %v", arg, err)
}
}
@@ -52,7 +52,8 @@ var (
cfgRoothashExportFile = "output_file"
)

func validateRuntimeIDStr(idStr string) error {
// ValidateRuntimeIDStr validates that the given string is a valid runtime id.
func ValidateRuntimeIDStr(idStr string) error {
b, err := hex.DecodeString(idStr)
if err != nil {
return err
234 changes: 234 additions & 0 deletions go/ekiden/cmd/debug/storage/storage.go
@@ -0,0 +1,234 @@
// Package storage implements the storage debug sub-commands.
package storage

import (
"context"
"fmt"
"math"
"os"
"time"

"github.com/spf13/cobra"

"github.com/oasislabs/ekiden/go/common/crypto/signature"
"github.com/oasislabs/ekiden/go/common/logging"
cmdGrpc "github.com/oasislabs/ekiden/go/ekiden/cmd/common/grpc"
cmdDebugClient "github.com/oasislabs/ekiden/go/ekiden/cmd/debug/client"
cmdRoothashDebug "github.com/oasislabs/ekiden/go/ekiden/cmd/debug/roothash"
clientGrpc "github.com/oasislabs/ekiden/go/grpc/client"
storageGrpc "github.com/oasislabs/ekiden/go/grpc/storage"
"github.com/oasislabs/ekiden/go/roothash/api/block"
storageApi "github.com/oasislabs/ekiden/go/storage/api"
storageClient "github.com/oasislabs/ekiden/go/storage/client"
"github.com/oasislabs/ekiden/go/storage/mkvs/urkel/node"
)

const (
// MaxSyncCheckRetries is the maximum number of waiting loops for the storage worker to get synced.
MaxSyncCheckRetries = 180
)

var (
storageCmd = &cobra.Command{
Use: "storage",
Short: "node storage interface utilities",
}

storageCheckRootsCmd = &cobra.Command{
Use: "check-roots runtime-id (hex)",
Short: "check that the given storage node has all the roots up to the current block",
Args: func(cmd *cobra.Command, args []string) error {
nrFn := cobra.ExactArgs(1)
if err := nrFn(cmd, args); err != nil {
return err
}
for _, arg := range args {
if err := cmdRoothashDebug.ValidateRuntimeIDStr(arg); err != nil {
return fmt.Errorf("malformed runtime id '%v': %v", arg, err)
}
}

return nil
},
Run: doCheckRoots,
}

logger = logging.GetLogger("cmd/storage")
)

func checkDiff(ctx context.Context, storageClient storageApi.Backend, root string, oldRoot node.Root, newRoot node.Root) {
it, err := storageClient.GetDiff(ctx, oldRoot, newRoot)
if err != nil {
logger.Error("error getting write log from the syncing node",
"err", err,
"root_type", root,
"old_root", oldRoot,
"new_root", newRoot,
)
os.Exit(1)
}
for {
more, err := it.Next()
if err != nil {
logger.Error("can't get next item from write log iterator",
"err", err,
"root_type", root,
"old_root", oldRoot,
"new_root", newRoot,
)
os.Exit(1)
}
if !more {
break
}

val, err := it.Value()
if err != nil {
logger.Error("can't get value out of write log iterator",
"err", err,
"root_type", root,
"old_root", oldRoot,
"new_root", newRoot,
)
os.Exit(1)
}
logger.Debug("write log entry", "key", val.Key, "value", val.Value)
}
logger.Debug("write log read successfully",
"root_type", root,
"old_root", oldRoot,
"new_root", newRoot,
)
}

func doCheckRoots(cmd *cobra.Command, args []string) {
ctx := context.Background()

storageClient.RegisterFlags(cmd)
cmdGrpc.RegisterClientFlags(cmd, true)

conn, client := cmdDebugClient.DoConnect(cmd)
storageWorkerClient := storageGrpc.NewStorageWorkerClient(conn)
defer conn.Close()

storageClient, err := storageClient.New(ctx, nil, nil, nil)
if err != nil {
logger.Error("error while connecting to storage client",
"err", err,
)
os.Exit(1)
}

var id signature.PublicKey
if err = id.UnmarshalHex(args[0]); err != nil {
logger.Error("failed to decode runtime id",
"err", err,
)
os.Exit(1)
}

res, err := client.GetBlock(ctx, &clientGrpc.GetBlockRequest{RuntimeId: id, Round: math.MaxUint64})
if err != nil {
logger.Error("failed to get latest block from roothash",
"err", err,
)
os.Exit(1)
}

var latestBlock block.Block
if err = latestBlock.UnmarshalCBOR(res.Block); err != nil {
logger.Error("failed to parse block",
"err", err,
"runtime_id", id,
)
os.Exit(1)
}

// Wait for the worker to sync until this last round.
var resp *storageGrpc.GetLastSyncedRoundResponse
retryCount := 0
for {
lastSyncedReq := &storageGrpc.GetLastSyncedRoundRequest{
RuntimeId: id,
}
resp, err = storageWorkerClient.GetLastSyncedRound(ctx, lastSyncedReq)
if err != nil {
logger.Error("failed to get last synced round from storage worker",
"err", err,
)
os.Exit(1)
}

if resp.GetRound() >= latestBlock.Header.Round {
break
}
logger.Debug("storage worker not synced yet, waiting",
"last_synced", resp.GetRound(),
"expected", latestBlock.Header.Round,
)
time.Sleep(5 * time.Second)

retryCount++
if retryCount > MaxSyncCheckRetries {
logger.Error("exceeded maximum wait retries, aborting")
os.Exit(1)
}
}
logger.Debug("storage worker is synced at least to the round we want",
"last_synced", resp.GetRound(),
"expected", latestBlock.Header.Round,
)

// Go through every block up to latestBlock and try getting write logs for each of them.
oldStateRoot := node.Root{}
oldStateRoot.Hash.Empty()
emptyRoot := node.Root{}
emptyRoot.Hash.Empty()
for i := uint64(0); i <= latestBlock.Header.Round; i++ {
res, err = client.GetBlock(ctx, &clientGrpc.GetBlockRequest{RuntimeId: id, Round: i})
if err != nil {
logger.Error("failed to get block from roothash",
"err", err,
"round", i,
)
os.Exit(1)
}

var blk block.Block
if err = blk.UnmarshalCBOR(res.Block); err != nil {
logger.Error("failed to parse block",
"err", err,
"runtime_id", id,
"round", i,
)
os.Exit(1)
}

stateRoot := node.Root{
Round: i,
Hash: blk.Header.StateRoot,
}
if !oldStateRoot.Hash.Equal(&stateRoot.Hash) {
checkDiff(ctx, storageClient, "state", oldStateRoot, stateRoot)
}
oldStateRoot = stateRoot

emptyRoot.Round = i
ioRoot := node.Root{
Round: i,
Hash: blk.Header.IORoot,
}
if !ioRoot.Hash.IsEmpty() {
checkDiff(ctx, storageClient, "io", emptyRoot, ioRoot)
}
}
}

// Register registers the storage sub-command and all of its children.
func Register(parentCmd *cobra.Command) {
storageClient.RegisterFlags(storageCheckRootsCmd)
cmdGrpc.RegisterClientFlags(storageCheckRootsCmd, true)

storageCmd.AddCommand(storageCheckRootsCmd)
parentCmd.AddCommand(storageCmd)
}
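
Taken together with the E2E script above, the new sub-command is driven roughly as follows. This is a trimmed sketch of the invocation in `scenario_storage_sync` (the `--log.file` flag is omitted here); the socket path and runtime id come from the test environment rather than being fixed values:

```bash
# Sketch: point the new debug command at a storage node's internal socket.
# It walks every round up to the latest roothash block and calls GetDiff
# for each changed state root and each non-empty IO root.
${EKIDEN_NODE} debug storage check-roots \
    --datadir ${EKIDEN_COMMITTEE_DIR} \
    --log.level debug \
    --storage.debug.client.address "unix:${EKIDEN_LAST_NODE_DATA_DIR}/internal.sock" \
    --address "unix:${EKIDEN_LAST_NODE_DATA_DIR}/internal.sock" \
    ${EKIDEN_RUNTIME_ID}
```

The command exits non-zero as soon as any write log cannot be fetched or iterated, which is what lets the E2E scenario fail fast when the syncing node is missing a root.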
3 changes: 2 additions & 1 deletion go/ekiden/cmd/node/node.go
@@ -162,7 +162,7 @@ func (n *Node) initBackends() error {
grpcSrv := n.grpcInternal.Server()
registry.NewGRPCServer(grpcSrv, n.Registry)
staking.NewGRPCServer(grpcSrv, n.Staking)
storage.NewGRPCServer(grpcSrv, n.Storage, &grpc.AllowAllRuntimePolicyChecker{})
storage.NewGRPCServer(grpcSrv, n.Storage, &grpc.AllowAllRuntimePolicyChecker{}, false)
dummydebug.NewGRPCServer(grpcSrv, n.Epochtime, n.Registry)

cmdCommon.Logger().Debug("backends initialized")
@@ -232,6 +232,7 @@ func (n *Node) initAndStartWorkers(logger *logging.Logger) error {

// Initialize the storage worker.
n.StorageWorker, err = workerStorage.New(
n.grpcInternal,
n.CommonWorker,
n.WorkerRegistration,
n.Genesis,
4 changes: 2 additions & 2 deletions go/ekiden/node_test.go
@@ -335,7 +335,7 @@ func testStorageClient(t *testing.T, node *testNode) {
ctx := context.Background()

// Storage client tests.
storageClientTests.ClientWorkerTests(t, node.Identity.TLSCertificate, node.Beacon, timeSource, node.Registry, node.Scheduler)
storageClientTests.ClientWorkerTests(t, node.Identity, node.Beacon, timeSource, node.Registry, node.Scheduler)

// Client storage implementation tests.
config := []struct {
@@ -348,7 +348,7 @@
for _, kv := range config {
viper.Set(kv.key, kv.value)
}
debugClient, err := storageClient.New(ctx, node.Identity.TLSCertificate, nil, nil)
debugClient, err := storageClient.New(ctx, node.Identity, nil, nil)
require.NoError(t, err, "NewDebugStorageClient")
storageTests.StorageImplementationTests(t, debugClient, testNamespace)
}