Skip to content

Commit

Permalink
core: wire recaster component (#2752)
Browse files Browse the repository at this point in the history
Wire recaster component in the Wire function of `interfaces.go` to integrate `DutyBuilderRegistration` to tracker. This will enable tracking of all the rebroadcast submissions by recaster.

category: feature
ticket: #2669
  • Loading branch information
dB2510 authored Dec 18, 2023
1 parent 931ab97 commit 83599f8
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 20 deletions.
36 changes: 16 additions & 20 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,9 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
return err
}

if err = wireRecaster(ctx, eth2Cl, sched, sigAgg, broadcaster, cluster.Validators,
conf.BuilderAPI, conf.TestConfig.BroadcastCallback); err != nil {
recaster, err := newRecaster(ctx, eth2Cl, cluster.Validators,
conf.BuilderAPI, conf.TestConfig.BroadcastCallback)
if err != nil {
return errors.Wrap(err, "wire recaster")
}

Expand All @@ -505,7 +506,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
core.WithTracking(track, inclusion),
core.WithAsyncRetry(retryer),
}
core.Wire(sched, fetch, cons, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, opts...)
core.Wire(sched, fetch, cons, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, recaster, opts...)

err = wireValidatorMock(ctx, conf, eth2Cl, pubshares, sched)
if err != nil {
Expand Down Expand Up @@ -576,12 +577,11 @@ func wirePrioritise(ctx context.Context, conf Config, life *lifecycle.Manager, t
return nil
}

// wireRecaster wires the rebroadcaster component to scheduler, sigAgg and broadcaster.
// This is not done in core.Wire since recaster isn't really part of the official core workflow (yet).
func wireRecaster(ctx context.Context, eth2Cl eth2wrap.Client, sched core.Scheduler, sigAgg core.SigAgg,
broadcaster core.Broadcaster, validators []*manifestpb.Validator, builderAPI bool,
// newRecaster returns the rebroadcaster component with pre-generate registration stored in its memory.
// The wiring of recaster is done in core.Wire to support tracking of re-broadcasted duties.
func newRecaster(ctx context.Context, eth2Cl eth2wrap.Client, validators []*manifestpb.Validator, builderAPI bool,
callback func(context.Context, core.Duty, core.SignedDataSet) error,
) error {
) (core.Recaster, error) {
recaster, err := bcast.NewRecaster(func(ctx context.Context) (map[eth2p0.BLSPubKey]struct{}, error) {
valList, err := eth2Cl.ActiveValidators(ctx)
if err != nil {
Expand All @@ -597,19 +597,15 @@ func wireRecaster(ctx context.Context, eth2Cl eth2wrap.Client, sched core.Schedu
return ret, nil
})
if err != nil {
return errors.Wrap(err, "recaster init")
return nil, errors.Wrap(err, "recaster init")
}

sched.SubscribeSlots(recaster.SlotTicked)
sigAgg.Subscribe(recaster.Store)
recaster.Subscribe(broadcaster.Broadcast)

if callback != nil {
recaster.Subscribe(callback)
}

if !builderAPI {
return nil
return recaster, nil
}

for _, val := range validators {
Expand All @@ -620,30 +616,30 @@ func wireRecaster(ctx context.Context, eth2Cl eth2wrap.Client, sched core.Schedu

reg := new(eth2api.VersionedSignedValidatorRegistration)
if err := json.Unmarshal(val.BuilderRegistrationJson, reg); err != nil {
return errors.Wrap(err, "unmarshal validator registration")
return nil, errors.Wrap(err, "unmarshal validator registration")
}

pubkey, err := core.PubKeyFromBytes(val.PublicKey)
if err != nil {
return errors.Wrap(err, "core pubkey from bytes")
return nil, errors.Wrap(err, "core pubkey from bytes")
}

signedData, err := core.NewVersionedSignedValidatorRegistration(reg)
if err != nil {
return errors.Wrap(err, "new versioned signed validator registration")
return nil, errors.Wrap(err, "new versioned signed validator registration")
}

slot, err := slotFromTimestamp(ctx, eth2Cl, reg.V1.Message.Timestamp)
if err != nil {
return errors.Wrap(err, "calculate slot from timestamp")
return nil, errors.Wrap(err, "calculate slot from timestamp")
}

if err = recaster.Store(ctx, core.NewBuilderRegistrationDuty(slot), core.SignedDataSet{pubkey: signedData}); err != nil {
return errors.Wrap(err, "recaster store registration")
return nil, errors.Wrap(err, "recaster store registration")
}
}

return nil
return recaster, nil
}

// newTracker creates and starts a new tracker instance.
Expand Down
22 changes: 22 additions & 0 deletions core/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ type Broadcaster interface {
Broadcast(context.Context, Duty, SignedDataSet) error
}

// Recaster rebroadcasts aggregated signed duty set periodically to the beacon node.
type Recaster interface {
// SlotTicked is called when new slots tick.
SlotTicked(context.Context, Slot) error

// Store stores aggregate signed duty for rebroadcasting.
Store(context.Context, Duty, SignedDataSet) error

// Subscribe subscribes to rebroadcasted duties.
Subscribe(func(context.Context, Duty, SignedDataSet) error)
}

// InclusionChecker checks whether submitted duties have been included on-chain.
// TODO(corver): Merge this with tracker below as a compose multi tracker.
type InclusionChecker interface {
Expand Down Expand Up @@ -242,6 +254,9 @@ type wireFuncs struct {
AggSigDBStore func(context.Context, Duty, SignedDataSet) error
AggSigDBAwait func(context.Context, Duty, PubKey) (SignedData, error)
BroadcasterBroadcast func(context.Context, Duty, SignedDataSet) error
RecasterSlotTicked func(context.Context, Slot) error
RecasterSubscribe func(func(context.Context, Duty, SignedDataSet) error)
RecasterStore func(context.Context, Duty, SignedDataSet) error
}

// WireOption defines a functional option to configure wiring.
Expand All @@ -258,6 +273,7 @@ func Wire(sched Scheduler,
sigAgg SigAgg,
aggSigDB AggSigDB,
bcast Broadcaster,
recast Recaster,
opts ...WireOption,
) {
w := wireFuncs{
Expand Down Expand Up @@ -298,6 +314,9 @@ func Wire(sched Scheduler,
AggSigDBStore: aggSigDB.Store,
AggSigDBAwait: aggSigDB.Await,
BroadcasterBroadcast: bcast.Broadcast,
RecasterSubscribe: recast.Subscribe,
RecasterSlotTicked: recast.SlotTicked,
RecasterStore: recast.Store,
}

for _, opt := range opts {
Expand Down Expand Up @@ -326,4 +345,7 @@ func Wire(sched Scheduler,
w.ParSigDBSubscribeThreshold(w.SigAggAggregate)
w.SigAggSubscribe(w.AggSigDBStore)
w.SigAggSubscribe(w.BroadcasterBroadcast)
w.SchedulerSubscribeSlots(w.RecasterSlotTicked)
w.SigAggSubscribe(w.RecasterStore)
w.RecasterSubscribe(w.BroadcasterBroadcast)
}

0 comments on commit 83599f8

Please sign in to comment.