ingest: introduce apm_data_stream_migration (#5768)
(cherry picked from commit 4f4dca2)

# Conflicts:
#	changelogs/head.asciidoc
axw authored and mergify-bot committed Jul 21, 2021
1 parent 0cf0f1f commit 20ff337
Showing 10 changed files with 346 additions and 1 deletion.
@@ -0,0 +1,30 @@
{
"description": "Migrate APM events to data streams",
"processors": [
{
"script": {
"if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'",
"source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'error'",
"source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'metric'",
"source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n"
}
},
{
"set": {
"if": "ctx.data_stream != null",
"field": "_index",
"value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"
}
}
]
}
(The same pipeline definition is added verbatim to four more files in this commit.)
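As a quick sanity check, the routing can be exercised with Elasticsearch's simulate API. This is a sketch, assuming the pipeline has already been installed under the `apm_data_stream_migration` id, using minimal stand-in documents rather than full APM events:

POST _ingest/pipeline/apm_data_stream_migration/_simulate
{
  "docs": [
    { "_source": { "processor": { "event": "transaction" } } },
    { "_source": { "processor": { "event": "error" } } }
  ]
}

The first document should come back with `_index` rewritten to `traces-apm-migrated` and the second to `logs-apm.error-migrated`; a document matching none of the conditions is left untouched, since the `set` processor only runs when `ctx.data_stream` is non-null.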
52 changes: 52 additions & 0 deletions changelogs/head.asciidoc
@@ -0,0 +1,52 @@
[[release-notes-head]]
== APM Server version HEAD

https://github.com/elastic/apm-server/compare/7.13\...master[View commits]

[float]
==== Breaking Changes
* Removed monitoring counters `apm-server.processor.stream.errors.{queue,server,closed}` {pull}5453[5453]
* APM Server now responds with 403 (HTTP) and PermissionDenied (gRPC) for authenticated but unauthorized requests {pull}5545[5545]
* `sourcemap.error` and `sourcemap.updated` are no longer set when no matching source map is found {pull}5631[5631]
* experimental:["This breaking change applies to the experimental <<apm-integration>>."] Removed `service.name` from dataset {pull}5451[5451]

[float]
==== Bug fixes
* Fix panic due to misaligned 64-bit access on 32-bit architectures {pull}5277[5277]
* Fixed tail-based sampling pubsub to use _seq_no correctly {pull}5126[5126]
* Removed service name from dataset {pull}5451[5451]
* Fix panic on Fleet policy change when transaction metrics or tail-based sampling are enabled {pull}5670[5670]
* Remove multipart form temporary files left behind by source map uploads {pull}5718[5718]

[float]
==== Intake API Changes

[float]
==== Added
* Support setting agent configuration from apm-server.yml {pull}5177[5177]
* Add metric_type and unit to field metadata of system metrics {pull}5230[5230]
* Display the apm-server URL in the Fleet UI's apm-server integration {pull}4895[4895]
* Translate OTel messaging.* semantic conventions to ECS {pull}5334[5334]
* Add support for dynamic histogram metrics {pull}5239[5239]
* Tail-sampling processor now resumes subscription from previous position after restart {pull}5350[5350]
* Add support for histograms to metrics intake {pull}5360[5360]
* Upgrade Go to 1.16.5 {pull}5454[5454]
* Add units to metric fields {pull}5395[5395]
* Support fetching source maps from Fleet {pull}5410[5410]
* Add support for adjusting OTel event timestamps using `telemetry.sdk.elastic_export_timestamp` {pull}5433[5433]
* Add support for OpenTelemetry labels describing mobile connectivity {pull}5436[5436]
* Introduce `apm-server.auth.*` config {pull}5457[5457]
* Add debug logging of OpenTelemetry payloads {pull}5474[5474]
* Add support for more input variables in the Fleet integration {pull}5444[5444]
* Under Fleet, report which agent configs have been applied {pull}5481[5481]
* Server sends its raw config to Kibana when running on ECE/ESS {pull}5424[5424]
* Add HTTP span fields as top level ECS fields {pull}5396[5396]
* Introduce `apm-server.auth.anonymous.*` config {pull}5623[5623]
* Upgrade Go to 1.16.6 {pull}5754[5754]
* Introduce ingest pipeline `apm_data_stream_migration` for migrating pre-data stream indices {pull}5768[5768]

[float]
==== Deprecated
* Make `destination.service.name` and `destination.service.type` fields optional and deprecated {pull}5468[5468]
* `apm-server.secret_token` is now `apm-server.auth.secret_token` {pull}5457[5457]
* `apm-server.api_key` is now `apm-server.auth.api_key` {pull}5457[5457]
33 changes: 33 additions & 0 deletions ingest/pipeline/definition.json
@@ -44,6 +44,39 @@
]
}
},
{
"id": "apm_data_stream_migration",
"body": {
"description": "Migrate APM events to data streams",
"processors": [
{
"script": {
"if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'",
"source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'error'",
"source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'metric'",
"source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n"
}
},
{
"set": {
"if": "ctx.data_stream != null",
"field": "_index",
"value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"
}
}
]
}
},
{
"id": "apm_user_agent",
"body": {
32 changes: 32 additions & 0 deletions ingest/pipeline/definition.yml
@@ -21,6 +21,38 @@ apm:
name: apm_metrics_dynamic_template
if: ctx.processor?.event == 'metric'

# apm_data_stream_migration is not used in the main apm pipeline,
# it is installed for migrating legacy indices to data streams,
# e.g. using the Kibana Upgrade Assistant.
apm_data_stream_migration:
description: Migrate APM events to data streams
processors:
- script:
if: ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'
source: |
ctx.data_stream = ["type": "traces", "dataset": "apm", "namespace": "migrated"]
- script:
if: ctx.processor?.event == 'error'
source: |
ctx.data_stream = ["type": "logs", "dataset": "apm.error", "namespace": "migrated"]
- script:
if: ctx.processor?.event == 'metric'
source: |
String dataset;
if (ctx["metricset.name"] != "app") {
dataset = "apm.internal";
} else {
String serviceName = ctx.service.name;
serviceName = serviceName.toLowerCase();
serviceName = /[\\\/*?"<>| ,#:-]/.matcher(serviceName).replaceAll('_');
dataset = "apm.app." + serviceName;
}
ctx.data_stream = ["type": "metrics", "dataset": dataset, "namespace": "migrated"];
- set:
if: ctx.data_stream != null
field: _index
value: "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"

apm_user_agent:
description: Add user agent information for APM events
processors:
10 changes: 9 additions & 1 deletion systemtest/elasticsearch.go
@@ -119,7 +119,15 @@ func CleanupElasticsearch(t testing.TB) {
}

// Delete indices, data streams, and ingest pipelines.
-	if err := doReq(esapi.IndicesDeleteRequest{Index: []string{legacyPrefix}}); err != nil {
+	if err := doReq(esapi.IndicesDeleteRequest{Index: []string{
+		legacyPrefix,
+		// traces-apm*, metrics-apm*, and logs-apm* could get created
+		// as indices instead of data streams in some tests, so issue
+		// index delete requests for those too.
+		apmTracesPrefix,
+		apmMetricsPrefix,
+		apmLogsPrefix,
+	}}); err != nil {
t.Fatal(err)
}
doParallel(
Expand Down
70 changes: 70 additions & 0 deletions systemtest/ingest_test.go
@@ -18,12 +18,19 @@
package systemtest_test

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

"go.elastic.co/apm"

"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/elastic/go-elasticsearch/v7/esutil"

"github.com/elastic/apm-server/systemtest"
"github.com/elastic/apm-server/systemtest/apmservertest"
"github.com/elastic/apm-server/systemtest/estest"
@@ -53,3 +60,66 @@ func TestIngestPipeline(t *testing.T) {
assert.True(t, destinationIP.Exists())
assert.Equal(t, "::1", destinationIP.String())
}

func TestDataStreamMigrationIngestPipeline(t *testing.T) {
systemtest.CleanupElasticsearch(t)
srv := apmservertest.NewServer(t)

// Send a transaction, span, error, and metrics.
tracer := srv.Tracer()
tracer.RegisterMetricsGatherer(apm.GatherMetricsFunc(func(ctx context.Context, m *apm.Metrics) error {
m.Add("custom_metric", nil, 123)
return nil
}))
tx := tracer.StartTransaction("name", "type")
span := tx.StartSpan("name", "type", nil)
tracer.NewError(errors.New("boom")).Send()
span.End()
tx.End()
tracer.Flush(nil)
tracer.SendMetrics(nil)

// We expect at least 6 events:
// - onboarding
// - transaction
// - span
// - error
// - internal metricset
// - app metricset
for _, query := range []interface{}{
estest.TermQuery{Field: "processor.event", Value: "onboarding"},
estest.TermQuery{Field: "processor.event", Value: "transaction"},
estest.TermQuery{Field: "processor.event", Value: "span"},
estest.TermQuery{Field: "processor.event", Value: "error"},
estest.TermQuery{Field: "metricset.name", Value: "transaction_breakdown"},
estest.TermQuery{Field: "metricset.name", Value: "app"},
} {
systemtest.Elasticsearch.ExpectDocs(t, "apm-*", query)
}

refresh := true
_, err := systemtest.Elasticsearch.Do(context.Background(), &esapi.ReindexRequest{
Refresh: &refresh,
Body: esutil.NewJSONReader(map[string]interface{}{
"source": map[string]interface{}{
"index": "apm-*",
},
"dest": map[string]interface{}{
"index": "apm-migration",
"pipeline": "apm_data_stream_migration",
"op_type": "create",
},
}),
}, nil)
require.NoError(t, err)

// There should only be an onboarding doc in "apm-migration".
result := systemtest.Elasticsearch.ExpectDocs(t, "apm-migration", nil)
require.Len(t, result.Hits.Hits, 1)
assert.Equal(t, "onboarding", gjson.GetBytes(result.Hits.Hits[0].RawSource, "processor.event").String())

systemtest.Elasticsearch.ExpectMinDocs(t, 2, "traces-apm-migrated", nil) // transaction, span
systemtest.Elasticsearch.ExpectMinDocs(t, 1, "logs-apm.error-migrated", nil)
systemtest.Elasticsearch.ExpectMinDocs(t, 1, "metrics-apm.internal-migrated", nil)
systemtest.Elasticsearch.ExpectMinDocs(t, 1, "metrics-apm.app.systemtest-migrated", nil)
}
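
Outside the test suite, an operator could drive the same migration with a plain reindex call. A sketch mirroring the request above (index names are illustrative, and `dest.index` is only a fallback, since the pipeline rewrites `_index` for every event it recognizes):

POST _reindex
{
  "source": { "index": "apm-*" },
  "dest": {
    "index": "apm-migration",
    "op_type": "create",
    "pipeline": "apm_data_stream_migration"
  }
}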
