Skip to content

Commit

Permalink
core/fetcher: support aggregate attestation not found (#1184)
Browse files Browse the repository at this point in the history
Fixes panic in fetcher when `AggregateAttestation` isn't found and go-eth2-client returns `nil,nil`. This can happen when using infura and the `SubmitBeaconCommitteeSubscription` was processed by a different node. Just retry in that case.

Also use proper timeout for eth2exp calls (2s is too short).

category: bug
ticket: none
  • Loading branch information
corverroos authored Sep 28, 2022
1 parent 086d2c8 commit 7b9504d
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 30 deletions.
6 changes: 3 additions & 3 deletions app/eth2wrap/eth2wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func Instrument(clients ...Client) (Client, error) {
}

// AdaptEth2HTTP returns a Client wrapping an eth2http service by adding experimental endpoints.
func AdaptEth2HTTP(eth2Svc *eth2http.Service) Client {
return httpAdapter{Service: eth2Svc}
func AdaptEth2HTTP(eth2Svc *eth2http.Service, timeout time.Duration) Client {
return httpAdapter{Service: eth2Svc, timeout: timeout}
}

// NewMultiHTTP returns a new instrumented multi eth2 http client.
Expand All @@ -84,7 +84,7 @@ func NewMultiHTTP(ctx context.Context, timeout time.Duration, addresses ...strin
return nil, errors.New("invalid eth2 http service")
}

clients = append(clients, AdaptEth2HTTP(eth2Http))
clients = append(clients, AdaptEth2HTTP(eth2Http, timeout))
}

return Instrument(clients...)
Expand Down
2 changes: 1 addition & 1 deletion app/eth2wrap/eth2wrap_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion app/eth2wrap/genwrap/genwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ type Client interface {

// successFuncs indicates which endpoints have custom success functions.
successFuncs = map[string]string{
"NodeSyncing": "isSyncStateOk",
"NodeSyncing": "isSyncStateOk",
"AggregateAttestation": "isAggregateAttestationOk",
}

skipImport = map[string]bool{
Expand Down
11 changes: 6 additions & 5 deletions app/eth2wrap/httpwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
// interfaces not present go-eth2-client.
type httpAdapter struct {
*eth2http.Service
timeout time.Duration
}

type submitBeaconCommitteeSubscriptionsV2JSON struct {
Expand All @@ -48,7 +49,7 @@ func (h httpAdapter) SubmitBeaconCommitteeSubscriptionsV2(ctx context.Context, s
return nil, errors.Wrap(err, "marshal submit beacon committee subscriptions V2 request")
}

respBody, err := httpPost(ctx, h.Address(), "/eth/v2/validator/beacon_committee_subscriptions", bytes.NewReader(reqBody))
respBody, err := httpPost(ctx, h.Address(), "/eth/v2/validator/beacon_committee_subscriptions", bytes.NewReader(reqBody), h.timeout)
if err != nil {
return nil, errors.Wrap(err, "post submit beacon committee subscriptions v2")
}
Expand All @@ -61,7 +62,10 @@ func (h httpAdapter) SubmitBeaconCommitteeSubscriptionsV2(ctx context.Context, s
return resp.Data, nil
}

func httpPost(ctx context.Context, base string, endpoint string, body io.Reader) ([]byte, error) {
func httpPost(ctx context.Context, base string, endpoint string, body io.Reader, timeout time.Duration) ([]byte, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

addr, err := url.JoinPath(base, endpoint)
if err != nil {
return nil, errors.Wrap(err, "invalid address")
Expand All @@ -72,9 +76,6 @@ func httpPost(ctx context.Context, base string, endpoint string, body io.Reader)
return nil, errors.Wrap(err, "invalid endpoint")
}

ctx, cancel := context.WithTimeout(ctx, time.Second*2) // TODO(dhruv): use actual configured timeout.
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, url.String(), body)
if err != nil {
return nil, errors.Wrap(err, "new POST request with ctx")
Expand Down
10 changes: 9 additions & 1 deletion app/eth2wrap/success.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,17 @@

package eth2wrap

import apiv1 "github.com/attestantio/go-eth2-client/api/v1"
import (
apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
)

// isSyncStateOk returns tue if the sync state is not syncing.
func isSyncStateOk(s *apiv1.SyncState) bool {
return !s.IsSyncing
}

// isAggregateAttestationOk returns true if the aggregate attestation is not nil (which can happen if the subscription wasn't successful).
func isAggregateAttestationOk(att *phase0.Attestation) bool {
return att != nil
}
4 changes: 4 additions & 0 deletions app/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ func isTemporaryBeaconErr(err error) bool {
return true
}

if strings.Contains(err.Error(), "retryable") {
return true
}

// TODO(corver): Add more checks here.

return false
Expand Down
11 changes: 7 additions & 4 deletions app/vmock.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ func newVMockEth2Provider(conf Config) func() (eth2wrap.Client, error) {
mu sync.Mutex
)

const timeout = time.Second * 10

return func() (eth2wrap.Client, error) {
mu.Lock()
defer mu.Unlock()
Expand All @@ -160,7 +162,7 @@ func newVMockEth2Provider(conf Config) func() (eth2wrap.Client, error) {
eth2Svc, err = eth2http.New(context.Background(),
eth2http.WithLogLevel(1),
eth2http.WithAddress("http://"+conf.ValidatorAPIAddr),
eth2http.WithTimeout(time.Second*10), // Allow sufficient time to block while fetching duties.
eth2http.WithTimeout(timeout), // Allow sufficient time to block while fetching duties.
)
if err != nil {
time.Sleep(time.Millisecond * 100) // Test startup backoff
Expand All @@ -171,7 +173,7 @@ func newVMockEth2Provider(conf Config) func() (eth2wrap.Client, error) {
return nil, errors.New("invalid eth2 http service")
}

cached = eth2wrap.AdaptEth2HTTP(eth2Http)
cached = eth2wrap.AdaptEth2HTTP(eth2Http, timeout)
}

return cached, err
Expand Down Expand Up @@ -219,11 +221,12 @@ func handleVMockDuty(ctx context.Context, duty core.Duty, eth2Cl eth2wrap.Client
}
log.Info(ctx, "Mock attestation submitted to validatorapi", z.I64("slot", duty.Slot))
case core.DutyAggregator:
err := attester.Aggregate(ctx)
ok, err := attester.Aggregate(ctx)
if err != nil {
return errors.Wrap(err, "mock aggregation failed")
} else if ok {
log.Info(ctx, "Mock aggregation submitted to validatorapi", z.I64("slot", duty.Slot))
}
log.Info(ctx, "Mock aggregation submitted to validatorapi", z.I64("slot", duty.Slot))
case core.DutyProposer:
err := validatormock.ProposeBlock(ctx, eth2Cl, signer, eth2p0.Slot(duty.Slot), pubshares...)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions core/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ func (f *Fetcher) fetchAggregatorData(ctx context.Context, slot int64, defSet co
aggAtt, err := f.eth2Cl.AggregateAttestation(ctx, eth2p0.Slot(slot), dataRoot)
if err != nil {
return core.UnsignedDataSet{}, err
} else if aggAtt == nil {
// Some beacon nodes return nil if the root is not found, return retryable error.
// This could happen if the beacon node didn't subscribe to the correct subnet.
return core.UnsignedDataSet{}, errors.New("aggregate attestation not found by root (retryable)", z.Hex("root", dataRoot[:]))
}

resp[pubkey] = core.AggregatedAttestation{
Expand Down
13 changes: 12 additions & 1 deletion core/fetcher/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func TestFetchAggregator(t *testing.T) {
)

commLen := 0 // commLen of 0, results in eth2exp.CalculateCommitteeSubscriptionResponse to always return IsAggregator=true
nilAggregate := false

duty := core.NewAggregatorDuty(slot)

Expand Down Expand Up @@ -139,6 +140,9 @@ func TestFetchAggregator(t *testing.T) {
}

bmock.AggregateAttestationFunc = func(ctx context.Context, slot eth2p0.Slot, root eth2p0.Root) (*eth2p0.Attestation, error) {
if nilAggregate {
return nil, nil //nolint:nilnil // This reproduces what go-eth2-client does
}
for _, att := range attByCommIdx {
dataRoot, err := att.Data.HashTreeRoot()
require.NoError(t, err)
Expand All @@ -147,7 +151,7 @@ func TestFetchAggregator(t *testing.T) {
}
}

return &eth2p0.Attestation{}, nil
return nil, errors.New("expected unknown root")
}

fetch, err := fetcher.New(bmock)
Expand Down Expand Up @@ -208,6 +212,13 @@ func TestFetchAggregator(t *testing.T) {

err = fetch.Fetch(ctx, duty, defSet)
require.NoError(t, err)

// Test nil, nil AggregateAttestation response.
commLen = 0
nilAggregate = true

err = fetch.Fetch(ctx, duty, defSet)
require.ErrorContains(t, err, "aggregate attestation not found by root (retryable)")
}

func TestFetchProposer(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions core/validatorapi/router_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"os"
"strings"
"testing"
"time"

eth2client "github.com/attestantio/go-eth2-client"
eth2api "github.com/attestantio/go-eth2-client/api"
Expand Down Expand Up @@ -616,7 +617,7 @@ func TestBeaconCommitteeSubscriptionsV2(t *testing.T) {
},
}

eth2Cl := eth2wrap.AdaptEth2HTTP(eth2Svc.(*eth2http.Service))
eth2Cl := eth2wrap.AdaptEth2HTTP(eth2Svc.(*eth2http.Service), time.Second)
actual, err := eth2Cl.SubmitBeaconCommitteeSubscriptionsV2(ctx, subs)
require.NoError(t, err)
require.Equal(t, expected, actual)
Expand Down Expand Up @@ -657,7 +658,7 @@ func TestSubmitAggregateAttestations(t *testing.T) {
)
require.NoError(t, err)

eth2Cl := eth2wrap.AdaptEth2HTTP(eth2Svc.(*eth2http.Service))
eth2Cl := eth2wrap.AdaptEth2HTTP(eth2Svc.(*eth2http.Service), time.Second)
err = eth2Cl.SubmitAggregateAttestations(ctx, []*eth2p0.SignedAggregateAndProof{agg})
require.NoError(t, err)
}
Expand Down
31 changes: 20 additions & 11 deletions testutil/validatormock/attest.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/eth2wrap"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/eth2util"
"github.com/obolnetwork/charon/eth2util/eth2exp"
"github.com/obolnetwork/charon/eth2util/signing"
Expand Down Expand Up @@ -117,7 +119,7 @@ func (a *SlotAttester) Attest(ctx context.Context) error {
}

// Aggregate should be called at latest 2/3 into the slot, it does slot attestation aggregations.
func (a *SlotAttester) Aggregate(ctx context.Context) error {
func (a *SlotAttester) Aggregate(ctx context.Context) (bool, error) {
// Wait for Prepare and Attest to complete
wait(ctx, a.selectionsOK, a.datasOK)

Expand Down Expand Up @@ -248,6 +250,8 @@ func prepareAggregators(ctx context.Context, eth2Cl eth2wrap.Client, signFunc Si
selections = append(selections, selection)
}

log.Info(ctx, "Mock beacon committee subscription submitted", z.Int("aggregators", len(selections)))

return selections, nil
}

Expand Down Expand Up @@ -310,17 +314,18 @@ func attest(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, slot
return datas, nil
}

// aggregate does attestation aggregation for the provided validators, selections and attestation datas.
// aggregate does attestation aggregation for the provided validators, selections and attestation datas and returns true.
// It returns false if aggregation is not required.
func aggregate(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, slot eth2p0.Slot,
vals validators, selections selections, datas datas,
) error {
) (bool, error) {
if len(selections) == 0 {
return nil
return false, nil
}

epoch, err := epochFromSlot(ctx, eth2Cl, slot)
if err != nil {
return err
return false, err
}

var (
Expand All @@ -339,7 +344,7 @@ func aggregate(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, s
var err error
att, err = getAggregateAttestation(ctx, eth2Cl, datas, commIdx)
if err != nil {
return err
return false, err
}
attsByComm[commIdx] = att
}
Expand All @@ -354,22 +359,22 @@ func aggregate(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, s

proofRoot, err := proof.HashTreeRoot()
if err != nil {
return err
return false, err
}

sigData, err := signing.GetDataRoot(ctx, eth2Cl, signing.DomainAggregateAndProof, epoch, proofRoot)
if err != nil {
return err
return false, err
}

val, ok := vals[selection.ValidatorIndex]
if !ok {
return errors.New("missing validator index")
return false, errors.New("missing validator index")
}

proofSig, err := signFunc(val.Validator.PublicKey, sigData[:])
if err != nil {
return err
return false, err
}

aggs = append(aggs, &eth2p0.SignedAggregateAndProof{
Expand All @@ -378,7 +383,11 @@ func aggregate(ctx context.Context, eth2Cl eth2wrap.Client, signFunc SignFunc, s
})
}

return eth2Cl.SubmitAggregateAttestations(ctx, aggs)
if err := eth2Cl.SubmitAggregateAttestations(ctx, aggs); err != nil {
return false, err
}

return true, nil
}

// getAggregateAttestation returns an aggregated attestation for the provided committee.
Expand Down
4 changes: 3 additions & 1 deletion testutil/validatormock/validatormock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ func TestAttest(t *testing.T) {

require.NoError(t, attester.Prepare(ctx))
require.NoError(t, attester.Attest(ctx))
require.NoError(t, attester.Aggregate(ctx))
ok, err := attester.Aggregate(ctx)
require.NoError(t, err)
require.Equal(t, test.ExpectAggregations > 0, ok)

// Assert length and expected attestations
require.Len(t, atts, test.ExpectAttestations)
Expand Down

0 comments on commit 7b9504d

Please sign in to comment.