Skip to content

Commit

Permalink
Merge pull request #2514 from oasislabs/pro-wh/feature/onerole
Browse files Browse the repository at this point in the history
go node: unite compute, merge, and transaction scheduler roles
  • Loading branch information
pro-wh authored Jan 17, 2020
2 parents a1deea6 + db985d3 commit 194461b
Show file tree
Hide file tree
Showing 24 changed files with 385 additions and 199 deletions.
5 changes: 5 additions & 0 deletions .changelog/2107.breaking.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
go node: Unite compute, merge, and transaction scheduler roles.

We are removing the separate registration of nodes for the compute, merge, and transaction scheduler roles.
You now have to register for and enable either all or none of these roles, under a new, broadened, and confusing
— you're welcome — term: "compute."
14 changes: 2 additions & 12 deletions go/common/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,10 @@ const (
RoleComputeWorker RolesMask = 1 << 0
// RoleStorageWorker is Oasis storage worker role.
RoleStorageWorker RolesMask = 1 << 1
// RoleTransactionScheduler is Oasis transaction scheduler role.
RoleTransactionScheduler RolesMask = 1 << 2
// RoleKeyManager is the Oasis key manager role.
RoleKeyManager RolesMask = 1 << 3
// RoleMergeWorker is the Oasis merge worker role.
RoleMergeWorker RolesMask = 1 << 4
RoleKeyManager RolesMask = 1 << 2
// RoleValidator is the Oasis validator role.
RoleValidator RolesMask = 1 << 5
RoleValidator RolesMask = 1 << 3

// RoleReserved are all the bits of the Oasis node roles bitmask
// that are reserved and must not be used.
Expand All @@ -104,15 +100,9 @@ func (m RolesMask) String() string {
if m&RoleStorageWorker != 0 {
ret = append(ret, "storage")
}
if m&RoleTransactionScheduler != 0 {
ret = append(ret, "txn_scheduler")
}
if m&RoleKeyManager != 0 {
ret = append(ret, "key_manager")
}
if m&RoleMergeWorker != 0 {
ret = append(ret, "merge")
}
if m&RoleValidator != 0 {
ret = append(ret, "validator")
}
Expand Down
15 changes: 10 additions & 5 deletions go/consensus/tendermint/apps/beacon/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var (
errUnexpectedTimer = errors.New("beacon: unexpected timer")

prodEntropyCtx = []byte("EkB-tmnt")
debugEntropyCtx = []byte("Ekb-Dumm")
DebugEntropyCtx = []byte("Ekb-Dumm")

_ abci.Application = (*beaconApplication)(nil)
)
Expand Down Expand Up @@ -121,11 +121,16 @@ func (app *beaconApplication) onBeaconEpochChange(ctx *abci.Context, epoch epoch
}
case true:
// UNSAFE/DEBUG - Deterministic beacon.
entropyCtx = debugEntropyCtx
entropy = []byte("If you change this, you will fuck up the byzantine tests!!!")
entropyCtx = DebugEntropyCtx
// We're setting this random seed so that we have suitable committee schedules for Byzantine E2E scenarios,
// where we want nodes to be scheduled for only one committee. The permutations derived from this on the first
// epoch need to have (i) an index that's compute worker only and (ii) an index that's merge worker only. See
// /go/oasis-test-runner/scenario/e2e/byzantine.go for the permutations generated from this seed. These
// permutations are generated independently of the deterministic node IDs.
entropy = []byte("If you change this, you will fuck up the byzantine tests!!")
}

b := getBeacon(epoch, entropyCtx, entropy)
b := GetBeacon(epoch, entropyCtx, entropy)

app.logger.Debug("onBeaconEpochChange: generated beacon",
"epoch", epoch,
Expand Down Expand Up @@ -161,7 +166,7 @@ func New() abci.Application {
return app
}

func getBeacon(beaconEpoch epochtime.EpochTime, entropyCtx []byte, entropy []byte) []byte {
func GetBeacon(beaconEpoch epochtime.EpochTime, entropyCtx []byte, entropy []byte) []byte {
var tmp [8]byte
binary.LittleEndian.PutUint64(tmp[:], uint64(beaconEpoch))

Expand Down
65 changes: 46 additions & 19 deletions go/consensus/tendermint/apps/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/tendermint/tendermint/abci/types"

"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/cbor"
"github.com/oasislabs/oasis-core/go/common/crypto/drbg"
"github.com/oasislabs/oasis-core/go/common/crypto/mathrand"
Expand All @@ -37,12 +38,12 @@ import (
var (
_ abci.Application = (*schedulerApplication)(nil)

rngContextCompute = []byte("EkS-ABCI-Compute")
rngContextStorage = []byte("EkS-ABCI-Storage")
rngContextTransactionScheduler = []byte("EkS-ABCI-TransactionScheduler")
rngContextMerge = []byte("EkS-ABCI-Merge")
rngContextValidators = []byte("EkS-ABCI-Validators")
rngContextEntities = []byte("EkS-ABCI-Entities")
RNGContextCompute = []byte("EkS-ABCI-Compute")
RNGContextStorage = []byte("EkS-ABCI-Storage")
RNGContextTransactionScheduler = []byte("EkS-ABCI-TransactionScheduler")
RNGContextMerge = []byte("EkS-ABCI-Merge")
RNGContextValidators = []byte("EkS-ABCI-Validators")
RNGContextEntities = []byte("EkS-ABCI-Entities")

errUnexpectedTransaction = errors.New("tendermint/scheduler: unexpected transaction")
)
Expand Down Expand Up @@ -376,11 +377,20 @@ func (app *schedulerApplication) isSuitableComputeWorker(n *node.Node, rt *regis
}

// isSuitableStorageWorker reports whether node n may be elected to the
// storage committee for runtime rt: the node must advertise the storage
// worker role and must have registered support for the runtime.
//
// NOTE(review): ts is currently unused here; presumably kept so all
// isSuitable* helpers share one signature — confirm before removing.
func (app *schedulerApplication) isSuitableStorageWorker(n *node.Node, rt *registry.Runtime, ts time.Time) bool {
	if !n.HasRoles(node.RoleStorageWorker) {
		return false
	}
	// The node must list this runtime among the runtimes it serves.
	for _, nrt := range n.Runtimes {
		if !nrt.ID.Equal(&rt.ID) {
			continue
		}
		return true
	}
	return false
}

func (app *schedulerApplication) isSuitableTransactionScheduler(n *node.Node, rt *registry.Runtime, ts time.Time) bool {
if !n.HasRoles(node.RoleTransactionScheduler) {
if !n.HasRoles(node.RoleComputeWorker) {
return false
}
for _, nrt := range n.Runtimes {
Expand All @@ -393,7 +403,26 @@ func (app *schedulerApplication) isSuitableTransactionScheduler(n *node.Node, rt
}

// isSuitableMergeWorker reports whether node n may be elected to the merge
// committee for runtime rt. Since the merge role was united with the compute
// role (PR #2514), eligibility is keyed off RoleComputeWorker; the node must
// also have registered support for the runtime.
//
// NOTE(review): ts is currently unused here; presumably kept so all
// isSuitable* helpers share one signature — confirm before removing.
func (app *schedulerApplication) isSuitableMergeWorker(n *node.Node, rt *registry.Runtime, ts time.Time) bool {
	if !n.HasRoles(node.RoleComputeWorker) {
		return false
	}
	// The node must list this runtime among the runtimes it serves.
	for _, nrt := range n.Runtimes {
		if !nrt.ID.Equal(&rt.ID) {
			continue
		}
		return true
	}
	return false
}

// GetPerm generates a permutation that we use to choose nodes from a list of
// eligible nodes to elect. The permutation is deterministic in the beacon,
// the runtime ID, and the RNG domain-separation context.
func GetPerm(beacon []byte, runtimeID common.Namespace, rngCtx []byte, nrNodes int) ([]int, error) {
	// Seed a DRBG from the beacon, using the runtime ID and context for
	// domain separation. Note: "src" avoids shadowing the drbg package.
	src, err := drbg.New(crypto.SHA512, beacon, runtimeID[:], rngCtx)
	if err != nil {
		return nil, errors.Wrap(err, "tendermint/scheduler: couldn't instantiate DRBG")
	}
	return rand.New(mathrand.New(src)).Perm(nrNodes), nil
}

// Operates on consensus connection.
Expand All @@ -419,24 +448,24 @@ func (app *schedulerApplication) electCommittee(ctx *abci.Context, request types

switch kind {
case scheduler.KindCompute:
rngCtx = rngContextCompute
rngCtx = RNGContextCompute
threshold = staking.KindCompute
isSuitableFn = app.isSuitableComputeWorker
workerSize = int(rt.Compute.GroupSize)
backupSize = int(rt.Compute.GroupBackupSize)
case scheduler.KindMerge:
rngCtx = rngContextMerge
rngCtx = RNGContextMerge
threshold = staking.KindCompute
isSuitableFn = app.isSuitableMergeWorker
workerSize = int(rt.Merge.GroupSize)
backupSize = int(rt.Merge.GroupBackupSize)
case scheduler.KindTransactionScheduler:
rngCtx = rngContextTransactionScheduler
rngCtx = RNGContextTransactionScheduler
threshold = staking.KindCompute
isSuitableFn = app.isSuitableTransactionScheduler
workerSize = int(rt.TxnScheduler.GroupSize)
case scheduler.KindStorage:
rngCtx = rngContextStorage
rngCtx = RNGContextStorage
threshold = staking.KindStorage
isSuitableFn = app.isSuitableStorageWorker
workerSize = int(rt.Storage.GroupSize)
Expand Down Expand Up @@ -481,12 +510,10 @@ func (app *schedulerApplication) electCommittee(ctx *abci.Context, request types
}

// Do the actual election.
drbg, err := drbg.New(crypto.SHA512, beacon, rt.ID[:], rngCtx)
idxs, err := GetPerm(beacon, rt.ID, rngCtx, nrNodes)
if err != nil {
return errors.Wrap(err, "tendermint/scheduler: couldn't instantiate DRBG")
return err
}
rng := rand.New(mathrand.New(drbg))
idxs := rng.Perm(nrNodes)

var members []*scheduler.CommitteeNode
for i := 0; i < len(idxs); i++ {
Expand Down Expand Up @@ -567,7 +594,7 @@ func (app *schedulerApplication) electValidators(ctx *abci.Context, beacon []byt
}

// Shuffle the node list.
drbg, err := drbg.New(crypto.SHA512, beacon, nil, rngContextValidators)
drbg, err := drbg.New(crypto.SHA512, beacon, nil, RNGContextValidators)
if err != nil {
return errors.Wrap(err, "tendermint/scheduler: couldn't instantiate DRBG")
}
Expand Down Expand Up @@ -644,7 +671,7 @@ func publicKeyMapToSliceByStake(entMap map[signature.PublicKey]bool, entityStake
entities := publicKeyMapToSortedSlice(entMap)

// Shuffle the sorted slice to make tie-breaks "random".
drbg, err := drbg.New(crypto.SHA512, beacon, nil, rngContextEntities)
drbg, err := drbg.New(crypto.SHA512, beacon, nil, RNGContextEntities)
if err != nil {
return nil, errors.Wrap(err, "tendermint/scheduler: couldn't instantiate DRBG")
}
Expand Down
82 changes: 79 additions & 3 deletions go/oasis-node/cmd/debug/byzantine/byzantine.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,20 @@ func doComputeHonest(cmd *cobra.Command, args []string) {
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindStorage, err))
}
transactionSchedulerCommittee, err := schedulerGetCommittee(ht, electionHeight, scheduler.KindTransactionScheduler, defaultRuntimeID)
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindTransactionScheduler, err))
}
if err = schedulerCheckNotScheduled(transactionSchedulerCommittee, defaultIdentity.NodeSigner.Public()); err != nil {
panic(fmt.Sprintf("scheduler check not scheduled txnscheduler failed: %+v", err))
}
mergeCommittee, err := schedulerGetCommittee(ht, electionHeight, scheduler.KindMerge, defaultRuntimeID)
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindMerge, err))
}
if err = schedulerCheckNotScheduled(mergeCommittee, defaultIdentity.NodeSigner.Public()); err != nil {
panic(fmt.Sprintf("scheduler check not scheduled merge failed: %+v", err))
}

logger.Debug("compute honest: connecting to storage committee")
hnss, err := storageConnectToCommittee(ht, electionHeight, storageCommittee, scheduler.Worker, defaultIdentity)
Expand Down Expand Up @@ -262,10 +272,20 @@ func doComputeWrong(cmd *cobra.Command, args []string) {
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindStorage, err))
}
transactionSchedulerCommittee, err := schedulerGetCommittee(ht, electionHeight, scheduler.KindTransactionScheduler, defaultRuntimeID)
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindTransactionScheduler, err))
}
if err = schedulerCheckNotScheduled(transactionSchedulerCommittee, defaultIdentity.NodeSigner.Public()); err != nil {
panic(fmt.Sprintf("scheduler check not scheduled txnscheduler failed: %+v", err))
}
mergeCommittee, err := schedulerGetCommittee(ht, electionHeight, scheduler.KindMerge, defaultRuntimeID)
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindMerge, err))
}
if err = schedulerCheckNotScheduled(mergeCommittee, defaultIdentity.NodeSigner.Public()); err != nil {
panic(fmt.Sprintf("scheduler check not scheduled merge failed: %+v", err))
}

logger.Debug("compute honest: connecting to storage committee")
hnss, err := storageConnectToCommittee(ht, electionHeight, storageCommittee, scheduler.Worker, defaultIdentity)
Expand Down Expand Up @@ -379,6 +399,20 @@ func doComputeStraggler(cmd *cobra.Command, args []string) {
panic(fmt.Sprintf("scheduler check scheduled failed: %+v", err))
}
logger.Debug("compute straggler: compute schedule ok")
transactionSchedulerCommittee, err := schedulerGetCommittee(ht, electionHeight, scheduler.KindTransactionScheduler, defaultRuntimeID)
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindTransactionScheduler, err))
}
if err = schedulerCheckNotScheduled(transactionSchedulerCommittee, defaultIdentity.NodeSigner.Public()); err != nil {
panic(fmt.Sprintf("scheduler check not scheduled txnscheduler failed: %+v", err))
}
mergeCommittee, err := schedulerGetCommittee(ht, electionHeight, scheduler.KindMerge, defaultRuntimeID)
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindMerge, err))
}
if err = schedulerCheckNotScheduled(mergeCommittee, defaultIdentity.NodeSigner.Public()); err != nil {
panic(fmt.Sprintf("scheduler check not scheduled merge failed: %+v", err))
}

cbc := newComputeBatchContext()

Expand Down Expand Up @@ -424,7 +458,7 @@ func doMergeHonest(cmd *cobra.Command, args []string) {
panic(fmt.Sprintf("epochtimeWaitForEpoch: %+v", err))
}

if err = registryRegisterNode(ht.service, defaultIdentity, common.DataDir(), fakeAddresses, ph.service.Addresses(), defaultRuntimeID, nil, node.RoleMergeWorker); err != nil {
if err = registryRegisterNode(ht.service, defaultIdentity, common.DataDir(), fakeAddresses, ph.service.Addresses(), defaultRuntimeID, nil, node.RoleComputeWorker); err != nil {
panic(fmt.Sprintf("registryRegisterNode: %+v", err))
}

Expand All @@ -440,10 +474,24 @@ func doMergeHonest(cmd *cobra.Command, args []string) {
panic(fmt.Sprintf("scheduler check scheduled failed: %+v", err))
}
logger.Debug("merge honest: merge schedule ok")
computeCommittee, err := schedulerGetCommittee(ht, electionHeight, scheduler.KindCompute, defaultRuntimeID)
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindCompute, err))
}
if err = schedulerCheckNotScheduled(computeCommittee, defaultIdentity.NodeSigner.Public()); err != nil {
panic(fmt.Sprintf("scheduler check not scheduled compute failed: %+v", err))
}
storageCommittee, err := schedulerGetCommittee(ht, electionHeight, scheduler.KindStorage, defaultRuntimeID)
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindStorage, err))
}
transactionSchedulerCommittee, err := schedulerGetCommittee(ht, electionHeight, scheduler.KindTransactionScheduler, defaultRuntimeID)
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindTransactionScheduler, err))
}
if err = schedulerCheckNotScheduled(transactionSchedulerCommittee, defaultIdentity.NodeSigner.Public()); err != nil {
panic(fmt.Sprintf("scheduler check not scheduled txnscheduler failed: %+v", err))
}

logger.Debug("merge honest: connecting to storage committee")
hnss, err := storageConnectToCommittee(ht, electionHeight, storageCommittee, scheduler.Worker, defaultIdentity)
Expand Down Expand Up @@ -518,7 +566,7 @@ func doMergeWrong(cmd *cobra.Command, args []string) {
panic(fmt.Sprintf("epochtimeWaitForEpoch: %+v", err))
}

if err = registryRegisterNode(ht.service, defaultIdentity, common.DataDir(), fakeAddresses, ph.service.Addresses(), defaultRuntimeID, nil, node.RoleMergeWorker); err != nil {
if err = registryRegisterNode(ht.service, defaultIdentity, common.DataDir(), fakeAddresses, ph.service.Addresses(), defaultRuntimeID, nil, node.RoleComputeWorker); err != nil {
panic(fmt.Sprintf("registryRegisterNode: %+v", err))
}

Expand All @@ -534,10 +582,24 @@ func doMergeWrong(cmd *cobra.Command, args []string) {
panic(fmt.Sprintf("scheduler check scheduled failed: %+v", err))
}
logger.Debug("merge wrong: merge schedule ok")
computeCommittee, err := schedulerGetCommittee(ht, electionHeight, scheduler.KindCompute, defaultRuntimeID)
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindCompute, err))
}
if err = schedulerCheckNotScheduled(computeCommittee, defaultIdentity.NodeSigner.Public()); err != nil {
panic(fmt.Sprintf("scheduler check not scheduled compute failed: %+v", err))
}
storageCommittee, err := schedulerGetCommittee(ht, electionHeight, scheduler.KindStorage, defaultRuntimeID)
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindStorage, err))
}
transactionSchedulerCommittee, err := schedulerGetCommittee(ht, electionHeight, scheduler.KindTransactionScheduler, defaultRuntimeID)
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindTransactionScheduler, err))
}
if err = schedulerCheckNotScheduled(transactionSchedulerCommittee, defaultIdentity.NodeSigner.Public()); err != nil {
panic(fmt.Sprintf("scheduler check not scheduled txnscheduler failed: %+v", err))
}

logger.Debug("merge wrong: connecting to storage committee")
hnss, err := storageConnectToCommittee(ht, electionHeight, storageCommittee, scheduler.Worker, defaultIdentity)
Expand Down Expand Up @@ -636,7 +698,7 @@ func doMergeStraggler(cmd *cobra.Command, args []string) {
panic(fmt.Sprintf("epochtimeWaitForEpoch: %+v", err))
}

if err = registryRegisterNode(ht.service, defaultIdentity, common.DataDir(), fakeAddresses, ph.service.Addresses(), defaultRuntimeID, nil, node.RoleMergeWorker); err != nil {
if err = registryRegisterNode(ht.service, defaultIdentity, common.DataDir(), fakeAddresses, ph.service.Addresses(), defaultRuntimeID, nil, node.RoleComputeWorker); err != nil {
panic(fmt.Sprintf("registryRegisterNode: %+v", err))
}

Expand All @@ -652,6 +714,20 @@ func doMergeStraggler(cmd *cobra.Command, args []string) {
panic(fmt.Sprintf("scheduler check scheduled failed: %+v", err))
}
logger.Debug("merge straggler: merge schedule ok")
computeCommittee, err := schedulerGetCommittee(ht, electionHeight, scheduler.KindCompute, defaultRuntimeID)
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindCompute, err))
}
if err = schedulerCheckNotScheduled(computeCommittee, defaultIdentity.NodeSigner.Public()); err != nil {
panic(fmt.Sprintf("scheduler check not scheduled compute failed: %+v", err))
}
transactionSchedulerCommittee, err := schedulerGetCommittee(ht, electionHeight, scheduler.KindTransactionScheduler, defaultRuntimeID)
if err != nil {
panic(fmt.Sprintf("scheduler get committee %s failed: %+v", scheduler.KindTransactionScheduler, err))
}
if err = schedulerCheckNotScheduled(transactionSchedulerCommittee, defaultIdentity.NodeSigner.Public()); err != nil {
panic(fmt.Sprintf("scheduler check not scheduled txnscheduler failed: %+v", err))
}

mbc := newMergeBatchContext()

Expand Down
2 changes: 1 addition & 1 deletion go/oasis-node/cmd/debug/byzantine/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func mergeReceiveCommitment(ph *p2pHandle) (*commitment.OpenComputeCommitment, e
req.responseCh <- nil

if req.msg.ComputeWorkerFinished == nil {
return nil, errors.Errorf("expecting signed transaction scheduler batch dispatch message, got %+v", req.msg)
return nil, errors.Errorf("expecting compute worker finished message, got %+v", req.msg)
}

openCom, err := req.msg.ComputeWorkerFinished.Commitment.Open()
Expand Down
Loading

0 comments on commit 194461b

Please sign in to comment.