Skip to content

Commit

Permalink
Route agent crash detection through agent telemetry component
Browse files Browse the repository at this point in the history
  • Loading branch information
iglendd committed Jan 2, 2025
1 parent 234771b commit 40bf0b9
Show file tree
Hide file tree
Showing 9 changed files with 444 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package agentcrashdetectimpl

import (
"context"
"encoding/json"
"fmt"
"strings"

Expand All @@ -18,15 +19,13 @@ import (
yaml "gopkg.in/yaml.v2"

"github.com/DataDog/datadog-agent/comp/checks/agentcrashdetect"
agenttelemetry "github.com/DataDog/datadog-agent/comp/core/agenttelemetry/def"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
compsysconfig "github.com/DataDog/datadog-agent/comp/core/sysprobeconfig"
comptraceconfig "github.com/DataDog/datadog-agent/comp/trace/config"
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
"github.com/DataDog/datadog-agent/pkg/collector/check"
core "github.com/DataDog/datadog-agent/pkg/collector/corechecks"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/system/wincrashdetect/probe"
"github.com/DataDog/datadog-agent/pkg/internaltelemetry"
traceconfig "github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/util/crashreport"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/log"
Expand Down Expand Up @@ -60,6 +59,13 @@ var (
baseKey = `SOFTWARE\Datadog\Datadog Agent\agent_crash_reporting`
)

// AgentBSOD for Agent Telemetry reporting
type AgentBSOD struct {
Date string `json:"date"`
Offender string `json:"offender"`
BugCheck string `json:"bugcheck"`
}

// Module defines the fx options for this component.
func Module() fxutil.Module {
return fxutil.Component(
Expand All @@ -79,19 +85,19 @@ type AgentCrashDetect struct {
instance *WinCrashConfig
reporter *crashreport.WinCrashReporter
crashDetectionEnabled bool
tconfig *traceconfig.AgentConfig
probeconfig compsysconfig.Component
atel agenttelemetry.Component
}

type agentCrashComponent struct {
tconfig *traceconfig.AgentConfig
}

type dependencies struct {
fx.In

TConfig comptraceconfig.Component
SConfig compsysconfig.Component
Config compsysconfig.Component
Atel agenttelemetry.Component

Lifecycle fx.Lifecycle
}

Expand Down Expand Up @@ -168,33 +174,37 @@ func (wcd *AgentCrashDetect) Run() error {
}

log.Infof("Sending crash: %v", formatText(crash))
lts := internaltelemetry.NewClient(wcd.tconfig.NewHTTPClient(), toTelemEndpoints(wcd.tconfig.TelemetryConfig.Endpoints), "ddnpm", true)
lts.SendLog("WARN", formatText(crash))
return nil
}

func toTelemEndpoints(endpoints []*traceconfig.Endpoint) []*internaltelemetry.Endpoint {
telemEndpoints := make([]*internaltelemetry.Endpoint, 0, len(endpoints))
for _, e := range endpoints {
telemEndpoints = append(telemEndpoints, &internaltelemetry.Endpoint{
Host: e.Host,
APIKey: e.APIKey,
})
bsod := AgentBSOD{
Date: crash.DateString,
Offender: crash.Offender,
BugCheck: crash.BugCheck,
}
var bsodPayload []byte
bsodPayload, err = json.Marshal(bsod)
if err != nil {
return err
}

// "agentbsod" is payload type registered with the Agent Telemetry component
err = wcd.atel.SendEvent("agentbsod", bsodPayload)
if err != nil {
return err
}
return telemEndpoints

return nil
}

func newAgentCrashComponent(deps dependencies) agentcrashdetect.Component {
instance := &agentCrashComponent{}
instance.tconfig = deps.TConfig.Object()
deps.Lifecycle.Append(fx.Hook{
OnStart: func(_ context.Context) error {
core.RegisterCheck(CheckName, optional.NewOption(func() check.Check {
checkInstance := &AgentCrashDetect{
CheckBase: core.NewCheckBase(CheckName),
instance: &WinCrashConfig{},
tconfig: instance.tconfig,
probeconfig: deps.SConfig,
probeconfig: deps.Config,
atel: deps.Atel,
}
return checkInstance
}))
Expand Down
4 changes: 2 additions & 2 deletions comp/checks/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"go.uber.org/fx"

"github.com/DataDog/datadog-agent/comp/core"
agenttelemetryfx "github.com/DataDog/datadog-agent/comp/core/agenttelemetry/fx"
tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
"github.com/DataDog/datadog-agent/comp/core/tagger/mock"
comptraceconfig "github.com/DataDog/datadog-agent/comp/trace/config"
"github.com/DataDog/datadog-agent/pkg/util/crashreport"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
)
Expand All @@ -24,10 +24,10 @@ func TestBundleDependencies(t *testing.T) {
fakeTagger := mock.SetupFakeTagger(t)

fxutil.TestBundle(t, Bundle(),
comptraceconfig.Module(),
core.MockBundle(),
fx.Provide(func() tagger.Component { return fakeTagger }),
fx.Supply(core.BundleParams{}),
agenttelemetryfx.Module(),
fx.Supply(crashreport.WinCrashReporter{}),
)
}
5 changes: 5 additions & 0 deletions comp/core/agenttelemetry/def/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,9 @@ package agenttelemetry
type Component interface {
// GetAsJSON returns the payload as a JSON string. Useful to be displayed in the CLI or added to a flare.
GetAsJSON() ([]byte, error)

// Sends event payload.
// payloadType - should be registered in datadog-agent\comp\core\agenttelemetry\impl\config.go
// payload - de-serializable into JSON
SendEvent(eventType string, eventPayload []byte) error
}
33 changes: 33 additions & 0 deletions comp/core/agenttelemetry/impl/agenttelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,39 @@ func (a *atel) GetAsJSON() ([]byte, error) {
return prettyPayload.Bytes(), nil
}

func (a *atel) SendEvent(eventType string, eventPayload []byte) error {
// Check if the telemetry is enabled
if !a.enabled {
return errors.New("agent telemetry is not enabled")
}

// Check if the payload type is registered
eventInfo, ok := a.atelCfg.events[eventType]
if !ok {
a.logComp.Errorf("Payload type `%s` has to be registered to be sent", eventType)
return fmt.Errorf("Payload type `%s` is not registered", eventType)
}

// Convert payload to JSON
var eventPayloadJSON map[string]interface{}
err := json.Unmarshal(eventPayload, &eventPayloadJSON)
if err != nil {
a.logComp.Errorf("Failed to unmarshal payload: %s", err)
return fmt.Errorf("failed to unmarshal payload: %w", err)
}

// Send the payload
ss := a.sender.startSession(a.cancelCtx)
a.sender.sendEventPayload(ss, eventInfo, eventPayloadJSON)
err = a.sender.flushSession(ss)
if err != nil {
a.logComp.Errorf("failed to flush sent payload: %w", err)
return err
}

return nil
}

// start is called by FX when the application starts.
func (a *atel) start() error {
a.logComp.Infof("Starting agent telemetry for %d schedules and %d profiles", len(a.atelCfg.schedule), len(a.atelCfg.Profiles))
Expand Down
193 changes: 193 additions & 0 deletions comp/core/agenttelemetry/impl/agenttelemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/telemetry"
"github.com/DataDog/datadog-agent/comp/core/telemetry/telemetryimpl"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/jsonquery"
"github.com/DataDog/zstd"
)

Expand Down Expand Up @@ -62,6 +63,8 @@ func (s *senderMock) flushSession(_ *senderSession) error {
func (s *senderMock) sendAgentMetricPayloads(_ *senderSession, metrics []*agentmetric) {
s.sentMetrics = append(s.sentMetrics, metrics...)
}
func (s *senderMock) sendEventPayload(_ *senderSession, _ *Event, _ map[string]interface{}) {
}

// Runner mock (TODO: use use mock.Mock)
type runnerMock struct {
Expand Down Expand Up @@ -1967,3 +1970,193 @@ func TestUsingPayloadCompressionInAgentTelemetrySender(t *testing.T) {
nonCompressBodyLen := len(cl2.(*clientMock).body)
assert.True(t, float64(nonCompressBodyLen)/float64(compressBodyLen) > 1.5)
}

func TestAgentTelemetryParseDefaultConfiguration(t *testing.T) {
c := defaultProfiles
o := convertYamlStrToMap(t, c)
cfg := makeCfgMock(t, o)
atCfg, err := parseConfig(cfg)

require.NoError(t, err)

assert.True(t, len(atCfg.events) > 0)
assert.True(t, len(atCfg.schedule) > 0)
assert.True(t, len(atCfg.Profiles) > len(atCfg.events))
}

func TestAgentTelemetryEventConfiguration(t *testing.T) {
// Use nearly full
c := `
agent_telemetry:
enabled: true
profiles:
- name: checks
metric:
metrics:
- name: checks.execution_time
aggregate_tags:
- check_name
- name: pymem.inuse
schedule:
start_after: 123
iterations: 0
period: 456
- name: logs-and-metrics
metric:
exclude:
zero_metric: true
metrics:
- name: dogstatsd.udp_packets_bytes
- name: dogstatsd.uds_packets_bytes
schedule:
start_after: 30
iterations: 0
period: 900
- name: ondemand
events:
- name: agentbsod
request_type: agent-bsod
payload_key: agent_bsod
message: 'Agent BSOD'
- name: foobar
request_type: agent-foobar
payload_key: agent_foobar
message: 'Agent foobar'
- name: ondemand2
events:
- name: agentbsod
request_type: agent-bsod
payload_key: agent_bsod
message: 'Agent BSOD'
- name: barfoo
request_type: agent-barfoo
payload_key: agent_barfoo
message: 'Agent barfoo'
`

o := convertYamlStrToMap(t, c)
cfg := makeCfgMock(t, o)
atCfg, err := parseConfig(cfg)

require.NoError(t, err)

// single event map keeps unique event names
assert.Len(t, atCfg.events, 3)
assert.Len(t, atCfg.schedule, 2)
assert.Len(t, atCfg.Profiles, 4)
}

func TestAgentTelemetrySendRegisteredEvent(t *testing.T) {
// Use nearly full
var cfg = `
agent_telemetry:
enabled: true
use_compression: false
profiles:
- name: xxx
metric:
metrics:
- name: foo.bar
- name: ondemand
events:
- name: agentbsod
request_type: agent-bsod
payload_key: agent_bsod
message: 'Agent BSOD'
- name: foobar
request_type: agent-foobar
payload_key: agent_foobar
message: 'Agent foobar'
`

payloadObj := struct {
Date string `json:"date"`
Offender string `json:"offender"`
BugCheck string `json:"bugcheck"`
}{
Date: "2024-30-02 17:31:12",
Offender: "ddnpm+0x1a3",
BugCheck: "0x7A",
}
// conert to json
payload, err := json.Marshal(payloadObj)
require.NoError(t, err)

// setup and initiate atel
o := convertYamlStrToMap(t, cfg)
cl := newClientMock()
s := makeSenderImpl(t, cl, cfg)
r := newRunnerMock()
a := getTestAtel(t, nil, o, s, cl, r)
require.True(t, a.enabled)

a.start()
err = a.SendEvent("agentbsod", payload)
require.NoError(t, err)
assert.True(t, len(cl.(*clientMock).body) > 0)

//deserialize the payload of cl.(*clientMock).body
var topPayload map[string]interface{}
err = json.Unmarshal(cl.(*clientMock).body, &topPayload)
require.NoError(t, err)
fmt.Print(string(cl.(*clientMock).body))

v, ok, err2 := jsonquery.RunSingleOutput(".payload.message", topPayload)
require.NoError(t, err2)
require.True(t, ok)
assert.Equal(t, "Agent BSOD", v)

v, ok, err2 = jsonquery.RunSingleOutput(".payload.agent_bsod.offender", topPayload)
require.NoError(t, err2)
require.True(t, ok)
assert.Equal(t, "ddnpm+0x1a3", v)
}

func TestAgentTelemetrySendNonRegisteredEvent(t *testing.T) {
// Use nearly full
var cfg = `
agent_telemetry:
enabled: true
use_compression: false
profiles:
- name: xxx
metric:
metrics:
- name: foo.bar
- name: ondemand
events:
- name: agentbsod
request_type: agent-bsod
payload_key: agentbsod
message: 'Agent BSOD'
- name: foobar
request_type: agent-foobar
payload_key: agentfoobar
message: 'Agent foobar'
`

payloadObj := struct {
Date string `json:"date"`
Offender string `json:"offender"`
BugCheck string `json:"bugcheck"`
}{
Date: "2024-30-02 17:31:12",
Offender: "ddnpm+0x1a3",
BugCheck: "0x7A",
}
// conert to json
payload, err := json.Marshal(payloadObj)
require.NoError(t, err)

// setup and initiate atel
o := convertYamlStrToMap(t, cfg)
cl := newClientMock()
s := makeSenderImpl(t, cl, cfg)
r := newRunnerMock()
a := getTestAtel(t, nil, o, s, cl, r)
require.True(t, a.enabled)

a.start()
err = a.SendEvent("agentbsod2", payload)
require.Error(t, err)
}
Loading

0 comments on commit 40bf0b9

Please sign in to comment.