[CHORE] processing discovered targets async #3517

Merged
merged 12 commits
Dec 19, 2024
21 changes: 21 additions & 0 deletions .chloggen/discovering-target-async.yaml
@@ -0,0 +1,21 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: target allocator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Process discovered targets asynchronously

# One or more tracking issues related to the change
issues: [1842]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This change enables the target allocator to process discovered targets asynchronously.
This significantly improves target allocator performance, since targets are now processed in parallel rather than sequentially.
This change also introduces new metrics to track the performance of the target allocator.
- opentelemetry_allocator_process_targets_duration_seconds: The duration of the process targets operation.
- opentelemetry_allocator_process_target_groups_duration_seconds: The duration of the process target groups operation.
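
The subtext above describes the core of the change: per-job target groups are handed to separate goroutines, their results are merged under a lock, and the new histograms record how long processing takes. As a rough, self-contained illustration of that pattern (not the PR's actual code; the metric name, the `processGroup` helper, and the data types here are invented for the example), a minimal Go sketch might look like this:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// processDuration mirrors the kind of histogram added by this change;
// the metric name here is chosen for illustration only.
var processDuration = promauto.NewHistogram(prometheus.HistogramOpts{
	Name:    "example_process_targets_duration_seconds",
	Help:    "Duration of processing targets.",
	Buckets: []float64{1, 5, 10, 30, 60, 120},
})

// processGroup is a stand-in for per-job target-group processing.
func processGroup(job string, groups []string) map[string]string {
	out := make(map[string]string, len(groups))
	for _, g := range groups {
		out[job+"/"+g] = g
	}
	return out
}

func main() {
	jobs := map[string][]string{
		"job-a": {"tg-1", "tg-2"},
		"job-b": {"tg-3"},
	}

	// Time the whole fan-out, the way the new histogram does.
	timer := prometheus.NewTimer(processDuration)
	defer timer.ObserveDuration()

	var (
		wg      sync.WaitGroup
		mtx     sync.Mutex
		targets = map[string]string{}
	)
	// One goroutine per job; merge each job's results under a mutex.
	for job, groups := range jobs {
		wg.Add(1)
		go func(job string, groups []string) {
			defer wg.Done()
			processed := processGroup(job, groups)
			mtx.Lock()
			for k, v := range processed {
				targets[k] = v
			}
			mtx.Unlock()
		}(job, groups)
	}
	wg.Wait()
	fmt.Println("processed targets:", len(targets))
}
```

The mutex only guards the merge into the shared map; the per-job work itself runs without contention, which is where the parallel speedup comes from.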
28 changes: 13 additions & 15 deletions cmd/otel-allocator/benchmark_test.go
@@ -28,7 +28,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/require"
ctrl "sigs.k8s.io/controller-runtime"
@@ -45,18 +44,17 @@ import (
// the HTTP server afterward. Test data is chosen to be reasonably representative of what the Prometheus service discovery
// outputs in the real world.
func BenchmarkProcessTargets(b *testing.B) {
numTargets := 10000
numTargets := 800000
targetsPerGroup := 5
groupsPerJob := 20
tsets := prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob)
labelsBuilder := labels.NewBuilder(labels.EmptyLabels())

b.ResetTimer()
for _, strategy := range allocation.GetRegisteredAllocatorNames() {
b.Run(strategy, func(b *testing.B) {
targetDiscoverer, allocator := createTestDiscoverer(strategy, map[string][]*relabel.Config{})
targetDiscoverer := createTestDiscoverer(strategy, map[string][]*relabel.Config{})
targetDiscoverer.UpdateTsets(tsets)
b.ResetTimer()
for i := 0; i < b.N; i++ {
targetDiscoverer.ProcessTargets(labelsBuilder, tsets, allocator.SetTargets)
targetDiscoverer.Reload()
}
})
}
@@ -65,11 +63,10 @@ func BenchmarkProcessTargets(b *testing.B) {
// BenchmarkProcessTargetsWithRelabelConfig is BenchmarkProcessTargets with a relabel config set. The relabel config
// does not actually modify any records, but does force the prehook to perform any necessary conversions along the way.
func BenchmarkProcessTargetsWithRelabelConfig(b *testing.B) {
numTargets := 10000
numTargets := 800000
targetsPerGroup := 5
groupsPerJob := 20
tsets := prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob)
labelsBuilder := labels.NewBuilder(labels.EmptyLabels())
prehookConfig := make(map[string][]*relabel.Config, len(tsets))
for jobName := range tsets {
// keep all targets in half the jobs, drop the rest
@@ -91,12 +88,13 @@ }
}
}

b.ResetTimer()
for _, strategy := range allocation.GetRegisteredAllocatorNames() {
b.Run(strategy, func(b *testing.B) {
targetDiscoverer, allocator := createTestDiscoverer(strategy, prehookConfig)
targetDiscoverer := createTestDiscoverer(strategy, prehookConfig)
targetDiscoverer.UpdateTsets(tsets)
b.ResetTimer()
for i := 0; i < b.N; i++ {
targetDiscoverer.ProcessTargets(labelsBuilder, tsets, allocator.SetTargets)
targetDiscoverer.Reload()
}
})
}
@@ -172,7 +170,7 @@ func prepareBenchmarkData(numTargets, targetsPerGroup, groupsPerJob int) map[str
return tsets
}

func createTestDiscoverer(allocationStrategy string, prehookConfig map[string][]*relabel.Config) (*target.Discoverer, allocation.Allocator) {
func createTestDiscoverer(allocationStrategy string, prehookConfig map[string][]*relabel.Config) *target.Discoverer {
ctx := context.Background()
logger := ctrl.Log.WithName(fmt.Sprintf("bench-%s", allocationStrategy))
ctrl.SetLogger(logr.New(log.NullLogSink{}))
@@ -187,6 +185,6 @@ func createTestDiscoverer(allocationStrategy string, prehookConfig map[string][]
registry := prometheus.NewRegistry()
sdMetrics, _ := discovery.CreateAndRegisterSDMetrics(registry)
discoveryManager := discovery.NewManager(ctx, gokitlog.NewNopLogger(), registry, sdMetrics)
targetDiscoverer := target.NewDiscoverer(logger, discoveryManager, allocatorPrehook, srv)
return targetDiscoverer, allocator
targetDiscoverer := target.NewDiscoverer(logger, discoveryManager, allocatorPrehook, srv, allocator.SetTargets)
return targetDiscoverer
}
4 changes: 2 additions & 2 deletions cmd/otel-allocator/main.go
@@ -112,7 +112,7 @@ func main() {
}
discoveryManager = discovery.NewManager(discoveryCtx, gokitlog.NewNopLogger(), prometheus.DefaultRegisterer, sdMetrics)

targetDiscoverer = target.NewDiscoverer(log, discoveryManager, allocatorPrehook, srv)
targetDiscoverer = target.NewDiscoverer(log, discoveryManager, allocatorPrehook, srv, allocator.SetTargets)
collectorWatcher, collectorWatcherErr := collector.NewCollectorWatcher(log, cfg.ClusterConfig)
if collectorWatcherErr != nil {
setupLog.Error(collectorWatcherErr, "Unable to initialize collector watcher")
@@ -175,7 +175,7 @@ func main() {
setupLog.Info("Prometheus config empty, skipping initial discovery configuration")
}

err := targetDiscoverer.Watch(allocator.SetTargets)
err := targetDiscoverer.Run()
setupLog.Info("Target discoverer exited")
return err
},
177 changes: 139 additions & 38 deletions cmd/otel-allocator/target/discovery.go
@@ -17,6 +17,8 @@ package target
import (
"hash"
"hash/fnv"
"sync"
"time"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
@@ -27,6 +29,7 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"go.uber.org/zap/zapcore"
"gopkg.in/yaml.v3"

allocatorWatcher "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/watcher"
@@ -37,16 +40,33 @@ var (
Name: "opentelemetry_allocator_targets",
Help: "Number of targets discovered.",
}, []string{"job_name"})

processTargetsDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "opentelemetry_allocator_process_targets_duration_seconds",
Help: "Duration of processing targets.",
Buckets: []float64{1, 5, 10, 30, 60, 120},
})

processTargetGroupsDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "opentelemetry_allocator_process_target_groups_duration_seconds",
Help: "Duration of processing target groups.",
Buckets: []float64{1, 5, 10, 30, 60, 120},
}, []string{"job_name"})
)

type Discoverer struct {
log logr.Logger
manager *discovery.Manager
close chan struct{}
configsMap map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig
hook discoveryHook
scrapeConfigsHash hash.Hash
scrapeConfigsUpdater scrapeConfigsUpdater
log logr.Logger
manager *discovery.Manager
close chan struct{}
mtxScrape sync.Mutex // Guards the fields below.
configsMap map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig
hook discoveryHook
scrapeConfigsHash hash.Hash
scrapeConfigsUpdater scrapeConfigsUpdater
targetSets map[string][]*targetgroup.Group
triggerReload chan struct{}
processTargetsCallBack func(targets map[string]*Item)
mtxTargets sync.Mutex
}

type discoveryHook interface {
@@ -57,15 +77,17 @@ type scrapeConfigsUpdater interface {
UpdateScrapeConfigResponse(map[string]*promconfig.ScrapeConfig) error
}

func NewDiscoverer(log logr.Logger, manager *discovery.Manager, hook discoveryHook, scrapeConfigsUpdater scrapeConfigsUpdater) *Discoverer {
func NewDiscoverer(log logr.Logger, manager *discovery.Manager, hook discoveryHook, scrapeConfigsUpdater scrapeConfigsUpdater, setTargets func(targets map[string]*Item)) *Discoverer {
return &Discoverer{
log: log,
manager: manager,
close: make(chan struct{}),
configsMap: make(map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig),
hook: hook,
scrapeConfigsHash: nil, // we want the first update to succeed even if the config is empty
scrapeConfigsUpdater: scrapeConfigsUpdater,
log: log,
manager: manager,
close: make(chan struct{}),
triggerReload: make(chan struct{}, 1),
configsMap: make(map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig),
hook: hook,
scrapeConfigsHash: nil, // we want the first update to succeed even if the config is empty
scrapeConfigsUpdater: scrapeConfigsUpdater,
processTargetsCallBack: setTargets,
}
}

@@ -105,43 +127,122 @@ func (m *Discoverer) ApplyConfig(source allocatorWatcher.EventSource, scrapeConf
return m.manager.ApplyConfig(discoveryCfg)
}

func (m *Discoverer) Watch(fn func(targets map[string]*Item)) error {
labelsBuilder := labels.NewBuilder(labels.EmptyLabels())
func (m *Discoverer) Run() error {
err := m.run(m.manager.SyncCh())
if err != nil {
m.log.Error(err, "Service Discovery watch event failed")
return err
}
<-m.close
m.log.Info("Service Discovery watch event stopped: discovery manager closed")
return nil
}

// UpdateTsets updates the target sets to be scraped.
func (m *Discoverer) UpdateTsets(tsets map[string][]*targetgroup.Group) {
m.mtxScrape.Lock()
m.targetSets = tsets
m.mtxScrape.Unlock()
}

// reloader triggers a reload of the scrape configs at regular intervals.
// The time between reloads is defined by reloadIntervalDuration to avoid overloading the system
// with too many reloads, because some service discovery mechanisms can be quite chatty.
func (m *Discoverer) reloader() {
reloadIntervalDuration := model.Duration(5 * time.Second)
ticker := time.NewTicker(time.Duration(reloadIntervalDuration))

defer ticker.Stop()

for {
select {
case <-m.close:
m.log.Info("Service Discovery watch event stopped: discovery manager closed")
return nil
case tsets := <-m.manager.SyncCh():
m.ProcessTargets(labelsBuilder, tsets, fn)
return
case <-ticker.C:
select {
case <-m.triggerReload:
m.Reload()
case <-m.close:
return
}
}
}
}

func (m *Discoverer) ProcessTargets(builder *labels.Builder, tsets map[string][]*targetgroup.Group, fn func(targets map[string]*Item)) {
// Reload triggers a reload of the scrape configs.
// This will process the target groups and update the targets concurrently.
func (m *Discoverer) Reload() {
m.mtxScrape.Lock()
var wg sync.WaitGroup
targets := map[string]*Item{}
timer := prometheus.NewTimer(processTargetsDuration)
defer timer.ObserveDuration()

for jobName, groups := range m.targetSets {
wg.Add(1)
// Run the sync in parallel as these take a while and at high load can't catch up.
go func(jobName string, groups []*targetgroup.Group) {
processedTargets := m.processTargetGroups(jobName, groups)
m.mtxTargets.Lock()
for k, v := range processedTargets {
targets[k] = v
}
m.mtxTargets.Unlock()
wg.Done()
}(jobName, groups)
}
m.mtxScrape.Unlock()
wg.Wait()
m.processTargetsCallBack(targets)
}

for jobName, tgs := range tsets {
var count float64 = 0
for _, tg := range tgs {
builder.Reset(labels.EmptyLabels())
for ln, lv := range tg.Labels {
// processTargetGroups processes the target groups and returns a map of targets.
func (m *Discoverer) processTargetGroups(jobName string, groups []*targetgroup.Group) map[string]*Item {
builder := labels.NewBuilder(labels.Labels{})
timer := prometheus.NewTimer(processTargetGroupsDuration.WithLabelValues(jobName))
targets := map[string]*Item{}
defer timer.ObserveDuration()
var count float64 = 0
for _, tg := range groups {
builder.Reset(labels.EmptyLabels())
for ln, lv := range tg.Labels {
builder.Set(string(ln), string(lv))
}
groupLabels := builder.Labels()
for _, t := range tg.Targets {
count++
builder.Reset(groupLabels)
for ln, lv := range t {
builder.Set(string(ln), string(lv))
}
groupLabels := builder.Labels()
for _, t := range tg.Targets {
count++
builder.Reset(groupLabels)
for ln, lv := range t {
builder.Set(string(ln), string(lv))
}
item := NewItem(jobName, string(t[model.AddressLabel]), builder.Labels(), "")
targets[item.Hash()] = item
item := NewItem(jobName, string(t[model.AddressLabel]), builder.Labels(), "")
targets[item.Hash()] = item
}
}
targetsDiscovered.WithLabelValues(jobName).Set(count)
return targets
}

// Run receives and saves target set updates and triggers the scraping loops reloading.
// Reloading happens in the background so that it doesn't block receiving targets updates.
func (m *Discoverer) run(tsets <-chan map[string][]*targetgroup.Group) error {
go m.reloader()
for {
select {
case ts := <-tsets:
m.log.V(int(zapcore.DebugLevel)).Info("Service Discovery watch event received", "targets groups", len(ts))
m.UpdateTsets(ts)

select {
case m.triggerReload <- struct{}{}:
default:
}

case <-m.close:
m.log.Info("Service Discovery watch event stopped: discovery manager closed")
return nil
}
targetsDiscovered.WithLabelValues(jobName).Set(count)
}
fn(targets)
}
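
For readers skimming the diff, the `run`/`reloader` pair above implements a debounced reload: every discovery update is saved immediately, a reload is requested through a buffered channel of size 1 with a non-blocking send (so bursts collapse into one pending request), and a ticker bounds how often `Reload` actually runs. The standalone sketch below shows the same coalescing pattern with placeholder channels and a much shorter interval than the 5-second one used here; it is illustrative only, not the PR's code.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	updates := make(chan int)               // stand-in for the discovery manager's sync channel
	triggerReload := make(chan struct{}, 1) // size 1: at most one pending reload request
	done := make(chan struct{})

	// Producer: a burst of discovery updates (placeholder data).
	go func() {
		for i := 0; i < 10; i++ {
			updates <- i
		}
		time.Sleep(300 * time.Millisecond)
		close(done)
	}()

	// Reloader: runs at most one reload per tick, and only if one was requested.
	go func() {
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				select {
				case <-triggerReload:
					fmt.Println("reload") // stand-in for Reload()
				case <-done:
					return
				}
			}
		}
	}()

	// Receiver: save each update, then request a reload without blocking.
	for {
		select {
		case u := <-updates:
			_ = u // stand-in for UpdateTsets(ts)
			select {
			case triggerReload <- struct{}{}:
			default: // a reload is already pending; coalesce
			}
		case <-done:
			fmt.Println("stopped")
			return
		}
	}
}
```

Because the send into the trigger channel never blocks, receiving target updates stays decoupled from how long a reload takes.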

func (m *Discoverer) Close() {