From 0bf2b2ecad2c66e7e536abbce5657b2596cc804a Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 10 Nov 2021 20:39:34 +0800 Subject: [PATCH 1/2] systemtest: data streams everywhere (#6568) * systemtest: data streams everywhere - Force the use of data streams in system tests. - Remove the onboarding test, which will be irrelevant when we stop indexing these documents. - (Temporarily?) Remove the data stream migration pipeline test. If we add the test back, it will require indexing old events directly in the test, rather than having apm-server do it. - Remove the template coverage test; we now explicitly disable dynamic mapping except for metrics. Maybe this test can come back later, but at the moment it's causing friction. * .ci: docker-compose up first (cherry picked from commit 9acf4ba8d444f2cc982b36911646822952d73663) --- .ci/scripts/linux-test.sh | 3 + systemtest/aggregation_test.go | 10 +- systemtest/apmservertest/config.go | 8 +- systemtest/apmservertest/server.go | 4 + ...json => TestApprovedMetrics.approved.json} | 0 .../TestCompressedSpans.approved.json | 10 ++ .../TestNoMatchingSourcemap.approved.json | 5 + .../TestOTLPGRPCMetrics.approved.json | 4 + ....json => TestOTLPGRPCTraces.approved.json} | 0 .../TestRUMErrorSourcemapping.approved.json | 4 + .../TestRUMSpanSourcemapping.approved.json | 5 + ...son => TestRUMXForwardedFor.approved.json} | 0 ...erviceDestinationAggregation.approved.json | 4 + .../TestTransactionAggregation.approved.json | 8 + ...ansactionAggregationShutdown.approved.json | 4 + ...tionDroppedSpansStatsMetrics.approved.json | 8 + ...DroppedSpansStatsTransaction.approved.json | 4 + systemtest/elasticsearch.go | 78 +------- systemtest/environment_test.go | 2 +- systemtest/errorgrouping_test.go | 2 +- systemtest/helpers_test.go | 44 ----- systemtest/huge_traces_test.go | 6 +- systemtest/ingest_test.go | 72 +------- systemtest/instrumentation_test.go | 6 +- systemtest/jaeger_test.go | 4 +- systemtest/kibana.go | 8 +- systemtest/metrics_test.go | 28 +-- systemtest/onboarding_test.go | 44 ----- systemtest/otlp_test.go | 30 +--- systemtest/rum_test.go | 8 +- systemtest/sampling_test.go | 9 +- systemtest/sourcemap_test.go | 18 +- systemtest/template_test.go | 167 ------------------ 33 files changed, 131 insertions(+), 476 deletions(-) rename systemtest/approvals/{TestApprovedMetrics/data_streams_enabled.approved.json => TestApprovedMetrics.approved.json} (100%) rename systemtest/approvals/{TestOTLPGRPCTraces/data_streams_enabled.approved.json => TestOTLPGRPCTraces.approved.json} (100%) rename systemtest/approvals/{TestRUMXForwardedFor/data_streams_enabled.approved.json => TestRUMXForwardedFor.approved.json} (100%) delete mode 100644 systemtest/helpers_test.go delete mode 100644 systemtest/onboarding_test.go delete mode 100644 systemtest/template_test.go diff --git a/.ci/scripts/linux-test.sh b/.ci/scripts/linux-test.sh index 4f2132ed856..a861d94419a 100755 --- a/.ci/scripts/linux-test.sh +++ b/.ci/scripts/linux-test.sh @@ -13,6 +13,9 @@ trap cleanup EXIT make update apm-server +# Start docker-compose environment first, so it doesn't count towards the test timeout. +docker-compose up -d + SYSTEM_TESTS_XUNIT_PATH="$(pwd)/build" # TODO(axw) make this a Makefile target # TODO(mdelapenya) meanwhile there is no 'DefaultGoTestSystemArgs' at beats' GoTest implementation, this command is reproducing what Beats should provide: a gotestsum representation for system tests. diff --git a/systemtest/aggregation_test.go b/systemtest/aggregation_test.go index 8a248ff70e3..3de75a9250a 100644 --- a/systemtest/aggregation_test.go +++ b/systemtest/aggregation_test.go @@ -66,7 +66,7 @@ func TestTransactionAggregation(t *testing.T) { } tracer.Flush(nil) - result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "apm-*", + result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "metrics-apm.internal-*", estest.ExistsQuery{Field: "transaction.duration.histogram"}, ) systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits) @@ -79,7 +79,7 @@ func TestTransactionAggregation(t *testing.T) { // the appropriate per-bucket doc_count values. result = estest.SearchResult{} _, err = systemtest.Elasticsearch.Do(context.Background(), &esapi.SearchRequest{ - Index: []string{"apm-*"}, + Index: []string{"metrics-apm.internal-*"}, Body: strings.NewReader(`{ "size": 0, "query": {"exists":{"field":"transaction.duration.histogram"}}, @@ -135,14 +135,14 @@ func TestTransactionAggregationShutdown(t *testing.T) { // Wait for the transaction to be indexed, indicating that Elasticsearch // indices have been setup and we should not risk triggering the shutdown // timeout while waiting for the aggregated metrics to be indexed. - systemtest.Elasticsearch.ExpectDocs(t, "apm-*", + systemtest.Elasticsearch.ExpectDocs(t, "traces-apm*", estest.TermQuery{Field: "processor.event", Value: "transaction"}, ) // Stop server to ensure metrics are flushed on shutdown. assert.NoError(t, srv.Close()) - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", + result := systemtest.Elasticsearch.ExpectDocs(t, "metrics-apm.internal-*", estest.ExistsQuery{Field: "transaction.duration.histogram"}, ) systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits) @@ -175,7 +175,7 @@ func TestServiceDestinationAggregation(t *testing.T) { tx.End() tracer.Flush(nil) - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", + result := systemtest.Elasticsearch.ExpectDocs(t, "metrics-apm.internal-*", estest.ExistsQuery{Field: "span.destination.service.response_time.count"}, ) systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits) diff --git a/systemtest/apmservertest/config.go b/systemtest/apmservertest/config.go index cb62f536211..70126cc1b7a 100644 --- a/systemtest/apmservertest/config.go +++ b/systemtest/apmservertest/config.go @@ -46,7 +46,7 @@ type Config struct { Aggregation *AggregationConfig `json:"apm-server.aggregation,omitempty"` Sampling *SamplingConfig `json:"apm-server.sampling,omitempty"` RUM *RUMConfig `json:"apm-server.rum,omitempty"` - DataStreams *DataStreamsConfig `json:"apm-server.data_streams,omitempty"` + WaitForIntegration *bool `json:"apm-server.data_streams.wait_for_integration,omitempty"` DefaultServiceEnvironment string `json:"apm-server.default_service_environment,omitempty"` KibanaAgentConfig *KibanaAgentConfig `json:"apm-server.agent.config,omitempty"` TLS *TLSConfig `json:"apm-server.ssl,omitempty"` @@ -198,12 +198,6 @@ type RUMSourcemapCacheConfig struct { Expiration time.Duration `json:"expiration,omitempty"` } -// DataStreamsConfig holds APM Server data streams configuration. -type DataStreamsConfig struct { - Enabled bool `json:"enabled"` - WaitForIntegration *bool `json:"wait_for_integration,omitempty"` -} - // APIKeyConfig holds agent auth configuration. type AgentAuthConfig struct { SecretToken string `json:"secret_token,omitempty"` diff --git a/systemtest/apmservertest/server.go b/systemtest/apmservertest/server.go index 9dc8813ff4d..8d9ce615eaa 100644 --- a/systemtest/apmservertest/server.go +++ b/systemtest/apmservertest/server.go @@ -144,6 +144,10 @@ func (s *Server) start(tls bool) error { "logging.to_stderr": true, "apm-server.expvar.enabled": true, "apm-server.host": "127.0.0.1:0", + + // TODO(axw) remove this when we switch over to data streams + // as the default and only indexing method. + "apm-server.data_streams.enabled": true, } if tls { certPath, keyPath, caCertPath, err := s.initTLS() diff --git a/systemtest/approvals/TestApprovedMetrics/data_streams_enabled.approved.json b/systemtest/approvals/TestApprovedMetrics.approved.json similarity index 100% rename from systemtest/approvals/TestApprovedMetrics/data_streams_enabled.approved.json rename to systemtest/approvals/TestApprovedMetrics.approved.json diff --git a/systemtest/approvals/TestCompressedSpans.approved.json b/systemtest/approvals/TestCompressedSpans.approved.json index e4ef26ecd5d..3a7c1c0f127 100644 --- a/systemtest/approvals/TestCompressedSpans.approved.json +++ b/systemtest/approvals/TestCompressedSpans.approved.json @@ -6,10 +6,15 @@ "name": "go", "version": "0.0.0" }, + "data_stream.dataset": "apm", + "data_stream.namespace": "default", + "data_stream.type": "traces", "ecs": { "version": "dynamic" }, "event": { + "agent_id_status": "missing", + "ingested": "dynamic", "outcome": "success" }, "observer": { @@ -68,10 +73,15 @@ "name": "go", "version": "0.0.0" }, + "data_stream.dataset": "apm", + "data_stream.namespace": "default", + "data_stream.type": "traces", "ecs": { "version": "dynamic" }, "event": { + "agent_id_status": "missing", + "ingested": "dynamic", "outcome": "success" }, "observer": { diff --git a/systemtest/approvals/TestNoMatchingSourcemap.approved.json b/systemtest/approvals/TestNoMatchingSourcemap.approved.json index ff802017a04..da4c1788495 100644 --- a/systemtest/approvals/TestNoMatchingSourcemap.approved.json +++ b/systemtest/approvals/TestNoMatchingSourcemap.approved.json @@ -9,10 +9,15 @@ "client": { "ip": "127.0.0.1" }, + "data_stream.dataset": "apm.rum", + "data_stream.namespace": "default", + "data_stream.type": "traces", "ecs": { "version": "dynamic" }, "event": { + "agent_id_status": "missing", + "ingested": "dynamic", "outcome": "unknown" }, "observer": { diff --git a/systemtest/approvals/TestOTLPGRPCMetrics.approved.json b/systemtest/approvals/TestOTLPGRPCMetrics.approved.json index bcfe3c6da0a..2577fbb3672 100644 --- a/systemtest/approvals/TestOTLPGRPCMetrics.approved.json +++ b/systemtest/approvals/TestOTLPGRPCMetrics.approved.json @@ -6,10 +6,14 @@ "name": "opentelemetry/go", "version": "1.0.0" }, + "data_stream.dataset": "apm.app.unknown_service_systemtest_test", + "data_stream.namespace": "default", + "data_stream.type": "metrics", "ecs": { "version": "dynamic" }, "event": { + "agent_id_status": "missing", "ingested": "dynamic" }, "float64_counter": 1, diff --git a/systemtest/approvals/TestOTLPGRPCTraces/data_streams_enabled.approved.json b/systemtest/approvals/TestOTLPGRPCTraces.approved.json similarity index 100% rename from systemtest/approvals/TestOTLPGRPCTraces/data_streams_enabled.approved.json rename to systemtest/approvals/TestOTLPGRPCTraces.approved.json diff --git a/systemtest/approvals/TestRUMErrorSourcemapping.approved.json b/systemtest/approvals/TestRUMErrorSourcemapping.approved.json index 2928f4b2dcb..c8fdda071d1 100644 --- a/systemtest/approvals/TestRUMErrorSourcemapping.approved.json +++ b/systemtest/approvals/TestRUMErrorSourcemapping.approved.json @@ -9,6 +9,9 @@ "client": { "ip": "127.0.0.1" }, + "data_stream.dataset": "apm.error", + "data_stream.namespace": "default", + "data_stream.type": "logs", "ecs": { "version": "dynamic" }, @@ -252,6 +255,7 @@ } }, "event": { + "agent_id_status": "missing", "ingested": "dynamic" }, "http": { diff --git a/systemtest/approvals/TestRUMSpanSourcemapping.approved.json b/systemtest/approvals/TestRUMSpanSourcemapping.approved.json index 090352b6b9c..98dafc57ea3 100644 --- a/systemtest/approvals/TestRUMSpanSourcemapping.approved.json +++ b/systemtest/approvals/TestRUMSpanSourcemapping.approved.json @@ -9,10 +9,15 @@ "client": { "ip": "127.0.0.1" }, + "data_stream.dataset": "apm.rum", + "data_stream.namespace": "default", + "data_stream.type": "traces", "ecs": { "version": "dynamic" }, "event": { + "agent_id_status": "missing", + "ingested": "dynamic", "outcome": "unknown" }, "observer": { diff --git a/systemtest/approvals/TestRUMXForwardedFor/data_streams_enabled.approved.json b/systemtest/approvals/TestRUMXForwardedFor.approved.json similarity index 100% rename from systemtest/approvals/TestRUMXForwardedFor/data_streams_enabled.approved.json rename to systemtest/approvals/TestRUMXForwardedFor.approved.json diff --git a/systemtest/approvals/TestServiceDestinationAggregation.approved.json b/systemtest/approvals/TestServiceDestinationAggregation.approved.json index 29cd7fc07db..f108a191c3b 100644 --- a/systemtest/approvals/TestServiceDestinationAggregation.approved.json +++ b/systemtest/approvals/TestServiceDestinationAggregation.approved.json @@ -5,10 +5,14 @@ "agent": { "name": "go" }, + "data_stream.dataset": "apm.internal", + "data_stream.namespace": "default", + "data_stream.type": "metrics", "ecs": { "version": "dynamic" }, "event": { + "agent_id_status": "missing", "ingested": "dynamic", "outcome": "unknown" }, diff --git a/systemtest/approvals/TestTransactionAggregation.approved.json b/systemtest/approvals/TestTransactionAggregation.approved.json index 685a9c9b911..d6be2d7dfb4 100644 --- a/systemtest/approvals/TestTransactionAggregation.approved.json +++ b/systemtest/approvals/TestTransactionAggregation.approved.json @@ -6,10 +6,14 @@ "agent": { "name": "go" }, + "data_stream.dataset": "apm.internal", + "data_stream.namespace": "default", + "data_stream.type": "metrics", "ecs": { "version": "dynamic" }, "event": { + "agent_id_status": "missing", "ingested": "dynamic", "outcome": "unknown" }, @@ -59,10 +63,14 @@ "agent": { "name": "go" }, + "data_stream.dataset": "apm.internal", + "data_stream.namespace": "default", + "data_stream.type": "metrics", "ecs": { "version": "dynamic" }, "event": { + "agent_id_status": "missing", "ingested": "dynamic", "outcome": "unknown" }, diff --git a/systemtest/approvals/TestTransactionAggregationShutdown.approved.json b/systemtest/approvals/TestTransactionAggregationShutdown.approved.json index dc7b8598245..6a967370d5d 100644 --- a/systemtest/approvals/TestTransactionAggregationShutdown.approved.json +++ b/systemtest/approvals/TestTransactionAggregationShutdown.approved.json @@ -6,10 +6,14 @@ "agent": { "name": "go" }, + "data_stream.dataset": "apm.internal", + "data_stream.namespace": "default", + "data_stream.type": "metrics", "ecs": { "version": "dynamic" }, "event": { + "agent_id_status": "missing", "ingested": "dynamic", "outcome": "unknown" }, diff --git a/systemtest/approvals/TestTransactionDroppedSpansStatsMetrics.approved.json b/systemtest/approvals/TestTransactionDroppedSpansStatsMetrics.approved.json index 56015cf2e2f..31914dc83f2 100644 --- a/systemtest/approvals/TestTransactionDroppedSpansStatsMetrics.approved.json +++ b/systemtest/approvals/TestTransactionDroppedSpansStatsMetrics.approved.json @@ -5,10 +5,14 @@ "agent": { "name": "go" }, + "data_stream.dataset": "apm.internal", + "data_stream.namespace": "default", + "data_stream.type": "metrics", "ecs": { "version": "dynamic" }, "event": { + "agent_id_status": "missing", "ingested": "dynamic", "outcome": "success" }, @@ -45,10 +49,14 @@ "agent": { "name": "go" }, + "data_stream.dataset": "apm.internal", + "data_stream.namespace": "default", + "data_stream.type": "metrics", "ecs": { "version": "dynamic" }, "event": { + "agent_id_status": "missing", "ingested": "dynamic", "outcome": "success" }, diff --git a/systemtest/approvals/TestTransactionDroppedSpansStatsTransaction.approved.json b/systemtest/approvals/TestTransactionDroppedSpansStatsTransaction.approved.json index 663a7b5ac3c..cbf5c3d4637 100644 --- a/systemtest/approvals/TestTransactionDroppedSpansStatsTransaction.approved.json +++ b/systemtest/approvals/TestTransactionDroppedSpansStatsTransaction.approved.json @@ -6,10 +6,14 @@ "name": "go", "version": "0.0.0" }, + "data_stream.dataset": "apm", + "data_stream.namespace": "default", + "data_stream.type": "traces", "ecs": { "version": "dynamic" }, "event": { + "agent_id_status": "missing", "ingested": "dynamic", "outcome": "success" }, diff --git a/systemtest/elasticsearch.go b/systemtest/elasticsearch.go index 472dece5ee7..995c9ec2f96 100644 --- a/systemtest/elasticsearch.go +++ b/systemtest/elasticsearch.go @@ -19,12 +19,11 @@ package systemtest import ( "context" - "net/http" "net/url" "testing" "time" - "golang.org/x/sync/errgroup" + "github.com/stretchr/testify/require" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" @@ -87,75 +86,14 @@ func newElasticsearchConfig() elasticsearch.Config { } } -// CleanupElasticsearch deletes all indices, index templates, -// and ingest node pipelines whose names start with "apm", -// and deletes the default ILM policy "apm-rollover-30-days". +// CleanupElasticsearch deletes all data streams created by APM Server. func CleanupElasticsearch(t testing.TB) { - const ( - legacyPrefix = "apm*" // Not "apm-*", as that would not capture the "apm" ingest pipeline. - apmTracesPrefix = "traces-apm*" - apmMetricsPrefix = "metrics-apm*" - apmLogsPrefix = "logs-apm*" - ) - - doReq := func(req estest.Request) error { - _, err := Elasticsearch.Do(context.Background(), req, nil) - if err, ok := err.(*estest.Error); ok && err.StatusCode == http.StatusNotFound { - return nil - } - return err - } - - doParallel := func(requests ...estest.Request) { - t.Helper() - var g errgroup.Group - for _, req := range requests { - req := req // copy for closure - g.Go(func() error { return doReq(req) }) - } - if err := g.Wait(); err != nil { - t.Fatal(err) - } - } - - // Delete indices, data streams, and ingest pipelines. - if err := doReq(esapi.IndicesDeleteRequest{Index: []string{ - legacyPrefix, - // traces-apm*, metrics-apm*, and logs-apm* could get created - // as indices instead of data streams in some tests, so issue - // index delete requests for those too. - apmTracesPrefix, - apmMetricsPrefix, - apmLogsPrefix, - }}); err != nil { - t.Fatal(err) - } - doParallel( - esapi.IndicesDeleteDataStreamRequest{Name: []string{ - legacyPrefix, - apmTracesPrefix, - apmMetricsPrefix, - apmLogsPrefix, - }}, - esapi.IngestDeletePipelineRequest{PipelineID: legacyPrefix}, - ) - - // Delete index templates after deleting data streams. - if err := doReq(esapi.IndicesDeleteTemplateRequest{Name: legacyPrefix}); err != nil { - t.Fatal(err) - } - - // Delete the ILM policy last or we'll get an error due to it being in use. - for { - err := doReq(esapi.ILMDeleteLifecycleRequest{Policy: "apm-rollover-30-days"}) - if err == nil { - break - } - // Retry deleting, in case indices are still being deleted. - const delay = 100 * time.Millisecond - t.Logf("failed to delete ILM policy (retrying in %s): %s", delay, err) - time.Sleep(delay) - } + _, err := Elasticsearch.Do(context.Background(), &esapi.IndicesDeleteDataStreamRequest{Name: []string{ + "traces-apm*", + "metrics-apm*", + "logs-apm*", + }}, nil) + require.NoError(t, err) } // ChangeUserPassword changes the password for a given user. diff --git a/systemtest/environment_test.go b/systemtest/environment_test.go index 5bc4ce4a0a7..0e6d70d335d 100644 --- a/systemtest/environment_test.go +++ b/systemtest/environment_test.go @@ -49,7 +49,7 @@ func TestDefaultServiceEnvironment(t *testing.T) { tracerSpecifiedEnvironment.StartTransaction("specified_environment", "type").End() tracerSpecifiedEnvironment.Flush(nil) - result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "apm-*", + result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "traces-apm*", estest.TermQuery{Field: "processor.event", Value: "transaction"}, ) environments := make(map[string]string) diff --git a/systemtest/errorgrouping_test.go b/systemtest/errorgrouping_test.go index 88b88ce7948..255b6eae1d0 100644 --- a/systemtest/errorgrouping_test.go +++ b/systemtest/errorgrouping_test.go @@ -40,7 +40,7 @@ func TestErrorGroupingName(t *testing.T) { tracer.NewErrorLog(apm.ErrorLogRecord{Message: "log_message_overrides", Error: errors.New("exception_message_overridden")}).Send() tracer.Flush(nil) - result := systemtest.Elasticsearch.ExpectMinDocs(t, 3, "apm-*", estest.TermQuery{ + result := systemtest.Elasticsearch.ExpectMinDocs(t, 3, "logs-apm.error-*", estest.TermQuery{ Field: "processor.event", Value: "error", }) diff --git a/systemtest/helpers_test.go b/systemtest/helpers_test.go deleted file mode 100644 index 4c4a6a9ff33..00000000000 --- a/systemtest/helpers_test.go +++ /dev/null @@ -1,44 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package systemtest_test - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/elastic/apm-server/systemtest" - "github.com/elastic/apm-server/systemtest/apmservertest" -) - -// withDataStreams runs two sub-tests, calling f with and without data streams enabled. -func withDataStreams(t *testing.T, f func(t *testing.T, unstartedServer *apmservertest.Server)) { - t.Run("data_streams_disabled", func(t *testing.T) { - systemtest.CleanupElasticsearch(t) - srv := apmservertest.NewUnstartedServer(t) - f(t, srv) - }) - t.Run("data_streams_enabled", func(t *testing.T) { - systemtest.CleanupElasticsearch(t) - err := systemtest.Fleet.InstallPackage(systemtest.IntegrationPackage.Name, systemtest.IntegrationPackage.Version) - require.NoError(t, err) - srv := apmservertest.NewUnstartedServer(t) - srv.Config.DataStreams = &apmservertest.DataStreamsConfig{Enabled: true} - f(t, srv) - }) -} diff --git a/systemtest/huge_traces_test.go b/systemtest/huge_traces_test.go index baec5147f65..ef66334e825 100644 --- a/systemtest/huge_traces_test.go +++ b/systemtest/huge_traces_test.go @@ -67,12 +67,12 @@ func TestTransactionDroppedSpansStats(t *testing.T) { tx.End() tracer.Flush(nil) - metricsResult := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "apm*metric", + metricsResult := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "metrics-apm.internal-*", estest.TermQuery{Field: "metricset.name", Value: "service_destination"}, ) systemtest.ApproveEvents(t, t.Name()+"Metrics", metricsResult.Hits.Hits, "@timestamp") - txResult := systemtest.Elasticsearch.ExpectDocs(t, "apm*transaction", + txResult := systemtest.Elasticsearch.ExpectDocs(t, "traces-apm-*", estest.TermQuery{Field: "transaction.id", Value: tx.TraceContext().Span.String()}, ) systemtest.ApproveEvents(t, t.Name()+"Transaction", txResult.Hits.Hits, @@ -147,7 +147,7 @@ func TestCompressedSpans(t *testing.T) { tx.End() tracer.Flush(nil) - spanResults := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "apm*span", + spanResults := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "traces-apm-*", estest.TermQuery{Field: "span.type", Value: "db"}, ) systemtest.ApproveEvents(t, t.Name(), spanResults.Hits.Hits) diff --git a/systemtest/ingest_test.go b/systemtest/ingest_test.go index 8ceab7eb0d4..fa8807f1e45 100644 --- a/systemtest/ingest_test.go +++ b/systemtest/ingest_test.go @@ -18,8 +18,6 @@ package systemtest_test import ( - "context" - "errors" "net/http" "net/url" "testing" @@ -28,11 +26,6 @@ import ( "github.com/stretchr/testify/require" "github.com/tidwall/gjson" - "go.elastic.co/apm" - - "github.com/elastic/go-elasticsearch/v8/esapi" - "github.com/elastic/go-elasticsearch/v8/esutil" - "github.com/elastic/apm-server/systemtest" "github.com/elastic/apm-server/systemtest/apmservertest" "github.com/elastic/apm-server/systemtest/estest" @@ -65,7 +58,7 @@ func TestIngestPipelinePipeline(t *testing.T) { tracer.Flush(nil) getDoc := func(query estest.TermQuery) estest.SearchHit { - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", query) + result := systemtest.Elasticsearch.ExpectDocs(t, "traces-apm*", query) require.Len(t, result.Hits.Hits, 1) return result.Hits.Hits[0] } @@ -83,66 +76,3 @@ func TestIngestPipelinePipeline(t *testing.T) { destinationIP = gjson.GetBytes(span2Doc.RawSource, "destination.ip") assert.False(t, destinationIP.Exists()) // destination.address is not an IP } - -func TestDataStreamMigrationIngestPipeline(t *testing.T) { - systemtest.CleanupElasticsearch(t) - srv := apmservertest.NewServer(t) - - // Send a transaction, span, error, and metrics. - tracer := srv.Tracer() - tracer.RegisterMetricsGatherer(apm.GatherMetricsFunc(func(ctx context.Context, m *apm.Metrics) error { - m.Add("custom_metric", nil, 123) - return nil - })) - tx := tracer.StartTransaction("name", "type") - span := tx.StartSpan("name", "type", nil) - tracer.NewError(errors.New("boom")).Send() - span.End() - tx.End() - tracer.Flush(nil) - tracer.SendMetrics(nil) - - // We expect at least 6 events: - // - onboarding - // - transaction - // - span - // - error - // - internal metricset - // - app metricset - for _, query := range []interface{}{ - estest.TermQuery{Field: "processor.event", Value: "onboarding"}, - estest.TermQuery{Field: "processor.event", Value: "transaction"}, - estest.TermQuery{Field: "processor.event", Value: "span"}, - estest.TermQuery{Field: "processor.event", Value: "error"}, - estest.TermQuery{Field: "metricset.name", Value: "span_breakdown"}, - estest.TermQuery{Field: "metricset.name", Value: "app"}, - } { - systemtest.Elasticsearch.ExpectDocs(t, "apm-*", query) - } - - refresh := true - _, err := systemtest.Elasticsearch.Do(context.Background(), &esapi.ReindexRequest{ - Refresh: &refresh, - Body: esutil.NewJSONReader(map[string]interface{}{ - "source": map[string]interface{}{ - "index": "apm-*", - }, - "dest": map[string]interface{}{ - "index": "apm-migration", - "pipeline": "apm_data_stream_migration", - "op_type": "create", - }, - }), - }, nil) - require.NoError(t, err) - - // There should only be an onboarding doc in "apm-migration". - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-migration", nil) - require.Len(t, result.Hits.Hits, 1) - assert.Equal(t, "onboarding", gjson.GetBytes(result.Hits.Hits[0].RawSource, "processor.event").String()) - - systemtest.Elasticsearch.ExpectMinDocs(t, 2, "traces-apm-migrated", nil) // transaction, span - systemtest.Elasticsearch.ExpectMinDocs(t, 1, "logs-apm.error-migrated", nil) - systemtest.Elasticsearch.ExpectMinDocs(t, 1, "metrics-apm.internal-migrated", nil) - systemtest.Elasticsearch.ExpectMinDocs(t, 1, "metrics-apm.app.systemtest-migrated", nil) -} diff --git a/systemtest/instrumentation_test.go b/systemtest/instrumentation_test.go index 05ab031b2c3..cf9d1c5ff5a 100644 --- a/systemtest/instrumentation_test.go +++ b/systemtest/instrumentation_test.go @@ -50,7 +50,7 @@ func TestAPMServerInstrumentation(t *testing.T) { tracer.StartTransaction("name", "type").End() tracer.Flush(nil) - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{ + result := systemtest.Elasticsearch.ExpectDocs(t, "traces-apm*", estest.BoolQuery{ Filter: []interface{}{ estest.TermQuery{ Field: "processor.event", @@ -148,7 +148,7 @@ func TestAPMServerInstrumentationAuth(t *testing.T) { tracer.StartTransaction("name", "type").End() tracer.Flush(nil) - systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{ + systemtest.Elasticsearch.ExpectDocs(t, "traces-apm*", estest.BoolQuery{ Filter: []interface{}{ estest.TermQuery{ Field: "processor.event", @@ -201,7 +201,7 @@ func TestAPMServerProfiling(t *testing.T) { } tracer.Flush(nil) - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.TermQuery{ + result := systemtest.Elasticsearch.ExpectDocs(t, "metrics-apm.profiling*", estest.TermQuery{ Field: "processor.event", Value: "profile", }) diff --git a/systemtest/jaeger_test.go b/systemtest/jaeger_test.go index 7ca07daa134..94584435aa0 100644 --- a/systemtest/jaeger_test.go +++ b/systemtest/jaeger_test.go @@ -70,7 +70,7 @@ func testJaegerGRPC(t *testing.T, srv *apmservertest.Server, addr string, dialOp doc := getBeatsMonitoringStats(t, srv, nil) assert.Equal(t, int64(1), gjson.GetBytes(doc.RawSource, "beats_stats.metrics.apm-server.jaeger.grpc.collect.request.count").Int()) - systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{Filter: []interface{}{ + systemtest.Elasticsearch.ExpectDocs(t, "traces-apm*", estest.BoolQuery{Filter: []interface{}{ estest.TermQuery{Field: "processor.event", Value: "transaction"}, }}) @@ -124,7 +124,7 @@ func TestJaegerGRPCAuth(t *testing.T) { _, err = client.PostSpans(context.Background(), request) require.NoError(t, err) - systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{Filter: []interface{}{ + systemtest.Elasticsearch.ExpectDocs(t, "traces-apm*", estest.BoolQuery{Filter: []interface{}{ estest.TermQuery{Field: "processor.event", Value: "transaction"}, }}) } diff --git a/systemtest/kibana.go b/systemtest/kibana.go index 37d789d814c..6b483590dd1 100644 --- a/systemtest/kibana.go +++ b/systemtest/kibana.go @@ -69,9 +69,9 @@ func init() { } // InitFleet ensures Fleet is set up, destroys any existing agent policies previously -// created by the system tests and unenrolls the associated agents, and uninstalls the -// integration package if it is installed. After InitFleet returns successfully, the -// IntegrationPackage var will be initialised. +// created by the system tests and unenrolls the associated agents, uninstalls the +// integration package if it is installed, and finally installs the integration pacakge. +// After InitFleet returns successfully, the IntegrationPackage var will be initialised. func InitFleet() error { if err := Fleet.Setup(); err != nil { log.Fatal(err) @@ -115,7 +115,7 @@ func InitFleet() error { if IntegrationPackage == nil { return errors.New("could not find package 'apm'") } - return nil + return Fleet.InstallPackage(IntegrationPackage.Name, IntegrationPackage.Version) } // CreateAgentPolicy creates an Agent policy with the given name and namespace, diff --git a/systemtest/metrics_test.go b/systemtest/metrics_test.go index 56a1152d928..052bb397178 100644 --- a/systemtest/metrics_test.go +++ b/systemtest/metrics_test.go @@ -39,13 +39,8 @@ import ( ) func TestApprovedMetrics(t *testing.T) { - withDataStreams(t, testApprovedMetrics) -} - -func testApprovedMetrics(t *testing.T, srv *apmservertest.Server) { - err := srv.Start() - require.NoError(t, err) - + systemtest.CleanupElasticsearch(t) + srv := apmservertest.NewServer(t) eventsPayload, err := ioutil.ReadFile("../testdata/intake-v2/metricsets.ndjson") require.NoError(t, err) @@ -62,10 +57,15 @@ func testApprovedMetrics(t *testing.T, srv *apmservertest.Server) { assert.NoError(t, err) // Check the metrics documents are exactly as we expect. - indices := []string{"apm-*", "metrics-apm.*"} - result := systemtest.Elasticsearch.ExpectMinDocs(t, ingestResult.Accepted, strings.Join(indices, ","), estest.TermQuery{ - Field: "processor.event", - Value: "metric", + indices := []string{"metrics-apm*"} + result := systemtest.Elasticsearch.ExpectMinDocs(t, ingestResult.Accepted, strings.Join(indices, ","), estest.BoolQuery{ + Filter: []interface{}{ + estest.TermQuery{Field: "processor.event", Value: "metric"}, + }, + MustNot: []interface{}{ + // Ignore server-produced transaction metrics; we're only interested in the metrics sent by the agent. + estest.TermQuery{Field: "metricset.name", Value: "transaction"}, + }, }) systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits) @@ -97,7 +97,7 @@ func TestBreakdownMetrics(t *testing.T) { tracer.SendMetrics(nil) tracer.Flush(nil) - result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "apm-*", estest.BoolQuery{ + result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "metrics-apm.internal-*", estest.BoolQuery{ Filter: []interface{}{ estest.TermQuery{ Field: "processor.event", @@ -135,7 +135,7 @@ func TestApplicationMetrics(t *testing.T) { tracer.SendMetrics(nil) tracer.Flush(nil) - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.TermQuery{ + result := systemtest.Elasticsearch.ExpectDocs(t, "metrics-apm.app.*", estest.TermQuery{ Field: "metricset.name", Value: "app", }) @@ -164,7 +164,7 @@ func TestApplicationMetrics(t *testing.T) { // Check that the index mapping has been updated for the custom // metrics, with the expected dynamically mapped field types. - mappings := getFieldMappings(t, []string{"apm-*"}, []string{"a.b.c", "x.y.z"}) + mappings := getFieldMappings(t, []string{"metrics-apm.app.*"}, []string{"a.b.c", "x.y.z"}) assert.Equal(t, map[string]interface{}{ "a.b.c": map[string]interface{}{ "full_name": "a.b.c", diff --git a/systemtest/onboarding_test.go b/systemtest/onboarding_test.go deleted file mode 100644 index 5d88b168018..00000000000 --- a/systemtest/onboarding_test.go +++ /dev/null @@ -1,44 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package systemtest_test - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/apm-server/systemtest" - "github.com/elastic/apm-server/systemtest/apmservertest" - "github.com/elastic/apm-server/systemtest/estest" -) - -func TestAPMServerOnboarding(t *testing.T) { - systemtest.CleanupElasticsearch(t) - srv := apmservertest.NewServer(t) - - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.TermQuery{ - Field: "processor.event", - Value: "onboarding", - }) - - require.Len(t, result.Hits.Hits, 1) - expvar := srv.GetExpvar() - observer := result.Hits.Hits[0].Source["observer"].(map[string]interface{}) - assert.Equal(t, expvar.Vars["beat.info.ephemeral_id"], observer["ephemeral_id"]) -} diff --git a/systemtest/otlp_test.go b/systemtest/otlp_test.go index 23b3fb23677..347e0d68315 100644 --- a/systemtest/otlp_test.go +++ b/systemtest/otlp_test.go @@ -66,11 +66,8 @@ func init() { } func TestOTLPGRPCTraces(t *testing.T) { - withDataStreams(t, testOTLPGRPCTraces) -} - -func testOTLPGRPCTraces(t *testing.T, srv *apmservertest.Server) { - srv.Start() + systemtest.CleanupElasticsearch(t) + srv := apmservertest.NewServer(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -90,13 +87,8 @@ func testOTLPGRPCTraces(t *testing.T, srv *apmservertest.Server) { }) require.NoError(t, err) - expectMin := 1 - if srv.Config.DataStreams != nil && srv.Config.DataStreams.Enabled { - expectMin++ // span events only indexed into data streams - } - - indices := "apm-*,traces-apm*,logs-apm*" - result := systemtest.Elasticsearch.ExpectMinDocs(t, expectMin, indices, estest.BoolQuery{ + indices := "traces-apm*,logs-apm*" + result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, indices, estest.BoolQuery{ Should: []interface{}{ estest.TermQuery{Field: "processor.event", Value: "transaction"}, estest.TermQuery{Field: "processor.event", Value: "log"}, @@ -104,12 +96,6 @@ func testOTLPGRPCTraces(t *testing.T, srv *apmservertest.Server) { MinimumShouldMatch: 1, }) systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits) - - // Ensure that the log event was filtered by libbeat. - if srv.Config.DataStreams == nil || !srv.Config.DataStreams.Enabled { - filtered := srv.GetExpvar().Vars["libbeat.pipeline.events.filtered"].(float64) - assert.Equal(t, float64(1), filtered, "libbeat pipeline processor should have filtered the span log event") - } } func TestOTLPGRPCMetrics(t *testing.T) { @@ -134,7 +120,7 @@ func TestOTLPGRPCMetrics(t *testing.T) { }) require.NoError(t, err) - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{Filter: []interface{}{ + result := systemtest.Elasticsearch.ExpectDocs(t, "metrics-apm.app.*", estest.BoolQuery{Filter: []interface{}{ estest.TermQuery{Field: "processor.event", Value: "metric"}, }}) systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits, "@timestamp") @@ -162,7 +148,7 @@ func TestOTLPGRPCAuth(t *testing.T) { "Authorization": "Bearer abc123", })))) require.NoError(t, err) - systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.BoolQuery{Filter: []interface{}{ + systemtest.Elasticsearch.ExpectDocs(t, "traces-apm*", estest.BoolQuery{Filter: []interface{}{ estest.TermQuery{Field: "processor.event", Value: "transaction"}, }}) } @@ -193,13 +179,13 @@ func TestOTLPClientIP(t *testing.T) { require.NoError(t, err) // Non-iOS agent documents should have no client.ip field set. - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.TermQuery{ + result := systemtest.Elasticsearch.ExpectDocs(t, "traces-apm*", estest.TermQuery{ Field: "service.name", Value: "service1", }) assert.False(t, gjson.GetBytes(result.Hits.Hits[0].RawSource, "client.ip").Exists()) // iOS agent documents should have a client.ip field set. - result = systemtest.Elasticsearch.ExpectDocs(t, "apm-*", estest.TermQuery{ + result = systemtest.Elasticsearch.ExpectDocs(t, "traces-apm*", estest.TermQuery{ Field: "service.name", Value: "service2", }) assert.True(t, gjson.GetBytes(result.Hits.Hits[0].RawSource, "client.ip").Exists()) diff --git a/systemtest/rum_test.go b/systemtest/rum_test.go index fdbdd3d9f6e..3e8ae2e0bb5 100644 --- a/systemtest/rum_test.go +++ b/systemtest/rum_test.go @@ -38,10 +38,8 @@ import ( ) func TestRUMXForwardedFor(t *testing.T) { - withDataStreams(t, testRUMXForwardedFor) -} - -func testRUMXForwardedFor(t *testing.T, srv *apmservertest.Server) { + systemtest.CleanupElasticsearch(t) + srv := apmservertest.NewUnstartedServer(t) srv.Config.RUM = &apmservertest.RUMConfig{Enabled: true} err := srv.Start() require.NoError(t, err) @@ -67,7 +65,7 @@ func testRUMXForwardedFor(t *testing.T, srv *apmservertest.Server) { io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() - result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "apm-*,traces-apm*,metrics-apm*", estest.TermsQuery{ + result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "traces-apm*,metrics-apm*", estest.TermsQuery{ Field: "processor.event", Values: []interface{}{"transaction", "metric"}, }) diff --git a/systemtest/sampling_test.go b/systemtest/sampling_test.go index 523dc1939ea..1d13fe3d2d5 100644 --- a/systemtest/sampling_test.go +++ b/systemtest/sampling_test.go @@ -51,7 +51,7 @@ func TestDropUnsampled(t *testing.T) { tracer.StartTransaction("unsampled", transactionType).End() tracer.Flush(nil) - result := systemtest.Elasticsearch.ExpectMinDocs(t, 1, "apm-*", estest.TermQuery{ + result := systemtest.Elasticsearch.ExpectMinDocs(t, 1, "traces-apm*", estest.TermQuery{ Field: "transaction.type", Value: transactionType, }) @@ -60,8 +60,6 @@ func TestDropUnsampled(t *testing.T) { func TestTailSampling(t *testing.T) { systemtest.CleanupElasticsearch(t) - err := systemtest.Fleet.InstallPackage(systemtest.IntegrationPackage.Name, systemtest.IntegrationPackage.Version) - require.NoError(t, err) apmIntegration1 := newAPMIntegration(t, map[string]interface{}{ "tail_sampling_interval": "1s", @@ -144,10 +142,7 @@ func TestTailSamplingUnlicensed(t *testing.T) { waitForIntegration := false srv := apmservertest.NewUnstartedServer(t) srv.Config.Output.Elasticsearch.Hosts = []string{es.Addr} - srv.Config.DataStreams = &apmservertest.DataStreamsConfig{ - Enabled: true, - WaitForIntegration: &waitForIntegration, - } + srv.Config.WaitForIntegration = &waitForIntegration srv.Config.Sampling = &apmservertest.SamplingConfig{ Tail: &apmservertest.TailSamplingConfig{ Enabled: true, diff --git a/systemtest/sourcemap_test.go b/systemtest/sourcemap_test.go index cde7388f5a2..968dc60cf4a 100644 --- a/systemtest/sourcemap_test.go +++ b/systemtest/sourcemap_test.go @@ -43,7 +43,7 @@ func TestRUMErrorSourcemapping(t *testing.T) { ) systemtest.SendRUMEventsPayload(t, srv, "../testdata/intake-v2/errors_rum.ndjson") - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*-error", nil) + result := systemtest.Elasticsearch.ExpectDocs(t, "logs-apm.error-*", nil) systemtest.ApproveEvents( t, t.Name(), result.Hits.Hits, @@ -67,7 +67,10 @@ func TestRUMSpanSourcemapping(t *testing.T) { "http://localhost:8000/test/e2e/general-usecase/bundle.js.map", ) systemtest.SendRUMEventsPayload(t, srv, "../testdata/intake-v2/transactions_spans_rum_2.ndjson") - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*-span", nil) + result := systemtest.Elasticsearch.ExpectDocs(t, "traces-apm*", estest.TermQuery{ + Field: "processor.event", + Value: "span", + }) systemtest.ApproveEvents( t, t.Name(), result.Hits.Hits, @@ -93,7 +96,10 @@ func TestNoMatchingSourcemap(t *testing.T) { ) systemtest.SendRUMEventsPayload(t, srv, "../testdata/intake-v2/transactions_spans_rum_2.ndjson") - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*-span", nil) + result := systemtest.Elasticsearch.ExpectDocs(t, "traces-apm*", estest.TermQuery{ + Field: "processor.event", + Value: "span", + }) systemtest.ApproveEvents( t, t.Name(), result.Hits.Hits, @@ -119,14 +125,14 @@ func TestSourcemapCaching(t *testing.T) { // Index an error, applying source mapping and caching the source map in the process. systemtest.SendRUMEventsPayload(t, srv, "../testdata/intake-v2/errors_rum.ndjson") - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*-error", nil) + result := systemtest.Elasticsearch.ExpectDocs(t, "logs-apm.error-*", nil) assertSourcemapUpdated(t, result, true) // Delete the source map and error, and try again. systemtest.DeleteSourceMap(t, sourcemapID) - deleteIndex(t, "apm-*-error*") + systemtest.CleanupElasticsearch(t) systemtest.SendRUMEventsPayload(t, srv, "../testdata/intake-v2/errors_rum.ndjson") - result = systemtest.Elasticsearch.ExpectMinDocs(t, 1, "apm-*-error", nil) + result = systemtest.Elasticsearch.ExpectMinDocs(t, 1, "logs-apm.error-*", nil) assertSourcemapUpdated(t, result, true) } diff --git a/systemtest/template_test.go b/systemtest/template_test.go deleted file mode 100644 index dd67bb760dc..00000000000 --- a/systemtest/template_test.go +++ /dev/null @@ -1,167 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package systemtest_test - -import ( - "bytes" - "context" - "encoding/json" - "io/ioutil" - "net/http" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/apm-server/systemtest" - "github.com/elastic/apm-server/systemtest/apmservertest" - "github.com/elastic/apm-server/systemtest/estest" - "github.com/elastic/go-elasticsearch/v8/esapi" -) - -func TestIndexTemplateCoverage(t *testing.T) { - systemtest.CleanupElasticsearch(t) - srv := apmservertest.NewServer(t) - - // Index each supported event type. - var totalEvents int - for _, payloadFile := range []string{ - "../testdata/intake-v2/errors.ndjson", - "../testdata/intake-v2/metricsets.ndjson", - "../testdata/intake-v2/spans.ndjson", - "../testdata/intake-v2/transactions.ndjson", - } { - data, err := ioutil.ReadFile(payloadFile) - require.NoError(t, err) - req, _ := http.NewRequest("POST", srv.URL+"/intake/v2/events?verbose=true", bytes.NewReader(data)) - req.Header.Set("Content-Type", "application/x-ndjson") - - resp, err := http.DefaultClient.Do(req) - require.NoError(t, err) - defer resp.Body.Close() - assert.Equal(t, http.StatusAccepted, resp.StatusCode) - - var result struct { - Accepted int - } - err = json.NewDecoder(resp.Body).Decode(&result) - require.NoError(t, err) - assert.NotZero(t, result.Accepted) - totalEvents += result.Accepted - } - - // Wait for events to be indexed. - systemtest.Elasticsearch.ExpectMinDocs(t, totalEvents, "apm-*", estest.BoolQuery{ - MustNot: []interface{}{estest.TermQuery{Field: "processor.event", Value: "onboarding"}}, - }) - - // Check index mappings are covered by the template with the exception of known dynamic fields (e.g. labels). - var indexMappings map[string]struct { - Mappings map[string]interface{} - } - _, err := systemtest.Elasticsearch.Do(context.Background(), - &esapi.IndicesGetMappingRequest{Index: []string{"apm-*"}}, - &indexMappings, - ) - require.NoError(t, err) - - indexTemplate := getIndexTemplate(t, srv.Version) - indexTemplateFlattenedFields := make(map[string]interface{}) - indexTemplateMappings := indexTemplate["mappings"].(map[string]interface{}) - getFlattenedFields(indexTemplateMappings["properties"].(map[string]interface{}), "", indexTemplateFlattenedFields) - - knownMetrics := []string{ - "negative", // negative.d.o.t.t.e.d - "dotted", // dotted.float.gauge - "go", // go.memstats.heap.sys - "short_gauge", - "integer_gauge", - "long_gauge", - "float_gauge", - "double_gauge", - "byte_counter", - "short_counter", - "latency_distribution", - } - - for index, indexMappings := range indexMappings { - metricIndex := strings.Contains(index, "-metric-") - indexFlattenedFields := make(map[string]interface{}) - getFlattenedFields(indexMappings.Mappings["properties"].(map[string]interface{}), "", indexFlattenedFields) - for field := range indexFlattenedFields { - if strings.HasPrefix(field, "labels.") || strings.HasPrefix(field, "transaction.marks.") { - // Labels and RUM page marks are dynamically indexed. - continue - } - _, ok := indexTemplateFlattenedFields[field] - if !ok && metricIndex { - var isKnownMetric bool - for _, knownMetric := range knownMetrics { - if strings.HasPrefix(field, knownMetric) { - isKnownMetric = true - break - } - } - if isKnownMetric { - continue - } - } - assert.True(t, ok, "%s: field %s not defined in index template", index, field) - } - } -} - -func getIndexTemplate(t testing.TB, serverVersion string) map[string]interface{} { - indexTemplateName := "apm-" + serverVersion - indexTemplates := make(map[string]interface{}) - - // Wait for the index template to be created. - timeout := time.NewTimer(10 * time.Second) - defer timeout.Stop() - for { - select { - case <-timeout.C: - t.Fatal("timed out waiting for index template") - default: - } - if _, err := systemtest.Elasticsearch.Do(context.Background(), - &esapi.IndicesGetTemplateRequest{Name: []string{indexTemplateName}}, - &indexTemplates, - ); err == nil { - break - } - time.Sleep(100 * time.Millisecond) - } - - require.Len(t, indexTemplates, 1) - require.Contains(t, indexTemplates, indexTemplateName) - indexTemplate := indexTemplates[indexTemplateName].(map[string]interface{}) - return indexTemplate -} - -func getFlattenedFields(properties map[string]interface{}, prefix string, out map[string]interface{}) { - for field, mapping := range properties { - mapping := mapping.(map[string]interface{}) - out[prefix+field] = mapping - if properties, ok := mapping["properties"].(map[string]interface{}); ok { - getFlattenedFields(properties, prefix+field+".", out) - } - } -} From e5dd0779b392114103cab46c04dfc57103d64742 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 11 Nov 2021 10:24:21 +0800 Subject: [PATCH 2/2] Update to Python 3.8.5 on Windows, like Beats --- .ci/scripts/windows-build.ps1 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.ci/scripts/windows-build.ps1 b/.ci/scripts/windows-build.ps1 index d51e282e28e..932c9283c3b 100644 --- a/.ci/scripts/windows-build.ps1 +++ b/.ci/scripts/windows-build.ps1 @@ -41,7 +41,7 @@ exec { go get github.com/docker/libcompose } if (Test-Path "build") { Remove-Item -Recurse -Force build } New-Item -ItemType directory -Path build\coverage | Out-Null -choco install python -y -r --no-progress --version 3.8.1.20200110 +choco install python -y -r --no-progress --version 3.8.5 refreshenv $env:PATH = "C:\Python38;C:\Python38\Scripts;$env:PATH" $env:PYTHON_ENV = "$env:TEMP\python-env"