Skip to content

Commit

Permalink
Use RWMutex in demultiplexer agent
Browse files Browse the repository at this point in the history
  • Loading branch information
gh123man committed May 30, 2024
1 parent c7efe43 commit 7403eac
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions pkg/aggregator/demultiplexer_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type DemultiplexerWithAggregator interface {
type AgentDemultiplexer struct {
log log.Component

m sync.Mutex
m sync.RWMutex

// stopChan completely stops the flushLoop of the Demultiplexer when receiving
// a message, not doing anything else.
Expand Down Expand Up @@ -391,8 +391,8 @@ func (d *AgentDemultiplexer) ForceFlushToSerializer(start time.Time, waitForSeri
// - to have an implementation of SendIterableSeries listening on multiple sinks in parallel, or,
// - to have a thread-safe implementation of the underlying `util.BufferedChan`.
func (d *AgentDemultiplexer) flushToSerializer(start time.Time, waitForSerializer bool) {
d.m.Lock()
defer d.m.Unlock()
d.m.RLock()
defer d.m.RUnlock()

if d.aggregator == nil {
// NOTE(remy): we could consider flushing only the time samplers
Expand Down Expand Up @@ -561,8 +561,8 @@ func (d *AgentDemultiplexer) DumpDogstatsdContexts(dest io.Writer) error {
// If no error is returned here, DestroySender must be called with the same ID
// once the sender is not used anymore
func (d *AgentDemultiplexer) GetSender(id checkid.ID) (sender.Sender, error) {
d.m.Lock()
defer d.m.Unlock()
d.m.RLock()
defer d.m.RUnlock()

if d.senders == nil {
return nil, errors.New("demultiplexer is stopped")
Expand All @@ -574,8 +574,8 @@ func (d *AgentDemultiplexer) GetSender(id checkid.ID) (sender.Sender, error) {
// SetSender returns the passed sender with the passed ID.
// This is largely for testing purposes
func (d *AgentDemultiplexer) SetSender(s sender.Sender, id checkid.ID) error {
d.m.Lock()
defer d.m.Unlock()
d.m.RLock()
defer d.m.RUnlock()
if d.senders == nil {
return errors.New("demultiplexer is stopped")
}
Expand All @@ -587,8 +587,8 @@ func (d *AgentDemultiplexer) SetSender(s sender.Sender, id checkid.ID) error {
// Should be called when no sender with this ID is used anymore
// The metrics of this (these) sender(s) that haven't been flushed yet will be lost
func (d *AgentDemultiplexer) DestroySender(id checkid.ID) {
d.m.Lock()
defer d.m.Unlock()
d.m.RLock()
defer d.m.RUnlock()

if d.senders == nil {
return
Expand All @@ -599,8 +599,8 @@ func (d *AgentDemultiplexer) DestroySender(id checkid.ID) {

// GetDefaultSender returns a default sender.
func (d *AgentDemultiplexer) GetDefaultSender() (sender.Sender, error) {
d.m.Lock()
defer d.m.Unlock()
d.m.RLock()
defer d.m.RUnlock()

if d.senders == nil {
return nil, errors.New("demultiplexer is stopped")
Expand Down

0 comments on commit 7403eac

Please sign in to comment.