Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/scheduler: trigger duty at slot offset #516

Merged
merged 3 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions core/scheduler/offset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright © 2022 Obol Labs Inc.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along with
// this program. If not, see <http://www.gnu.org/licenses/>.

package scheduler

import (
"time"

"github.com/obolnetwork/charon/core"
)

// slotOffsets defines the offsets
// at which duties should be triggered.
var slotOffsets = map[core.DutyType]func(time.Duration) time.Duration{
core.DutyAttester: fraction(1, 3), // 1/3 slot duration
// TODO(corver): Add more duties
}

// fraction returns a function that calculates slot offset
// based on the fraction x/y of total slot duration.
func fraction(x, y int64) func(time.Duration) time.Duration {
return func(total time.Duration) time.Duration {
return (total * time.Duration(x)) / time.Duration(y)
}
}
94 changes: 60 additions & 34 deletions core/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,18 @@ type eth2Provider interface {
eth2client.ProposerDutiesProvider
}

// delayFunc abstracts slot offset delaying/sleeping for deterministic tests.
type delayFunc func(duty core.Duty, deadline time.Time) <-chan time.Time

// NewForT returns a new scheduler for testing supporting a fake clock.
func NewForT(t *testing.T, clock clockwork.Clock, pubkeys []core.PubKey, eth2Svc eth2client.Service) *Scheduler {
func NewForT(t *testing.T, clock clockwork.Clock, delayFunc delayFunc, pubkeys []core.PubKey, eth2Svc eth2client.Service) *Scheduler {
t.Helper()

s, err := New(pubkeys, eth2Svc)
require.NoError(t, err)

s.clock = clock
s.delayFunc = delayFunc

return s
}
Expand All @@ -65,21 +69,24 @@ func New(pubkeys []core.PubKey, eth2Svc eth2client.Service) (*Scheduler, error)
}

return &Scheduler{
eth2Cl: eth2Cl,
pubkeys: pubkeys,
quit: make(chan struct{}),
duties: make(map[core.Duty]core.FetchArgSet),
clock: clockwork.NewRealClock(),
eth2Cl: eth2Cl,
pubkeys: pubkeys,
quit: make(chan struct{}),
duties: make(map[core.Duty]core.FetchArgSet),
clock: clockwork.NewRealClock(),
delayFunc: func(_ core.Duty, deadline time.Time) <-chan time.Time {
return time.After(time.Until(deadline))
},
resolvedEpoch: math.MaxUint64,
}, nil
}

type Scheduler struct {
eth2Cl eth2Provider
pubkeys []core.PubKey
quit chan struct{}
clock clockwork.Clock

eth2Cl eth2Provider
pubkeys []core.PubKey
quit chan struct{}
clock clockwork.Clock
delayFunc delayFunc
resolvedEpoch uint64
duties map[core.Duty]core.FetchArgSet
dutiesMutex sync.Mutex
Expand All @@ -99,6 +106,8 @@ func (s *Scheduler) Stop() {
// Run blocks and runs the scheduler until Stop is called.
func (s *Scheduler) Run() error {
ctx := log.WithTopic(context.Background(), "sched")
ctx, cancel := context.WithCancel(ctx)
defer cancel()

waitChainStart(ctx, s.eth2Cl, s.clock)
waitBeaconSync(ctx, s.eth2Cl, s.clock)
Expand All @@ -118,10 +127,7 @@ func (s *Scheduler) Run() error {

instrumentSlot(slot)

err := s.scheduleSlot(slotCtx, slot)
if err != nil {
log.Error(ctx, "Scheduling slot error", err)
}
s.scheduleSlot(slotCtx, slot)
}
}
}
Expand All @@ -147,7 +153,7 @@ func (s *Scheduler) GetDuty(ctx context.Context, duty core.Duty) (core.FetchArgS
}

// scheduleSlot resolves upcoming duties and triggers resolved duties for the slot.
func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error {
func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) {
if s.getResolvedEpoch() != uint64(slot.Epoch()) {
err := s.resolveDuties(ctx, slot)
if err != nil {
Expand All @@ -167,25 +173,21 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error {
continue
}

instrumentDuty(duty, argSet)
// Trigger duty async
go func() {
if !delaySlotOffset(ctx, slot, duty, s.delayFunc) {
return // context cancelled
}

ctx, span := core.StartDutyTrace(ctx, duty, "core/scheduler.scheduleSlot")
ctx, span := core.StartDutyTrace(ctx, duty, "core/scheduler.scheduleSlot")
defer span.End()

for _, sub := range s.subs {
err := sub(ctx, duty, argSet)
if err != nil {
// TODO(corver): Improve error handling; possibly call subscription async
// with backoff until duty expires.
span.End()
return err
for _, sub := range s.subs {
if err := sub(ctx, duty, argSet); err != nil {
log.Error(ctx, "Trigger duty subscriber error", err)
}
}
}

span.End()
// TODO(leo): This had to be commented out because the scheduler doesn't need the duty anymore,
// but the validatorAPI will need the duty when verifying a randao. Solved when we have the shared
// component to resolve duties.
// s.deleteDuty(duty)
}()
}

if slot.IsLastInEpoch() {
Expand All @@ -194,8 +196,26 @@ func (s *Scheduler) scheduleSlot(ctx context.Context, slot slot) error {
log.Warn(ctx, "Resolving duties error (retrying next slot)", err)
}
}
}

return nil
// delaySlotOffset blocks until the slot offset for the duty has been reached and return true.
// It returns false if the context is cancelled.
func delaySlotOffset(ctx context.Context, slot slot, duty core.Duty, delayFunc delayFunc) bool {
fn, ok := slotOffsets[duty.Type]
if !ok {
return true
}

// Calculate delay until slot offset
offset := fn(slot.SlotDuration)
deadline := slot.Time.Add(offset)

select {
case <-ctx.Done():
return false
case <-delayFunc(duty, deadline):
return true
}
}

// resolveDuties resolves the duties for the slot's epoch, caching the results.
Expand Down Expand Up @@ -321,6 +341,8 @@ func (s *Scheduler) setFetchArg(duty core.Duty, pubkey core.PubKey, set core.Fet
return true
}

// deleteDuty deletes the duty from the cache.
// TODO(corver): Call this on duty deadline to trim duties.
func (s *Scheduler) deleteDuty(duty core.Duty) {
s.dutiesMutex.Lock()
defer s.dutiesMutex.Unlock()
Expand Down Expand Up @@ -410,7 +432,11 @@ func newSlotTicker(ctx context.Context, eth2Cl eth2Provider, clock clockwork.Clo
height++
startTime = startTime.Add(slotDuration)

clock.Sleep(startTime.Sub(clock.Now()))
select {
case <-ctx.Done():
return
case <-clock.After(startTime.Sub(clock.Now())):
}
}
}()

Expand Down
85 changes: 74 additions & 11 deletions core/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"encoding/json"
"flag"
"os"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -159,7 +161,7 @@ func TestSchedulerWait(t *testing.T) {
}, err
}

sched := scheduler.NewForT(t, clock, nil, eth2Cl)
sched := scheduler.NewForT(t, clock, new(delayer).Delay, nil, eth2Cl)
sched.Stop() // Just run wait functions, then quit.
require.NoError(t, sched.Run())
require.EqualValues(t, test.WaitSecs, clock.Since(t0).Seconds())
Expand All @@ -175,29 +177,35 @@ func TestSchedulerDuties(t *testing.T) {
Name string
Factor int // Determines how duties are spread per epoch
PropErrs int
Results int
}{
{
// All duties grouped in first slot of epoch
Name: "grouped",
Factor: 0,
Name: "grouped",
Factor: 0,
Results: 2,
},
{
// All duties spread in first N slots of epoch (N is number of validators)
Name: "spread",
Factor: 1,
Name: "spread",
Factor: 1,
Results: 6,
},
{
// All duties spread in first N slots of epoch (except first proposer errors)
Name: "spread_errors",
Factor: 1,
PropErrs: 1,
Results: 5,
},
}

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
// Configure beacon mock
var t0 time.Time
t0 = t0.Add(time.Minute * 8) // Nice round slot numbers.

valSet := beaconmock.ValidatorSetA
eth2Cl, err := beaconmock.New(
beaconmock.WithValidatorSet(valSet),
Expand All @@ -224,23 +232,28 @@ func TestSchedulerDuties(t *testing.T) {

// Construct scheduler
clock := newTestClock(t0)
sched := scheduler.NewForT(t, clock, pubkeys, eth2Cl)
delayer := new(delayer)
sched := scheduler.NewForT(t, clock, delayer.Delay, pubkeys, eth2Cl)

// Only test scheduler output for first N slots, so Stop scheduler (and slotTicker) after that.
const stopAfter = 3
slotDuration, err := eth2Cl.SlotDuration(context.Background())
require.NoError(t, err)
clock.CallbackAfter(t0.Add(time.Duration(stopAfter)*slotDuration), func() {
sched.Stop()
time.Sleep(time.Hour) // Do not let the slot ticker tick anymore.
})

// Collect results
type result struct {
Duty string
Time string
DutyStr string `json:"duty"`
Duty core.Duty `json:"-"`
DutyArgSet map[core.PubKey]string
}
var results []result
var (
results []result
mu sync.Mutex
)
sched.Subscribe(func(ctx context.Context, duty core.Duty, set core.FetchArgSet) error {
// Make result human-readable
resultSet := make(map[core.PubKey]string)
Expand All @@ -249,17 +262,39 @@ func TestSchedulerDuties(t *testing.T) {
}

// Add result
mu.Lock()
defer mu.Unlock()

results = append(results, result{
Duty: duty.String(),
Duty: duty,
DutyStr: duty.String(),
DutyArgSet: resultSet,
})

if len(results) == test.Results {
sched.Stop()
}

return nil
})

// Run scheduler
require.NoError(t, sched.Run())

// Add deadlines to results
deadlines := delayer.Get()
for i := 0; i < len(results); i++ {
results[i].Time = deadlines[results[i].Duty].Format("04:05.000")
}
// Make result order deterministic
sort.Slice(results, func(i, j int) bool {
if results[i].Duty.Slot == results[j].Duty.Slot {
return results[i].Duty.Type < results[j].Duty.Type
}

return results[i].Duty.Slot < results[j].Duty.Slot
})

// Assert results
testutil.RequireGoldenJSON(t, results)
})
Expand All @@ -283,7 +318,7 @@ func TestScheduler_GetDuty(t *testing.T) {

// Construct scheduler
clock := newTestClock(t0)
sched := scheduler.NewForT(t, clock, pubkeys, eth2Cl)
sched := scheduler.NewForT(t, clock, new(delayer).Delay, pubkeys, eth2Cl)

_, err = sched.GetDuty(context.Background(), core.Duty{Slot: 0, Type: core.DutyAttester})
// due to current design we will return an error if we request the duty of a slot that has not been resolved
Expand Down Expand Up @@ -312,6 +347,34 @@ func TestScheduler_GetDuty(t *testing.T) {
require.NoError(t, sched.Run())
}

// delayer implements scheduler.delayFunc and records the deadline and returns it immediately.
type delayer struct {
mu sync.Mutex
deadlines map[core.Duty]time.Time
}

func (d *delayer) Get() map[core.Duty]time.Time {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this exported method used or aimed to used outside the package?

Copy link
Contributor Author

@corverroos corverroos May 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed before, the type delayer isn't exported, so nothing is leaked. But the function Get is used by other things in this package. So then the method is capitalised. I prefer using private methods/fields for internal use within in a type only.

Copy link
Contributor Author

@corverroos corverroos May 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also since this is a test, it is impossible to use it from outside the package.

Copy link
Contributor

@leolara leolara May 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • test of private methods can be done in _internal_test.go files, no problem for that.
  • If the method is public some developer will think that it is for public consumtion and have a method that returns the struct at some point.

hence, making it public does not give us an adventage, making it private does not requires more complex code but it keeps encapsulation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you make a method public you are documenting that the method is to be comsumed by other packages

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You general approach of making everything private, both types and fields/methods is a subjective opinion which I have already mentioned that I do not agree with. I feel making types private is sufficient to ensure encapsulation. Public fields/method then indicate that they are used by other things in the same package as opposed to only for internal use in the type itself.

public methods in private structs does not provide encapsulation in Go as shown in the code https://github.com/leolara/privateExperiment

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are in a loop.

These are facts, not subjective at all:

  • private methods can be tested in Go, _internal_test.go allow that
  • public methods of private structs are accessible outside the package in Go, because in Go it is possible to have a public function that returns a private struct, and that makes the public methods of the struct accessible. However, private methods are never accessible.

This is a common rule in Software Engineering:

  • lower visibility by default, higher visibility needs a justification. Not the other way around

Copy link
Contributor

@leolara leolara May 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here there is an example of code: https://willhaley.com/blog/private-and-public-visibility-with-go-packages/

a private struct with public method and private method, they both have a place in Go code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've documented my approach here: https://github.com/corverroos/go-visibilty

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok made it unexported.

d.mu.Lock()
defer d.mu.Unlock()

return d.deadlines
}

// Delay implements scheduler.delayFunc and records the deadline and returns it immediately.
func (d *delayer) Delay(duty core.Duty, deadline time.Time) <-chan time.Time {
d.mu.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method used or aimed to be used outside the package?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comment above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok made it unexported.

defer d.mu.Unlock()
if d.deadlines == nil {
d.deadlines = make(map[core.Duty]time.Time)
}
d.deadlines[duty] = deadline

resp := make(chan time.Time, 1)
resp <- deadline

return resp
}

func newTestClock(now time.Time) *testClock {
return &testClock{
now: now,
Expand Down
Loading