Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest: introduce apm_data_stream_migration #5768

Merged
merged 1 commit into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"description": "Migrate APM events to data streams",
"processors": [
{
"script": {
"if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'",
"source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'error'",
"source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'metric'",
"source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n"
}
},
{
"set": {
"if": "ctx.data_stream != null",
"field": "_index",
"value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"description": "Migrate APM events to data streams",
"processors": [
{
"script": {
"if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'",
"source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'error'",
"source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'metric'",
"source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n"
}
},
{
"set": {
"if": "ctx.data_stream != null",
"field": "_index",
"value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"description": "Migrate APM events to data streams",
"processors": [
{
"script": {
"if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'",
"source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'error'",
"source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'metric'",
"source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n"
}
},
{
"set": {
"if": "ctx.data_stream != null",
"field": "_index",
"value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"description": "Migrate APM events to data streams",
"processors": [
{
"script": {
"if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'",
"source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'error'",
"source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'metric'",
"source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n"
}
},
{
"set": {
"if": "ctx.data_stream != null",
"field": "_index",
"value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"description": "Migrate APM events to data streams",
"processors": [
{
"script": {
"if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'",
"source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'error'",
"source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'metric'",
"source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n"
}
},
{
"set": {
"if": "ctx.data_stream != null",
"field": "_index",
"value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"
}
}
]
}
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits]
* Add HTTP span fields as top level ECS fields {pull}5396[5396]
* Introduce `apm-server.auth.anonymous.*` config {pull}5623[5623]
* Upgrade Go to 1.16.6 {pull}5754[5754]
* Introduce ingest pipeline `apm_data_stream_migration` for migrating pre-data stream indices {pull}5768[5768]

[float]
==== Deprecated
Expand Down
33 changes: 33 additions & 0 deletions ingest/pipeline/definition.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,39 @@
]
}
},
{
"id": "apm_data_stream_migration",
"body": {
"description": "Migrate APM events to data streams",
"processors": [
{
"script": {
"if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'",
"source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'error'",
"source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'metric'",
"source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n"
}
},
{
"set": {
"if": "ctx.data_stream != null",
"field": "_index",
"value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"
}
}
]
}
},
{
"id": "apm_user_agent",
"body": {
Expand Down
32 changes: 32 additions & 0 deletions ingest/pipeline/definition.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,38 @@ apm:
name: apm_metrics_dynamic_template
if: ctx.processor?.event == 'metric'

# apm_data_stream_migration is not used in the main apm pipeline,
# it is installed for migrating legacy indices to data streams,
# e.g. using the Kibana Upgrade Assistant.
apm_data_stream_migration:
description: Migrate APM events to data streams
processors:
- script:
if: ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'
source: |
ctx.data_stream = ["type": "traces", "dataset": "apm", "namespace": "migrated"]
- script:
if: ctx.processor?.event == 'error'
source: |
ctx.data_stream = ["type": "logs", "dataset": "apm.error", "namespace": "migrated"]
- script:
if: ctx.processor?.event == 'metric'
source: |
String dataset;
if (ctx["metricset.name"] != "app") {
dataset = "apm.internal";
} else {
String serviceName = ctx.service.name;
serviceName = serviceName.toLowerCase();
serviceName = /[\\\/*?"<>| ,#:-]/.matcher(serviceName).replaceAll('_');
dataset = "apm.app." + serviceName;
}
ctx.data_stream = ["type": "metrics", "dataset": dataset, "namespace": "migrated"];
- set:
if: ctx.data_stream != null
field: _index
value: "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"

apm_user_agent:
description: Add user agent information for APM events
processors:
Expand Down
10 changes: 9 additions & 1 deletion systemtest/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,15 @@ func CleanupElasticsearch(t testing.TB) {
}

// Delete indices, data streams, and ingest pipelines.
if err := doReq(esapi.IndicesDeleteRequest{Index: []string{legacyPrefix}}); err != nil {
if err := doReq(esapi.IndicesDeleteRequest{Index: []string{
legacyPrefix,
// traces-apm*, metrics-apm*, and logs-apm* could get created
// as indices instead of data streams in some tests, so issue
// index delete requests for those too.
apmTracesPrefix,
apmMetricsPrefix,
apmLogsPrefix,
}}); err != nil {
t.Fatal(err)
}
doParallel(
Expand Down
70 changes: 70 additions & 0 deletions systemtest/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@
package systemtest_test

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

"go.elastic.co/apm"

"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/elastic/go-elasticsearch/v7/esutil"

"github.com/elastic/apm-server/systemtest"
"github.com/elastic/apm-server/systemtest/apmservertest"
"github.com/elastic/apm-server/systemtest/estest"
Expand Down Expand Up @@ -53,3 +60,66 @@ func TestIngestPipeline(t *testing.T) {
assert.True(t, destinationIP.Exists())
assert.Equal(t, "::1", destinationIP.String())
}

// TestDataStreamMigrationIngestPipeline checks that the "apm_data_stream_migration"
// ingest pipeline, when applied via _reindex, routes legacy apm-* documents into
// data-stream-style indices ({type}-{dataset}-{namespace}) and leaves documents it
// does not recognize (the onboarding doc) in the reindex destination untouched.
func TestDataStreamMigrationIngestPipeline(t *testing.T) {
	systemtest.CleanupElasticsearch(t)
	srv := apmservertest.NewServer(t)

	// Send a transaction, span, error, and metrics.
	tracer := srv.Tracer()
	// Register a custom gatherer so an "app" metricset is produced in addition
	// to the internal ones; the pipeline routes app metrics to a per-service dataset.
	tracer.RegisterMetricsGatherer(apm.GatherMetricsFunc(func(ctx context.Context, m *apm.Metrics) error {
		m.Add("custom_metric", nil, 123)
		return nil
	}))
	tx := tracer.StartTransaction("name", "type")
	span := tx.StartSpan("name", "type", nil)
	tracer.NewError(errors.New("boom")).Send()
	span.End()
	tx.End()
	// Flush trace events before gathering/sending metrics so both batches
	// reach the server before we query Elasticsearch.
	tracer.Flush(nil)
	tracer.SendMetrics(nil)

	// We expect at least 6 events:
	// - onboarding
	// - transaction
	// - span
	// - error
	// - internal metricset
	// - app metricset
	for _, query := range []interface{}{
		estest.TermQuery{Field: "processor.event", Value: "onboarding"},
		estest.TermQuery{Field: "processor.event", Value: "transaction"},
		estest.TermQuery{Field: "processor.event", Value: "span"},
		estest.TermQuery{Field: "processor.event", Value: "error"},
		estest.TermQuery{Field: "metricset.name", Value: "transaction_breakdown"},
		estest.TermQuery{Field: "metricset.name", Value: "app"},
	} {
		systemtest.Elasticsearch.ExpectDocs(t, "apm-*", query)
	}

	// Reindex everything in apm-* through the migration pipeline. Refresh so
	// the rerouted documents are immediately searchable below.
	refresh := true
	_, err := systemtest.Elasticsearch.Do(context.Background(), &esapi.ReindexRequest{
		Refresh: &refresh,
		Body: esutil.NewJSONReader(map[string]interface{}{
			"source": map[string]interface{}{
				"index": "apm-*",
			},
			"dest": map[string]interface{}{
				"index":    "apm-migration",
				"pipeline": "apm_data_stream_migration",
				"op_type":  "create",
			},
		}),
	}, nil)
	require.NoError(t, err)

	// There should only be an onboarding doc in "apm-migration".
	result := systemtest.Elasticsearch.ExpectDocs(t, "apm-migration", nil)
	require.Len(t, result.Hits.Hits, 1)
	assert.Equal(t, "onboarding", gjson.GetBytes(result.Hits.Hits[0].RawSource, "processor.event").String())

	// Everything else should have been rerouted by the pipeline's `_index` set
	// processor into data-stream-named indices with the "migrated" namespace.
	systemtest.Elasticsearch.ExpectMinDocs(t, 2, "traces-apm-migrated", nil) // transaction, span
	systemtest.Elasticsearch.ExpectMinDocs(t, 1, "logs-apm.error-migrated", nil)
	systemtest.Elasticsearch.ExpectMinDocs(t, 1, "metrics-apm.internal-migrated", nil)
	systemtest.Elasticsearch.ExpectMinDocs(t, 1, "metrics-apm.app.systemtest-migrated", nil)
}