Skip to content

Commit

Permalink
[PROCS-4228] Add E2E tests in K8s for running the process checks in t…
Browse files Browse the repository at this point in the history
…he core agent (#28640)
  • Loading branch information
daniel-taf authored Aug 28, 2024
1 parent fb92620 commit c81921c
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 20 deletions.
16 changes: 14 additions & 2 deletions test/new-e2e/tests/process/config/helm-template.tmpl
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
# Helm values template for the process E2E tests. Each {{.Field}} placeholder
# is expected to be filled from the matching helmConfig field in k8s_test.go.
datadog:
  processAgent:
    processCollection: {{.ProcessCollection}}
    processDiscovery: {{.ProcessDiscoveryCollection}}
    containerCollection: {{.ContainerCollection}}
    # Pinned to false in the chart values; the run-in-core-agent mode is
    # toggled through the DD_PROCESS_CONFIG_RUN_IN_CORE_AGENT_ENABLED env var
    # set on both containers below.
    runInCoreAgent: false
  networkMonitoring:
    enabled: {{.NetworkPerformanceMonitoring}}

agents:
  containers:
    agent:
      env:
        - name: DD_PROCESS_CONFIG_RUN_IN_CORE_AGENT_ENABLED
          # Quoted on purpose: container env values must be strings, and an
          # unquoted true/false would be parsed as a YAML boolean.
          value: "{{.RunInCoreAgent}}"
    processAgent:
      env:
        - name: DD_PROCESS_CONFIG_RUN_IN_CORE_AGENT_ENABLED
          value: "{{.RunInCoreAgent}}"
115 changes: 97 additions & 18 deletions test/new-e2e/tests/process/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ import (
var helmTemplate string

// helmConfig holds the values substituted into the Helm values template used
// by the Kubernetes process E2E tests. Every field defaults to false, so a test
// only needs to set the features it wants enabled.
type helmConfig struct {
	// ProcessAgentEnabled is the legacy toggle for datadog.processAgent.enabled.
	// It is retained so that callers which still set it keep compiling.
	ProcessAgentEnabled bool
	// ProcessCollection feeds datadog.processAgent.processCollection.
	ProcessCollection bool
	// ProcessDiscoveryCollection feeds datadog.processAgent.processDiscovery.
	ProcessDiscoveryCollection bool
	// ContainerCollection feeds datadog.processAgent.containerCollection.
	ContainerCollection bool
	// RunInCoreAgent feeds the DD_PROCESS_CONFIG_RUN_IN_CORE_AGENT_ENABLED
	// environment variable set on the agent containers.
	RunInCoreAgent bool
	// NetworkPerformanceMonitoring feeds datadog.networkMonitoring.enabled.
	NetworkPerformanceMonitoring bool
}

func createHelmValues(cfg helmConfig) (string, error) {
Expand All @@ -64,8 +66,7 @@ type K8sSuite struct {
func TestK8sTestSuite(t *testing.T) {
t.Parallel()
helmValues, err := createHelmValues(helmConfig{
ProcessAgentEnabled: true,
ProcessCollection: true,
ProcessCollection: true,
})
require.NoError(t, err)

Expand Down Expand Up @@ -103,12 +104,24 @@ func (s *K8sSuite) TestProcessCheck() {
assertContainersCollected(t, payloads, []string{"stress-ng"})
}

// TestManualProcessCheck invokes the "process" check through
// execProcessAgentCheck and validates the returned output against the
// stress-ng workload.
func (s *K8sSuite) TestManualProcessCheck() {
	t := s.T()
	output := execProcessAgentCheck(t, s.Env().KubernetesCluster, "process")
	assertManualProcessCheck(t, output, false, "stress-ng-cpu [run]", "stress-ng")
}

// TestManualProcessDiscoveryCheck invokes the "process_discovery" check through
// execProcessAgentCheck and validates that the output mentions the stress-ng
// worker process.
func (s *K8sSuite) TestManualProcessDiscoveryCheck() {
	t := s.T()
	output := execProcessAgentCheck(t, s.Env().KubernetesCluster, "process_discovery")
	assertManualProcessDiscoveryCheck(t, output, "stress-ng-cpu [run]")
}

// TestManualContainerCheck invokes the "container" check through
// execProcessAgentCheck and validates that the output mentions the stress-ng
// container.
func (s *K8sSuite) TestManualContainerCheck() {
	t := s.T()
	output := execProcessAgentCheck(t, s.Env().KubernetesCluster, "container")
	assertManualContainerCheck(t, output, "stress-ng")
}

func (s *K8sSuite) TestProcessDiscoveryCheck() {
s.T().Skip("WIP: test is failing as process collection is still enabled with 'DD_PROCESS_AGENT_ENABLED=true'." +
"The bug appears to be in test-infra-definitions and it's default helm values taking precedence")
t := s.T()
helmValues, err := createHelmValues(helmConfig{
ProcessAgentEnabled: true,
ProcessDiscoveryCollection: true,
})
require.NoError(t, err)
Expand Down Expand Up @@ -136,19 +149,85 @@ func (s *K8sSuite) TestProcessDiscoveryCheck() {
assertProcessDiscoveryCollected(t, payloads, "stress-ng-cpu [run]")
}

// TestManualProcessCheck invokes the "process" check through
// execProcessAgentCheck and validates the returned output against the
// stress-ng workload.
//
// NOTE(review): an identical definition of this method appears earlier in this
// diff view; only one of the two can exist in the compiled file.
func (s *K8sSuite) TestManualProcessCheck() {
	t := s.T()
	output := execProcessAgentCheck(t, s.Env().KubernetesCluster, "process")
	assertManualProcessCheck(t, output, false, "stress-ng-cpu [run]", "stress-ng")
}
func (s *K8sSuite) TestProcessCheckInCoreAgent() {
t := s.T()
helmValues, err := createHelmValues(helmConfig{
ProcessCollection: true,
RunInCoreAgent: true,
})
require.NoError(t, err)

func (s *K8sSuite) TestManualProcessDiscoveryCheck() {
checkOutput := execProcessAgentCheck(s.T(), s.Env().KubernetesCluster, "process_discovery")
assertManualProcessDiscoveryCheck(s.T(), checkOutput, "stress-ng-cpu [run]")
s.UpdateEnv(awskubernetes.KindProvisioner(
awskubernetes.WithWorkloadApp(func(e config.Env, kubeProvider *kubernetes.Provider) (*kubeComp.Workload, error) {
return cpustress.K8sAppDefinition(e, kubeProvider, "workload-stress")
}),
awskubernetes.WithAgentOptions(kubernetesagentparams.WithHelmValues(helmValues)),
))

assert.EventuallyWithT(t, func(*assert.CollectT) {
status := k8sAgentStatus(t, s.Env().KubernetesCluster)

// verify the standalone process-agent is not running
assert.NotEmpty(t, status.ProcessAgentStatus.Error, "status: %+v", status)
assert.Empty(t, status.ProcessAgentStatus.Expvars.Map.EnabledChecks)

// Verify the process component is running in the core agent
assert.ElementsMatch(t, []string{"process", "rtprocess"}, status.ProcessComponentStatus.Expvars.Map.EnabledChecks)
}, 2*time.Minute, 5*time.Second)

// Flush fake intake to remove any payloads which may have
s.Env().FakeIntake.Client().FlushServerAndResetAggregators()

var payloads []*aggregator.ProcessPayload
assert.EventuallyWithT(t, func(c *assert.CollectT) {
var err error
payloads, err = s.Env().FakeIntake.Client().GetProcesses()
assert.NoError(c, err, "failed to get process payloads from fakeintake")
// Wait for two payloads, as processes must be detected in two check runs to be returned
assert.GreaterOrEqual(c, len(payloads), 2, "fewer than 2 payloads returned")
}, 2*time.Minute, 10*time.Second)

assertProcessCollected(t, payloads, false, "stress-ng-cpu [run]")
assertContainersCollected(t, payloads, []string{"stress-ng"})
requireProcessNotCollected(t, payloads, "process-agent")
assertContainersNotCollected(t, payloads, []string{"process-agent"})
}

func (s *K8sSuite) TestManualContainerCheck() {
checkOutput := execProcessAgentCheck(s.T(), s.Env().KubernetesCluster, "container")
assertManualContainerCheck(s.T(), checkOutput, "stress-ng")
// TestProcessCheckInCoreAgentWithNPM redeploys the agent with process
// collection, the run-in-core-agent flag and network performance monitoring
// all enabled, then verifies that:
//   - the core agent's process component reports the process and rtprocess checks,
//   - the process-agent reports only the connections check,
//   - process payloads reaching the fake intake contain both the stress-ng
//     workload and the process-agent (process and container).
func (s *K8sSuite) TestProcessCheckInCoreAgentWithNPM() {
	t := s.T()
	helmValues, err := createHelmValues(helmConfig{
		ProcessCollection:            true,
		RunInCoreAgent:               true,
		NetworkPerformanceMonitoring: true,
	})
	require.NoError(t, err)

	s.UpdateEnv(awskubernetes.KindProvisioner(
		awskubernetes.WithWorkloadApp(func(e config.Env, kubeProvider *kubernetes.Provider) (*kubeComp.Workload, error) {
			return cpustress.K8sAppDefinition(e, kubeProvider, "workload-stress")
		}),
		awskubernetes.WithAgentOptions(kubernetesagentparams.WithHelmValues(helmValues)),
	))

	// Assertions inside the polled function must be recorded on c, not t:
	// failures collected on c make EventuallyWithT retry on the next tick,
	// whereas failures on t would fail the test on the first poll and let
	// EventuallyWithT return immediately without waiting for the agent.
	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		// NOTE(review): k8sAgentStatus still receives t; if it asserts
		// internally, those failures are not retried - confirm.
		status := k8sAgentStatus(t, s.Env().KubernetesCluster)
		assert.ElementsMatch(c, []string{"process", "rtprocess"}, status.ProcessComponentStatus.Expvars.Map.EnabledChecks)
		assert.ElementsMatch(c, []string{"connections"}, status.ProcessAgentStatus.Expvars.Map.EnabledChecks)
	}, 2*time.Minute, 5*time.Second)

	// Flush fake intake, as TestProcessCheckInCoreAgent does, so that payloads
	// received before the agent reached the state verified above cannot
	// satisfy the wait below.
	s.Env().FakeIntake.Client().FlushServerAndResetAggregators()

	var payloads []*aggregator.ProcessPayload
	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		var err error
		payloads, err = s.Env().FakeIntake.Client().GetProcesses()
		assert.NoError(c, err, "failed to get process payloads from fakeintake")

		// Wait for two payloads, as processes must be detected in two check runs to be returned
		assert.GreaterOrEqual(c, len(payloads), 2, "fewer than 2 payloads returned")
	}, 2*time.Minute, 10*time.Second)

	assertProcessCollected(t, payloads, false, "stress-ng-cpu [run]")
	assertProcessCollected(t, payloads, false, "process-agent")
	assertContainersCollected(t, payloads, []string{"stress-ng", "process-agent"})
}

func execProcessAgentCheck(t *testing.T, cluster *components.KubernetesCluster, check string) string {
Expand Down
15 changes: 15 additions & 0 deletions test/new-e2e/tests/process/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,21 @@ func assertContainersCollected(t *testing.T, payloads []*aggregator.ProcessPaylo
}
}

// assertContainersNotCollected checks every name in containers against all
// payloads and records an assertion failure for each name that findContainer
// reports as present. The first payload containing an offending container is
// logged to help with debugging.
func assertContainersNotCollected(t *testing.T, payloads []*aggregator.ProcessPayload, containers []string) {
	for _, name := range containers {
		collected := false
		// Stop scanning as soon as one payload contains the container.
		for i := 0; i < len(payloads) && !collected; i++ {
			if !findContainer(name, payloads[i].Containers) {
				continue
			}
			collected = true
			t.Logf("Payload:\n%+v\n", payloads[i])
		}
		assert.False(t, collected, "%s container found", name)
	}
}

// findContainer returns whether the container with the given name exists in the given list of
// containers and whether it has the expected data populated
func findContainer(name string, containers []*agentmodel.Container) bool {
Expand Down

0 comments on commit c81921c

Please sign in to comment.