diff --git a/app/app.go b/app/app.go index 22ea65708..90f6c88a3 100644 --- a/app/app.go +++ b/app/app.go @@ -575,7 +575,7 @@ func wireValidatorMock(conf Config, pubshares []eth2p0.BLSPubKey, sched core.Sch signer := validatormock.NewSigner(secrets...) // Trigger validatormock when scheduler triggers new slot. - sched.Subscribe(func(ctx context.Context, duty core.Duty, _ core.FetchArgSet) error { + sched.Subscribe(func(ctx context.Context, duty core.Duty, _ core.DutyDefinitionSet) error { ctx = log.WithTopic(ctx, "vmock") go func() { addr := "http://" + conf.ValidatorAPIAddr diff --git a/core/dutydefinition.go b/core/dutydefinition.go index 368901766..036a8d28e 100644 --- a/core/dutydefinition.go +++ b/core/dutydefinition.go @@ -22,8 +22,8 @@ import ( ) var ( - _ FetchArg = AttesterDefinition{} - _ FetchArg = ProposerDefinition{} + _ DutyDefinition = AttesterDefinition{} + _ DutyDefinition = ProposerDefinition{} ) // NewAttesterDefinition is a convenience function that returns a new attester definition. @@ -31,13 +31,13 @@ func NewAttesterDefinition(duty *eth2v1.AttesterDuty) AttesterDefinition { return AttesterDefinition{AttesterDuty: *duty} } -// AttesterDefinition defines an attester duty. It implements FetchArg. +// AttesterDefinition defines an attester duty. It implements DutyDefinition. // Note the slight rename from Duty to Definition to avoid overloading the term Duty. type AttesterDefinition struct { eth2v1.AttesterDuty } -func (d AttesterDefinition) Clone() (FetchArg, error) { +func (d AttesterDefinition) Clone() (DutyDefinition, error) { duty := new(eth2v1.AttesterDuty) err := cloneJSONMarshaler(&d.AttesterDuty, duty) if err != nil { @@ -56,13 +56,13 @@ func NewProposerDefinition(duty *eth2v1.ProposerDuty) ProposerDefinition { return ProposerDefinition{ProposerDuty: *duty} } -// ProposerDefinition defines a block proposer duty. It implements FetchArg. +// ProposerDefinition defines a block proposer duty. It implements DutyDefinition. // Note the slight rename from Duty to Definition to avoid overloading the term Duty. type ProposerDefinition struct { eth2v1.ProposerDuty } -func (d ProposerDefinition) Clone() (FetchArg, error) { +func (d ProposerDefinition) Clone() (DutyDefinition, error) { duty := new(eth2v1.ProposerDuty) err := cloneJSONMarshaler(&d.ProposerDuty, duty) if err != nil { diff --git a/core/fetcher/fetcher.go b/core/fetcher/fetcher.go index 27f6e3e3f..2b94d917c 100644 --- a/core/fetcher/fetcher.go +++ b/core/fetcher/fetcher.go @@ -58,7 +58,7 @@ func (f *Fetcher) Subscribe(fn func(context.Context, core.Duty, core.UnsignedDat } // Fetch triggers fetching of a proposed duty data set. -func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, argSet core.FetchArgSet) error { +func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) error { var ( unsignedSet core.UnsignedDataSet err error @@ -66,12 +66,12 @@ func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, argSet core.FetchAr switch duty.Type { case core.DutyProposer: - unsignedSet, err = f.fetchProposerData(ctx, duty.Slot, argSet) + unsignedSet, err = f.fetchProposerData(ctx, duty.Slot, defSet) if err != nil { return errors.Wrap(err, "fetch proposer data") } case core.DutyAttester: - unsignedSet, err = f.fetchAttesterData(ctx, duty.Slot, argSet) + unsignedSet, err = f.fetchAttesterData(ctx, duty.Slot, defSet) if err != nil { return errors.Wrap(err, "fetch attester data") } @@ -100,14 +100,14 @@ func (f *Fetcher) RegisterAggSigDB(fn func(context.Context, core.Duty, core.PubK } // fetchAttesterData returns the fetched attestation data set for committees and validators in the arg set. -func (f *Fetcher) fetchAttesterData(ctx context.Context, slot int64, argSet core.FetchArgSet, +func (f *Fetcher) fetchAttesterData(ctx context.Context, slot int64, defSet core.DutyDefinitionSet, ) (core.UnsignedDataSet, error) { // We may have multiple validators in the same committee, use the same attestation data in that case. dataByCommIdx := make(map[eth2p0.CommitteeIndex]*eth2p0.AttestationData) resp := make(core.UnsignedDataSet) - for pubkey, fetchArg := range argSet { - attDuty, ok := fetchArg.(core.AttesterDefinition) + for pubkey, def := range defSet { + attDuty, ok := def.(core.AttesterDefinition) if !ok { return nil, errors.New("invalid attester definition") } @@ -134,9 +134,9 @@ func (f *Fetcher) fetchAttesterData(ctx context.Context, slot int64, argSet core return resp, nil } -func (f *Fetcher) fetchProposerData(ctx context.Context, slot int64, argSet core.FetchArgSet) (core.UnsignedDataSet, error) { +func (f *Fetcher) fetchProposerData(ctx context.Context, slot int64, defSet core.DutyDefinitionSet) (core.UnsignedDataSet, error) { resp := make(core.UnsignedDataSet) - for pubkey := range argSet { + for pubkey := range defSet { // Fetch previously aggregated randao reveal from AggSigDB dutyRandao := core.Duty{ Slot: slot, diff --git a/core/fetcher/fetcher_test.go b/core/fetcher/fetcher_test.go index e574150b6..79e190149 100644 --- a/core/fetcher/fetcher_test.go +++ b/core/fetcher/fetcher_test.go @@ -61,7 +61,7 @@ func TestFetchAttester(t *testing.T) { CommitteesAtSlot: notZero, } - argSet := core.FetchArgSet{ + defSet := core.DutyDefinitionSet{ pubkeysByIdx[vIdxA]: core.NewAttesterDefinition(&dutyA), pubkeysByIdx[vIdxB]: core.NewAttesterDefinition(&dutyB), } @@ -89,7 +89,7 @@ func TestFetchAttester(t *testing.T) { return nil }) - err = fetch.Fetch(ctx, duty, argSet) + err = fetch.Fetch(ctx, duty, defSet) require.NoError(t, err) } @@ -115,7 +115,7 @@ func TestFetchProposer(t *testing.T) { Slot: slot, ValidatorIndex: vIdxB, } - argSet := core.FetchArgSet{ + defSet := core.DutyDefinitionSet{ pubkeysByIdx[vIdxA]: core.NewProposerDefinition(&dutyA), pubkeysByIdx[vIdxB]: core.NewProposerDefinition(&dutyB), } @@ -156,7 +156,7 @@ func TestFetchProposer(t *testing.T) { return nil }) - err = fetch.Fetch(ctx, duty, argSet) + err = fetch.Fetch(ctx, duty, defSet) require.NoError(t, err) } diff --git a/core/interfaces.go b/core/interfaces.go index ce22e1542..136d79cc2 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -25,16 +25,16 @@ import ( // Scheduler triggers the start of a duty workflow. type Scheduler interface { // Subscribe registers a callback for fetching a duty. - Subscribe(func(context.Context, Duty, FetchArgSet) error) + Subscribe(func(context.Context, Duty, DutyDefinitionSet) error) - // GetDuty returns the argSet for a duty if resolved already. - GetDuty(context.Context, Duty) (FetchArgSet, error) + // GetDutyDefinition returns the definition set for a duty if already resolved. + GetDutyDefinition(context.Context, Duty) (DutyDefinitionSet, error) } // Fetcher fetches proposed unsigned duty data. type Fetcher interface { // Fetch triggers fetching of a proposed duty data set. - Fetch(context.Context, Duty, FetchArgSet) error + Fetch(context.Context, Duty, DutyDefinitionSet) error // Subscribe registers a callback for proposed unsigned duty data sets. Subscribe(func(context.Context, Duty, UnsignedDataSet) error) @@ -85,8 +85,8 @@ type ValidatorAPI interface { // RegisterPubKeyByAttestation registers a function to query validator by attestation. RegisterPubKeyByAttestation(func(ctx context.Context, slot, commIdx, valCommIdx int64) (PubKey, error)) - // RegisterGetDutyFunc registers a function to query duty data. - RegisterGetDutyFunc(func(context.Context, Duty) (FetchArgSet, error)) + // RegisterGetDutyDefinition registers a function to query duty definitions. + RegisterGetDutyDefinition(func(context.Context, Duty) (DutyDefinitionSet, error)) // RegisterParSigDB registers a function to store partially signed data sets. RegisterParSigDB(func(context.Context, Duty, ParSignedDataSet) error) @@ -146,9 +146,9 @@ type Broadcaster interface { // wireFuncs defines the core workflow components as a list input and output functions // instead as interfaces, since functions are easier to wrap than interfaces. type wireFuncs struct { - SchedulerSubscribe func(func(context.Context, Duty, FetchArgSet) error) - SchedulerGetDuty func(context.Context, Duty) (FetchArgSet, error) - FetcherFetch func(context.Context, Duty, FetchArgSet) error + SchedulerSubscribe func(func(context.Context, Duty, DutyDefinitionSet) error) + SchedulerGetDutyDefinition func(context.Context, Duty) (DutyDefinitionSet, error) + FetcherFetch func(context.Context, Duty, DutyDefinitionSet) error FetcherSubscribe func(func(context.Context, Duty, UnsignedDataSet) error) FetcherRegisterAggSigDB func(func(context.Context, Duty, PubKey) (SignedData, error)) ConsensusPropose func(context.Context, Duty, UnsignedDataSet) error @@ -159,7 +159,7 @@ type wireFuncs struct { DutyDBPubKeyByAttestation func(ctx context.Context, slot, commIdx, valCommIdx int64) (PubKey, error) VAPIRegisterAwaitAttestation func(func(ctx context.Context, slot, commIdx int64) (*eth2p0.AttestationData, error)) VAPIRegisterAwaitBeaconBlock func(func(ctx context.Context, slot int64) (*spec.VersionedBeaconBlock, error)) - VAPIRegisterGetDutyFunc func(func(context.Context, Duty) (FetchArgSet, error)) + VAPIRegisterGetDutyDefinition func(func(context.Context, Duty) (DutyDefinitionSet, error)) VAPIRegisterPubKeyByAttestation func(func(ctx context.Context, slot, commIdx, valCommIdx int64) (PubKey, error)) VAPIRegisterParSigDB func(func(context.Context, Duty, ParSignedDataSet) error) ParSigDBStoreInternal func(context.Context, Duty, ParSignedDataSet) error @@ -193,7 +193,7 @@ func Wire(sched Scheduler, ) { w := wireFuncs{ SchedulerSubscribe: sched.Subscribe, - SchedulerGetDuty: sched.GetDuty, + SchedulerGetDutyDefinition: sched.GetDutyDefinition, FetcherFetch: fetch.Fetch, FetcherSubscribe: fetch.Subscribe, FetcherRegisterAggSigDB: fetch.RegisterAggSigDB, @@ -205,7 +205,7 @@ func Wire(sched Scheduler, DutyDBPubKeyByAttestation: dutyDB.PubKeyByAttestation, VAPIRegisterAwaitBeaconBlock: vapi.RegisterAwaitBeaconBlock, VAPIRegisterAwaitAttestation: vapi.RegisterAwaitAttestation, - VAPIRegisterGetDutyFunc: vapi.RegisterGetDutyFunc, + VAPIRegisterGetDutyDefinition: vapi.RegisterGetDutyDefinition, VAPIRegisterPubKeyByAttestation: vapi.RegisterPubKeyByAttestation, VAPIRegisterParSigDB: vapi.RegisterParSigDB, ParSigDBStoreInternal: parSigDB.StoreInternal, @@ -231,7 +231,7 @@ func Wire(sched Scheduler, w.ConsensusSubscribe(w.DutyDBStore) w.VAPIRegisterAwaitBeaconBlock(w.DutyDBAwaitBeaconBlock) w.VAPIRegisterAwaitAttestation(w.DutyDBAwaitAttestation) - w.VAPIRegisterGetDutyFunc(w.SchedulerGetDuty) + w.VAPIRegisterGetDutyDefinition(w.SchedulerGetDutyDefinition) w.VAPIRegisterPubKeyByAttestation(w.DutyDBPubKeyByAttestation) w.VAPIRegisterParSigDB(w.ParSigDBStoreInternal) w.ParSigDBSubscribeInternal(w.ParSigExBroadcast) diff --git a/core/retry.go b/core/retry.go index 2fe0c2e09..3345f5681 100644 --- a/core/retry.go +++ b/core/retry.go @@ -25,7 +25,7 @@ import ( func WithAsyncRetry(retryer *retry.Retryer[Duty]) WireOption { return func(w *wireFuncs) { clone := *w - w.FetcherFetch = func(ctx context.Context, duty Duty, set FetchArgSet) error { + w.FetcherFetch = func(ctx context.Context, duty Duty, set DutyDefinitionSet) error { go retryer.DoAsync(ctx, duty, "fetcher fetch", func(ctx context.Context) error { return clone.FetcherFetch(ctx, duty, set) }) diff --git a/core/scheduler/metrics.go b/core/scheduler/metrics.go index 1949d88df..23a91e28f 100644 --- a/core/scheduler/metrics.go +++ b/core/scheduler/metrics.go @@ -52,8 +52,8 @@ func instrumentSlot(slot slot) { } // instrumentDuty increments the duty counter. -func instrumentDuty(duty core.Duty, argSet core.FetchArgSet) { - for pubkey := range argSet { +func instrumentDuty(duty core.Duty, defSet core.DutyDefinitionSet) { + for pubkey := range defSet { dutyCounter.WithLabelValues(duty.Type.String(), pubkey.String()).Inc() } } diff --git a/core/scheduler/scheduler.go b/core/scheduler/scheduler.go index f69abe29a..51ad4fef9 100644 --- a/core/scheduler/scheduler.go +++ b/core/scheduler/scheduler.go @@ -72,7 +72,7 @@ func New(pubkeys []core.PubKey, eth2Svc eth2client.Service) (*Scheduler, error) eth2Cl: eth2Cl, pubkeys: pubkeys, quit: make(chan struct{}), - duties: make(map[core.Duty]core.FetchArgSet), + duties: make(map[core.Duty]core.DutyDefinitionSet), clock: clockwork.NewRealClock(), delayFunc: func(_ core.Duty, deadline time.Time) <-chan time.Time { return time.After(time.Until(deadline)) @@ -88,14 +88,14 @@ type Scheduler struct { clock clockwork.Clock delayFunc delayFunc resolvedEpoch uint64 - duties map[core.Duty]core.FetchArgSet + duties map[core.Duty]core.DutyDefinitionSet dutiesMutex sync.Mutex - subs []func(context.Context, core.Duty, core.FetchArgSet) error + subs []func(context.Context, core.Duty, core.DutyDefinitionSet) error } // Subscribe registers a callback for triggering a duty. // Note this should be called *before* Start. -func (s *Scheduler) Subscribe(fn func(context.Context, core.Duty, core.FetchArgSet) error) { +func (s *Scheduler) Subscribe(fn func(context.Context, core.Duty, core.DutyDefinitionSet) error) { s.subs = append(s.subs, fn) } @@ -132,8 +132,8 @@ func (s *Scheduler) Run() error { } } -// GetDuty returns the argSet for a duty if resolved already, otherwise an error. -func (s *Scheduler) GetDuty(ctx context.Context, duty core.Duty) (core.FetchArgSet, error) { +// GetDutyDefinition returns the definition for a duty if resolved already, otherwise an error. +func (s *Scheduler) GetDutyDefinition(ctx context.Context, duty core.Duty) (core.DutyDefinitionSet, error) { slotsPerEpoch, err := s.eth2Cl.SlotsPerEpoch(ctx) if err != nil { return nil, err @@ -144,12 +144,12 @@ func (s *Scheduler) GetDuty(ctx context.Context, duty core.Duty) (core.FetchArgS return nil, errors.New("epoch not resolved yet") } - argSet, ok := s.getFetchArgSet(duty) + defSet, ok := s.getDutyDefinitionSet(duty) if !ok { return nil, errors.New("duty not resolved although epoch is marked as resolved") } - return argSet.Clone() // Clone before returning. + return defSet.Clone() // Clone before returning. } // scheduleSlot resolves upcoming duties and triggers resolved duties for the slot. @@ -167,7 +167,7 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) { Type: dutyType, } - argSet, ok := s.getFetchArgSet(duty) + defSet, ok := s.getDutyDefinitionSet(duty) if !ok { // Nothing for this duty. continue @@ -179,13 +179,13 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) { return // context cancelled } - instrumentDuty(duty, argSet) + instrumentDuty(duty, defSet) ctx = log.WithCtx(ctx, z.Any("duty", duty)) ctx, span := core.StartDutyTrace(ctx, duty, "core/scheduler.scheduleSlot") defer span.End() for _, sub := range s.subs { - clone, err := argSet.Clone() // Clone for each subscriber. + clone, err := defSet.Clone() // Clone for each subscriber. if err != nil { log.Error(ctx, "Cloning duty definition set", err) return @@ -281,7 +281,7 @@ func (s *Scheduler) resolveAttDuties(ctx context.Context, slot slot, vals valida continue } - if !s.setFetchArg(duty, pubkey, core.NewAttesterDefinition(attDuty)) { + if !s.setDutyDefinition(duty, pubkey, core.NewAttesterDefinition(attDuty)) { continue } @@ -323,7 +323,7 @@ func (s *Scheduler) resolveProDuties(ctx context.Context, slot slot, vals valida continue } - if !s.setFetchArg(duty, pubkey, core.NewProposerDefinition(proDuty)) { + if !s.setDutyDefinition(duty, pubkey, core.NewProposerDefinition(proDuty)) { continue } @@ -337,29 +337,29 @@ func (s *Scheduler) resolveProDuties(ctx context.Context, slot slot, vals valida return nil } -func (s *Scheduler) getFetchArgSet(duty core.Duty) (core.FetchArgSet, bool) { +func (s *Scheduler) getDutyDefinitionSet(duty core.Duty) (core.DutyDefinitionSet, bool) { s.dutiesMutex.Lock() defer s.dutiesMutex.Unlock() - argSet, ok := s.duties[duty] + defSet, ok := s.duties[duty] - return argSet, ok + return defSet, ok } -func (s *Scheduler) setFetchArg(duty core.Duty, pubkey core.PubKey, set core.FetchArg) bool { +func (s *Scheduler) setDutyDefinition(duty core.Duty, pubkey core.PubKey, set core.DutyDefinition) bool { s.dutiesMutex.Lock() defer s.dutiesMutex.Unlock() - argSet, ok := s.duties[duty] + defSet, ok := s.duties[duty] if !ok { - argSet = make(core.FetchArgSet) + defSet = make(core.DutyDefinitionSet) } - if _, ok := argSet[pubkey]; ok { + if _, ok := defSet[pubkey]; ok { return false } - argSet[pubkey] = set - s.duties[duty] = argSet + defSet[pubkey] = set + s.duties[duty] = defSet return true } diff --git a/core/scheduler/scheduler_test.go b/core/scheduler/scheduler_test.go index 1cc78ebd0..e67a29bca 100644 --- a/core/scheduler/scheduler_test.go +++ b/core/scheduler/scheduler_test.go @@ -74,7 +74,7 @@ func TestIntegration(t *testing.T) { count := 10 - s.Subscribe(func(ctx context.Context, duty core.Duty, set core.FetchArgSet) error { + s.Subscribe(func(ctx context.Context, duty core.Duty, set core.DutyDefinitionSet) error { for idx, data := range set { t.Logf("Duty triggered: vidx=%v slot=%v committee=%v\n", idx, duty.Slot, data.(core.AttesterDefinition).CommitteeIndex) } @@ -242,13 +242,13 @@ func TestSchedulerDuties(t *testing.T) { Time string DutyStr string `json:"duty"` Duty core.Duty `json:"-"` - DutyArgSet map[core.PubKey]string + DutyDefSet map[core.PubKey]string } var ( results []result mu sync.Mutex ) - sched.Subscribe(func(ctx context.Context, duty core.Duty, set core.FetchArgSet) error { + sched.Subscribe(func(ctx context.Context, duty core.Duty, set core.DutyDefinitionSet) error { // Make result human-readable resultSet := make(map[core.PubKey]string) for pubkey, def := range set { @@ -264,7 +264,7 @@ func TestSchedulerDuties(t *testing.T) { results = append(results, result{ Duty: duty, DutyStr: duty.String(), - DutyArgSet: resultSet, + DutyDefSet: resultSet, }) if len(results) == test.Results { @@ -316,7 +316,7 @@ func TestScheduler_GetDuty(t *testing.T) { clock := newTestClock(t0) sched := scheduler.NewForT(t, clock, new(delayer).delay, pubkeys, eth2Cl) - _, err = sched.GetDuty(context.Background(), core.Duty{Slot: 0, Type: core.DutyAttester}) + _, err = sched.GetDutyDefinition(context.Background(), core.Duty{Slot: 0, Type: core.DutyAttester}) // due to current design we will return an error if we request the duty of a slot that has not been resolved // by the scheduler yet. With DutyResolver, we will have always an answer require.Error(t, err, "epoch not resolved yet") @@ -325,7 +325,7 @@ func TestScheduler_GetDuty(t *testing.T) { require.NoError(t, err) clock.CallbackAfter(t0.Add(slotDuration).Add(time.Second), func() { - res, err := sched.GetDuty(context.Background(), core.Duty{Slot: 0, Type: core.DutyAttester}) + res, err := sched.GetDutyDefinition(context.Background(), core.Duty{Slot: 0, Type: core.DutyAttester}) require.NoError(t, err) diff --git a/core/scheduler/testdata/TestSchedulerDuties_grouped.golden b/core/scheduler/testdata/TestSchedulerDuties_grouped.golden index 30aba5fd2..ed8d13241 100644 --- a/core/scheduler/testdata/TestSchedulerDuties_grouped.golden +++ b/core/scheduler/testdata/TestSchedulerDuties_grouped.golden @@ -2,14 +2,14 @@ { "Time": "00:00.000", "duty": "0/proposer", - "DutyArgSet": { + "DutyDefSet": { "0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490": "{\"pubkey\":\"0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490\",\"slot\":\"0\",\"validator_index\":\"1\"}" } }, { "Time": "00:04.000", "duty": "0/attester", - "DutyArgSet": { + "DutyDefSet": { "0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea": "{\"pubkey\":\"0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea\",\"slot\":\"0\",\"validator_index\":\"2\",\"committee_index\":\"0\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"2\"}", "0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76": "{\"pubkey\":\"0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76\",\"slot\":\"0\",\"validator_index\":\"3\",\"committee_index\":\"0\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"3\"}", "0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490": "{\"pubkey\":\"0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490\",\"slot\":\"0\",\"validator_index\":\"1\",\"committee_index\":\"0\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"1\"}" diff --git a/core/scheduler/testdata/TestSchedulerDuties_spread.golden b/core/scheduler/testdata/TestSchedulerDuties_spread.golden index e639ebc05..b141bfd2d 100644 --- a/core/scheduler/testdata/TestSchedulerDuties_spread.golden +++ b/core/scheduler/testdata/TestSchedulerDuties_spread.golden @@ -2,42 +2,42 @@ { "Time": "00:00.000", "duty": "0/proposer", - "DutyArgSet": { + "DutyDefSet": { "0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490": "{\"pubkey\":\"0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490\",\"slot\":\"0\",\"validator_index\":\"1\"}" } }, { "Time": "00:04.000", "duty": "0/attester", - "DutyArgSet": { + "DutyDefSet": { "0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490": "{\"pubkey\":\"0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490\",\"slot\":\"0\",\"validator_index\":\"1\",\"committee_index\":\"0\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"1\"}" } }, { "Time": "00:00.000", "duty": "1/proposer", - "DutyArgSet": { + "DutyDefSet": { "0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea": "{\"pubkey\":\"0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea\",\"slot\":\"1\",\"validator_index\":\"2\"}" } }, { "Time": "00:16.000", "duty": "1/attester", - "DutyArgSet": { + "DutyDefSet": { "0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea": "{\"pubkey\":\"0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea\",\"slot\":\"1\",\"validator_index\":\"2\",\"committee_index\":\"1\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"2\"}" } }, { "Time": "00:00.000", "duty": "2/proposer", - "DutyArgSet": { + "DutyDefSet": { "0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76": "{\"pubkey\":\"0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76\",\"slot\":\"2\",\"validator_index\":\"3\"}" } }, { "Time": "00:28.000", "duty": "2/attester", - "DutyArgSet": { + "DutyDefSet": { "0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76": "{\"pubkey\":\"0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76\",\"slot\":\"2\",\"validator_index\":\"3\",\"committee_index\":\"2\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"3\"}" } } diff --git a/core/scheduler/testdata/TestSchedulerDuties_spread_errors.golden b/core/scheduler/testdata/TestSchedulerDuties_spread_errors.golden index 0a84871fc..c5cccbc9e 100644 --- a/core/scheduler/testdata/TestSchedulerDuties_spread_errors.golden +++ b/core/scheduler/testdata/TestSchedulerDuties_spread_errors.golden @@ -2,35 +2,35 @@ { "Time": "00:04.000", "duty": "0/attester", - "DutyArgSet": { + "DutyDefSet": { "0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490": "{\"pubkey\":\"0x914cff835a769156ba43ad50b931083c2dadd94e8359ce394bc7a3e06424d0214922ddf15f81640530b9c25c0bc0d490\",\"slot\":\"0\",\"validator_index\":\"1\",\"committee_index\":\"0\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"1\"}" } }, { "Time": "00:00.000", "duty": "1/proposer", - "DutyArgSet": { + "DutyDefSet": { "0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea": "{\"pubkey\":\"0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea\",\"slot\":\"1\",\"validator_index\":\"2\"}" } }, { "Time": "00:16.000", "duty": "1/attester", - "DutyArgSet": { + "DutyDefSet": { "0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea": "{\"pubkey\":\"0x8dae41352b69f2b3a1c0b05330c1bf65f03730c520273028864b11fcb94d8ce8f26d64f979a0ee3025467f45fd2241ea\",\"slot\":\"1\",\"validator_index\":\"2\",\"committee_index\":\"1\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"2\"}" } }, { "Time": "00:00.000", "duty": "2/proposer", - "DutyArgSet": { + "DutyDefSet": { "0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76": "{\"pubkey\":\"0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76\",\"slot\":\"2\",\"validator_index\":\"3\"}" } }, { "Time": "00:28.000", "duty": "2/attester", - "DutyArgSet": { + "DutyDefSet": { "0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76": "{\"pubkey\":\"0x8ee91545183c8c2db86633626f5074fd8ef93c4c9b7a2879ad1768f600c5b5906c3af20d47de42c3b032956fa8db1a76\",\"slot\":\"2\",\"validator_index\":\"3\",\"committee_index\":\"2\",\"committee_length\":\"16\",\"committees_at_slot\":\"16\",\"validator_committee_index\":\"3\"}" } } diff --git a/core/tracing.go b/core/tracing.go index 1596fd1ec..d8a034a4a 100644 --- a/core/tracing.go +++ b/core/tracing.go @@ -66,7 +66,7 @@ func WithTracing() WireOption { return func(w *wireFuncs) { clone := *w - w.FetcherFetch = func(parent context.Context, duty Duty, set FetchArgSet) error { + w.FetcherFetch = func(parent context.Context, duty Duty, set DutyDefinitionSet) error { ctx, span := tracer.Start(parent, "core/fetcher.Fetch") defer span.End() diff --git a/core/types.go b/core/types.go index b79e27df8..a6c1045f3 100644 --- a/core/types.go +++ b/core/types.go @@ -168,26 +168,23 @@ func (k PubKey) ToETH2() (eth2p0.BLSPubKey, error) { return resp, nil } -// FetchArg defines a duty containing the parameters required +// DutyDefinition defines the duty including parameters required // to fetch the duty data, it is the result of resolving duties // at the start of an epoch. -// TODO(corver): Rename to DutyDefinition. -type FetchArg interface { - // Clone returns a cloned copy of the FetchArg. For an immutable core workflow architecture, - // remember to clone data when it leaves the current scope (sharing, storing, returning, etc). - Clone() (FetchArg, error) - // Marshaler returns the json serialised unsigned duty data. +type DutyDefinition interface { + // Clone returns a cloned copy of the DutyDefinition. + Clone() (DutyDefinition, error) + // Marshaler returns the json serialised duty definition. json.Marshaler } -// FetchArgSet is a set of fetch args, one per validator. -// TODO(corver): Rename to DutyDefinitionSet. -type FetchArgSet map[PubKey]FetchArg +// DutyDefinitionSet is a set of duty definitions, one per validator. +type DutyDefinitionSet map[PubKey]DutyDefinition -// Clone returns a cloned copy of the FetchArgSet. For an immutable core workflow architecture, +// Clone returns a cloned copy of the DutyDefinitionSet. For an immutable core workflow architecture, // remember to clone data when it leaves the current scope (sharing, storing, returning, etc). -func (s FetchArgSet) Clone() (FetchArgSet, error) { - resp := make(FetchArgSet, len(s)) +func (s DutyDefinitionSet) Clone() (DutyDefinitionSet, error) { + resp := make(DutyDefinitionSet, len(s)) for key, data := range s { var err error resp[key], err = data.Clone() diff --git a/core/validatorapi/validatorapi.go b/core/validatorapi/validatorapi.go index bfda9531e..b88454700 100644 --- a/core/validatorapi/validatorapi.go +++ b/core/validatorapi/validatorapi.go @@ -156,7 +156,7 @@ type Component struct { pubKeyByAttFunc func(ctx context.Context, slot, commIdx, valCommIdx int64) (core.PubKey, error) awaitAttFunc func(ctx context.Context, slot, commIdx int64) (*eth2p0.AttestationData, error) awaitBlockFunc func(ctx context.Context, slot int64) (*spec.VersionedBeaconBlock, error) - getDutyFunc func(ctx context.Context, duty core.Duty) (core.FetchArgSet, error) + dutyDefFunc func(ctx context.Context, duty core.Duty) (core.DutyDefinitionSet, error) parSigDBFuncs []func(context.Context, core.Duty, core.ParSignedDataSet) error } @@ -191,10 +191,10 @@ func (c *Component) RegisterPubKeyByAttestation(fn func(ctx context.Context, slo c.pubKeyByAttFunc = fn } -// RegisterGetDutyFunc registers a function to query duty data by duty. +// RegisterDutyDefinitionFunc registers a function to query duty definitions. // It supports a single function, since it is an input of the component. -func (c *Component) RegisterGetDutyFunc(fn func(ctx context.Context, duty core.Duty) (core.FetchArgSet, error)) { - c.getDutyFunc = fn +func (c *Component) RegisterGetDutyDefinition(fn func(ctx context.Context, duty core.Duty) (core.DutyDefinitionSet, error)) { + c.dutyDefFunc = fn } // RegisterParSigDB registers a partial signed data set store function. @@ -635,16 +635,16 @@ func (c Component) epochFromSlot(ctx context.Context, slot eth2p0.Slot) (eth2p0. func (c Component) getProposerPubkey(ctx context.Context, slot eth2p0.Slot) (core.PubKey, error) { // Get proposer pubkey (this is a blocking query). - dutySet, err := c.getDutyFunc(ctx, core.Duty{Slot: int64(slot), Type: core.DutyProposer}) + defSet, err := c.dutyDefFunc(ctx, core.NewProposerDuty(int64(slot))) if err != nil { return "", err - } else if len(dutySet) != 1 { + } else if len(defSet) != 1 { return "", errors.New("unexpected amount of proposer duties") } // There should be single duty proposer for the slot var pubkey core.PubKey - for pk := range dutySet { + for pk := range defSet { pubkey = pk } diff --git a/core/validatorapi/validatorapi_test.go b/core/validatorapi/validatorapi_test.go index b1d87e8d7..98f7bb1f3 100644 --- a/core/validatorapi/validatorapi_test.go +++ b/core/validatorapi/validatorapi_test.go @@ -351,8 +351,8 @@ func TestComponent_BeaconBlockProposal(t *testing.T) { block1.Phase0.ProposerIndex = vIdx block1.Phase0.Body.RANDAOReveal = randao - component.RegisterGetDutyFunc(func(ctx context.Context, duty core.Duty) (core.FetchArgSet, error) { - return core.FetchArgSet{pubkey: nil}, nil + component.RegisterGetDutyDefinition(func(ctx context.Context, duty core.Duty) (core.DutyDefinitionSet, error) { + return core.DutyDefinitionSet{pubkey: nil}, nil }) component.RegisterAwaitBeaconBlock(func(ctx context.Context, slot int64) (*spec.VersionedBeaconBlock, error) { @@ -413,8 +413,8 @@ func TestComponent_SubmitBeaconBlock(t *testing.T) { unsignedBlock.Phase0.Slot = slot unsignedBlock.Phase0.ProposerIndex = vIdx - vapi.RegisterGetDutyFunc(func(ctx context.Context, duty core.Duty) (core.FetchArgSet, error) { - return core.FetchArgSet{corePubKey: nil}, nil + vapi.RegisterGetDutyDefinition(func(ctx context.Context, duty core.Duty) (core.DutyDefinitionSet, error) { + return core.DutyDefinitionSet{corePubKey: nil}, nil }) // Sign beacon block @@ -491,8 +491,8 @@ func TestComponent_SubmitBeaconBlockInvalidSignature(t *testing.T) { unsignedBlock.Phase0.Slot = slot unsignedBlock.Phase0.ProposerIndex = vIdx - vapi.RegisterGetDutyFunc(func(ctx context.Context, duty core.Duty) (core.FetchArgSet, error) { - return core.FetchArgSet{corePubKey: nil}, nil + vapi.RegisterGetDutyDefinition(func(ctx context.Context, duty core.Duty) (core.DutyDefinitionSet, error) { + return core.DutyDefinitionSet{corePubKey: nil}, nil }) // Add invalid Signature to beacon block @@ -541,8 +541,8 @@ func TestComponent_SubmitBeaconBlockInvalidBlock(t *testing.T) { vapi, err := validatorapi.NewComponent(bmock, pubShareByKey, 0) require.NoError(t, err) - vapi.RegisterGetDutyFunc(func(ctx context.Context, duty core.Duty) (core.FetchArgSet, error) { - return core.FetchArgSet{pubkey: nil}, nil + vapi.RegisterGetDutyDefinition(func(ctx context.Context, duty core.Duty) (core.DutyDefinitionSet, error) { + return core.DutyDefinitionSet{pubkey: nil}, nil }) // invalid block scenarios diff --git a/docs/architecture.md b/docs/architecture.md index 8d33e4abe..94ec377b7 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -117,7 +117,7 @@ We define the following duty types: - `DutyProposer = 1`: Proposing a block - `DutyAttester = 2`: Creating an attestation - `DutyRandao = 3`: Creating a randao reveal signature required as input to DutyProposer -- `DutyAggregator = 4`: Aggregating attestations +- `DutyExit = 4`: Voluntary exit > ℹ️ Duty is on a cluster level, not a DV level. A duty defines the “unit of work” for the whole cluster, > not just a single DV. This allows the workflow to aggregate and batch multiple DVs in some steps, specifically consensus. @@ -145,20 +145,38 @@ and [Get block proposer](https://ethereum.github.io/beacon-APIs/#/ValidatorRequi at the end of each epoch for the next epoch. It then caches the results returned and triggers the duty when the associated slot starts. -An abstract `FetchArg` type is defined that represents the json formatted responses returned by the beacon node above. +An abstract `DutyDefinition` type is defined that represents the json formatted responses returned by the beacon node above. +Note the ETH2 spec refers to these as "duties", but we use a slightly different term to avoid overloading +the term "duty" which we defined above. ```go -// FetchArg contains the arguments required to fetch the duty data, -// it is the result of resolving duties at the start of an epoch. -type FetchArg []byte +// DutyDefinition defines a duty including parameters required +// to fetch the duty data, it is the result of resolving duties +// at the start of an epoch. +type DutyDefinition interface { + // Clone returns a cloned copy of the DutyDefinition. + Clone() (DutyDefinition, error) + // Marshaler returns the json serialised duty definition. + json.Marshaler +} ``` + +The following `DutyDefinition` implementations are provided: + - `AttesterDefinition` which wraps "Get attester duties" response above. + - `ProposerDefinition` which wraps "Get block proposer" response above. + Since a cluster can contain multiple DVs, it may have to perform multiple similar duties for the same slot, e.g. `DutyAttester`. -Multiple `FetchArg`s are combined into a single `FetchArgSet` that is defined as: +Multiple `DutyDefinition`s are combined into a single `DutyDefinitionSet` that is defined as: ```go -// FetchArgSet is a set of fetch args, one per validator. -type FetchArgSet map[PubKey]FetchArg +// DutyDefinitionSet is a set of fetch args, one per validator. +type DutyDefinitionSet map[PubKey]DutyDefinition ``` -Note the `DutyRandao` isn’t scheduled by the scheduler, since it is initiated directly by VC at the start of the epoch. +Note the `DutyRandao` and `DutyExit` isn’t scheduled by the scheduler, since they are initiated directly by VC. + +> ℹ️ The core workflow follows the immutable architecture approach, with immutable self-contained values flowing between components. +> Values can be thought of as events or messages and components can be thought of as actors consuming and generating events. +> This however requires that values are immutable. All abstract value types therefore define a `Clone` method +> that must be called before a value leaves its scope (shared, cached, returned etc.). > 🏗️ TODO: Define the exact timing requirements for different duties. @@ -167,10 +185,10 @@ The scheduler interface is defined as: // Scheduler triggers the start of a duty workflow. type Scheduler interface { // Subscribe registers a callback for triggering a duty. - Subscribe(func(context.Context, Duty, FetchArgSet) error) + Subscribe(func(context.Context, Duty, DutyDefinitionSet) error) - // GetDuty returns the argSet for a duty if resolved already. - GetDuty(context.Context, Duty) (FetchArgSet, error) + // GetDutyDefinition returns the definition for a duty if already resolved. + GetDutyDefinition(context.Context, Duty) (DutyDefinitionSet, error) } ``` > ℹ️ Components of the workflow are decoupled from each other. They are stitched together by callback subscriptions. @@ -186,14 +204,18 @@ For `DutyProposer` it fetches a previously aggregated randao_reveal from the `Ag from the beacon node. An abstract `UnsignedData` type is defined to represent either `AttestationData` or `BeaconBlock` depending on the `DutyType`. -It contains the standard serialised json format of the data as returned from beacon node. ```go // UnsignedData represents an unsigned duty data object. -type UnsignedData []byte +type UnsignedData interface { + // Clone returns a cloned copy of the UnsignedData. + Clone() (UnsignedData, error) + // Marshaler returns the json serialised unsigned duty data. + json.Marshaler +} ``` -Since the input to fetcher is a `FetchArgSet`, it fetches multiple `UnsignedData` objects for the same `Duty`. +Since the input to fetcher is a `DutyDefinitionSet`, it fetches multiple `UnsignedData` objects for the same `Duty`. Multiple `UnsignedData`s are combined into a single `UnsignedDataSet` that is defined as: ```go // UnsignedDataSet is a set of unsigned duty data objects, one per validator. @@ -212,7 +234,7 @@ The fetcher interface is defined as: // Fetcher fetches proposed duty data. type Fetcher interface { // Fetch triggers fetching of a proposed duty data set. - Fetch(context.Context, Duty, FetchArgSet) error + Fetch(context.Context, Duty, DutyDefinitionSet) error // Subscribe registers a callback for proposed duty data sets. Subscribe(func(context.Context, Duty, UnsignedDataSet) error) @@ -315,19 +337,32 @@ The validator API provides a [beacon-node API](https://ethereum.github.io/beacon intercepting some calls and proxying others directly to the upstream beacon node. It mostly serves unsigned duty data requests from the `DutyDB` and sends the resulting partial signed duty objects to the `ParSigDB`. -Partial signed duty data objects are defined as `ParSignedData`: +Partial signed duty data values are defined as `ParSignedData` which extend `SignedData` values: ```go -// ParSignedData is a partially signed duty data. -// Partial refers to it being signed by a single share of the BLS threshold signing scheme. +// SignedData is a signed duty data. +type SignedData interface { + // Signature returns the signed duty data's signature. + Signature() Signature + // SetSignature returns a copy of signed duty data with the signature replaced. + SetSignature(Signature) (SignedData, error) + // Clone returns a cloned copy of the SignedData. + Clone() (SignedData, error) + // Marshaler returns the json serialised signed duty data (including the signature). + json.Marshaler +} + +// ParSignedData is a partially signed duty data only signed by a single threshold BLS share. type ParSignedData struct { - // Data is the partially signed duty data received from VC. - Data []byte - // Signature of tbls share extracted from data. - Signature []byte - // Index of the tbls share. - Index int + // SignedData is a partially signed duty data. + SignedData + // ShareIdx returns the threshold BLS share index. + ShareIdx int } + ``` + +The following `SignedData` implementations are provided: `Attestation`, `VersionedSignedBeaconBlock`, `SignedVoluntaryExit`, and `Signature` which is just a signature without any data used for `DutyRandao`. + Multiple `ParSignedData` are combined into a single `ParSignedDataSet` defines as follows: ```go // ParSignedDataSet is a set of partially signed duty data objects, one per validator. @@ -370,7 +405,7 @@ the public share (what the VC thinks as its public key) to and from the DV root > 🏗️ TODO: Figure out other endpoints required. The validator api interface is defined as: -``` +```go // ValidatorAPI provides a beacon node API to validator clients. It serves duty data from the // DutyDB and stores partial signed data in the ParSigDB. type ValidatorAPI interface { @@ -378,7 +413,7 @@ type ValidatorAPI interface { RegisterAwaitBeaconBlock(func(context.Context, slot int) (beaconapi.BeaconBlock, error)) // RegisterGetDutyFunc registers a function to query duty data. - RegisterGetDutyFunc(func(ctx context.Context, duty Duty) (FetchArgSet, error)) + RegisterGetDutyFunc(func(ctx context.Context, duty Duty) (DutyDefinitionSet, error)) // RegisterAwaitAttestation registers a function to query attestation data. RegisterAwaitAttestation(func(context.Context, slot int, commIdx int) (*beaconapi.AttestationData, error)) @@ -401,7 +436,7 @@ Duties originating in the `scheduler` (`DutyAttester`, `DutyProposer`) are not s Instead of waiting for the `validatorapi` to submit signatures, these duties directly request signatures from the remote signer instance. The flow is otherwise unaffected. -Duties originating in the `validatorapi` (`DutyRandao`, `DutyAggregator`) has to refactored to +Duties originating in the `validatorapi` (`DutyRandao`, `DutyExit`) has to refactored to originate in the `scheduler`, since charon is in full control of the duties in this architecture. The overall core workflow remains the same, `scheduler` just schedules all the duties. @@ -508,17 +543,7 @@ type ParSigEx interface { The signature aggregation service aggregates partial BLS signatures and sends them to the `bcast` component and persists them to the `AggSigDB`. It is a stateless pure function. -Aggregated signed duty data objects are defined as `SignedData`: -```go -// SignedData is an aggregated signed duty data. -// Aggregated refers to it being signed by the aggregated BLS threshold signing scheme. -type SignedData struct { - // Data is the signed duty data to be sent to beacon chain. - Data []byte - // Signature is the result of tbls aggregation and is inserted into the data. - Signature []byte -} -``` +Aggregated signed duty data objects are defined as `SignedData` defined above. The signature aggregation interface is defined as: ```go diff --git a/docs/structure.md b/docs/structure.md index 3378e8b6b..ea8939876 100644 --- a/docs/structure.md +++ b/docs/structure.md @@ -27,7 +27,7 @@ charon/ # project root │ ├─ core/ # core workflow; charon business logic (see architecture doc for details) │ ├─ interfaces.go # component interfaces: Scheduler, Fetcher, Consensus, etc. -│ ├─ types.go # core workflow types: Duty, PubKey, FetchArg, UnsignedData, etc. +│ ├─ types.go # core workflow types: Duty, PubKey, DutyDefinition, UnsignedData, etc. │ ├─ encode.go # encode/decode the abstract types with type safe API │ │ │ │ # core workflow component implementations