diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 0f114dcaa4a..cbd0ae35884 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -140,7 +140,7 @@ steps: key: "serverless-integration-tests" env: TEST_INTEG_AUTH_ESS_REGION: us-east-1 - command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestMonitoringLogsShipped" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite + command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestLogIngestionFleetManaged" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite artifact_paths: - "build/TEST-**" - "build/diagnostics/*" diff --git a/changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml b/changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml new file mode 100644 index 00000000000..1ce991c2c38 --- /dev/null +++ b/changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml @@ -0,0 +1,35 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Support flattened data_stream.* fields in input configuration + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: >- + An input configuration supports flattened fields, however the + 'data_stream' field was not being correctly decoded when + flattened. This commit fixes this issue. + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/3465 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/3191 diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index 424f3a93147..00c4d1c63cb 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -2361,3 +2361,74 @@ func gatherDurationFieldPaths(s interface{}, pathSoFar string) []string { return gatheredPaths } + +func TestFlattenedDataStream(t *testing.T) { + expectedNamespace := "test-namespace" + expectedType := "test-type" + expectedDataset := "test-dataset" + + policy := map[string]any{ + "outputs": map[string]any{ + "default": map[string]any{ + "type": "elasticsearch", + "enabled": true, + }, + }, + "inputs": []any{ + map[string]any{ + "type": "filestream", + "id": "filestream-0", + "enabled": true, + "data_stream.type": expectedType, + "data_stream.dataset": expectedDataset, + "data_stream": map[string]any{ + "namespace": expectedNamespace, + }, + }, + }, + } + runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), PlatformDetail{}, SkipBinaryCheck()) + if err != nil { + t.Fatalf("cannot load runtime specs: %s", err) + } + + result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil) + if err != nil { + t.Fatalf("cannot convert policy to component: %s", err) + } + + if len(result) != 1 { + t.Fatalf("expecting result to have one element, got %d", len(result)) + } + + if len(result[0].Units) != 2 { + t.Fatalf("expecting result[0].Units to have two elements, got %d", len(result)) + } + + // We do not make assumptions about ordering. + // Get the input Unit + var dataStream *proto.DataStream + for _, unit := range result[0].Units { + if unit.Err != nil { + t.Fatalf("unit.Err: %s", unit.Err) + } + if unit.Type == client.UnitTypeInput { + dataStream = unit.Config.DataStream + break + } + } + + if dataStream == nil { + t.Fatal("DataStream cannot be nil") + } + + if dataStream.Dataset != expectedDataset { + t.Errorf("expecting DataStream.Dataset: %q, got: %q", expectedDataset, dataStream.Dataset) + } + if dataStream.Type != expectedType { + t.Errorf("expecting DataStream.Type: %q, got: %q", expectedType, dataStream.Type) + } + if dataStream.Namespace != expectedNamespace { + t.Errorf("expecting DataStream.Namespace: %q, got: %q", expectedNamespace, dataStream.Namespace) + } +} diff --git a/pkg/component/config.go b/pkg/component/config.go index a0c75d00e32..781e2e7624f 100644 --- a/pkg/component/config.go +++ b/pkg/component/config.go @@ -15,6 +15,7 @@ import ( "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent/pkg/limits" ) @@ -100,9 +101,88 @@ func ExpectedConfig(cfg map[string]interface{}) (*proto.UnitExpectedConfig, erro return nil, err } + if err := updateDataStreamsFromSource(result); err != nil { + return nil, fmt.Errorf("could not dedot 'data_stream': %w", err) + } + return result, nil } +func deDotDataStream(ds *proto.DataStream, source *structpb.Struct) (*proto.DataStream, error) { + if ds == nil { + ds = &proto.DataStream{} + } + + cfg, err := config.NewConfigFrom(source.AsMap()) + if err != nil { + return nil, fmt.Errorf("cannot generate config from source field: %w", err) + } + + // Create a temporary struct to unpack the configuration. + // Unpack correctly handles any flattened fields like + // data_stream.type. So all we need to do is to call Unpack, + // ensure the DataStream does not have a different value, + // them merge them both. + tmp := struct { + DataStream struct { + Dataset string `config:"dataset" yaml:"dataset"` + Type string `config:"type" yaml:"type"` + Namespace string `config:"namespace" yaml:"namespace"` + } `config:"data_stream" yaml:"data_stream"` + }{} + + if err := cfg.Unpack(&tmp); err != nil { + return nil, fmt.Errorf("cannot unpack source field into struct: %w", err) + } + + if (ds.Dataset != tmp.DataStream.Dataset) && (ds.Dataset != "" && tmp.DataStream.Dataset != "") { + return nil, errors.New("duplicated key 'datastream.dataset'") + } + + if (ds.Type != tmp.DataStream.Type) && (ds.Type != "" && tmp.DataStream.Type != "") { + return nil, errors.New("duplicated key 'datastream.type'") + } + + if (ds.Namespace != tmp.DataStream.Namespace) && (ds.Namespace != "" && tmp.DataStream.Namespace != "") { + return nil, errors.New("duplicated key 'datastream.namespace'") + } + + ret := &proto.DataStream{ + Dataset: valueOrDefault(tmp.DataStream.Dataset, ds.Dataset), + Type: valueOrDefault(tmp.DataStream.Type, ds.Type), + Namespace: valueOrDefault(tmp.DataStream.Namespace, ds.Namespace), + Source: ds.GetSource(), + } + + return ret, nil +} + +// valueOrDefault returns b if a is an empty string +func valueOrDefault(a, b string) string { + if a == "" { + return b + } + return a +} + +func updateDataStreamsFromSource(unitConfig *proto.UnitExpectedConfig) error { + var err error + unitConfig.DataStream, err = deDotDataStream(unitConfig.GetDataStream(), unitConfig.GetSource()) + if err != nil { + return fmt.Errorf("could not parse data_stream from input: %w", err) + } + + for i, stream := range unitConfig.Streams { + stream.DataStream, err = deDotDataStream(stream.GetDataStream(), stream.GetSource()) + if err != nil { + return fmt.Errorf("could not parse data_stream from stream [%d]: %w", + i, err) + } + } + + return nil +} + func setSource(val interface{}, cfg map[string]interface{}) error { // find the source field on the val resVal := reflect.ValueOf(val).Elem() diff --git a/pkg/component/config_test.go b/pkg/component/config_test.go index 64dcfe3a697..7cdef177829 100644 --- a/pkg/component/config_test.go +++ b/pkg/component/config_test.go @@ -8,8 +8,10 @@ import ( "errors" "testing" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/elastic-agent-client/v7/pkg/proto" @@ -197,7 +199,12 @@ func TestExpectedConfig(t *testing.T) { assert.Equal(t, err.Error(), scenario.Err.Error()) } else { require.NoError(t, err) - assert.EqualValues(t, scenario.Expected, observed) + // protocmp.Transform ensures we do not compare any internal + // protobuf fields + if !cmp.Equal(scenario.Expected, observed, protocmp.Transform()) { + t.Errorf("mismatch (-want +got) \n%s", + cmp.Diff(scenario.Expected, observed, protocmp.Transform())) + } } }) } diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index 1d852e50277..3774808efaa 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -357,7 +357,14 @@ func (f *Fixture) RunBeat(ctx context.Context) error { // Elastic Agent is stopped. If at any time the Elastic Agent logs an error log and the Fixture is not started // with `WithAllowErrors()` then `Run` will exit early and return the logged error. // -// If no `states` are provided then the Elastic Agent runs until the context is or the timeout specified with WithRunLength is reached. +// If no `states` are provided then the Elastic Agent runs until the context is cancelled. +// +// The Elastic-Agent is started agent in test mode (--testing-mode) this mode +// expects the initial configuration (full YAML config) via gRPC. +// This configuration should be passed in the State.Configure field. +// +// The `elastic-agent.yml` generated by `Fixture.Configure` is ignored +// when `Run` is called. func (f *Fixture) Run(ctx context.Context, states ...State) error { if f.binaryName != "elastic-agent" { return errors.New("Run() can only be used with elastic-agent, use RunBeat()") diff --git a/pkg/testing/tools/estools/elasticsearch.go b/pkg/testing/tools/estools/elasticsearch.go index d6bf69369cd..304e917d7ee 100644 --- a/pkg/testing/tools/estools/elasticsearch.go +++ b/pkg/testing/tools/estools/elasticsearch.go @@ -438,9 +438,9 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor } -// GetLogsForDatastream returns any logs associated with the datastream -func GetLogsForDatastream(client elastictransport.Interface, index string) (Documents, error) { - return GetLogsForDatastreamWithContext(context.Background(), client, index) +// GetLogsForDataset returns any logs associated with the datastream +func GetLogsForDataset(client elastictransport.Interface, index string) (Documents, error) { + return GetLogsForDatasetWithContext(context.Background(), client, index) } // GetLogsForAgentID returns any logs associated with the agent ID @@ -478,8 +478,8 @@ func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents, return handleDocsResponse(res) } -// GetLogsForDatastreamWithContext returns any logs associated with the datastream -func GetLogsForDatastreamWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) { +// GetLogsForDatasetWithContext returns any logs associated with the datastream +func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) { indexQuery := map[string]interface{}{ "query": map[string]interface{}{ "match": map[string]interface{}{ @@ -536,6 +536,60 @@ func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{ return handleDocsResponse(res) } +// GetLogsForDatastream returns any logs associated with the datastream +func GetLogsForDatastream( + ctx context.Context, + client elastictransport.Interface, + dsType, dataset, namespace string) (Documents, error) { + + query := map[string]any{ + "_source": []string{"message"}, + "query": map[string]any{ + "bool": map[string]any{ + "must": []any{ + map[string]any{ + "match": map[string]any{ + "data_stream.dataset": dataset, + }, + }, + map[string]any{ + "match": map[string]any{ + "data_stream.namespace": namespace, + }, + }, + map[string]any{ + "match": map[string]any{ + "data_stream.type": dsType, + }, + }, + }, + }, + }, + } + + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(query); err != nil { + return Documents{}, fmt.Errorf("error creating ES query: %w", err) + } + + es := esapi.New(client) + res, err := es.Search( + es.Search.WithIndex(fmt.Sprintf(".ds-%s*", dsType)), + es.Search.WithExpandWildcards("all"), + es.Search.WithBody(&buf), + es.Search.WithTrackTotalHits(true), + es.Search.WithPretty(), + es.Search.WithContext(ctx), + ) + if err != nil { + return Documents{}, fmt.Errorf("error performing ES search: %w", err) + } + + return handleDocsResponse(res) +} + +// handleDocsResponse converts the esapi.Response into Documents, +// it closes the response.Body after reading func handleDocsResponse(res *esapi.Response) (Documents, error) { resultBuf, err := handleResponseRaw(res) if err != nil { diff --git a/testing/integration/logs_ingestion_test.go b/testing/integration/logs_ingestion_test.go new file mode 100644 index 00000000000..ba9a84673b0 --- /dev/null +++ b/testing/integration/logs_ingestion_test.go @@ -0,0 +1,430 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "math/rand" + "net/http" + "net/http/httputil" + "os" + "path/filepath" + "regexp" + "strings" + "testing" + "text/template" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-libs/kibana" + "github.com/elastic/elastic-agent/pkg/control/v2/client" + atesting "github.com/elastic/elastic-agent/pkg/testing" + "github.com/elastic/elastic-agent/pkg/testing/define" + "github.com/elastic/elastic-agent/pkg/testing/tools" + "github.com/elastic/elastic-agent/pkg/testing/tools/check" + "github.com/elastic/elastic-agent/pkg/testing/tools/estools" + "github.com/elastic/elastic-agent/pkg/testing/tools/fleettools" + "github.com/elastic/elastic-transport-go/v8/elastictransport" +) + +func TestLogIngestionFleetManaged(t *testing.T) { + info := define.Require(t, define.Requirements{ + Stack: &define.Stack{}, + Local: false, + Sudo: true, + }) + ctx := context.Background() + + agentFixture, err := define.NewFixture(t, define.Version()) + require.NoError(t, err) + + // 1. Create a policy in Fleet with monitoring enabled. + // To ensure there are no conflicts with previous test runs against + // the same ESS stack, we add the current time at the end of the policy + // name. This policy does not contain any integration. + t.Log("Enrolling agent in Fleet with a test policy") + createPolicyReq := kibana.AgentPolicy{ + Name: fmt.Sprintf("test-policy-enroll-%d", time.Now().Unix()), + Namespace: info.Namespace, + Description: "test policy for agent enrollment", + MonitoringEnabled: []kibana.MonitoringEnabledOption{ + kibana.MonitoringEnabledLogs, + kibana.MonitoringEnabledMetrics, + }, + AgentFeatures: []map[string]interface{}{ + { + "name": "test_enroll", + "enabled": true, + }, + }, + } + + installOpts := atesting.InstallOpts{ + NonInteractive: true, + Force: true, + } + + // 2. Install the Elastic-Agent with the policy that + // was just created. + policy, err := tools.InstallAgentWithPolicy( + ctx, + t, + installOpts, + agentFixture, + info.KibanaClient, + createPolicyReq) + require.NoError(t, err) + t.Logf("created policy: %s", policy.ID) + check.ConnectedToFleet(t, agentFixture, 5*time.Minute) + + t.Run("Monitoring logs are shipped", func(t *testing.T) { + testMonitoringLogsAreShipped(t, ctx, info, agentFixture, policy) + }) + + t.Run("Normal logs with flattened data_stream are shipped", func(t *testing.T) { + testFlattenedDatastreamFleetPolicy(t, ctx, info, agentFixture, policy) + }) +} + +func testMonitoringLogsAreShipped( + t *testing.T, + ctx context.Context, + info *define.Info, + agentFixture *atesting.Fixture, + policy kibana.PolicyResponse, +) { + // Stage 1: Make sure metricbeat logs are populated + t.Log("Making sure metricbeat logs are populated") + docs := findESDocs(t, func() (estools.Documents, error) { + return estools.GetLogsForDataset(info.ESClient, "elastic_agent.metricbeat") + }) + t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits)) + require.NotZero(t, len(docs.Hits.Hits)) + + // Stage 2: make sure all components are healthy + t.Log("Making sure all components are healthy") + status, err := agentFixture.ExecStatus(ctx) + require.NoError(t, err, + "could not get agent status to verify all components are healthy") + for _, c := range status.Components { + assert.Equalf(t, client.Healthy, client.State(c.State), + "component %s: want %s, got %s", + c.Name, client.Healthy, client.State(c.State)) + } + + // Stage 3: Make sure there are no errors in logs + t.Log("Making sure there are no error logs") + docs = findESDocs(t, func() (estools.Documents, error) { + return estools.CheckForErrorsInLogs(info.ESClient, info.Namespace, []string{ + // acceptable error messages (include reason) + "Error dialing dial tcp 127.0.0.1:9200: connect: connection refused", // beat is running default config before its config gets updated + "Global configuration artifact is not available", // Endpoint: failed to load user artifact due to connectivity issues + "Failed to download artifact", + "Failed to initialize artifact", + "Failed to apply initial policy from on disk configuration", + "elastic-agent-client error: rpc error: code = Canceled desc = context canceled", // can happen on restart + }) + }) + t.Logf("errors: Got %d documents", len(docs.Hits.Hits)) + for _, doc := range docs.Hits.Hits { + t.Logf("%#v", doc.Source) + } + require.Empty(t, docs.Hits.Hits) + + // Stage 4: Make sure we have message confirming central management is running + t.Log("Making sure we have message confirming central management is running") + docs = findESDocs(t, func() (estools.Documents, error) { + return estools.FindMatchingLogLines(info.ESClient, info.Namespace, + "Parsed configuration and determined agent is managed by Fleet") + }) + require.NotZero(t, len(docs.Hits.Hits)) + + // Stage 5: verify logs from the monitoring components are not sent to the output + t.Log("Check monitoring logs") + hostname, err := os.Hostname() + if err != nil { + t.Fatalf("could not get hostname to filter Agent: %s", err) + } + + agentID, err := fleettools.GetAgentIDByHostname(info.KibanaClient, policy.ID, hostname) + require.NoError(t, err, "could not get Agent ID by hostname") + t.Logf("Agent ID: %q", agentID) + + // We cannot search for `component.id` because at the moment of writing + // this field is not mapped. There is an issue for that: + // https://github.com/elastic/integrations/issues/6545 + // TODO: use runtime fields while the above issue is not resolved. + + docs = findESDocs(t, func() (estools.Documents, error) { + return estools.GetLogsForAgentID(info.ESClient, agentID) + }) + require.NoError(t, err, "could not get logs from Agent ID: %q, err: %s", + agentID, err) + + monRegExp := regexp.MustCompile(".*-monitoring$") + for i, d := range docs.Hits.Hits { + // Lazy way to navigate a map[string]any: convert to JSON then + // decode into a struct. + jsonData, err := json.Marshal(d.Source) + if err != nil { + t.Fatalf("could not encode document source as JSON: %s", err) + } + + doc := ESDocument{} + if err := json.Unmarshal(jsonData, &doc); err != nil { + t.Fatalf("could not unmarshal document source: %s", err) + } + + if monRegExp.MatchString(doc.Component.ID) { + t.Errorf("[%d] Document on index %q with 'component.id': %q "+ + "and 'elastic_agent.id': %q. 'elastic_agent.id' must not "+ + "end in '-monitoring'\n", + i, d.Index, doc.Component.ID, doc.ElasticAgent.ID) + } + } +} + +func findESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.Documents { + var docs estools.Documents + require.Eventually( + t, + func() bool { + var err error + docs, err = findFn() + return err == nil + }, + 3*time.Minute, + 15*time.Second, + ) + + return docs +} + +func testFlattenedDatastreamFleetPolicy( + t *testing.T, + ctx context.Context, + info *define.Info, + agentFixture *atesting.Fixture, + policy kibana.PolicyResponse, +) { + dsType := "logs" + dsNamespace := cleanString(fmt.Sprintf("%snamespace%d", t.Name(), rand.Uint64())) + dsDataset := cleanString(fmt.Sprintf("%s-dataset", t.Name())) + numEvents := 60 + + tempDir := t.TempDir() + logFilePath := filepath.Join(tempDir, "log.log") + generateLogFile(t, logFilePath, 2*time.Millisecond, numEvents) + + agentFixture, err := define.NewFixture(t, define.Version()) + if err != nil { + t.Fatalf("could not create new fixture: %s", err) + } + + // 1. Prepare a request to add an integration to the policy + tmpl, err := template.New(t.Name() + "custom-log-policy").Parse(policyJSON) + if err != nil { + t.Fatalf("cannot parse template: %s", err) + } + + // The time here ensures there are no conflicts with the integration name + // in Fleet. + agentPolicyBuilder := strings.Builder{} + err = tmpl.Execute(&agentPolicyBuilder, policyVars{ + Name: "Log-Input-" + t.Name() + "-" + time.Now().Format(time.RFC3339), + PolicyID: policy.ID, + LogFilePath: logFilePath, + Namespace: dsNamespace, + Dataset: dsDataset, + }) + if err != nil { + t.Fatalf("could not render template: %s", err) + } + // We keep a copy of the policy for debugging prurposes + agentPolicy := agentPolicyBuilder.String() + + // 2. Call Kibana to create the policy. + // Docs: https://www.elastic.co/guide/en/fleet/current/fleet-api-docs.html#create-integration-policy-api + resp, err := info.KibanaClient.Connection.Send( + http.MethodPost, + "/api/fleet/package_policies", + nil, + nil, + bytes.NewBufferString(agentPolicy)) + if err != nil { + t.Fatalf("could not execute request to Kibana/Fleet: %s", err) + } + if resp.StatusCode != http.StatusOK { + // On error dump the whole request response so we can easily spot + // what went wrong. + t.Errorf("received a non 200-OK when adding package to policy. "+ + "Status code: %d", resp.StatusCode) + respDump, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("could not dump error response from Kibana: %s", err) + } + // Make debugging as easy as possible + t.Log("================================================================================") + t.Log("Kibana error response:") + t.Log(string(respDump)) + t.Log("================================================================================") + t.Log("Rendered policy:") + t.Log(agentPolicy) + t.Log("================================================================================") + t.FailNow() + } + + require.Eventually( + t, + ensureDocumentsInES(t, ctx, info.ESClient, dsType, dsDataset, dsNamespace, numEvents), + 120*time.Second, + time.Second, + "could not get all expected documents form ES") +} + +// ensureDocumentsInES asserts the documents were ingested into the correct +// datastream +func ensureDocumentsInES( + t *testing.T, + ctx context.Context, + esClient elastictransport.Interface, + dsType, dsDataset, dsNamespace string, + numEvents int, +) func() bool { + + f := func() bool { + t.Helper() + + docs, err := estools.GetLogsForDatastream(ctx, esClient, dsType, dsDataset, dsNamespace) + if err != nil { + t.Logf("error quering ES, will retry later: %s", err) + } + + if docs.Hits.Total.Value == numEvents { + return true + } + + return false + + } + + return f +} + +// generateLogFile generates a log file by appending new lines every tick +// the lines are composed by the test name and the current time in RFC3339Nano +// This function spans a new goroutine and does not block +func generateLogFile(t *testing.T, fullPath string, tick time.Duration, events int) { + t.Helper() + f, err := os.Create(fullPath) + if err != nil { + t.Fatalf("could not create file '%s: %s", fullPath, err) + } + + go func() { + t.Helper() + ticker := time.NewTicker(tick) + t.Cleanup(ticker.Stop) + + done := make(chan struct{}) + t.Cleanup(func() { close(done) }) + + defer func() { + if err := f.Close(); err != nil { + t.Errorf("could not close log file '%s': %s", fullPath, err) + } + }() + + i := 0 + for { + select { + case <-done: + return + case now := <-ticker.C: + i++ + _, err := fmt.Fprintln(f, t.Name(), "Iteration: ", i, now.Format(time.RFC3339Nano)) + if err != nil { + // The Go compiler does not allow me to call t.Fatalf from a non-test + // goroutine, t.Errorf is our only option + t.Errorf("could not write data to log file '%s': %s", fullPath, err) + return + } + // make sure log lines are synced as quickly as possible + if err := f.Sync(); err != nil { + t.Errorf("could not sync file '%s': %s", fullPath, err) + } + if i == events { + return + } + } + } + }() +} + +func cleanString(s string) string { + return nonAlphanumericRegex.ReplaceAllString(strings.ToLower(s), "") +} + +var nonAlphanumericRegex = regexp.MustCompile(`[^a-zA-Z0-9 ]+`) + +var policyJSON = ` +{ + "policy_id": "{{.PolicyID}}", + "package": { + "name": "log", + "version": "2.3.0" + }, + "name": "{{.Name}}", + "namespace": "{{.Namespace}}", + "inputs": { + "logs-logfile": { + "enabled": true, + "streams": { + "log.logs": { + "enabled": true, + "vars": { + "paths": [ + "{{.LogFilePath | js}}" {{/* we need to escape windows paths */}} + ], + "data_stream.dataset": "{{.Dataset}}" + } + } + } + } + } +}` + +type policyVars struct { + Name string + PolicyID string + LogFilePath string + Namespace string + Dataset string +} + +type ESDocument struct { + ElasticAgent ElasticAgent `json:"elastic_agent"` + Component Component `json:"component"` + Host Host `json:"host"` +} +type ElasticAgent struct { + ID string `json:"id"` + Version string `json:"version"` + Snapshot bool `json:"snapshot"` +} +type Component struct { + Binary string `json:"binary"` + ID string `json:"id"` +} +type Host struct { + Hostname string `json:"hostname"` +} diff --git a/testing/integration/monitoring_logs_test.go b/testing/integration/monitoring_logs_test.go deleted file mode 100644 index 5ebce0043de..00000000000 --- a/testing/integration/monitoring_logs_test.go +++ /dev/null @@ -1,213 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration - -package integration - -import ( - "context" - "encoding/json" - "fmt" - "os" - "regexp" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/elastic-agent-libs/kibana" - "github.com/elastic/elastic-agent/pkg/control/v2/client" - atesting "github.com/elastic/elastic-agent/pkg/testing" - "github.com/elastic/elastic-agent/pkg/testing/define" - "github.com/elastic/elastic-agent/pkg/testing/tools" - "github.com/elastic/elastic-agent/pkg/testing/tools/check" - "github.com/elastic/elastic-agent/pkg/testing/tools/estools" - "github.com/elastic/elastic-agent/pkg/testing/tools/fleettools" -) - -func TestMonitoringLogsShipped(t *testing.T) { - info := define.Require(t, define.Requirements{ - Stack: &define.Stack{}, - Local: false, - Sudo: true, - }) - ctx := context.Background() - - t.Logf("got namespace: %s", info.Namespace) - - agentFixture, err := define.NewFixture(t, define.Version()) - require.NoError(t, err) - - t.Log("Enrolling agent in Fleet with a test policy") - createPolicyReq := kibana.AgentPolicy{ - Name: fmt.Sprintf("test-policy-enroll-%d", time.Now().Unix()), - Namespace: info.Namespace, - Description: "test policy for agent enrollment", - MonitoringEnabled: []kibana.MonitoringEnabledOption{ - kibana.MonitoringEnabledLogs, - kibana.MonitoringEnabledMetrics, - }, - AgentFeatures: []map[string]interface{}{ - { - "name": "test_enroll", - "enabled": true, - }, - }, - } - - // Stage 1: Install - // As part of the cleanup process, we'll uninstall the agent - installOpts := atesting.InstallOpts{ - NonInteractive: true, - Force: true, - } - policy, err := tools.InstallAgentWithPolicy(ctx, t, - installOpts, agentFixture, info.KibanaClient, createPolicyReq) - require.NoError(t, err) - t.Logf("created policy: %s", policy.ID) - - check.ConnectedToFleet(t, agentFixture, 5*time.Minute) - - // Stage 2: check indices - // This is mostly for debugging - resp, err := estools.GetAllindicies(info.ESClient) - require.NoError(t, err) - for _, run := range resp { - t.Logf("%s: %d/%d deleted: %d\n", - run.Index, run.DocsCount, run.StoreSizeBytes, run.DocsDeleted) - } - - // Stage 3: Make sure metricbeat logs are populated - t.Log("Making sure metricbeat logs are populated") - docs := findESDocs(t, func() (estools.Documents, error) { - return estools.GetLogsForDatastream(info.ESClient, "elastic_agent.metricbeat") - }) - t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits)) - require.NotZero(t, len(docs.Hits.Hits)) - - // Stage 4: make sure all components are healthy - t.Log("Making sure all components are healthy") - status, err := agentFixture.ExecStatus(ctx) - require.NoError(t, err, - "could not get agent status to verify all components are healthy") - for _, c := range status.Components { - assert.Equalf(t, client.Healthy, client.State(c.State), - "component %s: want %s, got %s", - c.Name, client.Healthy, client.State(c.State)) - } - - // Stage 5: Make sure there are no errors in logs - t.Log("Making sure there are no error logs") - docs = findESDocs(t, func() (estools.Documents, error) { - return estools.CheckForErrorsInLogs(info.ESClient, info.Namespace, []string{ - // acceptable error messages (include reason) - "Error dialing dial tcp 127.0.0.1:9200: connect: connection refused", // beat is running default config before its config gets updated - "Global configuration artifact is not available", // Endpoint: failed to load user artifact due to connectivity issues - "Failed to download artifact", - "Failed to initialize artifact", - "Failed to apply initial policy from on disk configuration", - "elastic-agent-client error: rpc error: code = Canceled desc = context canceled", // can happen on restart - }) - }) - t.Logf("errors: Got %d documents", len(docs.Hits.Hits)) - for _, doc := range docs.Hits.Hits { - t.Logf("%#v", doc.Source) - } - require.Empty(t, docs.Hits.Hits) - - // Stage 6: Make sure we have message confirming central management is running - t.Log("Making sure we have message confirming central management is running") - docs = findESDocs(t, func() (estools.Documents, error) { - return estools.FindMatchingLogLines(info.ESClient, info.Namespace, - "Parsed configuration and determined agent is managed by Fleet") - }) - require.NotZero(t, len(docs.Hits.Hits)) - - // Stage 7: verify logs from the monitoring components are not sent to the output - t.Log("Check monitoring logs") - hostname, err := os.Hostname() - if err != nil { - t.Fatalf("could not get hostname to filter Agent: %s", err) - } - - agentID, err := fleettools.GetAgentIDByHostname(info.KibanaClient, policy.ID, hostname) - require.NoError(t, err, "could not get Agent ID by hostname") - t.Logf("Agent ID: %q", agentID) - - // We cannot search for `component.id` because at the moment of writing - // this field is not mapped. There is an issue for that: - // https://github.com/elastic/integrations/issues/6545 - - docs = findESDocs(t, func() (estools.Documents, error) { - return estools.GetLogsForAgentID(info.ESClient, agentID) - }) - require.NoError(t, err, "could not get logs from Agent ID: %q, err: %s", - agentID, err) - - monRegExp := regexp.MustCompile(".*-monitoring$") - for i, d := range docs.Hits.Hits { - // Lazy way to navigate a map[string]any: convert to JSON then - // decode into a struct. - jsonData, err := json.Marshal(d.Source) - if err != nil { - t.Fatalf("could not encode document source as JSON: %s", err) - } - - doc := ESDocument{} - if err := json.Unmarshal(jsonData, &doc); err != nil { - t.Fatalf("could not unmarshal document source: %s", err) - } - - if monRegExp.MatchString(doc.Component.ID) { - t.Errorf("[%d] Document on index %q with 'component.id': %q "+ - "and 'elastic_agent.id': %q. 'elastic_agent.id' must not "+ - "end in '-monitoring'\n", - i, d.Index, doc.Component.ID, doc.ElasticAgent.ID) - } - } -} - -func findESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.Documents { - var docs estools.Documents - - require.Eventually( - t, - func() bool { - var err error - docs, err = findFn() - return err == nil - }, - 3*time.Minute, - 15*time.Second, - ) - - // TODO: remove after debugging - t.Log("--- debugging: results from ES --- START ---") - for _, doc := range docs.Hits.Hits { - t.Logf("%#v", doc.Source) - } - t.Log("--- debugging: results from ES --- END ---") - - return docs -} - -type ESDocument struct { - ElasticAgent ElasticAgent `json:"elastic_agent"` - Component Component `json:"component"` - Host Host `json:"host"` -} -type ElasticAgent struct { - ID string `json:"id"` - Version string `json:"version"` - Snapshot bool `json:"snapshot"` -} -type Component struct { - Binary string `json:"binary"` - ID string `json:"id"` -} -type Host struct { - Hostname string `json:"hostname"` -}