Skip to content

Commit

Permalink
sync container tests from main
Browse files Browse the repository at this point in the history
  • Loading branch information
chouetz committed Nov 20, 2024
1 parent 11b2344 commit e9999a2
Show file tree
Hide file tree
Showing 10 changed files with 894 additions and 87 deletions.
47 changes: 37 additions & 10 deletions test/fakeintake/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ const (
// ErrNoFlareAvailable is returned when no flare is available
var ErrNoFlareAvailable = errors.New("no flare available")

//nolint:revive // TODO(APL) Fix revive linter
// Client tbd
type Client struct {
fakeIntakeURL string

Expand Down Expand Up @@ -355,6 +355,39 @@ func (c *Client) FilterMetrics(name string, options ...MatchOpt[*aggregator.Metr
return filteredMetrics, nil
}

// filterPayload returns payloads matching any [MatchOpt](#MatchOpt) options
func filterPayload[T aggregator.PayloadItem](payloads []T, options ...MatchOpt[T]) ([]T, error) {
// apply filters one after the other
filteredPayloads := make([]T, 0, len(payloads))
for _, payload := range payloads {
matchCount := 0
for _, matchOpt := range options {
isMatch, err := matchOpt(payload)
if err != nil {
return nil, err
}
if !isMatch {
break
}
matchCount++
}
if matchCount == len(options) {
filteredPayloads = append(filteredPayloads, payload)
}
}
return filteredPayloads, nil
}

// FilterCheckRuns fetches fakeintake on `/api/v1/check_run` endpoint and returns
// metrics matching `name` and any [MatchOpt](#MatchOpt) options
func (c *Client) FilterCheckRuns(name string, options ...MatchOpt[*aggregator.CheckRun]) ([]*aggregator.CheckRun, error) {
checkRuns, err := c.GetCheckRun(name)
if err != nil {
return nil, err
}
return filterPayload(checkRuns, options...)
}

// WithTags filters by `tags`
func WithTags[P aggregator.PayloadItem](tags []string) MatchOpt[P] {
return func(payload P) (bool, error) {
Expand Down Expand Up @@ -401,9 +434,7 @@ func WithMetricValueLowerThan(maxValue float64) MatchOpt[*aggregator.MetricSerie
}
}

// WithMetricValueLowerThan filters metrics with values higher than `minValue`
//
//nolint:revive // TODO(APL) Fix revive linter
// WithMetricValueHigherThan filters metrics with values higher than `minValue`
func WithMetricValueHigherThan(minValue float64) MatchOpt[*aggregator.MetricSeries] {
return func(metric *aggregator.MetricSeries) (bool, error) {
for _, point := range metric.Points {
Expand All @@ -424,10 +455,8 @@ func (c *Client) getLog(service string) ([]*aggregator.Log, error) {
return c.logAggregator.GetPayloadsByName(service), nil
}

// GetLogNames fetches fakeintake on `/api/v2/logs` endpoint and returns
// GetLogServiceNames fetches fakeintake on `/api/v2/logs` endpoint and returns
// all received log service names
//
//nolint:revive // TODO(APL) Fix revive linter
func (c *Client) GetLogServiceNames() ([]string, error) {
err := c.getLogs()
if err != nil {
Expand Down Expand Up @@ -500,10 +529,8 @@ func (c *Client) GetCheckRunNames() ([]string, error) {
return c.checkRunAggregator.GetNames(), nil
}

// FilterLogs fetches fakeintake on `/api/v1/check_run` endpoint, unpackage payloads and returns
// GetCheckRun fetches fakeintake on `/api/v1/check_run` endpoint, unpackage payloads and returns
// checks matching `name`
//
//nolint:revive // TODO(APL) Fix revive linter
func (c *Client) GetCheckRun(name string) ([]*aggregator.CheckRun, error) {
err := c.getCheckRuns()
if err != nil {
Expand Down
153 changes: 146 additions & 7 deletions test/new-e2e/tests/containers/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"gopkg.in/zorkian/go-datadog-api.v2"

"github.com/DataDog/agent-payload/v5/gogen"

"github.com/DataDog/datadog-agent/pkg/util/pointer"
"github.com/DataDog/datadog-agent/test/fakeintake/aggregator"
fakeintake "github.com/DataDog/datadog-agent/test/fakeintake/client"
Expand Down Expand Up @@ -50,9 +51,18 @@ func (suite *baseSuite) TearDownSuite() {
suite.endTime = time.Now()
}

func (suite *baseSuite) BeforeTest(suiteName, testName string) {
suite.T().Logf("START %s/%s %s", suiteName, testName, time.Now())
}

func (suite *baseSuite) AfterTest(suiteName, testName string) {
suite.T().Logf("FINISH %s/%s %s", suiteName, testName, time.Now())
}

type testMetricArgs struct {
Filter testMetricFilterArgs
Expect testMetricExpectArgs
Filter testMetricFilterArgs
Expect testMetricExpectArgs
Optional testMetricExpectArgs
}

type testMetricFilterArgs struct {
Expand Down Expand Up @@ -97,6 +107,11 @@ func (suite *baseSuite) testMetric(args *testMetricArgs) {
expectedTags = lo.Map(*args.Expect.Tags, func(tag string, _ int) *regexp.Regexp { return regexp.MustCompile(tag) })
}

var optionalTags []*regexp.Regexp
if args.Optional.Tags != nil {
optionalTags = lo.Map(*args.Optional.Tags, func(tag string, _ int) *regexp.Regexp { return regexp.MustCompile(tag) })
}

sendEvent := func(alertType, text string) {
formattedArgs, err := yaml.Marshal(args)
suite.Require().NoError(err)
Expand Down Expand Up @@ -156,13 +171,13 @@ func (suite *baseSuite) testMetric(args *testMetricArgs) {
}
}()

filterTags := lo.Map(args.Filter.Tags, func(tag string, _ int) *regexp.Regexp {
regexTags := lo.Map(args.Filter.Tags, func(tag string, _ int) *regexp.Regexp {
return regexp.MustCompile(tag)
})

metrics, err := suite.Fakeintake.FilterMetrics(
args.Filter.Name,
fakeintake.WithMatchingTags[*aggregator.MetricSeries](filterTags),
fakeintake.WithMatchingTags[*aggregator.MetricSeries](regexTags),
)
// Can be replaced by require.NoErrorf(…) once https://github.com/stretchr/testify/pull/1481 is merged
if !assert.NoErrorf(c, err, "Failed to query fake intake") {
Expand All @@ -175,7 +190,7 @@ func (suite *baseSuite) testMetric(args *testMetricArgs) {

// Check tags
if expectedTags != nil {
err := assertTags(metrics[len(metrics)-1].GetTags(), expectedTags, args.Expect.AcceptUnexpectedTags)
err := assertTags(metrics[len(metrics)-1].GetTags(), expectedTags, optionalTags, args.Expect.AcceptUnexpectedTags)
assert.NoErrorf(c, err, "Tags mismatch on `%s`", prettyMetricQuery)
}

Expand Down Expand Up @@ -285,9 +300,13 @@ func (suite *baseSuite) testLog(args *testLogArgs) {
}
}()

regexTags := lo.Map(args.Filter.Tags, func(tag string, _ int) *regexp.Regexp {
return regexp.MustCompile(tag)
})

logs, err := suite.Fakeintake.FilterLogs(
args.Filter.Service,
fakeintake.WithTags[*aggregator.Log](args.Filter.Tags),
fakeintake.WithMatchingTags[*aggregator.Log](regexTags),
)
// Can be replaced by require.NoErrorf(…) once https://github.com/stretchr/testify/pull/1481 is merged
if !assert.NoErrorf(c, err, "Failed to query fake intake") {
Expand All @@ -300,7 +319,7 @@ func (suite *baseSuite) testLog(args *testLogArgs) {

// Check tags
if expectedTags != nil {
err := assertTags(logs[len(logs)-1].GetTags(), expectedTags, false)
err := assertTags(logs[len(logs)-1].GetTags(), expectedTags, []*regexp.Regexp{}, false)
assert.NoErrorf(c, err, "Tags mismatch on `%s`", prettyLogQuery)
}

Expand All @@ -316,3 +335,123 @@ func (suite *baseSuite) testLog(args *testLogArgs) {
}, 2*time.Minute, 10*time.Second, "Failed finding `%s` with proper tags and message", prettyLogQuery)
})
}

type testCheckRunArgs struct {
Filter testCheckRunFilterArgs
Expect testCheckRunExpectArgs
Optional testCheckRunExpectArgs
}

type testCheckRunFilterArgs struct {
Name string
// Tags are used to filter the checkRun
// Regexes are supported
Tags []string
}

type testCheckRunExpectArgs struct {
// Tags are the tags expected to be present
// Regexes are supported
Tags *[]string
AcceptUnexpectedTags bool
}

func (suite *baseSuite) testCheckRun(args *testCheckRunArgs) {
prettyCheckRunQuery := fmt.Sprintf("%s{%s}", args.Filter.Name, strings.Join(args.Filter.Tags, ","))

suite.Run("checkRun "+prettyCheckRunQuery, func() {
var expectedTags []*regexp.Regexp
if args.Expect.Tags != nil {
expectedTags = lo.Map(*args.Expect.Tags, func(tag string, _ int) *regexp.Regexp { return regexp.MustCompile(tag) })
}

var optionalTags []*regexp.Regexp
if args.Optional.Tags != nil {
optionalTags = lo.Map(*args.Optional.Tags, func(tag string, _ int) *regexp.Regexp { return regexp.MustCompile(tag) })
}

sendEvent := func(alertType, text string) {
formattedArgs, err := yaml.Marshal(args)
suite.Require().NoError(err)

tags := lo.Map(args.Filter.Tags, func(tag string, _ int) string {
return "filter_tag_" + tag
})

if _, err := suite.datadogClient.PostEvent(&datadog.Event{
Title: pointer.Ptr(fmt.Sprintf("testCheckRun %s", prettyCheckRunQuery)),
Text: pointer.Ptr(fmt.Sprintf(`%%%%%%
### Result
`+"```"+`
%s
`+"```"+`
### Query
`+"```"+`
%s
`+"```"+`
%%%%%%`, text, formattedArgs)),
AlertType: &alertType,
Tags: append([]string{
"app:agent-new-e2e-tests-containers",
"cluster_name:" + suite.clusterName,
"check_run:" + args.Filter.Name,
"test:" + suite.T().Name(),
}, tags...),
}); err != nil {
suite.T().Logf("Failed to post event: %s", err)
}
}

defer func() {
if suite.T().Failed() {
sendEvent("error", fmt.Sprintf("Failed finding %s with proper tags and value", prettyCheckRunQuery))
} else {
sendEvent("success", "All good!")
}
}()

suite.EventuallyWithTf(func(collect *assert.CollectT) {
c := &myCollectT{
CollectT: collect,
errors: []error{},
}
// To enforce the use of myCollectT instead
collect = nil //nolint:ineffassign

defer func() {
if len(c.errors) == 0 {
sendEvent("success", "All good!")
} else {
sendEvent("warning", errors.Join(c.errors...).Error())
}
}()

regexTags := lo.Map(args.Filter.Tags, func(tag string, _ int) *regexp.Regexp {
return regexp.MustCompile(tag)
})

checkRuns, err := suite.Fakeintake.FilterCheckRuns(
args.Filter.Name,
fakeintake.WithMatchingTags[*aggregator.CheckRun](regexTags),
)
// Can be replaced by require.NoErrorf(…) once https://github.com/stretchr/testify/pull/1481 is merged
if !assert.NoErrorf(c, err, "Failed to query fake intake") {
return
}
// Can be replaced by require.NoEmptyf(…) once https://github.com/stretchr/testify/pull/1481 is merged
if !assert.NotEmptyf(c, checkRuns, "No `%s` checkRun yet", prettyCheckRunQuery) {
return
}

// Check tags
if expectedTags != nil {
err := assertTags(checkRuns[len(checkRuns)-1].GetTags(), expectedTags, optionalTags, args.Expect.AcceptUnexpectedTags)
assert.NoErrorf(c, err, "Tags mismatch on `%s`", prettyCheckRunQuery)
}

}, 2*time.Minute, 10*time.Second, "Failed finding `%s` with proper tags and value", prettyCheckRunQuery)
})
}
62 changes: 57 additions & 5 deletions test/new-e2e/tests/containers/dump_cluster_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ import (
awsekstypes "github.com/aws/aws-sdk-go-v2/service/eks/types"
"golang.org/x/crypto/ssh"
"golang.org/x/crypto/ssh/agent"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/genericiooptions"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
kubectlget "k8s.io/kubectl/pkg/cmd/get"
Expand Down Expand Up @@ -163,11 +166,18 @@ func dumpKindClusterState(ctx context.Context, name string) (ret string) {
auth = append(auth, ssh.PublicKeys(signer))
}

sshClient, err := ssh.Dial("tcp", *instanceIP+":22", &ssh.ClientConfig{
User: "ubuntu",
Auth: auth,
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
})
var sshClient *ssh.Client
err = nil
for _, user := range []string{"ec2-user", "ubuntu"} {
sshClient, err = ssh.Dial("tcp", *instanceIP+":22", &ssh.ClientConfig{
User: user,
Auth: auth,
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
})
if err == nil {
break
}
}
if err != nil {
fmt.Fprintf(&out, "Failed to dial SSH server %s: %v\n", *instanceIP, err)
return
Expand Down Expand Up @@ -286,4 +296,46 @@ func dumpK8sClusterState(ctx context.Context, kubeconfig *clientcmdapi.Config, o
fmt.Fprintf(out, "Failed to execute Get command: %v\n", err)
return
}

// Get the logs of containers that have restarted
config, err := clientcmd.BuildConfigFromFlags("", kubeconfigFile.Name())
if err != nil {
fmt.Fprintf(out, "Failed to build Kubernetes config: %v\n", err)
return
}
k8sClient, err := kubernetes.NewForConfig(config)
if err != nil {
fmt.Fprintf(out, "Failed to create Kubernetes client: %v\n", err)
return
}

pods, err := k8sClient.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
if err != nil {
fmt.Fprintf(out, "Failed to list pods: %v\n", err)
return
}

for _, pod := range pods.Items {
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.RestartCount > 0 {
fmt.Fprintf(out, "\nLOGS FOR POD %s/%s CONTAINER %s:\n", pod.Namespace, pod.Name, containerStatus.Name)
logs, err := k8sClient.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{
Container: containerStatus.Name,
Previous: true,
// TailLines: pointer.Ptr(int64(100)),
}).Stream(ctx)
if err != nil {
fmt.Fprintf(out, "Failed to get logs: %v\n", err)
continue
}
defer logs.Close()

_, err = io.Copy(out, logs)
if err != nil {
fmt.Fprintf(out, "Failed to copy logs: %v\n", err)
continue
}
}
}
}
}
Loading

0 comments on commit e9999a2

Please sign in to comment.