diff --git a/go.mod b/go.mod index d00f81b4..e482e1ad 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9 github.com/creasty/defaults v1.7.0 github.com/ethereum/go-ethereum v1.14.10 - github.com/ethpandaops/beacon v0.41.0 + github.com/ethpandaops/beacon v0.42.0 github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 github.com/ethpandaops/ethwallclock v0.3.0 github.com/go-co-op/gocron v1.27.1 diff --git a/go.sum b/go.sum index 5b9a9184..6f63c6a0 100644 --- a/go.sum +++ b/go.sum @@ -248,8 +248,8 @@ github.com/ethereum/go-ethereum v1.14.10 h1:kC24WjYeRjDy86LVo6MfF5Xs7nnUu+XG4Aja github.com/ethereum/go-ethereum v1.14.10/go.mod h1:+l/fr42Mma+xBnhefL/+z11/hcmJ2egl+ScIVPjhc7E= github.com/ethereum/go-verkle v0.1.1-0.20240829091221-dffa7562dbe9 h1:8NfxH2iXvJ60YRB8ChToFTUzl8awsc3cJ8CbLjGIl/A= github.com/ethereum/go-verkle v0.1.1-0.20240829091221-dffa7562dbe9/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= -github.com/ethpandaops/beacon v0.41.0 h1:9CmgNeTZ6X+B1U7SOJzy3rf6WFtFb3CA2DTFEgGwLc8= -github.com/ethpandaops/beacon v0.41.0/go.mod h1:hKfalJGsF4BuWPwcGCX/4fdQR31zDJVaTLWwrkfNTzw= +github.com/ethpandaops/beacon v0.42.0 h1:5a3ld5wuAgX+N5KxEPuNfxDhdeiBG4gXlTAgCm0AuSE= +github.com/ethpandaops/beacon v0.42.0/go.mod h1:hKfalJGsF4BuWPwcGCX/4fdQR31zDJVaTLWwrkfNTzw= github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 h1:8JWjrRfP14m0oxOk03m11n/xgdY5ceyUf/ZxYdOs5gE= github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756/go.mod h1:ZvKqL6CKxiraefdXPHeJurV2pDD/f2HF2uklDVdrry8= github.com/ethpandaops/ethwallclock v0.3.0 h1:xF5fwtBf+bHFHZKBnwiPFEuelW3sMM7SD3ZNFq1lJY4= diff --git a/pkg/sentry/ethereum/beacon.go b/pkg/sentry/ethereum/beacon.go index aaaf739b..3f0b208c 100644 --- a/pkg/sentry/ethereum/beacon.go +++ b/pkg/sentry/ethereum/beacon.go @@ -67,8 +67,13 @@ func (b *BeaconNode) Start(ctx context.Context) error { s := gocron.NewScheduler(time.Local) errs := 
make(chan error, 1) + healthyFirstTime := make(chan struct{}) + + b.beacon.OnFirstTimeHealthy(ctx, func(ctx context.Context, event *beacon.FirstTimeHealthyEvent) error { + b.log.Info("Upstream beacon node is healthy") + + close(healthyFirstTime) - go func() { wg := sync.WaitGroup{} for _, service := range b.services { @@ -88,6 +93,8 @@ func (b *BeaconNode) Start(ctx context.Context) error { errs <- fmt.Errorf("failed to start service: %w", err) } + b.log.WithField("service", service.Name()).Info("Waiting for service to be ready") + wg.Wait() } @@ -98,14 +105,26 @@ func (b *BeaconNode) Start(ctx context.Context) error { errs <- fmt.Errorf("failed to run on ready callback: %w", err) } } - }() + + return nil + }) s.StartAsync() - if err := b.beacon.Start(ctx); err != nil { + b.beacon.StartAsync(ctx) + + select { + case err := <-errs: return err + case <-ctx.Done(): + return ctx.Err() + case <-healthyFirstTime: + // Beacon node is healthy, continue with normal operation + case <-time.After(10 * time.Minute): + return errors.New("upstream beacon node is not healthy. 
check your configuration.") } + // Wait for any errors after the first healthy event select { case err := <-errs: return err diff --git a/pkg/sentry/ethereum/services/metadata.go b/pkg/sentry/ethereum/services/metadata.go index ddc2c29d..7295460f 100644 --- a/pkg/sentry/ethereum/services/metadata.go +++ b/pkg/sentry/ethereum/services/metadata.go @@ -67,10 +67,14 @@ func (m *MetadataService) Start(ctx context.Context) error { return nil } - if err := backoff.Retry(operation, backoff.NewExponentialBackOff()); err != nil { + if err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), func(err error, duration time.Duration) { + m.log.WithError(err).Warnf("Failed to refresh metadata, retrying in %s", duration) + }); err != nil { m.log.WithError(err).Warn("Failed to refresh metadata") } + m.log.Info("Metadata service is ready") + for _, cb := range m.onReadyCallbacks { if err := cb(ctx); err != nil { m.log.WithError(err).Warn("Failed to execute onReady callback") diff --git a/pkg/sentry/sentry.go b/pkg/sentry/sentry.go index cb7338b4..07d71117 100644 --- a/pkg/sentry/sentry.go +++ b/pkg/sentry/sentry.go @@ -64,6 +64,8 @@ type Sentry struct { preset *Preset shutdownFuncs []func(context.Context) error + + summary *Summary } func New(ctx context.Context, log logrus.FieldLogger, config *Config, overrides *Override) (*Sentry, error) { @@ -179,6 +181,7 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config, overrides latestForkChoiceMu: sync.RWMutex{}, shutdownFuncs: []func(context.Context) error{}, preset: preset, + summary: NewSummary(log, time.Duration(60)*time.Second, b), } // If the output authorization override is set, use it @@ -255,6 +258,12 @@ func (s *Sentry) Start(ctx context.Context) error { return err } + s.beacon.Node().OnEvent(ctx, func(ctx context.Context, event *eth2v1.Event) error { + s.summary.AddEventStreamEvents(event.Topic, 1) + + return nil + }) + s.beacon.OnReady(ctx, func(ctx context.Context) error { s.log.Info("Internal 
beacon node is ready, subscribing to events") @@ -554,6 +563,8 @@ func (s *Sentry) Start(ctx context.Context) error { } } + go s.summary.Start(ctx) + if s.Config.Ethereum.OverrideNetworkName != "" { s.log.WithField("network", s.Config.Ethereum.OverrideNetworkName).Info("Overriding network name") } @@ -700,7 +711,12 @@ func (s *Sentry) syncClockDrift(_ context.Context) error { } s.clockDrift = response.ClockOffset - s.log.WithField("drift", s.clockDrift).Info("Updated clock drift") + + s.log.WithField("drift", s.clockDrift).Debug("Updated clock drift") + + if s.clockDrift > 2*time.Second || s.clockDrift < -2*time.Second { + s.log.WithField("drift", s.clockDrift).Warn("Large clock drift detected, consider configuring an NTP server on your instance") + } return err } @@ -724,6 +740,8 @@ func (s *Sentry) handleNewDecoratedEvent(ctx context.Context, event *xatu.Decora s.metrics.AddDecoratedEvent(1, eventType, networkStr) + s.summary.AddEventsExported(1) + for _, sink := range s.sinks { if err := sink.HandleNewDecoratedEvent(ctx, event); err != nil { s.log. diff --git a/pkg/sentry/summary.go b/pkg/sentry/summary.go new file mode 100644 index 00000000..aa3a7e08 --- /dev/null +++ b/pkg/sentry/summary.go @@ -0,0 +1,134 @@ +package sentry + +import ( + "context" + "fmt" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/ethpandaops/xatu/pkg/sentry/ethereum" + "github.com/sirupsen/logrus" +) + +// Summary is a struct that holds the summary of the sentry. +type Summary struct { + log logrus.FieldLogger + printInterval time.Duration + + beacon *ethereum.BeaconNode + + eventStreamEvents sync.Map + eventsExported atomic.Uint64 + failedEvents atomic.Uint64 +} + +// NewSummary creates a new summary with the given print interval. 
+func NewSummary(log logrus.FieldLogger, printInterval time.Duration, beacon *ethereum.BeaconNode) *Summary {
+	return &Summary{
+		log:           log,
+		printInterval: printInterval,
+		beacon:        beacon,
+	}
+}
+
+// Start logs a summary every printInterval. It blocks until ctx is done.
+func (s *Summary) Start(ctx context.Context) {
+	s.log.WithField("interval", s.printInterval).Info("Starting summary")
+
+	ticker := time.NewTicker(s.printInterval)
+	// Release the ticker once the loop exits.
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			s.Print()
+		}
+	}
+}
+
+// Print logs the counters gathered since the previous summary, together with
+// the beacon node's health and sync state, and clears the counters for the
+// next interval.
+func (s *Summary) Print() {
+	isSyncing := "unknown"
+
+	if status := s.beacon.Node().Status(); status != nil {
+		isSyncing = strconv.FormatBool(status.Syncing())
+	}
+
+	// Read and clear each counter in a single step so that events recorded
+	// while the summary is being built roll into the next interval instead of
+	// being wiped by a separate reset.
+	exported := s.eventsExported.Swap(0)
+	failed := s.failedEvents.Swap(0)
+	events := s.drainEventStreamEvents()
+
+	// Build a slice of event stream topics ordered by descending count.
+	type topicCount struct {
+		topic string
+		count uint64
+	}
+
+	sortedEvents := make([]topicCount, 0, len(events))
+	for topic, count := range events {
+		sortedEvents = append(sortedEvents, topicCount{topic, count})
+	}
+
+	sort.Slice(sortedEvents, func(i, j int) bool {
+		if sortedEvents[i].count != sortedEvents[j].count {
+			return sortedEvents[i].count > sortedEvents[j].count
+		}
+
+		// Break ties by topic so the log line does not depend on map
+		// iteration order.
+		return sortedEvents[i].topic < sortedEvents[j].topic
+	})
+
+	// Create formatted strings for each topic and count.
+	eventTopics := make([]string, 0, len(sortedEvents))
+	for _, tc := range sortedEvents {
+		eventTopics = append(eventTopics, fmt.Sprintf("%s: %d", tc.topic, tc.count))
+	}
+
+	s.log.WithFields(logrus.Fields{
+		"events_exported":     exported,
+		"events_failed":       failed,
+		"node_is_healthy":     s.beacon.Node().Healthy(),
+		"node_is_syncing":     isSyncing,
+		"event_stream_events": strings.Join(eventTopics, ", "),
+	}).Infof("Summary of the last %s", s.printInterval)
+}
+
+// AddEventsExported adds count to the number of exported events.
+func (s *Summary) AddEventsExported(count uint64) {
+	s.eventsExported.Add(count)
+}
+
+// GetEventsExported returns the number of exported events recorded so far.
+func (s *Summary) GetEventsExported() uint64 {
+	return s.eventsExported.Load()
+}
+
+// AddFailedEvents adds count to the number of failed events.
+func (s *Summary) AddFailedEvents(count uint64) {
+	s.failedEvents.Add(count)
+}
+
+// GetFailedEvents returns the number of failed events recorded so far.
+func (s *Summary) GetFailedEvents() uint64 {
+	return s.failedEvents.Load()
+}
+
+// AddEventStreamEvents adds count to the number of event stream events seen
+// for topic.
+func (s *Summary) AddEventStreamEvents(topic string, count uint64) {
+	for {
+		current, loaded := s.eventStreamEvents.LoadOrStore(topic, count)
+		if !loaded {
+			// First event for this topic: LoadOrStore already stored count,
+			// so adding it again would double-count.
+			return
+		}
+
+		previous, _ := current.(uint64)
+
+		// Only write if nobody changed the entry since it was loaded;
+		// otherwise retry so concurrent increments are not lost.
+		if s.eventStreamEvents.CompareAndSwap(topic, current, previous+count) {
+			return
+		}
+	}
+}
+
+// GetEventStreamEvents returns a copy of the per-topic event stream counts.
+func (s *Summary) GetEventStreamEvents() map[string]uint64 {
+	events := make(map[string]uint64)
+
+	s.eventStreamEvents.Range(func(key, value any) bool {
+		events[key.(string)], _ = value.(uint64)
+
+		return true
+	})
+
+	return events
+}
+
+// drainEventStreamEvents removes every per-topic count and returns what was
+// removed.
+func (s *Summary) drainEventStreamEvents() map[string]uint64 {
+	events := make(map[string]uint64)
+
+	s.eventStreamEvents.Range(func(key, _ any) bool {
+		// LoadAndDelete hands back the value at the moment of removal, so an
+		// increment that lands after Range read the entry is still reported.
+		value, loaded := s.eventStreamEvents.LoadAndDelete(key)
+		if !loaded {
+			return true
+		}
+
+		topic, ok := key.(string)
+		if !ok {
+			return true
+		}
+
+		count, _ := value.(uint64)
+		events[topic] += count
+
+		return true
+	})
+
+	return events
+}
+
+// Reset clears all counters.
+func (s *Summary) Reset() {
+	s.eventsExported.Store(0)
+	s.failedEvents.Store(0)
+
+	// Delete entries in place: overwriting the sync.Map value would race with
+	// goroutines that are using the map at the same time.
+	s.eventStreamEvents.Range(func(key, _ any) bool {
+		s.eventStreamEvents.Delete(key)
+
+		return true
+	})
+}