Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/scheduler: add duty query method #384

Merged
merged 10 commits into from
Apr 12, 2022
96 changes: 86 additions & 10 deletions core/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package scheduler
import (
"context"
"fmt"
"math"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -68,7 +70,7 @@ func New(pubkeys []core.PubKey, eth2Svc eth2client.Service) (*Scheduler, error)
quit: make(chan struct{}),
duties: make(map[core.Duty]core.FetchArgSet),
clock: clockwork.NewRealClock(),
resolvedEpoch: -1,
resolvedEpoch: math.MaxUint64,
}, nil
}

Expand All @@ -78,8 +80,9 @@ type Scheduler struct {
quit chan struct{}
clock clockwork.Clock

resolvedEpoch int64
resolvedEpoch uint64
duties map[core.Duty]core.FetchArgSet
dutiesMutex sync.Mutex
subs []func(context.Context, core.Duty, core.FetchArgSet) error
}

Expand Down Expand Up @@ -123,9 +126,29 @@ func (s *Scheduler) Run() error {
}
}

// GetDuty returns the argSet for a duty if resolved already, otherwise an error.
func (s *Scheduler) GetDuty(ctx context.Context, duty core.Duty) (core.FetchArgSet, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a godoc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, sorry about that

slotsPerEpoch, err := s.eth2Cl.SlotsPerEpoch(ctx)
if err != nil {
return nil, err
}

epoch := uint64(duty.Slot) / slotsPerEpoch
if !s.isEpochResolved(epoch) {
return nil, errors.New("epoch not resolved yet")
}

argSet, ok := s.getFetchArgSet(duty)
if !ok {
return nil, errors.New("duty not resolved although epoch is marked as resolved")
}

return argSet, nil
}

// scheduleSlot resolves upcoming duties and triggers resolved duties for the slot.
func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error {
if s.resolvedEpoch != int64(slot.Epoch()) {
if s.getResolvedEpoch() != uint64(slot.Epoch()) {
err := s.resolveDuties(ctx, slot)
if err != nil {
log.Warn(ctx, "Resolving duties error (retrying next slot)", z.Err(err))
Expand All @@ -138,7 +161,7 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error {
Type: dutyType,
}

argSet, ok := s.duties[duty]
argSet, ok := s.getFetchArgSet(duty)
if !ok {
// Nothing for this duty.
continue
Expand All @@ -159,7 +182,7 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error {
}

span.End()
delete(s.duties, duty)
s.deleteDuty(duty)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we delete duties after scheduling, so calls to GetDuty will error after it has been scheduled.

Suggest removing the deleteDuty function and adding a TODO:

// TODO(leo): Trim duties after some time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we had the shared component this would not be necessary

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we will indeed not need to do this here . But we will still need to do it in DutyResolver though. So same logic is required somewhere.

}

if slot.IsLastInEpoch() {
Expand All @@ -173,6 +196,7 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error {
}

// resolveDuties resolves the duties for the slot's epoch, caching the results.
// Do not call if you do not hold the dutiesMutex.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you do not hold the mutex when call this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller holds the mutex

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment not accurate anymore, can remove

func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error {
vals, err := resolveActiveValidators(ctx, s.eth2Cl, s.pubkeys, slot.Slot)
if err != nil {
Expand Down Expand Up @@ -206,7 +230,7 @@ func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error {

duty := core.Duty{Slot: int64(attDuty.Slot), Type: core.DutyAttester}

argSet, ok := s.duties[duty]
argSet, ok := s.getFetchArgSet(duty)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to call getFetchArgSet first. Just call setFetchArgSet below.

if !s.setFetchArg(duty, pubkey, arg){
  log.Debug(ctx, "Ignoring previously resolved duty", z.Any("duty", duty))
  continue
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if !ok {
argSet = make(core.FetchArgSet)
}
Expand All @@ -221,7 +245,7 @@ func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error {
}

argSet[pubkey] = arg
s.duties[duty] = argSet
s.setFetchArg(duty, pubkey, arg)

log.Debug(ctx, "Resolved attester duty",
z.U64("epoch", uint64(slot.Epoch())),
Expand Down Expand Up @@ -252,7 +276,7 @@ func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error {

duty := core.Duty{Slot: int64(proDuty.Slot), Type: core.DutyProposer}

argSet, ok := s.duties[duty]
argSet, ok := s.getFetchArgSet(duty)
if !ok {
argSet = make(core.FetchArgSet)
}
Expand All @@ -267,7 +291,7 @@ func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error {
}

argSet[pubkey] = arg
s.duties[duty] = argSet
s.setFetchArg(duty, pubkey, arg)

log.Debug(ctx, "Resolved proposer duty",
z.U64("epoch", uint64(slot.Epoch())),
Expand All @@ -277,11 +301,63 @@ func (s *Scheduler) resolveDuties(ctx context.Context, slot slot) error {
}
}

s.resolvedEpoch = int64(slot.Epoch())
s.setResolvedEpoch(uint64(slot.Epoch()))

return nil
}

func (s *Scheduler) getFetchArgSet(duty core.Duty) (core.FetchArgSet, bool) {
s.dutiesMutex.Lock()
defer s.dutiesMutex.Unlock()

argSet, ok := s.duties[duty]

return argSet, ok
}

func (s *Scheduler) setFetchArg(duty core.Duty, pubkey core.PubKey, set core.FetchArg) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not used?

s.dutiesMutex.Lock()
defer s.dutiesMutex.Unlock()

argSet, ok := s.duties[duty]
if !ok {
argSet = make(core.FetchArgSet)
}
if _, ok := argSet[pubkey]; ok {
return false
}

argSet[pubkey] = set
s.duties[duty] = argSet

return true
}

func (s *Scheduler) deleteDuty(duty core.Duty) {
s.dutiesMutex.Lock()
defer s.dutiesMutex.Unlock()

delete(s.duties, duty)
}

func (s *Scheduler) getResolvedEpoch() uint64 {
s.dutiesMutex.Lock()
defer s.dutiesMutex.Unlock()

return s.resolvedEpoch
}

func (s *Scheduler) setResolvedEpoch(epoch uint64) {
s.dutiesMutex.Lock()
defer s.dutiesMutex.Unlock()

s.resolvedEpoch = epoch
}

func (s *Scheduler) isEpochResolved(epoch uint64) bool {
return s.getResolvedEpoch() >= epoch
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this will return true for default s.resolvedEpoch=math.MaxUint64 before the first epoch is resolved which is technically not correct, but maybe that is not a big issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@corverroos ok, so we need to take this into account

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice


// slot is a beacon chain slot and includes chain metadata to infer epoch and next slot.
type slot struct {
Slot int64
Expand Down