diff --git a/pkg/aggregator/demultiplexer_agent.go b/pkg/aggregator/demultiplexer_agent.go index c541d95a108a0..1512c756d86fe 100644 --- a/pkg/aggregator/demultiplexer_agent.go +++ b/pkg/aggregator/demultiplexer_agent.go @@ -46,7 +46,7 @@ type DemultiplexerWithAggregator interface { type AgentDemultiplexer struct { log log.Component - m sync.Mutex + m sync.RWMutex // stopChan completely stops the flushLoop of the Demultiplexer when receiving // a message, not doing anything else. @@ -391,8 +391,8 @@ func (d *AgentDemultiplexer) ForceFlushToSerializer(start time.Time, waitForSeri // - to have an implementation of SendIterableSeries listening on multiple sinks in parallel, or, // - to have a thread-safe implementation of the underlying `util.BufferedChan`. func (d *AgentDemultiplexer) flushToSerializer(start time.Time, waitForSerializer bool) { - d.m.Lock() - defer d.m.Unlock() + d.m.RLock() + defer d.m.RUnlock() if d.aggregator == nil { // NOTE(remy): we could consider flushing only the time samplers @@ -561,8 +561,8 @@ func (d *AgentDemultiplexer) DumpDogstatsdContexts(dest io.Writer) error { // If no error is returned here, DestroySender must be called with the same ID // once the sender is not used anymore func (d *AgentDemultiplexer) GetSender(id checkid.ID) (sender.Sender, error) { - d.m.Lock() - defer d.m.Unlock() + d.m.RLock() + defer d.m.RUnlock() if d.senders == nil { return nil, errors.New("demultiplexer is stopped") @@ -574,8 +574,8 @@ func (d *AgentDemultiplexer) GetSender(id checkid.ID) (sender.Sender, error) { // SetSender returns the passed sender with the passed ID. // This is largely for testing purposes func (d *AgentDemultiplexer) SetSender(s sender.Sender, id checkid.ID) error { - d.m.Lock() - defer d.m.Unlock() + d.m.RLock() + defer d.m.RUnlock() if d.senders == nil { return errors.New("demultiplexer is stopped") } @@ -587,8 +587,8 @@ func (d *AgentDemultiplexer) SetSender(s sender.Sender, id checkid.ID) error { // Should be called when no sender with this ID is used anymore // The metrics of this (these) sender(s) that haven't been flushed yet will be lost func (d *AgentDemultiplexer) DestroySender(id checkid.ID) { - d.m.Lock() - defer d.m.Unlock() + d.m.RLock() + defer d.m.RUnlock() if d.senders == nil { return @@ -599,8 +599,8 @@ func (d *AgentDemultiplexer) DestroySender(id checkid.ID) { // GetDefaultSender returns a default sender. func (d *AgentDemultiplexer) GetDefaultSender() (sender.Sender, error) { - d.m.Lock() - defer d.m.Unlock() + d.m.RLock() + defer d.m.RUnlock() if d.senders == nil { return nil, errors.New("demultiplexer is stopped")