Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

app: eth2wrap best selector refactoring #2806

Merged
merged 2 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 38 additions & 50 deletions app/eth2wrap/eth2wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
func newMulti(clients []Client) Client {
return multi{
clients: clients,
selector: newBestSelector(len(clients), bestPeriod),
selector: newBestSelector(bestPeriod),
}
}

Expand All @@ -114,17 +114,17 @@
selector *bestSelector
}

// bestIdx increments the selector with the best client index.
func (m multi) bestIdx(i int) {
m.selector.Increment(i)
}

func (multi) Name() string {
return "eth2wrap.multi"
}

func (m multi) Address() string {
return m.clients[m.selector.Best()].Address()
address, ok := m.selector.BestAddress()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what case could BestAddress return ok == false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory this should never happen, because even after period elapsed, we always add at least one entry to the map. So, this is a safeguard in case somebody changes the logic in Increment and not look into this function.

if !ok {
return m.clients[0].Address()
}

return address
}

func (m multi) SetValidatorCache(valCache func(context.Context) (ActiveValidators, error)) {
Expand Down Expand Up @@ -159,7 +159,7 @@
func(ctx context.Context, cl Client) (*eth2exp.ProposerConfigResponse, error) {
return cl.ProposerConfig(ctx)
},
nil, m.bestIdx,
nil, m.selector,
)
if err != nil {
incError(label)
Expand All @@ -177,7 +177,7 @@
func(ctx context.Context, cl Client) ([]*eth2exp.BeaconCommitteeSelection, error) {
return cl.AggregateBeaconCommitteeSelections(ctx, selections)
},
nil, m.bestIdx,
nil, m.selector,
)
if err != nil {
incError(label)
Expand All @@ -195,7 +195,7 @@
func(ctx context.Context, cl Client) ([]*eth2exp.SyncCommitteeSelection, error) {
return cl.AggregateSyncCommitteeSelections(ctx, selections)
},
nil, m.bestIdx,
nil, m.selector,
)
if err != nil {
incError(label)
Expand All @@ -213,7 +213,7 @@
func(ctx context.Context, cl Client) ([]*eth2p0.Attestation, error) {
return cl.BlockAttestations(ctx, stateID)
},
nil, m.bestIdx,
nil, m.selector,
)
if err != nil {
incError(label)
Expand All @@ -231,7 +231,7 @@
func(ctx context.Context, cl Client) (int, error) {
return cl.NodePeerCount(ctx)
},
nil, m.bestIdx,
nil, m.selector,
)
if err != nil {
incError(label)
Expand All @@ -245,14 +245,11 @@
// first successful result or first error.
// The bestIdxFunc is called with the index of the client returning a successful response.
func provide[O any](ctx context.Context, clients []Client,
work forkjoin.Work[Client, O], isSuccessFunc func(O) bool, bestIdxFunc func(int),
work forkjoin.Work[Client, O], isSuccessFunc func(O) bool, bestSelector *bestSelector,
) (O, error) {
if isSuccessFunc == nil {
isSuccessFunc = func(O) bool { return true }
}
if bestIdxFunc == nil {
bestIdxFunc = func(int) {}
}

fork, join, cancel := forkjoin.New(ctx, work,
forkjoin.WithoutFailFast(),
Expand All @@ -272,11 +269,8 @@
if ctx.Err() != nil {
return zero, ctx.Err()
} else if res.Err == nil && isSuccessFunc(res.Output) {
// TODO(corver): Find a better way to get the index of successful client.
for i, client := range clients {
if client.Address() == res.Input.Address() {
bestIdxFunc(i)
}
if bestSelector != nil {
bestSelector.Increment(res.Input.Address())
}

return res.Output, nil
Expand All @@ -298,14 +292,12 @@
type empty struct{}

// submit proxies provide, but returns nil instead of a successful result.
func submit(ctx context.Context, clients []Client, work func(context.Context, Client) error,
bestIdxFunc func(int),
) error {
func submit(ctx context.Context, clients []Client, work func(context.Context, Client) error, selector *bestSelector) error {
_, err := provide(ctx, clients,
func(ctx context.Context, cl Client) (empty, error) {
return empty{}, work(ctx, cl)
},
nil, bestIdxFunc,
nil, selector,
)

return err
Expand Down Expand Up @@ -368,52 +360,48 @@
}

// newBestSelector returns a new bestSelector.
func newBestSelector(n int, period time.Duration) *bestSelector {
func newBestSelector(period time.Duration) *bestSelector {
return &bestSelector{
n: n,
counts: make(map[string]int),
start: time.Now(),
period: period,
counts: make([]int, n),
}
}

// bestSelector calculates the "best client index" per period.
type bestSelector struct {
n int
mu sync.Mutex
mu sync.RWMutex
counts map[string]int
start time.Time
period time.Duration
counts []int
}

// Best returns the best index or 0 if no counts.
func (s *bestSelector) Best() int {
s.mu.Lock()
defer s.mu.Unlock()

var resp, count int
for i, c := range s.counts {
if c > count {
resp = i
count = c
// BestAddress returns the best client address when ok is true.
func (s *bestSelector) BestAddress() (address string, ok bool) {
s.mu.RLock()
defer s.mu.RUnlock()

var maxCount int
for addr, count := range s.counts {
if count > maxCount {
ok = true
address = addr
maxCount = count
}
}

return resp
return address, ok
}

// Increment increments the counter for the given index.
func (s *bestSelector) Increment(i int) {
// Increment increments the counter for the given address.
func (s *bestSelector) Increment(address string) {
s.mu.Lock()
defer s.mu.Unlock()

if i < 0 || i >= s.n {
panic("invalid index") // This should never happen
}

if time.Since(s.start) > s.period { // Reset counters after period.
s.counts = make([]int, s.n)
s.counts = make(map[string]int)

Check warning on line 402 in app/eth2wrap/eth2wrap.go

View check run for this annotation

Codecov / codecov/patch

app/eth2wrap/eth2wrap.go#L402

Added line #L402 was not covered by tests
s.start = time.Now()
}

s.counts[i]++
s.counts[address]++
}
Loading
Loading