Skip to content

Commit

Permalink
mapic: Fix stats collection deadlock (#263)
Browse files Browse the repository at this point in the history
* mapic/stats: Fix deadlock on limited errgroup

Realized that we can't really use SetLimit with
recursive Go calls, as we might get to a state
where all goroutines are blocked waiting for another
goroutine to finish.

Fixed it by removing that recursion and parallelizing
only the SetActive+Publish calls.

* mapic/stats: Stop inheriting from errgroup

* mapic: Make variable redefinition clearer

I hate that Go bug
  • Loading branch information
victorges authored Feb 8, 2023
1 parent 703d532 commit 11d1911
Showing 1 changed file with 41 additions and 45 deletions.
86 changes: 41 additions & 45 deletions internal/app/mistapiconnector/stats_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,59 +70,57 @@ func (c *metricsCollector) collectMetrics(ctx context.Context) error {
}
streamsMetrics := compileStreamMetrics(mistStats)

eg := errGroupRecv{}
eg := errgroup.Group{}
eg.SetLimit(5)

for streamID, metrics := range streamsMetrics {
if err := ctx.Err(); err != nil {
return err
}

streamID, metrics := streamID, metrics
eg.GoRecovered(func() {
info, err := c.getStreamInfo(streamID)
if err != nil {
glog.Errorf("Error getting stream info for streamId=%s err=%q", streamID, err)

info, err := c.getStreamInfo(streamID)
if err != nil {
glog.Errorf("Error getting stream info for streamId=%s err=%q", streamID, err)
continue
}
if info.isLazy {
// avoid spamming metrics for playback-only catalyst instances. This means
// that if mapic restarts we will stop sending metrics from previous
// streams as well, but that's a minor issue (curr stream health is dying).
glog.Infof("Skipping metrics for lazily created stream info. streamId=%q metrics=%+v", streamID, metrics)
continue
}

eg.Go(recovered(func() {
info.mu.Lock()
timeSinceBumped := time.Since(info.lastSeenBumpedAt)
info.mu.Unlock()
if timeSinceBumped <= lastSeenBumpPeriod {
return
}
if info.isLazy {
// avoid spamming metrics for playback-only catalyst instances. This means
// that if mapic restarts we will stop sending metrics from previous
// streams as well, but that's a minor issue (curr stream health is dying).
glog.Infof("Skipping metrics for lazily created stream info. streamId=%q metrics=%+v", streamID, metrics)

if _, err := c.lapi.SetActive(info.stream.ID, true, info.startedAt); err != nil {
glog.Errorf("Error updating stream last seen. err=%q streamId=%q", err, info.stream.ID)
return
}

eg.GoRecovered(func() {
info.mu.Lock()
timeSinceBumped := time.Since(info.lastSeenBumpedAt)
info.mu.Unlock()
if timeSinceBumped <= lastSeenBumpPeriod {
return
}

if _, err := c.lapi.SetActive(info.stream.ID, true, info.startedAt); err != nil {
glog.Errorf("Error updating stream last seen. err=%q streamId=%q", err, info.stream.ID)
return
}

info.mu.Lock()
info.lastSeenBumpedAt = time.Now()
info.mu.Unlock()
})

eg.GoRecovered(func() {
mseEvent := createMetricsEvent(c.nodeID, c.ownRegion, info, metrics)
err = c.producer.Publish(ctx, event.AMQPMessage{
Exchange: c.amqpExchange,
Key: fmt.Sprintf("stream.metrics.%s", info.stream.ID),
Body: mseEvent,
})
if err != nil {
glog.Errorf("Error sending mist stream metrics event. err=%q streamId=%q event=%+v", err, info.stream.ID, mseEvent)
}
info.mu.Lock()
info.lastSeenBumpedAt = time.Now()
info.mu.Unlock()
}))

eg.Go(recovered(func() {
mseEvent := createMetricsEvent(c.nodeID, c.ownRegion, info, metrics)
err = c.producer.Publish(ctx, event.AMQPMessage{
Exchange: c.amqpExchange,
Key: fmt.Sprintf("stream.metrics.%s", info.stream.ID),
Body: mseEvent,
})
})
if err != nil {
glog.Errorf("Error sending mist stream metrics event. err=%q streamId=%q event=%+v", err, info.stream.ID, mseEvent)
}
}))
}

return eg.Wait()
Expand Down Expand Up @@ -202,10 +200,8 @@ func compileStreamMetrics(mistStats *mist.MistStats) map[string]*streamMetrics {
return streamsMetrics
}

type errGroupRecv struct{ errgroup.Group }

func (eg *errGroupRecv) GoRecovered(f func()) {
eg.Group.Go(func() (err error) {
func recovered(f func()) func() error {
return func() (err error) {
defer func() {
if r := recover(); r != nil {
glog.Errorf("Panic in metrics collector. value=%v stack=%s", r, debug.Stack())
Expand All @@ -214,5 +210,5 @@ func (eg *errGroupRecv) GoRecovered(f func()) {
}()
f()
return nil
})
}
}

0 comments on commit 11d1911

Please sign in to comment.