Skip to content

Commit

Permalink
cleanup names & data model, add ResourceNameHeader
Browse files Browse the repository at this point in the history
  • Loading branch information
colelaven committed Dec 13, 2024
1 parent 00cfedb commit 2b3bf05
Show file tree
Hide file tree
Showing 14 changed files with 216 additions and 214 deletions.
2 changes: 1 addition & 1 deletion collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (c *collector) Stop(ctx context.Context) {

// After shutting down, we reset the registries so they're fresh for the next collector startup.
measurements.BindplaneAgentThroughputMeasurementsRegistry.Reset()
topology.BindplaneAgentConfigTopologyRegistry.Reset()
topology.BindplaneAgentTopologyRegistry.Reset()
}

// Restart will restart the collector. It will also reset the status channel.
Expand Down
8 changes: 4 additions & 4 deletions extension/bindplaneextension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type bindplaneExtension struct {
logger *zap.Logger
cfg *Config
measurementsRegistry *measurements.ResettableThroughputMeasurementsRegistry
topologyRegistry *topology.ResettableConfigTopologyRegistry
topologyRegistry *topology.ResettableTopologyRegistry
customCapabilityHandlerThroughput opampcustommessages.CustomCapabilityHandler
customCapabilityHandlerTopology opampcustommessages.CustomCapabilityHandler
topologyInterval time.Duration
Expand All @@ -50,7 +50,7 @@ func newBindplaneExtension(logger *zap.Logger, cfg *Config) *bindplaneExtension
logger: logger,
cfg: cfg,
measurementsRegistry: measurements.NewResettableThroughputMeasurementsRegistry(false),
topologyRegistry: topology.NewResettableConfigTopologyRegistry(),
topologyRegistry: topology.NewResettableTopologyRegistry(),
doneChan: make(chan struct{}),
wg: &sync.WaitGroup{},
}
Expand Down Expand Up @@ -89,8 +89,8 @@ func (b *bindplaneExtension) RegisterThroughputMeasurements(processorID string,
return b.measurementsRegistry.RegisterThroughputMeasurements(processorID, measurements)
}

func (b *bindplaneExtension) RegisterConfigTopology(processorID string, topology *topology.ConfigTopologyState) error {
return b.topologyRegistry.RegisterConfigTopology(processorID, topology)
func (b *bindplaneExtension) RegisterTopologyState(processorID string, topology *topology.TopoState) error {
return b.topologyRegistry.RegisterTopologyState(processorID, topology)
}

func (b *bindplaneExtension) setupCustomCapabilities(host component.Host) error {
Expand Down
2 changes: 1 addition & 1 deletion internal/service/managed.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewManagedCollectorService(col collector.Collector, logger *zap.Logger, man
CollectorConfigPath: collectorConfigPath,
LoggerConfigPath: loggerConfigPath,
MeasurementsReporter: measurements.BindplaneAgentThroughputMeasurementsRegistry,
TopologyReporter: topology.BindplaneAgentConfigTopologyRegistry,
TopologyReporter: topology.BindplaneAgentTopologyRegistry,
}

// Create new client
Expand Down
4 changes: 2 additions & 2 deletions internal/topology/bindplane_agent_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@

package topology

// BindplaneAgentConfigTopologyRegistry is the registry singleton used by bindplane agent to track topology state
var BindplaneAgentConfigTopologyRegistry = NewResettableConfigTopologyRegistry()
// BindplaneAgentTopologyRegistry is the registry singleton used by bindplane agent to track topology state
var BindplaneAgentTopologyRegistry = NewResettableTopologyRegistry()
135 changes: 71 additions & 64 deletions internal/topology/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,86 +22,90 @@ import (
"time"
)

// ConfigTopologyRegistry represents a registry for the topology processor to register their ConfigTopology.
type ConfigTopologyRegistry interface {
// RegisterConfigTopology registers the topology state for the given processor.
// TopoRegistry represents a registry for the topology processor to register their TopologyState.
type TopoRegistry interface {
// RegisterTopologyState registers the topology state for the given processor.
// It should return an error if the processor has already been registered.
RegisterConfigTopology(processorID string, data *ConfigTopologyState) error
RegisterTopologyState(processorID string, data *TopoState) error
SetIntervalChan() chan time.Duration
Reset()
}

// GatewayInfo represents the unique identifiable information about a bindplane gateway's configuration
type GatewayInfo struct {
GatewayID string
ConfigName string
AccountID string
OrgID string
// TopoState represents the data captured through topology processors.
type TopoState struct {
Topology topology
mux sync.Mutex
}

// ConfigTopologyState represents the data captured through topology processors.
type ConfigTopologyState struct {
ConfigTopology configTopology
mux sync.Mutex
type topology struct {
// GatewaySource is the gateway source that the entries in the route table point to
GatewaySource GatewayInfo
// RouteTable is a map of gateway destinations to the time at which they were last detected
RouteTable map[GatewayInfo]time.Time
}

type configTopology struct {
DestGateway GatewayInfo
RouteTable map[GatewayInfo]time.Time
// GatewayInfo represents a bindplane gateway source or destination
type GatewayInfo struct {
// OrganizationID is the organizationID where this gateway dest/source lives
OrganizationID string `json:"organizationID"`
// AccountID is the accountID where this gateway dest/source lives
AccountID string `json:"accountID"`
// Configuration is the name of the configuration where this gateway dest/source lives
Configuration string `json:"configuration"`
// GatewayID is the ComponentID of a gateway source, or the resource name of a gateway destination
GatewayID string `json:"gatewayID"`
}

// ConfigTopologyInfo represents topology relationships between configs.
type ConfigTopologyInfo struct {
GatewayID string `json:"gatewayID"`
ConfigName string `json:"configName"`
AccountID string `json:"accountID"`
OrgID string `json:"orgID"`
SourceConfigs []ConfigRecord `json:"sourceConfigs"`
// GatewayRecord represents a gateway destination and the time it was last detected
type GatewayRecord struct {
// Gateway represents a gateway destinations
Gateway GatewayInfo `json:"gateway"`
// LastUpdated is a timestamp of the last time a message w/ topology headers was detected from the above gateway destination
LastUpdated time.Time `json:"lastUpdated"`
}

// ConfigRecord represents a gateway source and the time it was last detected
type ConfigRecord struct {
ConfigName string `json:"configName"`
AccountID string `json:"accountID"`
OrgID string `json:"orgID"`
LastUpdated time.Time `json:"lastUpdated"`
// TopoInfo represents a gateway source & the gateway destinations that point to it.
type TopoInfo struct {
GatewaySource GatewayInfo `json:"gatewaySource"`
GatewayDestinations []GatewayRecord `json:"gatewayDestinations"`
}

// NewConfigTopologyState initializes a new ConfigTopologyState
func NewConfigTopologyState(destGateway GatewayInfo) (*ConfigTopologyState, error) {
return &ConfigTopologyState{
ConfigTopology: configTopology{
DestGateway: destGateway,
RouteTable: make(map[GatewayInfo]time.Time),
// NewTopologyState initializes a new TopologyState
func NewTopologyState(gw GatewayInfo) (*TopoState, error) {
return &TopoState{
Topology: topology{
GatewaySource: gw,
RouteTable: make(map[GatewayInfo]time.Time),
},
mux: sync.Mutex{},
}, nil
}

// UpsertRoute upserts given route.
func (ts *ConfigTopologyState) UpsertRoute(_ context.Context, gw GatewayInfo) {
func (ts *TopoState) UpsertRoute(_ context.Context, gw GatewayInfo) {
ts.mux.Lock()
defer ts.mux.Unlock()

ts.ConfigTopology.RouteTable[gw] = time.Now()
ts.Topology.RouteTable[gw] = time.Now()
}

// ResettableConfigTopologyRegistry is a concrete version of TopologyDataRegistry that is able to be reset.
type ResettableConfigTopologyRegistry struct {
// ResettableTopologyRegistry is a concrete version of TopologyDataRegistry that is able to be reset.
type ResettableTopologyRegistry struct {
topology *sync.Map
setIntervalChan chan time.Duration
}

// NewResettableConfigTopologyRegistry creates a new ResettableConfigTopologyRegistry
func NewResettableConfigTopologyRegistry() *ResettableConfigTopologyRegistry {
return &ResettableConfigTopologyRegistry{
topology: &sync.Map{},
// NewResettableTopologyRegistry creates a new ResettableTopologyRegistry
func NewResettableTopologyRegistry() *ResettableTopologyRegistry {
return &ResettableTopologyRegistry{
topology: &sync.Map{},
setIntervalChan: make(chan time.Duration, 1),
}
}

// RegisterConfigTopology registers the ConfigTopology with the registry.
func (rtsr *ResettableConfigTopologyRegistry) RegisterConfigTopology(processorID string, configTopology *ConfigTopologyState) error {
_, alreadyExists := rtsr.topology.LoadOrStore(processorID, configTopology)
// RegisterTopologyState registers the TopologyState with the registry.
func (rtsr *ResettableTopologyRegistry) RegisterTopologyState(processorID string, topoState *TopoState) error {
_, alreadyExists := rtsr.topology.LoadOrStore(processorID, topoState)
if alreadyExists {
return fmt.Errorf("topology for processor %q was already registered", processorID)
}
Expand All @@ -110,41 +114,44 @@ func (rtsr *ResettableConfigTopologyRegistry) RegisterConfigTopology(processorID
}

// Reset unregisters all topology states in this registry
func (rtsr *ResettableConfigTopologyRegistry) Reset() {
func (rtsr *ResettableTopologyRegistry) Reset() {
rtsr.topology = &sync.Map{}
}

// SetIntervalChan returns the setIntervalChan
func (rtsr *ResettableConfigTopologyRegistry) SetIntervalChan() chan time.Duration {
func (rtsr *ResettableTopologyRegistry) SetIntervalChan() chan time.Duration {
return rtsr.setIntervalChan
}

// TopologyInfos returns all the topology data in this registry.
func (rtsr *ResettableConfigTopologyRegistry) TopologyInfos() []ConfigTopologyInfo {
states := []configTopology{}
func (rtsr *ResettableTopologyRegistry) TopologyInfos() []TopoInfo {
states := []topology{}

rtsr.topology.Range(func(_, value any) bool {
ts := value.(*ConfigTopologyState)
states = append(states, ts.ConfigTopology)
ts := value.(*TopoState)
states = append(states, ts.Topology)
return true
})

ti := []ConfigTopologyInfo{}
ti := []TopoInfo{}
for _, ts := range states {
curInfo := ConfigTopologyInfo{}
curInfo.GatewayID = ts.DestGateway.GatewayID
curInfo.ConfigName = ts.DestGateway.ConfigName
curInfo.AccountID = ts.DestGateway.AccountID
curInfo.OrgID = ts.DestGateway.OrgID
curInfo := TopoInfo{}
curInfo.GatewaySource.OrganizationID = ts.GatewaySource.OrganizationID
curInfo.GatewaySource.AccountID = ts.GatewaySource.AccountID
curInfo.GatewaySource.Configuration = ts.GatewaySource.Configuration
curInfo.GatewaySource.GatewayID = ts.GatewaySource.GatewayID
for gw, updated := range ts.RouteTable {
curInfo.SourceConfigs = append(curInfo.SourceConfigs, ConfigRecord{
ConfigName: gw.ConfigName,
AccountID: gw.AccountID,
OrgID: gw.OrgID,
curInfo.GatewayDestinations = append(curInfo.GatewayDestinations, GatewayRecord{
Gateway: GatewayInfo{
OrganizationID: gw.OrganizationID,
AccountID: gw.AccountID,
Configuration: gw.Configuration,
GatewayID: gw.GatewayID,
},
LastUpdated: updated.UTC(),
})
}
if len(curInfo.SourceConfigs) > 0 {
if len(curInfo.GatewayDestinations) > 0 {
ti = append(ti, curInfo)
}
}
Expand Down
6 changes: 1 addition & 5 deletions opamp/observiq/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package observiq
import (
"encoding/json"
"errors"
"fmt"
"sync"
"time"

Expand All @@ -31,7 +30,7 @@ import (

// TopologyReporter represents an object that reports topology state.
type TopologyReporter interface {
TopologyInfos() []topology.ConfigTopologyInfo
TopologyInfos() []topology.TopoInfo
SetIntervalChan() chan time.Duration
}

Expand Down Expand Up @@ -96,12 +95,9 @@ func (ts *topologySender) loop() {
t := newTicker()
defer t.Stop()

fmt.Println("topology sender loop")

for {
select {
case setInterval := <-ts.reporter.SetIntervalChan():
fmt.Println("topology sender loop: received interval: ", setInterval)
t.SetInterval(setInterval)
case <-ts.done:
return
Expand Down
21 changes: 10 additions & 11 deletions processor/topologyprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@
This processor utilizes request headers to provide extended topology functionality in BindPlane.

## Minimum agent versions
- Introduced: [v1.6.6](https://github.com/observIQ/bindplane-agent/releases/tag/v1.6.6)
- Introduced: [v1.6.7](https://github.com/observIQ/bindplane-agent/releases/tag/v1.6.7)

## Supported pipelines:
- Logs
- Metrics
- Traces

## Configuration
| Field | Type | Default | Description |
|---------------------|-----------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `enabled` | bool | `false` | When `true`, this processor will look for incoming topology headers and track the relevant connections accordingly. If false this processor acts as a no-op. |
| `interval` | duration | `1m` | The interval at which topology data is sent to Bindplane via OpAMP. |
| `configName` | string | | The name of the Bindplane configuration this processor is running on. |
| `orgID` | string | | The Organization ID of the Bindplane configuration where this processor is running. |
| `accountID` | string | | The Account ID of the Bindplane configuration where this processor is running. |
| Field | Type | Default | Description |
|----------------------|-----------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `interval` | duration | `1m` | The interval at which topology data is sent to Bindplane via OpAMP. |
| `organizationID` | string | | The Organization ID of the Bindplane configuration where this processor is running. |
| `accountID` | string | | The Account ID of the Bindplane configuration where this processor is running. |
| `configuration` | string | | The name of the Bindplane configuration this processor is running on. |


### Example configuration
Expand All @@ -30,11 +29,11 @@ receivers:

processors:
topology:
enabled: true
interval: 1m
configName: "myConfigName"
orgID: "myOrgID"
organizationID: "myOrganizationID"
accountID: "myAccountID"
configuration: "myConfiguration"


exporters:
googlecloud:
Expand Down
9 changes: 2 additions & 7 deletions processor/topologyprocessor/bindplane_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,12 @@
package topologyprocessor

import (
"fmt"

"github.com/observiq/bindplane-agent/internal/topology"
"go.opentelemetry.io/collector/component"
)

// GetTopologyRegistry returns the topology registry that should be registered to based on the component ID.
// nil, nil may be returned by this function. In this case, the processor should not register it's topology state anywhere.
func GetTopologyRegistry(host component.Host, bindplane component.ID) (topology.ConfigTopologyRegistry, error) {

fmt.Println("in Bindplane Registry")

return topology.BindplaneAgentConfigTopologyRegistry, nil
func GetTopologyRegistry(host component.Host, bindplane component.ID) (topology.TopoRegistry, error) {
return topology.BindplaneAgentTopologyRegistry, nil
}
21 changes: 7 additions & 14 deletions processor/topologyprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,11 @@ type Config struct {
// Bindplane extension to use in order to report topology. Optional.
BindplaneExtension component.ID `mapstructure:"bindplane_extension"`

// ComponentID of the Gateway Source where this processor is present
GatewayID string `mapstructure:"gatewayID"`

// Name of the Config where this processor is present
ConfigName string `mapstructure:"configName"`
Configuration string `mapstructure:"configuration"`

// OrgID of the Org where this processor is present
OrgID string `mapstructure:"orgID"`
// OrganizationID of the Org where this processor is present
OrganizationID string `mapstructure:"organizationID"`

// AccountID of the Account where this processor is present
AccountID string `mapstructure:"accountID"`
Expand All @@ -55,16 +52,12 @@ func (cfg Config) Validate() error {
return nil
}

if cfg.GatewayID == "" {
return errors.New("`gatewayID` must be specified")
}

if cfg.ConfigName == "" {
return errors.New("`configName` must be specified")
if cfg.Configuration == "" {
return errors.New("`configuration` must be specified")
}

if cfg.OrgID == "" {
return errors.New("`orgID` must be specified")
if cfg.OrganizationID == "" {
return errors.New("`organizationID` must be specified")
}

if cfg.AccountID == "" {
Expand Down
Loading

0 comments on commit 2b3bf05

Please sign in to comment.