Skip to content

Commit

Permalink
fix concurrent map write
Browse files Browse the repository at this point in the history
  • Loading branch information
colelaven committed Dec 12, 2024
1 parent 279bea3 commit 5da9bb3
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 65 deletions.
62 changes: 25 additions & 37 deletions internal/topology/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package topology
import (
"context"
"fmt"
"slices"
"sync"
"time"
)
Expand All @@ -27,7 +26,7 @@ import (
type ConfigTopologyRegistry interface {
// RegisterConfigTopology registers the topology state for the given processor.
// It should return an error if the processor has already been registered.
RegisterConfigTopology(processorID string, data *ConfigTopology) error
RegisterConfigTopology(processorID string, data *ConfigTopologyState) error
SetIntervalChan() chan time.Duration
Reset()
}
Expand All @@ -40,8 +39,13 @@ type GatewayInfo struct {
OrgID string
}

// ConfigTopology represents the data captured through topology processors.
type ConfigTopology struct {
// ConfigTopologyState represents the data captured through topology processors.
type ConfigTopologyState struct {
ConfigTopology configTopology
mux sync.Mutex
}

type configTopology struct {
DestGateway GatewayInfo
RouteTable map[GatewayInfo]time.Time
}
Expand All @@ -63,17 +67,23 @@ type ConfigRecord struct {
LastUpdated time.Time `json:"lastUpdated"`
}

// NewConfigTopology initializes a new ConfigTopology
func NewConfigTopology(destGateway GatewayInfo) (*ConfigTopology, error) {
return &ConfigTopology{
DestGateway: destGateway,
RouteTable: make(map[GatewayInfo]time.Time),
// NewConfigTopologyState initializes a new ConfigTopologyState
func NewConfigTopologyState(destGateway GatewayInfo) (*ConfigTopologyState, error) {
return &ConfigTopologyState{
ConfigTopology: configTopology{
DestGateway: destGateway,
RouteTable: make(map[GatewayInfo]time.Time),
},
mux: sync.Mutex{},
}, nil
}

// UpsertRoute upserts given route.
func (ts *ConfigTopology) UpsertRoute(_ context.Context, gw GatewayInfo) {
ts.RouteTable[gw] = time.Now()
func (ts *ConfigTopologyState) UpsertRoute(_ context.Context, gw GatewayInfo) {
ts.mux.Lock()
defer ts.mux.Unlock()

ts.ConfigTopology.RouteTable[gw] = time.Now()
}

// ResettableConfigTopologyRegistry is a concrete version of TopologyDataRegistry that is able to be reset.
Expand All @@ -86,12 +96,11 @@ type ResettableConfigTopologyRegistry struct {
func NewResettableConfigTopologyRegistry() *ResettableConfigTopologyRegistry {
return &ResettableConfigTopologyRegistry{
topology: &sync.Map{},
setIntervalChan: make(chan time.Duration, 1),
}
}

// RegisterConfigTopology registers the ConfigTopology with the registry.
func (rtsr *ResettableConfigTopologyRegistry) RegisterConfigTopology(processorID string, configTopology *ConfigTopology) error {
func (rtsr *ResettableConfigTopologyRegistry) RegisterConfigTopology(processorID string, configTopology *ConfigTopologyState) error {
_, alreadyExists := rtsr.topology.LoadOrStore(processorID, configTopology)
if alreadyExists {
return fmt.Errorf("topology for processor %q was already registered", processorID)
Expand All @@ -112,11 +121,11 @@ func (rtsr *ResettableConfigTopologyRegistry) SetIntervalChan() chan time.Durati

// TopologyInfos returns all the topology data in this registry.
func (rtsr *ResettableConfigTopologyRegistry) TopologyInfos() []ConfigTopologyInfo {
states := []ConfigTopology{}
states := []configTopology{}

rtsr.topology.Range(func(_, value any) bool {
ts := value.(*ConfigTopology)
states = append(states, *ts)
ts := value.(*ConfigTopologyState)
states = append(states, ts.ConfigTopology)
return true
})

Expand All @@ -136,27 +145,6 @@ func (rtsr *ResettableConfigTopologyRegistry) TopologyInfos() []ConfigTopologyIn
})
}
if len(curInfo.SourceConfigs) > 0 {
slices.SortFunc(curInfo.SourceConfigs, func(a ConfigRecord, b ConfigRecord) int {
if a.OrgID < b.OrgID {
return -1
}
if a.OrgID > b.OrgID {
return 1
}
if a.AccountID < b.AccountID {
return -1
}
if a.AccountID > b.AccountID {
return 1
}
if a.ConfigName < b.ConfigName {
return -1
}
if a.ConfigName > b.ConfigName {
return 1
}
return 0
})
ti = append(ti, curInfo)
}
}
Expand Down
31 changes: 8 additions & 23 deletions opamp/observiq/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package observiq
import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"

Expand All @@ -39,10 +40,6 @@ type topologySender struct {
logger *zap.Logger
reporter TopologyReporter
opampClient client.OpAMPClient
interval time.Duration

changeIntervalChan chan time.Duration
changeAttributesChan chan map[string]string

mux *sync.Mutex
isRunning bool
Expand All @@ -56,11 +53,10 @@ func newTopologySender(l *zap.Logger, reporter TopologyReporter, opampClient cli
reporter: reporter,
opampClient: opampClient,

changeIntervalChan: make(chan time.Duration, 1),
mux: &sync.Mutex{},
isRunning: false,
done: make(chan struct{}),
wg: &sync.WaitGroup{},
mux: &sync.Mutex{},
isRunning: false,
done: make(chan struct{}),
wg: &sync.WaitGroup{},
}
}

Expand All @@ -82,15 +78,6 @@ func (ts *topologySender) Start() {
}()
}

// SetInterval changes the interval of the measurements sender.
func (ts topologySender) SetInterval(d time.Duration) {
select {
case ts.changeIntervalChan <- d:
case <-ts.done:
}

}

func (ts *topologySender) Stop() {
ts.mux.Lock()
defer ts.mux.Unlock()
Expand All @@ -107,16 +94,14 @@ func (ts *topologySender) Stop() {

func (ts *topologySender) loop() {
t := newTicker()
t.SetInterval(ts.interval)
defer t.Stop()

fmt.Println("topology sender loop")

for {
select {
case newInterval := <-ts.changeIntervalChan:
ts.interval = newInterval
t.SetInterval(newInterval)
case setInterval := <-ts.reporter.SetIntervalChan():
ts.interval = setInterval
fmt.Println("topology sender loop: received interval: ", setInterval)
t.SetInterval(setInterval)
case <-ts.done:
return
Expand Down
10 changes: 5 additions & 5 deletions processor/topologyprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type topologyUpdate struct {
type topologyProcessor struct {
logger *zap.Logger
enabled bool
topology *topology.ConfigTopology
topology *topology.ConfigTopologyState
interval time.Duration
processorID component.ID
bindplane component.ID
Expand All @@ -60,7 +60,7 @@ func newTopologyProcessor(logger *zap.Logger, cfg *Config, processorID component
AccountID: cfg.AccountID,
OrgID: cfg.OrgID,
}
topology, err := topology.NewConfigTopology(destGw)
topology, err := topology.NewConfigTopologyState(destGw)
if err != nil {
return nil, fmt.Errorf("create topology state: %w", err)
}
Expand Down Expand Up @@ -116,17 +116,17 @@ func (tp *topologyProcessor) processTopologyHeaders(ctx context.Context) {
var configName, accountID, orgID string

configNameHeaders := metadata.Get(configNameHeader)
if len(configNameHeader) > 0 {
if len(configNameHeaders) > 0 {
configName = configNameHeaders[0]
}

accountIDHeaders := metadata.Get(accountIDHeader)
if len(configNameHeader) > 0 {
if len(accountIDHeaders) > 0 {
accountID = accountIDHeaders[0]
}

orgIDHeaders := metadata.Get(organizationIDHeader)
if len(configNameHeader) > 0 {
if len(orgIDHeaders) > 0 {
orgID = orgIDHeaders[0]
}

Expand Down

0 comments on commit 5da9bb3

Please sign in to comment.