Skip to content

Commit

Permalink
glorious: nits
Browse files Browse the repository at this point in the history
  • Loading branch information
shazbert committed Dec 5, 2024
1 parent 73304c1 commit 09ec12f
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 19 deletions.
1 change: 1 addition & 0 deletions config/config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ type Exchange struct {
HTTPTimeout time.Duration `json:"httpTimeout"`
HTTPUserAgent string `json:"httpUserAgent,omitempty"`
HTTPDebugging bool `json:"httpDebugging,omitempty"`
WebsocketMetricsLogging bool `json:"websocketMetricsLogging"`
WebsocketResponseCheckTimeout time.Duration `json:"websocketResponseCheckTimeout"`
WebsocketResponseMaxLimit time.Duration `json:"websocketResponseMaxLimit"`
WebsocketTrafficTimeout time.Duration `json:"websocketTrafficTimeout"`
Expand Down
4 changes: 0 additions & 4 deletions engine/websocketroutine_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,6 @@ func (m *WebsocketRoutineManager) websocketRoutine() {
continue
}

if m.verbose {
ws.SetProcessReportManager(&stream.DefaultProcessReporterManager{})
}

wg.Add(1)
go func() {
defer wg.Done()
Expand Down
35 changes: 21 additions & 14 deletions exchanges/stream/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,56 +46,63 @@ func (d DefaultProcessReporterManager) New(conn Connection) ProcessReporter {
// It tracks operation metrics, including the number of operations, average processing time, and peak processing time.
type DefaultProcessReporter struct {
operations int64
errors int64
totalProcessingTime time.Duration
peakProcessingTime time.Duration
peakCause []byte
ch chan struct{}
m sync.Mutex
}

// Report logs the processing time for a received data packet and updates metrics.
func (r *DefaultProcessReporter) Report(read time.Time, _ []byte, _ error) {
func (r *DefaultProcessReporter) Report(read time.Time, data []byte, err error) {
processingDuration := time.Since(read)
r.m.Lock()
defer r.m.Unlock()
r.operations++
if err != nil {
r.errors++
}
r.totalProcessingTime += processingDuration
if processingDuration > r.peakProcessingTime {
r.peakProcessingTime = processingDuration
r.peakCause = data
}
}

// Close closes the process reporter
func (r *DefaultProcessReporter) Close() {
r.m.Lock()
defer r.m.Unlock()
if r.ch != nil {
close(r.ch)
}
close(r.ch)
r.m.Unlock()
}

// collectMetrics runs in a separate goroutine to periodically log aggregated metrics.
func (r *DefaultProcessReporter) collectMetrics(conn Connection) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
timer := time.NewTimer(time.Until(time.Now().Truncate(time.Minute).Add(time.Minute)))
defer timer.Stop()

for {
select {
case <-r.ch:
return
case <-ticker.C:
case <-timer.C:
timer.Reset(time.Until(time.Now().Truncate(time.Minute).Add(time.Minute)))
r.m.Lock()
if r.operations > 0 {
avgOperationsPerSecond := r.operations / 60
avgOperationsPerSecond := float64(r.operations) / 60
avgProcessingTime := r.totalProcessingTime / time.Duration(r.operations)
peakTime := r.peakProcessingTime

peakCause := r.peakCause
errors := r.errors
// Reset metrics for the next interval.
r.operations, r.totalProcessingTime, r.peakProcessingTime = 0, 0, 0

r.operations, r.totalProcessingTime, r.peakProcessingTime, r.peakCause, r.errors = 0, 0, 0, nil, 0
r.m.Unlock()

if len(peakCause) > 100 {
peakCause = append(peakCause[:100], []byte("...")...)
}
// Log metrics outside of the critical section to avoid blocking other threads.
log.Debugf(log.WebsocketMgr, "%v: Operations/Second: %d, Avg Processing/Operation: %v, Peak: %v", conn.GetURL(), avgOperationsPerSecond, avgProcessingTime, peakTime)
log.Debugf(log.WebsocketMgr, "Connection: %v Operations/Second: %.2f, Avg Processing/Operation: %v, Errors: %v Peak: %v Cause: %v...", conn.GetURL(), avgOperationsPerSecond, avgProcessingTime, errors, peakTime, string(peakCause))
} else {
r.m.Unlock()
}
Expand Down
12 changes: 11 additions & 1 deletion exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ func (w *Websocket) Setup(s *WebsocketSetup) error {
w.setState(disconnectedState)

w.rateLimitDefinitions = s.RateLimitDefinitions

if s.ExchangeConfig.WebsocketMetricsLogging {
w.processReporter = &DefaultProcessReporterManager{}
}
return nil
}

Expand Down Expand Up @@ -1081,13 +1085,19 @@ func (w *Websocket) Reader(ctx context.Context, conn Connection, handler func(ct
}
for {
resp := conn.ReadMessage()
readAt := time.Now()

var readAt time.Time
if reporter != nil {
readAt = time.Now()
}

if resp.Raw == nil {
if reporter != nil {
reporter.Close()
}
return // Connection has been closed
}

err := handler(ctx, resp.Raw)
if err != nil {
w.DataHandler <- fmt.Errorf("connection URL:[%v] error: %w", conn.GetURL(), err)
Expand Down

0 comments on commit 09ec12f

Please sign in to comment.