Skip to content

Commit

Permalink
core: rename fetch args and update docs (#717)
Browse files Browse the repository at this point in the history
Renames `FetchArgs` to `DutyDefinition` and update docs.

category: refactor
ticket: #698
  • Loading branch information
corverroos authored Jun 15, 2022
1 parent e144345 commit 27022e1
Show file tree
Hide file tree
Showing 18 changed files with 167 additions and 145 deletions.
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func wireValidatorMock(conf Config, pubshares []eth2p0.BLSPubKey, sched core.Sch
signer := validatormock.NewSigner(secrets...)

// Trigger validatormock when scheduler triggers new slot.
sched.Subscribe(func(ctx context.Context, duty core.Duty, _ core.FetchArgSet) error {
sched.Subscribe(func(ctx context.Context, duty core.Duty, _ core.DutyDefinitionSet) error {
ctx = log.WithTopic(ctx, "vmock")
go func() {
addr := "http://" + conf.ValidatorAPIAddr
Expand Down
12 changes: 6 additions & 6 deletions core/dutydefinition.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@ import (
)

var (
_ FetchArg = AttesterDefinition{}
_ FetchArg = ProposerDefinition{}
_ DutyDefinition = AttesterDefinition{}
_ DutyDefinition = ProposerDefinition{}
)

// NewAttesterDefinition is a convenience function that returns a new attester definition.
func NewAttesterDefinition(duty *eth2v1.AttesterDuty) AttesterDefinition {
return AttesterDefinition{AttesterDuty: *duty}
}

// AttesterDefinition defines an attester duty. It implements FetchArg.
// AttesterDefinition defines an attester duty. It implements DutyDefinition.
// Note the slight rename from Duty to Definition to avoid overloading the term Duty.
type AttesterDefinition struct {
eth2v1.AttesterDuty
}

func (d AttesterDefinition) Clone() (FetchArg, error) {
func (d AttesterDefinition) Clone() (DutyDefinition, error) {
duty := new(eth2v1.AttesterDuty)
err := cloneJSONMarshaler(&d.AttesterDuty, duty)
if err != nil {
Expand All @@ -56,13 +56,13 @@ func NewProposerDefinition(duty *eth2v1.ProposerDuty) ProposerDefinition {
return ProposerDefinition{ProposerDuty: *duty}
}

// ProposerDefinition defines a block proposer duty. It implements FetchArg.
// ProposerDefinition defines a block proposer duty. It implements DutyDefinition.
// Note the slight rename from Duty to Definition to avoid overloading the term Duty.
type ProposerDefinition struct {
eth2v1.ProposerDuty
}

func (d ProposerDefinition) Clone() (FetchArg, error) {
func (d ProposerDefinition) Clone() (DutyDefinition, error) {
duty := new(eth2v1.ProposerDuty)
err := cloneJSONMarshaler(&d.ProposerDuty, duty)
if err != nil {
Expand Down
16 changes: 8 additions & 8 deletions core/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,20 @@ func (f *Fetcher) Subscribe(fn func(context.Context, core.Duty, core.UnsignedDat
}

// Fetch triggers fetching of a proposed duty data set.
func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, argSet core.FetchArgSet) error {
func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDefinitionSet) error {
var (
unsignedSet core.UnsignedDataSet
err error
)

switch duty.Type {
case core.DutyProposer:
unsignedSet, err = f.fetchProposerData(ctx, duty.Slot, argSet)
unsignedSet, err = f.fetchProposerData(ctx, duty.Slot, defSet)
if err != nil {
return errors.Wrap(err, "fetch proposer data")
}
case core.DutyAttester:
unsignedSet, err = f.fetchAttesterData(ctx, duty.Slot, argSet)
unsignedSet, err = f.fetchAttesterData(ctx, duty.Slot, defSet)
if err != nil {
return errors.Wrap(err, "fetch attester data")
}
Expand Down Expand Up @@ -100,14 +100,14 @@ func (f *Fetcher) RegisterAggSigDB(fn func(context.Context, core.Duty, core.PubK
}

// fetchAttesterData returns the fetched attestation data set for committees and validators in the arg set.
func (f *Fetcher) fetchAttesterData(ctx context.Context, slot int64, argSet core.FetchArgSet,
func (f *Fetcher) fetchAttesterData(ctx context.Context, slot int64, defSet core.DutyDefinitionSet,
) (core.UnsignedDataSet, error) {
// We may have multiple validators in the same committee, use the same attestation data in that case.
dataByCommIdx := make(map[eth2p0.CommitteeIndex]*eth2p0.AttestationData)

resp := make(core.UnsignedDataSet)
for pubkey, fetchArg := range argSet {
attDuty, ok := fetchArg.(core.AttesterDefinition)
for pubkey, def := range defSet {
attDuty, ok := def.(core.AttesterDefinition)
if !ok {
return nil, errors.New("invalid attester definition")
}
Expand All @@ -134,9 +134,9 @@ func (f *Fetcher) fetchAttesterData(ctx context.Context, slot int64, argSet core
return resp, nil
}

func (f *Fetcher) fetchProposerData(ctx context.Context, slot int64, argSet core.FetchArgSet) (core.UnsignedDataSet, error) {
func (f *Fetcher) fetchProposerData(ctx context.Context, slot int64, defSet core.DutyDefinitionSet) (core.UnsignedDataSet, error) {
resp := make(core.UnsignedDataSet)
for pubkey := range argSet {
for pubkey := range defSet {
// Fetch previously aggregated randao reveal from AggSigDB
dutyRandao := core.Duty{
Slot: slot,
Expand Down
8 changes: 4 additions & 4 deletions core/fetcher/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestFetchAttester(t *testing.T) {
CommitteesAtSlot: notZero,
}

argSet := core.FetchArgSet{
defSet := core.DutyDefinitionSet{
pubkeysByIdx[vIdxA]: core.NewAttesterDefinition(&dutyA),
pubkeysByIdx[vIdxB]: core.NewAttesterDefinition(&dutyB),
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestFetchAttester(t *testing.T) {
return nil
})

err = fetch.Fetch(ctx, duty, argSet)
err = fetch.Fetch(ctx, duty, defSet)
require.NoError(t, err)
}

Expand All @@ -115,7 +115,7 @@ func TestFetchProposer(t *testing.T) {
Slot: slot,
ValidatorIndex: vIdxB,
}
argSet := core.FetchArgSet{
defSet := core.DutyDefinitionSet{
pubkeysByIdx[vIdxA]: core.NewProposerDefinition(&dutyA),
pubkeysByIdx[vIdxB]: core.NewProposerDefinition(&dutyB),
}
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestFetchProposer(t *testing.T) {
return nil
})

err = fetch.Fetch(ctx, duty, argSet)
err = fetch.Fetch(ctx, duty, defSet)
require.NoError(t, err)
}

Expand Down
26 changes: 13 additions & 13 deletions core/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ import (
// Scheduler triggers the start of a duty workflow.
type Scheduler interface {
// Subscribe registers a callback for fetching a duty.
Subscribe(func(context.Context, Duty, FetchArgSet) error)
Subscribe(func(context.Context, Duty, DutyDefinitionSet) error)

// GetDuty returns the argSet for a duty if resolved already.
GetDuty(context.Context, Duty) (FetchArgSet, error)
// GetDutyDefinition returns the definition set for a duty if already resolved.
GetDutyDefinition(context.Context, Duty) (DutyDefinitionSet, error)
}

// Fetcher fetches proposed unsigned duty data.
type Fetcher interface {
// Fetch triggers fetching of a proposed duty data set.
Fetch(context.Context, Duty, FetchArgSet) error
Fetch(context.Context, Duty, DutyDefinitionSet) error

// Subscribe registers a callback for proposed unsigned duty data sets.
Subscribe(func(context.Context, Duty, UnsignedDataSet) error)
Expand Down Expand Up @@ -85,8 +85,8 @@ type ValidatorAPI interface {
// RegisterPubKeyByAttestation registers a function to query validator by attestation.
RegisterPubKeyByAttestation(func(ctx context.Context, slot, commIdx, valCommIdx int64) (PubKey, error))

// RegisterGetDutyFunc registers a function to query duty data.
RegisterGetDutyFunc(func(context.Context, Duty) (FetchArgSet, error))
// RegisterGetDutyDefinition registers a function to query duty definitions.
RegisterGetDutyDefinition(func(context.Context, Duty) (DutyDefinitionSet, error))

// RegisterParSigDB registers a function to store partially signed data sets.
RegisterParSigDB(func(context.Context, Duty, ParSignedDataSet) error)
Expand Down Expand Up @@ -146,9 +146,9 @@ type Broadcaster interface {
// wireFuncs defines the core workflow components as a list input and output functions
// instead as interfaces, since functions are easier to wrap than interfaces.
type wireFuncs struct {
SchedulerSubscribe func(func(context.Context, Duty, FetchArgSet) error)
SchedulerGetDuty func(context.Context, Duty) (FetchArgSet, error)
FetcherFetch func(context.Context, Duty, FetchArgSet) error
SchedulerSubscribe func(func(context.Context, Duty, DutyDefinitionSet) error)
SchedulerGetDutyDefinition func(context.Context, Duty) (DutyDefinitionSet, error)
FetcherFetch func(context.Context, Duty, DutyDefinitionSet) error
FetcherSubscribe func(func(context.Context, Duty, UnsignedDataSet) error)
FetcherRegisterAggSigDB func(func(context.Context, Duty, PubKey) (SignedData, error))
ConsensusPropose func(context.Context, Duty, UnsignedDataSet) error
Expand All @@ -159,7 +159,7 @@ type wireFuncs struct {
DutyDBPubKeyByAttestation func(ctx context.Context, slot, commIdx, valCommIdx int64) (PubKey, error)
VAPIRegisterAwaitAttestation func(func(ctx context.Context, slot, commIdx int64) (*eth2p0.AttestationData, error))
VAPIRegisterAwaitBeaconBlock func(func(ctx context.Context, slot int64) (*spec.VersionedBeaconBlock, error))
VAPIRegisterGetDutyFunc func(func(context.Context, Duty) (FetchArgSet, error))
VAPIRegisterGetDutyDefinition func(func(context.Context, Duty) (DutyDefinitionSet, error))
VAPIRegisterPubKeyByAttestation func(func(ctx context.Context, slot, commIdx, valCommIdx int64) (PubKey, error))
VAPIRegisterParSigDB func(func(context.Context, Duty, ParSignedDataSet) error)
ParSigDBStoreInternal func(context.Context, Duty, ParSignedDataSet) error
Expand Down Expand Up @@ -193,7 +193,7 @@ func Wire(sched Scheduler,
) {
w := wireFuncs{
SchedulerSubscribe: sched.Subscribe,
SchedulerGetDuty: sched.GetDuty,
SchedulerGetDutyDefinition: sched.GetDutyDefinition,
FetcherFetch: fetch.Fetch,
FetcherSubscribe: fetch.Subscribe,
FetcherRegisterAggSigDB: fetch.RegisterAggSigDB,
Expand All @@ -205,7 +205,7 @@ func Wire(sched Scheduler,
DutyDBPubKeyByAttestation: dutyDB.PubKeyByAttestation,
VAPIRegisterAwaitBeaconBlock: vapi.RegisterAwaitBeaconBlock,
VAPIRegisterAwaitAttestation: vapi.RegisterAwaitAttestation,
VAPIRegisterGetDutyFunc: vapi.RegisterGetDutyFunc,
VAPIRegisterGetDutyDefinition: vapi.RegisterGetDutyDefinition,
VAPIRegisterPubKeyByAttestation: vapi.RegisterPubKeyByAttestation,
VAPIRegisterParSigDB: vapi.RegisterParSigDB,
ParSigDBStoreInternal: parSigDB.StoreInternal,
Expand All @@ -231,7 +231,7 @@ func Wire(sched Scheduler,
w.ConsensusSubscribe(w.DutyDBStore)
w.VAPIRegisterAwaitBeaconBlock(w.DutyDBAwaitBeaconBlock)
w.VAPIRegisterAwaitAttestation(w.DutyDBAwaitAttestation)
w.VAPIRegisterGetDutyFunc(w.SchedulerGetDuty)
w.VAPIRegisterGetDutyDefinition(w.SchedulerGetDutyDefinition)
w.VAPIRegisterPubKeyByAttestation(w.DutyDBPubKeyByAttestation)
w.VAPIRegisterParSigDB(w.ParSigDBStoreInternal)
w.ParSigDBSubscribeInternal(w.ParSigExBroadcast)
Expand Down
2 changes: 1 addition & 1 deletion core/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
func WithAsyncRetry(retryer *retry.Retryer[Duty]) WireOption {
return func(w *wireFuncs) {
clone := *w
w.FetcherFetch = func(ctx context.Context, duty Duty, set FetchArgSet) error {
w.FetcherFetch = func(ctx context.Context, duty Duty, set DutyDefinitionSet) error {
go retryer.DoAsync(ctx, duty, "fetcher fetch", func(ctx context.Context) error {
return clone.FetcherFetch(ctx, duty, set)
})
Expand Down
4 changes: 2 additions & 2 deletions core/scheduler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func instrumentSlot(slot slot) {
}

// instrumentDuty increments the duty counter.
func instrumentDuty(duty core.Duty, argSet core.FetchArgSet) {
for pubkey := range argSet {
func instrumentDuty(duty core.Duty, defSet core.DutyDefinitionSet) {
for pubkey := range defSet {
dutyCounter.WithLabelValues(duty.Type.String(), pubkey.String()).Inc()
}
}
44 changes: 22 additions & 22 deletions core/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func New(pubkeys []core.PubKey, eth2Svc eth2client.Service) (*Scheduler, error)
eth2Cl: eth2Cl,
pubkeys: pubkeys,
quit: make(chan struct{}),
duties: make(map[core.Duty]core.FetchArgSet),
duties: make(map[core.Duty]core.DutyDefinitionSet),
clock: clockwork.NewRealClock(),
delayFunc: func(_ core.Duty, deadline time.Time) <-chan time.Time {
return time.After(time.Until(deadline))
Expand All @@ -88,14 +88,14 @@ type Scheduler struct {
clock clockwork.Clock
delayFunc delayFunc
resolvedEpoch uint64
duties map[core.Duty]core.FetchArgSet
duties map[core.Duty]core.DutyDefinitionSet
dutiesMutex sync.Mutex
subs []func(context.Context, core.Duty, core.FetchArgSet) error
subs []func(context.Context, core.Duty, core.DutyDefinitionSet) error
}

// Subscribe registers a callback for triggering a duty.
// Note this should be called *before* Start.
func (s *Scheduler) Subscribe(fn func(context.Context, core.Duty, core.FetchArgSet) error) {
func (s *Scheduler) Subscribe(fn func(context.Context, core.Duty, core.DutyDefinitionSet) error) {
s.subs = append(s.subs, fn)
}

Expand Down Expand Up @@ -132,8 +132,8 @@ func (s *Scheduler) Run() error {
}
}

// GetDuty returns the argSet for a duty if resolved already, otherwise an error.
func (s *Scheduler) GetDuty(ctx context.Context, duty core.Duty) (core.FetchArgSet, error) {
// GetDutyDefinition returns the definition for a duty if resolved already, otherwise an error.
func (s *Scheduler) GetDutyDefinition(ctx context.Context, duty core.Duty) (core.DutyDefinitionSet, error) {
slotsPerEpoch, err := s.eth2Cl.SlotsPerEpoch(ctx)
if err != nil {
return nil, err
Expand All @@ -144,12 +144,12 @@ func (s *Scheduler) GetDuty(ctx context.Context, duty core.Duty) (core.FetchArgS
return nil, errors.New("epoch not resolved yet")
}

argSet, ok := s.getFetchArgSet(duty)
defSet, ok := s.getDutyDefinitionSet(duty)
if !ok {
return nil, errors.New("duty not resolved although epoch is marked as resolved")
}

return argSet.Clone() // Clone before returning.
return defSet.Clone() // Clone before returning.
}

// scheduleSlot resolves upcoming duties and triggers resolved duties for the slot.
Expand All @@ -167,7 +167,7 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) {
Type: dutyType,
}

argSet, ok := s.getFetchArgSet(duty)
defSet, ok := s.getDutyDefinitionSet(duty)
if !ok {
// Nothing for this duty.
continue
Expand All @@ -179,13 +179,13 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) {
return // context cancelled
}

instrumentDuty(duty, argSet)
instrumentDuty(duty, defSet)
ctx = log.WithCtx(ctx, z.Any("duty", duty))
ctx, span := core.StartDutyTrace(ctx, duty, "core/scheduler.scheduleSlot")
defer span.End()

for _, sub := range s.subs {
clone, err := argSet.Clone() // Clone for each subscriber.
clone, err := defSet.Clone() // Clone for each subscriber.
if err != nil {
log.Error(ctx, "Cloning duty definition set", err)
return
Expand Down Expand Up @@ -281,7 +281,7 @@ func (s *Scheduler) resolveAttDuties(ctx context.Context, slot slot, vals valida
continue
}

if !s.setFetchArg(duty, pubkey, core.NewAttesterDefinition(attDuty)) {
if !s.setDutyDefinition(duty, pubkey, core.NewAttesterDefinition(attDuty)) {
continue
}

Expand Down Expand Up @@ -323,7 +323,7 @@ func (s *Scheduler) resolveProDuties(ctx context.Context, slot slot, vals valida
continue
}

if !s.setFetchArg(duty, pubkey, core.NewProposerDefinition(proDuty)) {
if !s.setDutyDefinition(duty, pubkey, core.NewProposerDefinition(proDuty)) {
continue
}

Expand All @@ -337,29 +337,29 @@ func (s *Scheduler) resolveProDuties(ctx context.Context, slot slot, vals valida
return nil
}

func (s *Scheduler) getFetchArgSet(duty core.Duty) (core.FetchArgSet, bool) {
func (s *Scheduler) getDutyDefinitionSet(duty core.Duty) (core.DutyDefinitionSet, bool) {
s.dutiesMutex.Lock()
defer s.dutiesMutex.Unlock()

argSet, ok := s.duties[duty]
defSet, ok := s.duties[duty]

return argSet, ok
return defSet, ok
}

func (s *Scheduler) setFetchArg(duty core.Duty, pubkey core.PubKey, set core.FetchArg) bool {
func (s *Scheduler) setDutyDefinition(duty core.Duty, pubkey core.PubKey, set core.DutyDefinition) bool {
s.dutiesMutex.Lock()
defer s.dutiesMutex.Unlock()

argSet, ok := s.duties[duty]
defSet, ok := s.duties[duty]
if !ok {
argSet = make(core.FetchArgSet)
defSet = make(core.DutyDefinitionSet)
}
if _, ok := argSet[pubkey]; ok {
if _, ok := defSet[pubkey]; ok {
return false
}

argSet[pubkey] = set
s.duties[duty] = argSet
defSet[pubkey] = set
s.duties[duty] = defSet

return true
}
Expand Down
Loading

0 comments on commit 27022e1

Please sign in to comment.