diff --git a/app/eth2wrap/eth2wrap.go b/app/eth2wrap/eth2wrap.go index 00f4eabc5..5dda9a396 100644 --- a/app/eth2wrap/eth2wrap.go +++ b/app/eth2wrap/eth2wrap.go @@ -101,7 +101,7 @@ func NewMultiHTTP(timeout time.Duration, addresses ...string) (Client, error) { func newMulti(clients []Client) Client { return multi{ clients: clients, - selector: newBestSelector(len(clients), bestPeriod), + selector: newBestSelector(bestPeriod), } } @@ -114,17 +114,17 @@ type multi struct { selector *bestSelector } -// bestIdx increments the selector with the best client index. -func (m multi) bestIdx(i int) { - m.selector.Increment(i) -} - func (multi) Name() string { return "eth2wrap.multi" } func (m multi) Address() string { - return m.clients[m.selector.Best()].Address() + address, ok := m.selector.BestAddress() + if !ok { + return m.clients[0].Address() + } + + return address } func (m multi) SetValidatorCache(valCache func(context.Context) (ActiveValidators, error)) { @@ -159,7 +159,7 @@ func (m multi) ProposerConfig(ctx context.Context) (*eth2exp.ProposerConfigRespo func(ctx context.Context, cl Client) (*eth2exp.ProposerConfigResponse, error) { return cl.ProposerConfig(ctx) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { incError(label) @@ -177,7 +177,7 @@ func (m multi) AggregateBeaconCommitteeSelections(ctx context.Context, selection func(ctx context.Context, cl Client) ([]*eth2exp.BeaconCommitteeSelection, error) { return cl.AggregateBeaconCommitteeSelections(ctx, selections) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { incError(label) @@ -195,7 +195,7 @@ func (m multi) AggregateSyncCommitteeSelections(ctx context.Context, selections func(ctx context.Context, cl Client) ([]*eth2exp.SyncCommitteeSelection, error) { return cl.AggregateSyncCommitteeSelections(ctx, selections) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { incError(label) @@ -213,7 +213,7 @@ func (m multi) BlockAttestations(ctx context.Context, stateID string) ([]*eth2p0 func(ctx context.Context, cl Client) ([]*eth2p0.Attestation, error) { return cl.BlockAttestations(ctx, stateID) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { incError(label) @@ -231,7 +231,7 @@ func (m multi) NodePeerCount(ctx context.Context) (int, error) { func(ctx context.Context, cl Client) (int, error) { return cl.NodePeerCount(ctx) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { incError(label) @@ -245,14 +245,11 @@ func (m multi) NodePeerCount(ctx context.Context) (int, error) { // first successful result or first error. // The bestIdxFunc is called with the index of the client returning a successful response. func provide[O any](ctx context.Context, clients []Client, - work forkjoin.Work[Client, O], isSuccessFunc func(O) bool, bestIdxFunc func(int), + work forkjoin.Work[Client, O], isSuccessFunc func(O) bool, bestSelector *bestSelector, ) (O, error) { if isSuccessFunc == nil { isSuccessFunc = func(O) bool { return true } } - if bestIdxFunc == nil { - bestIdxFunc = func(int) {} - } fork, join, cancel := forkjoin.New(ctx, work, forkjoin.WithoutFailFast(), @@ -272,11 +269,8 @@ func provide[O any](ctx context.Context, clients []Client, if ctx.Err() != nil { return zero, ctx.Err() } else if res.Err == nil && isSuccessFunc(res.Output) { - // TODO(corver): Find a better way to get the index of successful client. - for i, client := range clients { - if client.Address() == res.Input.Address() { - bestIdxFunc(i) - } + if bestSelector != nil { + bestSelector.Increment(res.Input.Address()) } return res.Output, nil @@ -298,14 +292,12 @@ func provide[O any](ctx context.Context, clients []Client, type empty struct{} // submit proxies provide, but returns nil instead of a successful result. -func submit(ctx context.Context, clients []Client, work func(context.Context, Client) error, - bestIdxFunc func(int), -) error { +func submit(ctx context.Context, clients []Client, work func(context.Context, Client) error, selector *bestSelector) error { _, err := provide(ctx, clients, func(ctx context.Context, cl Client) (empty, error) { return empty{}, work(ctx, cl) }, - nil, bestIdxFunc, + nil, selector, ) return err @@ -368,52 +360,48 @@ func wrapError(ctx context.Context, err error, label string, fields ...z.Field) } // newBestSelector returns a new bestSelector. -func newBestSelector(n int, period time.Duration) *bestSelector { +func newBestSelector(period time.Duration) *bestSelector { return &bestSelector{ - n: n, + counts: make(map[string]int), + start: time.Now(), period: period, - counts: make([]int, n), } } // bestSelector calculates the "best client index" per period. type bestSelector struct { - n int - mu sync.Mutex + mu sync.RWMutex + counts map[string]int start time.Time period time.Duration - counts []int } -// Best returns the best index or 0 if no counts. -func (s *bestSelector) Best() int { - s.mu.Lock() - defer s.mu.Unlock() - - var resp, count int - for i, c := range s.counts { - if c > count { - resp = i - count = c +// BestAddress returns the best client address when ok is true. +func (s *bestSelector) BestAddress() (address string, ok bool) { + s.mu.RLock() + defer s.mu.RUnlock() + + var maxCount int + for addr, count := range s.counts { + if count > maxCount { + ok = true + address = addr + maxCount = count } } - return resp + return address, ok } -// Increment increments the counter for the given index. -func (s *bestSelector) Increment(i int) { +// Increment increments the counter for the given address. +func (s *bestSelector) Increment(address string) { s.mu.Lock() defer s.mu.Unlock() - if i < 0 || i >= s.n { - panic("invalid index") // This should never happen - } - if time.Since(s.start) > s.period { // Reset counters after period. - s.counts = make([]int, s.n) + s.counts = make(map[string]int) s.start = time.Now() } - s.counts[i]++ + s.counts[address]++ } diff --git a/app/eth2wrap/eth2wrap_gen.go b/app/eth2wrap/eth2wrap_gen.go index c98044151..1aae71c95 100644 --- a/app/eth2wrap/eth2wrap_gen.go +++ b/app/eth2wrap/eth2wrap_gen.go @@ -75,7 +75,7 @@ func (m multi) SlotDuration(ctx context.Context) (time.Duration, error) { func(ctx context.Context, cl Client) (time.Duration, error) { return cl.SlotDuration(ctx) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -97,7 +97,7 @@ func (m multi) SlotsPerEpoch(ctx context.Context) (uint64, error) { func(ctx context.Context, cl Client) (uint64, error) { return cl.SlotsPerEpoch(ctx) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -117,7 +117,7 @@ func (m multi) SignedBeaconBlock(ctx context.Context, opts *api.SignedBeaconBloc func(ctx context.Context, cl Client) (*api.Response[*spec.VersionedSignedBeaconBlock], error) { return cl.SignedBeaconBlock(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -137,7 +137,7 @@ func (m multi) AggregateAttestation(ctx context.Context, opts *api.AggregateAtte func(ctx context.Context, cl Client) (*api.Response[*phase0.Attestation], error) { return cl.AggregateAttestation(ctx, opts) }, - isAggregateAttestationOk, m.bestIdx, + isAggregateAttestationOk, m.selector, ) if err != nil { @@ -157,7 +157,7 @@ func (m multi) SubmitAggregateAttestations(ctx context.Context, aggregateAndProo func(ctx context.Context, cl Client) error { return cl.SubmitAggregateAttestations(ctx, aggregateAndProofs) }, - m.bestIdx, + m.selector, ) if err != nil { @@ -177,7 +177,7 @@ func (m multi) AttestationData(ctx context.Context, opts *api.AttestationDataOpt func(ctx context.Context, cl Client) (*api.Response[*phase0.AttestationData], error) { return cl.AttestationData(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -197,7 +197,7 @@ func (m multi) SubmitAttestations(ctx context.Context, attestations []*phase0.At func(ctx context.Context, cl Client) error { return cl.SubmitAttestations(ctx, attestations) }, - m.bestIdx, + m.selector, ) if err != nil { @@ -217,7 +217,7 @@ func (m multi) AttesterDuties(ctx context.Context, opts *api.AttesterDutiesOpts) func(ctx context.Context, cl Client) (*api.Response[[]*apiv1.AttesterDuty], error) { return cl.AttesterDuties(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -237,7 +237,7 @@ func (m multi) DepositContract(ctx context.Context, opts *api.DepositContractOpt func(ctx context.Context, cl Client) (*api.Response[*apiv1.DepositContract], error) { return cl.DepositContract(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -258,7 +258,7 @@ func (m multi) SyncCommitteeDuties(ctx context.Context, opts *api.SyncCommitteeD func(ctx context.Context, cl Client) (*api.Response[[]*apiv1.SyncCommitteeDuty], error) { return cl.SyncCommitteeDuties(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -278,7 +278,7 @@ func (m multi) SubmitSyncCommitteeMessages(ctx context.Context, messages []*alta func(ctx context.Context, cl Client) error { return cl.SubmitSyncCommitteeMessages(ctx, messages) }, - m.bestIdx, + m.selector, ) if err != nil { @@ -298,7 +298,7 @@ func (m multi) SubmitSyncCommitteeSubscriptions(ctx context.Context, subscriptio func(ctx context.Context, cl Client) error { return cl.SubmitSyncCommitteeSubscriptions(ctx, subscriptions) }, - m.bestIdx, + m.selector, ) if err != nil { @@ -318,7 +318,7 @@ func (m multi) SyncCommitteeContribution(ctx context.Context, opts *api.SyncComm func(ctx context.Context, cl Client) (*api.Response[*altair.SyncCommitteeContribution], error) { return cl.SyncCommitteeContribution(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -338,7 +338,7 @@ func (m multi) SubmitSyncCommitteeContributions(ctx context.Context, contributio func(ctx context.Context, cl Client) error { return cl.SubmitSyncCommitteeContributions(ctx, contributionAndProofs) }, - m.bestIdx, + m.selector, ) if err != nil { @@ -358,7 +358,7 @@ func (m multi) Proposal(ctx context.Context, opts *api.ProposalOpts) (*api.Respo func(ctx context.Context, cl Client) (*api.Response[*api.VersionedProposal], error) { return cl.Proposal(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -378,7 +378,7 @@ func (m multi) BeaconBlockRoot(ctx context.Context, opts *api.BeaconBlockRootOpt func(ctx context.Context, cl Client) (*api.Response[*phase0.Root], error) { return cl.BeaconBlockRoot(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -398,7 +398,7 @@ func (m multi) SubmitProposal(ctx context.Context, block *api.VersionedSignedPro func(ctx context.Context, cl Client) error { return cl.SubmitProposal(ctx, block) }, - m.bestIdx, + m.selector, ) if err != nil { @@ -418,7 +418,7 @@ func (m multi) SubmitBeaconCommitteeSubscriptions(ctx context.Context, subscript func(ctx context.Context, cl Client) error { return cl.SubmitBeaconCommitteeSubscriptions(ctx, subscriptions) }, - m.bestIdx, + m.selector, ) if err != nil { @@ -438,7 +438,7 @@ func (m multi) BlindedProposal(ctx context.Context, opts *api.BlindedProposalOpt func(ctx context.Context, cl Client) (*api.Response[*api.VersionedBlindedProposal], error) { return cl.BlindedProposal(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -458,7 +458,7 @@ func (m multi) SubmitBlindedProposal(ctx context.Context, block *api.VersionedSi func(ctx context.Context, cl Client) error { return cl.SubmitBlindedProposal(ctx, block) }, - m.bestIdx, + m.selector, ) if err != nil { @@ -478,7 +478,7 @@ func (m multi) SubmitValidatorRegistrations(ctx context.Context, registrations [ func(ctx context.Context, cl Client) error { return cl.SubmitValidatorRegistrations(ctx, registrations) }, - m.bestIdx, + m.selector, ) if err != nil { @@ -498,7 +498,7 @@ func (m multi) Fork(ctx context.Context, opts *api.ForkOpts) (*api.Response[*pha func(ctx context.Context, cl Client) (*api.Response[*phase0.Fork], error) { return cl.Fork(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -518,7 +518,7 @@ func (m multi) ForkSchedule(ctx context.Context, opts *api.ForkScheduleOpts) (*a func(ctx context.Context, cl Client) (*api.Response[[]*phase0.Fork], error) { return cl.ForkSchedule(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -538,7 +538,7 @@ func (m multi) Genesis(ctx context.Context, opts *api.GenesisOpts) (*api.Respons func(ctx context.Context, cl Client) (*api.Response[*apiv1.Genesis], error) { return cl.Genesis(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -558,7 +558,7 @@ func (m multi) NodeSyncing(ctx context.Context, opts *api.NodeSyncingOpts) (*api func(ctx context.Context, cl Client) (*api.Response[*apiv1.SyncState], error) { return cl.NodeSyncing(ctx, opts) }, - isSyncStateOk, m.bestIdx, + isSyncStateOk, m.selector, ) if err != nil { @@ -578,7 +578,7 @@ func (m multi) NodeVersion(ctx context.Context, opts *api.NodeVersionOpts) (*api func(ctx context.Context, cl Client) (*api.Response[string], error) { return cl.NodeVersion(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -599,7 +599,7 @@ func (m multi) SubmitProposalPreparations(ctx context.Context, preparations []*a func(ctx context.Context, cl Client) error { return cl.SubmitProposalPreparations(ctx, preparations) }, - m.bestIdx, + m.selector, ) if err != nil { @@ -619,7 +619,7 @@ func (m multi) ProposerDuties(ctx context.Context, opts *api.ProposerDutiesOpts) func(ctx context.Context, cl Client) (*api.Response[[]*apiv1.ProposerDuty], error) { return cl.ProposerDuties(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -639,7 +639,7 @@ func (m multi) Spec(ctx context.Context, opts *api.SpecOpts) (*api.Response[map[ func(ctx context.Context, cl Client) (*api.Response[map[string]any], error) { return cl.Spec(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -659,7 +659,7 @@ func (m multi) Validators(ctx context.Context, opts *api.ValidatorsOpts) (*api.R func(ctx context.Context, cl Client) (*api.Response[map[phase0.ValidatorIndex]*apiv1.Validator], error) { return cl.Validators(ctx, opts) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -679,7 +679,7 @@ func (m multi) SubmitVoluntaryExit(ctx context.Context, voluntaryExit *phase0.Si func(ctx context.Context, cl Client) error { return cl.SubmitVoluntaryExit(ctx, voluntaryExit) }, - m.bestIdx, + m.selector, ) if err != nil { @@ -699,7 +699,7 @@ func (m multi) Domain(ctx context.Context, domainType phase0.DomainType, epoch p func(ctx context.Context, cl Client) (phase0.Domain, error) { return cl.Domain(ctx, domainType, epoch) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -722,7 +722,7 @@ func (m multi) GenesisDomain(ctx context.Context, domainType phase0.DomainType) func(ctx context.Context, cl Client) (phase0.Domain, error) { return cl.GenesisDomain(ctx, domainType) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { @@ -742,7 +742,7 @@ func (m multi) GenesisTime(ctx context.Context) (time.Time, error) { func(ctx context.Context, cl Client) (time.Time, error) { return cl.GenesisTime(ctx) }, - nil, m.bestIdx, + nil, m.selector, ) if err != nil { diff --git a/app/eth2wrap/genwrap/genwrap.go b/app/eth2wrap/genwrap/genwrap.go index a6ef58e0b..c4d971c26 100644 --- a/app/eth2wrap/genwrap/genwrap.go +++ b/app/eth2wrap/genwrap/genwrap.go @@ -68,7 +68,7 @@ type Client interface { func(ctx context.Context, cl Client) ({{.ResultTypes}}){ return cl.{{.Name}}({{.ParamNames}}) }, - {{.SuccessFunc}} m.bestIdx, + {{.SuccessFunc}} m.selector, ) if err != nil {