Skip to content

Commit

Permalink
Add OnFirstTimeHealthy
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Oct 9, 2024
1 parent fa043d2 commit 5cc03d2
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 19 deletions.
33 changes: 27 additions & 6 deletions pkg/beacon/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

eth2client "github.com/attestantio/go-eth2-client"
Expand Down Expand Up @@ -142,6 +143,8 @@ type Node interface {
OnHealthCheckSucceeded(ctx context.Context, handler func(ctx context.Context, event *HealthCheckSucceededEvent) error)
// OnFinalityCheckpointUpdated is called when a the head finality checkpoint is updated.
OnFinalityCheckpointUpdated(ctx context.Context, handler func(ctx context.Context, event *FinalityCheckpointUpdated) error)
// OnFirstTimeHealthy is called when the node is healthy for the first time.
OnFirstTimeHealthy(ctx context.Context, handler func(ctx context.Context, event *FirstTimeHealthyEvent) error)
}

// Node represents an Ethereum beacon node. It computes values based on the spec.
Expand Down Expand Up @@ -175,6 +178,11 @@ type node struct {

metrics *Metrics

Ready bool

hasEmittedFirstTimeHealthy bool
firstHealthyMutex sync.Mutex

crons *gocron.Scheduler
}

Expand All @@ -189,6 +197,8 @@ func NewNode(log logrus.FieldLogger, config *Config, namespace string, options O
broker: emission.NewEmitter(),

stat: NewStatus(options.HealthCheck.SuccessfulResponses, options.HealthCheck.FailedResponses),

firstHealthyMutex: sync.Mutex{},
}

if options.PrometheusMetrics {
Expand All @@ -203,6 +213,8 @@ func NewNode(log logrus.FieldLogger, config *Config, namespace string, options O
}

func (n *node) Start(ctx context.Context) error {
n.log.Info("Starting beacon...")

ctx, cancel := context.WithCancel(ctx)
n.ctx = ctx
n.cancel = cancel
Expand Down Expand Up @@ -263,6 +275,8 @@ func (n *node) Start(ctx context.Context) error {

s.StartAsync()

n.log.Info("Beacon started!")

return nil
}

Expand Down Expand Up @@ -351,11 +365,13 @@ func (n *node) bootstrap(ctx context.Context) error {
return err
}

n.publishReady(ctx)

//nolint:errcheck // we dont care if this errors out since it runs indefinitely in a goroutine
go n.ensureBeaconSubscription(ctx)

n.Ready = true

go n.publishReady(ctx)

return nil
}

Expand Down Expand Up @@ -428,12 +444,19 @@ func (n *node) runHealthcheck(ctx context.Context) {

n.stat.Health().RecordSuccess()

n.firstHealthyMutex.Lock()
defer n.firstHealthyMutex.Unlock()

if !n.hasEmittedFirstTimeHealthy {
n.hasEmittedFirstTimeHealthy = true

go n.publishFirstTimeHealthy(ctx)
}

n.publishHealthCheckSucceeded(ctx, time.Since(start))
}

func (n *node) initializeState(ctx context.Context) error {
n.log.Info("Initializing beacon state")

spec, err := n.FetchSpec(ctx)
if err != nil {
return err
Expand All @@ -446,8 +469,6 @@ func (n *node) initializeState(ctx context.Context) error {

n.wallclock = ethwallclock.NewEthereumBeaconChain(genesis.GenesisTime, spec.SecondsPerSlot.AsDuration(), uint64(spec.SlotsPerEpoch))

n.log.Info("Beacon state initialized! Ready to serve requests...")

return nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/beacon/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
topicHealthCheckSucceeded = "health_check_suceeded"
topicHealthCheckFailed = "health_check_failed"
topicFinalityCheckpointUpdated = "finality_checkpoint_updated"
topicFirstTimeHealthy = "first_time_healthy"

// Official beacon events that are proxied
topicAttestation = "attestation"
Expand Down Expand Up @@ -92,3 +93,7 @@ type HealthCheckFailedEvent struct {
type FinalityCheckpointUpdated struct {
Finality *v1.Finality
}

// FirstTimeHealthyEvent is emitted when the node is first considered healthy.
type FirstTimeHealthyEvent struct {
}
1 change: 1 addition & 0 deletions pkg/beacon/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func DefaultEnabledBeaconSubscriptionOptions() BeaconSubscriptionOptions {
topicHead,
topicVoluntaryExit,
topicContributionAndProof,
topicBlobSidecar,
},
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/beacon/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,7 @@ func (n *node) publishFinalityCheckpointUpdated(ctx context.Context, finality *v
Finality: finality,
})
}

func (n *node) publishFirstTimeHealthy(ctx context.Context) {
n.broker.Emit(topicFirstTimeHealthy, &FirstTimeHealthyEvent{})
}
6 changes: 6 additions & 0 deletions pkg/beacon/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,9 @@ func (n *node) OnFinalityCheckpointUpdated(ctx context.Context, handler func(ctx
n.handleSubscriberError(handler(ctx, event), topicFinalityCheckpointUpdated)
})
}

func (n *node) OnFirstTimeHealthy(ctx context.Context, handler func(ctx context.Context, event *FirstTimeHealthyEvent) error) {
n.broker.On(topicFirstTimeHealthy, func(event *FirstTimeHealthyEvent) {
n.handleSubscriberError(handler(ctx, event), topicFirstTimeHealthy)
})
}
14 changes: 1 addition & 13 deletions pkg/beacon/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,7 @@ func (n *node) subscribeToBeaconEvents(ctx context.Context) error {
return errors.New("client does not implement eth2client.Subscriptions")
}

topics := []string{}

for key, supported := range v1.SupportedEventTopics {
if !supported {
continue
}

if !n.options.BeaconSubscription.Topics.Exists(key) {
continue
}

topics = append(topics, key)
}
topics := n.options.BeaconSubscription.Topics

n.log.WithField("topics", topics).Info("Subscribing to events upstream")

Expand Down

0 comments on commit 5cc03d2

Please sign in to comment.