From 83599f81b5e04003409c0f53d79d85e967b7e3bf Mon Sep 17 00:00:00 2001 From: Dhruv Bodani Date: Mon, 18 Dec 2023 19:09:35 +0530 Subject: [PATCH] core: wire recaster component (#2752) Wire recaster component in the Wire function of `interfaces.go` to integrate `DutyBuilderRegistration` to tracker. This will enable tracking of all the rebroadcast submissions by recaster. category: feature ticket: #2669 --- app/app.go | 36 ++++++++++++++++-------------------- core/interfaces.go | 22 ++++++++++++++++++++++ 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/app/app.go b/app/app.go index 04d6a2c5b..d5fbf8136 100644 --- a/app/app.go +++ b/app/app.go @@ -485,8 +485,9 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, return err } - if err = wireRecaster(ctx, eth2Cl, sched, sigAgg, broadcaster, cluster.Validators, - conf.BuilderAPI, conf.TestConfig.BroadcastCallback); err != nil { + recaster, err := newRecaster(ctx, eth2Cl, cluster.Validators, + conf.BuilderAPI, conf.TestConfig.BroadcastCallback) + if err != nil { return errors.Wrap(err, "wire recaster") } @@ -505,7 +506,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, core.WithTracking(track, inclusion), core.WithAsyncRetry(retryer), } - core.Wire(sched, fetch, cons, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, opts...) + core.Wire(sched, fetch, cons, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, recaster, opts...) err = wireValidatorMock(ctx, conf, eth2Cl, pubshares, sched) if err != nil { @@ -576,12 +577,11 @@ func wirePrioritise(ctx context.Context, conf Config, life *lifecycle.Manager, t return nil } -// wireRecaster wires the rebroadcaster component to scheduler, sigAgg and broadcaster. -// This is not done in core.Wire since recaster isn't really part of the official core workflow (yet). -func wireRecaster(ctx context.Context, eth2Cl eth2wrap.Client, sched core.Scheduler, sigAgg core.SigAgg, - broadcaster core.Broadcaster, validators []*manifestpb.Validator, builderAPI bool, +// newRecaster returns the rebroadcaster component with pre-generate registration stored in its memory. +// The wiring of recaster is done in core.Wire to support tracking of re-broadcasted duties. +func newRecaster(ctx context.Context, eth2Cl eth2wrap.Client, validators []*manifestpb.Validator, builderAPI bool, callback func(context.Context, core.Duty, core.SignedDataSet) error, -) error { +) (core.Recaster, error) { recaster, err := bcast.NewRecaster(func(ctx context.Context) (map[eth2p0.BLSPubKey]struct{}, error) { valList, err := eth2Cl.ActiveValidators(ctx) if err != nil { @@ -597,19 +597,15 @@ func wireRecaster(ctx context.Context, eth2Cl eth2wrap.Client, sched core.Schedu return ret, nil }) if err != nil { - return errors.Wrap(err, "recaster init") + return nil, errors.Wrap(err, "recaster init") } - sched.SubscribeSlots(recaster.SlotTicked) - sigAgg.Subscribe(recaster.Store) - recaster.Subscribe(broadcaster.Broadcast) - if callback != nil { recaster.Subscribe(callback) } if !builderAPI { - return nil + return recaster, nil } for _, val := range validators { @@ -620,30 +616,30 @@ func wireRecaster(ctx context.Context, eth2Cl eth2wrap.Client, sched core.Schedu reg := new(eth2api.VersionedSignedValidatorRegistration) if err := json.Unmarshal(val.BuilderRegistrationJson, reg); err != nil { - return errors.Wrap(err, "unmarshal validator registration") + return nil, errors.Wrap(err, "unmarshal validator registration") } pubkey, err := core.PubKeyFromBytes(val.PublicKey) if err != nil { - return errors.Wrap(err, "core pubkey from bytes") + return nil, errors.Wrap(err, "core pubkey from bytes") } signedData, err := core.NewVersionedSignedValidatorRegistration(reg) if err != nil { - return errors.Wrap(err, "new versioned signed validator registration") + return nil, errors.Wrap(err, "new versioned signed validator registration") } slot, err := slotFromTimestamp(ctx, eth2Cl, reg.V1.Message.Timestamp) if err != nil { - return errors.Wrap(err, "calculate slot from timestamp") + return nil, errors.Wrap(err, "calculate slot from timestamp") } if err = recaster.Store(ctx, core.NewBuilderRegistrationDuty(slot), core.SignedDataSet{pubkey: signedData}); err != nil { - return errors.Wrap(err, "recaster store registration") + return nil, errors.Wrap(err, "recaster store registration") } } - return nil + return recaster, nil } // newTracker creates and starts a new tracker instance. diff --git a/core/interfaces.go b/core/interfaces.go index fd551b036..8b114f67d 100644 --- a/core/interfaces.go +++ b/core/interfaces.go @@ -162,6 +162,18 @@ type Broadcaster interface { Broadcast(context.Context, Duty, SignedDataSet) error } +// Recaster rebroadcasts aggregated signed duty set periodically to the beacon node. +type Recaster interface { + // SlotTicked is called when new slots tick. + SlotTicked(context.Context, Slot) error + + // Store stores aggregate signed duty for rebroadcasting. + Store(context.Context, Duty, SignedDataSet) error + + // Subscribe subscribes to rebroadcasted duties. + Subscribe(func(context.Context, Duty, SignedDataSet) error) +} + // InclusionChecker checks whether submitted duties have been included on-chain. // TODO(corver): Merge this with tracker below as a compose multi tracker. type InclusionChecker interface { @@ -242,6 +254,9 @@ type wireFuncs struct { AggSigDBStore func(context.Context, Duty, SignedDataSet) error AggSigDBAwait func(context.Context, Duty, PubKey) (SignedData, error) BroadcasterBroadcast func(context.Context, Duty, SignedDataSet) error + RecasterSlotTicked func(context.Context, Slot) error + RecasterSubscribe func(func(context.Context, Duty, SignedDataSet) error) + RecasterStore func(context.Context, Duty, SignedDataSet) error } // WireOption defines a functional option to configure wiring. @@ -258,6 +273,7 @@ func Wire(sched Scheduler, sigAgg SigAgg, aggSigDB AggSigDB, bcast Broadcaster, + recast Recaster, opts ...WireOption, ) { w := wireFuncs{ @@ -298,6 +314,9 @@ func Wire(sched Scheduler, AggSigDBStore: aggSigDB.Store, AggSigDBAwait: aggSigDB.Await, BroadcasterBroadcast: bcast.Broadcast, + RecasterSubscribe: recast.Subscribe, + RecasterSlotTicked: recast.SlotTicked, + RecasterStore: recast.Store, } for _, opt := range opts { @@ -326,4 +345,7 @@ func Wire(sched Scheduler, w.ParSigDBSubscribeThreshold(w.SigAggAggregate) w.SigAggSubscribe(w.AggSigDBStore) w.SigAggSubscribe(w.BroadcasterBroadcast) + w.SchedulerSubscribeSlots(w.RecasterSlotTicked) + w.SigAggSubscribe(w.RecasterStore) + w.RecasterSubscribe(w.BroadcasterBroadcast) }