diff --git a/collector/collector.go b/collector/collector.go index 085d93587..bd535aeac 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -204,7 +204,7 @@ func (c *collector) Stop(ctx context.Context) { // After shutting down, we reset the registries so they're fresh for the next collector startup. measurements.BindplaneAgentThroughputMeasurementsRegistry.Reset() - topology.BindplaneAgentConfigTopologyRegistry.Reset() + topology.BindplaneAgentTopologyRegistry.Reset() } // Restart will restart the collector. It will also reset the status channel. diff --git a/extension/bindplaneextension/extension.go b/extension/bindplaneextension/extension.go index b80ad5724..a7f9aee86 100644 --- a/extension/bindplaneextension/extension.go +++ b/extension/bindplaneextension/extension.go @@ -36,7 +36,7 @@ type bindplaneExtension struct { logger *zap.Logger cfg *Config measurementsRegistry *measurements.ResettableThroughputMeasurementsRegistry - topologyRegistry *topology.ResettableConfigTopologyRegistry + topologyRegistry *topology.ResettableTopologyRegistry customCapabilityHandlerThroughput opampcustommessages.CustomCapabilityHandler customCapabilityHandlerTopology opampcustommessages.CustomCapabilityHandler topologyInterval time.Duration @@ -50,7 +50,7 @@ func newBindplaneExtension(logger *zap.Logger, cfg *Config) *bindplaneExtension logger: logger, cfg: cfg, measurementsRegistry: measurements.NewResettableThroughputMeasurementsRegistry(false), - topologyRegistry: topology.NewResettableConfigTopologyRegistry(), + topologyRegistry: topology.NewResettableTopologyRegistry(), doneChan: make(chan struct{}), wg: &sync.WaitGroup{}, } @@ -89,8 +89,8 @@ func (b *bindplaneExtension) RegisterThroughputMeasurements(processorID string, return b.measurementsRegistry.RegisterThroughputMeasurements(processorID, measurements) } -func (b *bindplaneExtension) RegisterConfigTopology(processorID string, topology *topology.ConfigTopologyState) error { - return b.topologyRegistry.RegisterConfigTopology(processorID, topology) +func (b *bindplaneExtension) RegisterTopologyState(processorID string, topology *topology.TopoState) error { + return b.topologyRegistry.RegisterTopologyState(processorID, topology) } func (b *bindplaneExtension) setupCustomCapabilities(host component.Host) error { diff --git a/internal/service/managed.go b/internal/service/managed.go index 359c82326..bb816fda2 100644 --- a/internal/service/managed.go +++ b/internal/service/managed.go @@ -58,7 +58,7 @@ func NewManagedCollectorService(col collector.Collector, logger *zap.Logger, man CollectorConfigPath: collectorConfigPath, LoggerConfigPath: loggerConfigPath, MeasurementsReporter: measurements.BindplaneAgentThroughputMeasurementsRegistry, - TopologyReporter: topology.BindplaneAgentConfigTopologyRegistry, + TopologyReporter: topology.BindplaneAgentTopologyRegistry, } // Create new client diff --git a/internal/topology/bindplane_agent_topology.go b/internal/topology/bindplane_agent_topology.go index 44466b9fe..6089c187f 100644 --- a/internal/topology/bindplane_agent_topology.go +++ b/internal/topology/bindplane_agent_topology.go @@ -14,5 +14,5 @@ package topology -// BindplaneAgentConfigTopologyRegistry is the registry singleton used by bindplane agent to track topology state -var BindplaneAgentConfigTopologyRegistry = NewResettableConfigTopologyRegistry() +// BindplaneAgentTopologyRegistry is the registry singleton used by bindplane agent to track topology state +var BindplaneAgentTopologyRegistry = NewResettableTopologyRegistry() diff --git a/internal/topology/topology.go b/internal/topology/topology.go index 280c7d647..b963dddf5 100644 --- a/internal/topology/topology.go +++ b/internal/topology/topology.go @@ -22,86 +22,90 @@ import ( "time" ) -// ConfigTopologyRegistry represents a registry for the topology processor to register their ConfigTopology. -type ConfigTopologyRegistry interface { - // RegisterConfigTopology registers the topology state for the given processor. +// TopoRegistry represents a registry for the topology processor to register their TopologyState. +type TopoRegistry interface { + // RegisterTopologyState registers the topology state for the given processor. // It should return an error if the processor has already been registered. - RegisterConfigTopology(processorID string, data *ConfigTopologyState) error + RegisterTopologyState(processorID string, data *TopoState) error SetIntervalChan() chan time.Duration Reset() } -// GatewayInfo represents the unique identifiable information about a bindplane gateway's configuration -type GatewayInfo struct { - GatewayID string - ConfigName string - AccountID string - OrgID string +// TopoState represents the data captured through topology processors. +type TopoState struct { + Topology topology + mux sync.Mutex } -// ConfigTopologyState represents the data captured through topology processors. -type ConfigTopologyState struct { - ConfigTopology configTopology - mux sync.Mutex +type topology struct { + // GatewaySource is the gateway source that the entries in the route table point to + GatewaySource GatewayInfo + // RouteTable is a map of gateway destinations to the time at which they were last detected + RouteTable map[GatewayInfo]time.Time } -type configTopology struct { - DestGateway GatewayInfo - RouteTable map[GatewayInfo]time.Time +// GatewayInfo represents a bindplane gateway source or destination +type GatewayInfo struct { + // OrganizationID is the organizationID where this gateway dest/source lives + OrganizationID string `json:"organizationID"` + // AccountID is the accountID where this gateway dest/source lives + AccountID string `json:"accountID"` + // Configuration is the name of the configuration where this gateway dest/source lives + Configuration string `json:"configuration"` + // GatewayID is the ComponentID of a gateway source, or the resource name of a gateway destination + GatewayID string `json:"gatewayID"` } -// ConfigTopologyInfo represents topology relationships between configs. -type ConfigTopologyInfo struct { - GatewayID string `json:"gatewayID"` - ConfigName string `json:"configName"` - AccountID string `json:"accountID"` - OrgID string `json:"orgID"` - SourceConfigs []ConfigRecord `json:"sourceConfigs"` +// GatewayRecord represents a gateway destination and the time it was last detected +type GatewayRecord struct { + // Gateway represents a gateway destinations + Gateway GatewayInfo `json:"gateway"` + // LastUpdated is a timestamp of the last time a message w/ topology headers was detected from the above gateway destination + LastUpdated time.Time `json:"lastUpdated"` } -// ConfigRecord represents a gateway source and the time it was last detected -type ConfigRecord struct { - ConfigName string `json:"configName"` - AccountID string `json:"accountID"` - OrgID string `json:"orgID"` - LastUpdated time.Time `json:"lastUpdated"` +// TopoInfo represents a gateway source & the gateway destinations that point to it. +type TopoInfo struct { + GatewaySource GatewayInfo `json:"gatewaySource"` + GatewayDestinations []GatewayRecord `json:"gatewayDestinations"` } -// NewConfigTopologyState initializes a new ConfigTopologyState -func NewConfigTopologyState(destGateway GatewayInfo) (*ConfigTopologyState, error) { - return &ConfigTopologyState{ - ConfigTopology: configTopology{ - DestGateway: destGateway, - RouteTable: make(map[GatewayInfo]time.Time), +// NewTopologyState initializes a new TopologyState +func NewTopologyState(gw GatewayInfo) (*TopoState, error) { + return &TopoState{ + Topology: topology{ + GatewaySource: gw, + RouteTable: make(map[GatewayInfo]time.Time), }, mux: sync.Mutex{}, }, nil } // UpsertRoute upserts given route. -func (ts *ConfigTopologyState) UpsertRoute(_ context.Context, gw GatewayInfo) { +func (ts *TopoState) UpsertRoute(_ context.Context, gw GatewayInfo) { ts.mux.Lock() defer ts.mux.Unlock() - ts.ConfigTopology.RouteTable[gw] = time.Now() + ts.Topology.RouteTable[gw] = time.Now() } -// ResettableConfigTopologyRegistry is a concrete version of TopologyDataRegistry that is able to be reset. -type ResettableConfigTopologyRegistry struct { +// ResettableTopologyRegistry is a concrete version of TopologyDataRegistry that is able to be reset. +type ResettableTopologyRegistry struct { topology *sync.Map setIntervalChan chan time.Duration } -// NewResettableConfigTopologyRegistry creates a new ResettableConfigTopologyRegistry -func NewResettableConfigTopologyRegistry() *ResettableConfigTopologyRegistry { - return &ResettableConfigTopologyRegistry{ - topology: &sync.Map{}, +// NewResettableTopologyRegistry creates a new ResettableTopologyRegistry +func NewResettableTopologyRegistry() *ResettableTopologyRegistry { + return &ResettableTopologyRegistry{ + topology: &sync.Map{}, + setIntervalChan: make(chan time.Duration, 1), } } -// RegisterConfigTopology registers the ConfigTopology with the registry. -func (rtsr *ResettableConfigTopologyRegistry) RegisterConfigTopology(processorID string, configTopology *ConfigTopologyState) error { - _, alreadyExists := rtsr.topology.LoadOrStore(processorID, configTopology) +// RegisterTopologyState registers the TopologyState with the registry. +func (rtsr *ResettableTopologyRegistry) RegisterTopologyState(processorID string, topoState *TopoState) error { + _, alreadyExists := rtsr.topology.LoadOrStore(processorID, topoState) if alreadyExists { return fmt.Errorf("topology for processor %q was already registered", processorID) } @@ -110,41 +114,44 @@ func (rtsr *ResettableConfigTopologyRegistry) RegisterConfigTopology(processorID } // Reset unregisters all topology states in this registry -func (rtsr *ResettableConfigTopologyRegistry) Reset() { +func (rtsr *ResettableTopologyRegistry) Reset() { rtsr.topology = &sync.Map{} } // SetIntervalChan returns the setIntervalChan -func (rtsr *ResettableConfigTopologyRegistry) SetIntervalChan() chan time.Duration { +func (rtsr *ResettableTopologyRegistry) SetIntervalChan() chan time.Duration { return rtsr.setIntervalChan } // TopologyInfos returns all the topology data in this registry. -func (rtsr *ResettableConfigTopologyRegistry) TopologyInfos() []ConfigTopologyInfo { - states := []configTopology{} +func (rtsr *ResettableTopologyRegistry) TopologyInfos() []TopoInfo { + states := []topology{} rtsr.topology.Range(func(_, value any) bool { - ts := value.(*ConfigTopologyState) - states = append(states, ts.ConfigTopology) + ts := value.(*TopoState) + states = append(states, ts.Topology) return true }) - ti := []ConfigTopologyInfo{} + ti := []TopoInfo{} for _, ts := range states { - curInfo := ConfigTopologyInfo{} - curInfo.GatewayID = ts.DestGateway.GatewayID - curInfo.ConfigName = ts.DestGateway.ConfigName - curInfo.AccountID = ts.DestGateway.AccountID - curInfo.OrgID = ts.DestGateway.OrgID + curInfo := TopoInfo{} + curInfo.GatewaySource.OrganizationID = ts.GatewaySource.OrganizationID + curInfo.GatewaySource.AccountID = ts.GatewaySource.AccountID + curInfo.GatewaySource.Configuration = ts.GatewaySource.Configuration + curInfo.GatewaySource.GatewayID = ts.GatewaySource.GatewayID for gw, updated := range ts.RouteTable { - curInfo.SourceConfigs = append(curInfo.SourceConfigs, ConfigRecord{ - ConfigName: gw.ConfigName, - AccountID: gw.AccountID, - OrgID: gw.OrgID, + curInfo.GatewayDestinations = append(curInfo.GatewayDestinations, GatewayRecord{ + Gateway: GatewayInfo{ + OrganizationID: gw.OrganizationID, + AccountID: gw.AccountID, + Configuration: gw.Configuration, + GatewayID: gw.GatewayID, + }, LastUpdated: updated.UTC(), }) } - if len(curInfo.SourceConfigs) > 0 { + if len(curInfo.GatewayDestinations) > 0 { ti = append(ti, curInfo) } } diff --git a/opamp/observiq/topology.go b/opamp/observiq/topology.go index 78f9dad68..414777d96 100644 --- a/opamp/observiq/topology.go +++ b/opamp/observiq/topology.go @@ -17,7 +17,6 @@ package observiq import ( "encoding/json" "errors" - "fmt" "sync" "time" @@ -31,7 +30,7 @@ import ( // TopologyReporter represents an object that reports topology state. type TopologyReporter interface { - TopologyInfos() []topology.ConfigTopologyInfo + TopologyInfos() []topology.TopoInfo SetIntervalChan() chan time.Duration } @@ -96,12 +95,9 @@ func (ts *topologySender) loop() { t := newTicker() defer t.Stop() - fmt.Println("topology sender loop") - for { select { case setInterval := <-ts.reporter.SetIntervalChan(): - fmt.Println("topology sender loop: received interval: ", setInterval) t.SetInterval(setInterval) case <-ts.done: return diff --git a/processor/topologyprocessor/README.md b/processor/topologyprocessor/README.md index 1d8628b96..e31062a5f 100644 --- a/processor/topologyprocessor/README.md +++ b/processor/topologyprocessor/README.md @@ -2,7 +2,7 @@ This processor utilizes request headers to provide extended topology functionality in BindPlane. ## Minimum agent versions -- Introduced: [v1.6.6](https://github.com/observIQ/bindplane-agent/releases/tag/v1.6.6) +- Introduced: [v1.6.7](https://github.com/observIQ/bindplane-agent/releases/tag/v1.6.7) ## Supported pipelines: - Logs @@ -10,13 +10,12 @@ This processor utilizes request headers to provide extended topology functionali - Traces ## Configuration -| Field | Type | Default | Description | -|---------------------|-----------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `enabled` | bool | `false` | When `true`, this processor will look for incoming topology headers and track the relevant connections accordingly. If false this processor acts as a no-op. | -| `interval` | duration | `1m` | The interval at which topology data is sent to Bindplane via OpAMP. | -| `configName` | string | | The name of the Bindplane configuration this processor is running on. | -| `orgID` | string | | The Organization ID of the Bindplane configuration where this processor is running. | -| `accountID` | string | | The Account ID of the Bindplane configuration where this processor is running. | +| Field | Type | Default | Description | +|----------------------|-----------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `interval` | duration | `1m` | The interval at which topology data is sent to Bindplane via OpAMP. | +| `organizationID` | string | | The Organization ID of the Bindplane configuration where this processor is running. | +| `accountID` | string | | The Account ID of the Bindplane configuration where this processor is running. | +| `configuration` | string | | The name of the Bindplane configuration this processor is running on. | ### Example configuration @@ -30,11 +29,11 @@ receivers: processors: topology: - enabled: true interval: 1m - configName: "myConfigName" - orgID: "myOrgID" + organizationID: "myOrganizationID" accountID: "myAccountID" + configuration: "myConfiguration" + exporters: googlecloud: diff --git a/processor/topologyprocessor/bindplane_registry.go b/processor/topologyprocessor/bindplane_registry.go index 468af5031..d70924774 100644 --- a/processor/topologyprocessor/bindplane_registry.go +++ b/processor/topologyprocessor/bindplane_registry.go @@ -17,17 +17,12 @@ package topologyprocessor import ( - "fmt" - "github.com/observiq/bindplane-agent/internal/topology" "go.opentelemetry.io/collector/component" ) // GetTopologyRegistry returns the topology registry that should be registered to based on the component ID. // nil, nil may be returned by this function. In this case, the processor should not register it's topology state anywhere. -func GetTopologyRegistry(host component.Host, bindplane component.ID) (topology.ConfigTopologyRegistry, error) { - - fmt.Println("in Bindplane Registry") - - return topology.BindplaneAgentConfigTopologyRegistry, nil +func GetTopologyRegistry(host component.Host, bindplane component.ID) (topology.TopoRegistry, error) { + return topology.BindplaneAgentTopologyRegistry, nil } diff --git a/processor/topologyprocessor/config.go b/processor/topologyprocessor/config.go index 7b0d8d021..d5763742b 100644 --- a/processor/topologyprocessor/config.go +++ b/processor/topologyprocessor/config.go @@ -35,14 +35,11 @@ type Config struct { // Bindplane extension to use in order to report topology. Optional. BindplaneExtension component.ID `mapstructure:"bindplane_extension"` - // ComponentID of the Gateway Source where this processor is present - GatewayID string `mapstructure:"gatewayID"` - // Name of the Config where this processor is present - ConfigName string `mapstructure:"configName"` + Configuration string `mapstructure:"configuration"` - // OrgID of the Org where this processor is present - OrgID string `mapstructure:"orgID"` + // OrganizationID of the Org where this processor is present + OrganizationID string `mapstructure:"organizationID"` // AccountID of the Account where this processor is present AccountID string `mapstructure:"accountID"` @@ -55,16 +52,12 @@ func (cfg Config) Validate() error { return nil } - if cfg.GatewayID == "" { - return errors.New("`gatewayID` must be specified") - } - - if cfg.ConfigName == "" { - return errors.New("`configName` must be specified") + if cfg.Configuration == "" { + return errors.New("`configuration` must be specified") } - if cfg.OrgID == "" { - return errors.New("`orgID` must be specified") + if cfg.OrganizationID == "" { + return errors.New("`organizationID` must be specified") } if cfg.AccountID == "" { diff --git a/processor/topologyprocessor/config_test.go b/processor/topologyprocessor/config_test.go index a0b355e6f..a7d43a203 100644 --- a/processor/topologyprocessor/config_test.go +++ b/processor/topologyprocessor/config_test.go @@ -26,12 +26,12 @@ func TestConfigValidate(t *testing.T) { require.NoError(t, err) }) - t.Run("Empty configName", func(t *testing.T) { + t.Run("Empty configuration", func(t *testing.T) { cfg := Config{ - Enabled: true, - Interval: defaultInterval, - AccountID: "myacct", - OrgID: "myorg", + Enabled: true, + Interval: defaultInterval, + AccountID: "myacct", + OrganizationID: "myorg", } err := cfg.Validate() require.Error(t, err) @@ -39,21 +39,21 @@ func TestConfigValidate(t *testing.T) { t.Run("Empty AccountID", func(t *testing.T) { cfg := Config{ - Enabled: true, - Interval: defaultInterval, - OrgID: "myorg", - ConfigName: "myconfig", + Enabled: true, + Interval: defaultInterval, + OrganizationID: "myorg", + Configuration: "myconfig", } err := cfg.Validate() require.Error(t, err) }) - t.Run("Empty OrgID", func(t *testing.T) { + t.Run("Empty OrganizationID", func(t *testing.T) { cfg := Config{ - Enabled: true, - Interval: defaultInterval, - AccountID: "myacct", - ConfigName: "myconfig", + Enabled: true, + Interval: defaultInterval, + AccountID: "myacct", + Configuration: "myconfig", } err := cfg.Validate() require.Error(t, err) diff --git a/processor/topologyprocessor/factory_test.go b/processor/topologyprocessor/factory_test.go index 33d273525..f6ee25cf9 100644 --- a/processor/topologyprocessor/factory_test.go +++ b/processor/topologyprocessor/factory_test.go @@ -70,9 +70,9 @@ func TestCreateProcessorTwice_Logs(t *testing.T) { cfg := &Config{ Enabled: true, Interval: defaultInterval, - ConfigName: "myConf", + Configuration: "myConf", AccountID: "myAcct", - OrgID: "myOrg", + OrganizationID: "myOrg", BindplaneExtension: bindplaneExtensionID, } @@ -82,7 +82,7 @@ func TestCreateProcessorTwice_Logs(t *testing.T) { require.NoError(t, err) mockBindplane := mockTopologyRegistry{ - ResettableConfigTopologyRegistry: topology.NewResettableConfigTopologyRegistry(), + ResettableTopologyRegistry: topology.NewResettableTopologyRegistry(), } mh := mockHost{ @@ -108,9 +108,9 @@ func TestCreateProcessorTwice_Metrics(t *testing.T) { cfg := &Config{ Enabled: true, Interval: defaultInterval, - ConfigName: "myConf", + Configuration: "myConf", AccountID: "myAcct", - OrgID: "myOrg", + OrganizationID: "myOrg", BindplaneExtension: bindplaneExtensionID, } @@ -120,7 +120,7 @@ func TestCreateProcessorTwice_Metrics(t *testing.T) { require.NoError(t, err) mockBindplane := mockTopologyRegistry{ - ResettableConfigTopologyRegistry: topology.NewResettableConfigTopologyRegistry(), + ResettableTopologyRegistry: topology.NewResettableTopologyRegistry(), } mh := mockHost{ @@ -146,9 +146,9 @@ func TestCreateProcessorTwice_Traces(t *testing.T) { cfg := &Config{ Enabled: true, Interval: defaultInterval, - ConfigName: "myConf", + Configuration: "myConf", AccountID: "myAcct", - OrgID: "myOrg", + OrganizationID: "myOrg", BindplaneExtension: bindplaneExtensionID, } @@ -158,7 +158,7 @@ func TestCreateProcessorTwice_Traces(t *testing.T) { require.NoError(t, err) mockBindplane := mockTopologyRegistry{ - ResettableConfigTopologyRegistry: topology.NewResettableConfigTopologyRegistry(), + ResettableTopologyRegistry: topology.NewResettableTopologyRegistry(), } mh := mockHost{ @@ -186,7 +186,7 @@ func (m mockHost) GetExtensions() map[component.ID]component.Component { } type mockTopologyRegistry struct { - *topology.ResettableConfigTopologyRegistry + *topology.ResettableTopologyRegistry } func (mockTopologyRegistry) Start(_ context.Context, _ component.Host) error { return nil } diff --git a/processor/topologyprocessor/ocb_registry.go b/processor/topologyprocessor/ocb_registry.go index cac1a560d..28f5f937e 100644 --- a/processor/topologyprocessor/ocb_registry.go +++ b/processor/topologyprocessor/ocb_registry.go @@ -25,8 +25,7 @@ import ( // GetTopologyRegistry returns the topology registry that should be registered to based on the component ID. // nil, nil may be returned by this function. In this case, the processor should not register it's topology state anywhere. -func GetTopologyRegistry(host component.Host, bindplane component.ID) (topology.ConfigTopologyRegistry, error) { - fmt.Println("in OCB Registry") +func GetTopologyRegistry(host component.Host, bindplane component.ID) (topology.TopoRegistry, error) { var emptyComponentID component.ID if bindplane == emptyComponentID { // No bindplane component referenced, so we won't register our topology state anywhere. @@ -38,7 +37,7 @@ func GetTopologyRegistry(host component.Host, bindplane component.ID) (topology. return nil, fmt.Errorf("bindplane extension %q does not exist", bindplane) } - registry, ok := ext.(topology.ConfigTopologyRegistry) + registry, ok := ext.(topology.TopoRegistry) if !ok { return nil, fmt.Errorf("extension %q is not an topology state registry", bindplane) } diff --git a/processor/topologyprocessor/processor.go b/processor/topologyprocessor/processor.go index 3c3311c2f..075a3bba3 100644 --- a/processor/topologyprocessor/processor.go +++ b/processor/topologyprocessor/processor.go @@ -33,7 +33,8 @@ import ( const ( organizationIDHeader = "X-Bindplane-Organization-ID" accountIDHeader = "X-Bindplane-Account-ID" - configNameHeader = "X-Bindplane-Configuration" + configurationHeader = "X-Bindplane-Configuration" + resourceNameHeader = "X-Bindplane-ResourceName" ) type topologyUpdate struct { @@ -44,7 +45,7 @@ type topologyUpdate struct { type topologyProcessor struct { logger *zap.Logger enabled bool - topology *topology.ConfigTopologyState + topology *topology.TopoState interval time.Duration processorID component.ID bindplane component.ID @@ -55,12 +56,12 @@ type topologyProcessor struct { // newTopologyProcessor creates a new topology processor func newTopologyProcessor(logger *zap.Logger, cfg *Config, processorID component.ID) (*topologyProcessor, error) { destGw := topology.GatewayInfo{ - GatewayID: strings.TrimPrefix(cfg.GatewayID, "bindplane_gateway/"), - ConfigName: cfg.ConfigName, - AccountID: cfg.AccountID, - OrgID: cfg.OrgID, + GatewayID: strings.TrimPrefix(processorID.String(), "topology/"), + Configuration: cfg.Configuration, + AccountID: cfg.AccountID, + OrganizationID: cfg.OrganizationID, } - topology, err := topology.NewConfigTopologyState(destGw) + topology, err := topology.NewTopologyState(destGw) if err != nil { return nil, fmt.Errorf("create topology state: %w", err) } @@ -84,7 +85,7 @@ func (tp *topologyProcessor) start(_ context.Context, host component.Host) error } if registry != nil { - registerErr := registry.RegisterConfigTopology(tp.processorID.String(), tp.topology) + registerErr := registry.RegisterTopologyState(tp.processorID.String(), tp.topology) if registerErr != nil { return } @@ -113,11 +114,11 @@ func (tp *topologyProcessor) processMetrics(ctx context.Context, md pmetric.Metr func (tp *topologyProcessor) processTopologyHeaders(ctx context.Context) { metadata, ok := metadata.FromIncomingContext(ctx) if ok { - var configName, accountID, orgID string + var configuration, accountID, organizationID, resourceName string - configNameHeaders := metadata.Get(configNameHeader) - if len(configNameHeaders) > 0 { - configName = configNameHeaders[0] + configurationHeaders := metadata.Get(configurationHeader) + if len(configurationHeaders) > 0 { + configuration = configurationHeaders[0] } accountIDHeaders := metadata.Get(accountIDHeader) @@ -125,17 +126,23 @@ func (tp *topologyProcessor) processTopologyHeaders(ctx context.Context) { accountID = accountIDHeaders[0] } - orgIDHeaders := metadata.Get(organizationIDHeader) - if len(orgIDHeaders) > 0 { - orgID = orgIDHeaders[0] + organizationIDHeaders := metadata.Get(organizationIDHeader) + if len(organizationIDHeaders) > 0 { + organizationID = organizationIDHeaders[0] + } + + resourceNameHeaders := metadata.Get(resourceNameHeader) + if len(resourceNameHeaders) > 0 { + resourceName = resourceNameHeaders[0] } // only upsert if all headers are present - if configName != "" && accountID != "" && orgID != "" { + if configuration != "" && accountID != "" && organizationID != "" && resourceName != "" { gw := topology.GatewayInfo{ - ConfigName: configName, - AccountID: accountID, - OrgID: orgID, + Configuration: configuration, + AccountID: accountID, + OrganizationID: organizationID, + GatewayID: resourceName, } tp.topology.UpsertRoute(ctx, gw) } diff --git a/processor/topologyprocessor/processor_test.go b/processor/topologyprocessor/processor_test.go index 07eb9592d..a0d676de9 100644 --- a/processor/topologyprocessor/processor_test.go +++ b/processor/topologyprocessor/processor_test.go @@ -35,11 +35,11 @@ func TestProcessor_Logs(t *testing.T) { processorID := component.MustNewIDWithName("topology", "1") tmp, err := newTopologyProcessor(zap.NewNop(), &Config{ - Enabled: true, - Interval: time.Second, - OrgID: "myOrgID", - AccountID: "myAccountID", - ConfigName: "myConfigName", + Enabled: true, + Interval: time.Second, + OrganizationID: "myOrgID", + AccountID: "myAccountID", + Configuration: "myConfigName", }, processorID) require.NoError(t, err) @@ -49,7 +49,8 @@ func TestProcessor_Logs(t *testing.T) { ctx := metadata.NewIncomingContext(context.Background(), metadata.MD{ accountIDHeader: []string{"myAccountID1"}, organizationIDHeader: []string{"myOrgID1"}, - configNameHeader: []string{"myConfigName1"}, + configurationHeader: []string{"myConfigName1"}, + resourceNameHeader: []string{"myResourceName1"}, }) processedLogs, err := tmp.processLogs(ctx, logs) require.NoError(t, err) @@ -58,15 +59,16 @@ func TestProcessor_Logs(t *testing.T) { require.NoError(t, plogtest.CompareLogs(logs, processedLogs)) // validate that upsert route was performed - require.True(t, tmp.topology.ConfigTopology.DestGateway.AccountID == "myAccountID") - require.True(t, tmp.topology.ConfigTopology.DestGateway.OrgID == "myOrgID") - require.True(t, tmp.topology.ConfigTopology.DestGateway.ConfigName == "myConfigName") + require.True(t, tmp.topology.Topology.GatewaySource.AccountID == "myAccountID") + require.True(t, tmp.topology.Topology.GatewaySource.OrganizationID == "myOrgID") + require.True(t, tmp.topology.Topology.GatewaySource.Configuration == "myConfigName") ci := topology.GatewayInfo{ - ConfigName: "myConfigName1", - AccountID: "myAccountID1", - OrgID: "myOrgID1", + Configuration: "myConfigName1", + AccountID: "myAccountID1", + OrganizationID: "myOrgID1", + GatewayID: "myResourceName1", } - _, ok := tmp.topology.ConfigTopology.RouteTable[ci] + _, ok := tmp.topology.Topology.RouteTable[ci] require.True(t, ok) } @@ -74,11 +76,11 @@ func TestProcessor_Metrics(t *testing.T) { processorID := component.MustNewIDWithName("topology", "1") tmp, err := newTopologyProcessor(zap.NewNop(), &Config{ - Enabled: true, - Interval: time.Second, - OrgID: "myOrgID", - AccountID: "myAccountID", - ConfigName: "myConfigName", + Enabled: true, + Interval: time.Second, + OrganizationID: "myOrgID", + AccountID: "myAccountID", + Configuration: "myConfigName", }, processorID) require.NoError(t, err) @@ -88,7 +90,8 @@ func TestProcessor_Metrics(t *testing.T) { ctx := metadata.NewIncomingContext(context.Background(), metadata.MD{ accountIDHeader: []string{"myAccountID1"}, organizationIDHeader: []string{"myOrgID1"}, - configNameHeader: []string{"myConfigName1"}, + configurationHeader: []string{"myConfigName1"}, + resourceNameHeader: []string{"myResourceName1"}, }) processedMetrics, err := tmp.processMetrics(ctx, metrics) @@ -98,15 +101,16 @@ func TestProcessor_Metrics(t *testing.T) { require.NoError(t, pmetrictest.CompareMetrics(metrics, processedMetrics)) // validate that upsert route was performed - require.True(t, tmp.topology.ConfigTopology.DestGateway.AccountID == "myAccountID") - require.True(t, tmp.topology.ConfigTopology.DestGateway.OrgID == "myOrgID") - require.True(t, tmp.topology.ConfigTopology.DestGateway.ConfigName == "myConfigName") + require.True(t, tmp.topology.Topology.GatewaySource.AccountID == "myAccountID") + require.True(t, tmp.topology.Topology.GatewaySource.OrganizationID == "myOrgID") + require.True(t, tmp.topology.Topology.GatewaySource.Configuration == "myConfigName") ci := topology.GatewayInfo{ - ConfigName: "myConfigName1", - AccountID: "myAccountID1", - OrgID: "myOrgID1", + Configuration: "myConfigName1", + AccountID: "myAccountID1", + OrganizationID: "myOrgID1", + GatewayID: "myResourceName1", } - _, ok := tmp.topology.ConfigTopology.RouteTable[ci] + _, ok := tmp.topology.Topology.RouteTable[ci] require.True(t, ok) } @@ -114,11 +118,11 @@ func TestProcessor_Traces(t *testing.T) { processorID := component.MustNewIDWithName("topology", "1") tmp, err := newTopologyProcessor(zap.NewNop(), &Config{ - Enabled: true, - Interval: time.Second, - OrgID: "myOrgID", - AccountID: "myAccountID", - ConfigName: "myConfigName", + Enabled: true, + Interval: time.Second, + OrganizationID: "myOrgID", + AccountID: "myAccountID", + Configuration: "myConfigName", }, processorID) require.NoError(t, err) @@ -128,7 +132,8 @@ func TestProcessor_Traces(t *testing.T) { ctx := metadata.NewIncomingContext(context.Background(), metadata.MD{ accountIDHeader: []string{"myAccountID1"}, organizationIDHeader: []string{"myOrgID1"}, - configNameHeader: []string{"myConfigName1"}, + configurationHeader: []string{"myConfigName1"}, + resourceNameHeader: []string{"myResourceName1"}, }) processedTraces, err := tmp.processTraces(ctx, traces) @@ -138,15 +143,16 @@ func TestProcessor_Traces(t *testing.T) { require.NoError(t, ptracetest.CompareTraces(traces, processedTraces)) // validate that upsert route was performed - require.True(t, tmp.topology.ConfigTopology.DestGateway.AccountID == "myAccountID") - require.True(t, tmp.topology.ConfigTopology.DestGateway.OrgID == "myOrgID") - require.True(t, tmp.topology.ConfigTopology.DestGateway.ConfigName == "myConfigName") + require.True(t, tmp.topology.Topology.GatewaySource.AccountID == "myAccountID") + require.True(t, tmp.topology.Topology.GatewaySource.OrganizationID == "myOrgID") + require.True(t, tmp.topology.Topology.GatewaySource.Configuration == "myConfigName") ci := topology.GatewayInfo{ - ConfigName: "myConfigName1", - AccountID: "myAccountID1", - OrgID: "myOrgID1", + Configuration: "myConfigName1", + AccountID: "myAccountID1", + OrganizationID: "myOrgID1", + GatewayID: "myResourceName1", } - _, ok := tmp.topology.ConfigTopology.RouteTable[ci] + _, ok := tmp.topology.Topology.RouteTable[ci] require.True(t, ok) } @@ -155,20 +161,20 @@ func TestProcessor_Logs_TwoInstancesSameID(t *testing.T) { processorID := component.MustNewIDWithName("topology", "1") tmp1, err := newTopologyProcessor(zap.NewNop(), &Config{ - Enabled: true, - Interval: time.Second, - OrgID: "myOrgID", - AccountID: "myAccountID", - ConfigName: "myConfigName", + Enabled: true, + Interval: time.Second, + OrganizationID: "myOrgID", + AccountID: "myAccountID", + Configuration: "myConfigName", }, processorID) require.NoError(t, err) tmp2, err := newTopologyProcessor(zap.NewNop(), &Config{ - Enabled: true, - Interval: time.Second, - OrgID: "myOrgID2", - AccountID: "myAccountID2", - ConfigName: "myConfigName2", + Enabled: true, + Interval: time.Second, + OrganizationID: "myOrgID2", + AccountID: "myAccountID2", + Configuration: "myConfigName2", }, processorID) require.NoError(t, err) @@ -187,20 +193,20 @@ func TestProcessor_Logs_TwoInstancesDifferentID(t *testing.T) { processorID2 := component.MustNewIDWithName("topology", "2") tmp1, err := newTopologyProcessor(zap.NewNop(), &Config{ - Enabled: true, - Interval: time.Second, - OrgID: "myOrgID", - AccountID: "myAccountID", - ConfigName: "myConfigName", + Enabled: true, + Interval: time.Second, + OrganizationID: "myOrgID", + AccountID: "myAccountID", + Configuration: "myConfigName", }, processorID) require.NoError(t, err) tmp2, err := newTopologyProcessor(zap.NewNop(), &Config{ - Enabled: true, - Interval: time.Second, - OrgID: "myOrgID2", - AccountID: "myAccountID2", - ConfigName: "myConfigName2", + Enabled: true, + Interval: time.Second, + OrganizationID: "myOrgID2", + AccountID: "myAccountID2", + Configuration: "myConfigName2", }, processorID2) require.NoError(t, err)