Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge extended reference handling back into primary experimental branch. #20585

Closed
wants to merge 14 commits into from
4 changes: 2 additions & 2 deletions Dockerfiles/agent/entrypoint.d/agent
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
set -euo pipefail

"Running entrypoint.d/agent"
#exec /root/go/bin/dlv --listen=:2345 --headless=true --log=true --log-output=debugger,debuglineerr,gdbwire,lldbout,rpc --accept-multiclient --api-version=2 exec /opt/datadog-agent/bin/agent/agent -- run
/opt/datadog-agent/bin/agent run
exec /root/go/bin/dlv --listen=:2345 --headless=true --log=true --log-output=debugger,debuglineerr,gdbwire,lldbout,rpc --accept-multiclient --api-version=2 exec /opt/datadog-agent/bin/agent/agent -- run
#/opt/datadog-agent/bin/agent run
1 change: 0 additions & 1 deletion cmd/agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,6 @@ func getSharedFxOption() fx.Option {
params.Options.EnabledFeatures = pkgforwarder.SetFeature(params.Options.EnabledFeatures, pkgforwarder.CoreFeatures)
return params
}),
fx.Provide(cache.NewKeyedStringInterner),
dogstatsd.Bundle,
otelcol.Bundle,
rcclient.Module,
Expand Down
2 changes: 1 addition & 1 deletion cmd/cluster-agent-cloudfoundry/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func run(log log.Component, config config.Component, forwarder defaultforwarder.
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.UseEventPlatformForwarder = false
opts.UseOrchestratorForwarder = false
demux := aggregator.InitAndStartAgentDemultiplexer(log, forwarder, opts, hname)
demux := aggregator.InitAndStartAgentDemultiplexer(log, forwarder, opts, hname, nil)
demux.AddAgentStartupTelemetry(fmt.Sprintf("%s - Datadog Cluster Agent", version.AgentVersion))

pkglog.Infof("Datadog Cluster Agent is now running.")
Expand Down
2 changes: 1 addition & 1 deletion cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func start(log log.Component, config config.Component, telemetry telemetry.Compo
// Serving stale data is better than serving no data at all.
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.UseEventPlatformForwarder = false
demux := aggregator.InitAndStartAgentDemultiplexer(log, forwarder, opts, hname)
demux := aggregator.InitAndStartAgentDemultiplexer(log, forwarder, opts, hname, nil)
demux.AddAgentStartupTelemetry(fmt.Sprintf("%s - Datadog Cluster Agent", version.AgentVersion))

le, err := leaderelection.GetLeaderEngine()
Expand Down
2 changes: 1 addition & 1 deletion cmd/dogstatsd/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func RunAgent(ctx context.Context, cliParams *CLIParams, config config.Component
hname = ""
}
log.Debugf("Using hostname: %s", hname)
demux := aggregator.InitAndStartAgentDemultiplexer(log, forwarder, opts, hname)
demux := aggregator.InitAndStartAgentDemultiplexer(log, forwarder, opts, hname, nil)
demux.AddAgentStartupTelemetry(version.AgentVersion)

// setup the metadata collector
Expand Down
2 changes: 1 addition & 1 deletion cmd/otel-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,5 @@ func newDemultiplexer(logcomp corelog.Component, cfg config.Component, fwd defau
opts.EnableNoAggregationPipeline = cfg.GetBool("dogstatsd_no_aggregation_pipeline")
opts.UseDogstatsdContextLimiter = true
opts.DogstatsdMaxMetricsTags = cfg.GetInt("dogstatsd_max_metrics_tags")
return aggregator.InitAndStartAgentDemultiplexer(logcomp, fwd, opts, host)
return aggregator.InitAndStartAgentDemultiplexer(logcomp, fwd, opts, host, nil)
}
2 changes: 1 addition & 1 deletion cmd/security-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func RunAgent(ctx context.Context, log log.Component, config config.Component, s
opts := aggregator.DefaultAgentDemultiplexerOptions()
opts.UseEventPlatformForwarder = false
opts.UseOrchestratorForwarder = false
demux := aggregator.InitAndStartAgentDemultiplexer(log, forwarder, opts, hostnameDetected)
demux := aggregator.InitAndStartAgentDemultiplexer(log, forwarder, opts, hostnameDetected, nil)
demux.AddAgentStartupTelemetry(fmt.Sprintf("%s - Datadog Security Agent", version.AgentVersion))

stopper = startstop.NewSerialStopper()
Expand Down
3 changes: 2 additions & 1 deletion comp/aggregator/demultiplexer/demultiplexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func newDemultiplexer(deps dependencies) (Component, error) {
deps.Log,
deps.SharedForwarder,
deps.Params.Options,
hostnameDetected)
hostnameDetected,
nil)

return demux, nil
}
3 changes: 3 additions & 0 deletions comp/dogstatsd/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/DataDog/datadog-agent/comp/dogstatsd/replay"
"github.com/DataDog/datadog-agent/comp/dogstatsd/server"
"github.com/DataDog/datadog-agent/comp/dogstatsd/serverDebug"
"github.com/DataDog/datadog-agent/pkg/util/cache"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
)

Expand All @@ -19,11 +20,13 @@ var Bundle = fxutil.Bundle(
serverDebug.Module,
replay.Module,
server.Module,
cache.Module,
)

// MockBundle defines the mock fx options for this bundle.
var MockBundle = fxutil.Bundle(
serverDebug.MockModule,
server.MockModule,
replay.Module,
cache.Module,
)
27 changes: 6 additions & 21 deletions comp/dogstatsd/server/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ type batcher struct {
noAggPipelineEnabled bool

// Retain intern'd objects long enough to serialize them back out.
retentions map[cache.Refcounted]int32
cache.InternRetainer
retentions *cache.RetainerBlock
}

// Use fastrange instead of a modulo for better performance.
Expand Down Expand Up @@ -113,7 +112,7 @@ func newBatcher(demux aggregator.DemultiplexerWithAggregator) *batcher {
keyGenerator: ckey.NewKeyGenerator(),

noAggPipelineEnabled: demux.Options().EnableNoAggregationPipeline,
retentions: make(map[cache.Refcounted]int32),
retentions: cache.NewRetainerBlock(),
}
}

Expand Down Expand Up @@ -183,6 +182,7 @@ func (b *batcher) appendLateSample(sample metrics.MetricSample) {
return
}

// TODO: Fix reference work here.
if b.samplesWithTsCount == len(b.samplesWithTs) {
b.flushSamplesWithTs()
}
Expand Down Expand Up @@ -222,7 +222,7 @@ func (b *batcher) flushSamplesWithTs() {
// the demux for processing. Do this last so our worst mistake is to
// retain things too long instead of long enough.
func (b *batcher) forwardRetentions() {
b.demux.TakeRetentions(b, "batcher")
b.demux.TakeRetentions(b.Retainer(), "batcher")
}

// flush pushes all batched metrics to the aggregator.
Expand Down Expand Up @@ -258,21 +258,6 @@ func (b *batcher) flush() {
b.forwardRetentions()
}

// Interner Retention
// -------- ---------

func (b *batcher) Reference(obj cache.Refcounted) {
b.retentions[obj] += 1
}
func (b *batcher) ReleaseAllWith(callback func(obj cache.Refcounted, count int32)) {
for k, v := range b.retentions {
callback(k, v)
delete(b.retentions, k)
}
}

func (b *batcher) ReleaseAll() {
b.ReleaseAllWith(func(obj cache.Refcounted, count int32) {
obj.Release(count)
})
func (b *batcher) Retainer() cache.InternRetainer {
return b.retentions
}
7 changes: 5 additions & 2 deletions comp/dogstatsd/server/convert_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package server

import (
"fmt"
"github.com/DataDog/datadog-agent/pkg/util/cache"
"testing"

"github.com/DataDog/datadog-agent/comp/core/config"
Expand All @@ -33,7 +34,9 @@ var (

func runParseMetricBenchmark(b *testing.B, multipleValues bool) {
cfg := fxutil.Test[config.Component](b, config.MockModule)
parser := newParser(cfg, newFloat64ListPool())
interner := cache.NewKeyedStringInternerMemOnly(16384)
parser := newParser(cfg, newFloat64ListPool(), interner)
retainerCtx := cache.NewInternerContext(interner, "", &cache.SmallRetainer{})

conf := enrichConfig{
defaultHostname: "default-hostname",
Expand All @@ -48,7 +51,7 @@ func runParseMetricBenchmark(b *testing.B, multipleValues bool) {

for n := 0; n < sb.N; n++ {

parsed, err := parser.parseMetricSample(rawSample)
parsed, err := parser.parseMetricSample(rawSample, retainerCtx)
if err != nil {
continue
}
Expand Down
41 changes: 27 additions & 14 deletions comp/dogstatsd/server/enrich_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package server

import (
"fmt"
"github.com/DataDog/datadog-agent/pkg/util/cache"
"testing"

"github.com/DataDog/datadog-agent/comp/core/config"
Expand All @@ -31,10 +32,16 @@ var (
}
)

func internerAndContext() (*cache.KeyedInterner, cache.InternerContext) {
interner := cache.NewKeyedStringInternerMemOnly(16384)
return interner, cache.NewInternerContext(interner, "", &cache.SmallRetainer{})
}

func parseAndEnrichSingleMetricMessage(t *testing.T, message []byte, conf enrichConfig) (metrics.MetricSample, error) {
cfg := fxutil.Test[config.Component](t, config.MockModule)
parser := newParser(cfg, newFloat64ListPool())
parsed, err := parser.parseMetricSample(message)
in, ctx := internerAndContext()
parser := newParser(cfg, newFloat64ListPool(), in)
parsed, err := parser.parseMetricSample(message, ctx)
if err != nil {
return metrics.MetricSample{}, err
}
Expand All @@ -49,8 +56,9 @@ func parseAndEnrichSingleMetricMessage(t *testing.T, message []byte, conf enrich

func parseAndEnrichMultipleMetricMessage(t *testing.T, message []byte, conf enrichConfig) ([]metrics.MetricSample, error) {
cfg := fxutil.Test[config.Component](t, config.MockModule)
parser := newParser(cfg, newFloat64ListPool())
parsed, err := parser.parseMetricSample(message)
in, ctx := internerAndContext()
parser := newParser(cfg, newFloat64ListPool(), in)
parsed, err := parser.parseMetricSample(message, ctx)
if err != nil {
return []metrics.MetricSample{}, err
}
Expand All @@ -61,8 +69,9 @@ func parseAndEnrichMultipleMetricMessage(t *testing.T, message []byte, conf enri

func parseAndEnrichServiceCheckMessage(t *testing.T, message []byte, conf enrichConfig) (*servicecheck.ServiceCheck, error) {
cfg := fxutil.Test[config.Component](t, config.MockModule)
parser := newParser(cfg, newFloat64ListPool())
parsed, err := parser.parseServiceCheck(message)
in, ctx := internerAndContext()
parser := newParser(cfg, newFloat64ListPool(), in)
parsed, err := parser.parseServiceCheck(message, ctx)
if err != nil {
return nil, err
}
Expand All @@ -71,8 +80,9 @@ func parseAndEnrichServiceCheckMessage(t *testing.T, message []byte, conf enrich

func parseAndEnrichEventMessage(t *testing.T, message []byte, conf enrichConfig) (*event.Event, error) {
cfg := fxutil.Test[config.Component](t, config.MockModule)
parser := newParser(cfg, newFloat64ListPool())
parsed, err := parser.parseEvent(message)
in, ctx := internerAndContext()
parser := newParser(cfg, newFloat64ListPool(), in)
parsed, err := parser.parseEvent(message, ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -959,8 +969,9 @@ func TestMetricBlocklistShouldBlock(t *testing.T) {
}

cfg := fxutil.Test[config.Component](t, config.MockModule)
parser := newParser(cfg, newFloat64ListPool())
parsed, err := parser.parseMetricSample(message)
in, ctx := internerAndContext()
parser := newParser(cfg, newFloat64ListPool(), in)
parsed, err := parser.parseMetricSample(message, ctx)
assert.NoError(t, err)
samples := []metrics.MetricSample{}
samples = enrichMetricSample(samples, parsed, "", conf)
Expand All @@ -976,8 +987,9 @@ func TestServerlessModeShouldSetEmptyHostname(t *testing.T) {

message := []byte("custom.metric.a:21|ms")
cfg := fxutil.Test[config.Component](t, config.MockModule)
parser := newParser(cfg, newFloat64ListPool())
parsed, err := parser.parseMetricSample(message)
in, ctx := internerAndContext()
parser := newParser(cfg, newFloat64ListPool(), in)
parsed, err := parser.parseMetricSample(message, ctx)
assert.NoError(t, err)
samples := []metrics.MetricSample{}
samples = enrichMetricSample(samples, parsed, "", conf)
Expand All @@ -996,8 +1008,9 @@ func TestMetricBlocklistShouldNotBlock(t *testing.T) {
defaultHostname: "default",
}
cfg := fxutil.Test[config.Component](t, config.MockModule)
parser := newParser(cfg, newFloat64ListPool())
parsed, err := parser.parseMetricSample(message)
in, ctx := internerAndContext()
parser := newParser(cfg, newFloat64ListPool(), in)
parsed, err := parser.parseMetricSample(message, ctx)
assert.NoError(t, err)
samples := []metrics.MetricSample{}
samples = enrichMetricSample(samples, parsed, "", conf)
Expand Down
7 changes: 5 additions & 2 deletions comp/dogstatsd/server/parse_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package server

import (
"github.com/DataDog/datadog-agent/pkg/util/cache"
"testing"

"github.com/DataDog/datadog-agent/comp/core/config"
Expand All @@ -16,8 +17,10 @@ import (

func parseEvent(t *testing.T, rawEvent []byte) (dogstatsdEvent, error) {
cfg := fxutil.Test[config.Component](t, config.MockModule)
parser := newParser(cfg, newFloat64ListPool())
return parser.parseEvent(rawEvent)
kint := cache.NewKeyedStringInternerForTest()

parser := newParser(cfg, newFloat64ListPool(), kint)
return parser.parseEvent(rawEvent, cache.NewInternerContext(kint, "", &cache.SmallRetainer{}))
}

func TestEventMinimal(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions comp/dogstatsd/server/parse_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package server

import (
"github.com/DataDog/datadog-agent/pkg/util/cache"
"testing"
"time"

Expand All @@ -21,8 +22,10 @@ func parseMetricSample(t *testing.T, overrides map[string]any, rawSample []byte)
config.MockModule,
fx.Replace(config.MockParams{Overrides: overrides}),
))
parser := newParser(cfg, newFloat64ListPool())
return parser.parseMetricSample(rawSample)
kint := cache.NewKeyedStringInternerForTest()

parser := newParser(cfg, newFloat64ListPool(), kint)
return parser.parseMetricSample(rawSample, cache.NewInternerContext(kint, "", &cache.SmallRetainer{}))
}

const epsilon = 0.00001
Expand Down
6 changes: 4 additions & 2 deletions comp/dogstatsd/server/parse_service_checks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package server

import (
"github.com/DataDog/datadog-agent/pkg/util/cache"
"testing"

"github.com/DataDog/datadog-agent/comp/core/config"
Expand All @@ -16,8 +17,9 @@ import (

func parseServiceCheck(t *testing.T, rawServiceCheck []byte) (dogstatsdServiceCheck, error) {
cfg := fxutil.Test[config.Component](t, config.MockModule)
parser := newParser(cfg, newFloat64ListPool())
return parser.parseServiceCheck(rawServiceCheck)
kint := cache.NewKeyedStringInternerForTest()
parser := newParser(cfg, newFloat64ListPool(), kint)
return parser.parseServiceCheck(rawServiceCheck, cache.NewInternerContext(kint, "", &cache.SmallRetainer{}))
}

func TestServiceCheckMinimal(t *testing.T) {
Expand Down
14 changes: 9 additions & 5 deletions comp/dogstatsd/server/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package server

import (
"github.com/DataDog/datadog-agent/pkg/util/cache"
"strconv"
"testing"

Expand Down Expand Up @@ -40,18 +41,20 @@ func TestIdentifyRandomString(t *testing.T) {

func TestParseTags(t *testing.T) {
cfg := fxutil.Test[config.Component](t, config.MockModule)
p := newParser(cfg, newFloat64ListPool())
kint := cache.NewKeyedStringInternerForTest()
p := newParser(cfg, newFloat64ListPool(), kint)
rawTags := []byte("tag:test,mytag,good:boy")
tags := p.parseTags(rawTags)
tags := p.parseTags(rawTags, cache.NewInternerContext(kint, "", &cache.SmallRetainer{}))
expectedTags := []string{"tag:test", "mytag", "good:boy"}
assert.ElementsMatch(t, expectedTags, tags)
}

func TestParseTagsEmpty(t *testing.T) {
cfg := fxutil.Test[config.Component](t, config.MockModule)
p := newParser(cfg, newFloat64ListPool())
kint := cache.NewKeyedStringInternerForTest()
p := newParser(cfg, newFloat64ListPool(), kint)
rawTags := []byte("")
tags := p.parseTags(rawTags)
tags := p.parseTags(rawTags, cache.NewInternerContext(kint, "", &cache.SmallRetainer{}))
assert.Nil(t, tags)
}

Expand All @@ -68,7 +71,8 @@ func TestUnsafeParseFloat(t *testing.T) {

func TestUnsafeParseFloatList(t *testing.T) {
cfg := fxutil.Test[config.Component](t, config.MockModule)
p := newParser(cfg, newFloat64ListPool())
kint := cache.NewKeyedStringInternerForTest()
p := newParser(cfg, newFloat64ListPool(), kint)
unsafeFloats, err := p.parseFloat64List([]byte("1.1234:21.5:13"))
assert.NoError(t, err)
assert.Len(t, unsafeFloats, 3)
Expand Down
Loading