From d97fd5a1bc698a39da1e814532d2c6c42466a6a0 Mon Sep 17 00:00:00 2001 From: corverroos Date: Fri, 21 Apr 2023 15:46:51 +0200 Subject: [PATCH 01/10] core/tracker: log braodcast vs inclusion delay --- app/app.go | 10 +- core/tracker/incldelay.go | 226 +++++++++++++++++------- core/tracker/incldelay_internal_test.go | 33 +++- core/tracking.go | 8 +- 4 files changed, 203 insertions(+), 74 deletions(-) diff --git a/app/app.go b/app/app.go index c9b4853e1..3eeb5ab85 100644 --- a/app/app.go +++ b/app/app.go @@ -398,9 +398,13 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, feeRecipientFunc := func(pubkey core.PubKey) string { return feeRecipientAddrByCorePubkey[pubkey] } - sched.SubscribeSlots(setFeeRecipient(eth2Cl, eth2Pubkeys, feeRecipientFunc)) - sched.SubscribeSlots(tracker.NewInclDelayFunc(eth2Cl, sched.GetDutyDefinition)) + + inclDelay, err := tracker.NewInclusionDelay(ctx, eth2Cl, sched.GetDutyDefinition) + if err != nil { + return err + } + sched.SubscribeSlots(inclDelay.Instrument) fetch, err := fetcher.New(eth2Cl, feeRecipientFunc) if err != nil { @@ -464,7 +468,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, } opts := []core.WireOption{ core.WithTracing(), - core.WithTracking(track), + core.WithTracking(track, inclDelay.Broadcasted), core.WithAsyncRetry(retryer), } core.Wire(sched, fetch, cons, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, opts...) diff --git a/core/tracker/incldelay.go b/core/tracker/incldelay.go index 5cc4bf1e9..1efb682d9 100644 --- a/core/tracker/incldelay.go +++ b/core/tracker/incldelay.go @@ -6,112 +6,208 @@ import ( "context" "fmt" "sync" + "time" + + eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/jonboulle/clockwork" "github.com/obolnetwork/charon/app/errors" "github.com/obolnetwork/charon/app/eth2wrap" + "github.com/obolnetwork/charon/app/log" + "github.com/obolnetwork/charon/app/z" "github.com/obolnetwork/charon/core" ) // inclDelayLag is the number of slots to lag before calculating inclusion delay. // Half an epoch is good compromise between finality and small gaps on startup. -const inclDelayLag = 16 +const ( + inclDelayLag = 16 + trimEpochOffset = 3 +) // dutiesFunc returns the duty definitions for a given duty. type dutiesFunc func(context.Context, core.Duty) (core.DutyDefinitionSet, error) -// NewInclDelayFunc returns a function that calculates attestation inclusion delay for a block. +// NewInclusionDelay returns a new inclusion delay tracker. +func NewInclusionDelay(ctx context.Context, eth2Cl eth2wrap.Client, dutiesFunc dutiesFunc) (*InclusionDelay, error) { + genesisTime, err := eth2Cl.GenesisTime(ctx) + if err != nil { + return nil, err + } + + slotDuration, err := eth2Cl.SlotDuration(ctx) + if err != nil { + return nil, err + } + + slotsPerEpoch, err := eth2Cl.SlotsPerEpoch(ctx) + if err != nil { + return nil, err + } + + return &InclusionDelay{ + eth2Cl: eth2Cl, + dutiesFunc: dutiesFunc, + instrumentFunc: instrumentAvgDelay, + logMappingFunc: logMapping, + bcastDelays: make(map[int64]map[eth2p0.Root]time.Duration), + genesisTime: genesisTime, + slotDuration: slotDuration, + slotsPerEpoch: int(slotsPerEpoch), + clock: clockwork.NewRealClock(), + }, nil +} + +// InclusionDelay tracks the inclusion delay of attestations. +type InclusionDelay struct { + eth2Cl eth2wrap.Client + dutiesFunc dutiesFunc + instrumentFunc func(inclDelaySlots []int64) + logMappingFunc func(ctx context.Context, slot int64, bcastDelay time.Duration, inclDelay int64) + genesisTime time.Time + slotDuration time.Duration + slotsPerEpoch int + clock clockwork.Clock + + mu sync.Mutex + bcastDelays map[int64]map[eth2p0.Root]time.Duration + dutyStartSlot int64 +} + +// Broadcasted records the broadcast delay of an attestation. +func (d *InclusionDelay) Broadcasted(slot int64, att core.Attestation) { + d.mu.Lock() + defer d.mu.Unlock() + + if _, ok := d.bcastDelays[slot]; !ok { + d.bcastDelays[slot] = make(map[eth2p0.Root]time.Duration) + } + + d.bcastDelays[slot][att.Data.BeaconBlockRoot] = d.clock.Now().Sub(d.slotStartTime(slot)) +} + +// Instrument calculates attestation inclusion delay for a block. // // Inclusion delay is the average of the distance between the slot a validator’s attestation // is expected by the network and the slot the attestation is actually included on-chain. // See https://rated.gitbook.io/rated-documentation/rating-methodologies/ethereum-beacon-chain/network-explorer-definitions/top-screener#inclusion-delay. -func NewInclDelayFunc(eth2Cl eth2wrap.Client, dutiesFunc dutiesFunc) func(context.Context, core.Slot) error { - return newInclDelayFunc(eth2Cl, dutiesFunc, instrumentAvgDelay) -} +func (d *InclusionDelay) Instrument(ctx context.Context, current core.Slot) error { + // blockSlot the block we want to instrument. + blockSlot := current.Slot - inclDelayLag -// newInclDelayFunc extends NewInclDelayFunc with abstracted callback. -func newInclDelayFunc(eth2Cl eth2wrap.Client, dutiesFunc dutiesFunc, callback func([]int64)) func(context.Context, core.Slot) error { - // dutyStartSlot is the first slot we can instrument (since dutiesFunc will not have duties from older slots). - var ( - dutyStartSlot int64 - dssMutex sync.Mutex - ) - - // getOrSetStartSlot returns a previously set duty start slot and true or it sets it and returns false. - getOrSetStartSlot := func(slot int64) (int64, bool) { - dssMutex.Lock() - defer dssMutex.Unlock() - - if dutyStartSlot == 0 { - dutyStartSlot = slot - return 0, false - } + startSlot, ok := d.getOrSetStartSlot(current.Slot) + if !ok { // Set start slot. + return nil + } else if blockSlot < startSlot { + return nil // Still need to wait + } - return dutyStartSlot, true + atts, err := d.eth2Cl.BlockAttestations(ctx, fmt.Sprint(blockSlot)) + if err != nil { + return err } - return func(ctx context.Context, current core.Slot) error { - // blockSlot the block we want to instrument. - blockSlot := current.Slot - inclDelayLag + var delays []int64 + for _, att := range atts { + if att == nil || att.Data == nil { + return errors.New("attestation fields cannot be nil") + } - startSlot, ok := getOrSetStartSlot(current.Slot) - if !ok { // Set start slot. - return nil - } else if blockSlot < startSlot { - return nil // Still need to wait + attSlot := att.Data.Slot + if int64(attSlot) < startSlot { + continue } - atts, err := eth2Cl.BlockAttestations(ctx, fmt.Sprint(blockSlot)) - if err != nil { + // Get all our duties for this attestation blockSlot + set, err := d.dutiesFunc(ctx, core.NewAttesterDuty(int64(attSlot))) + if errors.Is(err, core.ErrNotFound) { + continue // No duties for this slot. + } else if err != nil { return err } - var delays []int64 - for _, att := range atts { - if att == nil || att.Data == nil { - return errors.New("attestation fields cannot be nil") + // Get all our validator committee indexes for this attestation. + for _, def := range set { + duty, ok := def.(core.AttesterDefinition) + if !ok { + return errors.New("invalid attester definition") } - attSlot := att.Data.Slot - if int64(attSlot) < startSlot { - continue + if duty.CommitteeIndex != att.Data.Index { + continue // This duty is for another committee } - // Get all our duties for this attestation blockSlot - set, err := dutiesFunc(ctx, core.NewAttesterDuty(int64(attSlot))) - if errors.Is(err, core.ErrNotFound) { - continue // No duties for this slot. - } else if err != nil { - return err + if !att.AggregationBits.BitAt(duty.ValidatorCommitteeIndex) { + continue // We are not included in attestation + // Note that to track missed attestations, we'd need to keep state of seen attestations. } - // Get all our validator committee indexes for this attestation. - for _, def := range set { - duty, ok := def.(core.AttesterDefinition) - if !ok { - return errors.New("invalid attester definition") - } + inclDelay := blockSlot - int64(attSlot) + d.logDelayMapping(ctx, int64(attSlot), att, inclDelay) + delays = append(delays, inclDelay) + } + } - if duty.CommitteeIndex != att.Data.Index { - continue // This duty is for another committee - } + if len(delays) > 0 { + d.instrumentFunc(delays) + } - if !att.AggregationBits.BitAt(duty.ValidatorCommitteeIndex) { - continue // We are not included in attestation - // Note that to track missed attestations, we'd need to keep state of seen attestations. - } + d.trimBcastDelays(blockSlot - int64(d.slotsPerEpoch*trimEpochOffset)) - delays = append(delays, blockSlot-int64(attSlot)) - } - } + return nil +} + +func (d *InclusionDelay) slotStartTime(slot int64) time.Time { + return d.genesisTime.Add(time.Duration(slot) * d.slotDuration) +} + +// getOrSetStartSlot returns a previously set duty start slot and true or it sets it and returns false. +func (d *InclusionDelay) getOrSetStartSlot(slot int64) (int64, bool) { + d.mu.Lock() + defer d.mu.Unlock() - if len(delays) > 0 { - callback(delays) + if d.dutyStartSlot == 0 { + d.dutyStartSlot = slot + return 0, false + } + + return d.dutyStartSlot, true +} + +// trimBcastDelays deletes all broadcast delays that are older than the given slot. +func (d *InclusionDelay) trimBcastDelays(slot int64) { + d.mu.Lock() + defer d.mu.Unlock() + + for s := range d.bcastDelays { + if s <= slot { + delete(d.bcastDelays, s) } + } +} - return nil +// logDelayMapping logs the broadcast delay vs inclusion delay for a given attestation. +// TODO(corver): Find a better less verbose way to track this. +func (d *InclusionDelay) logDelayMapping(ctx context.Context, slot int64, att *eth2p0.Attestation, inclDelay int64) { + d.mu.Lock() + defer d.mu.Unlock() + + bcastDelay, ok := d.bcastDelays[slot][att.Data.BeaconBlockRoot] + if !ok { + log.Debug(ctx, "Missing broadcast delay found for included attestation", z.Int("slot", int(slot))) + } else { + d.logMappingFunc(ctx, slot, bcastDelay, inclDelay) } } +func logMapping(ctx context.Context, slot int64, bcastDelay time.Duration, inclDelay int64) { + log.Debug(ctx, "Attestation broadcast delay (secs) vs inclusion distance (slots)", + z.Int("slot", int(slot)), + z.F64("bcast_delay", bcastDelay.Seconds()), + z.I64("incl_delay", inclDelay), + ) +} + // instrumentAvgDelay sets the avg inclusion delay metric. func instrumentAvgDelay(delays []int64) { var sum int64 diff --git a/core/tracker/incldelay_internal_test.go b/core/tracker/incldelay_internal_test.go index 56751512f..b1d67c6d3 100644 --- a/core/tracker/incldelay_internal_test.go +++ b/core/tracker/incldelay_internal_test.go @@ -6,9 +6,11 @@ import ( "context" "fmt" "testing" + "time" eth2v1 "github.com/attestantio/go-eth2-client/api/v1" eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/jonboulle/clockwork" "github.com/prysmaticlabs/go-bitfield" "github.com/stretchr/testify/require" @@ -20,12 +22,21 @@ func TestInclDelay(t *testing.T) { const ( blockSlot = 10 slotsPerEpoch = 16 + slotDuration = time.Second slot = blockSlot + inclDelayLag ) + ctx := context.Background() + clock := clockwork.NewFakeClockAt(time.Now().Truncate(time.Hour)) - bmock, err := beaconmock.New() + bmock, err := beaconmock.New( + beaconmock.WithSlotDuration(slotDuration), + beaconmock.WithSlotsPerEpoch(slotsPerEpoch), + beaconmock.WithGenesisTime(clock.Now()), + ) require.NoError(t, err) + clock.Advance(blockSlot * slotDuration) + expect := []int64{1, 2, 4, 8} var atts []att @@ -55,18 +66,30 @@ func TestInclDelay(t *testing.T) { return res, nil } + inclDelay, err := NewInclusionDelay(ctx, bmock, dutiesFunc) + require.NoError(t, err) + inclDelay.clock = clock + done := make(chan struct{}) - fn := newInclDelayFunc(bmock, dutiesFunc, func(delays []int64) { + inclDelay.instrumentFunc = func(delays []int64) { require.EqualValues(t, expect, delays) close(done) - }) + } + + for _, att := range atts { + inclDelay.Broadcasted(int64(att.Att.Data.Slot), core.NewAttestation(att.Att)) + } + + inclDelay.logMappingFunc = func(ctx context.Context, slot int64, bcastDelay time.Duration, inclDelay int64) { + require.Equal(t, time.Duration(blockSlot-slot)*slotDuration, bcastDelay) + } - err = fn(context.Background(), core.Slot{ + err = inclDelay.Instrument(ctx, core.Slot{ Slot: 1, }) require.NoError(t, err) - err = fn(context.Background(), core.Slot{ + err = inclDelay.Instrument(ctx, core.Slot{ Slot: slot, SlotsPerEpoch: slotsPerEpoch, }) diff --git a/core/tracking.go b/core/tracking.go index c8cef1d08..3619954d6 100644 --- a/core/tracking.go +++ b/core/tracking.go @@ -7,7 +7,7 @@ import ( ) // WithTracking wraps component input functions to support tracking of core components. -func WithTracking(tracker Tracker) WireOption { +func WithTracking(tracker Tracker, bcastDelayFunc func(int64, Attestation)) WireOption { return func(w *wireFuncs) { clone := *w @@ -63,6 +63,12 @@ func WithTracking(tracker Tracker) WireOption { err := clone.BroadcasterBroadcast(ctx, duty, pubkey, data) tracker.BroadcasterBroadcast(duty, pubkey, data, err) + if err == nil && duty.Type == DutyAttester { + if att, ok := data.(Attestation); ok { + bcastDelayFunc(duty.Slot, att) + } + } + return err } } From 9c0c243ea8508dc82835279a113a6cae28e3dd74 Mon Sep 17 00:00:00 2001 From: corverroos Date: Fri, 21 Apr 2023 16:07:35 +0200 Subject: [PATCH 02/10] cleanup --- core/tracker/incldelay.go | 15 +++++++++------ core/tracker/incldelay_internal_test.go | 4 +++- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/core/tracker/incldelay.go b/core/tracker/incldelay.go index 1efb682d9..4de8cd3d2 100644 --- a/core/tracker/incldelay.go +++ b/core/tracker/incldelay.go @@ -18,10 +18,12 @@ import ( "github.com/obolnetwork/charon/core" ) -// inclDelayLag is the number of slots to lag before calculating inclusion delay. -// Half an epoch is good compromise between finality and small gaps on startup. const ( - inclDelayLag = 16 + // inclDelayLag is the number of slots to lag before calculating inclusion delay. + // Half an epoch is good compromise between finality and small gaps on startup. + inclDelayLag = 16 + // trimEpochOffset is the number of epochs after which we delete cached broadcast delays. + // This matches scheduler trimEpochOffset. trimEpochOffset = 3 ) @@ -63,7 +65,7 @@ type InclusionDelay struct { eth2Cl eth2wrap.Client dutiesFunc dutiesFunc instrumentFunc func(inclDelaySlots []int64) - logMappingFunc func(ctx context.Context, slot int64, bcastDelay time.Duration, inclDelay int64) + logMappingFunc func(ctx context.Context, slot int64, bcastDelay time.Duration, inclDelay int64, isAggregated bool) genesisTime time.Time slotDuration time.Duration slotsPerEpoch int @@ -196,15 +198,16 @@ func (d *InclusionDelay) logDelayMapping(ctx context.Context, slot int64, att *e if !ok { log.Debug(ctx, "Missing broadcast delay found for included attestation", z.Int("slot", int(slot))) } else { - d.logMappingFunc(ctx, slot, bcastDelay, inclDelay) + d.logMappingFunc(ctx, slot, bcastDelay, inclDelay, len(att.AggregationBits.BitIndices()) > 1) } } -func logMapping(ctx context.Context, slot int64, bcastDelay time.Duration, inclDelay int64) { +func logMapping(ctx context.Context, slot int64, bcastDelay time.Duration, inclDelay int64, isAggregated bool) { log.Debug(ctx, "Attestation broadcast delay (secs) vs inclusion distance (slots)", z.Int("slot", int(slot)), z.F64("bcast_delay", bcastDelay.Seconds()), z.I64("incl_delay", inclDelay), + z.Bool("aggregated", isAggregated), ) } diff --git a/core/tracker/incldelay_internal_test.go b/core/tracker/incldelay_internal_test.go index b1d67c6d3..85499bc92 100644 --- a/core/tracker/incldelay_internal_test.go +++ b/core/tracker/incldelay_internal_test.go @@ -80,8 +80,10 @@ func TestInclDelay(t *testing.T) { inclDelay.Broadcasted(int64(att.Att.Data.Slot), core.NewAttestation(att.Att)) } - inclDelay.logMappingFunc = func(ctx context.Context, slot int64, bcastDelay time.Duration, inclDelay int64) { + inclDelay.logMappingFunc = func(ctx context.Context, slot int64, bcastDelay time.Duration, inclDelay int64, isAggregated bool) { require.Equal(t, time.Duration(blockSlot-slot)*slotDuration, bcastDelay) + require.Equal(t, blockSlot-slot, inclDelay) + require.Equal(t, false, isAggregated) } err = inclDelay.Instrument(ctx, core.Slot{ From 9214d8216a36d33321e02b37e269a031faa03ab2 Mon Sep 17 00:00:00 2001 From: corverroos Date: Sat, 22 Apr 2023 13:33:18 +0200 Subject: [PATCH 03/10] refactor --- .golangci.yml | 1 + app/app.go | 15 +- app/monitoringapi.go | 1 - app/privkeylock/privkeylock.go | 1 - cluster/definition.go | 2 - core/tracker/incldelay.go | 223 ------------------- core/tracker/incldelay_internal_test.go | 125 ----------- core/tracker/inclusion.go | 282 ++++++++++++++++++++++++ core/tracker/inclusion_internal_test.go | 72 ++++++ core/tracker/metrics.go | 7 + core/tracking.go | 15 +- testutil/verifypr/verify.go | 2 +- 12 files changed, 380 insertions(+), 366 deletions(-) delete mode 100644 core/tracker/incldelay.go delete mode 100644 core/tracker/incldelay_internal_test.go create mode 100644 core/tracker/inclusion.go create mode 100644 core/tracker/inclusion_internal_test.go diff --git a/.golangci.yml b/.golangci.yml index da84b8d98..342ea14d2 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -146,6 +146,7 @@ linters: - maintidx - maligned - musttag + - nestif - nonamedreturns - paralleltest - prealloc diff --git a/app/app.go b/app/app.go index 3eeb5ab85..a91936c09 100644 --- a/app/app.go +++ b/app/app.go @@ -400,12 +400,6 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, } sched.SubscribeSlots(setFeeRecipient(eth2Cl, eth2Pubkeys, feeRecipientFunc)) - inclDelay, err := tracker.NewInclusionDelay(ctx, eth2Cl, sched.GetDutyDefinition) - if err != nil { - return err - } - sched.SubscribeSlots(inclDelay.Instrument) - fetch, err := fetcher.New(eth2Cl, feeRecipientFunc) if err != nil { return err @@ -466,9 +460,15 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, if err != nil { return err } + + inclusion, err := tracker.NewInclusion(ctx, eth2Cl) + if err != nil { + return err + } + opts := []core.WireOption{ core.WithTracing(), - core.WithTracking(track, inclDelay.Broadcasted), + core.WithTracking(track, inclusion.Submitted), core.WithAsyncRetry(retryer), } core.Wire(sched, fetch, cons, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, opts...) @@ -486,6 +486,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PConsensus, startCons) life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartAggSigDB, lifecycle.HookFuncCtx(aggSigDB.Run)) life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartParSigDB, lifecycle.HookFuncCtx(parSigDB.Trim)) + life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartTracker, lifecycle.HookFuncCtx(inclusion.Run)) life.RegisterStop(lifecycle.StopScheduler, lifecycle.HookFuncMin(sched.Stop)) life.RegisterStop(lifecycle.StopDutyDB, lifecycle.HookFuncMin(dutyDB.Shutdown)) life.RegisterStop(lifecycle.StopRetryer, lifecycle.HookFuncCtx(retryer.Shutdown)) diff --git a/app/monitoringapi.go b/app/monitoringapi.go index a647b7364..700b9f39a 100644 --- a/app/monitoringapi.go +++ b/app/monitoringapi.go @@ -149,7 +149,6 @@ func startReadyChecker(ctx context.Context, tcpNode host.Host, eth2Cl eth2wrap.C } syncing, syncDistance, err := beaconNodeSyncing(ctx, eth2Cl) - //nolint:nestif if err != nil { err = errReadyBeaconNodeDown readyzGauge.Set(readyzBeaconNodeDown) diff --git a/app/privkeylock/privkeylock.go b/app/privkeylock/privkeylock.go index 349311884..76f4a306d 100644 --- a/app/privkeylock/privkeylock.go +++ b/app/privkeylock/privkeylock.go @@ -12,7 +12,6 @@ import ( // New creates a private key lock file in path, writing contextStr in it. // If a private key lock file exists at path New returns an error, a clean-up function otherwise. func New(path, contextStr string) (func() error, error) { - //nolint:nestif if _, err := os.Stat(path); err == nil { readCtxStr, err := os.ReadFile(path) if err != nil { diff --git a/cluster/definition.go b/cluster/definition.go index e72919e36..e5e3c8de9 100644 --- a/cluster/definition.go +++ b/cluster/definition.go @@ -167,8 +167,6 @@ func (d Definition) NodeIdx(pID peer.ID) (NodeIdx, error) { } // VerifySignatures returns true if all config signatures are fully populated and valid. A verified definition is ready for use in DKG. -// -//nolint:nestif func (d Definition) VerifySignatures() error { // Skip signature verification for definition versions earlier than v1.3 since there are no EIP712 signatures before v1.3.0. if !supportEIP712Sigs(d.Version) && !eip712SigsPresent(d.Operators) { diff --git a/core/tracker/incldelay.go b/core/tracker/incldelay.go deleted file mode 100644 index 4de8cd3d2..000000000 --- a/core/tracker/incldelay.go +++ /dev/null @@ -1,223 +0,0 @@ -// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1 - -package tracker - -import ( - "context" - "fmt" - "sync" - "time" - - eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" - "github.com/jonboulle/clockwork" - - "github.com/obolnetwork/charon/app/errors" - "github.com/obolnetwork/charon/app/eth2wrap" - "github.com/obolnetwork/charon/app/log" - "github.com/obolnetwork/charon/app/z" - "github.com/obolnetwork/charon/core" -) - -const ( - // inclDelayLag is the number of slots to lag before calculating inclusion delay. - // Half an epoch is good compromise between finality and small gaps on startup. - inclDelayLag = 16 - // trimEpochOffset is the number of epochs after which we delete cached broadcast delays. - // This matches scheduler trimEpochOffset. - trimEpochOffset = 3 -) - -// dutiesFunc returns the duty definitions for a given duty. -type dutiesFunc func(context.Context, core.Duty) (core.DutyDefinitionSet, error) - -// NewInclusionDelay returns a new inclusion delay tracker. -func NewInclusionDelay(ctx context.Context, eth2Cl eth2wrap.Client, dutiesFunc dutiesFunc) (*InclusionDelay, error) { - genesisTime, err := eth2Cl.GenesisTime(ctx) - if err != nil { - return nil, err - } - - slotDuration, err := eth2Cl.SlotDuration(ctx) - if err != nil { - return nil, err - } - - slotsPerEpoch, err := eth2Cl.SlotsPerEpoch(ctx) - if err != nil { - return nil, err - } - - return &InclusionDelay{ - eth2Cl: eth2Cl, - dutiesFunc: dutiesFunc, - instrumentFunc: instrumentAvgDelay, - logMappingFunc: logMapping, - bcastDelays: make(map[int64]map[eth2p0.Root]time.Duration), - genesisTime: genesisTime, - slotDuration: slotDuration, - slotsPerEpoch: int(slotsPerEpoch), - clock: clockwork.NewRealClock(), - }, nil -} - -// InclusionDelay tracks the inclusion delay of attestations. -type InclusionDelay struct { - eth2Cl eth2wrap.Client - dutiesFunc dutiesFunc - instrumentFunc func(inclDelaySlots []int64) - logMappingFunc func(ctx context.Context, slot int64, bcastDelay time.Duration, inclDelay int64, isAggregated bool) - genesisTime time.Time - slotDuration time.Duration - slotsPerEpoch int - clock clockwork.Clock - - mu sync.Mutex - bcastDelays map[int64]map[eth2p0.Root]time.Duration - dutyStartSlot int64 -} - -// Broadcasted records the broadcast delay of an attestation. -func (d *InclusionDelay) Broadcasted(slot int64, att core.Attestation) { - d.mu.Lock() - defer d.mu.Unlock() - - if _, ok := d.bcastDelays[slot]; !ok { - d.bcastDelays[slot] = make(map[eth2p0.Root]time.Duration) - } - - d.bcastDelays[slot][att.Data.BeaconBlockRoot] = d.clock.Now().Sub(d.slotStartTime(slot)) -} - -// Instrument calculates attestation inclusion delay for a block. -// -// Inclusion delay is the average of the distance between the slot a validator’s attestation -// is expected by the network and the slot the attestation is actually included on-chain. -// See https://rated.gitbook.io/rated-documentation/rating-methodologies/ethereum-beacon-chain/network-explorer-definitions/top-screener#inclusion-delay. -func (d *InclusionDelay) Instrument(ctx context.Context, current core.Slot) error { - // blockSlot the block we want to instrument. - blockSlot := current.Slot - inclDelayLag - - startSlot, ok := d.getOrSetStartSlot(current.Slot) - if !ok { // Set start slot. - return nil - } else if blockSlot < startSlot { - return nil // Still need to wait - } - - atts, err := d.eth2Cl.BlockAttestations(ctx, fmt.Sprint(blockSlot)) - if err != nil { - return err - } - - var delays []int64 - for _, att := range atts { - if att == nil || att.Data == nil { - return errors.New("attestation fields cannot be nil") - } - - attSlot := att.Data.Slot - if int64(attSlot) < startSlot { - continue - } - - // Get all our duties for this attestation blockSlot - set, err := d.dutiesFunc(ctx, core.NewAttesterDuty(int64(attSlot))) - if errors.Is(err, core.ErrNotFound) { - continue // No duties for this slot. - } else if err != nil { - return err - } - - // Get all our validator committee indexes for this attestation. - for _, def := range set { - duty, ok := def.(core.AttesterDefinition) - if !ok { - return errors.New("invalid attester definition") - } - - if duty.CommitteeIndex != att.Data.Index { - continue // This duty is for another committee - } - - if !att.AggregationBits.BitAt(duty.ValidatorCommitteeIndex) { - continue // We are not included in attestation - // Note that to track missed attestations, we'd need to keep state of seen attestations. - } - - inclDelay := blockSlot - int64(attSlot) - d.logDelayMapping(ctx, int64(attSlot), att, inclDelay) - delays = append(delays, inclDelay) - } - } - - if len(delays) > 0 { - d.instrumentFunc(delays) - } - - d.trimBcastDelays(blockSlot - int64(d.slotsPerEpoch*trimEpochOffset)) - - return nil -} - -func (d *InclusionDelay) slotStartTime(slot int64) time.Time { - return d.genesisTime.Add(time.Duration(slot) * d.slotDuration) -} - -// getOrSetStartSlot returns a previously set duty start slot and true or it sets it and returns false. -func (d *InclusionDelay) getOrSetStartSlot(slot int64) (int64, bool) { - d.mu.Lock() - defer d.mu.Unlock() - - if d.dutyStartSlot == 0 { - d.dutyStartSlot = slot - return 0, false - } - - return d.dutyStartSlot, true -} - -// trimBcastDelays deletes all broadcast delays that are older than the given slot. -func (d *InclusionDelay) trimBcastDelays(slot int64) { - d.mu.Lock() - defer d.mu.Unlock() - - for s := range d.bcastDelays { - if s <= slot { - delete(d.bcastDelays, s) - } - } -} - -// logDelayMapping logs the broadcast delay vs inclusion delay for a given attestation. -// TODO(corver): Find a better less verbose way to track this. -func (d *InclusionDelay) logDelayMapping(ctx context.Context, slot int64, att *eth2p0.Attestation, inclDelay int64) { - d.mu.Lock() - defer d.mu.Unlock() - - bcastDelay, ok := d.bcastDelays[slot][att.Data.BeaconBlockRoot] - if !ok { - log.Debug(ctx, "Missing broadcast delay found for included attestation", z.Int("slot", int(slot))) - } else { - d.logMappingFunc(ctx, slot, bcastDelay, inclDelay, len(att.AggregationBits.BitIndices()) > 1) - } -} - -func logMapping(ctx context.Context, slot int64, bcastDelay time.Duration, inclDelay int64, isAggregated bool) { - log.Debug(ctx, "Attestation broadcast delay (secs) vs inclusion distance (slots)", - z.Int("slot", int(slot)), - z.F64("bcast_delay", bcastDelay.Seconds()), - z.I64("incl_delay", inclDelay), - z.Bool("aggregated", isAggregated), - ) -} - -// instrumentAvgDelay sets the avg inclusion delay metric. -func instrumentAvgDelay(delays []int64) { - var sum int64 - for _, delay := range delays { - sum += delay - } - - avg := sum / int64(len(delays)) - inclusionDelay.Set(float64(avg)) -} diff --git a/core/tracker/incldelay_internal_test.go b/core/tracker/incldelay_internal_test.go deleted file mode 100644 index 85499bc92..000000000 --- a/core/tracker/incldelay_internal_test.go +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1 - -package tracker - -import ( - "context" - "fmt" - "testing" - "time" - - eth2v1 "github.com/attestantio/go-eth2-client/api/v1" - eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" - "github.com/jonboulle/clockwork" - "github.com/prysmaticlabs/go-bitfield" - "github.com/stretchr/testify/require" - - "github.com/obolnetwork/charon/core" - "github.com/obolnetwork/charon/testutil/beaconmock" -) - -func TestInclDelay(t *testing.T) { - const ( - blockSlot = 10 - slotsPerEpoch = 16 - slotDuration = time.Second - slot = blockSlot + inclDelayLag - ) - ctx := context.Background() - clock := clockwork.NewFakeClockAt(time.Now().Truncate(time.Hour)) - - bmock, err := beaconmock.New( - beaconmock.WithSlotDuration(slotDuration), - beaconmock.WithSlotsPerEpoch(slotsPerEpoch), - beaconmock.WithGenesisTime(clock.Now()), - ) - require.NoError(t, err) - - clock.Advance(blockSlot * slotDuration) - - expect := []int64{1, 2, 4, 8} - - var atts []att - for i, e := range expect { - atts = append(atts, makeAtt(blockSlot-e, int64(i), int64(i))) - } - - bmock.BlockAttestationsFunc = func(ctx context.Context, stateID string) ([]*eth2p0.Attestation, error) { - require.Equal(t, fmt.Sprint(blockSlot), stateID) - var res []*eth2p0.Attestation - for _, att := range atts { - res = append(res, att.Att) - } - - return res, nil - } - - dutiesFunc := func(_ context.Context, duty core.Duty) (core.DutyDefinitionSet, error) { - res := make(core.DutyDefinitionSet) - for i, att := range atts { - if int64(att.Att.Data.Slot) != duty.Slot { - continue - } - res[core.PubKey(fmt.Sprint(i))] = core.NewAttesterDefinition(att.Duty) - } - - return res, nil - } - - inclDelay, err := NewInclusionDelay(ctx, bmock, dutiesFunc) - require.NoError(t, err) - inclDelay.clock = clock - - done := make(chan struct{}) - inclDelay.instrumentFunc = func(delays []int64) { - require.EqualValues(t, expect, delays) - close(done) - } - - for _, att := range atts { - inclDelay.Broadcasted(int64(att.Att.Data.Slot), core.NewAttestation(att.Att)) - } - - inclDelay.logMappingFunc = func(ctx context.Context, slot int64, bcastDelay time.Duration, inclDelay int64, isAggregated bool) { - require.Equal(t, time.Duration(blockSlot-slot)*slotDuration, bcastDelay) - require.Equal(t, blockSlot-slot, inclDelay) - require.Equal(t, false, isAggregated) - } - - err = inclDelay.Instrument(ctx, core.Slot{ - Slot: 1, - }) - require.NoError(t, err) - - err = inclDelay.Instrument(ctx, core.Slot{ - Slot: slot, - SlotsPerEpoch: slotsPerEpoch, - }) - require.NoError(t, err) - - <-done -} - -type att struct { - Att *eth2p0.Attestation - Duty *eth2v1.AttesterDuty -} - -func makeAtt(slot int64, commIdx int64, valCommIdx int64) att { - aggBits := bitfield.NewBitlist(1024) - aggBits.SetBitAt(uint64(valCommIdx), true) - - return att{ - Att: ð2p0.Attestation{ - AggregationBits: aggBits, - Data: ð2p0.AttestationData{ - Slot: eth2p0.Slot(slot), - Index: eth2p0.CommitteeIndex(commIdx), - }, - }, - Duty: ð2v1.AttesterDuty{ - CommitteeIndex: eth2p0.CommitteeIndex(commIdx), - ValidatorCommitteeIndex: uint64(valCommIdx), - }, - } -} diff --git a/core/tracker/inclusion.go b/core/tracker/inclusion.go new file mode 100644 index 000000000..47d3d7a73 --- /dev/null +++ b/core/tracker/inclusion.go @@ -0,0 +1,282 @@ +// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1 + +package tracker + +import ( + "context" + "fmt" + "sync" + "time" + + eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" + + "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/app/eth2wrap" + "github.com/obolnetwork/charon/app/log" + "github.com/obolnetwork/charon/app/z" + "github.com/obolnetwork/charon/core" +) + +// submission represents a duty submitted to the beacon node/chain. +type submission struct { + Duty core.Duty + Pubkey core.PubKey + Data core.SignedData + AttRoot eth2p0.Root + Delay time.Duration + Included bool +} + +// block is a simplified block with its attestations. +type block struct { + Slot int64 + Attestations map[eth2p0.Root]*eth2p0.Attestation +} + +// supported duty types. +var supported = map[core.DutyType]bool{ + core.DutyAttester: true, + core.DutyAggregator: true, + core.DutyProposer: true, + core.DutyBuilderProposer: true, + // TODO(corver) Add support for sync committee and exit duties +} + +// inclusion tracks the inclusion of submitted duties. +// It has a simplified API to allow for easy testing. +type inclusion struct { + mu sync.Mutex + submissions []submission + + missedFunc func(context.Context, submission) + attIncludedFunc func(context.Context, submission, block) +} + +// Submitted is called when a duty is submitted to the beacon node. +// It adds the duty to the list of submitted duties. +func (i *inclusion) Submitted(duty core.Duty, pubkey core.PubKey, data core.SignedData, delay time.Duration) error { + if !supported[duty.Type] { + return nil + } + + var ( + attRoot eth2p0.Root + err error + ) + if duty.Type == core.DutyAttester { + att, ok := data.(core.Attestation) + if !ok { + return errors.New("invalid attestation") + } + attRoot, err = att.HashTreeRoot() + if err != nil { + return errors.Wrap(err, "hash attestation") + } + } else if duty.Type == core.DutyAggregator { + agg, ok := data.(core.SignedAggregateAndProof) + if !ok { + return errors.New("invalid aggregate and proof") + } + attRoot, err = agg.Message.Aggregate.HashTreeRoot() + if err != nil { + return errors.Wrap(err, "hash aggregate") + } + } + + i.mu.Lock() + defer i.mu.Unlock() + i.submissions = append(i.submissions, submission{ + Duty: duty, + Pubkey: pubkey, + Data: data, + AttRoot: attRoot, + Delay: delay, + }) + + return nil +} + +// Trim removes all duties that are older than the specified slot. +// It also calls the missedFunc for any duties that have not been included. +func (i *inclusion) Trim(ctx context.Context, slot int64) { + i.mu.Lock() + defer i.mu.Unlock() + + var newElements []submission + for _, sub := range i.submissions { + if sub.Duty.Slot > slot { + newElements = append(newElements, sub) + } + if !sub.Included { + i.missedFunc(ctx, sub) + } + } + i.submissions = newElements +} + +// CheckBlock checks whether the block includes any of the submitted duties. +func (i *inclusion) CheckBlock(ctx context.Context, block block) { + i.mu.Lock() + defer i.mu.Unlock() + + for j, sub := range i.submissions { + if sub.Included { + continue + } + + switch sub.Duty.Type { + case core.DutyAttester, core.DutyAggregator: + _, ok := block.Attestations[sub.AttRoot] + if !ok { + continue + } + i.submissions[j].Included = true + i.attIncludedFunc(ctx, sub, block) + case core.DutyProposer, core.DutyBuilderProposer: + if sub.Duty.Slot != block.Slot { + continue + } + i.submissions[j].Included = true + // Nothing to report for block inclusions + default: + panic("bug: unexpected type") // Sanity check, this should never happen + } + } +} + +// reportMissed reports duties that were broadcast but never included on chain. +func reportMissed(ctx context.Context, sub submission) { + inclusionMisses.WithLabelValues(sub.Duty.Type.String()).Inc() + + switch sub.Duty.Type { + case core.DutyAttester, core.DutyAggregator: + msg := "Broadcasted attestation never included in any block" + if sub.Duty.Type == core.DutyAggregator { + msg = "Broadcasted attestation aggregate never included in any block" + } + + log.Warn(ctx, msg, nil, + z.Any("pubkey", sub.Pubkey), + z.I64("attestation_slot", sub.Duty.Slot), + ) + case core.DutyProposer, core.DutyBuilderProposer: + msg := "Broadcasted block never included in the chain" + if sub.Duty.Type == core.DutyBuilderProposer { + msg = "Broadcasted blinded block never included in the chain" + } + + log.Warn(ctx, msg, nil, + z.Any("pubkey", sub.Pubkey), + z.I64("block_slot", sub.Duty.Slot), + ) + default: + panic("bug: unexpected type") // Sanity check, this should never happen + } +} + +func reportAttInclusion(ctx context.Context, sub submission, block block) { + blockSlot := block.Slot + attSlot := int64(block.Attestations[sub.AttRoot].Data.Slot) + inclDelay := block.Slot - attSlot + + msg := "Block included attestation" + if sub.Duty.Type == core.DutyAggregator { + msg += " aggregate" + } + + log.Info(ctx, msg, + z.I64("block_slot", blockSlot), + z.I64("attestation_slot", attSlot), + z.Any("pubkey", sub.Pubkey), + z.I64("inclusion_delay", inclDelay), + z.Any("broadcast_delay", sub.Delay), + ) + + inclusionDelay.Set(float64(blockSlot - attSlot)) +} + +// NewInclusion returns a new InclusionChecker. +func NewInclusion(ctx context.Context, eth2Cl eth2wrap.Client) (*InclusionChecker, error) { + genesis, err := eth2Cl.GenesisTime(ctx) + if err != nil { + return nil, err + } + + slotDuration, err := eth2Cl.SlotDuration(ctx) + if err != nil { + return nil, err + } + + return &InclusionChecker{ + incl: &inclusion{ + attIncludedFunc: reportAttInclusion, + missedFunc: reportMissed, + }, + genesis: genesis, + slotDuration: slotDuration, + }, nil +} + +// InclusionChecker checks whether duties have been included in blocks. +type InclusionChecker struct { + genesis time.Time + slotDuration time.Duration + eth2Cl eth2wrap.Client + incl *inclusion +} + +// Submitted is called when a duty has been submitted. +func (a *InclusionChecker) Submitted(duty core.Duty, pubkey core.PubKey, data core.SignedData) error { + slotStart := a.genesis.Add(a.slotDuration * time.Duration(duty.Slot)) + return a.incl.Submitted(duty, pubkey, data, time.Since(slotStart)) +} + +func (a *InclusionChecker) Run(ctx context.Context) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + var checkedSlot int64 + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + slot := int64(time.Since(a.genesis) / a.slotDuration) + if checkedSlot == slot { + continue + } else if checkedSlot != 0 && checkedSlot+1 != slot { + slot = checkedSlot + 1 // We missed a slot, check the next one first + } + + if err := a.checkBlock(ctx, slot); err != nil { + log.Warn(ctx, "Failed to check inclusion", err, z.I64("slot", slot)) + } else { + checkedSlot = slot + } + } + } +} + +func (a *InclusionChecker) checkBlock(ctx context.Context, slot int64) error { + atts, err := a.eth2Cl.BlockAttestations(ctx, fmt.Sprint(slot)) + if err != nil { + return err + } else if len(atts) == 0 { + return nil // No block for this slot + } + + attsMap := make(map[eth2p0.Root]*eth2p0.Attestation) + for _, att := range atts { + root, err := att.HashTreeRoot() + if err != nil { + return errors.Wrap(err, "hash attestation") + } + + attsMap[root] = att + } + + a.incl.CheckBlock(ctx, block{Slot: slot, Attestations: attsMap}) + + return nil +} diff --git a/core/tracker/inclusion_internal_test.go b/core/tracker/inclusion_internal_test.go new file mode 100644 index 000000000..1a833fae1 --- /dev/null +++ b/core/tracker/inclusion_internal_test.go @@ -0,0 +1,72 @@ +// Copyright © 2022-2023 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1 + +package tracker + +import ( + "context" + "testing" + + eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/stretchr/testify/require" + + "github.com/obolnetwork/charon/core" + "github.com/obolnetwork/charon/testutil" +) + +func TestInclusion(t *testing.T) { + // Setup inclusion with a mock missedFunc and attIncludedFunc + var missed, included []core.Duty + incl := &inclusion{ + missedFunc: func(ctx context.Context, sub submission) { + missed = append(missed, sub.Duty) + }, + attIncludedFunc: func(ctx context.Context, sub submission, block block) { + included = append(included, sub.Duty) + }, + } + + // Create some duties + att1 := testutil.RandomAttestation() + att1Duty := core.NewAttesterDuty(int64(att1.Data.Slot)) + agg2 := testutil.RandomSignedAggregateAndProof() + agg2Duty := core.NewAggregatorDuty(int64(agg2.Message.Aggregate.Data.Slot)) + att3 := testutil.RandomAttestation() + att3Duty := core.NewAttesterDuty(int64(att3.Data.Slot)) + block4 := testutil.RandomCapellaVersionedSignedBeaconBlock() + block4Duty := core.NewProposerDuty(int64(block4.Capella.Message.Slot)) + + // Submit the duties + err := incl.Submitted(att1Duty, "", core.NewAttestation(att1), 0) + require.NoError(t, err) + err = incl.Submitted(agg2Duty, "", core.NewSignedAggregateAndProof(agg2), 0) + require.NoError(t, err) + err = incl.Submitted(att3Duty, "", core.NewAttestation(att3), 0) + require.NoError(t, err) + + coreBlock3, err := core.NewVersionedSignedBeaconBlock(block4) + require.NoError(t, err) + err = incl.Submitted(block4Duty, "", coreBlock3, 0) + require.NoError(t, err) + + // Create a mock block with the first two attestations. + att1Root, err := att1.HashTreeRoot() + require.NoError(t, err) + att2Root, err := agg2.Message.Aggregate.HashTreeRoot() + require.NoError(t, err) + + // Check the block + incl.CheckBlock(context.Background(), block{ + Slot: block4Duty.Slot, + Attestations: map[eth2p0.Root]*eth2p0.Attestation{ + att1Root: att1, + att2Root: agg2.Message.Aggregate, + }, + }) + // Assert that the first two duties were included + require.Equal(t, []core.Duty{att1Duty, agg2Duty}, included) + + // Trim the duties + incl.Trim(context.Background(), att3Duty.Slot) + // Assert that the third duty was missed + require.Equal(t, []core.Duty{att3Duty}, missed) +} diff --git a/core/tracker/metrics.go b/core/tracker/metrics.go index fa1c4fce9..35a2d88d3 100644 --- a/core/tracker/metrics.go +++ b/core/tracker/metrics.go @@ -86,4 +86,11 @@ var ( Name: "inclusion_delay", Help: "Cluster's average attestation inclusion delay in slots", }) + + inclusionMisses = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "core", + Subsystem: "tracker", + Name: "inclusion_missed_total", + Help: "Total number of broadcast duties never included in any block by type", + }, []string{"duty"}) ) diff --git a/core/tracking.go b/core/tracking.go index 3619954d6..4eb76cee2 100644 --- a/core/tracking.go +++ b/core/tracking.go @@ -4,10 +4,12 @@ package core import ( "context" + + "github.com/obolnetwork/charon/app/log" ) // WithTracking wraps component input functions to support tracking of core components. -func WithTracking(tracker Tracker, bcastDelayFunc func(int64, Attestation)) WireOption { +func WithTracking(tracker Tracker, submittedFunc func(Duty, PubKey, SignedData) error) WireOption { return func(w *wireFuncs) { clone := *w @@ -62,14 +64,15 @@ func WithTracking(tracker Tracker, bcastDelayFunc func(int64, Attestation)) Wire w.BroadcasterBroadcast = func(ctx context.Context, duty Duty, pubkey PubKey, data SignedData) error { err := clone.BroadcasterBroadcast(ctx, duty, pubkey, data) tracker.BroadcasterBroadcast(duty, pubkey, data, err) + if err != nil { + return err + } - if err == nil && duty.Type == DutyAttester { - if att, ok := data.(Attestation); ok { - bcastDelayFunc(duty.Slot, att) - } + if err := submittedFunc(duty, pubkey, data); err != nil { + log.Error(ctx, "Failed to submit duty", err) } - return err + return nil } } } diff --git a/testutil/verifypr/verify.go b/testutil/verifypr/verify.go index 60ca19dfd..19b3ca82f 100644 --- a/testutil/verifypr/verify.go +++ b/testutil/verifypr/verify.go @@ -2,7 +2,7 @@ // Command verifypr provides a tool to verify charon PRs against the template defined in docs/contibuting.md. // -//nolint:revive,cyclop,nestif +//nolint:revive,cyclop package main import ( From af8f48ff061d4ec33ddad0603721ec56b2f8eb4a Mon Sep 17 00:00:00 2001 From: corverroos Date: Sat, 22 Apr 2023 13:43:51 +0200 Subject: [PATCH 04/10] cleanup --- core/tracker/inclusion.go | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/core/tracker/inclusion.go b/core/tracker/inclusion.go index 47d3d7a73..f81ee41d5 100644 --- a/core/tracker/inclusion.go +++ b/core/tracker/inclusion.go @@ -17,6 +17,15 @@ import ( "github.com/obolnetwork/charon/core" ) +const ( + // inclCheckLag is the number of slots to lag before checking inclusion. + // Half an epoch is good compromise between finality and responsiveness. + inclCheckLag = 16 + // trimEpochOffset is the number of epochs after which we delete cached submissions. + // This matches scheduler trimEpochOffset. + trimEpochOffset = 3 +) + // submission represents a duty submitted to the beacon node/chain. type submission struct { Duty core.Duty @@ -207,22 +216,30 @@ func NewInclusion(ctx context.Context, eth2Cl eth2wrap.Client) (*InclusionChecke return nil, err } + slotsPerEpoch, err := eth2Cl.SlotsPerEpoch(ctx) + if err != nil { + return nil, err + } + return &InclusionChecker{ incl: &inclusion{ attIncludedFunc: reportAttInclusion, missedFunc: reportMissed, }, - genesis: genesis, - slotDuration: slotDuration, + eth2Cl: eth2Cl, + genesis: genesis, + slotDuration: slotDuration, + slotsPerEpoch: int64(slotsPerEpoch), }, nil } // InclusionChecker checks whether duties have been included in blocks. type InclusionChecker struct { - genesis time.Time - slotDuration time.Duration - eth2Cl eth2wrap.Client - incl *inclusion + genesis time.Time + slotDuration time.Duration + slotsPerEpoch int64 + eth2Cl eth2wrap.Client + incl *inclusion } // Submitted is called when a duty has been submitted. @@ -242,7 +259,7 @@ func (a *InclusionChecker) Run(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - slot := int64(time.Since(a.genesis) / a.slotDuration) + slot := int64(time.Since(a.genesis)/a.slotDuration) - inclCheckLag if checkedSlot == slot { continue } else if checkedSlot != 0 && checkedSlot+1 != slot { @@ -251,9 +268,11 @@ func (a *InclusionChecker) Run(ctx context.Context) { if err := a.checkBlock(ctx, slot); err != nil { log.Warn(ctx, "Failed to check inclusion", err, z.I64("slot", slot)) - } else { - checkedSlot = slot + continue } + + checkedSlot = slot + a.incl.Trim(ctx, slot-(trimEpochOffset*a.slotsPerEpoch)) } } } From 57ec21d470c56561b8bb21e662da08a416ca63a8 Mon Sep 17 00:00:00 2001 From: corverroos Date: Sat, 22 Apr 2023 13:46:09 +0200 Subject: [PATCH 05/10] cleanup --- core/tracker/inclusion.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/tracker/inclusion.go b/core/tracker/inclusion.go index f81ee41d5..b23e688db 100644 --- a/core/tracker/inclusion.go +++ b/core/tracker/inclusion.go @@ -111,16 +111,17 @@ func (i *inclusion) Trim(ctx context.Context, slot int64) { i.mu.Lock() defer i.mu.Unlock() - var newElements []submission + var remaining []submission for _, sub := range i.submissions { if sub.Duty.Slot > slot { - newElements = append(newElements, sub) + remaining = append(remaining, sub) + continue } if !sub.Included { i.missedFunc(ctx, sub) } } - i.submissions = newElements + i.submissions = remaining } // CheckBlock checks whether the block includes any of the submitted duties. From 6cbc85b33a5201678db171e7c4342531b8e317b3 Mon Sep 17 00:00:00 2001 From: corverroos Date: Sat, 22 Apr 2023 13:47:33 +0200 Subject: [PATCH 06/10] cleanup --- core/tracker/inclusion.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/tracker/inclusion.go b/core/tracker/inclusion.go index b23e688db..2628d9211 100644 --- a/core/tracker/inclusion.go +++ b/core/tracker/inclusion.go @@ -168,6 +168,7 @@ func reportMissed(ctx context.Context, sub submission) { log.Warn(ctx, msg, nil, z.Any("pubkey", sub.Pubkey), z.I64("attestation_slot", sub.Duty.Slot), + z.Any("broadcast_delay", sub.Delay), ) case core.DutyProposer, core.DutyBuilderProposer: msg := "Broadcasted block never included in the chain" @@ -178,6 +179,7 @@ func reportMissed(ctx context.Context, sub submission) { log.Warn(ctx, msg, nil, z.Any("pubkey", sub.Pubkey), z.I64("block_slot", sub.Duty.Slot), + z.Any("broadcast_delay", sub.Delay), ) default: panic("bug: unexpected type") // Sanity check, this should never happen From 1c66d7c1d6a144f72eca86db66436625bf890f38 Mon Sep 17 00:00:00 2001 From: corverroos Date: Sat, 22 Apr 2023 14:31:24 +0200 Subject: [PATCH 07/10] cleanup --- core/tracker/inclusion.go | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/core/tracker/inclusion.go b/core/tracker/inclusion.go index 2628d9211..377f925b2 100644 --- a/core/tracker/inclusion.go +++ b/core/tracker/inclusion.go @@ -21,9 +21,9 @@ const ( // inclCheckLag is the number of slots to lag before checking inclusion. // Half an epoch is good compromise between finality and responsiveness. inclCheckLag = 16 - // trimEpochOffset is the number of epochs after which we delete cached submissions. + // inclTrimLag is the number of slots after which we delete cached submissions. // This matches scheduler trimEpochOffset. - trimEpochOffset = 3 + inclTrimLag = 32 * 3 ) // submission represents a duty submitted to the beacon node/chain. @@ -219,30 +219,23 @@ func NewInclusion(ctx context.Context, eth2Cl eth2wrap.Client) (*InclusionChecke return nil, err } - slotsPerEpoch, err := eth2Cl.SlotsPerEpoch(ctx) - if err != nil { - return nil, err - } - return &InclusionChecker{ incl: &inclusion{ attIncludedFunc: reportAttInclusion, missedFunc: reportMissed, }, - eth2Cl: eth2Cl, - genesis: genesis, - slotDuration: slotDuration, - slotsPerEpoch: int64(slotsPerEpoch), + eth2Cl: eth2Cl, + genesis: genesis, + slotDuration: slotDuration, }, nil } // InclusionChecker checks whether duties have been included in blocks. type InclusionChecker struct { - genesis time.Time - slotDuration time.Duration - slotsPerEpoch int64 - eth2Cl eth2wrap.Client - incl *inclusion + genesis time.Time + slotDuration time.Duration + eth2Cl eth2wrap.Client + incl *inclusion } // Submitted is called when a duty has been submitted. @@ -263,10 +256,8 @@ func (a *InclusionChecker) Run(ctx context.Context) { return case <-ticker.C: slot := int64(time.Since(a.genesis)/a.slotDuration) - inclCheckLag - if checkedSlot == slot { + if checkedSlot == slot || slot < 0 { continue - } else if checkedSlot != 0 && checkedSlot+1 != slot { - slot = checkedSlot + 1 // We missed a slot, check the next one first } if err := a.checkBlock(ctx, slot); err != nil { @@ -275,7 +266,7 @@ func (a *InclusionChecker) Run(ctx context.Context) { } checkedSlot = slot - a.incl.Trim(ctx, slot-(trimEpochOffset*a.slotsPerEpoch)) + a.incl.Trim(ctx, slot-inclTrimLag) } } } @@ -285,9 +276,14 @@ func (a *InclusionChecker) checkBlock(ctx context.Context, slot int64) error { if err != nil { return err } else if len(atts) == 0 { + // TODO(corver): Remove this log, its probably too verbose + log.Debug(ctx, "Skipping missed block inclusion check", z.I64("slot", slot)) return nil // No block for this slot } + // TODO(corver): Remove this log, its probably too verbose + log.Debug(ctx, "Checking block inclusion", z.I64("slot", slot)) + attsMap := make(map[eth2p0.Root]*eth2p0.Attestation) for _, att := range atts { root, err := att.HashTreeRoot() From e3f13ff61c7ae21bf41436fd6c88a36a90de9513 Mon Sep 17 00:00:00 2001 From: corverroos Date: Sun, 23 Apr 2023 20:30:42 +0200 Subject: [PATCH 08/10] cleanup --- core/tracker/inclusion.go | 31 ++++++++++++++----------- core/tracker/inclusion_internal_test.go | 4 ++-- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/core/tracker/inclusion.go b/core/tracker/inclusion.go index 377f925b2..5d5d7b9a1 100644 --- a/core/tracker/inclusion.go +++ b/core/tracker/inclusion.go @@ -51,9 +51,9 @@ var supported = map[core.DutyType]bool{ // TODO(corver) Add support for sync committee and exit duties } -// inclusion tracks the inclusion of submitted duties. +// inclusionCore tracks the inclusionCore of submitted duties. // It has a simplified API to allow for easy testing. -type inclusion struct { +type inclusionCore struct { mu sync.Mutex submissions []submission @@ -63,7 +63,7 @@ type inclusion struct { // Submitted is called when a duty is submitted to the beacon node. // It adds the duty to the list of submitted duties. -func (i *inclusion) Submitted(duty core.Duty, pubkey core.PubKey, data core.SignedData, delay time.Duration) error { +func (i *inclusionCore) Submitted(duty core.Duty, pubkey core.PubKey, data core.SignedData, delay time.Duration) error { if !supported[duty.Type] { return nil } @@ -107,7 +107,7 @@ func (i *inclusion) Submitted(duty core.Duty, pubkey core.PubKey, data core.Sign // Trim removes all duties that are older than the specified slot. // It also calls the missedFunc for any duties that have not been included. -func (i *inclusion) Trim(ctx context.Context, slot int64) { +func (i *inclusionCore) Trim(ctx context.Context, slot int64) { i.mu.Lock() defer i.mu.Unlock() @@ -125,7 +125,7 @@ func (i *inclusion) Trim(ctx context.Context, slot int64) { } // CheckBlock checks whether the block includes any of the submitted duties. -func (i *inclusion) CheckBlock(ctx context.Context, block block) { +func (i *inclusionCore) CheckBlock(ctx context.Context, block block) { i.mu.Lock() defer i.mu.Unlock() @@ -186,9 +186,11 @@ func reportMissed(ctx context.Context, sub submission) { } } +// reportAttInclusion reports the inclusionCore of an attestation. func reportAttInclusion(ctx context.Context, sub submission, block block) { - blockSlot := block.Slot + aggregated := len(block.Attestations[sub.AttRoot].AggregationBits.BitIndices()) > 1 attSlot := int64(block.Attestations[sub.AttRoot].Data.Slot) + blockSlot := block.Slot inclDelay := block.Slot - attSlot msg := "Block included attestation" @@ -202,6 +204,7 @@ func reportAttInclusion(ctx context.Context, sub submission, block block) { z.Any("pubkey", sub.Pubkey), z.I64("inclusion_delay", inclDelay), z.Any("broadcast_delay", sub.Delay), + z.Bool("aggregated", aggregated), ) inclusionDelay.Set(float64(blockSlot - attSlot)) @@ -220,7 +223,7 @@ func NewInclusion(ctx context.Context, eth2Cl eth2wrap.Client) (*InclusionChecke } return &InclusionChecker{ - incl: &inclusion{ + core: &inclusionCore{ attIncludedFunc: reportAttInclusion, missedFunc: reportMissed, }, @@ -235,13 +238,13 @@ type InclusionChecker struct { genesis time.Time slotDuration time.Duration eth2Cl eth2wrap.Client - incl *inclusion + core *inclusionCore } // Submitted is called when a duty has been submitted. func (a *InclusionChecker) Submitted(duty core.Duty, pubkey core.PubKey, data core.SignedData) error { slotStart := a.genesis.Add(a.slotDuration * time.Duration(duty.Slot)) - return a.incl.Submitted(duty, pubkey, data, time.Since(slotStart)) + return a.core.Submitted(duty, pubkey, data, time.Since(slotStart)) } func (a *InclusionChecker) Run(ctx context.Context) { @@ -261,12 +264,12 @@ func (a *InclusionChecker) Run(ctx context.Context) { } if err := a.checkBlock(ctx, slot); err != nil { - log.Warn(ctx, "Failed to check inclusion", err, z.I64("slot", slot)) + log.Warn(ctx, "Failed to check inclusionCore", err, z.I64("slot", slot)) continue } checkedSlot = slot - a.incl.Trim(ctx, slot-inclTrimLag) + a.core.Trim(ctx, slot-inclTrimLag) } } } @@ -277,12 +280,12 @@ func (a *InclusionChecker) checkBlock(ctx context.Context, slot int64) error { return err } else if len(atts) == 0 { // TODO(corver): Remove this log, its probably too verbose - log.Debug(ctx, "Skipping missed block inclusion check", z.I64("slot", slot)) + log.Debug(ctx, "Skipping missed block inclusionCore check", z.I64("slot", slot)) return nil // No block for this slot } // TODO(corver): Remove this log, its probably too verbose - log.Debug(ctx, "Checking block inclusion", z.I64("slot", slot)) + log.Debug(ctx, "Checking block inclusionCore", z.I64("slot", slot)) attsMap := make(map[eth2p0.Root]*eth2p0.Attestation) for _, att := range atts { @@ -294,7 +297,7 @@ func (a *InclusionChecker) checkBlock(ctx context.Context, slot int64) error { attsMap[root] = att } - a.incl.CheckBlock(ctx, block{Slot: slot, Attestations: attsMap}) + a.core.CheckBlock(ctx, block{Slot: slot, Attestations: attsMap}) return nil } diff --git a/core/tracker/inclusion_internal_test.go b/core/tracker/inclusion_internal_test.go index 1a833fae1..c1e0407b4 100644 --- a/core/tracker/inclusion_internal_test.go +++ b/core/tracker/inclusion_internal_test.go @@ -14,9 +14,9 @@ import ( ) func TestInclusion(t *testing.T) { - // Setup inclusion with a mock missedFunc and attIncludedFunc + // Setup inclusionCore with a mock missedFunc and attIncludedFunc var missed, included []core.Duty - incl := &inclusion{ + incl := &inclusionCore{ missedFunc: func(ctx context.Context, sub submission) { missed = append(missed, sub.Duty) }, From bed3af4d90f4d95c0be0b9c56d534e0df08e550c Mon Sep 17 00:00:00 2001 From: corverroos Date: Mon, 24 Apr 2023 07:51:22 +0200 Subject: [PATCH 09/10] cleanup --- app/eth2wrap/synthproposer.go | 65 ++++++++++++--------- core/tracker/inclusion.go | 76 ++++++++++++++++--------- core/tracker/inclusion_internal_test.go | 10 +++- 3 files changed, 95 insertions(+), 56 deletions(-) diff --git a/app/eth2wrap/synthproposer.go b/app/eth2wrap/synthproposer.go index 6fe8582d5..59ec5e474 100644 --- a/app/eth2wrap/synthproposer.go +++ b/app/eth2wrap/synthproposer.go @@ -136,9 +136,6 @@ func (h *synthWrapper) syntheticBlock(ctx context.Context, slot eth2p0.Slot, vId // Convert signed block into unsigned block with synthetic graffiti and correct slot. - var synthGraffiti [32]byte - copy(synthGraffiti[:], syntheticBlockGraffiti) - feeRecipient := h.getFeeRecipient(vIdx) block := &spec.VersionedBeaconBlock{Version: signedBlock.Version} @@ -146,24 +143,24 @@ func (h *synthWrapper) syntheticBlock(ctx context.Context, slot eth2p0.Slot, vId switch signedBlock.Version { case spec.DataVersionPhase0: block.Phase0 = signedBlock.Phase0.Message - block.Phase0.Body.Graffiti = synthGraffiti + block.Phase0.Body.Graffiti = GetSyntheticGraffiti() block.Phase0.Slot = slot block.Phase0.ProposerIndex = vIdx case spec.DataVersionAltair: block.Altair = signedBlock.Altair.Message - block.Altair.Body.Graffiti = synthGraffiti + block.Altair.Body.Graffiti = GetSyntheticGraffiti() block.Altair.Slot = slot block.Altair.ProposerIndex = vIdx case spec.DataVersionBellatrix: block.Bellatrix = signedBlock.Bellatrix.Message - block.Bellatrix.Body.Graffiti = synthGraffiti + block.Bellatrix.Body.Graffiti = GetSyntheticGraffiti() block.Bellatrix.Slot = slot block.Bellatrix.ProposerIndex = vIdx block.Bellatrix.Body.ExecutionPayload.FeeRecipient = feeRecipient block.Bellatrix.Body.ExecutionPayload.Transactions = fraction(block.Bellatrix.Body.ExecutionPayload.Transactions) case spec.DataVersionCapella: block.Capella = signedBlock.Capella.Message - block.Capella.Body.Graffiti = synthGraffiti + block.Capella.Body.Graffiti = GetSyntheticGraffiti() block.Capella.Slot = slot block.Capella.ProposerIndex = vIdx block.Capella.Body.ExecutionPayload.FeeRecipient = feeRecipient @@ -183,6 +180,34 @@ func fraction(transactions []bellatrix.Transaction) []bellatrix.Transaction { // SubmitBlindedBeaconBlock submits a blinded beacon block or swallows it if marked as synthetic. func (h *synthWrapper) SubmitBlindedBeaconBlock(ctx context.Context, block *api.VersionedSignedBlindedBeaconBlock) error { + if IsSyntheticBlindedBlock(block) { + log.Debug(ctx, "Synthetic blinded beacon block swallowed") + return nil + } + + return h.Client.SubmitBlindedBeaconBlock(ctx, block) +} + +// SubmitBeaconBlock submits a beacon block or swallows it if marked as synthetic. +func (h *synthWrapper) SubmitBeaconBlock(ctx context.Context, block *spec.VersionedSignedBeaconBlock) error { + if IsSyntheticBlock(block) { + log.Debug(ctx, "Synthetic beacon block swallowed") + return nil + } + + return h.Client.SubmitBeaconBlock(ctx, block) +} + +// GetSyntheticGraffiti returns the graffiti used to mark synthetic blocks. +func GetSyntheticGraffiti() [32]byte { + var synthGraffiti [32]byte + copy(synthGraffiti[:], syntheticBlockGraffiti) + + return synthGraffiti +} + +// IsSyntheticBlindedBlock returns true if the blinded block is a synthetic block. +func IsSyntheticBlindedBlock(block *api.VersionedSignedBlindedBeaconBlock) bool { var graffiti [32]byte switch block.Version { case spec.DataVersionBellatrix: @@ -190,21 +215,14 @@ func (h *synthWrapper) SubmitBlindedBeaconBlock(ctx context.Context, block *api. case spec.DataVersionCapella: graffiti = block.Capella.Message.Body.Graffiti default: - return errors.New("unknown block version") - } - - var synthGraffiti [32]byte - copy(synthGraffiti[:], syntheticBlockGraffiti) - if graffiti == synthGraffiti { - log.Debug(ctx, "Synthetic blinded beacon block swallowed") - return nil + return false } - return h.Client.SubmitBlindedBeaconBlock(ctx, block) + return graffiti == GetSyntheticGraffiti() } -// SubmitBeaconBlock submits a beacon block or swallows it if marked as synthetic. -func (h *synthWrapper) SubmitBeaconBlock(ctx context.Context, block *spec.VersionedSignedBeaconBlock) error { +// IsSyntheticBlock returns true if the block is a synthetic block. +func IsSyntheticBlock(block *spec.VersionedSignedBeaconBlock) bool { var graffiti [32]byte switch block.Version { case spec.DataVersionPhase0: @@ -216,17 +234,10 @@ func (h *synthWrapper) SubmitBeaconBlock(ctx context.Context, block *spec.Versio case spec.DataVersionCapella: graffiti = block.Capella.Message.Body.Graffiti default: - return errors.New("unknown block version") - } - - var synthGraffiti [32]byte - copy(synthGraffiti[:], syntheticBlockGraffiti) - if graffiti == synthGraffiti { - log.Debug(ctx, "Synthetic beacon block swallowed") - return nil + return false } - return h.Client.SubmitBeaconBlock(ctx, block) + return graffiti == GetSyntheticGraffiti() } // synthProposerCache returns a new cache for synthetic proposer duties. diff --git a/core/tracker/inclusion.go b/core/tracker/inclusion.go index 5d5d7b9a1..a86fb4d55 100644 --- a/core/tracker/inclusion.go +++ b/core/tracker/inclusion.go @@ -28,12 +28,11 @@ const ( // submission represents a duty submitted to the beacon node/chain. type submission struct { - Duty core.Duty - Pubkey core.PubKey - Data core.SignedData - AttRoot eth2p0.Root - Delay time.Duration - Included bool + Duty core.Duty + Pubkey core.PubKey + Data core.SignedData + AttRoot eth2p0.Root + Delay time.Duration } // block is a simplified block with its attestations. @@ -90,6 +89,22 @@ func (i *inclusionCore) Submitted(duty core.Duty, pubkey core.PubKey, data core. if err != nil { return errors.Wrap(err, "hash aggregate") } + } else if duty.Type == core.DutyProposer { + block, ok := data.(core.VersionedSignedBeaconBlock) + if !ok { + return errors.New("invalid block") + } + if eth2wrap.IsSyntheticBlock(&block.VersionedSignedBeaconBlock) { + return nil + } + } else if duty.Type == core.DutyBuilderProposer { + block, ok := data.(core.VersionedSignedBlindedBeaconBlock) + if !ok { + return errors.New("invalid blinded block") + } + if eth2wrap.IsSyntheticBlindedBlock(&block.VersionedSignedBlindedBeaconBlock) { + return nil + } } i.mu.Lock() @@ -117,9 +132,9 @@ func (i *inclusionCore) Trim(ctx context.Context, slot int64) { remaining = append(remaining, sub) continue } - if !sub.Included { - i.missedFunc(ctx, sub) - } + + // Report missed and trim + i.missedFunc(ctx, sub) } i.submissions = remaining } @@ -129,29 +144,34 @@ func (i *inclusionCore) CheckBlock(ctx context.Context, block block) { i.mu.Lock() defer i.mu.Unlock() - for j, sub := range i.submissions { - if sub.Included { - continue - } - + var remaining []submission + for _, sub := range i.submissions { switch sub.Duty.Type { case core.DutyAttester, core.DutyAggregator: _, ok := block.Attestations[sub.AttRoot] if !ok { + remaining = append(remaining, sub) continue } - i.submissions[j].Included = true + + // TODO(corver): We should probably check if validator included in AggregationBits + // for attester duty by caching scheduler duty data to get validator committee index. + + // Report included and trim i.attIncludedFunc(ctx, sub, block) case core.DutyProposer, core.DutyBuilderProposer: if sub.Duty.Slot != block.Slot { + remaining = append(remaining, sub) continue } - i.submissions[j].Included = true - // Nothing to report for block inclusions + + // Nothing to report for block inclusions, just trim default: panic("bug: unexpected type") // Sanity check, this should never happen } } + + i.submissions = remaining } // reportMissed reports duties that were broadcast but never included on chain. @@ -160,9 +180,9 @@ func reportMissed(ctx context.Context, sub submission) { switch sub.Duty.Type { case core.DutyAttester, core.DutyAggregator: - msg := "Broadcasted attestation never included in any block" + msg := "Broadcasted attestation never included on-chain" if sub.Duty.Type == core.DutyAggregator { - msg = "Broadcasted attestation aggregate never included in any block" + msg = "Broadcasted attestation aggregate never included on-chain" } log.Warn(ctx, msg, nil, @@ -171,9 +191,9 @@ func reportMissed(ctx context.Context, sub submission) { z.Any("broadcast_delay", sub.Delay), ) case core.DutyProposer, core.DutyBuilderProposer: - msg := "Broadcasted block never included in the chain" + msg := "Broadcasted block never included on-chain" if sub.Duty.Type == core.DutyBuilderProposer { - msg = "Broadcasted blinded block never included in the chain" + msg = "Broadcasted blinded block never included on-chain" } log.Warn(ctx, msg, nil, @@ -186,16 +206,17 @@ func reportMissed(ctx context.Context, sub submission) { } } -// reportAttInclusion reports the inclusionCore of an attestation. +// reportAttInclusion reports attestations that were included in a block. func reportAttInclusion(ctx context.Context, sub submission, block block) { - aggregated := len(block.Attestations[sub.AttRoot].AggregationBits.BitIndices()) > 1 - attSlot := int64(block.Attestations[sub.AttRoot].Data.Slot) + att := block.Attestations[sub.AttRoot] + aggIndices := att.AggregationBits.BitIndices() + attSlot := int64(att.Data.Slot) blockSlot := block.Slot inclDelay := block.Slot - attSlot - msg := "Block included attestation" + msg := "Broadcasted attestation included on-chain" if sub.Duty.Type == core.DutyAggregator { - msg += " aggregate" + msg = "Broadcasted attestation aggregate included on-chain" } log.Info(ctx, msg, @@ -204,7 +225,8 @@ func reportAttInclusion(ctx context.Context, sub submission, block block) { z.Any("pubkey", sub.Pubkey), z.I64("inclusion_delay", inclDelay), z.Any("broadcast_delay", sub.Delay), - z.Bool("aggregated", aggregated), + z.Int("aggregate_len", len(aggIndices)), + z.Bool("aggregated", len(aggIndices) > 1), ) inclusionDelay.Set(float64(blockSlot - attSlot)) diff --git a/core/tracker/inclusion_internal_test.go b/core/tracker/inclusion_internal_test.go index c1e0407b4..2175da162 100644 --- a/core/tracker/inclusion_internal_test.go +++ b/core/tracker/inclusion_internal_test.go @@ -9,6 +9,7 @@ import ( eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/stretchr/testify/require" + "github.com/obolnetwork/charon/app/eth2wrap" "github.com/obolnetwork/charon/core" "github.com/obolnetwork/charon/testutil" ) @@ -34,6 +35,9 @@ func TestInclusion(t *testing.T) { att3Duty := core.NewAttesterDuty(int64(att3.Data.Slot)) block4 := testutil.RandomCapellaVersionedSignedBeaconBlock() block4Duty := core.NewProposerDuty(int64(block4.Capella.Message.Slot)) + block5 := testutil.RandomCapellaVersionedSignedBlindedBeaconBlock() + block5.Capella.Message.Body.Graffiti = eth2wrap.GetSyntheticGraffiti() // Ignored, not included or missed. + block5Duty := core.NewBuilderProposerDuty(int64(block5.Capella.Message.Slot)) // Submit the duties err := incl.Submitted(att1Duty, "", core.NewAttestation(att1), 0) @@ -43,9 +47,11 @@ func TestInclusion(t *testing.T) { err = incl.Submitted(att3Duty, "", core.NewAttestation(att3), 0) require.NoError(t, err) - coreBlock3, err := core.NewVersionedSignedBeaconBlock(block4) + coreBlock4, err := core.NewVersionedSignedBeaconBlock(block4) require.NoError(t, err) - err = incl.Submitted(block4Duty, "", coreBlock3, 0) + err = incl.Submitted(block4Duty, "", coreBlock4, 0) + require.NoError(t, err) + err = incl.Submitted(block5Duty, "", block5, 0) require.NoError(t, err) // Create a mock block with the first two attestations. From e1c841fed66d67a299a59ee402399d54c110ce5c Mon Sep 17 00:00:00 2001 From: corverroos Date: Mon, 24 Apr 2023 07:57:36 +0200 Subject: [PATCH 10/10] cleanup --- core/tracker/inclusion.go | 8 ++++---- core/tracker/inclusion_internal_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/tracker/inclusion.go b/core/tracker/inclusion.go index a86fb4d55..f0a3889b2 100644 --- a/core/tracker/inclusion.go +++ b/core/tracker/inclusion.go @@ -50,7 +50,7 @@ var supported = map[core.DutyType]bool{ // TODO(corver) Add support for sync committee and exit duties } -// inclusionCore tracks the inclusionCore of submitted duties. +// inclusionCore tracks the inclusion of submitted duties. // It has a simplified API to allow for easy testing. type inclusionCore struct { mu sync.Mutex @@ -286,7 +286,7 @@ func (a *InclusionChecker) Run(ctx context.Context) { } if err := a.checkBlock(ctx, slot); err != nil { - log.Warn(ctx, "Failed to check inclusionCore", err, z.I64("slot", slot)) + log.Warn(ctx, "Failed to check inclusion", err, z.I64("slot", slot)) continue } @@ -302,12 +302,12 @@ func (a *InclusionChecker) checkBlock(ctx context.Context, slot int64) error { return err } else if len(atts) == 0 { // TODO(corver): Remove this log, its probably too verbose - log.Debug(ctx, "Skipping missed block inclusionCore check", z.I64("slot", slot)) + log.Debug(ctx, "Skipping missed block inclusion check", z.I64("slot", slot)) return nil // No block for this slot } // TODO(corver): Remove this log, its probably too verbose - log.Debug(ctx, "Checking block inclusionCore", z.I64("slot", slot)) + log.Debug(ctx, "Checking block inclusion", z.I64("slot", slot)) attsMap := make(map[eth2p0.Root]*eth2p0.Attestation) for _, att := range atts { diff --git a/core/tracker/inclusion_internal_test.go b/core/tracker/inclusion_internal_test.go index 2175da162..cef3b0807 100644 --- a/core/tracker/inclusion_internal_test.go +++ b/core/tracker/inclusion_internal_test.go @@ -15,7 +15,7 @@ import ( ) func TestInclusion(t *testing.T) { - // Setup inclusionCore with a mock missedFunc and attIncludedFunc + // Setup inclusion with a mock missedFunc and attIncludedFunc var missed, included []core.Duty incl := &inclusionCore{ missedFunc: func(ctx context.Context, sub submission) {