Scraping: stop storing discovered labels
Instead of storing discovered labels on every target, recompute them on
demand. The `Target` struct now holds the extra data required to
recompute them, such as its `ScrapeConfig`.

This moves the load from every Prometheus instance, all of the time, to
only the moments when someone views Service Discovery in the UI.

The way `PopulateLabels` is used changes: you are no longer expected to
call it with a partially populated `labels.Builder`. Discovered labels
are now built separately by `PopulateDiscoveredLabels`.

The signature of `Target.Labels` changes to take a `labels.Builder`
instead of a `ScratchBuilder`, for consistency with `DiscoveredLabels`.

This will save a lot of work when many targets are filtered out in
relabeling. Combine with `keep_dropped_targets` to avoid ever computing
most labels for dropped targets.

Signed-off-by: Bryan Boreham <[email protected]>
bboreham committed Nov 4, 2024
1 parent d8d52f3 commit 2fc6eaa
Showing 8 changed files with 160 additions and 133 deletions.
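
Before the per-file diffs, a minimal, self-contained sketch of the idea. These are toy stand-ins, not the real Prometheus types: only the field names `scrapeConfig`, `tlset` and `tgLabels` match the diffs below, while the map-based labels and the merge order are simplified assumptions.

package main

import "fmt"

// Toy stand-in for config.ScrapeConfig.
type ScrapeConfig struct {
	JobName, Scheme, MetricsPath string
}

// Before this commit a Target stored its discoveredLabels outright;
// now it keeps only the inputs needed to rebuild them.
type Target struct {
	scrapeConfig *ScrapeConfig
	tlset        map[string]string // labels of the discovered target itself
	tgLabels     map[string]string // labels shared by its target group
}

// DiscoveredLabels recomputes the pre-relabeling label set on demand,
// e.g. only when the Service Discovery page is rendered.
func (t *Target) DiscoveredLabels() map[string]string {
	m := map[string]string{
		"job":              t.scrapeConfig.JobName,
		"__scheme__":       t.scrapeConfig.Scheme,
		"__metrics_path__": t.scrapeConfig.MetricsPath,
	}
	for k, v := range t.tgLabels {
		m[k] = v
	}
	for k, v := range t.tlset { // target labels win over group labels
		m[k] = v
	}
	return m
}

func main() {
	t := &Target{
		scrapeConfig: &ScrapeConfig{JobName: "node", Scheme: "http", MetricsPath: "/metrics"},
		tlset:        map[string]string{"__address__": "1.2.3.4:9100"},
		tgLabels:     map[string]string{"env": "prod"},
	}
	fmt.Println(t.DiscoveredLabels()) // rebuilt here, not stored per target
}
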
4 changes: 3 additions & 1 deletion cmd/promtool/sd.go
@@ -144,7 +144,9 @@ func getSDCheckResult(targetGroups []*targetgroup.Group, scrapeConfig *config.Sc
 		}
 	}
 
-	res, orig, err := scrape.PopulateLabels(lb, scrapeConfig, noDefaultScrapePort)
+	scrape.PopulateDiscoveredLabels(lb, scrapeConfig, target, targetGroup.Labels)
+	orig := lb.Labels()
+	res, err := scrape.PopulateLabels(lb, scrapeConfig, noDefaultScrapePort, target, targetGroup.Labels)
 	result := sdCheckResult{
 		DiscoveredLabels: orig,
 		Labels:           res,
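
The ordering in the hunk above matters: `orig` must be snapshotted between the two calls, because `PopulateLabels` goes on to mutate the same `labels.Builder` while relabeling. A self-contained toy showing just that builder-reuse pattern — the stub functions are illustrative, not the real scrape-package implementations:

package main

import "fmt"

// Toy stand-in for labels.Builder: a mutable map with a snapshot method.
type builder map[string]string

func (b builder) snapshot() map[string]string {
	out := make(map[string]string, len(b))
	for k, v := range b {
		out[k] = v
	}
	return out
}

// Stub: fill the builder with the pre-relabeling ("discovered") labels.
func populateDiscoveredLabels(b builder) {
	b["job"] = "custom-job"
	b["__address__"] = "1.2.3.4:1000"
}

// Stub: "relabeling" mutates the same builder in place.
func populateLabels(b builder) map[string]string {
	b["instance"] = b["__address__"]
	delete(b, "__address__")
	return b.snapshot()
}

func main() {
	b := builder{}
	populateDiscoveredLabels(b)
	orig := b.snapshot() // must happen before populateLabels mutates b
	res := populateLabels(b)
	fmt.Println("discovered:", orig) // still carries __address__
	fmt.Println("final:     ", res)  // the relabeled view
}
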
68 changes: 36 additions & 32 deletions scrape/manager_test.go
@@ -16,6 +16,7 @@ package scrape
 import (
 	"context"
 	"fmt"
+	"maps"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
@@ -45,7 +46,7 @@ import (
 
 func TestPopulateLabels(t *testing.T) {
 	cases := []struct {
-		in            labels.Labels
+		in            model.LabelSet
 		cfg           *config.ScrapeConfig
 		noDefaultPort bool
 		res           labels.Labels
@@ -54,10 +55,10 @@ func TestPopulateLabels(t *testing.T) {
 	}{
 		// Regular population of scrape config options.
 		{
-			in: labels.FromMap(map[string]string{
+			in: model.LabelSet{
 				model.AddressLabel: "1.2.3.4:1000",
 				"custom":           "value",
-			}),
+			},
 			cfg: &config.ScrapeConfig{
 				Scheme:      "https",
 				MetricsPath: "/metrics",
@@ -88,14 +89,14 @@ func TestPopulateLabels(t *testing.T) {
 		// Pre-define/overwrite scrape config labels.
 		// Leave out port and expect it to be defaulted to scheme.
 		{
-			in: labels.FromMap(map[string]string{
+			in: model.LabelSet{
 				model.AddressLabel:        "1.2.3.4",
 				model.SchemeLabel:         "http",
 				model.MetricsPathLabel:    "/custom",
 				model.JobLabel:            "custom-job",
 				model.ScrapeIntervalLabel: "2s",
 				model.ScrapeTimeoutLabel:  "2s",
-			}),
+			},
 			cfg: &config.ScrapeConfig{
 				Scheme:      "https",
 				MetricsPath: "/metrics",
@@ -123,10 +124,10 @@ func TestPopulateLabels(t *testing.T) {
 		},
 		// Provide instance label. HTTPS port default for IPv6.
 		{
-			in: labels.FromMap(map[string]string{
+			in: model.LabelSet{
 				model.AddressLabel:  "[::1]",
 				model.InstanceLabel: "custom-instance",
-			}),
+			},
 			cfg: &config.ScrapeConfig{
 				Scheme:      "https",
 				MetricsPath: "/metrics",
@@ -155,7 +156,7 @@ func TestPopulateLabels(t *testing.T) {
 		},
 		// Address label missing.
 		{
-			in: labels.FromStrings("custom", "value"),
+			in: model.LabelSet{"custom": "value"},
 			cfg: &config.ScrapeConfig{
 				Scheme:      "https",
 				MetricsPath: "/metrics",
@@ -169,7 +170,7 @@ func TestPopulateLabels(t *testing.T) {
 		},
 		// Address label missing, but added in relabelling.
 		{
-			in: labels.FromStrings("custom", "host:1234"),
+			in: model.LabelSet{"custom": "host:1234"},
 			cfg: &config.ScrapeConfig{
 				Scheme:      "https",
 				MetricsPath: "/metrics",
@@ -207,7 +208,7 @@ func TestPopulateLabels(t *testing.T) {
 		},
 		// Address label missing, but added in relabelling.
 		{
-			in: labels.FromStrings("custom", "host:1234"),
+			in: model.LabelSet{"custom": "host:1234"},
 			cfg: &config.ScrapeConfig{
 				Scheme:      "https",
 				MetricsPath: "/metrics",
@@ -245,10 +246,10 @@ func TestPopulateLabels(t *testing.T) {
 		},
 		// Invalid UTF-8 in label.
 		{
-			in: labels.FromMap(map[string]string{
+			in: model.LabelSet{
 				model.AddressLabel: "1.2.3.4:1000",
 				"custom":           "\xbd",
-			}),
+			},
 			cfg: &config.ScrapeConfig{
 				Scheme:      "https",
 				MetricsPath: "/metrics",
@@ -262,10 +263,10 @@ func TestPopulateLabels(t *testing.T) {
 		},
 		// Invalid duration in interval label.
 		{
-			in: labels.FromMap(map[string]string{
+			in: model.LabelSet{
 				model.AddressLabel:        "1.2.3.4:1000",
 				model.ScrapeIntervalLabel: "2notseconds",
-			}),
+			},
 			cfg: &config.ScrapeConfig{
 				Scheme:      "https",
 				MetricsPath: "/metrics",
@@ -279,10 +280,10 @@ func TestPopulateLabels(t *testing.T) {
 		},
 		// Invalid duration in timeout label.
 		{
-			in: labels.FromMap(map[string]string{
+			in: model.LabelSet{
 				model.AddressLabel:       "1.2.3.4:1000",
 				model.ScrapeTimeoutLabel: "2notseconds",
-			}),
+			},
 			cfg: &config.ScrapeConfig{
 				Scheme:      "https",
 				MetricsPath: "/metrics",
@@ -296,10 +297,10 @@ func TestPopulateLabels(t *testing.T) {
 		},
 		// 0 interval in timeout label.
 		{
-			in: labels.FromMap(map[string]string{
+			in: model.LabelSet{
 				model.AddressLabel:        "1.2.3.4:1000",
 				model.ScrapeIntervalLabel: "0s",
-			}),
+			},
 			cfg: &config.ScrapeConfig{
 				Scheme:      "https",
 				MetricsPath: "/metrics",
@@ -313,10 +314,10 @@ func TestPopulateLabels(t *testing.T) {
 		},
 		// 0 duration in timeout label.
 		{
-			in: labels.FromMap(map[string]string{
+			in: model.LabelSet{
 				model.AddressLabel:       "1.2.3.4:1000",
 				model.ScrapeTimeoutLabel: "0s",
-			}),
+			},
 			cfg: &config.ScrapeConfig{
 				Scheme:      "https",
 				MetricsPath: "/metrics",
@@ -330,11 +331,11 @@ func TestPopulateLabels(t *testing.T) {
 		},
 		// Timeout less than interval.
 		{
-			in: labels.FromMap(map[string]string{
+			in: model.LabelSet{
 				model.AddressLabel:        "1.2.3.4:1000",
 				model.ScrapeIntervalLabel: "1s",
 				model.ScrapeTimeoutLabel:  "2s",
-			}),
+			},
 			cfg: &config.ScrapeConfig{
 				Scheme:      "https",
 				MetricsPath: "/metrics",
@@ -348,9 +349,9 @@ func TestPopulateLabels(t *testing.T) {
 		},
 		// Don't attach default port.
 		{
-			in: labels.FromMap(map[string]string{
+			in: model.LabelSet{
 				model.AddressLabel: "1.2.3.4",
-			}),
+			},
 			cfg: &config.ScrapeConfig{
 				Scheme:      "https",
 				MetricsPath: "/metrics",
@@ -379,9 +380,9 @@ func TestPopulateLabels(t *testing.T) {
 		},
 		// Remove default port (http).
 		{
-			in: labels.FromMap(map[string]string{
+			in: model.LabelSet{
 				model.AddressLabel: "1.2.3.4:80",
-			}),
+			},
 			cfg: &config.ScrapeConfig{
 				Scheme:      "http",
 				MetricsPath: "/metrics",
@@ -410,9 +411,9 @@ func TestPopulateLabels(t *testing.T) {
 		},
 		// Remove default port (https).
 		{
-			in: labels.FromMap(map[string]string{
+			in: model.LabelSet{
 				model.AddressLabel: "1.2.3.4:443",
-			}),
+			},
 			cfg: &config.ScrapeConfig{
 				Scheme:      "https",
 				MetricsPath: "/metrics",
@@ -441,17 +442,20 @@ func TestPopulateLabels(t *testing.T) {
 		},
 	}
 	for _, c := range cases {
-		in := c.in.Copy()
-
-		res, orig, err := PopulateLabels(labels.NewBuilder(c.in), c.cfg, c.noDefaultPort)
+		in := maps.Clone(c.in)
+		lb := labels.NewBuilder(labels.EmptyLabels())
+		res, err := PopulateLabels(lb, c.cfg, c.noDefaultPort, c.in, nil)
 		if c.err != "" {
 			require.EqualError(t, err, c.err)
 		} else {
 			require.NoError(t, err)
 		}
 		require.Equal(t, c.in, in)
 		testutil.RequireEqual(t, c.res, res)
-		testutil.RequireEqual(t, c.resOrig, orig)
+		if err == nil {
+			PopulateDiscoveredLabels(lb, c.cfg, c.in, nil)
+			testutil.RequireEqual(t, c.resOrig, lb.Labels())
+		}
 	}
 }
 
8 changes: 4 additions & 4 deletions scrape/scrape.go
@@ -400,7 +400,7 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
 			switch {
 			case nonEmpty:
 				all = append(all, t)
-			case !t.discoveredLabels.IsEmpty():
+			default:
 				if sp.config.KeepDroppedTargets == 0 || uint(len(sp.droppedTargets)) < sp.config.KeepDroppedTargets {
 					sp.droppedTargets = append(sp.droppedTargets, t)
 				}
@@ -490,9 +490,9 @@ func (sp *scrapePool) sync(targets []*Target) {
 			if _, ok := uniqueLoops[hash]; !ok {
 				uniqueLoops[hash] = nil
 			}
-			// Need to keep the most updated labels information
-			// for displaying it in the Service Discovery web page.
-			sp.activeTargets[hash].SetDiscoveredLabels(t.DiscoveredLabels())
+			// Need to keep the most updated ScrapeConfig for
+			// displaying labels in the Service Discovery web page.
+			sp.activeTargets[hash].SetScrapeConfig(sp.config, t.tlset, t.tgLabels)
 		}
 	}
 
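
`SetScrapeConfig` replaces the old `SetDiscoveredLabels` at this call site: rather than copying a computed label set, it swaps in the latest inputs from which labels can be recomputed. Its body is not shown in this diff view, so the following is only a guess at its shape, assuming `Target` guards this mutable state with its existing mutex:

// Hypothetical sketch only; the real method lives in the scrape package.
func (t *Target) SetScrapeConfig(scrapeConfig *config.ScrapeConfig, tlset, tgLabels model.LabelSet) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.scrapeConfig = scrapeConfig
	t.tlset = tlset
	t.tgLabels = tgLabels
}
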
33 changes: 23 additions & 10 deletions scrape/scrape_test.go
@@ -113,7 +113,8 @@ func TestDroppedTargetsList(t *testing.T) {
 	sp.Sync(tgs)
 	require.Len(t, sp.droppedTargets, expectedLength)
 	require.Equal(t, expectedLength, sp.droppedTargetsCount)
-	require.Equal(t, expectedLabelSetString, sp.droppedTargets[0].DiscoveredLabels().String())
+	lb := labels.NewBuilder(labels.EmptyLabels())
+	require.Equal(t, expectedLabelSetString, sp.droppedTargets[0].DiscoveredLabels(lb).String())
 
 	// Check that count is still correct when we don't retain all dropped targets.
 	sp.config.KeepDroppedTargets = 1
@@ -136,16 +137,19 @@ func TestDiscoveredLabelsUpdate(t *testing.T) {
 	}
 	sp.activeTargets = make(map[uint64]*Target)
 	t1 := &Target{
-		discoveredLabels: labels.FromStrings("label", "name"),
+		tlset:        model.LabelSet{"label": "name"},
+		scrapeConfig: sp.config,
 	}
 	sp.activeTargets[t1.hash()] = t1
 
 	t2 := &Target{
-		discoveredLabels: labels.FromStrings("labelNew", "nameNew"),
+		tlset:        model.LabelSet{"labelNew": "nameNew"},
+		scrapeConfig: sp.config,
 	}
 	sp.sync([]*Target{t2})
 
-	require.Equal(t, t2.DiscoveredLabels(), sp.activeTargets[t1.hash()].DiscoveredLabels())
+	lb := labels.NewBuilder(labels.EmptyLabels())
+	require.Equal(t, t2.DiscoveredLabels(lb), sp.activeTargets[t1.hash()].DiscoveredLabels(lb))
 }
 
 type testLoop struct {
@@ -207,7 +211,8 @@ func TestScrapePoolStop(t *testing.T) {
 
 	for i := 0; i < numTargets; i++ {
 		t := &Target{
-			labels: labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)),
+			labels:       labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i)),
+			scrapeConfig: &config.ScrapeConfig{},
 		}
 		l := &testLoop{}
 		d := time.Duration((i+1)*20) * time.Millisecond
@@ -291,8 +296,8 @@ func TestScrapePoolReload(t *testing.T) {
 	for i := 0; i < numTargets; i++ {
 		labels := labels.FromStrings(model.AddressLabel, fmt.Sprintf("example.com:%d", i))
 		t := &Target{
-			labels:           labels,
-			discoveredLabels: labels,
+			labels:       labels,
+			scrapeConfig: &config.ScrapeConfig{},
 		}
 		l := &testLoop{}
 		d := time.Duration((i+1)*20) * time.Millisecond
@@ -2370,6 +2375,7 @@ func TestTargetScraperScrapeOK(t *testing.T) {
 				model.SchemeLabel, serverURL.Scheme,
 				model.AddressLabel, serverURL.Host,
 			),
+			scrapeConfig: &config.ScrapeConfig{},
 		},
 		client:  http.DefaultClient,
 		timeout: configTimeout,
@@ -2411,6 +2417,7 @@ func TestTargetScrapeScrapeCancel(t *testing.T) {
 				model.SchemeLabel, serverURL.Scheme,
 				model.AddressLabel, serverURL.Host,
 			),
+			scrapeConfig: &config.ScrapeConfig{},
 		},
 		client:       http.DefaultClient,
 		acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols),
@@ -2466,6 +2473,7 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) {
 				model.SchemeLabel, serverURL.Scheme,
 				model.AddressLabel, serverURL.Host,
 			),
+			scrapeConfig: &config.ScrapeConfig{},
 		},
 		client:       http.DefaultClient,
 		acceptHeader: acceptHeader(config.DefaultGlobalConfig.ScrapeProtocols),
@@ -2509,6 +2517,7 @@ func TestTargetScraperBodySizeLimit(t *testing.T) {
 				model.SchemeLabel, serverURL.Scheme,
 				model.AddressLabel, serverURL.Host,
 			),
+			scrapeConfig: &config.ScrapeConfig{},
 		},
 		client:        http.DefaultClient,
 		bodySizeLimit: bodySizeLimit,
@@ -2779,7 +2788,8 @@ func TestReuseScrapeCache(t *testing.T) {
 		}
 		sp, _ = newScrapePool(cfg, app, 0, nil, nil, &Options{}, newTestScrapeMetrics(t))
 		t1    = &Target{
-			discoveredLabels: labels.FromStrings("labelNew", "nameNew", "labelNew1", "nameNew1", "labelNew2", "nameNew2"),
+			labels:       labels.FromStrings("labelNew", "nameNew", "labelNew1", "nameNew1", "labelNew2", "nameNew2"),
+			scrapeConfig: &config.ScrapeConfig{},
 		}
 		proxyURL, _ = url.Parse("http://localhost:2128")
 	)
@@ -2963,7 +2973,8 @@ func TestReuseCacheRace(t *testing.T) {
 		buffers = pool.New(1e3, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
 		sp, _   = newScrapePool(cfg, app, 0, nil, buffers, &Options{}, newTestScrapeMetrics(t))
 		t1      = &Target{
-			discoveredLabels: labels.FromStrings("labelNew", "nameNew"),
+			labels:       labels.FromStrings("labelNew", "nameNew"),
+			scrapeConfig: &config.ScrapeConfig{},
 		}
 	)
 	defer sp.stop()
@@ -3602,7 +3613,9 @@ func BenchmarkTargetScraperGzip(b *testing.B) {
 				model.SchemeLabel, serverURL.Scheme,
 				model.AddressLabel, serverURL.Host,
 			),
-			params: url.Values{"count": []string{strconv.Itoa(scenario.metricsCount)}},
+			scrapeConfig: &config.ScrapeConfig{
+				Params: url.Values{"count": []string{strconv.Itoa(scenario.metricsCount)}},
+			},
 		},
 		client:  client,
 		timeout: time.Second,