From 507c75c3080647aa8c0bf96650f3dbf298adaa32 Mon Sep 17 00:00:00 2001 From: ptrus Date: Tue, 2 Jul 2019 15:05:45 +0200 Subject: [PATCH] go/epochtime: refactor to ticker --- .buildkite/scripts/common_e2e.sh | 53 +++-- .buildkite/scripts/test_e2e.sh | 6 +- .buildkite/scripts/test_migration.sh | 8 +- configs/single_node.yml | 10 +- configs/single_node_sgx.yml | 10 +- go/README.md | 6 +- go/beacon/init.go | 4 +- go/beacon/tendermint/tendermint.go | 4 +- go/beacon/tests/tester.go | 9 +- go/dummydebug/grpc.go | 41 +++- go/ekiden/cmd/debug/dummy/dummy.go | 22 +- go/ekiden/cmd/node/node.go | 25 +- go/ekiden/node_test.go | 34 +-- go/epochtime/api/api.go | 41 ---- go/epochtime/init.go | 51 ---- go/epochtime/tendermint/tendermint.go | 123 ---------- .../tendermint_mock/tendermint_mock.go | 225 ------------------ go/epochtime/tests/mock_tester.go | 68 ------ go/go.mod | 2 +- go/go.sum | 5 + go/grpc/dummydebug/dummy_debug.proto | 11 +- go/keymanager/init.go | 4 +- go/keymanager/tendermint/tendermint.go | 4 +- go/registry/init.go | 4 +- go/registry/tendermint/tendermint.go | 4 +- go/registry/tests/tester.go | 23 +- go/roothash/init.go | 4 +- go/roothash/tendermint/tendermint.go | 4 +- go/roothash/tests/tester.go | 22 +- go/scheduler/api/api.go | 14 +- go/scheduler/init.go | 4 +- go/scheduler/tendermint/tendermint.go | 19 +- go/scheduler/tests/tester.go | 15 +- go/storage/client/tests/tests.go | 8 +- go/tendermint/abci/mux.go | 48 ++-- go/tendermint/apps/beacon/beacon.go | 11 +- go/tendermint/apps/epochtime_mock/api.go | 48 ---- .../apps/epochtime_mock/epochtime_mock.go | 186 --------------- go/tendermint/apps/epochtime_mock/state.go | 118 --------- go/tendermint/apps/keymanager/keymanager.go | 9 +- go/tendermint/apps/registry/registry.go | 20 +- go/tendermint/apps/roothash/roothash.go | 8 +- go/tendermint/apps/scheduler/api.go | 3 + go/tendermint/apps/scheduler/scheduler.go | 19 +- go/tendermint/apps/ticker_mock/api.go | 48 ++++ go/tendermint/apps/ticker_mock/state.go | 143 +++++++++++ go/tendermint/apps/ticker_mock/ticker_mock.go | 173 ++++++++++++++ go/tendermint/tendermint.go | 12 +- go/ticker/api/api.go | 29 +++ go/ticker/init.go | 54 +++++ go/ticker/tendermint/tendermint.go | 130 ++++++++++ go/ticker/tendermint_mock/tendermint_mock.go | 216 +++++++++++++++++ go/ticker/tests/mock_tester.go | 89 +++++++ go/worker/compute/tests/tester.go | 14 +- go/worker/registration/registration.go | 46 ++-- go/worker/storage/storage.go | 2 - go/worker/txnscheduler/tests/tester.go | 14 +- scripts/benchmark-e2e.sh | 8 +- 58 files changed, 1228 insertions(+), 1107 deletions(-) delete mode 100644 go/epochtime/api/api.go delete mode 100644 go/epochtime/init.go delete mode 100644 go/epochtime/tendermint/tendermint.go delete mode 100644 go/epochtime/tendermint_mock/tendermint_mock.go delete mode 100644 go/epochtime/tests/mock_tester.go delete mode 100644 go/tendermint/apps/epochtime_mock/api.go delete mode 100644 go/tendermint/apps/epochtime_mock/epochtime_mock.go delete mode 100644 go/tendermint/apps/epochtime_mock/state.go create mode 100644 go/tendermint/apps/ticker_mock/api.go create mode 100644 go/tendermint/apps/ticker_mock/state.go create mode 100644 go/tendermint/apps/ticker_mock/ticker_mock.go create mode 100644 go/ticker/api/api.go create mode 100644 go/ticker/init.go create mode 100644 go/ticker/tendermint/tendermint.go create mode 100644 go/ticker/tendermint_mock/tendermint_mock.go create mode 100644 go/ticker/tests/mock_tester.go diff --git a/.buildkite/scripts/common_e2e.sh b/.buildkite/scripts/common_e2e.sh index dd5e53b9978..f60824f3a83 100644 --- a/.buildkite/scripts/common_e2e.sh +++ b/.buildkite/scripts/common_e2e.sh @@ -24,14 +24,13 @@ EKIDEN_KM_RUNTIME_ID=${EKIDEN_KM_RUNTIME_ID:-"ffffffffffffffffffffffffffffffffff # EKIDEN_COMMITTEE_DIR # EKIDEN_GENESIS_FILE # EKIDEN_IAS_PROXY_PORT -# EKIDEN_EPOCHTIME_BACKEND # EKIDEN_VALIDATOR_SOCKET # EKIDEN_CLIENT_SOCKET # EKIDEN_ENTITY_PRIVATE_KEY # # Optional named arguments: # -# epochtime_backend - epochtime backend (default: tendermint) +# ekiden_ticker_settable - settable ticker (default: 1) # id - commitee identifier (default: 1) # replica_group_size - runtime replica group size (default: 2) # replica_group_backup_size - runtime replica group backup size (default: 1) @@ -39,7 +38,7 @@ EKIDEN_KM_RUNTIME_ID=${EKIDEN_KM_RUNTIME_ID:-"ffffffffffffffffffffffffffffffffff # run_backend_tendermint_committee() { # Optional arguments with default values. - local epochtime_backend="tendermint" + local ekiden_ticker_settable=1 local id=1 local replica_group_size=2 local replica_group_backup_size=1 @@ -140,9 +139,12 @@ run_backend_tendermint_committee() { EKIDEN_VALIDATOR_SOCKET=${base_datadir}-1/internal.sock EKIDEN_IAS_PROXY_PORT=${ias_proxy_port} EKIDEN_GENESIS_FILE=${genesis_file} - EKIDEN_EPOCHTIME_BACKEND=${epochtime_backend} + EKIDEN_TICKER_SETTABLE=${ekiden_ticker_settable} EKIDEN_ENTITY_PRIVATE_KEY=${entity_dir}/entity.pem + echo "EKIDEN TICKER SETTABLE" + echo "${EKIDEN_TICKER_SETTABLE}" + # Run the seed node. run_seed_node @@ -161,12 +163,13 @@ run_backend_tendermint_committee() { --log.file ${committee_dir}/validator-${idx}.log \ --grpc.log.verbose_debug \ --grpc.debug.port ${grpc_debug_port} \ - --epochtime.backend ${epochtime_backend} \ - --epochtime.tendermint.interval 30 \ + --ticker.interval 5 \ + ${EKIDEN_TICKER_SETTABLE:+--ticker.debug.settable} \ ${EKIDEN_BEACON_DETERMINISTIC:+--beacon.debug.deterministic} \ --metrics.mode none \ --storage.backend client \ --consensus.backend tendermint \ + --tendermint.abci.epoch_interval 5 \ --genesis.file ${genesis_file} \ --tendermint.core.listen_address tcp://0.0.0.0:${tm_port} \ --tendermint.consensus.timeout_commit 250ms \ @@ -244,11 +247,12 @@ run_compute_node() { --grpc.log.verbose_debug \ --storage.backend cachingclient \ --storage.cachingclient.file ${data_dir}/storage-cache \ - --epochtime.backend ${EKIDEN_EPOCHTIME_BACKEND} \ - --epochtime.tendermint.interval 30 \ + ${EKIDEN_TICKER_SETTABLE:+--ticker.debug.settable} \ + --ticker.interval 5 \ ${EKIDEN_BEACON_DETERMINISTIC:+--beacon.debug.deterministic} \ --metrics.mode none \ --consensus.backend tendermint \ + --tendermint.abci.epoch_interval 5 \ --genesis.file ${EKIDEN_GENESIS_FILE} \ --tendermint.core.listen_address tcp://0.0.0.0:${tm_port} \ --tendermint.consensus.timeout_commit 250ms \ @@ -315,12 +319,13 @@ run_storage_node() { --log.level debug \ --log.file ${log_file} \ --grpc.log.verbose_debug \ - --epochtime.backend ${EKIDEN_EPOCHTIME_BACKEND} \ - --epochtime.tendermint.interval 30 \ + ${EKIDEN_TICKER_SETTABLE:+--ticker.debug.settable} \ + --ticker.interval 5 \ ${EKIDEN_BEACON_DETERMINISTIC:+--beacon.debug.deterministic} \ --metrics.mode none \ --storage.backend leveldb \ --consensus.backend tendermint \ + --tendermint.abci.epoch_interval 5 \ --genesis.file ${EKIDEN_GENESIS_FILE} \ --tendermint.core.listen_address tcp://0.0.0.0:${tm_port} \ --tendermint.consensus.timeout_commit 250ms \ @@ -372,13 +377,14 @@ run_client_node() { --log.level debug \ --log.file ${log_file} \ --grpc.log.verbose_debug \ - --epochtime.backend ${EKIDEN_EPOCHTIME_BACKEND} \ - --epochtime.tendermint.interval 30 \ + ${EKIDEN_TICKER_SETTABLE:+--ticker.debug.settable} \ + --ticker.interval 5 \ ${EKIDEN_BEACON_DETERMINISTIC:+--beacon.debug.deterministic} \ --metrics.mode none \ --storage.backend cachingclient \ --storage.cachingclient.file ${data_dir}/storage-cache \ --consensus.backend tendermint \ + --tendermint.abci.epoch_interval 5 \ --roothash.tendermint.index_blocks \ --genesis.file ${EKIDEN_GENESIS_FILE} \ --tendermint.core.listen_address tcp://0.0.0.0:${tm_port} \ @@ -408,16 +414,11 @@ wait_nodes() { --nodes $nodes } -# Set epoch. +# Advances epoch. # -# Arguments: -# epoch - epoch to set -set_epoch() { - local epoch=$1 - - ${EKIDEN_NODE} debug dummy set-epoch \ - --address unix:${EKIDEN_VALIDATOR_SOCKET} \ - --epoch $epoch +advance_epoch() { + ${EKIDEN_NODE} debug dummy advance-epoch \ + --address unix:${EKIDEN_VALIDATOR_SOCKET} } # Run a key manager node. @@ -453,11 +454,12 @@ run_keymanager_node() { --grpc.log.verbose_debug \ --storage.backend cachingclient \ --storage.cachingclient.file ${data_dir}/storage-cache \ - --epochtime.backend ${EKIDEN_EPOCHTIME_BACKEND} \ - --epochtime.tendermint.interval 30 \ + ${EKIDEN_TICKER_SETTABLE:+--ticker.debug.settable} \ + --ticker.interval 5 \ ${EKIDEN_BEACON_DETERMINISTIC:+--beacon.debug.deterministic} \ --metrics.mode none \ --consensus.backend tendermint \ + --tendermint.abci.epoch_interval 5 \ --genesis.file ${EKIDEN_GENESIS_FILE} \ --tendermint.core.listen_address tcp://0.0.0.0:${tm_port} \ --tendermint.consensus.timeout_commit 250ms \ @@ -509,10 +511,11 @@ run_seed_node() { --log.file ${log_file} \ --metrics.mode none \ --genesis.file ${EKIDEN_GENESIS_FILE} \ - --epochtime.backend ${EKIDEN_EPOCHTIME_BACKEND} \ - --epochtime.tendermint.interval 30 \ + ${EKIDEN_TICKER_SETTABLE:+--ticker.debug.settable} \ + --ticker.interval 5 \ ${EKIDEN_BEACON_DETERMINISTIC:+--beacon.debug.deterministic} \ --consensus.backend tendermint \ + --tendermint.abci.epoch_interval 5 \ --tendermint.core.listen_address tcp://0.0.0.0:${EKIDEN_SEED_NODE_PORT} \ --tendermint.seed_mode \ --tendermint.debug.addr_book_lenient \ diff --git a/.buildkite/scripts/test_e2e.sh b/.buildkite/scripts/test_e2e.sh index b33ec81b542..2bccf5883ed 100755 --- a/.buildkite/scripts/test_e2e.sh +++ b/.buildkite/scripts/test_e2e.sh @@ -52,7 +52,7 @@ scenario_basic() { wait_nodes 6 # Advance epoch to elect a new committee. - set_epoch 1 + advance_epoch } scenario_compute_discrepancy() { @@ -72,7 +72,7 @@ scenario_compute_discrepancy() { wait_nodes 6 # Advance epoch to elect a new committee. - set_epoch 1 + advance_epoch } assert_compute_discrepancy_scenario_works() { @@ -98,7 +98,7 @@ scenario_merge_discrepancy() { wait_nodes 6 # Advance epoch to elect a new committee. - set_epoch 1 + advance_epoch } assert_merge_discrepancy_scenario_works() { diff --git a/.buildkite/scripts/test_migration.sh b/.buildkite/scripts/test_migration.sh index 3b7e7f3cc9b..4a969a78ed2 100755 --- a/.buildkite/scripts/test_migration.sh +++ b/.buildkite/scripts/test_migration.sh @@ -44,7 +44,7 @@ test_migration() { # Start the first network. run_backend_tendermint_committee \ - epochtime_backend=tendermint_mock \ + ekiden_ticker_settable=1 \ id=1 \ replica_group_size=1 \ replica_group_backup_size=0 \ @@ -56,7 +56,7 @@ test_migration() { # Wait for all nodes to start: 1 compute + 1 storage + key manager. wait_nodes 3 - set_epoch 1 + advance_epoch sleep 1 # Start client and do the state mutations. @@ -83,7 +83,7 @@ test_migration() { # Start the second network. run_backend_tendermint_committee \ - epochtime_backend=tendermint_mock \ + ekiden_ticker_settable=1 \ id=2 \ replica_group_size=1 \ replica_group_backup_size=0 \ @@ -96,7 +96,7 @@ test_migration() { # Wait for all nodes to start: 1 compute + 1 storage + key manager. wait_nodes 3 - set_epoch 2 + advance_epoch # Start client and do state verification, checking that migration succeeded. ${CLIENT} \ diff --git a/configs/single_node.yml b/configs/single_node.yml index 40336100776..aadccee68ad 100644 --- a/configs/single_node.yml +++ b/configs/single_node.yml @@ -70,12 +70,10 @@ worker: consensus: backend: tendermint -# Epochtime backend. -epochtime: - backend: tendermint - tendermint: - # This makes each epoch last for 30 tendermint blocks. - interval: 30 +# Ticker backend. +ticker: + # This makes tick happen every 5 blocks. + interval: 5 # Storage backend. storage: diff --git a/configs/single_node_sgx.yml b/configs/single_node_sgx.yml index fb5a2582d66..0f577ceeab8 100644 --- a/configs/single_node_sgx.yml +++ b/configs/single_node_sgx.yml @@ -77,12 +77,10 @@ ias: consensus: backend: tendermint -# Epochtime backend. -epochtime: - backend: tendermint - tendermint: - # This makes each epoch last for 30 tendermint blocks. - interval: 30 +# Ticker backend. +ticker: + # This makes tick happen every 5 blocks. + interval: 5 # Storage backend. storage: diff --git a/go/README.md b/go/README.md index 88834af6f94..cd35f304db7 100644 --- a/go/README.md +++ b/go/README.md @@ -35,7 +35,7 @@ node, development, and debugging. All sub-commands have online documentation that can be accessed via the `--help` parameter, for example: ``` -ekiden debug dummy set-epoch --help +ekiden debug dummy advance-epoch --help ``` ### `debug dummy` - Control the dummy (centralized) node during tests @@ -43,9 +43,9 @@ ekiden debug dummy set-epoch --help The `debug dummy` sub-command provides faclities for controlling the centralized node during tests. -#### `debug dummy set-epoch` - Set the Oasis epoch +#### `debug dummy advance-epoch` - Set the Oasis epoch -The `dummy set-epoch` sub-command allows the node's Oasis epoch to be +The `dummy advance-epoch` sub-command allows the node's Oasis epoch to be set to an arbitrary value, provided a compatible epochtime backend is being used (`mock`, `tendermint_mock`). diff --git a/go/beacon/init.go b/go/beacon/init.go index 2a309f9307f..ce8dfdc8c4d 100644 --- a/go/beacon/init.go +++ b/go/beacon/init.go @@ -12,8 +12,8 @@ import ( "github.com/oasislabs/ekiden/go/beacon/api" "github.com/oasislabs/ekiden/go/beacon/tendermint" commonFlags "github.com/oasislabs/ekiden/go/ekiden/cmd/common/flags" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" "github.com/oasislabs/ekiden/go/tendermint/service" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) const ( @@ -21,7 +21,7 @@ const ( ) // New constructs a new Backend based on the configuration flags. -func New(ctx context.Context, timeSource epochtime.Backend, tmService service.TendermintService) (api.Backend, error) { +func New(ctx context.Context, timeSource ticker.Backend, tmService service.TendermintService) (api.Backend, error) { backend := commonFlags.ConsensusBackend() switch strings.ToLower(backend) { case tendermint.BackendName: diff --git a/go/beacon/tendermint/tendermint.go b/go/beacon/tendermint/tendermint.go index b8f9a7f9c2e..439d2d81882 100644 --- a/go/beacon/tendermint/tendermint.go +++ b/go/beacon/tendermint/tendermint.go @@ -8,10 +8,10 @@ import ( "github.com/oasislabs/ekiden/go/beacon/api" "github.com/oasislabs/ekiden/go/common/logging" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" tmapi "github.com/oasislabs/ekiden/go/tendermint/api" app "github.com/oasislabs/ekiden/go/tendermint/apps/beacon" "github.com/oasislabs/ekiden/go/tendermint/service" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) // BackendName is the name of this implementation. @@ -36,7 +36,7 @@ func (t *Backend) GetBeacon(ctx context.Context, height int64) ([]byte, error) { } // New constructs a new tendermint backed beacon Backend instance. -func New(ctx context.Context, timeSource epochtime.Backend, service service.TendermintService, cfg *api.Config) (api.Backend, error) { +func New(ctx context.Context, timeSource ticker.Backend, service service.TendermintService, cfg *api.Config) (api.Backend, error) { if err := service.ForceInitialize(); err != nil { return nil, err } diff --git a/go/beacon/tests/tester.go b/go/beacon/tests/tester.go index 8c2f611d067..9d08b9451d5 100644 --- a/go/beacon/tests/tester.go +++ b/go/beacon/tests/tester.go @@ -8,20 +8,21 @@ import ( "github.com/stretchr/testify/require" "github.com/oasislabs/ekiden/go/beacon/api" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" - epochtimeTests "github.com/oasislabs/ekiden/go/epochtime/tests" + scheduler "github.com/oasislabs/ekiden/go/scheduler/api" + ticker "github.com/oasislabs/ekiden/go/ticker/api" + tickerTests "github.com/oasislabs/ekiden/go/ticker/tests" ) // BeaconImplementationTests exercises the basic functionality of a // beacon backend. -func BeaconImplementationTests(t *testing.T, backend api.Backend, epochtime epochtime.SetableBackend) { +func BeaconImplementationTests(t *testing.T, backend api.Backend, ticker ticker.SetableBackend, scheduler scheduler.Backend) { require := require.New(t) beacon, err := backend.GetBeacon(context.Background(), 0) require.NoError(err, "GetBeacon") require.Len(beacon, api.BeaconSize, "GetBeacon - length") - _ = epochtimeTests.MustAdvanceEpoch(t, epochtime, 1) + tickerTests.MustAdvanceEpoch(t, ticker, scheduler) newBeacon, err := backend.GetBeacon(context.Background(), 0) require.NoError(err, "GetBeacon") diff --git a/go/dummydebug/grpc.go b/go/dummydebug/grpc.go index 2c52acce9e7..4a4ee616e1d 100644 --- a/go/dummydebug/grpc.go +++ b/go/dummydebug/grpc.go @@ -7,14 +7,15 @@ import ( "google.golang.org/grpc" "github.com/oasislabs/ekiden/go/common/logging" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" registry "github.com/oasislabs/ekiden/go/registry/api" + scheduler "github.com/oasislabs/ekiden/go/scheduler/api" + ticker "github.com/oasislabs/ekiden/go/ticker/api" dbgPB "github.com/oasislabs/ekiden/go/grpc/dummydebug" ) var ( - errIncompatibleBackend = errors.New("epochtime/grpc: incompatible backend for call") + errIncompatibleBackend = errors.New("ticker/grpc: incompatible backend for call") _ dbgPB.DummyDebugServer = (*grpcServer)(nil) ) @@ -22,23 +23,44 @@ var ( type grpcServer struct { logger *logging.Logger - timeSource epochtime.Backend + timeSource ticker.Backend registry registry.Backend + scheduler scheduler.Backend } -func (s *grpcServer) SetEpoch(ctx context.Context, req *dbgPB.SetEpochRequest) (*dbgPB.SetEpochResponse, error) { - mockTS, ok := s.timeSource.(epochtime.SetableBackend) +func (s *grpcServer) AdvanceEpoch(ctx context.Context, req *dbgPB.AdvanceEpochRequest) (*dbgPB.AdvanceEpochResponse, error) { + mockTS, ok := s.timeSource.(ticker.SetableBackend) if !ok { return nil, errIncompatibleBackend } - epoch := epochtime.EpochTime(req.GetEpoch()) - err := mockTS.SetEpoch(ctx, epoch) + epoch, err := s.scheduler.GetEpoch(ctx, 0) if err != nil { return nil, err } - return &dbgPB.SetEpochResponse{}, nil + // TODO: make it not get stuck + for { + err := mockTS.DoTick(ctx) + if err != nil { + return nil, err + } + + newEpoch, nerr := s.scheduler.GetEpoch(ctx, 0) + if nerr != nil { + return nil, nerr + } + if epoch != newEpoch { + // After epoch changed, do one more tick. + err = mockTS.DoTick(ctx) + if err != nil { + return nil, err + } + break + } + } + + return &dbgPB.AdvanceEpochResponse{}, nil } func (s *grpcServer) WaitNodes(ctx context.Context, req *dbgPB.WaitNodesRequest) (*dbgPB.WaitNodesResponse, error) { @@ -81,11 +103,12 @@ Loop: // NewGRPCServer initializes and registers a gRPC dummydebug server // backed by the provided backends. -func NewGRPCServer(srv *grpc.Server, timeSource epochtime.Backend, registry registry.Backend) { +func NewGRPCServer(srv *grpc.Server, timeSource ticker.Backend, registry registry.Backend, scheduler scheduler.Backend) { s := &grpcServer{ logger: logging.GetLogger("dummydebug/grpc"), timeSource: timeSource, registry: registry, + scheduler: scheduler, } dbgPB.RegisterDummyDebugServer(srv, s) } diff --git a/go/ekiden/cmd/debug/dummy/dummy.go b/go/ekiden/cmd/debug/dummy/dummy.go index 02634a1b6ee..e59468e1593 100644 --- a/go/ekiden/cmd/debug/dummy/dummy.go +++ b/go/ekiden/cmd/debug/dummy/dummy.go @@ -18,7 +18,6 @@ import ( ) var ( - epoch uint64 nodes uint64 dummyCmd = &cobra.Command{ @@ -26,10 +25,10 @@ var ( Short: "control dummy node during tests", } - dummySetEpochCmd = &cobra.Command{ - Use: "set-epoch", - Short: "set mock epochtime", - Run: doSetEpoch, + dummyAdvanceEpochCmd = &cobra.Command{ + Use: "advance-epoch", + Short: "advance an epoch", + Run: advanceEpoch, } dummyWaitNodesCmd = &cobra.Command{ @@ -59,18 +58,16 @@ func doConnect(cmd *cobra.Command) (*grpc.ClientConn, dummydebug.DummyDebugClien return conn, client } -func doSetEpoch(cmd *cobra.Command, args []string) { +func advanceEpoch(cmd *cobra.Command, args []string) { conn, client := doConnect(cmd) defer conn.Close() - logger.Info("setting epoch", - "epoch", epoch, - ) + logger.Info("advancing epoch") // Use background context to block until mock epoch transition is done. - _, err := client.SetEpoch(context.Background(), &dummydebug.SetEpochRequest{Epoch: epoch}) + _, err := client.AdvanceEpoch(context.Background(), &dummydebug.AdvanceEpochRequest{}) if err != nil { - logger.Error("failed to set epoch", + logger.Error("failed to do tick", "err", err, ) } @@ -111,10 +108,9 @@ func doWaitNodes(cmd *cobra.Command, args []string) { // Register registers the dummy sub-command and all of it's children. func Register(parentCmd *cobra.Command) { cmdGrpc.RegisterClientFlags(dummyCmd, true) - dummySetEpochCmd.Flags().Uint64VarP(&epoch, "epoch", "e", 0, "set epoch to given value") dummyWaitNodesCmd.Flags().Uint64VarP(&nodes, "nodes", "n", 1, "number of nodes to wait for") - dummyCmd.AddCommand(dummySetEpochCmd) + dummyCmd.AddCommand(dummyAdvanceEpochCmd) dummyCmd.AddCommand(dummyWaitNodesCmd) parentCmd.AddCommand(dummyCmd) } diff --git a/go/ekiden/cmd/node/node.go b/go/ekiden/cmd/node/node.go index e97ae34a127..4b60cc87f9f 100644 --- a/go/ekiden/cmd/node/node.go +++ b/go/ekiden/cmd/node/node.go @@ -27,8 +27,6 @@ import ( "github.com/oasislabs/ekiden/go/ekiden/cmd/common/metrics" "github.com/oasislabs/ekiden/go/ekiden/cmd/common/pprof" "github.com/oasislabs/ekiden/go/ekiden/cmd/common/tracing" - "github.com/oasislabs/ekiden/go/epochtime" - epochtimeAPI "github.com/oasislabs/ekiden/go/epochtime/api" "github.com/oasislabs/ekiden/go/genesis" genesisAPI "github.com/oasislabs/ekiden/go/genesis/api" "github.com/oasislabs/ekiden/go/ias" @@ -47,6 +45,8 @@ import ( storageAPI "github.com/oasislabs/ekiden/go/storage/api" "github.com/oasislabs/ekiden/go/tendermint" tmService "github.com/oasislabs/ekiden/go/tendermint/service" + "github.com/oasislabs/ekiden/go/ticker" + tickerAPI "github.com/oasislabs/ekiden/go/ticker/api" workerCommon "github.com/oasislabs/ekiden/go/worker/common" "github.com/oasislabs/ekiden/go/worker/common/p2p" "github.com/oasislabs/ekiden/go/worker/compute" @@ -83,7 +83,7 @@ type Node struct { Genesis genesisAPI.Provider Identity *identity.Identity Beacon beaconAPI.Backend - Epochtime epochtimeAPI.Backend + Ticker tickerAPI.Backend Registry registryAPI.Backend RootHash roothashAPI.Backend Scheduler schedulerAPI.Backend @@ -126,24 +126,24 @@ func (n *Node) initBackends() error { var err error // Initialize the various backends. - if n.Epochtime, err = epochtime.New(n.svcMgr.Ctx, n.svcTmnt); err != nil { + if n.Ticker, err = ticker.New(n.svcMgr.Ctx, n.svcTmnt); err != nil { return err } - if n.Beacon, err = beacon.New(n.svcMgr.Ctx, n.Epochtime, n.svcTmnt); err != nil { + if n.Beacon, err = beacon.New(n.svcMgr.Ctx, n.Ticker, n.svcTmnt); err != nil { return err } if n.Staking, err = staking.New(n.svcMgr.Ctx, n.svcTmnt); err != nil { return err } - if n.Registry, err = registry.New(n.svcMgr.Ctx, n.Epochtime, n.svcTmnt); err != nil { + if n.Registry, err = registry.New(n.svcMgr.Ctx, n.Ticker, n.svcTmnt); err != nil { return err } n.svcMgr.RegisterCleanupOnly(n.Registry, "registry backend") - if n.KeyManager, err = keymanager.New(n.svcMgr.Ctx, n.Epochtime, n.Registry, n.svcTmnt); err != nil { + if n.KeyManager, err = keymanager.New(n.svcMgr.Ctx, n.Ticker, n.Registry, n.svcTmnt); err != nil { return err } n.svcMgr.RegisterCleanupOnly(n.Staking, "staking backend") - if n.Scheduler, err = scheduler.New(n.svcMgr.Ctx, n.Epochtime, n.Registry, n.Beacon, n.svcTmnt); err != nil { + if n.Scheduler, err = scheduler.New(n.svcMgr.Ctx, n.Ticker, n.Registry, n.Beacon, n.svcTmnt); err != nil { return err } n.svcMgr.RegisterCleanupOnly(n.Scheduler, "scheduler backend") @@ -152,7 +152,7 @@ func (n *Node) initBackends() error { return err } n.svcMgr.RegisterCleanupOnly(n.Storage, "storage backend") - if n.RootHash, err = roothash.New(n.svcMgr.Ctx, dataDir, n.Epochtime, n.Scheduler, n.Registry, n.Beacon, n.svcTmnt); err != nil { + if n.RootHash, err = roothash.New(n.svcMgr.Ctx, dataDir, n.Ticker, n.Scheduler, n.Registry, n.Beacon, n.svcTmnt); err != nil { return err } n.svcMgr.RegisterCleanupOnly(n.RootHash, "roothash backend") @@ -162,7 +162,7 @@ func (n *Node) initBackends() error { registry.NewGRPCServer(grpcSrv, n.Registry) staking.NewGRPCServer(grpcSrv, n.Staking) storage.NewGRPCServer(grpcSrv, n.Storage) - dummydebug.NewGRPCServer(grpcSrv, n.Epochtime, n.Registry) + dummydebug.NewGRPCServer(grpcSrv, n.Ticker, n.Registry, n.Scheduler) cmdCommon.Logger().Debug("backends initialized") @@ -203,7 +203,7 @@ func (n *Node) initAndStartWorkers(logger *logging.Logger) error { workerCommonCfg := n.CommonWorker.GetConfig() n.WorkerRegistration, err = registration.New( dataDir, - n.Epochtime, + n.Ticker, n.Registry, n.Identity, n.svcTmnt, @@ -231,7 +231,6 @@ func (n *Node) initAndStartWorkers(logger *logging.Logger) error { // Initialize the storage worker. n.StorageWorker, err = workerStorage.New( - n.Epochtime, n.Storage, n.CommonWorker.Grpc, n.WorkerRegistration, @@ -564,7 +563,7 @@ func RegisterFlags(cmd *cobra.Command) { pprof.RegisterFlags, genesis.RegisterFlags, beacon.RegisterFlags, - epochtime.RegisterFlags, + ticker.RegisterFlags, registry.RegisterFlags, roothash.RegisterFlags, scheduler.RegisterFlags, diff --git a/go/ekiden/node_test.go b/go/ekiden/node_test.go index 4ffd84e0b8b..76ae9528362 100644 --- a/go/ekiden/node_test.go +++ b/go/ekiden/node_test.go @@ -19,8 +19,6 @@ import ( "github.com/oasislabs/ekiden/go/common/entity" cmdCommon "github.com/oasislabs/ekiden/go/ekiden/cmd/common" "github.com/oasislabs/ekiden/go/ekiden/cmd/node" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" - epochtimeTests "github.com/oasislabs/ekiden/go/epochtime/tests" registry "github.com/oasislabs/ekiden/go/registry/api" registryTests "github.com/oasislabs/ekiden/go/registry/tests" roothashTests "github.com/oasislabs/ekiden/go/roothash/tests" @@ -29,6 +27,8 @@ import ( storageClient "github.com/oasislabs/ekiden/go/storage/client" storageClientTests "github.com/oasislabs/ekiden/go/storage/client/tests" storageTests "github.com/oasislabs/ekiden/go/storage/tests" + ticker "github.com/oasislabs/ekiden/go/ticker/api" + tickerTests "github.com/oasislabs/ekiden/go/ticker/tests" computeCommittee "github.com/oasislabs/ekiden/go/worker/compute/committee" computeWorkerTests "github.com/oasislabs/ekiden/go/worker/compute/tests" storageWorkerTests "github.com/oasislabs/ekiden/go/worker/storage/tests" @@ -47,7 +47,7 @@ var ( value interface{} }{ {"log.level.default", "DEBUG"}, - {"epochtime.backend", "tendermint_mock"}, + {"ticker.debug.settable", true}, {"consensus.backend", "tendermint"}, {"registry.debug.allow_runtime_registration", true}, {"registry.debug.bypass_stake", true}, @@ -184,7 +184,7 @@ func TestNode(t *testing.T) { // Clean up and ensure the registry is empty for the following tests. {"DeregisterTestEntityRuntime", testDeregisterEntityRuntime}, - {"EpochTime", testEpochTime}, + {"Ticker", testTicker}, {"Beacon", testBeacon}, {"Storage", testStorage}, {"Registry", testRegistry}, @@ -260,14 +260,14 @@ func testDeregisterEntityRuntime(t *testing.T, node *testNode) { registryTests.EnsureRegistryEmpty(t, node.Node.Registry) } -func testEpochTime(t *testing.T, node *testNode) { - epochtimeTests.EpochtimeSetableImplementationTest(t, node.Epochtime) +func testTicker(t *testing.T, node *testNode) { + tickerTests.TickerSetableImplementationTest(t, node.Ticker) } func testBeacon(t *testing.T, node *testNode) { - timeSource := (node.Epochtime).(epochtime.SetableBackend) + timeSource := (node.Ticker).(ticker.SetableBackend) - beaconTests.BeaconImplementationTests(t, node.Beacon, timeSource) + beaconTests.BeaconImplementationTests(t, node.Beacon, timeSource, node.Scheduler) } func testStorage(t *testing.T, node *testNode) { @@ -275,13 +275,13 @@ func testStorage(t *testing.T, node *testNode) { } func testRegistry(t *testing.T, node *testNode) { - timeSource := (node.Epochtime).(epochtime.SetableBackend) + timeSource := (node.Ticker).(ticker.SetableBackend) - registryTests.RegistryImplementationTests(t, node.Registry, timeSource) + registryTests.RegistryImplementationTests(t, node.Registry, timeSource, node.Scheduler) } func testScheduler(t *testing.T, node *testNode) { - timeSource := (node.Epochtime).(epochtime.SetableBackend) + timeSource := (node.Ticker).(ticker.SetableBackend) schedulerTests.SchedulerImplementationTests(t, node.Scheduler, timeSource, node.Registry) } @@ -291,16 +291,16 @@ func testStaking(t *testing.T, node *testNode) { } func testRootHash(t *testing.T, node *testNode) { - timeSource := (node.Epochtime).(epochtime.SetableBackend) + timeSource := (node.Ticker).(ticker.SetableBackend) roothashTests.RootHashImplementationTests(t, node.RootHash, timeSource, node.Scheduler, node.Storage, node.Registry) } func testComputeWorker(t *testing.T, node *testNode) { - timeSource := (node.Epochtime).(epochtime.SetableBackend) + timeSource := (node.Ticker).(ticker.SetableBackend) require.NotNil(t, node.computeCommitteeNode) - computeWorkerTests.WorkerImplementationTests(t, node.ComputeWorker, node.runtimeID, node.computeCommitteeNode, timeSource) + computeWorkerTests.WorkerImplementationTests(t, node.ComputeWorker, node.runtimeID, node.computeCommitteeNode, timeSource, node.Scheduler) } func testStorageWorker(t *testing.T, node *testNode) { @@ -308,10 +308,10 @@ func testStorageWorker(t *testing.T, node *testNode) { } func testTransactionSchedulerWorker(t *testing.T, node *testNode) { - timeSource := (node.Epochtime).(epochtime.SetableBackend) + timeSource := (node.Ticker).(ticker.SetableBackend) require.NotNil(t, node.txnschedulerCommitteeNode) - txnschedulerWorkerTests.WorkerImplementationTests(t, node.TransactionSchedulerWorker, node.runtimeID, node.txnschedulerCommitteeNode, timeSource, node.RootHash, node.Storage) + txnschedulerWorkerTests.WorkerImplementationTests(t, node.TransactionSchedulerWorker, node.runtimeID, node.txnschedulerCommitteeNode, timeSource, node.RootHash, node.Storage, node.Scheduler) } func testClient(t *testing.T, node *testNode) { @@ -319,7 +319,7 @@ func testClient(t *testing.T, node *testNode) { } func testStorageClient(t *testing.T, node *testNode) { - timeSource := (node.Epochtime).(epochtime.SetableBackend) + timeSource := (node.Ticker).(ticker.SetableBackend) ctx := context.Background() // Storage client tests. diff --git a/go/epochtime/api/api.go b/go/epochtime/api/api.go deleted file mode 100644 index 6ed4e260375..00000000000 --- a/go/epochtime/api/api.go +++ /dev/null @@ -1,41 +0,0 @@ -// Package api implements the Oasis timekeeping API and common types. -package api - -import ( - "context" - - "github.com/oasislabs/ekiden/go/common/pubsub" -) - -// EpochTime is the number of intervals (epochs) since a fixed instant -// in time (epoch date). -type EpochTime uint64 - -// EpochInvalid is the placeholder invalid epoch. -const EpochInvalid EpochTime = 0xffffffffffffffff // ~50 quadrillion years away. - -// Backend is a timekeeping implementation. -type Backend interface { - // GetEpoch returns the epoch at the specified block height. - // Calling this method with height `0`, should return the - // epoch of latest known block. - GetEpoch(context.Context, int64) (epoch EpochTime, err error) - - // GetEpochBlock returns the block height at the start of the said - // epoch. - GetEpochBlock(context.Context, EpochTime) (int64, error) - - // WatchEpochs returns a channel that produces a stream of messages - // on epoch transitions. - // - // Upon subscription the current epoch is sent immediately. - WatchEpochs() (<-chan EpochTime, *pubsub.Subscription) -} - -// SetableBackend is a Backend that supports setting the current epoch. -type SetableBackend interface { - Backend - - // SetEpoch sets the current epoch. - SetEpoch(context.Context, EpochTime) error -} diff --git a/go/epochtime/init.go b/go/epochtime/init.go deleted file mode 100644 index 5f5ed502134..00000000000 --- a/go/epochtime/init.go +++ /dev/null @@ -1,51 +0,0 @@ -// Package epochtime implements the Oasis timekeeping backend. -package epochtime - -import ( - "context" - "fmt" - "strings" - - "github.com/spf13/cobra" - "github.com/spf13/viper" - - "github.com/oasislabs/ekiden/go/epochtime/api" - "github.com/oasislabs/ekiden/go/epochtime/tendermint" - "github.com/oasislabs/ekiden/go/epochtime/tendermint_mock" - "github.com/oasislabs/ekiden/go/tendermint/service" -) - -const ( - cfgBackend = "epochtime.backend" - cfgTendermintInterval = "epochtime.tendermint.interval" -) - -// New constructs a new Backend based on the configuration flags. -func New(ctx context.Context, tmService service.TendermintService) (api.Backend, error) { - backend := viper.GetString(cfgBackend) - switch strings.ToLower(backend) { - case tendermint.BackendName: - interval := viper.GetInt64(cfgTendermintInterval) - return tendermint.New(ctx, tmService, interval) - case tendermintmock.BackendName: - return tendermintmock.New(ctx, tmService) - default: - return nil, fmt.Errorf("epochtime: unsupported backend: '%v'", backend) - } -} - -// RegisterFlags registers the configuration flags with the provided -// command. -func RegisterFlags(cmd *cobra.Command) { - if !cmd.Flags().Parsed() { - cmd.Flags().String(cfgBackend, tendermint.BackendName, "Epoch time backend") - cmd.Flags().Int64(cfgTendermintInterval, 86400, "Epoch interval (in blocks)") - } - - for _, v := range []string{ - cfgBackend, - cfgTendermintInterval, - } { - viper.BindPFlag(v, cmd.Flags().Lookup(v)) //nolint: errcheck - } -} diff --git a/go/epochtime/tendermint/tendermint.go b/go/epochtime/tendermint/tendermint.go deleted file mode 100644 index eee83120dfc..00000000000 --- a/go/epochtime/tendermint/tendermint.go +++ /dev/null @@ -1,123 +0,0 @@ -// Package tendermint implements the tendermint backed epochtime backend. -package tendermint - -import ( - "context" - "sync" - - "github.com/eapache/channels" - tmtypes "github.com/tendermint/tendermint/types" - - "github.com/oasislabs/ekiden/go/common/logging" - "github.com/oasislabs/ekiden/go/common/pubsub" - "github.com/oasislabs/ekiden/go/epochtime/api" - tmapi "github.com/oasislabs/ekiden/go/tendermint/api" - "github.com/oasislabs/ekiden/go/tendermint/service" -) - -const ( - // BackendName is the name of this implementation. - BackendName = tmapi.BackendName -) - -var _ api.Backend = (*tendermintBackend)(nil) - -type tendermintBackend struct { - sync.RWMutex - - logger *logging.Logger - - service service.TendermintService - notifier *pubsub.Broker - - interval int64 - lastNotified api.EpochTime - epoch api.EpochTime -} - -func (t *tendermintBackend) GetEpoch(ctx context.Context, height int64) (api.EpochTime, error) { - if height == 0 { - t.RLock() - defer t.RUnlock() - return t.epoch, nil - } - epoch := api.EpochTime(height / t.interval) - - return epoch, nil -} - -func (t *tendermintBackend) GetEpochBlock(ctx context.Context, epoch api.EpochTime) (int64, error) { - height := int64(epoch) * t.interval - - return height, nil -} - -func (t *tendermintBackend) WatchEpochs() (<-chan api.EpochTime, *pubsub.Subscription) { - typedCh := make(chan api.EpochTime) - sub := t.notifier.Subscribe() - sub.Unwrap(typedCh) - - return typedCh, sub -} - -func (t *tendermintBackend) worker(ctx context.Context) { - ch, sub := t.service.WatchBlocks() - defer sub.Close() - - for { - block, ok := <-ch - if !ok { - return - } - - if t.updateCached(ctx, block) { - // Safe to look at `t.epoch`, only mutator is the line above. - t.notifier.Broadcast(t.epoch) - } - } -} - -func (t *tendermintBackend) updateCached(ctx context.Context, block *tmtypes.Block) bool { - t.Lock() - defer t.Unlock() - - epoch, _ := t.GetEpoch(ctx, block.Header.Height) - - t.epoch = epoch - - if t.lastNotified != epoch { - t.logger.Debug("epoch transition", - "prev_epoch", t.lastNotified, - "epoch", epoch, - ) - t.lastNotified = t.epoch - return true - } - return false -} - -// New constructs a new tendermint backed epochtime Backend instance, -// with the specified epoch interval. -func New(ctx context.Context, service service.TendermintService, interval int64) (api.Backend, error) { - if err := service.ForceInitialize(); err != nil { - return nil, err - } - - r := &tendermintBackend{ - logger: logging.GetLogger("epochtime/tendermint"), - service: service, - interval: interval, - } - r.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) { - r.RLock() - defer r.RUnlock() - - if r.lastNotified == r.epoch { - ch.In() <- r.epoch - } - }) - - go r.worker(ctx) - - return r, nil -} diff --git a/go/epochtime/tendermint_mock/tendermint_mock.go b/go/epochtime/tendermint_mock/tendermint_mock.go deleted file mode 100644 index 83e5c5c265c..00000000000 --- a/go/epochtime/tendermint_mock/tendermint_mock.go +++ /dev/null @@ -1,225 +0,0 @@ -// Package tendermintmock implements the mock (settable) tendermint backed epochtime backend. -package tendermintmock - -import ( - "bytes" - "context" - "sync" - - "github.com/eapache/channels" - "github.com/pkg/errors" - tmtypes "github.com/tendermint/tendermint/types" - - "github.com/oasislabs/ekiden/go/common/cbor" - "github.com/oasislabs/ekiden/go/common/logging" - "github.com/oasislabs/ekiden/go/common/pubsub" - "github.com/oasislabs/ekiden/go/epochtime/api" - tmapi "github.com/oasislabs/ekiden/go/tendermint/api" - app "github.com/oasislabs/ekiden/go/tendermint/apps/epochtime_mock" - "github.com/oasislabs/ekiden/go/tendermint/service" -) - -const ( - // BackendName is the name of this implementation. - BackendName = "tendermint_mock" -) - -var _ api.Backend = (*tendermintMockBackend)(nil) - -type tendermintMockBackend struct { - sync.RWMutex - - logger *logging.Logger - - service service.TendermintService - notifier *pubsub.Broker - - lastNotified api.EpochTime - epoch api.EpochTime - currentBlock int64 -} - -func (t *tendermintMockBackend) GetEpoch(ctx context.Context, height int64) (api.EpochTime, error) { - response, err := t.service.Query(app.QueryGetEpoch, nil, height) - if err != nil { - return 0, errors.Wrap(err, "epochtime: get block epoch query failed") - } - - var data app.QueryGetEpochResponse - if err := cbor.Unmarshal(response, &data); err != nil { - return 0, errors.Wrap(err, "epochtime: get block epoch malformed response") - } - - return data.Epoch, nil -} - -func (t *tendermintMockBackend) GetEpochBlock(ctx context.Context, epoch api.EpochTime) (int64, error) { - t.RLock() - defer t.RUnlock() - - if epoch == t.epoch { - return t.currentBlock, nil - } - - t.logger.Error("epochtime: attempted to get block for historic epoch", - "epoch", epoch, - "current_epoch", t.epoch, - ) - - return 0, errors.New("epochtime: not implemented for historic epochs") -} - -func (t *tendermintMockBackend) WatchEpochs() (<-chan api.EpochTime, *pubsub.Subscription) { - typedCh := make(chan api.EpochTime) - sub := t.notifier.Subscribe() - sub.Unwrap(typedCh) - - return typedCh, sub -} - -func (t *tendermintMockBackend) SetEpoch(ctx context.Context, epoch api.EpochTime) error { - tx := app.Tx{ - TxSetEpoch: &app.TxSetEpoch{ - Epoch: epoch, - }, - } - - ch, sub := t.WatchEpochs() - defer sub.Close() - - if err := t.service.BroadcastTx(app.TransactionTag, tx); err != nil { - return errors.Wrap(err, "epochtime: set epoch failed") - } - - for { - select { - case newEpoch, ok := <-ch: - if !ok { - return context.Canceled - } - if newEpoch == epoch { - return nil - } - case <-ctx.Done(): - return context.Canceled - } - } -} - -func (t *tendermintMockBackend) worker(ctx context.Context) { - // Subscribe to blocks which advance the epoch. - sub, err := t.service.Subscribe("epochtime-worker", app.QueryApp) - if err != nil { - t.logger.Error("failed to subscribe", - "err", err, - ) - return - } - defer t.service.Unsubscribe("epochtime-worker", app.QueryApp) // nolint: errcheck - - // Populate current epoch (if available). - response, err := t.service.Query(app.QueryGetEpoch, nil, 0) - if err == nil { - var data app.QueryGetEpochResponse - if err := cbor.Unmarshal(response, &data); err != nil { - panic("worker: malformed current epoch response") - } - - t.Lock() - t.epoch = data.Epoch - t.currentBlock = data.Height - t.notifier.Broadcast(t.epoch) - t.Unlock() - } - - for { - var event interface{} - - select { - case msg := <-sub.Out(): - event = msg.Data() - case <-sub.Cancelled(): - t.logger.Debug("worker: terminating, subscription closed") - return - case <-ctx.Done(): - return - } - - switch ev := event.(type) { - case tmtypes.EventDataNewBlock: - t.onEventDataNewBlock(ctx, ev) - default: - } - } -} - -func (t *tendermintMockBackend) onEventDataNewBlock(ctx context.Context, ev tmtypes.EventDataNewBlock) { - events := ev.ResultBeginBlock.GetEvents() - - for _, tmEv := range events { - if tmEv.GetType() != tmapi.EventTypeEkiden { - continue - } - - for _, pair := range tmEv.GetAttributes() { - if bytes.Equal(pair.GetKey(), app.TagEpoch) { - var epoch api.EpochTime - if err := cbor.Unmarshal(pair.GetValue(), &epoch); err != nil { - t.logger.Error("worker: malformed mock epoch", - "err", err, - ) - continue - } - - if t.updateCached(ev.Block.Header.Height, epoch) { - t.notifier.Broadcast(t.epoch) - } - } - } - } -} - -func (t *tendermintMockBackend) updateCached(height int64, epoch api.EpochTime) bool { - t.Lock() - defer t.Unlock() - - t.epoch = epoch - t.currentBlock = height - - if t.lastNotified != epoch { - t.logger.Debug("epoch transition", - "prev_epoch", t.lastNotified, - "epoch", epoch, - "height", height, - ) - t.lastNotified = t.epoch - return true - } - return false -} - -// New constructs a new mock tendermint backed epochtime Backend instance. -func New(ctx context.Context, service service.TendermintService) (api.SetableBackend, error) { - // Initialze and register the tendermint service component. - app := app.New() - if err := service.RegisterApplication(app, nil); err != nil { - return nil, err - } - - r := &tendermintMockBackend{ - logger: logging.GetLogger("epochtime/tendermint_mock"), - service: service, - } - r.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) { - r.RLock() - defer r.RUnlock() - - if r.lastNotified == r.epoch { - ch.In() <- r.epoch - } - }) - - go r.worker(ctx) - - return r, nil -} diff --git a/go/epochtime/tests/mock_tester.go b/go/epochtime/tests/mock_tester.go deleted file mode 100644 index 72a670028ad..00000000000 --- a/go/epochtime/tests/mock_tester.go +++ /dev/null @@ -1,68 +0,0 @@ -// Package tests is a collection of epochtime implementation test cases. -package tests - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/oasislabs/ekiden/go/epochtime/api" -) - -const recvTimeout = 1 * time.Second - -// EpochtimeSetableImplementationTest exercises the basic functionality of -// a setable (mock) epochtime backend. -func EpochtimeSetableImplementationTest(t *testing.T, backend api.Backend) { - require := require.New(t) - - // Ensure that the backend is setable. - require.Implements((*api.SetableBackend)(nil), backend, "epoch time backend is mock") - timeSource := (backend).(api.SetableBackend) - - epoch, err := timeSource.GetEpoch(context.Background(), 0) - require.NoError(err, "GetEpoch") - - var e api.EpochTime - - ch, sub := timeSource.WatchEpochs() - defer sub.Close() - select { - case e = <-ch: - require.Equal(epoch, e, "WatchEpochs initial") - case <-time.After(recvTimeout): - t.Fatalf("failed to receive current epoch on WatchEpochs") - } - - epoch++ - err = timeSource.SetEpoch(context.Background(), epoch) - require.NoError(err, "SetEpoch") - - select { - case e = <-ch: - require.Equal(epoch, e, "WatchEpochs after set") - case <-time.After(recvTimeout): - t.Fatalf("failed to receive epoch notification after transition") - } - - e, err = timeSource.GetEpoch(context.Background(), 0) - require.NoError(err, "GetEpoch after set") - require.Equal(epoch, e, "GetEpoch after set, epoch") -} - -// MustAdvanceEpoch advances the epoch by the specified increment, and returns -// the new epoch. -func MustAdvanceEpoch(t *testing.T, backend api.SetableBackend, increment uint64) api.EpochTime { - require := require.New(t) - - epoch, err := backend.GetEpoch(context.Background(), 0) - require.NoError(err, "GetEpoch") - - epoch = epoch + api.EpochTime(increment) - err = backend.SetEpoch(context.Background(), epoch) - require.NoError(err, "SetEpoch") - - return epoch -} diff --git a/go/go.mod b/go/go.mod index ef2a518ab5a..95039404a55 100644 --- a/go/go.mod +++ b/go/go.mod @@ -76,5 +76,5 @@ require ( golang.org/x/text v0.3.2 // indirect google.golang.org/appengine v1.4.0 // indirect google.golang.org/genproto v0.0.0-20190611190212-a7e196e89fd3 // indirect - google.golang.org/grpc v1.21.1 + google.golang.org/grpc v1.22.0 ) diff --git a/go/go.sum b/go/go.sum index f2411db192e..ad05c14cf6b 100644 --- a/go/go.sum +++ b/go/go.sum @@ -500,6 +500,7 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -524,6 +525,7 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= @@ -536,6 +538,8 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.21.1 h1:j6XxA85m/6txkUCHvzlV5f+HBNl/1r5cZ2A/3IEFOO8= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= +google.golang.org/grpc v1.22.0 h1:J0UbZOIrCAl+fpTOf8YLs4dJo8L/owV4LYVtAXQoPkw= +google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= @@ -550,3 +554,4 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/go/grpc/dummydebug/dummy_debug.proto b/go/grpc/dummydebug/dummy_debug.proto index a09dd6ac49a..fb42a274eff 100644 --- a/go/grpc/dummydebug/dummy_debug.proto +++ b/go/grpc/dummydebug/dummy_debug.proto @@ -4,18 +4,15 @@ package dummy_debug; option go_package = "github.com/oasislabs/ekiden/go/grpc/dummydebug"; service DummyDebug { - // Manually set the epoch and notify subscribers. - rpc SetEpoch (SetEpochRequest) returns (SetEpochResponse) {} + // Manually do a tick and notify subscribers. + rpc AdvanceEpoch (AdvanceEpochRequest) returns (AdvanceEpochResponse) {} // Watch the registry and wait for given number of compute nodes. rpc WaitNodes (WaitNodesRequest) returns (WaitNodesResponse) {} } -message SetEpochRequest { - // The new epoch. - uint64 epoch = 1; -} +message AdvanceEpochRequest {} -message SetEpochResponse {} +message AdvanceEpochResponse {} message WaitNodesRequest { // Number of nodes to wait for. diff --git a/go/keymanager/init.go b/go/keymanager/init.go index 9993d3297f6..9f0e5df452c 100644 --- a/go/keymanager/init.go +++ b/go/keymanager/init.go @@ -9,17 +9,17 @@ import ( "github.com/spf13/cobra" commonFlags "github.com/oasislabs/ekiden/go/ekiden/cmd/common/flags" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" "github.com/oasislabs/ekiden/go/keymanager/api" "github.com/oasislabs/ekiden/go/keymanager/tendermint" registry "github.com/oasislabs/ekiden/go/registry/api" "github.com/oasislabs/ekiden/go/tendermint/service" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) // New constructs a new Backend based on the configuration flags. func New( ctx context.Context, - timeSource epochtime.Backend, + timeSource ticker.Backend, registry registry.Backend, service service.TendermintService, ) (api.Backend, error) { diff --git a/go/keymanager/tendermint/tendermint.go b/go/keymanager/tendermint/tendermint.go index a5bf28a1476..22df1ce2e00 100644 --- a/go/keymanager/tendermint/tendermint.go +++ b/go/keymanager/tendermint/tendermint.go @@ -14,12 +14,12 @@ import ( "github.com/oasislabs/ekiden/go/common/crypto/signature" "github.com/oasislabs/ekiden/go/common/logging" "github.com/oasislabs/ekiden/go/common/pubsub" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" "github.com/oasislabs/ekiden/go/keymanager/api" tmapi "github.com/oasislabs/ekiden/go/tendermint/api" app "github.com/oasislabs/ekiden/go/tendermint/apps/keymanager" registryapp "github.com/oasislabs/ekiden/go/tendermint/apps/registry" "github.com/oasislabs/ekiden/go/tendermint/service" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) // BackendName is the name of the backend. @@ -134,7 +134,7 @@ func (r *tendermintBackend) onEventDataNewBlock(ev tmtypes.EventDataNewBlock) { // New constructs a new tendermint backed key manager management Backend // instance. -func New(ctx context.Context, timeSource epochtime.Backend, service service.TendermintService) (api.Backend, error) { +func New(ctx context.Context, timeSource ticker.Backend, service service.TendermintService) (api.Backend, error) { app := app.New(timeSource) if err := service.RegisterApplication(app, []string{registryapp.AppName}); err != nil { return nil, errors.Wrap(err, "keymanager/tendermint: failed to register app") diff --git a/go/registry/init.go b/go/registry/init.go index 802c06c89ed..72175a1a136 100644 --- a/go/registry/init.go +++ b/go/registry/init.go @@ -10,10 +10,10 @@ import ( "github.com/spf13/viper" commonFlags "github.com/oasislabs/ekiden/go/ekiden/cmd/common/flags" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" "github.com/oasislabs/ekiden/go/registry/api" "github.com/oasislabs/ekiden/go/registry/tendermint" "github.com/oasislabs/ekiden/go/tendermint/service" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) const ( @@ -22,7 +22,7 @@ const ( ) // New constructs a new Backend based on the configuration flags. -func New(ctx context.Context, timeSource epochtime.Backend, tmService service.TendermintService) (api.Backend, error) { +func New(ctx context.Context, timeSource ticker.Backend, tmService service.TendermintService) (api.Backend, error) { backend := commonFlags.ConsensusBackend() var impl api.Backend diff --git a/go/registry/tendermint/tendermint.go b/go/registry/tendermint/tendermint.go index e285c5e43f6..eac3429b6a6 100644 --- a/go/registry/tendermint/tendermint.go +++ b/go/registry/tendermint/tendermint.go @@ -16,12 +16,12 @@ import ( "github.com/oasislabs/ekiden/go/common/logging" "github.com/oasislabs/ekiden/go/common/node" "github.com/oasislabs/ekiden/go/common/pubsub" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" "github.com/oasislabs/ekiden/go/registry/api" tmapi "github.com/oasislabs/ekiden/go/tendermint/api" app "github.com/oasislabs/ekiden/go/tendermint/apps/registry" stakingapp "github.com/oasislabs/ekiden/go/tendermint/apps/staking" "github.com/oasislabs/ekiden/go/tendermint/service" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) // BackendName is the name of this implementation. @@ -432,7 +432,7 @@ func (r *tendermintBackend) getNodeList(ctx context.Context, height int64) (*api } // New constructs a new tendermint backed registry Backend instance. -func New(ctx context.Context, timeSource epochtime.Backend, service service.TendermintService, cfg *api.Config) (api.Backend, error) { +func New(ctx context.Context, timeSource ticker.Backend, service service.TendermintService, cfg *api.Config) (api.Backend, error) { // Initialize and register the tendermint service component. app := app.New(timeSource, cfg) if err := service.RegisterApplication(app, []string{stakingapp.AppName}); err != nil { diff --git a/go/registry/tests/tester.go b/go/registry/tests/tester.go index ad9b876ad90..e23de7cd47c 100644 --- a/go/registry/tests/tester.go +++ b/go/registry/tests/tester.go @@ -15,9 +15,10 @@ import ( memorySigner "github.com/oasislabs/ekiden/go/common/crypto/signature/signers/memory" "github.com/oasislabs/ekiden/go/common/entity" "github.com/oasislabs/ekiden/go/common/node" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" - epochtimeTests "github.com/oasislabs/ekiden/go/epochtime/tests" "github.com/oasislabs/ekiden/go/registry/api" + schedulerApi "github.com/oasislabs/ekiden/go/scheduler/api" + ticker "github.com/oasislabs/ekiden/go/ticker/api" + tickerTests "github.com/oasislabs/ekiden/go/ticker/tests" ) const recvTimeout = 1 * time.Second @@ -27,10 +28,10 @@ const recvTimeout = 1 * time.Second // // WARNING: This assumes that the registry is empty, and will leave // a Runtime registered. -func RegistryImplementationTests(t *testing.T, backend api.Backend, timeSource epochtime.SetableBackend) { +func RegistryImplementationTests(t *testing.T, backend api.Backend, timeSource ticker.SetableBackend, scheduler schedulerApi.Backend) { EnsureRegistryEmpty(t, backend) - testRegistryEntityNodes(t, backend, timeSource) + testRegistryEntityNodes(t, backend, timeSource, scheduler) // Runtime registry tests are after the entity/node tests to avoid // interacting with the scheduler as much as possible. @@ -39,12 +40,12 @@ func RegistryImplementationTests(t *testing.T, backend api.Backend, timeSource e }) } -func testRegistryEntityNodes(t *testing.T, backend api.Backend, timeSource epochtime.SetableBackend) { // nolint: gocyclo +func testRegistryEntityNodes(t *testing.T, backend api.Backend, timeSource ticker.SetableBackend, scheduler schedulerApi.Backend) { // nolint: gocyclo // Generate the entities used for the test cases. entities, err := NewTestEntities([]byte("testRegistryEntityNodes"), 3) require.NoError(t, err, "NewTestEntities") - epoch, err := timeSource.GetEpoch(context.Background(), 0) + epoch, err := scheduler.GetEpoch(context.Background(), 0) require.NoError(t, err, "GetEpoch") // All of these tests are combined because the Entity and Node structures @@ -100,7 +101,7 @@ func testRegistryEntityNodes(t *testing.T, backend api.Backend, timeSource epoch nodes := make([][]*TestNode, 0, len(entities)) for i, v := range entities { // Stagger the expirations so that it's possible to test it. - entityNodes, err := v.NewTestNodes(i+1, 1, nil, epoch+epochtime.EpochTime(i)+1) + entityNodes, err := v.NewTestNodes(i+1, 1, nil, schedulerApi.EpochTime(epoch+uint64(i)+1)) require.NoError(t, err, "NewTestNodes") nodes = append(nodes, entityNodes) @@ -155,7 +156,7 @@ func testRegistryEntityNodes(t *testing.T, backend api.Backend, timeSource epoch require := require.New(t) expectedNodeList := getExpectedNodeList() - epoch = epochtimeTests.MustAdvanceEpoch(t, timeSource, 1) + tickerTests.MustAdvanceEpoch(t, timeSource, scheduler) registeredNodes, nerr := backend.GetNodes(context.Background()) require.NoError(nerr, "GetNodes") @@ -170,7 +171,7 @@ func testRegistryEntityNodes(t *testing.T, backend api.Backend, timeSource epoch expectedDeregEvents := len(nodes[0]) deregisteredNodes := make(map[signature.MapKey]*node.Node) - epoch = epochtimeTests.MustAdvanceEpoch(t, timeSource, 1) + tickerTests.MustAdvanceEpoch(t, timeSource, scheduler) for i := 0; i < expectedDeregEvents; i++ { select { @@ -341,7 +342,7 @@ type TestNode struct { // NewTestNodes returns the specified number of TestNodes, generated // deterministically using the entity's public key as the seed. -func (ent *TestEntity) NewTestNodes(nCompute int, nStorage int, runtimes []*TestRuntime, expiration epochtime.EpochTime) ([]*TestNode, error) { +func (ent *TestEntity) NewTestNodes(nCompute int, nStorage int, runtimes []*TestRuntime, expiration schedulerApi.EpochTime) ([]*TestNode, error) { if nCompute <= 0 || nStorage <= 0 || nCompute > 254 || nStorage > 254 { return nil, errors.New("registry/tests: test node count out of bounds") } @@ -533,7 +534,7 @@ func BulkPopulate(t *testing.T, backend api.Backend, runtimes []*TestRuntime, se numCompute := int(runtimes[0].Runtime.ReplicaGroupSize + runtimes[0].Runtime.ReplicaGroupBackupSize) numStorage := int(runtimes[0].Runtime.StorageGroupSize) - nodes, err := entity.NewTestNodes(numCompute, numStorage, runtimes, epochtime.EpochInvalid) + nodes, err := entity.NewTestNodes(numCompute, numStorage, runtimes, schedulerApi.EpochInvalid) require.NoError(err, "NewTestNodes") ret := make([]*node.Node, 0, numCompute+numStorage) diff --git a/go/roothash/init.go b/go/roothash/init.go index 2533eb3bb3a..9f696ed4505 100644 --- a/go/roothash/init.go +++ b/go/roothash/init.go @@ -12,12 +12,12 @@ import ( beacon "github.com/oasislabs/ekiden/go/beacon/api" commonFlags "github.com/oasislabs/ekiden/go/ekiden/cmd/common/flags" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" registry "github.com/oasislabs/ekiden/go/registry/api" "github.com/oasislabs/ekiden/go/roothash/api" "github.com/oasislabs/ekiden/go/roothash/tendermint" scheduler "github.com/oasislabs/ekiden/go/scheduler/api" "github.com/oasislabs/ekiden/go/tendermint/service" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) const ( @@ -28,7 +28,7 @@ const ( func New( ctx context.Context, dataDir string, - timeSource epochtime.Backend, + timeSource ticker.Backend, scheduler scheduler.Backend, registry registry.Backend, beacon beacon.Backend, diff --git a/go/roothash/tendermint/tendermint.go b/go/roothash/tendermint/tendermint.go index d8b9a87b77a..8ac98cf5a6d 100644 --- a/go/roothash/tendermint/tendermint.go +++ b/go/roothash/tendermint/tendermint.go @@ -19,7 +19,6 @@ import ( "github.com/oasislabs/ekiden/go/common/crypto/signature" "github.com/oasislabs/ekiden/go/common/logging" "github.com/oasislabs/ekiden/go/common/pubsub" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" "github.com/oasislabs/ekiden/go/roothash/api" "github.com/oasislabs/ekiden/go/roothash/api/block" "github.com/oasislabs/ekiden/go/roothash/api/commitment" @@ -27,6 +26,7 @@ import ( app "github.com/oasislabs/ekiden/go/tendermint/apps/roothash" schedulerapp "github.com/oasislabs/ekiden/go/tendermint/apps/scheduler" "github.com/oasislabs/ekiden/go/tendermint/service" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) const ( @@ -388,7 +388,7 @@ func (r *tendermintBackend) worker(ctx context.Context) { // nolint: gocyclo func New( ctx context.Context, dataDir string, - timeSource epochtime.Backend, + timeSource ticker.Backend, beac beacon.Backend, service service.TendermintService, roundTimeout time.Duration, diff --git a/go/roothash/tests/tester.go b/go/roothash/tests/tester.go index 03eb611d445..9cafc0cdd0f 100644 --- a/go/roothash/tests/tester.go +++ b/go/roothash/tests/tester.go @@ -12,8 +12,6 @@ import ( "github.com/oasislabs/ekiden/go/common" "github.com/oasislabs/ekiden/go/common/crypto/hash" "github.com/oasislabs/ekiden/go/common/crypto/signature" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" - epochtimeTests "github.com/oasislabs/ekiden/go/epochtime/tests" registry "github.com/oasislabs/ekiden/go/registry/api" registryTests "github.com/oasislabs/ekiden/go/registry/tests" "github.com/oasislabs/ekiden/go/roothash/api" @@ -22,6 +20,8 @@ import ( scheduler "github.com/oasislabs/ekiden/go/scheduler/api" storage "github.com/oasislabs/ekiden/go/storage/api" "github.com/oasislabs/ekiden/go/storage/mkvs/urkel" + ticker "github.com/oasislabs/ekiden/go/ticker/api" + tickerTests "github.com/oasislabs/ekiden/go/ticker/tests" ) const ( @@ -40,7 +40,7 @@ type runtimeState struct { // RootHashImplementationTests exercises the basic functionality of a // roothash backend. -func RootHashImplementationTests(t *testing.T, backend api.Backend, epochtime epochtime.SetableBackend, scheduler scheduler.Backend, storage storage.Backend, registry registry.Backend) { +func RootHashImplementationTests(t *testing.T, backend api.Backend, timeSource ticker.SetableBackend, scheduler scheduler.Backend, storage storage.Backend, registry registry.Backend) { seedBase := []byte("RootHashImplementationTests") require := require.New(t) @@ -85,7 +85,7 @@ func RootHashImplementationTests(t *testing.T, backend api.Backend, epochtime ep }) } t.Run("EpochTransitionBlock", func(t *testing.T) { - testEpochTransitionBlock(t, backend, epochtime, scheduler, rtStates) + testEpochTransitionBlock(t, backend, timeSource, scheduler, rtStates) }) t.Run("SucessfulRound", func(t *testing.T) { testSucessfulRound(t, backend, storage, rtStates) @@ -130,7 +130,7 @@ func testGenesisBlock(t *testing.T, backend api.Backend, state *runtimeState) { require.EqualValues(genesisBlock, blk, "retreived block is genesis block") } -func testEpochTransitionBlock(t *testing.T, backend api.Backend, epochtime epochtime.SetableBackend, scheduler scheduler.Backend, states []*runtimeState) { +func testEpochTransitionBlock(t *testing.T, backend api.Backend, timeSource ticker.SetableBackend, scheduler scheduler.Backend, states []*runtimeState) { require := require.New(t) // Before an epoch transition there should just be a genesis block. @@ -142,8 +142,8 @@ func testEpochTransitionBlock(t *testing.T, backend api.Backend, epochtime epoch v.genesisBlock = genesisBlock } - // Advance the epoch, get the committee. - epoch, err := epochtime.GetEpoch(context.Background(), 0) + // GetEpoch. + epoch, err := scheduler.GetEpoch(context.Background(), 0) require.NoError(err, "GetEpoch") // Subscribe to blocks for all of the runtimes. @@ -158,7 +158,7 @@ func testEpochTransitionBlock(t *testing.T, backend api.Backend, epochtime epoch } // Advance the epoch. - epochtimeTests.MustAdvanceEpoch(t, epochtime, 1) + tickerTests.MustAdvanceEpoch(t, timeSource, scheduler) // Check for the expected post-epoch transition events. for i, state := range states { @@ -167,7 +167,7 @@ func testEpochTransitionBlock(t *testing.T, backend api.Backend, epochtime epoch } } -func (s *runtimeState) testEpochTransitionBlock(t *testing.T, scheduler scheduler.Backend, epoch epochtime.EpochTime, ch <-chan *api.AnnotatedBlock) { +func (s *runtimeState) testEpochTransitionBlock(t *testing.T, scheduler scheduler.Backend, epoch uint64, ch <-chan *api.AnnotatedBlock) { require := require.New(t) nodes := make(map[signature.MapKey]*registryTests.TestNode) @@ -349,7 +349,7 @@ type testCommittee struct { func mustGetCommittee( t *testing.T, rt *registryTests.TestRuntime, - epoch epochtime.EpochTime, + epoch uint64, sched scheduler.Backend, nodes map[signature.MapKey]*registryTests.TestNode, ) (computeCommittee *testCommittee, mergeCommittee *testCommittee) { @@ -361,7 +361,7 @@ func mustGetCommittee( for { select { case committee := <-ch: - if committee.ValidFor < epoch { + if uint64(committee.ValidFor) < epoch { continue } if !rt.Runtime.ID.Equal(committee.RuntimeID) { diff --git a/go/scheduler/api/api.go b/go/scheduler/api/api.go index d4ae46c76e6..a46bbecf283 100644 --- a/go/scheduler/api/api.go +++ b/go/scheduler/api/api.go @@ -10,7 +10,6 @@ import ( "github.com/oasislabs/ekiden/go/common/crypto/hash" "github.com/oasislabs/ekiden/go/common/crypto/signature" "github.com/oasislabs/ekiden/go/common/pubsub" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" ) var ( @@ -21,6 +20,14 @@ var ( ErrInvalidRole = errors.New("scheduler: invalid role") ) +// EpochTime is the number of scheduler intervals (epochs) since a fixed instant +// in time (epoch date). +// TODO: there will be one per committee. +type EpochTime uint64 + +// EpochInvalid is the placeholder invalid epoch. +const EpochInvalid EpochTime = 0xffffffffffffffff // ~50 quadrillion years away. + // Role is the role a given node plays in a committee. type Role uint8 @@ -126,7 +133,7 @@ type Committee struct { RuntimeID signature.PublicKey `codec:"runtime_id"` // ValidFor is the epoch for which the committee is valid. - ValidFor epochtime.EpochTime `codec:"valid_for"` + ValidFor EpochTime `codec:"valid_for"` } // String returns a string representation of a Committee. @@ -163,6 +170,9 @@ type Backend interface { // be sent immediately. WatchCommittees() (<-chan *Committee, *pubsub.Subscription) + // GetEpoch returns the current epoch. + GetEpoch(context.Context, int64) (uint64, error) + // Cleanup cleans up the scheduler backend. Cleanup() } diff --git a/go/scheduler/init.go b/go/scheduler/init.go index a30b9ef6d37..5a92949f731 100644 --- a/go/scheduler/init.go +++ b/go/scheduler/init.go @@ -11,17 +11,17 @@ import ( beacon "github.com/oasislabs/ekiden/go/beacon/api" commonFlags "github.com/oasislabs/ekiden/go/ekiden/cmd/common/flags" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" registry "github.com/oasislabs/ekiden/go/registry/api" "github.com/oasislabs/ekiden/go/scheduler/api" "github.com/oasislabs/ekiden/go/scheduler/tendermint" "github.com/oasislabs/ekiden/go/tendermint/service" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) const cfgDebugBypassStake = "scheduler.debug.bypass_stake" // nolint: gosec // New constructs a new Backend based on the configuration flags. -func New(ctx context.Context, timeSource epochtime.Backend, reg registry.Backend, beacon beacon.Backend, service service.TendermintService) (api.Backend, error) { +func New(ctx context.Context, timeSource ticker.Backend, reg registry.Backend, beacon beacon.Backend, service service.TendermintService) (api.Backend, error) { backend := commonFlags.ConsensusBackend() switch strings.ToLower(backend) { case tendermint.BackendName: diff --git a/go/scheduler/tendermint/tendermint.go b/go/scheduler/tendermint/tendermint.go index 19767931e1c..8c26f475045 100644 --- a/go/scheduler/tendermint/tendermint.go +++ b/go/scheduler/tendermint/tendermint.go @@ -11,7 +11,6 @@ import ( "github.com/oasislabs/ekiden/go/common/crypto/signature" "github.com/oasislabs/ekiden/go/common/logging" "github.com/oasislabs/ekiden/go/common/pubsub" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" "github.com/oasislabs/ekiden/go/scheduler/api" tmapi "github.com/oasislabs/ekiden/go/tendermint/api" beaconapp "github.com/oasislabs/ekiden/go/tendermint/apps/beacon" @@ -19,6 +18,7 @@ import ( app "github.com/oasislabs/ekiden/go/tendermint/apps/scheduler" stakingapp "github.com/oasislabs/ekiden/go/tendermint/apps/staking" "github.com/oasislabs/ekiden/go/tendermint/service" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) // BackendName is the name of this implementation. @@ -38,6 +38,21 @@ type tendermintScheduler struct { func (s *tendermintScheduler) Cleanup() { } +func (s *tendermintScheduler) GetEpoch(ctx context.Context, height int64) (uint64, error) { + raw, err := s.service.Query(app.QueryGetEpoch, nil, height) + if err != nil { + return 0, err + } + + var epoch uint64 + err = cbor.Unmarshal(raw, &epoch) + if err != nil { + return 0, err + } + + return epoch, nil +} + func (s *tendermintScheduler) GetCommittees(ctx context.Context, id signature.PublicKey, height int64) ([]*api.Committee, error) { raw, err := s.service.Query(app.QueryAllCommittees, id, height) if err != nil { @@ -163,7 +178,7 @@ func (s *tendermintScheduler) onEventDataNewBlock(ctx context.Context, ev tmtype // New constracts a new tendermint-based scheduler Backend instance. func New(ctx context.Context, - timeSource epochtime.Backend, + timeSource ticker.Backend, service service.TendermintService, cfg *api.Config, ) (api.Backend, error) { diff --git a/go/scheduler/tests/tester.go b/go/scheduler/tests/tester.go index 30ebec4514f..6524594be56 100644 --- a/go/scheduler/tests/tester.go +++ b/go/scheduler/tests/tester.go @@ -11,18 +11,18 @@ import ( "github.com/oasislabs/ekiden/go/common/crypto/signature" "github.com/oasislabs/ekiden/go/common/node" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" - epochtimeTests "github.com/oasislabs/ekiden/go/epochtime/tests" registry "github.com/oasislabs/ekiden/go/registry/api" registryTests "github.com/oasislabs/ekiden/go/registry/tests" "github.com/oasislabs/ekiden/go/scheduler/api" + ticker "github.com/oasislabs/ekiden/go/ticker/api" + tickerTests "github.com/oasislabs/ekiden/go/ticker/tests" ) const recvTimeout = 1 * time.Second // SchedulerImplementationTests exercises the basic functionality of a // scheduler backend. -func SchedulerImplementationTests(t *testing.T, backend api.Backend, epochtime epochtime.SetableBackend, registry registry.Backend) { +func SchedulerImplementationTests(t *testing.T, backend api.Backend, timeSource ticker.SetableBackend, registry registry.Backend) { seed := []byte("SchedulerImplementationTests") require := require.New(t) @@ -38,7 +38,7 @@ func SchedulerImplementationTests(t *testing.T, backend api.Backend, epochtime e defer sub.Close() // Advance the epoch. - epoch := epochtimeTests.MustAdvanceEpoch(t, epochtime, 1) + epoch := tickerTests.MustAdvanceEpoch(t, timeSource, backend) ensureValidCommittees := func(expectedCompute, expectedStorage, expectedTransactionScheduler int) { var compute, storage, transactionScheduler *api.Committee @@ -46,9 +46,10 @@ func SchedulerImplementationTests(t *testing.T, backend api.Backend, epochtime e for seen < 3 { select { case committee := <-ch: - if committee.ValidFor < epoch { + if uint64(committee.ValidFor) < epoch { continue } + if !rt.Runtime.ID.Equal(committee.RuntimeID) { continue } @@ -70,7 +71,7 @@ func SchedulerImplementationTests(t *testing.T, backend api.Backend, epochtime e requireValidCommitteeMembers(t, committee, rt.Runtime, nodes) require.Equal(rt.Runtime.ID, committee.RuntimeID, "committee is for the correct runtime") // Redundant - require.Equal(epoch, committee.ValidFor, "committee is for current epoch") + require.Equal(epoch, uint64(committee.ValidFor), "committee is for current epoch") seen++ case <-time.After(recvTimeout): @@ -116,7 +117,7 @@ func SchedulerImplementationTests(t *testing.T, backend api.Backend, epochtime e rt.Runtime.StorageGroupSize = 1 rt.MustRegister(t, registry) - epoch = epochtimeTests.MustAdvanceEpoch(t, epochtime, 1) + epoch = tickerTests.MustAdvanceEpoch(t, timeSource, backend) ensureValidCommittees(3, 1, int(rt.Runtime.TransactionSchedulerGroupSize)) diff --git a/go/storage/client/tests/tests.go b/go/storage/client/tests/tests.go index 69ae5e14ae5..e77c694e92b 100644 --- a/go/storage/client/tests/tests.go +++ b/go/storage/client/tests/tests.go @@ -13,17 +13,17 @@ import ( "github.com/oasislabs/ekiden/go/common" "github.com/oasislabs/ekiden/go/common/crypto/hash" "github.com/oasislabs/ekiden/go/common/node" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" - epochtimeTests "github.com/oasislabs/ekiden/go/epochtime/tests" registry "github.com/oasislabs/ekiden/go/registry/api" registryTests "github.com/oasislabs/ekiden/go/registry/tests" scheduler "github.com/oasislabs/ekiden/go/scheduler/api" "github.com/oasislabs/ekiden/go/storage/api" storageClient "github.com/oasislabs/ekiden/go/storage/client" + ticker "github.com/oasislabs/ekiden/go/ticker/api" + tickerTests "github.com/oasislabs/ekiden/go/ticker/tests" ) // ClientWorkerTests implements tests for client worker -func ClientWorkerTests(t *testing.T, beacon beacon.Backend, timeSource epochtime.SetableBackend, registry registry.Backend, scheduler scheduler.Backend) { +func ClientWorkerTests(t *testing.T, beacon beacon.Backend, timeSource ticker.SetableBackend, registry registry.Backend, scheduler scheduler.Backend) { ctx := context.Background() require := require.New(t) seed := []byte("StorageClientTests") @@ -56,7 +56,7 @@ func ClientWorkerTests(t *testing.T, beacon beacon.Backend, timeSource epochtime require.Nil(r, "result should be nil") // Advance the epoch. - epochtimeTests.MustAdvanceEpoch(t, timeSource, 1) + tickerTests.MustAdvanceEpoch(t, timeSource, scheduler) // Wait for initialization <-client.Initialized() diff --git a/go/tendermint/abci/mux.go b/go/tendermint/abci/mux.go index 38d096c5061..c78aa472bf9 100644 --- a/go/tendermint/abci/mux.go +++ b/go/tendermint/abci/mux.go @@ -25,10 +25,11 @@ import ( "github.com/oasislabs/ekiden/go/common/json" "github.com/oasislabs/ekiden/go/common/logging" "github.com/oasislabs/ekiden/go/common/version" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" genesis "github.com/oasislabs/ekiden/go/genesis/api" + scheduler "github.com/oasislabs/ekiden/go/scheduler/api" "github.com/oasislabs/ekiden/go/tendermint/api" "github.com/oasislabs/ekiden/go/tendermint/db" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) const ( @@ -217,12 +218,12 @@ func (a *ApplicationServer) Pruner() StatePruner { // NewApplicationServer returns a new ApplicationServer, using the provided // directory to persist state. -func NewApplicationServer(ctx context.Context, dataDir string, pruneCfg *PruneConfig) (*ApplicationServer, error) { +func NewApplicationServer(ctx context.Context, dataDir string, pruneCfg *PruneConfig, epochInterval int64) (*ApplicationServer, error) { metricsOnce.Do(func() { prometheus.MustRegister(abciCollectors...) }) - mux, err := newABCIMux(ctx, dataDir, pruneCfg) + mux, err := newABCIMux(ctx, dataDir, pruneCfg, epochInterval) if err != nil { return nil, err } @@ -713,8 +714,8 @@ func (mux *abciMux) extractAppFromTx(tx []byte) (Application, error) { return app, nil } -func newABCIMux(ctx context.Context, dataDir string, pruneCfg *PruneConfig) (*abciMux, error) { - state, err := newApplicationState(ctx, dataDir, pruneCfg) +func newABCIMux(ctx context.Context, dataDir string, pruneCfg *PruneConfig, epochInterval int64) (*abciMux, error) { + state, err := newApplicationState(ctx, dataDir, pruneCfg, epochInterval) if err != nil { return nil, err } @@ -746,6 +747,7 @@ type ApplicationState struct { deliverTxTree *iavl.MutableTree checkTxTree *iavl.MutableTree statePruner StatePruner + epochInterval int64 blockLock sync.RWMutex blockHash []byte @@ -785,32 +787,49 @@ func (s *ApplicationState) CheckTxTree() *iavl.MutableTree { return s.checkTxTree } +// GetEpoch returns the current scheduling epoch. +func (s *ApplicationState) GetEpoch(timeSource ticker.Backend) (scheduler.EpochTime, error) { + blockHeight := s.BlockHeight() + if blockHeight == 0 { + return scheduler.EpochInvalid, errors.New("no epoch in initial blockheight") + } + currentEpoch, err := timeSource.GetTick(s.ctx, blockHeight, s.epochInterval) + if err != nil { + s.logger.Error("GetEpoch: failed to get current epoch", + "err", err, + ) + return scheduler.EpochInvalid, err + } + + return scheduler.EpochTime(currentEpoch), nil +} + // EpochChanged returns true iff the current epoch has changed since the // last block. As a matter of convenience, the current epoch is returned // iff it has changed. -func (s *ApplicationState) EpochChanged(timeSource epochtime.Backend) (bool, epochtime.EpochTime) { +func (s *ApplicationState) EpochChanged(timeSource ticker.Backend) (bool, scheduler.EpochTime) { blockHeight := s.BlockHeight() if blockHeight == 0 { - return false, epochtime.EpochInvalid + return false, scheduler.EpochInvalid } - previousEpoch, err := timeSource.GetEpoch(s.ctx, blockHeight-1) + previousEpoch, err := timeSource.GetTick(s.ctx, blockHeight-1, s.epochInterval) if err != nil { s.logger.Error("EpochChanged: failed to get previous epoch", "err", err, ) - return false, epochtime.EpochInvalid + return false, scheduler.EpochInvalid } - currentEpoch, err := timeSource.GetEpoch(s.ctx, blockHeight) + currentEpoch, err := timeSource.GetTick(s.ctx, blockHeight, s.epochInterval) if err != nil { s.logger.Error("EpochChanged: failed to get current epoch", "err", err, ) - return false, epochtime.EpochInvalid + return false, scheduler.EpochInvalid } if previousEpoch == currentEpoch { - return false, epochtime.EpochInvalid + return false, scheduler.EpochInvalid } s.logger.Debug("EpochChanged: epoch transition detected", @@ -818,7 +837,7 @@ func (s *ApplicationState) EpochChanged(timeSource epochtime.Backend) (bool, epo "epoch", currentEpoch, ) - return true, currentEpoch + return true, scheduler.EpochTime(currentEpoch) } // Genesis returns the ABCI genesis state. @@ -942,7 +961,7 @@ func (s *ApplicationState) metricsWorker() { } } -func newApplicationState(ctx context.Context, dataDir string, pruneCfg *PruneConfig) (*ApplicationState, error) { +func newApplicationState(ctx context.Context, dataDir string, pruneCfg *PruneConfig, epochInterval int64) (*ApplicationState, error) { db, err := db.New(filepath.Join(dataDir, "abci-mux-state"), false) if err != nil { return nil, err @@ -983,6 +1002,7 @@ func newApplicationState(ctx context.Context, dataDir string, pruneCfg *PruneCon deliverTxTree: deliverTxTree, checkTxTree: checkTxTree, statePruner: statePruner, + epochInterval: epochInterval, blockHash: blockHash, blockHeight: blockHeight, metricsCloseCh: make(chan struct{}), diff --git a/go/tendermint/apps/beacon/beacon.go b/go/tendermint/apps/beacon/beacon.go index 303235ffae4..b3f6e972af6 100644 --- a/go/tendermint/apps/beacon/beacon.go +++ b/go/tendermint/apps/beacon/beacon.go @@ -11,10 +11,11 @@ import ( beacon "github.com/oasislabs/ekiden/go/beacon/api" "github.com/oasislabs/ekiden/go/common/logging" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" genesis "github.com/oasislabs/ekiden/go/genesis/api" + scheduler "github.com/oasislabs/ekiden/go/scheduler/api" "github.com/oasislabs/ekiden/go/tendermint/abci" "github.com/oasislabs/ekiden/go/tendermint/api" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) var ( @@ -30,7 +31,7 @@ type beaconApplication struct { logger *logging.Logger state *abci.ApplicationState - timeSource epochtime.Backend + timeSource ticker.Backend cfg *beacon.Config } @@ -110,7 +111,7 @@ func (app *beaconApplication) queryGetBeacon(s interface{}, r interface{}) ([]by return state.GetBeacon() } -func (app *beaconApplication) onBeaconEpochChange(ctx *abci.Context, epoch epochtime.EpochTime, req types.RequestBeginBlock) error { +func (app *beaconApplication) onBeaconEpochChange(ctx *abci.Context, epoch scheduler.EpochTime, req types.RequestBeginBlock) error { var entropyCtx, entropy []byte switch app.cfg.DebugDeterministic { @@ -172,7 +173,7 @@ func (app *beaconApplication) onNewBeacon(ctx *abci.Context, beacon []byte) erro } // New constructs a new beacon application instance. -func New(timeSource epochtime.Backend, cfg *beacon.Config) abci.Application { +func New(timeSource ticker.Backend, cfg *beacon.Config) abci.Application { app := &beaconApplication{ logger: logging.GetLogger("tendermint/beacon"), timeSource: timeSource, @@ -185,7 +186,7 @@ func New(timeSource epochtime.Backend, cfg *beacon.Config) abci.Application { return app } -func getBeacon(beaconEpoch epochtime.EpochTime, entropyCtx []byte, entropy []byte) []byte { +func getBeacon(beaconEpoch scheduler.EpochTime, entropyCtx []byte, entropy []byte) []byte { var tmp [8]byte binary.LittleEndian.PutUint64(tmp[:], uint64(beaconEpoch)) diff --git a/go/tendermint/apps/epochtime_mock/api.go b/go/tendermint/apps/epochtime_mock/api.go deleted file mode 100644 index a5f0fba3062..00000000000 --- a/go/tendermint/apps/epochtime_mock/api.go +++ /dev/null @@ -1,48 +0,0 @@ -package epochtimemock - -import ( - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" - "github.com/oasislabs/ekiden/go/tendermint/api" -) - -const ( - // TransactionTag is a unique byte used to identify transactions - // for the mock epochtime application. - TransactionTag byte = 0x03 - - // AppName is the ABCI application name. - // - // Note: It must be lexographically before any application that - // uses time keeping. - AppName string = "000_epochtime_mock" - - // QueryGetEpoch is a path for GetLatestBlock query. - QueryGetEpoch = AppName + "/epoch" -) - -var ( - // TagEpoch is an ABCI begin block tag for specifying the set epoch. - TagEpoch = []byte("epochtime_mock.epoch") - - // QueryApp is a query for filtering events processed by - // the mock epochtime application. - QueryApp = api.QueryForEvent([]byte(AppName), api.TagAppNameValue) -) - -// Tx is a transaction to be accepted by the mock epochtime app. -type Tx struct { - _struct struct{} `codec:",omitempty"` // nolint - - *TxSetEpoch `codec:"SetEpoch"` -} - -// TxSetEpoch is a transaction for submitting an epoch to be set. -type TxSetEpoch struct { - Epoch epochtime.EpochTime -} - -// QueryGetEpochResponse is a response to QueryGetEpoch. -type QueryGetEpochResponse struct { - Epoch epochtime.EpochTime - Height int64 -} diff --git a/go/tendermint/apps/epochtime_mock/epochtime_mock.go b/go/tendermint/apps/epochtime_mock/epochtime_mock.go deleted file mode 100644 index ced6d00e2b1..00000000000 --- a/go/tendermint/apps/epochtime_mock/epochtime_mock.go +++ /dev/null @@ -1,186 +0,0 @@ -// Package epochtimemock implements the mock epochtime application. -package epochtimemock - -import ( - "encoding/hex" - - "github.com/pkg/errors" - "github.com/tendermint/iavl" - "github.com/tendermint/tendermint/abci/types" - - "github.com/oasislabs/ekiden/go/common/cbor" - "github.com/oasislabs/ekiden/go/common/logging" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" - genesis "github.com/oasislabs/ekiden/go/genesis/api" - "github.com/oasislabs/ekiden/go/tendermint/abci" - "github.com/oasislabs/ekiden/go/tendermint/api" -) - -var _ abci.Application = (*epochTimeMockApplication)(nil) - -type epochTimeMockApplication struct { - logger *logging.Logger - state *abci.ApplicationState -} - -func (app *epochTimeMockApplication) Name() string { - return AppName -} - -func (app *epochTimeMockApplication) TransactionTag() byte { - return TransactionTag -} - -func (app *epochTimeMockApplication) Blessed() bool { - return false -} - -func (app *epochTimeMockApplication) OnRegister(state *abci.ApplicationState, queryRouter abci.QueryRouter) { - app.state = state - - // Register query handlers. - queryRouter.AddRoute(QueryGetEpoch, nil, app.queryGetEpoch) -} - -func (app *epochTimeMockApplication) OnCleanup() { -} - -func (app *epochTimeMockApplication) SetOption(request types.RequestSetOption) types.ResponseSetOption { - return types.ResponseSetOption{} -} - -func (app *epochTimeMockApplication) GetState(height int64) (interface{}, error) { - return newImmutableState(app.state, height) -} - -func (app *epochTimeMockApplication) queryGetEpoch(s interface{}, r interface{}) ([]byte, error) { - state := s.(*immutableState) - - var ( - response QueryGetEpochResponse - err error - ) - response.Epoch, response.Height, err = state.getEpoch() - if err != nil { - return nil, err - } - - return cbor.Marshal(response), nil -} - -func (app *epochTimeMockApplication) CheckTx(ctx *abci.Context, tx []byte) error { - request := &Tx{} - if err := cbor.Unmarshal(tx, request); err != nil { - app.logger.Error("CheckTx: failed to unmarshal", - "tx", hex.EncodeToString(tx), - ) - return errors.Wrap(err, "epochtime_mock: failed to unmarshal") - } - - if err := app.executeTx(ctx, app.state.CheckTxTree(), request); err != nil { - return err - } - - return nil -} - -func (app *epochTimeMockApplication) ForeignCheckTx(ctx *abci.Context, other abci.Application, tx []byte) error { - return nil -} - -func (app *epochTimeMockApplication) InitChain(ctx *abci.Context, request types.RequestInitChain, doc *genesis.Document) error { - return nil -} - -func (app *epochTimeMockApplication) BeginBlock(ctx *abci.Context, request types.RequestBeginBlock) error { - state := newMutableState(app.state.DeliverTxTree()) - - future, err := state.getFutureEpoch() - if err != nil { - return errors.Wrap(err, "BeginBlock: failed to get future epoch") - } - if future == nil { - return nil - } - defer state.clearFutureEpoch() - - height := app.state.BlockHeight() - if future.Height != height { - app.logger.Error("BeginBlock: height mismatch in defered set", - "height", height, - "expected_height", future.Height, - ) - return errors.New("epochtime_mock: height mismatch in defered set") - } - - app.logger.Info("setting epoch", - "epoch", future.Epoch, - "current_height", height, - ) - - state.setEpoch(future.Epoch, height) - ctx.EmitTag([]byte(app.Name()), api.TagAppNameValue) - ctx.EmitTag(TagEpoch, cbor.Marshal(future.Epoch)) - - return nil -} - -func (app *epochTimeMockApplication) DeliverTx(ctx *abci.Context, tx []byte) error { - request := &Tx{} - if err := cbor.Unmarshal(tx, request); err != nil { - app.logger.Error("DeliverTx: failed to unmarshal", - "tx", hex.EncodeToString(tx), - ) - return errors.Wrap(err, "epochtime_mock: failed to unmarshal") - } - - return app.executeTx(ctx, app.state.DeliverTxTree(), request) -} - -func (app *epochTimeMockApplication) ForeignDeliverTx(ctx *abci.Context, other abci.Application, tx []byte) error { - return nil -} - -func (app *epochTimeMockApplication) EndBlock(request types.RequestEndBlock) (types.ResponseEndBlock, error) { - return types.ResponseEndBlock{}, nil -} - -func (app *epochTimeMockApplication) FireTimer(ctx *abci.Context, timer *abci.Timer) { -} - -func (app *epochTimeMockApplication) executeTx( - ctx *abci.Context, - tree *iavl.MutableTree, - tx *Tx, -) error { - state := newMutableState(tree) - - if tx.TxSetEpoch != nil { - return app.setEpoch(ctx, state, tx.TxSetEpoch.Epoch) - } - return errors.New("epochtime_mock: invalid argument") -} - -func (app *epochTimeMockApplication) setEpoch( - ctx *abci.Context, - state *mutableState, - epoch epochtime.EpochTime, -) error { - height := app.state.BlockHeight() - - app.logger.Info("scheduling epoch transition", - "epoch", epoch, - "current_height", height, - "next_height", height+1, - "is_check_only", ctx.IsCheckOnly(), - ) - - return state.setFutureEpoch(epoch, height+1) -} - -// New constructs a new mock epochtime application instance. -func New() abci.Application { - return &epochTimeMockApplication{ - logger: logging.GetLogger("tendermint/epochtime_mock"), - } -} diff --git a/go/tendermint/apps/epochtime_mock/state.go b/go/tendermint/apps/epochtime_mock/state.go deleted file mode 100644 index 5306d8b0d18..00000000000 --- a/go/tendermint/apps/epochtime_mock/state.go +++ /dev/null @@ -1,118 +0,0 @@ -package epochtimemock - -import ( - "github.com/pkg/errors" - "github.com/tendermint/iavl" - - "github.com/oasislabs/ekiden/go/common/cbor" - "github.com/oasislabs/ekiden/go/epochtime/api" - "github.com/oasislabs/ekiden/go/tendermint/abci" -) - -const ( - // Mock epochtime state. - stateCurrentEpoch = "epochtime_mock/current" - stateFutureEpoch = "epochtime_mock/future" -) - -var ( - _ cbor.Marshaler = (*mockEpochTimeState)(nil) - _ cbor.Unmarshaler = (*mockEpochTimeState)(nil) -) - -type mockEpochTimeState struct { - Epoch api.EpochTime `codec:"epoch"` - Height int64 `codec:"height"` -} - -func (s *mockEpochTimeState) MarshalCBOR() []byte { - return cbor.Marshal(s) -} - -func (s *mockEpochTimeState) UnmarshalCBOR(data []byte) error { - return cbor.Unmarshal(data, s) -} - -type immutableState struct { - *abci.ImmutableState -} - -func (s *immutableState) getEpoch() (api.EpochTime, int64, error) { - _, raw := s.Snapshot.Get([]byte(stateCurrentEpoch)) - if raw == nil { - return api.EpochTime(0), 0, nil - } - - var state mockEpochTimeState - err := state.UnmarshalCBOR(raw) - return state.Epoch, state.Height, err -} - -func (s *immutableState) getFutureEpoch() (*mockEpochTimeState, error) { - _, raw := s.Snapshot.Get([]byte(stateFutureEpoch)) - if raw == nil { - return nil, nil - } - - var state mockEpochTimeState - if err := state.UnmarshalCBOR(raw); err != nil { - return nil, errors.Wrap(err, "epochtime_mock: failed to unmarshal future epoch") - } - return &state, nil -} - -func newImmutableState(state *abci.ApplicationState, version int64) (*immutableState, error) { - inner, err := abci.NewImmutableState(state, version) - if err != nil { - return nil, err - } - - return &immutableState{inner}, nil -} - -type mutableState struct { - *immutableState - - tree *iavl.MutableTree -} - -func (s *mutableState) setEpoch(epoch api.EpochTime, height int64) { - state := mockEpochTimeState{Epoch: epoch, Height: height} - - s.tree.Set( - []byte(stateCurrentEpoch), - state.MarshalCBOR(), - ) -} - -func (s *mutableState) setFutureEpoch(epoch api.EpochTime, height int64) error { - future, err := s.getFutureEpoch() - if err != nil { - return err - } - if future != nil { - return errors.New("epochtime_mock: future epoch already pending") - } - - state := mockEpochTimeState{Epoch: epoch, Height: height} - - s.tree.Set( - []byte(stateFutureEpoch), - state.MarshalCBOR(), - ) - - return nil -} - -func (s *mutableState) clearFutureEpoch() { - s.tree.Remove([]byte(stateFutureEpoch)) -} - -func newMutableState(tree *iavl.MutableTree) *mutableState { - inner := &abci.ImmutableState{Snapshot: tree.ImmutableTree} - - return &mutableState{ - immutableState: &immutableState{inner}, - tree: tree, - } -} diff --git a/go/tendermint/apps/keymanager/keymanager.go b/go/tendermint/apps/keymanager/keymanager.go index 6f36a22ddbe..53e370998cf 100644 --- a/go/tendermint/apps/keymanager/keymanager.go +++ b/go/tendermint/apps/keymanager/keymanager.go @@ -13,13 +13,14 @@ import ( "github.com/oasislabs/ekiden/go/common/json" "github.com/oasislabs/ekiden/go/common/logging" "github.com/oasislabs/ekiden/go/common/node" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" genesis "github.com/oasislabs/ekiden/go/genesis/api" "github.com/oasislabs/ekiden/go/keymanager/api" registry "github.com/oasislabs/ekiden/go/registry/api" + scheduler "github.com/oasislabs/ekiden/go/scheduler/api" "github.com/oasislabs/ekiden/go/tendermint/abci" tmapi "github.com/oasislabs/ekiden/go/tendermint/api" registryapp "github.com/oasislabs/ekiden/go/tendermint/apps/registry" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) var emptyHashSha3 = sha3.Sum256(nil) @@ -28,7 +29,7 @@ type keymanagerApplication struct { logger *logging.Logger state *abci.ApplicationState - timeSource epochtime.Backend + timeSource ticker.Backend } func (app *keymanagerApplication) Name() string { @@ -167,7 +168,7 @@ func (app *keymanagerApplication) queryGetStatuses(s interface{}, r interface{}) return cbor.Marshal(statuses), nil } -func (app *keymanagerApplication) onEpochChange(ctx *abci.Context, epoch epochtime.EpochTime) error { +func (app *keymanagerApplication) onEpochChange(ctx *abci.Context, epoch scheduler.EpochTime) error { tree := app.state.DeliverTxTree() // Query the runtime and node lists. @@ -350,7 +351,7 @@ func (app *keymanagerApplication) generateStatus(kmrt *registry.Runtime, oldStat return status } -func New(timeSource epochtime.Backend) abci.Application { +func New(timeSource ticker.Backend) abci.Application { return &keymanagerApplication{ logger: logging.GetLogger("tendermint/keymanager"), timeSource: timeSource, diff --git a/go/tendermint/apps/registry/registry.go b/go/tendermint/apps/registry/registry.go index f66db89e4bc..0d4b296fd5a 100644 --- a/go/tendermint/apps/registry/registry.go +++ b/go/tendermint/apps/registry/registry.go @@ -2,7 +2,6 @@ package registry import ( - "context" "encoding/hex" "github.com/pkg/errors" @@ -15,13 +14,14 @@ import ( "github.com/oasislabs/ekiden/go/common/json" "github.com/oasislabs/ekiden/go/common/logging" "github.com/oasislabs/ekiden/go/common/node" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" genesis "github.com/oasislabs/ekiden/go/genesis/api" registry "github.com/oasislabs/ekiden/go/registry/api" + scheduler "github.com/oasislabs/ekiden/go/scheduler/api" staking "github.com/oasislabs/ekiden/go/staking/api" "github.com/oasislabs/ekiden/go/tendermint/abci" "github.com/oasislabs/ekiden/go/tendermint/api" stakingapp "github.com/oasislabs/ekiden/go/tendermint/apps/staking" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) var _ abci.Application = (*registryApplication)(nil) @@ -30,7 +30,7 @@ type registryApplication struct { logger *logging.Logger state *abci.ApplicationState - timeSource epochtime.Backend + timeSource ticker.Backend cfg *registry.Config } @@ -206,7 +206,7 @@ func (app *registryApplication) EndBlock(request types.RequestEndBlock) (types.R func (app *registryApplication) FireTimer(*abci.Context, *abci.Timer) { } -func (app *registryApplication) onRegistryEpochChanged(ctx *abci.Context, registryEpoch epochtime.EpochTime) error { +func (app *registryApplication) onRegistryEpochChanged(ctx *abci.Context, registryEpoch scheduler.EpochTime) error { state := NewMutableState(app.state.DeliverTxTree()) nodes, err := state.GetNodes() @@ -219,7 +219,7 @@ func (app *registryApplication) onRegistryEpochChanged(ctx *abci.Context, regist var expiredNodes []*node.Node for _, node := range nodes { - if epochtime.EpochTime(node.Expiration) >= registryEpoch { + if scheduler.EpochTime(node.Expiration) >= registryEpoch { continue } expiredNodes = append(expiredNodes, node) @@ -391,11 +391,15 @@ func (app *registryApplication) registerNode( } // Ensure node is not expired. - epoch, err := app.timeSource.GetEpoch(context.Background(), app.state.BlockHeight()) + epoch, err := app.state.GetEpoch(app.timeSource) if err != nil { return err } - if epochtime.EpochTime(node.Expiration) < epoch { + if node.Expiration == 0 { + // If Expiration == 0, register for next epoch. + node.Expiration = uint64(epoch) + 2 + } + if scheduler.EpochTime(node.Expiration) < epoch { return registry.ErrNodeExpired } @@ -472,7 +476,7 @@ func (app *registryApplication) registerRuntime( } // New constructs a new registry application instance. -func New(timeSource epochtime.Backend, cfg *registry.Config) abci.Application { +func New(timeSource ticker.Backend, cfg *registry.Config) abci.Application { return ®istryApplication{ logger: logging.GetLogger("tendermint/registry"), timeSource: timeSource, diff --git a/go/tendermint/apps/roothash/roothash.go b/go/tendermint/apps/roothash/roothash.go index 30842b6f8b5..0481fafc4f7 100644 --- a/go/tendermint/apps/roothash/roothash.go +++ b/go/tendermint/apps/roothash/roothash.go @@ -17,7 +17,6 @@ import ( "github.com/oasislabs/ekiden/go/common/crypto/signature" "github.com/oasislabs/ekiden/go/common/logging" "github.com/oasislabs/ekiden/go/common/node" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" genesis "github.com/oasislabs/ekiden/go/genesis/api" registry "github.com/oasislabs/ekiden/go/registry/api" roothash "github.com/oasislabs/ekiden/go/roothash/api" @@ -28,6 +27,7 @@ import ( "github.com/oasislabs/ekiden/go/tendermint/api" registryapp "github.com/oasislabs/ekiden/go/tendermint/apps/registry" schedulerapp "github.com/oasislabs/ekiden/go/tendermint/apps/scheduler" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) var ( @@ -55,7 +55,7 @@ type rootHashApplication struct { logger *logging.Logger state *abci.ApplicationState - timeSource epochtime.Backend + timeSource ticker.Backend beacon beacon.Backend roundTimeout time.Duration @@ -160,7 +160,7 @@ func (app *rootHashApplication) BeginBlock(ctx *abci.Context, request types.Requ return nil } -func (app *rootHashApplication) onEpochChange(ctx *abci.Context, epoch epochtime.EpochTime) error { // nolint: gocyclo +func (app *rootHashApplication) onEpochChange(ctx *abci.Context, epoch scheduler.EpochTime) error { // nolint: gocyclo tree := app.state.DeliverTxTree() state := newMutableState(tree) @@ -775,7 +775,7 @@ func (app *rootHashApplication) tryFinalizeMerge( // New constructs a new roothash application instance. func New( ctx context.Context, - timeSource epochtime.Backend, + timeSource ticker.Backend, beacon beacon.Backend, roundTimeout time.Duration, ) abci.Application { diff --git a/go/tendermint/apps/scheduler/api.go b/go/tendermint/apps/scheduler/api.go index efaa9fb444c..7757dcb1361 100644 --- a/go/tendermint/apps/scheduler/api.go +++ b/go/tendermint/apps/scheduler/api.go @@ -17,6 +17,9 @@ const ( // QueryKindsCommittees is a query path for getting the committees of given kinds. QueryKindsCommittees = AppName + "/kinds-committees" + + // QueryGetEpoch is a query path for getting current scheduler epoch. + QueryGetEpoch = AppName + "/epoch" ) var ( diff --git a/go/tendermint/apps/scheduler/scheduler.go b/go/tendermint/apps/scheduler/scheduler.go index 2992d7e1e3e..4563f18a271 100644 --- a/go/tendermint/apps/scheduler/scheduler.go +++ b/go/tendermint/apps/scheduler/scheduler.go @@ -17,7 +17,6 @@ import ( "github.com/oasislabs/ekiden/go/common/crypto/signature" "github.com/oasislabs/ekiden/go/common/logging" "github.com/oasislabs/ekiden/go/common/node" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" genesis "github.com/oasislabs/ekiden/go/genesis/api" registry "github.com/oasislabs/ekiden/go/registry/api" scheduler "github.com/oasislabs/ekiden/go/scheduler/api" @@ -27,6 +26,7 @@ import ( beaconapp "github.com/oasislabs/ekiden/go/tendermint/apps/beacon" registryapp "github.com/oasislabs/ekiden/go/tendermint/apps/registry" stakingapp "github.com/oasislabs/ekiden/go/tendermint/apps/staking" + ticker "github.com/oasislabs/ekiden/go/ticker/api" ) var ( @@ -92,7 +92,7 @@ type schedulerApplication struct { logger *logging.Logger state *abci.ApplicationState - timeSource epochtime.Backend + timeSource ticker.Backend cfg *scheduler.Config } @@ -119,6 +119,7 @@ func (app *schedulerApplication) OnRegister(state *abci.ApplicationState, queryR // Register query handlers. queryRouter.AddRoute(QueryAllCommittees, nil, app.queryAllCommittees) queryRouter.AddRoute(QueryKindsCommittees, []scheduler.CommitteeKind{}, app.queryKindsCommittees) + queryRouter.AddRoute(QueryGetEpoch, nil, app.queryGetEpoch) } func (app *schedulerApplication) OnCleanup() {} @@ -238,6 +239,14 @@ func (app *schedulerApplication) queryAllCommittees(s interface{}, r interface{} return cbor.Marshal(committees), nil } +func (app *schedulerApplication) queryGetEpoch(s interface{}, r interface{}) ([]byte, error) { + epoch, err := app.state.GetEpoch(app.timeSource) + if err != nil { + return nil, err + } + return cbor.Marshal(epoch), nil +} + func (app *schedulerApplication) queryKindsCommittees(s interface{}, r interface{}) ([]byte, error) { state := s.(*immutableState) request := *r.(*[]scheduler.CommitteeKind) @@ -317,7 +326,7 @@ func (app *schedulerApplication) isSuitableMergeWorker(n *node.Node, rt *registr // Operates on consensus connection. // Return error if node should crash. // For non-fatal problems, save a problem condition to the state and return successfully. -func (app *schedulerApplication) elect(ctx *abci.Context, request types.RequestBeginBlock, epoch epochtime.EpochTime, beacon []byte, entityStake *stakeAccumulator, rt *registry.Runtime, nodes []*node.Node, kind scheduler.CommitteeKind) error { +func (app *schedulerApplication) elect(ctx *abci.Context, request types.RequestBeginBlock, epoch scheduler.EpochTime, beacon []byte, entityStake *stakeAccumulator, rt *registry.Runtime, nodes []*node.Node, kind scheduler.CommitteeKind) error { // Only generic compute runtimes need to elect all the committees. if !rt.IsCompute() && kind != scheduler.KindCompute { return nil @@ -443,7 +452,7 @@ func (app *schedulerApplication) elect(ctx *abci.Context, request types.RequestB } // Operates on consensus connection. -func (app *schedulerApplication) electAll(ctx *abci.Context, request types.RequestBeginBlock, epoch epochtime.EpochTime, beacon []byte, entityStake *stakeAccumulator, runtimes []*registry.Runtime, nodes []*node.Node, kind scheduler.CommitteeKind) error { +func (app *schedulerApplication) electAll(ctx *abci.Context, request types.RequestBeginBlock, epoch scheduler.EpochTime, beacon []byte, entityStake *stakeAccumulator, runtimes []*registry.Runtime, nodes []*node.Node, kind scheduler.CommitteeKind) error { for _, runtime := range runtimes { if err := app.elect(ctx, request, epoch, beacon, entityStake, runtime, nodes, kind); err != nil { return err @@ -454,7 +463,7 @@ func (app *schedulerApplication) electAll(ctx *abci.Context, request types.Reque // New constructs a new scheduler application instance. func New( - timeSource epochtime.Backend, + timeSource ticker.Backend, cfg *scheduler.Config, ) abci.Application { return &schedulerApplication{ diff --git a/go/tendermint/apps/ticker_mock/api.go b/go/tendermint/apps/ticker_mock/api.go new file mode 100644 index 00000000000..d13bea54a5f --- /dev/null +++ b/go/tendermint/apps/ticker_mock/api.go @@ -0,0 +1,48 @@ +package tickermock + +import ( + "github.com/oasislabs/ekiden/go/tendermint/api" + ticker "github.com/oasislabs/ekiden/go/ticker/api" +) + +const ( + // TransactionTag is a unique byte used to identify transactions + // for the mock ticker application. + TransactionTag byte = 0x03 + + // AppName is the ABCI application name. + // + // Note: It must be lexographically before any application that + // uses time keeping. + AppName string = "000_ticker_mock" + + // QueryGetTick is a path for GetLatestBlock query. + QueryGetTick = AppName + "/tick" +) + +var ( + // TagTick is an ABCI begin block tag for specifying the set tick. + TagTick = []byte("tickertime_mock.tick") + + // QueryApp is a query for filtering events processed by + // the mock epochtime application. + QueryApp = api.QueryForEvent([]byte(AppName), api.TagAppNameValue) +) + +// Tx is a transaction to be accepted by the mock ticker app. +type Tx struct { + _struct struct{} `codec:",omitempty"` // nolint + + *TxDoTick `codec:"DoTick"` +} + +// TxDoTick is a transaction for triggering a tick. +type TxDoTick struct { + // Nonce is ued to avoid duplicate DoTick transactions to be ignored by the application. + Nonce uint64 +} + +// QueryGetTickResponse is a response to QueryGetTick. +type QueryGetTickResponse struct { + Tick ticker.TickTime +} diff --git a/go/tendermint/apps/ticker_mock/state.go b/go/tendermint/apps/ticker_mock/state.go new file mode 100644 index 00000000000..782e64de315 --- /dev/null +++ b/go/tendermint/apps/ticker_mock/state.go @@ -0,0 +1,143 @@ +package tickermock + +import ( + "github.com/pkg/errors" + "github.com/tendermint/iavl" + + "github.com/oasislabs/ekiden/go/common/cbor" + "github.com/oasislabs/ekiden/go/tendermint/abci" + "github.com/oasislabs/ekiden/go/ticker/api" +) + +const ( + // Mock ticker state. + stateCurrentTick = "ticker_mock/current" +) + +var ( + _ cbor.Marshaler = (*mockTickerState)(nil) + _ cbor.Unmarshaler = (*mockTickerState)(nil) +) + +type mockTickerState struct { + Tick api.TickTime `codec:"tick"` + TickScheduled bool `codec:"tick_scheduled"` +} + +func (s *mockTickerState) MarshalCBOR() []byte { + return cbor.Marshal(s) +} + +func (s *mockTickerState) UnmarshalCBOR(data []byte) error { + return cbor.Unmarshal(data, s) +} + +type immutableState struct { + *abci.ImmutableState +} + +func (s *immutableState) getTick() (api.TickTime, error) { + _, raw := s.Snapshot.Get([]byte(stateCurrentTick)) + if raw == nil { + return api.TickTime(0), nil + } + + var state mockTickerState + err := state.UnmarshalCBOR(raw) + return state.Tick, err +} + +func (s *immutableState) isTickScheduled() (bool, error) { + _, raw := s.Snapshot.Get([]byte(stateCurrentTick)) + if raw == nil { + return false, nil + } + + var state mockTickerState + if err := state.UnmarshalCBOR(raw); err != nil { + return false, errors.Wrap(err, "ticker_settable: failed to check scheduled tick") + } + return state.TickScheduled, nil +} + +func newImmutableState(state *abci.ApplicationState, version int64) (*immutableState, error) { + inner, err := abci.NewImmutableState(state, version) + if err != nil { + return nil, err + } + + return &immutableState{inner}, nil +} + +type mutableState struct { + *immutableState + + tree *iavl.MutableTree +} + +func (s *mutableState) doTick() (api.TickTime, error) { + tick, err := s.getTick() + if err != nil { + return api.TickTime(0), errors.Wrap(err, "ticker_settable: failed to do tick") + } + state := mockTickerState{Tick: tick + 1} + + s.tree.Set( + []byte(stateCurrentTick), + state.MarshalCBOR(), + ) + + return tick + 1, nil +} + +func (s *mutableState) scheduleTick() error { + scheduledTick, err := s.isTickScheduled() + if err != nil { + return errors.Wrap(err, "ticker_settable: failed to check if tick is scheduled") + } + if scheduledTick { + return errors.New("ticker_settable: tick already scheduled") + } + + tick, err := s.getTick() + if err != nil { + return errors.Wrap(err, "ticker_settable: failed to get current tick state") + } + + state := mockTickerState{Tick: tick, TickScheduled: true} + s.tree.Set( + []byte(stateCurrentTick), + state.MarshalCBOR(), + ) + + return nil +} + +func (s *mutableState) clearScheduledTick() error { + _, raw := s.Snapshot.Get([]byte(stateCurrentTick)) + if raw == nil { + return errors.New("ticker_settable: failed to get current tick state") + } + + var state mockTickerState + if err := state.UnmarshalCBOR(raw); err != nil { + return errors.Wrap(err, "ticker_settable: failed to check scheduled tick") + } + + state = mockTickerState{Tick: state.Tick, TickScheduled: false} + s.tree.Set( + []byte(stateCurrentTick), + state.MarshalCBOR(), + ) + + return nil +} + +func newMutableState(tree *iavl.MutableTree) *mutableState { + inner := &abci.ImmutableState{Snapshot: tree.ImmutableTree} + + return &mutableState{ + immutableState: &immutableState{inner}, + tree: tree, + } +} diff --git a/go/tendermint/apps/ticker_mock/ticker_mock.go b/go/tendermint/apps/ticker_mock/ticker_mock.go new file mode 100644 index 00000000000..e9680ae2252 --- /dev/null +++ b/go/tendermint/apps/ticker_mock/ticker_mock.go @@ -0,0 +1,173 @@ +// Package tickermock implements the mock ticker application. +package tickermock + +import ( + "encoding/hex" + + "github.com/pkg/errors" + "github.com/tendermint/iavl" + "github.com/tendermint/tendermint/abci/types" + + "github.com/oasislabs/ekiden/go/common/cbor" + "github.com/oasislabs/ekiden/go/common/logging" + genesis "github.com/oasislabs/ekiden/go/genesis/api" + "github.com/oasislabs/ekiden/go/tendermint/abci" + "github.com/oasislabs/ekiden/go/tendermint/api" +) + +var _ abci.Application = (*tickerMockApplication)(nil) + +type tickerMockApplication struct { + logger *logging.Logger + state *abci.ApplicationState +} + +func (app *tickerMockApplication) Name() string { + return AppName +} + +func (app *tickerMockApplication) TransactionTag() byte { + return TransactionTag +} + +func (app *tickerMockApplication) Blessed() bool { + return false +} + +func (app *tickerMockApplication) OnRegister(state *abci.ApplicationState, queryRouter abci.QueryRouter) { + app.state = state + + // Register query handlers. + queryRouter.AddRoute(QueryGetTick, nil, app.queryGetTick) +} + +func (app *tickerMockApplication) OnCleanup() { +} + +func (app *tickerMockApplication) SetOption(request types.RequestSetOption) types.ResponseSetOption { + return types.ResponseSetOption{} +} + +func (app *tickerMockApplication) GetState(height int64) (interface{}, error) { + return newImmutableState(app.state, height) +} + +func (app *tickerMockApplication) queryGetTick(s interface{}, r interface{}) ([]byte, error) { + state := s.(*immutableState) + + var ( + response QueryGetTickResponse + err error + ) + response.Tick, err = state.getTick() + if err != nil { + return nil, err + } + + return cbor.Marshal(response), nil +} + +func (app *tickerMockApplication) CheckTx(ctx *abci.Context, tx []byte) error { + request := &Tx{} + if err := cbor.Unmarshal(tx, request); err != nil { + app.logger.Error("CheckTx: failed to unmarshal", + "tx", hex.EncodeToString(tx), + ) + return errors.Wrap(err, "ticker_mock: failed to unmarshal") + } + + if err := app.executeTx(ctx, app.state.CheckTxTree(), request); err != nil { + return err + } + + return nil +} + +func (app *tickerMockApplication) ForeignCheckTx(ctx *abci.Context, other abci.Application, tx []byte) error { + return nil +} + +func (app *tickerMockApplication) InitChain(ctx *abci.Context, request types.RequestInitChain, doc *genesis.Document) error { + return nil +} + +func (app *tickerMockApplication) BeginBlock(ctx *abci.Context, request types.RequestBeginBlock) error { + state := newMutableState(app.state.DeliverTxTree()) + + isScheduled, err := state.isTickScheduled() + if err != nil { + return errors.Wrap(err, "BeginBlock: failed to get scheduled tick") + } + if !isScheduled { + return nil + } + defer state.clearScheduledTick() // nolint: errcheck + + app.logger.Info("doing tick") + + tick, err := state.doTick() + if err != nil { + return errors.Wrap(err, "BeginBlock: failed to do tick") + } + ctx.EmitTag([]byte(app.Name()), api.TagAppNameValue) + ctx.EmitTag(TagTick, cbor.Marshal(tick)) + + return nil +} + +func (app *tickerMockApplication) DeliverTx(ctx *abci.Context, tx []byte) error { + request := &Tx{} + if err := cbor.Unmarshal(tx, request); err != nil { + app.logger.Error("DeliverTx: failed to unmarshal", + "tx", hex.EncodeToString(tx), + ) + return errors.Wrap(err, "ticker_mock: failed to unmarshal") + } + + return app.executeTx(ctx, app.state.DeliverTxTree(), request) +} + +func (app *tickerMockApplication) ForeignDeliverTx(ctx *abci.Context, other abci.Application, tx []byte) error { + return nil +} + +func (app *tickerMockApplication) EndBlock(request types.RequestEndBlock) (types.ResponseEndBlock, error) { + return types.ResponseEndBlock{}, nil +} + +func (app *tickerMockApplication) FireTimer(ctx *abci.Context, timer *abci.Timer) { +} + +func (app *tickerMockApplication) executeTx( + ctx *abci.Context, + tree *iavl.MutableTree, + tx *Tx, +) error { + state := newMutableState(tree) + + if tx.TxDoTick != nil { + return app.doTick(ctx, state) + } + return errors.New("ticker_mock: invalid argument") +} + +func (app *tickerMockApplication) doTick( + ctx *abci.Context, + state *mutableState, +) error { + height := app.state.BlockHeight() + + app.logger.Info("scheduling tick", + "current_height", height, + "is_check_only", ctx.IsCheckOnly(), + ) + + return state.scheduleTick() +} + +// New constructs a new mock epochtime application instance. +func New() abci.Application { + return &tickerMockApplication{ + logger: logging.GetLogger("tendermint/ticker_mock"), + } +} diff --git a/go/tendermint/tendermint.go b/go/tendermint/tendermint.go index d3b58844fdb..709fd1ffd78 100644 --- a/go/tendermint/tendermint.go +++ b/go/tendermint/tendermint.go @@ -50,6 +50,13 @@ const ( cfgConsensusSkipTimeoutCommit = "tendermint.consensus.skip_timeout_commit" cfgConsensusEmptyBlockInterval = "tendermint.consensus.empty_block_interval" + // TODO: add support for different epoch intervals: + // - different scheduling intervals, beacon epoch interval, registry epoch interval + // - some invariants need to hold between intervals: + // - beacon & registry epoch intervals should be the shortest scheduling interval (=txscheduler?) + // - the shortest scheduling interval should probably be the GCD of all intervals? + cfgABCIEpochInterval = "tendermint.abci.epoch_interval" + cfgABCIPruneStrategy = "tendermint.abci.prune.strategy" cfgABCIPruneNumKept = "tendermint.abci.prune.num_kept" @@ -336,7 +343,8 @@ func (t *tendermintService) lazyInit() error { pruneNumKept := int64(viper.GetInt(cfgABCIPruneNumKept)) pruneCfg.NumKept = pruneNumKept - t.mux, err = abci.NewApplicationServer(t.ctx, t.dataDir, &pruneCfg) + epochInterval := viper.GetInt64(cfgABCIEpochInterval) + t.mux, err = abci.NewApplicationServer(t.ctx, t.dataDir, &pruneCfg, epochInterval) if err != nil { return err } @@ -836,6 +844,7 @@ func RegisterFlags(cmd *cobra.Command) { cmd.Flags().Duration(cfgConsensusEmptyBlockInterval, 0*time.Second, "tendermint empty block interval") cmd.Flags().String(cfgABCIPruneStrategy, abci.PruneDefault, "ABCI state pruning strategy") cmd.Flags().Int64(cfgABCIPruneNumKept, 3600, "ABCI state versions kept (when applicable)") + cmd.Flags().Int64(cfgABCIEpochInterval, 20, "ABCI epoch interval") cmd.Flags().Bool(cfgP2PSeedMode, false, "run the tendermint node in seed mode") cmd.Flags().String(cfgP2PSeeds, "", "comma-delimited id@host:port tendermint seed nodes") cmd.Flags().Bool(cfgLogDebug, false, "enable tendermint debug logs (very verbose)") @@ -852,6 +861,7 @@ func RegisterFlags(cmd *cobra.Command) { cfgConsensusEmptyBlockInterval, cfgABCIPruneStrategy, cfgABCIPruneNumKept, + cfgABCIEpochInterval, cfgP2PSeedMode, cfgP2PSeeds, cfgLogDebug, diff --git a/go/ticker/api/api.go b/go/ticker/api/api.go new file mode 100644 index 00000000000..1fa1ed448d2 --- /dev/null +++ b/go/ticker/api/api.go @@ -0,0 +1,29 @@ +// Package api implements the Oasis timekeeping API and common types. +package api + +import ( + "context" + + "github.com/oasislabs/ekiden/go/common/pubsub" +) + +// TickTime is the number of intervals (ticks) since a fixed instant in time. +type TickTime uint64 + +// Backend is a timekeeping implementation. +type Backend interface { + // GetTick returns the tick number at the specified block height with specified tick multiplier. + // Calling this method with height `0`, should return the tick of latest known block. + GetTick(ctx context.Context, height int64, multiplier int64) (tick TickTime, err error) + + // WatchTicks returns a channel that produces a message on every `multiplier` ticks. + WatchTicks(multiplier int64) (<-chan TickTime, *pubsub.Subscription) +} + +// SetableBackend is a Backend that supports manually triggering ticks. +type SetableBackend interface { + Backend + + // DoTick triggers a new tick. + DoTick(context.Context) error +} diff --git a/go/ticker/init.go b/go/ticker/init.go new file mode 100644 index 00000000000..ff28b9b2f4d --- /dev/null +++ b/go/ticker/init.go @@ -0,0 +1,54 @@ +// Package ticker implements the Oasis timekeeping backend. +package ticker + +import ( + "context" + "fmt" + "strings" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + + commonFlags "github.com/oasislabs/ekiden/go/ekiden/cmd/common/flags" + "github.com/oasislabs/ekiden/go/tendermint/service" + "github.com/oasislabs/ekiden/go/ticker/api" + "github.com/oasislabs/ekiden/go/ticker/tendermint" + tendermintMock "github.com/oasislabs/ekiden/go/ticker/tendermint_mock" +) + +const ( + cfgTickerDebugSettable = "ticker.debug.settable" + cfgTickerInterval = "ticker.interval" +) + +// New constructs a new Backend based on the configuration flags. +func New(ctx context.Context, tmService service.TendermintService) (api.Backend, error) { + backend := commonFlags.ConsensusBackend() + switch strings.ToLower(backend) { + case tendermint.BackendName: + settable := viper.GetBool(cfgTickerDebugSettable) + tickInterval := viper.GetInt64(cfgTickerInterval) + if settable { + return tendermintMock.New(ctx, tmService) + } + return tendermint.New(ctx, tmService, tickInterval) + default: + return nil, fmt.Errorf("ticker: unsupported backend: '%v'", backend) + } +} + +// RegisterFlags registers the configuration flags with the provided +// command. +func RegisterFlags(cmd *cobra.Command) { + if !cmd.Flags().Parsed() { + cmd.Flags().Bool(cfgTickerDebugSettable, false, "enable settable ticker (should be used for DEBUG purposes only)") + cmd.Flags().Int64(cfgTickerInterval, 8640, "Tick interval (in blocks)") + } + + for _, v := range []string{ + cfgTickerDebugSettable, + cfgTickerInterval, + } { + viper.BindPFlag(v, cmd.Flags().Lookup(v)) //nolint: errcheck + } +} diff --git a/go/ticker/tendermint/tendermint.go b/go/ticker/tendermint/tendermint.go new file mode 100644 index 00000000000..5bf2c57bd69 --- /dev/null +++ b/go/ticker/tendermint/tendermint.go @@ -0,0 +1,130 @@ +// Package tendermint implements the tendermint backed ticker backend. +package tendermint + +import ( + "context" + "sync" + + "github.com/eapache/channels" + tmtypes "github.com/tendermint/tendermint/types" + + "github.com/oasislabs/ekiden/go/common/logging" + "github.com/oasislabs/ekiden/go/common/pubsub" + tmapi "github.com/oasislabs/ekiden/go/tendermint/api" + "github.com/oasislabs/ekiden/go/tendermint/service" + "github.com/oasislabs/ekiden/go/ticker/api" +) + +const ( + // BackendName is the name of this implementation. + BackendName = tmapi.BackendName +) + +var _ api.Backend = (*tendermintBackend)(nil) + +type tendermintBackend struct { + sync.RWMutex + + logger *logging.Logger + + service service.TendermintService + notifier *pubsub.Broker + + tickInterval int64 + + tick api.TickTime +} + +func (t *tendermintBackend) GetTick(ctx context.Context, height int64, multiplier int64) (api.TickTime, error) { + var tick api.TickTime + if height == 0 { + t.RLock() + defer t.RUnlock() + tick = t.tick + } else { + tick = api.TickTime(height / t.tickInterval) + } + tickTime := api.TickTime(int64(tick) / multiplier) + + return tickTime, nil +} + +func (t *tendermintBackend) WatchTicks(multiplier int64) (<-chan api.TickTime, *pubsub.Subscription) { + outCh := make(chan api.TickTime) + typedCh := make(chan api.TickTime) + sub := t.notifier.Subscribe() + sub.Unwrap(typedCh) + + go func() { + defer close(outCh) + + var currentTick api.TickTime + // Always start with sending current tick. + t, ok := <-typedCh + if !ok { + return + } + currentTick = api.TickTime((uint(t) / uint(multiplier))) + outCh <- currentTick + // Send on tick changes + for { + t, ok := <-typedCh + if !ok { + return + } + nextTick := api.TickTime((uint(t) / uint(multiplier))) + if nextTick != currentTick { + outCh <- nextTick + currentTick = nextTick + } + } + }() + + return outCh, sub +} + +func (t *tendermintBackend) worker(ctx context.Context) { + ch, sub := t.service.WatchBlocks() + defer sub.Close() + + for { + block, ok := <-ch + if !ok { + return + } + + t.updateTick(ctx, block) + } +} + +func (t *tendermintBackend) updateTick(ctx context.Context, block *tmtypes.Block) { + t.Lock() + defer t.Unlock() + + t.tick = api.TickTime(block.Header.Height / t.tickInterval) + t.notifier.Broadcast(t.tick) +} + +// New constructs a new tendermint backed ticker Backend instance, +// with the specified tick and epoch intervals. +func New(ctx context.Context, service service.TendermintService, tickInterval int64) (api.Backend, error) { + if err := service.ForceInitialize(); err != nil { + return nil, err + } + + r := &tendermintBackend{ + logger: logging.GetLogger("ticker/tendermint"), + service: service, + tickInterval: tickInterval, + } + r.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) { + r.RLock() + defer r.RUnlock() + + ch.In() <- r.tick + }) + + go r.worker(ctx) + + return r, nil +} diff --git a/go/ticker/tendermint_mock/tendermint_mock.go b/go/ticker/tendermint_mock/tendermint_mock.go new file mode 100644 index 00000000000..4efc1a59bc8 --- /dev/null +++ b/go/ticker/tendermint_mock/tendermint_mock.go @@ -0,0 +1,216 @@ +// Package tendermintmock implements the mock (settable) tendermint backed epochtime backend. +package tendermintmock + +import ( + "bytes" + "context" + cryptorand "crypto/rand" + "math/rand" + "sync" + + "github.com/eapache/channels" + "github.com/pkg/errors" + tmtypes "github.com/tendermint/tendermint/types" + + "github.com/oasislabs/ekiden/go/common/cbor" + "github.com/oasislabs/ekiden/go/common/crypto/mathrand" + "github.com/oasislabs/ekiden/go/common/logging" + "github.com/oasislabs/ekiden/go/common/pubsub" + tmapi "github.com/oasislabs/ekiden/go/tendermint/api" + app "github.com/oasislabs/ekiden/go/tendermint/apps/ticker_mock" + "github.com/oasislabs/ekiden/go/tendermint/service" + "github.com/oasislabs/ekiden/go/ticker/api" +) + +const ( + // BackendName is the name of this implementation. + BackendName = "tendermint_mock" +) + +var _ api.Backend = (*tendermintMockBackend)(nil) + +type tendermintMockBackend struct { + sync.RWMutex + + logger *logging.Logger + + service service.TendermintService + notifier *pubsub.Broker + + tick api.TickTime +} + +func (t *tendermintMockBackend) GetTick(ctx context.Context, height int64, multiplier int64) (api.TickTime, error) { + response, err := t.service.Query(app.QueryGetTick, nil, height) + if err != nil { + return 0, errors.Wrap(err, "ticker: get block epoch query failed") + } + + var data app.QueryGetTickResponse + if err := cbor.Unmarshal(response, &data); err != nil { + return 0, errors.Wrap(err, "ticker: get block epoch malformed response") + } + + return api.TickTime(int64(data.Tick) / multiplier), nil +} + +func (t *tendermintMockBackend) WatchTicks(multiplier int64) (<-chan api.TickTime, *pubsub.Subscription) { + outCh := make(chan api.TickTime) + typedCh := make(chan api.TickTime) + sub := t.notifier.Subscribe() + sub.Unwrap(typedCh) + + go func() { + defer close(outCh) + + var currentTick api.TickTime + // Always start with sending current tick. + t, ok := <-typedCh + if !ok { + return + } + currentTick = api.TickTime((uint(t) / uint(multiplier))) + outCh <- currentTick + // Send on tick changes + for { + t, ok := <-typedCh + if !ok { + return + } + nextTick := api.TickTime((uint(t) / uint(multiplier))) + if nextTick != currentTick { + outCh <- nextTick + currentTick = nextTick + } + } + }() + + return outCh, sub +} + +func makeNonce() uint64 { + rng := rand.New(mathrand.New(cryptorand.Reader)) + return rng.Uint64() +} + +func (t *tendermintMockBackend) DoTick(ctx context.Context) error { + tx := app.Tx{ + TxDoTick: &app.TxDoTick{Nonce: makeNonce()}, + } + tick := t.tick + ch, sub := t.WatchTicks(1) + defer sub.Close() + + if err := t.service.BroadcastTx(app.TransactionTag, tx); err != nil { + return errors.Wrap(err, "ticker: do tick failed") + } + for { + select { + case newTick, ok := <-ch: + if !ok { + return context.Canceled + } + if newTick > tick { + return nil + } + case <-ctx.Done(): + return context.Canceled + } + } +} + +func (t *tendermintMockBackend) worker(ctx context.Context) { + // Subscribe to blocks which advance the epoch. + sub, err := t.service.Subscribe("ticker-worker", app.QueryApp) + if err != nil { + t.logger.Error("failed to subscribe", + "err", err, + ) + return + } + defer t.service.Unsubscribe("ticker-worker", app.QueryApp) // nolint: errcheck + + // Populate current tick (if available). + response, err := t.service.Query(app.QueryGetTick, nil, 0) + if err == nil { + var data app.QueryGetTickResponse + if err := cbor.Unmarshal(response, &data); err != nil { + panic("worker: malformed current epoch response") + } + + t.Lock() + t.tick = data.Tick + t.notifier.Broadcast(t.tick) + t.Unlock() + } + + for { + var event interface{} + + select { + case msg := <-sub.Out(): + event = msg.Data() + case <-sub.Cancelled(): + t.logger.Debug("worker: terminating, subscription closed") + return + case <-ctx.Done(): + return + } + + switch ev := event.(type) { + case tmtypes.EventDataNewBlock: + t.onEventDataNewBlock(ctx, ev) + default: + } + } +} + +func (t *tendermintMockBackend) onEventDataNewBlock(ctx context.Context, ev tmtypes.EventDataNewBlock) { + events := ev.ResultBeginBlock.GetEvents() + + for _, tmEv := range events { + if tmEv.GetType() != tmapi.EventTypeEkiden { + continue + } + + for _, pair := range tmEv.GetAttributes() { + if bytes.Equal(pair.GetKey(), app.TagTick) { + var tick api.TickTime + if err := cbor.Unmarshal(pair.GetValue(), &tick); err != nil { + t.logger.Error("worker: malformed mock tick", + "err", err, + ) + continue + } + t.Lock() + t.tick = tick + t.notifier.Broadcast(t.tick) + t.Unlock() + } + } + } +} + +// New constructs a new mock tendermint backed epochtime Backend instance. +func New(ctx context.Context, service service.TendermintService) (api.SetableBackend, error) { + // Initialze and register the tendermint service component. + app := app.New() + if err := service.RegisterApplication(app, nil); err != nil { + return nil, err + } + + t := &tendermintMockBackend{ + logger: logging.GetLogger("ticker/tendermint_mock"), + service: service, + } + t.notifier = pubsub.NewBrokerEx(func(ch *channels.InfiniteChannel) { + t.RLock() + defer t.RUnlock() + + ch.In() <- t.tick + }) + + go t.worker(ctx) + + return t, nil +} diff --git a/go/ticker/tests/mock_tester.go b/go/ticker/tests/mock_tester.go new file mode 100644 index 00000000000..ab1a8947bff --- /dev/null +++ b/go/ticker/tests/mock_tester.go @@ -0,0 +1,89 @@ +// Package tests is a collection of epochtime implementation test cases. +package tests + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + scheduler "github.com/oasislabs/ekiden/go/scheduler/api" + "github.com/oasislabs/ekiden/go/ticker/api" +) + +const recvTimeout = 1 * time.Second + +// TickerSetableImplementationTest exercises the basic functionality of +// a setable (mock) ticker backend. +func TickerSetableImplementationTest(t *testing.T, backend api.Backend) { + require := require.New(t) + + // Ensure that the backend is setable. + require.Implements((*api.SetableBackend)(nil), backend, "ticker time backend is mock") + timeSource := (backend).(api.SetableBackend) + + tick, err := timeSource.GetTick(context.Background(), 0, 1) + require.NoError(err, "GetTick") + + var e api.TickTime + + ch, sub := timeSource.WatchTicks(1) + defer sub.Close() + select { + case e = <-ch: + require.Equal(tick, e, "WatchTicks initial") + case <-time.After(recvTimeout): + t.Fatalf("failed to receive current epoch on WatchTicks") + } + + tick++ + MustAdvanceTicks(t, timeSource, 1) + + select { + case e = <-ch: + require.Equal(tick, e, "WatchTicks after advancing") + case <-time.After(recvTimeout): + t.Fatalf("failed to receive epoch notification after transition") + } + + e, err = timeSource.GetTick(context.Background(), 0, 1) + require.NoError(err, "GetTick after set") + require.Equal(tick, e, "GetTick after set, tick") +} + +// MustAdvanceTicks advances the epoch, and returns +// the new epoch. +func MustAdvanceTicks(t *testing.T, backend api.SetableBackend, advance int) { + require := require.New(t) + + for i := 0; i < advance; i++ { + err := backend.DoTick(context.Background()) + require.NoError(err, "DoTick") + _, err = backend.GetTick(context.Background(), 0, 1) + require.NoError(err, "GetTick") + } +} + +// MustAdvanceEpoch advances the epoch, and returns the new epoch. +func MustAdvanceEpoch(t *testing.T, backend api.SetableBackend, scheduler scheduler.Backend) uint64 { + require := require.New(t) + ctx := context.Background() + + epoch, err := scheduler.GetEpoch(ctx, 0) + require.NoError(err, "GetEpoch") + // TODO: make it not get stuck + for { + err := backend.DoTick(ctx) + require.NoError(err, "DoTick") + + newEpoch, nerr := scheduler.GetEpoch(ctx, 0) + require.NoError(nerr, "GetEpoch") + if epoch != newEpoch { + // After epoch changed, do one more tick. + err = backend.DoTick(ctx) + require.NoError(err, "DoTick") + return newEpoch + } + } +} diff --git a/go/worker/compute/tests/tester.go b/go/worker/compute/tests/tester.go index 0f4c4b4221f..89481389e0f 100644 --- a/go/worker/compute/tests/tester.go +++ b/go/worker/compute/tests/tester.go @@ -6,8 +6,9 @@ import ( "time" "github.com/oasislabs/ekiden/go/common/crypto/signature" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" - epochtimeTests "github.com/oasislabs/ekiden/go/epochtime/tests" + scheduler "github.com/oasislabs/ekiden/go/scheduler/api" + ticker "github.com/oasislabs/ekiden/go/ticker/api" + tickerTests "github.com/oasislabs/ekiden/go/ticker/tests" "github.com/oasislabs/ekiden/go/worker/compute" "github.com/oasislabs/ekiden/go/worker/compute/committee" ) @@ -24,7 +25,8 @@ func WorkerImplementationTests( worker *compute.Worker, runtimeID signature.PublicKey, rtNode *committee.Node, - epochtime epochtime.SetableBackend, + timeSource ticker.SetableBackend, + scheduler scheduler.Backend, ) { // Wait for worker to start and register. <-worker.Initialized() @@ -35,15 +37,15 @@ func WorkerImplementationTests( // Run the various test cases. (Ordering matters.) t.Run("InitialEpochTransition", func(t *testing.T) { - testInitialEpochTransition(t, stateCh, epochtime) + testInitialEpochTransition(t, stateCh, timeSource, scheduler) }) // TODO: Add more tests. } -func testInitialEpochTransition(t *testing.T, stateCh <-chan committee.NodeState, epochtime epochtime.SetableBackend) { +func testInitialEpochTransition(t *testing.T, stateCh <-chan committee.NodeState, timeSource ticker.SetableBackend, scheduler scheduler.Backend) { // Perform an epoch transition, so that the node gets elected leader. - epochtimeTests.MustAdvanceEpoch(t, epochtime, 1) + tickerTests.MustAdvanceEpoch(t, timeSource, scheduler) // Node should transition to WaitingForBatch state. waitForNodeTransition(t, stateCh, committee.WaitingForBatch) diff --git a/go/worker/registration/registration.go b/go/worker/registration/registration.go index 6176ac0f6bf..a3552c74dcf 100644 --- a/go/worker/registration/registration.go +++ b/go/worker/registration/registration.go @@ -17,8 +17,8 @@ import ( "github.com/oasislabs/ekiden/go/common/logging" "github.com/oasislabs/ekiden/go/common/node" "github.com/oasislabs/ekiden/go/ekiden/cmd/common/flags" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" registry "github.com/oasislabs/ekiden/go/registry/api" + ticker "github.com/oasislabs/ekiden/go/ticker/api" workerCommon "github.com/oasislabs/ekiden/go/worker/common" "github.com/oasislabs/ekiden/go/worker/common/p2p" ) @@ -27,18 +27,22 @@ const ( cfgEntityPrivateKey = "worker.entity_private_key" ) +// TODO: this needs to be less than all epoch intervals. +const registrationTickInterval = 10 + // Registration is a service handling worker node registration. type Registration struct { sync.Mutex workerCommonCfg *workerCommon.Config - epochtime epochtime.Backend + timeSource ticker.Backend registry registry.Backend identity *identity.Identity p2p *p2p.P2P entitySigner signature.Signer ctx context.Context + // Bandaid: Idempotent Stop for testing. stopped bool quitCh chan struct{} @@ -59,13 +63,12 @@ func (r *Registration) doNodeRegistration() { } } - // (re-)register the node on each epoch transition. This doesn't - // need to be strict block-epoch time, since it just serves to - // extend the node's expiration. - ch, sub := r.epochtime.WatchEpochs() + // (re-)register the node periodically. This just serves + // to extend the node's expiration. + ch, sub := r.timeSource.WatchTicks(registrationTickInterval) defer sub.Close() - regFn := func(epoch epochtime.EpochTime, retry bool) error { + regFn := func(tick ticker.TickTime, retry bool) error { var off backoff.BackOff switch retry { @@ -85,22 +88,22 @@ func (r *Registration) doNodeRegistration() { // but it's entirely possible to sit around in an infinite // retry loop with no hope of success. return backoff.Retry(func() error { - // Update the epoch if it happens to change while retrying. + // Update the tick if it happens to change while retrying. var ok bool select { - case epoch, ok = <-ch: + case tick, ok = <-ch: if !ok { return context.Canceled } default: } - return r.registerNode(epoch) + return r.registerNode(tick) }, off) } - epoch := <-ch - err := regFn(epoch, true) + tick := <-ch + err := regFn(tick, true) close(r.regCh) if err != nil { // This by definition is a cancellation. @@ -111,8 +114,8 @@ func (r *Registration) doNodeRegistration() { select { case <-r.quitCh: return - case epoch = <-ch: - if err := regFn(epoch, false); err != nil { + case tick = <-ch: + if err := regFn(tick, false); err != nil { r.logger.Error("failed to re-register node", "err", err, ) @@ -137,9 +140,9 @@ func (r *Registration) RegisterRole(hook func(*node.Node) error) { r.roleHooks = append(r.roleHooks, hook) } -func (r *Registration) registerNode(epoch epochtime.EpochTime) error { +func (r *Registration) registerNode(tick ticker.TickTime) error { r.logger.Info("performing node (re-)registration", - "epoch", epoch, + "tick", tick, ) addresses, err := r.workerCommonCfg.GetNodeAddresses() @@ -151,10 +154,9 @@ func (r *Registration) registerNode(epoch epochtime.EpochTime) error { } identityPublic := r.identity.NodeSigner.Public() nodeDesc := node.Node{ - ID: identityPublic, - EntityID: r.entitySigner.Public(), - Expiration: uint64(epoch) + 2, - P2P: r.p2p.Info(), + ID: identityPublic, + EntityID: r.entitySigner.Public(), + P2P: r.p2p.Info(), Certificate: &node.Certificate{ DER: r.identity.TLSCertificate.Certificate[0], }, @@ -228,7 +230,7 @@ func getEntitySigner(dataDir string) (signature.Signer, error) { // New constructs a new worker node registration service. func New( dataDir string, - epochtime epochtime.Backend, + timeSource ticker.Backend, registry registry.Backend, identity *identity.Identity, consensus common.ConsensusBackend, @@ -245,7 +247,7 @@ func New( r := &Registration{ workerCommonCfg: workerCommonCfg, - epochtime: epochtime, + timeSource: timeSource, registry: registry, identity: identity, entitySigner: entitySigner, diff --git a/go/worker/storage/storage.go b/go/worker/storage/storage.go index 1b7cf1f84b1..135578c2ece 100644 --- a/go/worker/storage/storage.go +++ b/go/worker/storage/storage.go @@ -12,7 +12,6 @@ import ( "github.com/oasislabs/ekiden/go/common/grpc" "github.com/oasislabs/ekiden/go/common/logging" "github.com/oasislabs/ekiden/go/common/node" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" genesis "github.com/oasislabs/ekiden/go/genesis/api" registry "github.com/oasislabs/ekiden/go/registry/api" "github.com/oasislabs/ekiden/go/storage" @@ -37,7 +36,6 @@ type Storage struct { // New constructs a new storage worker. func New( - epochtime epochtime.Backend, sb storageApi.Backend, g *grpc.Server, r *registration.Registration, diff --git a/go/worker/txnscheduler/tests/tester.go b/go/worker/txnscheduler/tests/tester.go index 333a96680df..c36d9bdda6d 100644 --- a/go/worker/txnscheduler/tests/tester.go +++ b/go/worker/txnscheduler/tests/tester.go @@ -11,12 +11,13 @@ import ( "github.com/oasislabs/ekiden/go/common/crypto/hash" "github.com/oasislabs/ekiden/go/common/crypto/signature" "github.com/oasislabs/ekiden/go/common/runtime" - epochtime "github.com/oasislabs/ekiden/go/epochtime/api" - epochtimeTests "github.com/oasislabs/ekiden/go/epochtime/tests" roothash "github.com/oasislabs/ekiden/go/roothash/api" "github.com/oasislabs/ekiden/go/roothash/api/block" + scheduler "github.com/oasislabs/ekiden/go/scheduler/api" storage "github.com/oasislabs/ekiden/go/storage/api" "github.com/oasislabs/ekiden/go/storage/mkvs/urkel" + ticker "github.com/oasislabs/ekiden/go/ticker/api" + tickerTests "github.com/oasislabs/ekiden/go/ticker/tests" "github.com/oasislabs/ekiden/go/worker/txnscheduler" "github.com/oasislabs/ekiden/go/worker/txnscheduler/committee" ) @@ -33,9 +34,10 @@ func WorkerImplementationTests( worker *txnscheduler.Worker, runtimeID signature.PublicKey, rtNode *committee.Node, - epochtime epochtime.SetableBackend, + ticker ticker.SetableBackend, roothash roothash.Backend, storage storage.Backend, + scheduler scheduler.Backend, ) { // Wait for worker to start and register. <-worker.Initialized() @@ -46,7 +48,7 @@ func WorkerImplementationTests( // Run the various test cases. (Ordering matters.) t.Run("InitialEpochTransition", func(t *testing.T) { - testInitialEpochTransition(t, stateCh, epochtime) + testInitialEpochTransition(t, stateCh, ticker, scheduler) }) t.Run("QueueCall", func(t *testing.T) { @@ -56,9 +58,9 @@ func WorkerImplementationTests( // TODO: Add more tests. } -func testInitialEpochTransition(t *testing.T, stateCh <-chan committee.NodeState, epochtime epochtime.SetableBackend) { +func testInitialEpochTransition(t *testing.T, stateCh <-chan committee.NodeState, ticker ticker.SetableBackend, scheduler scheduler.Backend) { // Perform an epoch transition, so that the node gets elected leader. - epochtimeTests.MustAdvanceEpoch(t, epochtime, 1) + tickerTests.MustAdvanceEpoch(t, ticker, scheduler) // Node should transition to WaitingForBatch state. waitForNodeTransition(t, stateCh, committee.WaitingForBatch) diff --git a/scripts/benchmark-e2e.sh b/scripts/benchmark-e2e.sh index 53a941a2aac..f1977540b1b 100755 --- a/scripts/benchmark-e2e.sh +++ b/scripts/benchmark-e2e.sh @@ -12,7 +12,7 @@ run_dummy_node_storage_dummy() { --log.level info \ --grpc.port 42261 \ --consensus.backend tendermint \ - --epochtime.backend tendermint_mock \ + --ticker.debug.settable \ --storage.backend memory \ --datadir ${datadir} \ >${LOGDIR}/dummy.log & @@ -26,7 +26,7 @@ run_dummy_node_storage_persistent() { --log.level info \ --grpc.port 42261 \ --consensus.backend tendermint \ - --epochtime.backend tendermint_mock \ + --ticker.debug.settable \ --storage.backend leveldb \ --datadir ${datadir} \ >${LOGDIR}/dummy.log & @@ -40,7 +40,7 @@ run_dummy_node_tendermint() { --log.level info \ --grpc.port 42261 \ --consensus.backend tendermint \ - --epochtime.backend tendermint_mock \ + --ticker.debug.settable \ --storage.backend memory \ --tendermint.consensus.timeout_commit 250ms \ --datadir ${datadir} \ @@ -130,7 +130,7 @@ run_benchmark() { # Advance epoch to elect a new committee. for epoch in $(seq $epochs); do sleep 2 - ${WORKDIR}/go/ekiden/ekiden debug dummy set-epoch --epoch $epoch + ${WORKDIR}/go/ekiden/ekiden debug dummy advance-epoch done # Run the client.