Fix race conditions, send epochNumber in submitTx, refactor
matevz committed Feb 14, 2020
1 parent 74744ad commit 71d53be
Showing 22 changed files with 222 additions and 173 deletions.
8 changes: 8 additions & 0 deletions .changelog/2650.breaking.md
@@ -0,0 +1,8 @@
Send and check expected epoch number during transaction execution

Stress tests revealed some race conditions during transaction execution when
there is an epoch transition. The runtime client now sends an
`expectedEpochNumber` parameter in the `SubmitTx` call. The transaction
scheduler checks whether the expected epoch matches its local one.
Additionally, if an epoch transition occurs during transaction execution, the
Executor and Merge committees correctly abort the transaction.
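
To make the described check concrete, here is a minimal, hypothetical Go sketch of the scheduler-side guard. The names `checkExpectedEpoch` and `ErrEpochMismatch` are invented for illustration and are not part of the oasis-core API:

```go
package main

import (
	"errors"
	"fmt"
)

// EpochTime stands in for the consensus epoch number type.
type EpochTime uint64

// ErrEpochMismatch signals that the epoch the client observed when calling
// SubmitTx no longer matches the scheduler's local epoch, e.g. because an
// epoch transition happened while the transaction was in flight.
var ErrEpochMismatch = errors.New("txnscheduler: expected epoch does not match local epoch")

// checkExpectedEpoch is the guard described above: compare the client's
// expected epoch against the scheduler's local one and abort on mismatch.
func checkExpectedEpoch(expected, local EpochTime) error {
	if expected != local {
		return fmt.Errorf("%w (expected: %d, local: %d)", ErrEpochMismatch, expected, local)
	}
	return nil
}

func main() {
	// The client saw epoch 41, but an epoch transition moved the scheduler
	// to 42, so the transaction is rejected and the client can resubmit.
	if err := checkExpectedEpoch(41, 42); err != nil {
		fmt.Println("aborting transaction:", err)
	}
}
```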
11 changes: 11 additions & 0 deletions .changelog/2650.internal.1.md
@@ -0,0 +1,11 @@
Replace redundant fields with `Consensus` accessors

`Backend` in `go/consensus/api` contains, among others, accessors for
`Beacon`, `EpochTime`, `Registry`, `RootHash`, `Scheduler`, and
`KeyManager`. Use those instead of direct references. The following
structs were affected:
- `Node` in `go/cmd/node`,
- `Node` in `go/common/committee`,
- `Worker` in `go/common`,
- `clientCommon` in `go/runtime/client`,
- `Group` in `go/worker/common/committee`.
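
The accessor pattern itself is easy to see in miniature. The following self-contained sketch uses simplified stand-in interfaces and fake implementations (the real interfaces live in `go/consensus/api`, `go/registry/api`, and so on); it is not the actual oasis-core code:

```go
package main

import "fmt"

// Simplified stand-ins for the real backend interfaces.
type RegistryBackend interface{ Describe() string }
type SchedulerBackend interface{ Describe() string }

// ConsensusBackend mirrors the accessor style of `Backend` in go/consensus/api.
type ConsensusBackend interface {
	Registry() RegistryBackend
	Scheduler() SchedulerBackend
}

// Node keeps only the consensus backend and asks it for sub-backends on
// demand, instead of caching separate Registry/Scheduler fields.
type Node struct {
	Consensus ConsensusBackend
}

func (n *Node) initBackends() {
	// Before the refactor: registryAPI.RegisterService(grpcSrv, n.Registry)
	// After the refactor:  registryAPI.RegisterService(grpcSrv, n.Consensus.Registry())
	fmt.Println("registering:", n.Consensus.Registry().Describe())
	fmt.Println("registering:", n.Consensus.Scheduler().Describe())
}

// Tiny fake implementations so the sketch runs on its own.
type fakeRegistry struct{}

func (fakeRegistry) Describe() string { return "registry backend" }

type fakeScheduler struct{}

func (fakeScheduler) Describe() string { return "scheduler backend" }

type fakeConsensus struct{}

func (fakeConsensus) Registry() RegistryBackend   { return fakeRegistry{} }
func (fakeConsensus) Scheduler() SchedulerBackend { return fakeScheduler{} }

func main() {
	n := &Node{Consensus: fakeConsensus{}}
	n.initBackends()
}
```

The `node.go` and `node_test.go` hunks below show the same substitution applied to the real code paths.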
1 change: 1 addition & 0 deletions .changelog/2650.internal.2.md
@@ -0,0 +1 @@
e2e/multiple-runtimes: Enable `EpochtimeMock`, add `numComputeWorkers`
2 changes: 1 addition & 1 deletion go/common/version/version.go
@@ -62,7 +62,7 @@ var (

// CommitteeProtocol versions the P2P protocol used by the
// committee members.
CommitteeProtocol = Version{Major: 0, Minor: 7, Patch: 0}
CommitteeProtocol = Version{Major: 0, Minor: 8, Patch: 0}

// ConsensusProtocol versions all data structures and processing used by
// the epochtime, beacon, registry, roothash, etc. modules that are
7 changes: 4 additions & 3 deletions go/control/debug.go
@@ -2,6 +2,7 @@ package control

import (
"context"
consensus "github.com/oasislabs/oasis-core/go/consensus/api"

"github.com/oasislabs/oasis-core/go/control/api"
epochtime "github.com/oasislabs/oasis-core/go/epochtime/api"
@@ -62,9 +63,9 @@ Loop:
}

// New creates a new oasis-node debug controller.
func NewDebug(timeSource epochtime.Backend, registry registry.Backend) api.DebugController {
func NewDebug(consensus consensus.Backend) api.DebugController {
return &debugController{
timeSource: timeSource,
registry: registry,
timeSource: consensus.EpochTime(),
registry: consensus.Registry(),
}
}
4 changes: 2 additions & 2 deletions go/epochtime/api/api.go
@@ -18,10 +18,10 @@ const EpochInvalid EpochTime = 0xffffffffffffffff // ~50 quadrillion years away.

// Backend is a timekeeping implementation.
type Backend interface {
// GetBaseEPoch returns the base epoch.
// GetBaseEpoch returns the base epoch.
GetBaseEpoch(context.Context) (EpochTime, error)

// GetEpoch returns the epoch at the specified block height.
// GetEpoch returns the epoch number at the specified block height.
// Calling this method with height `0`, should return the
// epoch of latest known block.
GetEpoch(context.Context, int64) (EpochTime, error)
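
As a usage sketch of the interface above: per the `GetEpoch` comment, passing height `0` asks for the epoch of the latest known block. The snippet below uses a trimmed local copy of the interface plus a mock backend so it runs standalone; it is not the real `go/epochtime/api` package:

```go
package main

import (
	"context"
	"fmt"
)

// EpochTime and the interface below mirror the Backend shown in the diff
// above, trimmed to the two methods of interest.
type EpochTime uint64

const EpochInvalid EpochTime = 0xffffffffffffffff

type timeSource interface {
	GetBaseEpoch(context.Context) (EpochTime, error)
	GetEpoch(context.Context, int64) (EpochTime, error)
}

// mockTime is a stand-in backend; a real node would use the epochtime
// backend exposed via Consensus.EpochTime().
type mockTime struct{ epoch EpochTime }

func (m mockTime) GetBaseEpoch(context.Context) (EpochTime, error) { return 0, nil }
func (m mockTime) GetEpoch(context.Context, int64) (EpochTime, error) {
	return m.epoch, nil
}

func main() {
	ctx := context.Background()
	var ts timeSource = mockTime{epoch: 42}

	// Height 0 means "the latest known block", per the interface comment.
	epoch, err := ts.GetEpoch(ctx, 0)
	if err != nil || epoch == EpochInvalid {
		fmt.Println("no epoch available")
		return
	}
	fmt.Println("current epoch:", epoch)
}
```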
46 changes: 12 additions & 34 deletions go/oasis-node/cmd/node/node.go
@@ -12,7 +12,6 @@ import (
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"

beacon "github.com/oasislabs/oasis-core/go/beacon/api"
"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/crash"
"github.com/oasislabs/oasis-core/go/common/crypto/signature"
@@ -44,7 +43,6 @@ import (
"github.com/oasislabs/oasis-core/go/oasis-node/cmd/common/tracing"
"github.com/oasislabs/oasis-core/go/oasis-node/cmd/debug/supplementarysanity"
registryAPI "github.com/oasislabs/oasis-core/go/registry/api"
roothash "github.com/oasislabs/oasis-core/go/roothash/api"
runtimeClient "github.com/oasislabs/oasis-core/go/runtime/client"
runtimeClientAPI "github.com/oasislabs/oasis-core/go/runtime/client/api"
runtimeRegistry "github.com/oasislabs/oasis-core/go/runtime/registry"
@@ -107,17 +105,10 @@ type Node struct {

Consensus consensusAPI.Backend

Genesis genesisAPI.Provider
Identity *identity.Identity
Beacon beacon.Backend
Epochtime epochtime.Backend
Registry registryAPI.Backend
RootHash roothash.Backend
Scheduler scheduler.Backend
Sentry sentryAPI.Backend
Staking stakingAPI.Backend
IAS iasAPI.Endpoint
KeyManager keymanagerAPI.Backend
Genesis genesisAPI.Provider
Identity *identity.Identity
Sentry sentryAPI.Backend
IAS iasAPI.Endpoint

RuntimeRegistry runtimeRegistry.Registry
RuntimeClient runtimeClientAPI.RuntimeClient
@@ -183,9 +174,9 @@ func (n *Node) initBackends() error {

// Initialize and register the internal gRPC services.
grpcSrv := n.grpcInternal.Server()
scheduler.RegisterService(grpcSrv, n.Scheduler)
registryAPI.RegisterService(grpcSrv, n.Registry)
stakingAPI.RegisterService(grpcSrv, n.Staking)
scheduler.RegisterService(grpcSrv, n.Consensus.Scheduler())
registryAPI.RegisterService(grpcSrv, n.Consensus.Registry())
stakingAPI.RegisterService(grpcSrv, n.Consensus.Staking())
consensusAPI.RegisterService(grpcSrv, n.Consensus)

cmdCommon.Logger().Debug("backends initialized")
@@ -227,13 +218,10 @@ func (n *Node) initWorkers(logger *logging.Logger) error {
dataDir,
compute.Enabled() || workerStorage.Enabled() || workerKeymanager.Enabled(),
n.Identity,
n.RootHash,
n.Registry,
n.Scheduler,
n.svcTmnt,
n.P2P,
n.IAS,
n.KeyManager,
n.Consensus.KeyManager(),
n.RuntimeRegistry,
genesisDoc,
)
@@ -251,8 +239,8 @@
// Initialize the registration worker.
n.RegistrationWorker, err = registration.New(
dataDir,
n.Epochtime,
n.Registry,
n.Consensus.EpochTime(),
n.Consensus.Registry(),
n.Identity,
n.svcTmnt,
n.P2P,
@@ -279,7 +267,7 @@ func (n *Node) initWorkers(logger *logging.Logger) error {
n.CommonWorker,
n.IAS,
n.RegistrationWorker,
n.KeyManager,
n.Consensus.KeyManager(),
)
if err != nil {
return err
@@ -600,13 +588,6 @@ func newNode(testNode bool) (*Node, error) {
}
node.svcMgr.Register(node.svcTmnt)
node.Consensus = node.svcTmnt
node.Epochtime = node.Consensus.EpochTime()
node.Beacon = node.Consensus.Beacon()
node.KeyManager = node.Consensus.KeyManager()
node.Registry = node.Consensus.Registry()
node.Staking = node.Consensus.Staking()
node.Scheduler = node.Consensus.Scheduler()
node.RootHash = node.Consensus.RootHash()

// Initialize node backends.
if err = node.initBackends(); err != nil {
@@ -681,9 +662,6 @@ func newNode(testNode bool) (*Node, error) {
node.RuntimeClient, err = runtimeClient.New(
node.svcMgr.Ctx,
cmdCommon.DataDir(),
node.RootHash,
node.Scheduler,
node.Registry,
node.svcTmnt,
node.RuntimeRegistry,
)
@@ -723,7 +701,7 @@ func newNode(testNode bool) (*Node, error) {
controlAPI.RegisterService(node.grpcInternal.Server(), node.NodeController)
if flags.DebugDontBlameOasis() {
// Initialize and start the debug controller if we are in debug mode.
node.DebugController = control.NewDebug(node.Epochtime, node.Registry)
node.DebugController = control.NewDebug(node.Consensus)
controlAPI.RegisterDebugService(node.grpcInternal.Server(), node.DebugController)
}

36 changes: 18 additions & 18 deletions go/oasis-node/node_test.go
@@ -307,14 +307,14 @@ func testDeregisterEntityRuntime(t *testing.T, node *testNode) {
<-node.RegistrationWorker.Quit()

// Subscribe to node deregistration event.
nodeCh, sub, err := node.Node.Registry.WatchNodes(context.Background())
nodeCh, sub, err := node.Node.Consensus.Registry().WatchNodes(context.Background())
require.NoError(t, err, "WatchNodes")
defer sub.Close()

// Perform an epoch transition to expire the node as otherwise there is no way
// to deregister the entity.
require.Implements(t, (*epochtime.SetableBackend)(nil), node.Epochtime, "epoch time backend is mock")
timeSource := (node.Epochtime).(epochtime.SetableBackend)
require.Implements(t, (*epochtime.SetableBackend)(nil), node.Consensus.EpochTime(), "epoch time backend is mock")
timeSource := (node.Consensus.EpochTime()).(epochtime.SetableBackend)
_ = epochtimeTests.MustAdvanceEpoch(t, timeSource, 2+1+1) // 2 epochs for expiry, 1 for debonding, 1 for removal.

WaitLoop:
@@ -335,7 +335,7 @@ WaitLoop:
}

// Subscribe to entity deregistration event.
entityCh, sub, err := node.Node.Registry.WatchEntities(context.Background())
entityCh, sub, err := node.Node.Consensus.Registry().WatchEntities(context.Background())
require.NoError(t, err, "WatchEntities")
defer sub.Close()

@@ -357,7 +357,7 @@ WaitLoop:
require.Error(t, err, "deregister should fail when an entity has runtimes")
require.Equal(t, err, registry.ErrEntityHasRuntimes)

registryTests.EnsureRegistryEmpty(t, node.Node.Registry)
registryTests.EnsureRegistryEmpty(t, node.Node.Consensus.Registry())
}

func testConsensus(t *testing.T, node *testNode) {
@@ -374,33 +374,33 @@ func testConsensusClient(t *testing.T, node *testNode) {
}

func testEpochTime(t *testing.T, node *testNode) {
epochtimeTests.EpochtimeSetableImplementationTest(t, node.Epochtime)
epochtimeTests.EpochtimeSetableImplementationTest(t, node.Consensus.EpochTime())
}

func testBeacon(t *testing.T, node *testNode) {
timeSource := (node.Epochtime).(epochtime.SetableBackend)
timeSource := (node.Consensus.EpochTime()).(epochtime.SetableBackend)

beaconTests.BeaconImplementationTests(t, node.Beacon, timeSource)
beaconTests.BeaconImplementationTests(t, node.Consensus.Beacon(), timeSource)
}

func testStorage(t *testing.T, node *testNode) {
dataDir, err := ioutil.TempDir("", "oasis-storage-test_")
require.NoError(t, err, "TempDir")
defer os.RemoveAll(dataDir)

storage, err := storage.New(context.Background(), dataDir, testRuntimeID, node.Identity, node.Scheduler, node.Registry)
storage, err := storage.New(context.Background(), dataDir, testRuntimeID, node.Identity, node.Consensus.Scheduler(), node.Consensus.Registry())
require.NoError(t, err, "storage.New")
defer storage.Cleanup()

storageTests.StorageImplementationTests(t, storage, testRuntimeID, 0)
}

func testRegistry(t *testing.T, node *testNode) {
registryTests.RegistryImplementationTests(t, node.Registry, node.Consensus)
registryTests.RegistryImplementationTests(t, node.Consensus.Registry(), node.Consensus)
}

func testScheduler(t *testing.T, node *testNode) {
schedulerTests.SchedulerImplementationTests(t, "", node.Scheduler, node.Consensus)
schedulerTests.SchedulerImplementationTests(t, "", node.Consensus.Scheduler(), node.Consensus)
}

func testSchedulerClient(t *testing.T, node *testNode) {
@@ -413,7 +413,7 @@ func testSchedulerClient(t *testing.T, node *testNode) {
}

func testStaking(t *testing.T, node *testNode) {
stakingTests.StakingImplementationTests(t, node.Staking, node.Consensus, node.Identity, node.entity, node.entitySigner, testRuntimeID)
stakingTests.StakingImplementationTests(t, node.Consensus.Staking(), node.Consensus, node.Identity, node.entity, node.entitySigner, testRuntimeID)
}

func testStakingClient(t *testing.T, node *testNode) {
@@ -426,11 +426,11 @@ func testStakingClient(t *testing.T, node *testNode) {
}

func testRootHash(t *testing.T, node *testNode) {
roothashTests.RootHashImplementationTests(t, node.RootHash, node.Consensus, node.Identity)
roothashTests.RootHashImplementationTests(t, node.Consensus.RootHash(), node.Consensus, node.Identity)
}

func testExecutorWorker(t *testing.T, node *testNode) {
timeSource := (node.Epochtime).(epochtime.SetableBackend)
timeSource := (node.Consensus.EpochTime()).(epochtime.SetableBackend)

require.NotNil(t, node.executorCommitteeNode)
executorWorkerTests.WorkerImplementationTests(t, node.ExecutorWorker, node.runtimeID, node.executorCommitteeNode, timeSource)
@@ -441,7 +441,7 @@ func testStorageWorker(t *testing.T, node *testNode) {
}

func testTransactionSchedulerWorker(t *testing.T, node *testNode) {
timeSource := (node.Epochtime).(epochtime.SetableBackend)
timeSource := (node.Consensus.EpochTime()).(epochtime.SetableBackend)

require.NotNil(t, node.txnschedulerCommitteeNode)
txnschedulerWorkerTests.WorkerImplementationTests(
@@ -450,7 +450,7 @@ func testTransactionSchedulerWorker(t *testing.T, node *testNode) {
node.runtimeID,
node.txnschedulerCommitteeNode,
timeSource,
node.RootHash,
node.Consensus.RootHash(),
node.RuntimeRegistry.StorageRouter(),
)
}
@@ -462,12 +462,12 @@ func testClient(t *testing.T, node *testNode) {
func testStorageClientWithNode(t *testing.T, node *testNode) {
ctx := context.Background()

client, err := storageClient.NewStatic(ctx, testRuntimeID, node.Identity, node.Registry, node.Identity.NodeSigner.Public())
client, err := storageClient.NewStatic(ctx, testRuntimeID, node.Identity, node.Consensus.Registry(), node.Identity.NodeSigner.Public())
require.NoError(t, err, "NewStatic")

// Determine the current round. This is required so that we can commit into
// storage at the next (non-finalized) round.
blk, err := node.RootHash.GetLatestBlock(ctx, testRuntimeID, consensusAPI.HeightLatest)
blk, err := node.Consensus.RootHash().GetLatestBlock(ctx, testRuntimeID, consensusAPI.HeightLatest)
require.NoError(t, err, "GetLatestBlock")

storageTests.StorageImplementationTests(t, client, testRuntimeID, blk.Header.Round+1)
