Add runtime scheduling constraints
kostko committed Feb 11, 2021
1 parent 79ee196 commit 96db053
Showing 21 changed files with 489 additions and 167 deletions.
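The scheduler changes below reference new per-role scheduling constraints on the runtime descriptor (rt.Constraints[kind], cs[role].ValidatorSet, cs[role].MinPoolSize.Limit, cs[role].MaxNodes.Limit). The constraint types themselves are defined in the registry package among the other changed files not shown here; the following is only a sketch of their probable shape, inferred from how the fields are accessed in this diff, and may not match the actual definitions.

```go
// Sketch only: probable shape of the per-role constraint types, inferred from
// the field accesses in scheduler.go below. The authoritative definitions live
// in go/registry/api and may differ in names, tags, and integer widths.
package registrysketch

// SchedulingConstraints are per-role constraints for committee scheduling.
// On the runtime descriptor they appear to be keyed first by committee kind
// and then by role:
//
//	Constraints map[scheduler.CommitteeKind]map[scheduler.Role]SchedulingConstraints
type SchedulingConstraints struct {
	// ValidatorSet, when present, requires the node's entity to also back a
	// node in the current validator set.
	ValidatorSet *ValidatorSetConstraint

	// MaxNodes, when present, caps how many nodes a single entity may have
	// elected into the committee for this role.
	MaxNodes *MaxNodesConstraint

	// MinPoolSize, when present, is the minimum number of eligible candidate
	// nodes required before an election is attempted.
	MinPoolSize *MinPoolSizeConstraint
}

// ValidatorSetConstraint restricts eligibility to entities that are in the
// current validator set.
type ValidatorSetConstraint struct{}

// MaxNodesConstraint caps the number of elected nodes per entity.
type MaxNodesConstraint struct {
	Limit uint16
}

// MinPoolSizeConstraint is the minimum size of the candidate node pool.
type MinPoolSizeConstraint struct {
	Limit uint16
}
```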
255 changes: 166 additions & 89 deletions go/consensus/tendermint/apps/scheduler/scheduler.go
@@ -163,8 +163,9 @@ func (app *schedulerApplication) BeginBlock(ctx *api.Context, request types.Requ

// Handle the validator election first, because no consensus is
// catastrophic, while failing to elect other committees is not.
var validatorEntities map[staking.Address]bool
if !params.DebugStaticValidators {
if err = app.electValidators(ctx, entropy, stakeAcc, entitiesEligibleForReward, nodes, params); err != nil {
if validatorEntities, err = app.electValidators(ctx, entropy, stakeAcc, entitiesEligibleForReward, nodes, params); err != nil {
// It is unclear what the behavior should be if the validator
// election fails. The system can not ensure integrity, so
// presumably manual intervention is required...
@@ -177,7 +178,18 @@ func (app *schedulerApplication) BeginBlock(ctx *api.Context, request types.Requ
scheduler.KindStorage,
}
for _, kind := range kinds {
if err = app.electAllCommittees(ctx, request, epoch, entropy, stakeAcc, entitiesEligibleForReward, runtimes, committeeNodes, kind); err != nil {
if err = app.electAllCommittees(
ctx,
request,
epoch,
entropy,
stakeAcc,
entitiesEligibleForReward,
validatorEntities,
runtimes,
committeeNodes,
kind,
); err != nil {
return fmt.Errorf("tendermint/scheduler: couldn't elect %s committees: %w", kind, err)
}
}
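
For orientation before the interleaved electCommittee hunks that follow: for each runtime and committee kind, the new code elects each role (worker, backup worker) separately, applying constraints in two phases, a pre-election filter and election-time checks while walking a beacon-derived permutation. A condensed, self-contained sketch of that flow, using simplified stand-in types instead of the real registry/scheduler types:

```go
// Condensed sketch of the per-role election flow introduced below. Types are
// simplified stand-ins; the real code operates on registry/scheduler types.
package sketch

import "fmt"

// Constraints is a stand-in for the per-role scheduling constraints.
type Constraints struct {
	RequireValidatorEntity bool // stand-in for ValidatorSet != nil
	MinPoolSize            int  // stand-in for MinPoolSize.Limit (0 = unset)
	MaxNodesPerEntity      int  // stand-in for MaxNodes.Limit (0 = unset)
}

// Node is a stand-in for node.Node.
type Node struct{ ID, Entity string }

// electRole elects groupSize members for a single role: pre-filter by
// validator-set membership, enforce the minimum pool size, then walk a
// beacon-derived permutation while skipping nodes already elected for another
// role and capping the number of elected nodes per entity.
func electRole(
	candidates []Node, // already filtered for role suitability and stake
	permute func(n int) []int, // stand-in for the beacon-derived GetPerm
	groupSize int,
	cs Constraints,
	validatorEntities map[string]bool, // entities backing the new validator set
	alreadyElected map[string]bool, // node IDs elected for other roles
) ([]Node, error) {
	if groupSize == 0 {
		return nil, nil // role not used by this committee kind
	}

	// Pre-election filtering: validator-set membership constraint.
	var pool []Node
	for _, n := range candidates {
		if cs.RequireValidatorEntity && !validatorEntities[n.Entity] {
			continue
		}
		pool = append(pool, n)
	}
	if len(pool) < cs.MinPoolSize || len(pool) < groupSize {
		return nil, fmt.Errorf("not enough eligible nodes: %d", len(pool))
	}

	// Election-time constraints: de-duplication across roles and a per-entity
	// cap on the number of elected nodes.
	perEntity := make(map[string]int)
	var elected []Node
	for _, idx := range permute(len(pool)) {
		n := pool[idx]
		if alreadyElected[n.ID] {
			continue
		}
		if cs.MaxNodesPerEntity > 0 && perEntity[n.Entity] >= cs.MaxNodesPerEntity {
			continue
		}
		perEntity[n.Entity]++
		elected = append(elected, n)
		alreadyElected[n.ID] = true
		if len(elected) == groupSize {
			return elected, nil
		}
	}
	return nil, fmt.Errorf("insufficient eligible nodes after applying constraints")
}
```

Unlike this sketch, the real code does not return errors for unsatisfiable constraints; it logs, drops the committee via DropCommittee, and returns nil, and it also records reward-eligible entities along the way.
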
@@ -349,12 +361,13 @@ func GetPerm(beacon []byte, runtimeID common.Namespace, rngCtx []byte, nrNodes i
// Operates on consensus connection.
// Return error if node should crash.
// For non-fatal problems, save a problem condition to the state and return successfully.
func (app *schedulerApplication) electCommittee(
func (app *schedulerApplication) electCommittee( //nolint: gocyclo
ctx *api.Context,
epoch beacon.EpochTime,
beacon []byte,
stakeAcc *stakingState.StakeAccumulatorCache,
entitiesEligibleForReward map[staking.Address]bool,
validatorEntities map[staking.Address]bool,
rt *registry.Runtime,
nodes []*node.Node,
kind scheduler.CommitteeKind,
@@ -367,31 +380,52 @@ func (app *schedulerApplication) electCommittee(
// Determine the context, committee size, and pre-filter the node-list
// based on eligibility and entity stake.
var (
err error
nodeList []*node.Node
err error

rngCtx []byte
isSuitableFn func(*api.Context, *node.Node, *registry.Runtime) bool

workerSize, backupSize, minPoolSize int
)

groupSizes := make(map[scheduler.Role]int)
switch kind {
case scheduler.KindComputeExecutor:
rngCtx = RNGContextExecutor
isSuitableFn = app.isSuitableExecutorWorker
workerSize = int(rt.Executor.GroupSize)
backupSize = int(rt.Executor.GroupBackupSize)
minPoolSize = int(rt.Executor.MinPoolSize)
groupSizes[scheduler.RoleWorker] = int(rt.Executor.GroupSize)
groupSizes[scheduler.RoleBackupWorker] = int(rt.Executor.GroupBackupSize)
case scheduler.KindStorage:
rngCtx = RNGContextStorage
isSuitableFn = app.isSuitableStorageWorker
workerSize = int(rt.Storage.GroupSize)
minPoolSize = int(rt.Storage.MinPoolSize)
groupSizes[scheduler.RoleWorker] = int(rt.Storage.GroupSize)
default:
return fmt.Errorf("tendermint/scheduler: invalid committee type: %v", kind)
}

// Ensure that it is theoretically possible to elect a valid committee.
if groupSizes[scheduler.RoleWorker] == 0 {
ctx.Logger().Error("empty committee not allowed",
"kind", kind,
"runtime_id", rt.ID,
)
if err = schedulerState.NewMutableState(ctx.State()).DropCommittee(ctx, kind, rt.ID); err != nil {
return fmt.Errorf("failed to drop committee: %w", err)
}
return nil
}

// Decode per-role constraints.
var cs map[scheduler.Role]registry.SchedulingConstraints
switch {
case rt.Constraints == nil:
fallthrough
case rt.Constraints[kind] == nil:
cs = make(map[scheduler.Role]registry.SchedulingConstraints)
default:
cs = rt.Constraints[kind]
}

// Perform pre-election eligibility filtering.
nodeLists := make(map[scheduler.Role][]*node.Node)
for _, n := range nodes {
// Check if an entity has enough stake.
entAddr := staking.NewAddress(n.EntityID)
@@ -400,89 +434,129 @@ func (app *schedulerApplication) electCommittee(
continue
}
}
if isSuitableFn(ctx, n, rt) {
nodeList = append(nodeList, n)
if entitiesEligibleForReward != nil {
entitiesEligibleForReward[entAddr] = true
// Check general node compatibility.
if !isSuitableFn(ctx, n, rt) {
continue
}

// Check pre-election scheduling constraints.
var eligible bool
for _, role := range []scheduler.Role{scheduler.RoleWorker, scheduler.RoleBackupWorker} {
if groupSizes[role] == 0 {
continue
}

// Validator set membership constraint.
if cs[role].ValidatorSet != nil {
if !validatorEntities[entAddr] {
// Not eligible if not in the validator set.
continue
}
}

nodeLists[role] = append(nodeLists[role], n)
eligible = true
}
if !eligible {
continue
}
}

// Ensure that it is theoretically possible to elect a valid committee.
if workerSize == 0 {
ctx.Logger().Error("empty committee not allowed",
"kind", kind,
"runtime_id", rt.ID,
)
if err = schedulerState.NewMutableState(ctx.State()).DropCommittee(ctx, kind, rt.ID); err != nil {
return fmt.Errorf("failed to drop committee: %w", err)
if entitiesEligibleForReward != nil {
entitiesEligibleForReward[entAddr] = true
}
return nil
}

nrNodes := len(nodeList)
// Perform election.
var members []*scheduler.CommitteeNode
memberSet := make(map[signature.PublicKey]bool)
for _, role := range []scheduler.Role{scheduler.RoleWorker, scheduler.RoleBackupWorker} {
if groupSizes[role] == 0 {
continue
}

if nrNodes < minPoolSize {
ctx.Logger().Error("not enough eligible nodes",
"kind", kind,
"runtime_id", rt.ID,
"nr_nodes", nrNodes,
"min_pool_size", minPoolSize,
)
if err = schedulerState.NewMutableState(ctx.State()).DropCommittee(ctx, kind, rt.ID); err != nil {
return fmt.Errorf("failed to drop committee: %w", err)
nrNodes := len(nodeLists[role])

// Check pre-election scheduling constraints.
var minPoolSize int
if cs[role].MinPoolSize != nil {
minPoolSize = int(cs[role].MinPoolSize.Limit)
}
return nil
}

wantedNodes := workerSize + backupSize
if wantedNodes > nrNodes {
ctx.Logger().Error("committee size exceeds available nodes (pre-stake)",
"kind", kind,
"runtime_id", rt.ID,
"worker_size", workerSize,
"backup_size", backupSize,
"nr_nodes", nrNodes,
)
if err = schedulerState.NewMutableState(ctx.State()).DropCommittee(ctx, kind, rt.ID); err != nil {
return fmt.Errorf("failed to drop committee: %w", err)
if nrNodes < minPoolSize {
ctx.Logger().Error("not enough eligible nodes",
"kind", kind,
"role", role,
"runtime_id", rt.ID,
"nr_nodes", nrNodes,
"min_pool_size", minPoolSize,
)
if err = schedulerState.NewMutableState(ctx.State()).DropCommittee(ctx, kind, rt.ID); err != nil {
return fmt.Errorf("failed to drop committee: %w", err)
}
return nil
}
return nil
}

// Do the actual election.
idxs, err := GetPerm(beacon, rt.ID, rngCtx, nrNodes)
if err != nil {
return err
}
wantedNodes := groupSizes[role]
if wantedNodes > nrNodes {
ctx.Logger().Error("committee size exceeds available nodes (pre-stake)",
"kind", kind,
"runtime_id", rt.ID,
"wanted_nodes", wantedNodes,
"nr_nodes", nrNodes,
)
if err = schedulerState.NewMutableState(ctx.State()).DropCommittee(ctx, kind, rt.ID); err != nil {
return fmt.Errorf("failed to drop committee: %w", err)
}
return nil
}

var members []*scheduler.CommitteeNode
for i := 0; i < len(idxs); i++ {
role := scheduler.RoleWorker
if i >= workerSize {
role = scheduler.RoleBackupWorker
// Do the actual election.
var idxs []int
idxs, err = GetPerm(beacon, rt.ID, rngCtx, nrNodes)
if err != nil {
return fmt.Errorf("failed to derive permutation: %w", err)
}
members = append(members, &scheduler.CommitteeNode{
Role: role,
PublicKey: nodeList[idxs[i]].ID,
})
if len(members) >= wantedNodes {
break

var elected []*scheduler.CommitteeNode
nodesPerEntity := make(map[signature.PublicKey]int)
for _, idx := range idxs {
n := nodeLists[role][idx]
if memberSet[n.ID] {
continue
}

// Check election-time scheduling constraints.
if mn := cs[role].MaxNodes; mn != nil {
if nodesPerEntity[n.EntityID] >= int(mn.Limit) {
continue
}
nodesPerEntity[n.EntityID]++
}

elected = append(elected, &scheduler.CommitteeNode{
Role: role,
PublicKey: n.ID,
})
memberSet[n.ID] = true
if len(elected) >= wantedNodes {
break
}
}
}

if len(members) != wantedNodes {
ctx.Logger().Error("insufficient nodes with adequate stake to elect",
"kind", kind,
"runtime_id", rt.ID,
"worker_size", workerSize,
"backup_size", backupSize,
"available", len(members),
)
if err = schedulerState.NewMutableState(ctx.State()).DropCommittee(ctx, kind, rt.ID); err != nil {
return fmt.Errorf("failed to drop committee: %w", err)
if len(elected) != wantedNodes {
ctx.Logger().Error("insufficient nodes with adequate stake to elect",
"kind", kind,
"role", role,
"runtime_id", rt.ID,
"available", len(elected),
)
if err = schedulerState.NewMutableState(ctx.State()).DropCommittee(ctx, kind, rt.ID); err != nil {
return fmt.Errorf("failed to drop committee: %w", err)
}
return nil
}
return nil

members = append(members, elected...)
}

err = schedulerState.NewMutableState(ctx.State()).PutCommittee(ctx, &scheduler.Committee{
Expand All @@ -505,12 +579,13 @@ func (app *schedulerApplication) electAllCommittees(
beacon []byte,
stakeAcc *stakingState.StakeAccumulatorCache,
entitiesEligibleForReward map[staking.Address]bool,
validatorEntities map[staking.Address]bool,
runtimes []*registry.Runtime,
nodes []*node.Node,
kind scheduler.CommitteeKind,
) error {
for _, runtime := range runtimes {
if err := app.electCommittee(ctx, epoch, beacon, stakeAcc, entitiesEligibleForReward, runtime, nodes, kind); err != nil {
if err := app.electCommittee(ctx, epoch, beacon, stakeAcc, entitiesEligibleForReward, validatorEntities, runtime, nodes, kind); err != nil {
return err
}
}
Expand All @@ -524,7 +599,7 @@ func (app *schedulerApplication) electValidators(
entitiesEligibleForReward map[staking.Address]bool,
nodes []*node.Node,
params *scheduler.ConsensusParameters,
) error {
) (map[staking.Address]bool, error) {
// Filter the node list based on eligibility and minimum required
// entity stake.
var nodeList []*node.Node
@@ -547,13 +622,13 @@ func (app *schedulerApplication) electValidators(
// nodes by descending stake.
sortedEntities, err := stakingAddressMapToSliceByStake(entities, stakeAcc, beacon)
if err != nil {
return err
return nil, err
}

// Shuffle the node list.
drbg, err := drbg.New(crypto.SHA512, beacon, nil, RNGContextValidators)
if err != nil {
return fmt.Errorf("tendermint/scheduler: couldn't instantiate DRBG: %w", err)
return nil, fmt.Errorf("tendermint/scheduler: couldn't instantiate DRBG: %w", err)
}
rngSrc := mathrand.New(drbg)
rng := rand.New(rngSrc)
@@ -574,6 +649,7 @@

// Go down the list of entities running nodes by stake, picking one node
// to act as a validator till the maximum is reached.
validatorEntities := make(map[staking.Address]bool)
newValidators := make(map[signature.PublicKey]int64)
electLoop:
for _, entAddr := range sortedEntities {
Expand Down Expand Up @@ -604,16 +680,17 @@ electLoop:
var stake *quantity.Quantity
stake, err = stakeAcc.GetEscrowBalance(entAddr)
if err != nil {
return fmt.Errorf("failed to fetch escrow balance for account %s: %w", entAddr, err)
return nil, fmt.Errorf("failed to fetch escrow balance for account %s: %w", entAddr, err)
}
power, err = scheduler.VotingPowerFromStake(stake)
if err != nil {
return fmt.Errorf("computing voting power for account %s with balance %v: %w",
return nil, fmt.Errorf("computing voting power for account %s with balance %v: %w",
entAddr, stake, err,
)
}
}

validatorEntities[entAddr] = true
newValidators[n.Consensus.ID] = power
if len(newValidators) >= params.MaxValidators {
break electLoop
Expand All @@ -622,20 +699,20 @@ electLoop:
}

if len(newValidators) == 0 {
return fmt.Errorf("tendermint/scheduler: failed to elect any validators")
return nil, fmt.Errorf("tendermint/scheduler: failed to elect any validators")
}
if len(newValidators) < params.MinValidators {
return fmt.Errorf("tendermint/scheduler: insufficient validators")
return nil, fmt.Errorf("tendermint/scheduler: insufficient validators")
}

// Set the new pending validator set in the ABCI state. It needs to be
// applied in EndBlock.
state := schedulerState.NewMutableState(ctx.State())
if err = state.PutPendingValidators(ctx, newValidators); err != nil {
return fmt.Errorf("failed to set pending validators: %w", err)
return nil, fmt.Errorf("failed to set pending validators: %w", err)
}

return nil
return validatorEntities, nil
}

func stakingAddressMapToSliceByStake(
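
The remaining 20 changed files are not shown. Purely as an illustration, a runtime descriptor using the new constraints might be configured roughly as follows. Runtime.Constraints, SchedulingConstraints, and the kind/role constants appear in the diff above; the *Constraint type names are the same inferred ones as in the sketch near the top and may not match the real registry API exactly.

```go
// Hypothetical usage sketch: attaching the new per-role scheduling constraints
// to a runtime descriptor. The ValidatorSetConstraint, MinPoolSizeConstraint,
// and MaxNodesConstraint names are inferred from the diff and may differ.
package example

import (
	registry "github.com/oasisprotocol/oasis-core/go/registry/api"
	scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api"
)

// withExampleConstraints returns a copy of rt with example constraints set.
func withExampleConstraints(rt registry.Runtime) registry.Runtime {
	rt.Constraints = map[scheduler.CommitteeKind]map[scheduler.Role]registry.SchedulingConstraints{
		scheduler.KindComputeExecutor: {
			scheduler.RoleWorker: {
				// Executor workers must be run by entities that also back a
				// node in the current validator set.
				ValidatorSet: &registry.ValidatorSetConstraint{},
				// Require at least 5 eligible candidates before electing.
				MinPoolSize: &registry.MinPoolSizeConstraint{Limit: 5},
				// Allow at most one elected node per entity.
				MaxNodes: &registry.MaxNodesConstraint{Limit: 1},
			},
			scheduler.RoleBackupWorker: {
				MinPoolSize: &registry.MinPoolSizeConstraint{Limit: 3},
			},
		},
		scheduler.KindStorage: {
			scheduler.RoleWorker: {
				MaxNodes: &registry.MaxNodesConstraint{Limit: 2},
			},
		},
	}
	return rt
}
```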