Skip to content

Commit

Permalink
feat(sentry): Add a summary log
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Oct 9, 2024
1 parent fc52094 commit 4dcc12c
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 0 deletions.
13 changes: 13 additions & 0 deletions pkg/sentry/sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type Sentry struct {
preset *Preset

shutdownFuncs []func(context.Context) error

summary *Summary
}

func New(ctx context.Context, log logrus.FieldLogger, config *Config, overrides *Override) (*Sentry, error) {
Expand Down Expand Up @@ -179,6 +181,7 @@ func New(ctx context.Context, log logrus.FieldLogger, config *Config, overrides
latestForkChoiceMu: sync.RWMutex{},
shutdownFuncs: []func(context.Context) error{},
preset: preset,
summary: NewSummary(log, time.Duration(60)*time.Second, b),
}

// If the output authorization override is set, use it
Expand Down Expand Up @@ -255,6 +258,12 @@ func (s *Sentry) Start(ctx context.Context) error {
return err
}

s.beacon.Node().OnEvent(ctx, func(ctx context.Context, event *eth2v1.Event) error {
s.summary.AddEventStreamEvents(event.Topic, 1)

return nil
})

s.beacon.OnReady(ctx, func(ctx context.Context) error {
s.log.Info("Internal beacon node is ready, subscribing to events")

Expand Down Expand Up @@ -554,6 +563,8 @@ func (s *Sentry) Start(ctx context.Context) error {
}
}

go s.summary.Start(ctx)

if s.Config.Ethereum.OverrideNetworkName != "" {
s.log.WithField("network", s.Config.Ethereum.OverrideNetworkName).Info("Overriding network name")
}
Expand Down Expand Up @@ -724,6 +735,8 @@ func (s *Sentry) handleNewDecoratedEvent(ctx context.Context, event *xatu.Decora

s.metrics.AddDecoratedEvent(1, eventType, networkStr)

s.summary.AddEventsExported(1)

for _, sink := range s.sinks {
if err := sink.HandleNewDecoratedEvent(ctx, event); err != nil {
s.log.
Expand Down
134 changes: 134 additions & 0 deletions pkg/sentry/summary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package sentry

import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethpandaops/xatu/pkg/sentry/ethereum"
"github.com/sirupsen/logrus"
)

// Summary is a struct that holds the summary of the sentry.
type Summary struct {
log logrus.FieldLogger
printInterval time.Duration

beacon *ethereum.BeaconNode

eventStreamEvents sync.Map
eventsExported atomic.Uint64
failedEvents atomic.Uint64
}

// NewSummary creates a new summary with the given print interval.
func NewSummary(log logrus.FieldLogger, printInterval time.Duration, beacon *ethereum.BeaconNode) *Summary {
return &Summary{
log: log,
printInterval: printInterval,
beacon: beacon,
}
}

func (s *Summary) Start(ctx context.Context) {
s.log.WithField("interval", s.printInterval).Info("Starting summary")
ticker := time.NewTicker(s.printInterval)

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.Print()
}
}
}

func (s *Summary) Print() {
isSyncing := "unknown"
status := s.beacon.Node().Status()

if status != nil {
isSyncing = strconv.FormatBool(status.Syncing())
}

events := s.GetEventStreamEvents()

// Build a sorted slice of event stream topics and counts
type topicCount struct {
topic string
count uint64
}

sortedEvents := make([]topicCount, 0, len(events))
for topic, count := range events {
sortedEvents = append(sortedEvents, topicCount{topic, count})
}

sort.Slice(sortedEvents, func(i, j int) bool {
return sortedEvents[i].count > sortedEvents[j].count
})

// Create formatted strings for each topic and count
eventTopics := make([]string, len(sortedEvents))
for i, tc := range sortedEvents {
eventTopics[i] = fmt.Sprintf("%s: %d", tc.topic, tc.count)
}

eventStream := strings.Join(eventTopics, ", ")

s.log.WithFields(logrus.Fields{
"events_exported": s.GetEventsExported(),
"events_failed": s.GetFailedEvents(),
"node_is_healthy": s.beacon.Node().Healthy(),
"node_is_syncing": isSyncing,
"event_stream_events": eventStream,
}).Infof("Summary of the last %s", s.printInterval)

s.Reset()
}

func (s *Summary) AddEventsExported(count uint64) {
s.eventsExported.Add(count)
}

func (s *Summary) GetEventsExported() uint64 {
return s.eventsExported.Load()
}

func (s *Summary) AddFailedEvents(count uint64) {
s.failedEvents.Add(count)
}

func (s *Summary) GetFailedEvents() uint64 {
return s.failedEvents.Load()
}

func (s *Summary) AddEventStreamEvents(topic string, count uint64) {
current, _ := s.eventStreamEvents.LoadOrStore(topic, count)

s.eventStreamEvents.Store(topic, current.(uint64)+count)
}

func (s *Summary) GetEventStreamEvents() map[string]uint64 {
events := make(map[string]uint64)

s.eventStreamEvents.Range(func(key, value any) bool {
events[key.(string)] = value.(uint64)

Check failure on line 122 in pkg/sentry/summary.go

View workflow job for this annotation

GitHub Actions / lint

Error return value is not checked (errcheck)

return true
})

return events
}

func (s *Summary) Reset() {
s.eventsExported.Store(0)
s.failedEvents.Store(0)
s.eventStreamEvents = sync.Map{}
}

0 comments on commit 4dcc12c

Please sign in to comment.