Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Route agent crash detection through agent telemetry component #32515

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package agentcrashdetectimpl

import (
"context"
"encoding/json"
"fmt"
"strings"

Expand All @@ -18,15 +19,13 @@ import (
yaml "gopkg.in/yaml.v2"

"github.com/DataDog/datadog-agent/comp/checks/agentcrashdetect"
agenttelemetry "github.com/DataDog/datadog-agent/comp/core/agenttelemetry/def"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
compsysconfig "github.com/DataDog/datadog-agent/comp/core/sysprobeconfig"
comptraceconfig "github.com/DataDog/datadog-agent/comp/trace/config"
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
"github.com/DataDog/datadog-agent/pkg/collector/check"
core "github.com/DataDog/datadog-agent/pkg/collector/corechecks"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/system/wincrashdetect/probe"
"github.com/DataDog/datadog-agent/pkg/internaltelemetry"
traceconfig "github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/util/crashreport"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/log"
Expand Down Expand Up @@ -60,6 +59,13 @@ var (
baseKey = `SOFTWARE\Datadog\Datadog Agent\agent_crash_reporting`
)

// AgentBSOD for Agent Telemetry reporting
type AgentBSOD struct {
Date string `json:"date"`
Offender string `json:"offender"`
BugCheck string `json:"bugcheck"`
}

// Module defines the fx options for this component.
func Module() fxutil.Module {
return fxutil.Component(
Expand All @@ -79,19 +85,19 @@ type AgentCrashDetect struct {
instance *WinCrashConfig
reporter *crashreport.WinCrashReporter
crashDetectionEnabled bool
tconfig *traceconfig.AgentConfig
probeconfig compsysconfig.Component
atel agenttelemetry.Component
}

type agentCrashComponent struct {
tconfig *traceconfig.AgentConfig
}

type dependencies struct {
fx.In

TConfig comptraceconfig.Component
SConfig compsysconfig.Component
Config compsysconfig.Component
Atel agenttelemetry.Component

Lifecycle fx.Lifecycle
}

Expand Down Expand Up @@ -168,33 +174,37 @@ func (wcd *AgentCrashDetect) Run() error {
}

log.Infof("Sending crash: %v", formatText(crash))
lts := internaltelemetry.NewClient(wcd.tconfig.NewHTTPClient(), toTelemEndpoints(wcd.tconfig.TelemetryConfig.Endpoints), "ddnpm", true)
lts.SendLog("WARN", formatText(crash))
return nil
}

func toTelemEndpoints(endpoints []*traceconfig.Endpoint) []*internaltelemetry.Endpoint {
telemEndpoints := make([]*internaltelemetry.Endpoint, 0, len(endpoints))
for _, e := range endpoints {
telemEndpoints = append(telemEndpoints, &internaltelemetry.Endpoint{
Host: e.Host,
APIKey: e.APIKey,
})
bsod := AgentBSOD{
Date: crash.DateString,
Offender: crash.Offender,
BugCheck: crash.BugCheck,
}
var bsodPayload []byte
bsodPayload, err = json.Marshal(bsod)
if err != nil {
return err
}

// "agentbsod" is payload type registered with the Agent Telemetry component
err = wcd.atel.SendEvent("agentbsod", bsodPayload)
if err != nil {
return err
}
return telemEndpoints

return nil
}

func newAgentCrashComponent(deps dependencies) agentcrashdetect.Component {
instance := &agentCrashComponent{}
instance.tconfig = deps.TConfig.Object()
deps.Lifecycle.Append(fx.Hook{
OnStart: func(_ context.Context) error {
core.RegisterCheck(CheckName, option.New(func() check.Check {
checkInstance := &AgentCrashDetect{
CheckBase: core.NewCheckBase(CheckName),
instance: &WinCrashConfig{},
tconfig: instance.tconfig,
probeconfig: deps.SConfig,
probeconfig: deps.Config,
atel: deps.Atel,
}
return checkInstance
}))
Expand Down
4 changes: 2 additions & 2 deletions comp/checks/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"go.uber.org/fx"

"github.com/DataDog/datadog-agent/comp/core"
agenttelemetryfx "github.com/DataDog/datadog-agent/comp/core/agenttelemetry/fx"
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/mock"
comptraceconfig "github.com/DataDog/datadog-agent/comp/trace/config"
"github.com/DataDog/datadog-agent/pkg/util/crashreport"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
)
Expand All @@ -24,10 +24,10 @@ func TestBundleDependencies(t *testing.T) {
fakeTagger := mock.SetupFakeTagger(t)

fxutil.TestBundle(t, Bundle(),
comptraceconfig.Module(),
core.MockBundle(),
fx.Provide(func() tagger.Component { return fakeTagger }),
fx.Supply(core.BundleParams{}),
agenttelemetryfx.Module(),
fx.Supply(crashreport.WinCrashReporter{}),
)
}
5 changes: 5 additions & 0 deletions comp/core/agenttelemetry/def/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,9 @@ package agenttelemetry
type Component interface {
// GetAsJSON returns the payload as a JSON string. Useful to be displayed in the CLI or added to a flare.
GetAsJSON() ([]byte, error)

// Sends event payload.
// payloadType - should be registered in datadog-agent\comp\core\agenttelemetry\impl\config.go
// payload - de-serializable into JSON
SendEvent(eventType string, eventPayload []byte) error
}
33 changes: 33 additions & 0 deletions comp/core/agenttelemetry/impl/agenttelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,39 @@ func (a *atel) GetAsJSON() ([]byte, error) {
return prettyPayload.Bytes(), nil
}

func (a *atel) SendEvent(eventType string, eventPayload []byte) error {
// Check if the telemetry is enabled
if !a.enabled {
return errors.New("agent telemetry is not enabled")
}

// Check if the payload type is registered
eventInfo, ok := a.atelCfg.events[eventType]
if !ok {
a.logComp.Errorf("Payload type `%s` has to be registered to be sent", eventType)
return fmt.Errorf("Payload type `%s` is not registered", eventType)
}

// Convert payload to JSON
var eventPayloadJSON map[string]interface{}
err := json.Unmarshal(eventPayload, &eventPayloadJSON)
if err != nil {
a.logComp.Errorf("Failed to unmarshal payload: %s", err)
return fmt.Errorf("failed to unmarshal payload: %w", err)
}

// Send the payload
ss := a.sender.startSession(a.cancelCtx)
a.sender.sendEventPayload(ss, eventInfo, eventPayloadJSON)
err = a.sender.flushSession(ss)
if err != nil {
a.logComp.Errorf("failed to flush sent payload: %w", err)
return err
}

return nil
}

// start is called by FX when the application starts.
func (a *atel) start() error {
a.logComp.Infof("Starting agent telemetry for %d schedules and %d profiles", len(a.atelCfg.schedule), len(a.atelCfg.Profiles))
Expand Down
193 changes: 193 additions & 0 deletions comp/core/agenttelemetry/impl/agenttelemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/telemetry"
"github.com/DataDog/datadog-agent/comp/core/telemetry/telemetryimpl"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/jsonquery"
"github.com/DataDog/zstd"
)

Expand Down Expand Up @@ -62,6 +63,8 @@ func (s *senderMock) flushSession(_ *senderSession) error {
func (s *senderMock) sendAgentMetricPayloads(_ *senderSession, metrics []*agentmetric) {
s.sentMetrics = append(s.sentMetrics, metrics...)
}
func (s *senderMock) sendEventPayload(_ *senderSession, _ *Event, _ map[string]interface{}) {
}

// Runner mock (TODO: use use mock.Mock)
type runnerMock struct {
Expand Down Expand Up @@ -1967,3 +1970,193 @@ func TestUsingPayloadCompressionInAgentTelemetrySender(t *testing.T) {
nonCompressBodyLen := len(cl2.(*clientMock).body)
assert.True(t, float64(nonCompressBodyLen)/float64(compressBodyLen) > 1.5)
}

func TestAgentTelemetryParseDefaultConfiguration(t *testing.T) {
c := defaultProfiles
o := convertYamlStrToMap(t, c)
cfg := makeCfgMock(t, o)
atCfg, err := parseConfig(cfg)

require.NoError(t, err)

assert.True(t, len(atCfg.events) > 0)
assert.True(t, len(atCfg.schedule) > 0)
assert.True(t, len(atCfg.Profiles) > len(atCfg.events))
}

func TestAgentTelemetryEventConfiguration(t *testing.T) {
// Use nearly full
c := `
agent_telemetry:
enabled: true
profiles:
- name: checks
metric:
metrics:
- name: checks.execution_time
aggregate_tags:
- check_name
- name: pymem.inuse
schedule:
start_after: 123
iterations: 0
period: 456
- name: logs-and-metrics
metric:
exclude:
zero_metric: true
metrics:
- name: dogstatsd.udp_packets_bytes
- name: dogstatsd.uds_packets_bytes
schedule:
start_after: 30
iterations: 0
period: 900
- name: ondemand
events:
- name: agentbsod
request_type: agent-bsod
payload_key: agent_bsod
message: 'Agent BSOD'
- name: foobar
request_type: agent-foobar
payload_key: agent_foobar
message: 'Agent foobar'
- name: ondemand2
events:
- name: agentbsod
request_type: agent-bsod
payload_key: agent_bsod
message: 'Agent BSOD'
- name: barfoo
request_type: agent-barfoo
payload_key: agent_barfoo
message: 'Agent barfoo'
`

o := convertYamlStrToMap(t, c)
cfg := makeCfgMock(t, o)
atCfg, err := parseConfig(cfg)

require.NoError(t, err)

// single event map keeps unique event names
assert.Len(t, atCfg.events, 3)
assert.Len(t, atCfg.schedule, 2)
assert.Len(t, atCfg.Profiles, 4)
}

func TestAgentTelemetrySendRegisteredEvent(t *testing.T) {
// Use nearly full
var cfg = `
agent_telemetry:
enabled: true
use_compression: false
profiles:
- name: xxx
metric:
metrics:
- name: foo.bar
- name: ondemand
events:
- name: agentbsod
request_type: agent-bsod
payload_key: agent_bsod
message: 'Agent BSOD'
- name: foobar
request_type: agent-foobar
payload_key: agent_foobar
message: 'Agent foobar'
`

payloadObj := struct {
Date string `json:"date"`
Offender string `json:"offender"`
BugCheck string `json:"bugcheck"`
}{
Date: "2024-30-02 17:31:12",
Offender: "ddnpm+0x1a3",
BugCheck: "0x7A",
}
// conert to json
payload, err := json.Marshal(payloadObj)
require.NoError(t, err)

// setup and initiate atel
o := convertYamlStrToMap(t, cfg)
cl := newClientMock()
s := makeSenderImpl(t, cl, cfg)
r := newRunnerMock()
a := getTestAtel(t, nil, o, s, cl, r)
require.True(t, a.enabled)

a.start()
err = a.SendEvent("agentbsod", payload)
require.NoError(t, err)
assert.True(t, len(cl.(*clientMock).body) > 0)

//deserialize the payload of cl.(*clientMock).body
var topPayload map[string]interface{}
err = json.Unmarshal(cl.(*clientMock).body, &topPayload)
require.NoError(t, err)
fmt.Print(string(cl.(*clientMock).body))

v, ok, err2 := jsonquery.RunSingleOutput(".payload.message", topPayload)
require.NoError(t, err2)
require.True(t, ok)
assert.Equal(t, "Agent BSOD", v)

v, ok, err2 = jsonquery.RunSingleOutput(".payload.agent_bsod.offender", topPayload)
require.NoError(t, err2)
require.True(t, ok)
assert.Equal(t, "ddnpm+0x1a3", v)
}

func TestAgentTelemetrySendNonRegisteredEvent(t *testing.T) {
// Use nearly full
var cfg = `
agent_telemetry:
enabled: true
use_compression: false
profiles:
- name: xxx
metric:
metrics:
- name: foo.bar
- name: ondemand
events:
- name: agentbsod
request_type: agent-bsod
payload_key: agentbsod
message: 'Agent BSOD'
- name: foobar
request_type: agent-foobar
payload_key: agentfoobar
message: 'Agent foobar'
`

payloadObj := struct {
Date string `json:"date"`
Offender string `json:"offender"`
BugCheck string `json:"bugcheck"`
}{
Date: "2024-30-02 17:31:12",
Offender: "ddnpm+0x1a3",
BugCheck: "0x7A",
}
// conert to json
payload, err := json.Marshal(payloadObj)
require.NoError(t, err)

// setup and initiate atel
o := convertYamlStrToMap(t, cfg)
cl := newClientMock()
s := makeSenderImpl(t, cl, cfg)
r := newRunnerMock()
a := getTestAtel(t, nil, o, s, cl, r)
require.True(t, a.enabled)

a.start()
err = a.SendEvent("agentbsod2", payload)
require.Error(t, err)
}
Loading
Loading