Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.0] systemtest: data streams everywhere (backport #6568) #6574

Merged
merged 2 commits into from
Nov 11, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .ci/scripts/linux-test.sh
Original file line number Diff line number Diff line change
@@ -13,6 +13,9 @@ trap cleanup EXIT

make update apm-server

# Start docker-compose environment first, so it doesn't count towards the test timeout.
docker-compose up -d

SYSTEM_TESTS_XUNIT_PATH="$(pwd)/build"
# TODO(axw) make this a Makefile target
# TODO(mdelapenya) meanwhile there is no 'DefaultGoTestSystemArgs' at beats' GoTest implementation, this command is reproducing what Beats should provide: a gotestsum representation for system tests.
2 changes: 1 addition & 1 deletion .ci/scripts/windows-build.ps1
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ exec { go get github.com/docker/libcompose }
if (Test-Path "build") { Remove-Item -Recurse -Force build }
New-Item -ItemType directory -Path build\coverage | Out-Null

choco install python -y -r --no-progress --version 3.8.1.20200110
choco install python -y -r --no-progress --version 3.8.5
refreshenv
$env:PATH = "C:\Python38;C:\Python38\Scripts;$env:PATH"
$env:PYTHON_ENV = "$env:TEMP\python-env"
10 changes: 5 additions & 5 deletions systemtest/aggregation_test.go
Original file line number Diff line number Diff line change
@@ -66,7 +66,7 @@ func TestTransactionAggregation(t *testing.T) {
}
tracer.Flush(nil)

result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "apm-*",
result := systemtest.Elasticsearch.ExpectMinDocs(t, 2, "metrics-apm.internal-*",
estest.ExistsQuery{Field: "transaction.duration.histogram"},
)
systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits)
@@ -79,7 +79,7 @@ func TestTransactionAggregation(t *testing.T) {
// the appropriate per-bucket doc_count values.
result = estest.SearchResult{}
_, err = systemtest.Elasticsearch.Do(context.Background(), &esapi.SearchRequest{
Index: []string{"apm-*"},
Index: []string{"metrics-apm.internal-*"},
Body: strings.NewReader(`{
"size": 0,
"query": {"exists":{"field":"transaction.duration.histogram"}},
@@ -135,14 +135,14 @@ func TestTransactionAggregationShutdown(t *testing.T) {
// Wait for the transaction to be indexed, indicating that Elasticsearch
// indices have been setup and we should not risk triggering the shutdown
// timeout while waiting for the aggregated metrics to be indexed.
systemtest.Elasticsearch.ExpectDocs(t, "apm-*",
systemtest.Elasticsearch.ExpectDocs(t, "traces-apm*",
estest.TermQuery{Field: "processor.event", Value: "transaction"},
)

// Stop server to ensure metrics are flushed on shutdown.
assert.NoError(t, srv.Close())

result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*",
result := systemtest.Elasticsearch.ExpectDocs(t, "metrics-apm.internal-*",
estest.ExistsQuery{Field: "transaction.duration.histogram"},
)
systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits)
@@ -175,7 +175,7 @@ func TestServiceDestinationAggregation(t *testing.T) {
tx.End()
tracer.Flush(nil)

result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*",
result := systemtest.Elasticsearch.ExpectDocs(t, "metrics-apm.internal-*",
estest.ExistsQuery{Field: "span.destination.service.response_time.count"},
)
systemtest.ApproveEvents(t, t.Name(), result.Hits.Hits)
8 changes: 1 addition & 7 deletions systemtest/apmservertest/config.go
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ type Config struct {
Aggregation *AggregationConfig `json:"apm-server.aggregation,omitempty"`
Sampling *SamplingConfig `json:"apm-server.sampling,omitempty"`
RUM *RUMConfig `json:"apm-server.rum,omitempty"`
DataStreams *DataStreamsConfig `json:"apm-server.data_streams,omitempty"`
WaitForIntegration *bool `json:"apm-server.data_streams.wait_for_integration,omitempty"`
DefaultServiceEnvironment string `json:"apm-server.default_service_environment,omitempty"`
KibanaAgentConfig *KibanaAgentConfig `json:"apm-server.agent.config,omitempty"`
TLS *TLSConfig `json:"apm-server.ssl,omitempty"`
@@ -198,12 +198,6 @@ type RUMSourcemapCacheConfig struct {
Expiration time.Duration `json:"expiration,omitempty"`
}

// DataStreamsConfig holds APM Server data streams configuration.
type DataStreamsConfig struct {
Enabled bool `json:"enabled"`
WaitForIntegration *bool `json:"wait_for_integration,omitempty"`
}

// APIKeyConfig holds agent auth configuration.
type AgentAuthConfig struct {
SecretToken string `json:"secret_token,omitempty"`
4 changes: 4 additions & 0 deletions systemtest/apmservertest/server.go
Original file line number Diff line number Diff line change
@@ -144,6 +144,10 @@ func (s *Server) start(tls bool) error {
"logging.to_stderr": true,
"apm-server.expvar.enabled": true,
"apm-server.host": "127.0.0.1:0",

// TODO(axw) remove this when we switch over to data streams
// as the default and only indexing method.
"apm-server.data_streams.enabled": true,
}
if tls {
certPath, keyPath, caCertPath, err := s.initTLS()
10 changes: 10 additions & 0 deletions systemtest/approvals/TestCompressedSpans.approved.json
Original file line number Diff line number Diff line change
@@ -6,10 +6,15 @@
"name": "go",
"version": "0.0.0"
},
"data_stream.dataset": "apm",
"data_stream.namespace": "default",
"data_stream.type": "traces",
"ecs": {
"version": "dynamic"
},
"event": {
"agent_id_status": "missing",
"ingested": "dynamic",
"outcome": "success"
},
"observer": {
@@ -68,10 +73,15 @@
"name": "go",
"version": "0.0.0"
},
"data_stream.dataset": "apm",
"data_stream.namespace": "default",
"data_stream.type": "traces",
"ecs": {
"version": "dynamic"
},
"event": {
"agent_id_status": "missing",
"ingested": "dynamic",
"outcome": "success"
},
"observer": {
5 changes: 5 additions & 0 deletions systemtest/approvals/TestNoMatchingSourcemap.approved.json
Original file line number Diff line number Diff line change
@@ -9,10 +9,15 @@
"client": {
"ip": "127.0.0.1"
},
"data_stream.dataset": "apm.rum",
"data_stream.namespace": "default",
"data_stream.type": "traces",
"ecs": {
"version": "dynamic"
},
"event": {
"agent_id_status": "missing",
"ingested": "dynamic",
"outcome": "unknown"
},
"observer": {
4 changes: 4 additions & 0 deletions systemtest/approvals/TestOTLPGRPCMetrics.approved.json
Original file line number Diff line number Diff line change
@@ -6,10 +6,14 @@
"name": "opentelemetry/go",
"version": "1.0.0"
},
"data_stream.dataset": "apm.app.unknown_service_systemtest_test",
"data_stream.namespace": "default",
"data_stream.type": "metrics",
"ecs": {
"version": "dynamic"
},
"event": {
"agent_id_status": "missing",
"ingested": "dynamic"
},
"float64_counter": 1,
4 changes: 4 additions & 0 deletions systemtest/approvals/TestRUMErrorSourcemapping.approved.json
Original file line number Diff line number Diff line change
@@ -9,6 +9,9 @@
"client": {
"ip": "127.0.0.1"
},
"data_stream.dataset": "apm.error",
"data_stream.namespace": "default",
"data_stream.type": "logs",
"ecs": {
"version": "dynamic"
},
@@ -252,6 +255,7 @@
}
},
"event": {
"agent_id_status": "missing",
"ingested": "dynamic"
},
"http": {
5 changes: 5 additions & 0 deletions systemtest/approvals/TestRUMSpanSourcemapping.approved.json
Original file line number Diff line number Diff line change
@@ -9,10 +9,15 @@
"client": {
"ip": "127.0.0.1"
},
"data_stream.dataset": "apm.rum",
"data_stream.namespace": "default",
"data_stream.type": "traces",
"ecs": {
"version": "dynamic"
},
"event": {
"agent_id_status": "missing",
"ingested": "dynamic",
"outcome": "unknown"
},
"observer": {
Original file line number Diff line number Diff line change
@@ -5,10 +5,14 @@
"agent": {
"name": "go"
},
"data_stream.dataset": "apm.internal",
"data_stream.namespace": "default",
"data_stream.type": "metrics",
"ecs": {
"version": "dynamic"
},
"event": {
"agent_id_status": "missing",
"ingested": "dynamic",
"outcome": "unknown"
},
8 changes: 8 additions & 0 deletions systemtest/approvals/TestTransactionAggregation.approved.json
Original file line number Diff line number Diff line change
@@ -6,10 +6,14 @@
"agent": {
"name": "go"
},
"data_stream.dataset": "apm.internal",
"data_stream.namespace": "default",
"data_stream.type": "metrics",
"ecs": {
"version": "dynamic"
},
"event": {
"agent_id_status": "missing",
"ingested": "dynamic",
"outcome": "unknown"
},
@@ -59,10 +63,14 @@
"agent": {
"name": "go"
},
"data_stream.dataset": "apm.internal",
"data_stream.namespace": "default",
"data_stream.type": "metrics",
"ecs": {
"version": "dynamic"
},
"event": {
"agent_id_status": "missing",
"ingested": "dynamic",
"outcome": "unknown"
},
Original file line number Diff line number Diff line change
@@ -6,10 +6,14 @@
"agent": {
"name": "go"
},
"data_stream.dataset": "apm.internal",
"data_stream.namespace": "default",
"data_stream.type": "metrics",
"ecs": {
"version": "dynamic"
},
"event": {
"agent_id_status": "missing",
"ingested": "dynamic",
"outcome": "unknown"
},
Original file line number Diff line number Diff line change
@@ -5,10 +5,14 @@
"agent": {
"name": "go"
},
"data_stream.dataset": "apm.internal",
"data_stream.namespace": "default",
"data_stream.type": "metrics",
"ecs": {
"version": "dynamic"
},
"event": {
"agent_id_status": "missing",
"ingested": "dynamic",
"outcome": "success"
},
@@ -45,10 +49,14 @@
"agent": {
"name": "go"
},
"data_stream.dataset": "apm.internal",
"data_stream.namespace": "default",
"data_stream.type": "metrics",
"ecs": {
"version": "dynamic"
},
"event": {
"agent_id_status": "missing",
"ingested": "dynamic",
"outcome": "success"
},
Original file line number Diff line number Diff line change
@@ -6,10 +6,14 @@
"name": "go",
"version": "0.0.0"
},
"data_stream.dataset": "apm",
"data_stream.namespace": "default",
"data_stream.type": "traces",
"ecs": {
"version": "dynamic"
},
"event": {
"agent_id_status": "missing",
"ingested": "dynamic",
"outcome": "success"
},
78 changes: 8 additions & 70 deletions systemtest/elasticsearch.go
Original file line number Diff line number Diff line change
@@ -19,12 +19,11 @@ package systemtest

import (
"context"
"net/http"
"net/url"
"testing"
"time"

"golang.org/x/sync/errgroup"
"github.com/stretchr/testify/require"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
@@ -87,75 +86,14 @@ func newElasticsearchConfig() elasticsearch.Config {
}
}

// CleanupElasticsearch deletes all indices, index templates,
// and ingest node pipelines whose names start with "apm",
// and deletes the default ILM policy "apm-rollover-30-days".
// CleanupElasticsearch deletes all data streams created by APM Server.
func CleanupElasticsearch(t testing.TB) {
const (
legacyPrefix = "apm*" // Not "apm-*", as that would not capture the "apm" ingest pipeline.
apmTracesPrefix = "traces-apm*"
apmMetricsPrefix = "metrics-apm*"
apmLogsPrefix = "logs-apm*"
)

doReq := func(req estest.Request) error {
_, err := Elasticsearch.Do(context.Background(), req, nil)
if err, ok := err.(*estest.Error); ok && err.StatusCode == http.StatusNotFound {
return nil
}
return err
}

doParallel := func(requests ...estest.Request) {
t.Helper()
var g errgroup.Group
for _, req := range requests {
req := req // copy for closure
g.Go(func() error { return doReq(req) })
}
if err := g.Wait(); err != nil {
t.Fatal(err)
}
}

// Delete indices, data streams, and ingest pipelines.
if err := doReq(esapi.IndicesDeleteRequest{Index: []string{
legacyPrefix,
// traces-apm*, metrics-apm*, and logs-apm* could get created
// as indices instead of data streams in some tests, so issue
// index delete requests for those too.
apmTracesPrefix,
apmMetricsPrefix,
apmLogsPrefix,
}}); err != nil {
t.Fatal(err)
}
doParallel(
esapi.IndicesDeleteDataStreamRequest{Name: []string{
legacyPrefix,
apmTracesPrefix,
apmMetricsPrefix,
apmLogsPrefix,
}},
esapi.IngestDeletePipelineRequest{PipelineID: legacyPrefix},
)

// Delete index templates after deleting data streams.
if err := doReq(esapi.IndicesDeleteTemplateRequest{Name: legacyPrefix}); err != nil {
t.Fatal(err)
}

// Delete the ILM policy last or we'll get an error due to it being in use.
for {
err := doReq(esapi.ILMDeleteLifecycleRequest{Policy: "apm-rollover-30-days"})
if err == nil {
break
}
// Retry deleting, in case indices are still being deleted.
const delay = 100 * time.Millisecond
t.Logf("failed to delete ILM policy (retrying in %s): %s", delay, err)
time.Sleep(delay)
}
_, err := Elasticsearch.Do(context.Background(), &esapi.IndicesDeleteDataStreamRequest{Name: []string{
"traces-apm*",
"metrics-apm*",
"logs-apm*",
}}, nil)
require.NoError(t, err)
}

// ChangeUserPassword changes the password for a given user.
Loading