Skip to content

Commit

Permalink
core/scheduler: add support for DutyAggregator (#1113)
Browse files Browse the repository at this point in the history
Add support for `DutyAggregator` to package `scheduler`.

category: feature
ticket: #1075
  • Loading branch information
xenowits committed Sep 12, 2022
1 parent 9f32cac commit ecaddb9
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 16 deletions.
16 changes: 16 additions & 0 deletions core/dutydefinition.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,19 @@ func (d ProposerDefinition) Clone() (DutyDefinition, error) {
func (d ProposerDefinition) MarshalJSON() ([]byte, error) {
return d.ProposerDuty.MarshalJSON()
}

// EmptyDefinition is an empty implementation of DutyDefinition.
type EmptyDefinition struct{}

// NewEmptyDefinition returns a convenience function that returns a new EmptyDefinition.
func NewEmptyDefinition() EmptyDefinition {
return EmptyDefinition{}
}

func (EmptyDefinition) Clone() (DutyDefinition, error) {
return EmptyDefinition{}, nil
}

func (EmptyDefinition) MarshalJSON() ([]byte, error) {
return nil, nil
}
2 changes: 1 addition & 1 deletion core/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ type Broadcaster interface {
Broadcast(context.Context, Duty, PubKey, SignedData) error
}

// wireFuncs defines the core workflow components as a list input and output functions
// wireFuncs defines the core workflow components as a list of input and output functions
// instead as interfaces, since functions are easier to wrap than interfaces.
type wireFuncs struct {
SchedulerSubscribeDuties func(func(context.Context, Duty, DutyDefinitionSet) error)
Expand Down
9 changes: 4 additions & 5 deletions core/scheduler/offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ import (
"github.com/obolnetwork/charon/core"
)

// slotOffsets defines the offsets
// at which duties should be triggered.
// slotOffsets defines the offsets at which the duties should be triggered.
var slotOffsets = map[core.DutyType]func(time.Duration) time.Duration{
core.DutyAttester: fraction(1, 3), // 1/3 slot duration
core.DutyAttester: fraction(1, 3), // 1/3 slot duration
core.DutyAggregator: fraction(2, 3), // 2/3 slot duration
// TODO(corver): Add more duties
}

// fraction returns a function that calculates slot offset
// based on the fraction x/y of total slot duration.
// fraction returns a function that calculates slot offset based on the fraction x/y of total slot duration.
func fraction(x, y int64) func(time.Duration) time.Duration {
return func(total time.Duration) time.Duration {
return (total * time.Duration(x)) / time.Duration(y)
Expand Down
16 changes: 11 additions & 5 deletions core/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
// delayFunc abstracts slot offset delaying/sleeping for deterministic tests.
type delayFunc func(duty core.Duty, deadline time.Time) <-chan time.Time

// NewForT returns a new scheduler for testing supporting a fake clock.
// NewForT returns a new scheduler for testing using a fake clock.
func NewForT(t *testing.T, clock clockwork.Clock, delayFunc delayFunc, pubkeys []core.PubKey,
eth2Cl eth2wrap.Client, builderAPI bool,
) *Scheduler {
Expand Down Expand Up @@ -128,7 +128,7 @@ func (s *Scheduler) Run() error {
}
}

// emitCoreSlot calls all slot subscribes with the provided slot.
// emitCoreSlot calls all slot subscriptions with the provided slot.
func (s *Scheduler) emitCoreSlot(ctx context.Context, slot core.Slot) {
for _, sub := range s.slotSubs {
err := sub(ctx, slot)
Expand Down Expand Up @@ -158,8 +158,8 @@ func (s *Scheduler) GetDutyDefinition(ctx context.Context, duty core.Duty) (core

defSet, ok := s.getDutyDefinitionSet(duty)
if !ok {
return nil, errors.New("duty not resolved although epoch is marked as resolved",
z.Any("duty", duty))
return nil, errors.New("duty not resolved although epoch is resolved",
z.Any("duty", duty), z.U64("epoch", epoch))
}

return defSet.Clone() // Clone before returning.
Expand Down Expand Up @@ -288,7 +288,7 @@ func (s *Scheduler) resolveAttDuties(ctx context.Context, slot core.Slot, vals v
continue
}

duty := core.Duty{Slot: int64(attDuty.Slot), Type: core.DutyAttester}
duty := core.NewAttesterDuty(int64(attDuty.Slot))

pubkey, ok := vals.PubKeyFromIndex(attDuty.ValidatorIndex)
if !ok {
Expand All @@ -306,6 +306,12 @@ func (s *Scheduler) resolveAttDuties(ctx context.Context, slot core.Slot, vals v
z.U64("slot", uint64(attDuty.Slot)),
z.U64("commidx", uint64(attDuty.CommitteeIndex)),
z.Any("pubkey", pubkey))

// Schedule aggregation duty as well.
aggDuty := core.NewAggregatorDuty(int64(attDuty.Slot))
if !s.setDutyDefinition(aggDuty, pubkey, core.NewEmptyDefinition()) {
continue
}
}

if len(remaining) > 0 {
Expand Down
18 changes: 13 additions & 5 deletions core/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,20 +176,20 @@ func TestSchedulerDuties(t *testing.T) {
// All duties grouped in first slot of epoch
Name: "grouped",
Factor: 0,
Results: 2,
Results: 3,
},
{
// All duties spread in first N slots of epoch (N is number of validators)
Name: "spread",
Factor: 1,
Results: 6,
Results: 9,
},
{
// All duties spread in first N slots of epoch (except first proposer errors)
Name: "spread_errors",
Factor: 1,
PropErrs: 1,
Results: 5,
Results: 8,
},
}

Expand Down Expand Up @@ -317,15 +317,17 @@ func TestScheduler_GetDuty(t *testing.T) {
_, err = sched.GetDutyDefinition(context.Background(), core.NewAttesterDuty(0))
require.ErrorContains(t, err, "epoch not resolved yet")

_, err = sched.GetDutyDefinition(context.Background(), core.NewAggregatorDuty(0))
require.ErrorContains(t, err, "epoch not resolved yet")

_, err = sched.GetDutyDefinition(context.Background(), core.NewBuilderProposerDuty(0))
require.ErrorContains(t, err, "builder-api not enabled")

slotDuration, err := eth2Cl.SlotDuration(context.Background())
require.NoError(t, err)

clock.CallbackAfter(t0.Add(slotDuration).Add(time.Second), func() {
res, err := sched.GetDutyDefinition(context.Background(), core.Duty{Slot: 0, Type: core.DutyAttester})

res, err := sched.GetDutyDefinition(context.Background(), core.NewAttesterDuty(0))
require.NoError(t, err)

pubKeys, err := valSet.CorePubKeys()
Expand All @@ -335,6 +337,12 @@ func TestScheduler_GetDuty(t *testing.T) {
require.NotNil(t, res[pubKey])
}

res, err = sched.GetDutyDefinition(context.Background(), core.NewAggregatorDuty(0))
require.NoError(t, err)
for _, pubKey := range pubKeys {
require.NotNil(t, res[pubKey])
}

sched.Stop()
})

Expand Down
9 changes: 9 additions & 0 deletions core/scheduler/testdata/TestSchedulerDuties_grouped.golden
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,14 @@
"0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76": "{\"pubkey\":\"0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76\",\"slot\":\"0\",\"validator_index\":\"3\",\"committee_index\":\"0\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"3\"}",
"0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490": "{\"pubkey\":\"0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490\",\"slot\":\"0\",\"validator_index\":\"1\",\"committee_index\":\"0\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"1\"}"
}
},
{
"Time": "00:08.000",
"duty": "0/aggregator",
"DutyDefSet": {
"0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea": "",
"0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76": "",
"0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490": ""
}
}
]
21 changes: 21 additions & 0 deletions core/scheduler/testdata/TestSchedulerDuties_spread.golden
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@
"0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490": "{\"pubkey\":\"0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490\",\"slot\":\"0\",\"validator_index\":\"1\",\"committee_index\":\"0\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"1\"}"
}
},
{
"Time": "00:08.000",
"duty": "0/aggregator",
"DutyDefSet": {
"0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490": ""
}
},
{
"Time": "00:00.000",
"duty": "1/proposer",
Expand All @@ -27,6 +34,13 @@
"0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea": "{\"pubkey\":\"0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea\",\"slot\":\"1\",\"validator_index\":\"2\",\"committee_index\":\"1\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"2\"}"
}
},
{
"Time": "00:20.000",
"duty": "1/aggregator",
"DutyDefSet": {
"0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea": ""
}
},
{
"Time": "00:00.000",
"duty": "2/proposer",
Expand All @@ -40,5 +54,12 @@
"DutyDefSet": {
"0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76": "{\"pubkey\":\"0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76\",\"slot\":\"2\",\"validator_index\":\"3\",\"committee_index\":\"2\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"3\"}"
}
},
{
"Time": "00:32.000",
"duty": "2/aggregator",
"DutyDefSet": {
"0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76": ""
}
}
]
21 changes: 21 additions & 0 deletions core/scheduler/testdata/TestSchedulerDuties_spread_errors.golden
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
"0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490": "{\"pubkey\":\"0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490\",\"slot\":\"0\",\"validator_index\":\"1\",\"committee_index\":\"0\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"1\"}"
}
},
{
"Time": "00:08.000",
"duty": "0/aggregator",
"DutyDefSet": {
"0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490": ""
}
},
{
"Time": "00:00.000",
"duty": "1/proposer",
Expand All @@ -20,6 +27,13 @@
"0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea": "{\"pubkey\":\"0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea\",\"slot\":\"1\",\"validator_index\":\"2\",\"committee_index\":\"1\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"2\"}"
}
},
{
"Time": "00:20.000",
"duty": "1/aggregator",
"DutyDefSet": {
"0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea": ""
}
},
{
"Time": "00:00.000",
"duty": "2/proposer",
Expand All @@ -33,5 +47,12 @@
"DutyDefSet": {
"0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76": "{\"pubkey\":\"0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76\",\"slot\":\"2\",\"validator_index\":\"3\",\"committee_index\":\"2\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"3\"}"
}
},
{
"Time": "00:32.000",
"duty": "2/aggregator",
"DutyDefSet": {
"0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76": ""
}
}
]

0 comments on commit ecaddb9

Please sign in to comment.