Skip to content

Commit

Permalink
feat: Topology Processor (#1988)
Browse files Browse the repository at this point in the history
* fix: Shut down zombie goroutine in chronicleexporter (#2029)

* Properly shut down chronicleexporter zombie goroutine

* Fix lint

* Fix the same problem for the GRPC workflow

* initial structure & getting headers

* progress

* custom messages sending & receiving properly

* more progress

* more changes

* cleanup

* dont use TopologyInterval, TODO: remove TopologyInterval from BP Extension

* cleanup

* cleanup & tests

* rm print statements

* cleanup, tests, fix tests

* fix bp extension logic, fix lint

* add gatewayid parameter

* fix concurrent map write

* fix tests

* fix test

* cleanup names & data model, add ResourceNameHeader

* fix resource name header

* address pr feedback

* fix lint

* repo rename fixes

* fix rebase issue

* fix gomod versions

* fix go mod

* update topo proc go mod

* fix flaky test

* bump version to 1.68.0

---------

Co-authored-by: Ian Adams <[email protected]>
  • Loading branch information
colelaven and mrsillydog authored Dec 19, 2024
1 parent ee516ea commit 0c80959
Show file tree
Hide file tree
Showing 28 changed files with 4,430 additions and 29 deletions.
4 changes: 3 additions & 1 deletion collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/observiq/bindplane-otel-collector/factories"
"github.com/observiq/bindplane-otel-collector/internal/measurements"
"github.com/observiq/bindplane-otel-collector/internal/topology"
"go.opentelemetry.io/collector/otelcol"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -201,8 +202,9 @@ func (c *collector) Stop(ctx context.Context) {

c.svc = nil

// After shutting down, we reset the throughputs measurements registry so it's fresh for the next collector startup.
// After shutting down, we reset the registries so they're fresh for the next collector startup.
measurements.BindplaneAgentThroughputMeasurementsRegistry.Reset()
topology.BindplaneAgentTopologyRegistry.Reset()
}

// Restart will restart the collector. It will also reset the status channel.
Expand Down
2 changes: 1 addition & 1 deletion extension/bindplaneextension/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type Config struct {
// Labels in "k1=v1,k2=v2" format
Labels string `mapstructure:"labels"`
// Component ID of the opamp extension. If not specified, then
// this extension will not generate any custom messages for throughput metrics.
// this extension will not generate any custom messages for throughput metrics or topology.
OpAMP component.ID `mapstructure:"opamp"`
// MeasurementsInterval is the interval on which to report measurements.
// Measurements reporting is disabled if this duration is 0.
Expand Down
123 changes: 101 additions & 22 deletions extension/bindplaneextension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package bindplaneextension

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/golang/snappy"
"github.com/observiq/bindplane-otel-collector/internal/measurements"
"github.com/observiq/bindplane-otel-collector/internal/topology"
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages"
"go.opentelemetry.io/collector/component"
Expand All @@ -31,43 +33,56 @@ import (
)

type bindplaneExtension struct {
logger *zap.Logger
cfg *Config
ctmr *measurements.ResettableThroughputMeasurementsRegistry
customCapabilityHandler opampcustommessages.CustomCapabilityHandler
doneChan chan struct{}
wg *sync.WaitGroup
logger *zap.Logger
cfg *Config
measurementsRegistry *measurements.ResettableThroughputMeasurementsRegistry
topologyRegistry *topology.ResettableTopologyRegistry
customCapabilityHandlerThroughput opampcustommessages.CustomCapabilityHandler
customCapabilityHandlerTopology opampcustommessages.CustomCapabilityHandler

doneChan chan struct{}
wg *sync.WaitGroup
}

func newBindplaneExtension(logger *zap.Logger, cfg *Config) *bindplaneExtension {
return &bindplaneExtension{
logger: logger,
cfg: cfg,
ctmr: measurements.NewResettableThroughputMeasurementsRegistry(false),
doneChan: make(chan struct{}),
wg: &sync.WaitGroup{},
logger: logger,
cfg: cfg,
measurementsRegistry: measurements.NewResettableThroughputMeasurementsRegistry(false),
topologyRegistry: topology.NewResettableTopologyRegistry(),
doneChan: make(chan struct{}),
wg: &sync.WaitGroup{},
}
}

func (b *bindplaneExtension) Start(_ context.Context, host component.Host) error {
var emptyComponentID component.ID

// Set up measurements if enabled
if b.cfg.OpAMP != emptyComponentID && b.cfg.MeasurementsInterval > 0 {
// Set up custom capabilities if enabled
if b.cfg.OpAMP != emptyComponentID {
err := b.setupCustomCapabilities(host)
if err != nil {
return fmt.Errorf("setup capability handler: %w", err)
}

if b.cfg.MeasurementsInterval > 0 {
b.wg.Add(1)
go b.reportMetricsLoop()
}

b.wg.Add(1)
go b.reportMetricsLoop()
go b.reportTopologyLoop()
}

return nil
}

func (b *bindplaneExtension) RegisterThroughputMeasurements(processorID string, measurements *measurements.ThroughputMeasurements) error {
return b.ctmr.RegisterThroughputMeasurements(processorID, measurements)
return b.measurementsRegistry.RegisterThroughputMeasurements(processorID, measurements)
}

func (b *bindplaneExtension) RegisterTopologyState(processorID string, topology *topology.TopoState) error {
return b.topologyRegistry.RegisterTopologyState(processorID, topology)
}

func (b *bindplaneExtension) setupCustomCapabilities(host component.Host) error {
Expand All @@ -82,9 +97,16 @@ func (b *bindplaneExtension) setupCustomCapabilities(host component.Host) error
}

var err error
b.customCapabilityHandler, err = registry.Register(measurements.ReportMeasurementsV1Capability)
if b.cfg.MeasurementsInterval > 0 {
b.customCapabilityHandlerThroughput, err = registry.Register(measurements.ReportMeasurementsV1Capability)
if err != nil {
return fmt.Errorf("register custom measurements capability: %w", err)
}
}

b.customCapabilityHandlerTopology, err = registry.Register(topology.ReportTopologyCapability)
if err != nil {
return fmt.Errorf("register custom capability: %w", err)
return fmt.Errorf("register custom topology capability: %w", err)
}

return nil
Expand Down Expand Up @@ -119,7 +141,7 @@ func (b *bindplaneExtension) reportMetricsLoop() {
}

func (b *bindplaneExtension) reportMetrics() error {
m := b.ctmr.OTLPMeasurements(b.cfg.ExtraMeasurementsAttributes)
m := b.measurementsRegistry.OTLPMeasurements(b.cfg.ExtraMeasurementsAttributes)

// Send metrics as snappy-encoded otlp proto
marshaller := pmetric.ProtoMarshaler{}
Expand All @@ -130,15 +152,68 @@ func (b *bindplaneExtension) reportMetrics() error {

encoded := snappy.Encode(nil, marshalled)
for {
sendingChannel, err := b.customCapabilityHandler.SendMessage(measurements.ReportMeasurementsType, encoded)
sendingChannel, err := b.customCapabilityHandlerThroughput.SendMessage(measurements.ReportMeasurementsType, encoded)
switch {
case err == nil:
return nil
case errors.Is(err, types.ErrCustomMessagePending):
<-sendingChannel
continue
default:
return fmt.Errorf("send custom message: %w", err)
return fmt.Errorf("send custom throughput message: %w", err)
}
}
}

func (b *bindplaneExtension) reportTopologyLoop() {
defer b.wg.Done()

var topologyInterval time.Duration
select {
case <-b.doneChan:
return
case topologyInterval = <-b.topologyRegistry.SetIntervalChan():
if topologyInterval <= 0 {
return
}
}

t := time.NewTicker(topologyInterval)
defer t.Stop()

for {
select {
case <-t.C:
err := b.reportTopology()
if err != nil {
b.logger.Error("Failed to report topology.", zap.Error(err))
}
case <-b.doneChan:
return
}
}
}

func (b *bindplaneExtension) reportTopology() error {
ts := b.topologyRegistry.TopologyInfos()

// Send topology state snappy-encoded
marshalled, err := json.Marshal(ts)
if err != nil {
return fmt.Errorf("marshal topology state: %w", err)
}

encoded := snappy.Encode(nil, marshalled)
for {
sendingChannel, err := b.customCapabilityHandlerTopology.SendMessage(topology.ReportTopologyType, encoded)
switch {
case err == nil:
return nil
case errors.Is(err, types.ErrCustomMessagePending):
<-sendingChannel
continue
default:
return fmt.Errorf("send custom topology message: %w", err)
}
}
}
Expand All @@ -158,8 +233,12 @@ func (b *bindplaneExtension) Shutdown(ctx context.Context) error {
case <-waitgroupDone: // OK
}

if b.customCapabilityHandler != nil {
b.customCapabilityHandler.Unregister()
if b.customCapabilityHandlerThroughput != nil {
b.customCapabilityHandlerThroughput.Unregister()
}

if b.customCapabilityHandlerTopology != nil {
b.customCapabilityHandlerTopology.Unregister()
}

return nil
Expand Down
3 changes: 3 additions & 0 deletions extension/bindplaneextension/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.7
require (
github.com/golang/snappy v0.0.4
github.com/observiq/bindplane-otel-collector/internal/measurements v1.68.0
github.com/observiq/bindplane-otel-collector/internal/topology v1.68.0
github.com/open-telemetry/opamp-go v0.17.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.116.0
github.com/stretchr/testify v1.10.0
Expand Down Expand Up @@ -51,3 +52,5 @@ require (
)

replace github.com/observiq/bindplane-otel-collector/internal/measurements => ../../internal/measurements

replace github.com/observiq/bindplane-otel-collector/internal/topology => ../../internal/topology
2 changes: 2 additions & 0 deletions factories/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/observiq/bindplane-otel-collector/processor/samplingprocessor"
"github.com/observiq/bindplane-otel-collector/processor/spancountprocessor"
"github.com/observiq/bindplane-otel-collector/processor/throughputmeasurementprocessor"
"github.com/observiq/bindplane-otel-collector/processor/topologyprocessor"
"github.com/observiq/bindplane-otel-collector/processor/unrollprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor"
Expand Down Expand Up @@ -87,6 +88,7 @@ var defaultProcessors = []processor.Factory{
spanprocessor.NewFactory(),
throughputmeasurementprocessor.NewFactory(),
tailsamplingprocessor.NewFactory(),
topologyprocessor.NewFactory(),
transformprocessor.NewFactory(),
unrollprocessor.NewFactory(),
}
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ require (
)

require (
github.com/observiq/bindplane-otel-collector/internal/topology v1.68.0
github.com/observiq/bindplane-otel-collector/processor/topologyprocessor v1.68.0
github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/aesprovider v0.116.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor v0.116.0
go.opentelemetry.io/collector/extension/extensiontest v0.116.0
Expand Down Expand Up @@ -853,6 +855,8 @@ replace github.com/observiq/bindplane-otel-collector/processor/lookupprocessor =

replace github.com/observiq/bindplane-otel-collector/processor/unrollprocessor => ./processor/unrollprocessor

replace github.com/observiq/bindplane-otel-collector/processor/topologyprocessor => ./processor/topologyprocessor

replace github.com/observiq/bindplane-otel-collector/expr => ./expr

replace github.com/observiq/bindplane-otel-collector/counter => ./counter
Expand All @@ -875,6 +879,8 @@ replace github.com/observiq/bindplane-otel-collector/internal/report => ./intern

replace github.com/observiq/bindplane-otel-collector/internal/measurements => ./internal/measurements

replace github.com/observiq/bindplane-otel-collector/internal/topology => ./internal/topology

replace github.com/observiq/bindplane-otel-collector/receiver/splunksearchapireceiver => ./receiver/splunksearchapireceiver

// Does not build with windows and only used in configschema executable
Expand Down
2 changes: 2 additions & 0 deletions internal/service/managed.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

"github.com/observiq/bindplane-otel-collector/internal/measurements"
"github.com/observiq/bindplane-otel-collector/internal/topology"

"github.com/observiq/bindplane-otel-collector/collector"
"github.com/observiq/bindplane-otel-collector/internal/version"
Expand Down Expand Up @@ -57,6 +58,7 @@ func NewManagedCollectorService(col collector.Collector, logger *zap.Logger, man
CollectorConfigPath: collectorConfigPath,
LoggerConfigPath: loggerConfigPath,
MeasurementsReporter: measurements.BindplaneAgentThroughputMeasurementsRegistry,
TopologyReporter: topology.BindplaneAgentTopologyRegistry,
}

// Create new client
Expand Down
18 changes: 18 additions & 0 deletions internal/topology/bindplane_agent_topology.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package topology

// BindplaneAgentTopologyRegistry is the registry singleton used by bindplane agent to track topology state
var BindplaneAgentTopologyRegistry = NewResettableTopologyRegistry()
22 changes: 22 additions & 0 deletions internal/topology/custom_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package topology

const (
// ReportTopologyCapability is the capability needed to report topology to bindplane
ReportTopologyCapability = "com.bindplane.topology"
// ReportTopologyType is the type for reporting topology to BindPlane
ReportTopologyType = "reportTopology"
)
3 changes: 3 additions & 0 deletions internal/topology/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/observiq/bindplane-otel-collector/internal/topology

go 1.22.6
Loading

0 comments on commit 0c80959

Please sign in to comment.