Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace string interner with an LRU and per-origin cache up top. #20943

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions .gitlab/functional_test/regression_detector.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ single-machine-performance-regression_detector:
- outputs/report.html # for debugging, also on S3
when: always
variables:
SMP_VERSION: 0.10.0
SMP_VERSION: 0.11.0
LADING_VERSION: 0.19.1
CPUS: 7
MEMORY: "30g"
Expand Down Expand Up @@ -71,7 +71,7 @@ single-machine-performance-regression_detector:
- RUST_LOG="info,aws_config::profile::credentials=error"
- RUST_LOG_DEBUG="debug,aws_config::profile::credentials=error"
- RUST_LOG="${RUST_LOG}" ./smp --team-id ${SMP_AGENT_TEAM_ID} --api-base ${SMP_API} --aws-named-profile ${AWS_NAMED_PROFILE}
job submit --use-curta
job submit
--lading-version ${LADING_VERSION}
--baseline-image ${BASELINE_IMAGE}
--comparison-image ${COMPARISON_IMAGE}
Expand All @@ -86,13 +86,15 @@ single-machine-performance-regression_detector:
--submission-metadata submission_metadata
# Wait for job to complete.
- RUST_LOG="${RUST_LOG}" ./smp --team-id ${SMP_AGENT_TEAM_ID} --api-base ${SMP_API} --aws-named-profile ${AWS_NAMED_PROFILE}
job status --use-curta
job status
--use-consignor welch
--wait
--wait-delay-seconds 60
--submission-metadata submission_metadata
# Now that the job is completed pull the analysis report, output it to stdout.
- RUST_LOG="${RUST_LOG}" ./smp --team-id ${SMP_AGENT_TEAM_ID} --api-base ${SMP_API} --aws-named-profile ${AWS_NAMED_PROFILE}
job sync --use-curta
job sync
--use-consignor welch
--submission-metadata submission_metadata
--output-path outputs
# Replace empty lines in the output with lines containing various unicode
Expand Down
8 changes: 8 additions & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,7 @@ core,github.com/hashicorp/go-multierror,MPL-2.0,"Copyright © 2014-2018 HashiCor
core,github.com/hashicorp/go-retryablehttp,MPL-2.0,"Copyright © 2014-2018 HashiCorp, Inc"
core,github.com/hashicorp/go-rootcerts,MPL-2.0,"Copyright © 2014-2018 HashiCorp, Inc"
core,github.com/hashicorp/go-version,MPL-2.0,"Copyright © 2014-2018 HashiCorp, Inc"
core,github.com/hashicorp/golang-lru,MPL-2.0,"Copyright © 2014-2018 HashiCorp, Inc"
core,github.com/hashicorp/golang-lru/simplelru,MPL-2.0,"Copyright © 2014-2018 HashiCorp, Inc"
core,github.com/hashicorp/golang-lru/v2,MPL-2.0,"Copyright © 2014-2018 HashiCorp, Inc"
core,github.com/hashicorp/golang-lru/v2/internal,MPL-2.0,"Copyright © 2014-2018 HashiCorp, Inc"
Expand Down Expand Up @@ -1883,12 +1884,19 @@ core,golang.org/x/text/encoding/korean,BSD-3-Clause,Copyright (c) 2009 The Go Au
core,golang.org/x/text/encoding/simplifiedchinese,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/encoding/traditionalchinese,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/encoding/unicode,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/feature/plural,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/internal,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/internal/catmsg,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/internal/format,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/internal/language,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/internal/language/compact,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/internal/number,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/internal/stringset,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/internal/tag,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/internal/utf8internal,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/language,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/message,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/message/catalog,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/runes,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/secure/bidirule,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
core,golang.org/x/text/transform,BSD-3-Clause,Copyright (c) 2009 The Go Authors. All rights reserved
Expand Down
35 changes: 13 additions & 22 deletions comp/dogstatsd/server/intern.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ package server
import (
"fmt"

"github.com/DataDog/datadog-agent/pkg/config/utils"
"github.com/DataDog/datadog-agent/pkg/telemetry"
"github.com/DataDog/datadog-agent/pkg/util/cache"
)

var (
Expand All @@ -18,19 +18,6 @@ var (
// Note `New` vs `NewSimple`
tlmSIResets = telemetry.NewCounter("dogstatsd", "string_interner_resets", []string{"interner_id"},
"Amount of resets of the string interner used in dogstatsd")
tlmSIRSize = telemetry.NewGauge("dogstatsd", "string_interner_entries", []string{"interner_id"},
"Number of entries in the string interner")
tlmSIRBytes = telemetry.NewGauge("dogstatsd", "string_interner_bytes", []string{"interner_id"},
"Number of bytes stored in the string interner")
tlmSIRHits = telemetry.NewCounter("dogstatsd", "string_interner_hits", []string{"interner_id"},
"Number of times string interner returned an existing string")
tlmSIRMiss = telemetry.NewCounter("dogstatsd", "string_interner_miss", []string{"interner_id"},
"Number of times string interner created a new string object")
tlmSIRNew = telemetry.NewSimpleCounter("dogstatsd", "string_interner_new",
"Number of times string interner was created")
tlmSIRStrBytes = telemetry.NewSimpleHistogram("dogstatsd", "string_interner_str_bytes",
"Number of times string with specific length were added",
[]float64{1, 2, 4, 8, 16, 32, 64, 128})
)

// stringInterner is a string cache providing a longer life for strings,
Expand All @@ -55,13 +42,13 @@ type siTelemetry struct {
miss telemetry.SimpleCounter
}

func newStringInterner(maxSize int, internerID int) *stringInterner {
func newStringInterner(maxSize int, internerID int, enableTelemetry bool) *stringInterner {
i := &stringInterner{
strings: make(map[string]string),
id: fmt.Sprintf("interner_%d", internerID),
maxSize: maxSize,
telemetry: siTelemetry{
enabled: utils.IsTelemetryEnabled(),
enabled: enableTelemetry,
},
}

Expand All @@ -74,17 +61,21 @@ func newStringInterner(maxSize int, internerID int) *stringInterner {

func (i *stringInterner) prepareTelemetry() {
i.telemetry.resets = tlmSIResets.WithValues(i.id)
i.telemetry.size = tlmSIRSize.WithValues(i.id)
i.telemetry.bytes = tlmSIRBytes.WithValues(i.id)
i.telemetry.hits = tlmSIRHits.WithValues(i.id)
i.telemetry.miss = tlmSIRMiss.WithValues(i.id)
i.telemetry.size = cache.TlmSIRSize.WithValues(i.id)
i.telemetry.bytes = cache.TlmSIRBytes.WithValues(i.id)
i.telemetry.hits = cache.TlmSIRHits.WithValues(i.id)
i.telemetry.miss = cache.TlmSIRMiss.WithValues(i.id)
}

// LoadOrStore returns the interned string for b by delegating to loadOrStore.
// The string and cache.InternRetainer arguments are accepted but ignored by
// this implementation.
func (i *stringInterner) LoadOrStore(b []byte, _ string, _ cache.InternRetainer) string {
return i.loadOrStore(b)
}

// loadOrStore always returns the string from the cache, adding it into the
// cache if needed.
// If we need to store a new entry and the cache is at its maximum capacity,
// it is reset.
func (i *stringInterner) LoadOrStore(key []byte) string {
func (i *stringInterner) loadOrStore(key []byte) string {
// here is the string interner trick: the map lookup using
// string(key) doesn't actually allocate a string, but is
// returning the string value -> no new heap allocation
Expand Down Expand Up @@ -114,7 +105,7 @@ func (i *stringInterner) LoadOrStore(key []byte) string {
i.telemetry.miss.Inc()
i.telemetry.size.Inc()
i.telemetry.bytes.Add(float64(len(s)))
tlmSIRStrBytes.Observe(float64(len(s)))
cache.TlmSIRStrBytes.Observe(float64(len(s)))
i.telemetry.curBytes += len(s)
}

Expand Down
52 changes: 26 additions & 26 deletions comp/dogstatsd/server/intern_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func BenchmarkLoadOrStoreReset(b *testing.B) {
sInterner := newStringInterner(4, 1)
sInterner := newStringInterner(4, 1, false)

// benchmark with the internal telemetry enabled
sInterner.telemetry.enabled = true
Expand All @@ -26,13 +26,13 @@ func BenchmarkLoadOrStoreReset(b *testing.B) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
sInterner.LoadOrStore([]byte(list[i%len(list)]))
sInterner.loadOrStore([]byte(list[i%len(list)]))
}
}

func TestInternLoadOrStoreValue(t *testing.T) {
func TestInternloadOrStoreValue(t *testing.T) {
assert := assert.New(t)
sInterner := newStringInterner(3, 1)
sInterner := newStringInterner(3, 1, false)

foo := []byte("foo")
bar := []byte("bar")
Expand All @@ -41,59 +41,59 @@ func TestInternLoadOrStoreValue(t *testing.T) {

// first test that the good value is returned.

v := sInterner.LoadOrStore(foo)
v := sInterner.loadOrStore(foo)
assert.Equal("foo", v)
v = sInterner.LoadOrStore(bar)
v = sInterner.loadOrStore(bar)
assert.Equal("bar", v)
v = sInterner.LoadOrStore(far)
v = sInterner.loadOrStore(far)
assert.Equal("far", v)
v = sInterner.LoadOrStore(boo)
v = sInterner.loadOrStore(boo)
assert.Equal("boo", v)
}

func TestInternLoadOrStorePointer(t *testing.T) {
func TestInternloadOrStorePointer(t *testing.T) {
assert := assert.New(t)
sInterner := newStringInterner(4, 1)
sInterner := newStringInterner(4, 1, false)

foo := []byte("foo")
bar := []byte("bar")
boo := []byte("boo")

// first test that the good value is returned.

v := sInterner.LoadOrStore(foo)
v := sInterner.loadOrStore(foo)
assert.Equal("foo", v)
v2 := sInterner.LoadOrStore(foo)
v2 := sInterner.loadOrStore(foo)
assert.Equal(&v, &v2, "must point to the same address")
v2 = sInterner.LoadOrStore(bar)
v2 = sInterner.loadOrStore(bar)
assert.NotEqual(&v, &v2, "must point to a different address")
v3 := sInterner.LoadOrStore(bar)
v3 := sInterner.loadOrStore(bar)
assert.Equal(&v2, &v3, "must point to the same address")

v4 := sInterner.LoadOrStore(boo)
v4 := sInterner.loadOrStore(boo)
assert.NotEqual(&v, &v4, "must point to a different address")
assert.NotEqual(&v2, &v4, "must point to a different address")
assert.NotEqual(&v3, &v4, "must point to a different address")
}

func TestInternLoadOrStoreReset(t *testing.T) {
func TestInternloadOrStoreReset(t *testing.T) {
assert := assert.New(t)
sInterner := newStringInterner(4, 1)
sInterner := newStringInterner(4, 1, false)

// first test that the good value is returned.
sInterner.LoadOrStore([]byte("foo"))
sInterner.loadOrStore([]byte("foo"))
assert.Equal(1, len(sInterner.strings))
sInterner.LoadOrStore([]byte("bar"))
sInterner.LoadOrStore([]byte("bar"))
sInterner.loadOrStore([]byte("bar"))
sInterner.loadOrStore([]byte("bar"))
assert.Equal(2, len(sInterner.strings))
sInterner.LoadOrStore([]byte("boo"))
sInterner.loadOrStore([]byte("boo"))
assert.Equal(3, len(sInterner.strings))
sInterner.LoadOrStore([]byte("far"))
sInterner.LoadOrStore([]byte("far"))
sInterner.LoadOrStore([]byte("far"))
sInterner.loadOrStore([]byte("far"))
sInterner.loadOrStore([]byte("far"))
sInterner.loadOrStore([]byte("far"))
assert.Equal(4, len(sInterner.strings))
sInterner.LoadOrStore([]byte("val"))
sInterner.loadOrStore([]byte("val"))
assert.Equal(1, len(sInterner.strings))
sInterner.LoadOrStore([]byte("val"))
sInterner.loadOrStore([]byte("val"))
assert.Equal(1, len(sInterner.strings))
}
46 changes: 39 additions & 7 deletions comp/dogstatsd/server/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import (
"time"
"unsafe"

"github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/pkg/config/utils"
"github.com/DataDog/datadog-agent/pkg/util/cache"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

type messageType int
Expand All @@ -38,7 +41,7 @@ var (
// parser parses dogstatsd messages
// not safe for concurrent use
type parser struct {
interner *stringInterner
interner cache.Interner
float64List *float64ListPool

// dsdOriginEnabled controls whether the server should honor the container id sent by the
Expand All @@ -50,12 +53,41 @@ type parser struct {
readTimestamps bool
}

func newParser(cfg config.Reader, float64List *float64ListPool, workerNum int) *parser {
// makeInterner builds the string interner used by a dogstatsd parser.
//
// The implementation is chosen by "dogstatsd_string_interner_enable_lru":
// when false, the map-based stringInterner is returned, identified by
// workerNum; when true, a cache.KeyedInterner is created instead. Both are
// sized by "dogstatsd_string_interner_size" and have telemetry switched on
// according to utils.IsTelemetryEnabled().
//
// The mmap-related settings are only applied to the keyed interner's
// configuration when "dogstatsd_string_interner_mmap_enable" is true;
// otherwise the defaults from cache.DefaultKeyedInternerConfig() are used.
func makeInterner(cfg config.Reader, workerNum int) cache.Interner {
	lruEnabled := cfg.GetBool("dogstatsd_string_interner_enable_lru")
	cacheSize := cfg.GetInt("dogstatsd_string_interner_size")
	mmapEnabled := cfg.GetBool("dogstatsd_string_interner_mmap_enable")

	// Legacy path: plain map-based interner, one per worker.
	if !lruEnabled {
		return newStringInterner(cacheSize, workerNum, utils.IsTelemetryEnabled())
	}

	keyedCfg := cache.DefaultKeyedInternerConfig()
	if mmapEnabled {
		keyedCfg.CloseOnRelease = !cfg.GetBool("dogstatsd_string_interner_mmap_preserve")
		keyedCfg.TmpPath = cfg.GetString("dogstatsd_string_interner_tmpdir")
		// The setting is expressed in KiB; the config field is in bytes.
		keyedCfg.MinFileSize = int64(cfg.GetInt("dogstatsd_string_interner_mmap_minsizekb")) * 1024
		keyedCfg.MaxPerInterner = cfg.GetInt("dogstatsd_string_interner_per_origin_initial_size")
		keyedCfg.EnableDiagnostics = cfg.GetBool("dogstatsd_string_interner_diagnostics")
		keyedCfg.GrowthFactor = cfg.GetFloat64("dogstatsd_string_interner_growth_exp")

		// Only growth factors within [1, 4] are accepted; anything else is
		// reported and replaced with the default.
		if keyedCfg.GrowthFactor < 1.0 || keyedCfg.GrowthFactor > 4 {
			fallback := cache.DefaultKeyedInternerConfig().GrowthFactor
			log.Warnf("Growth factor (dogstatsd_string_interner_growth_exp) %f out of bounds. Resetting to %f", keyedCfg.GrowthFactor, fallback)
			keyedCfg.GrowthFactor = fallback
		}
	}

	var keyed cache.Interner
	keyed = cache.NewKeyedStringInterner(cacheSize, keyedCfg)
	keyed.(*cache.KeyedInterner).SetTelemetry(utils.IsTelemetryEnabled())
	return keyed
}

func newParser(cfg config.Reader, float64List *float64ListPool, workerNum int) *parser {
readTimestamps := cfg.GetBool("dogstatsd_no_aggregation_pipeline")

return &parser{
interner: newStringInterner(stringInternerCacheSize, workerNum),
interner: makeInterner(cfg, workerNum),
readTimestamps: readTimestamps,
float64List: float64List,
dsdOriginEnabled: cfg.GetBool("dogstatsd_origin_detection_client"),
Expand Down Expand Up @@ -97,11 +129,11 @@ func (p *parser) parseTags(rawTags []byte) []string {
if tagPos < 0 {
break
}
tagsList[i] = p.interner.LoadOrStore(rawTags[:tagPos])
tagsList[i] = p.interner.LoadOrStore(rawTags[:tagPos], "", nil)
rawTags = rawTags[tagPos+len(commaSeparator):]
i++
}
tagsList[i] = p.interner.LoadOrStore(rawTags)
tagsList[i] = p.interner.LoadOrStore(rawTags, "", nil)
return tagsList
}

Expand Down Expand Up @@ -188,7 +220,7 @@ func (p *parser) parseMetricSample(message []byte) (dogstatsdMetricSample, error
}

return dogstatsdMetricSample{
name: p.interner.LoadOrStore(name),
name: p.interner.LoadOrStore(name, "", nil),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If LoadOrStore() is always called with `, "", nil)`, it's probably worth adding a helper that doesn't require those arguments.

Copy link
Author

@lallydd lallydd Nov 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The third PR in the series has the plumbing - that includes useful values for all 3 args. Use the lally/exp-mem-metrefs branch for reference: https://github.com/DataDog/datadog-agent/blob/lally/exp-mem-metrefs/comp/dogstatsd/server/parse.go#L190

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be better to only add it when we need it, no?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we could start passing in the origin IDs now (I'll have to start integrating the plumbing for this bit) so that we get per-origin (e.g. per-container) tracking now with this PR. How does that sound?

value: value,
values: values,
setValue: string(setValue),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ require (
github.com/DataDog/datadog-agent/pkg/proto v0.50.0-rc.4
github.com/DataDog/datadog-agent/pkg/status/health v0.50.0-rc.4
github.com/DataDog/datadog-agent/pkg/tagset v0.50.0-rc.4
github.com/DataDog/datadog-agent/pkg/telemetry v0.50.0-rc.4
github.com/DataDog/datadog-agent/pkg/telemetry v0.51.0-devel
github.com/DataDog/datadog-agent/pkg/util/backoff v0.50.0-rc.4
github.com/DataDog/datadog-agent/pkg/util/cache v0.50.0-rc.4
github.com/DataDog/datadog-agent/pkg/util/common v0.50.0-rc.4
Expand Down
19 changes: 18 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"strings"
"time"

"gopkg.in/yaml.v2"
yaml "gopkg.in/yaml.v2"

"github.com/DataDog/datadog-agent/comp/core/secrets"
"github.com/DataDog/datadog-agent/pkg/collector/check/defaults"
Expand Down Expand Up @@ -526,6 +526,23 @@ func InitConfig(config Config) {
config.BindEnvAndSetDefault("dogstatsd_tags", []string{})
config.BindEnvAndSetDefault("dogstatsd_mapper_cache_size", 1000)
config.BindEnvAndSetDefault("dogstatsd_string_interner_size", 4096)
// Whether to preserve old interner pages instead of releasing them. Generally leave this false unless debugging something.
config.BindEnvAndSetDefault("dogstatsd_string_interner_mmap_preserve", false)
// Enable diagnostic checks and logging in the interner.
config.BindEnvAndSetDefault("dogstatsd_string_interner_diagnostics", false)
// Directory to hold interner temporary files.
config.BindEnvAndSetDefault("dogstatsd_string_interner_tmpdir", os.TempDir())
// Whether to intern strings within mmap'd temporary files.
config.BindEnvAndSetDefault("dogstatsd_string_interner_mmap_enable", false)
// Minimum size (per origin) for a string interner before using mmap'd temporary files.
config.BindEnvAndSetDefault("dogstatsd_string_interner_mmap_minsizekb", 65536)
// Initial size used for LRU string cache.
config.BindEnvAndSetDefault("dogstatsd_string_interner_per_origin_initial_size", 64)
// Use multi-level LRU interner, or use old map-based interner
config.BindEnvAndSetDefault("dogstatsd_string_interner_enable_lru", false)
// Growth exponent when resizing an mmap region
config.BindEnvAndSetDefault("dogstatsd_string_interner_growth_exp", 1.5)

// Enable check for Entity-ID presence when enriching Dogstatsd metrics with tags
config.BindEnvAndSetDefault("dogstatsd_entity_id_precedence", false)
// Sends Dogstatsd parse errors to the Debug level instead of the Error level
Expand Down
Loading