Skip to content

Commit

Permalink
ingest: introduce apm_data_stream_migration
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Jul 20, 2021
1 parent 3ee3355 commit 38ec1d5
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"description": "Migrate APM events to data streams",
"processors": [
{
"script": {
"if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'",
"source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'error'",
"source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'metric'",
"source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n"
}
},
{
"set": {
"if": "ctx.data_stream != null",
"field": "_index",
"value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"description": "Migrate APM events to data streams",
"processors": [
{
"script": {
"if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'",
"source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'error'",
"source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'metric'",
"source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n"
}
},
{
"set": {
"if": "ctx.data_stream != null",
"field": "_index",
"value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"description": "Migrate APM events to data streams",
"processors": [
{
"script": {
"if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'",
"source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'error'",
"source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'metric'",
"source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n"
}
},
{
"set": {
"if": "ctx.data_stream != null",
"field": "_index",
"value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"description": "Migrate APM events to data streams",
"processors": [
{
"script": {
"if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'",
"source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'error'",
"source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'metric'",
"source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n"
}
},
{
"set": {
"if": "ctx.data_stream != null",
"field": "_index",
"value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"description": "Migrate APM events to data streams",
"processors": [
{
"script": {
"if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'",
"source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'error'",
"source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'metric'",
"source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n"
}
},
{
"set": {
"if": "ctx.data_stream != null",
"field": "_index",
"value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"
}
}
]
}
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits]
* Add HTTP span fields as top level ECS fields {pull}5396[5396]
* Introduce `apm-server.auth.anonymous.*` config {pull}5623[5623]
* Upgrade Go to 1.16.6 {pull}5754[5754]
* Introduce ingest pipeline `apm_data_stream_migration` for migrating pre-data stream indices {5768}[5768]

[float]
==== Deprecated
Expand Down
33 changes: 33 additions & 0 deletions ingest/pipeline/definition.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,39 @@
]
}
},
{
"id": "apm_data_stream_migration",
"body": {
"description": "Migrate APM events to data streams",
"processors": [
{
"script": {
"if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'",
"source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'error'",
"source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n"
}
},
{
"script": {
"if": "ctx.processor?.event == 'metric'",
"source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n"
}
},
{
"set": {
"if": "ctx.data_stream != null",
"field": "_index",
"value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"
}
}
]
}
},
{
"id": "apm_user_agent",
"body": {
Expand Down
32 changes: 32 additions & 0 deletions ingest/pipeline/definition.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,38 @@ apm:
name: apm_metrics_dynamic_template
if: ctx.processor?.event == 'metric'

# apm_data_stream_migration is not used in the main apm pipeline,
# it is installed for migrating legacy indices to data streams,
# e.g. using the Kibana Upgrade Assistant.
apm_data_stream_migration:
description: Migrate APM events to data streams
processors:
- script:
if: ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'
source: |
ctx.data_stream = ["type": "traces", "dataset": "apm", "namespace": "migrated"]
- script:
if: ctx.processor?.event == 'error'
source: |
ctx.data_stream = ["type": "logs", "dataset": "apm.error", "namespace": "migrated"]
- script:
if: ctx.processor?.event == 'metric'
source: |
String dataset;
if (ctx["metricset.name"] != "app") {
dataset = "apm.internal";
} else {
String serviceName = ctx.service.name;
serviceName = serviceName.toLowerCase();
serviceName = /[\\\/*?"<>| ,#:-]/.matcher(serviceName).replaceAll('_');
dataset = "apm.app." + serviceName;
}
ctx.data_stream = ["type": "metrics", "dataset": dataset, "namespace": "migrated"];
- set:
if: ctx.data_stream != null
field: _index
value: "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}"

apm_user_agent:
description: Add user agent information for APM events
processors:
Expand Down
10 changes: 9 additions & 1 deletion systemtest/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,15 @@ func CleanupElasticsearch(t testing.TB) {
}

// Delete indices, data streams, and ingest pipelines.
if err := doReq(esapi.IndicesDeleteRequest{Index: []string{legacyPrefix}}); err != nil {
if err := doReq(esapi.IndicesDeleteRequest{Index: []string{
legacyPrefix,
// traces-apm*, metrics-apm*, and logs-apm* could get created
// as indices instead of data streams in some tests, so issue
// index delete requests for those too.
apmTracesPrefix,
apmMetricsPrefix,
apmLogsPrefix,
}}); err != nil {
t.Fatal(err)
}
doParallel(
Expand Down
70 changes: 70 additions & 0 deletions systemtest/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@
package systemtest_test

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"

"go.elastic.co/apm"

"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/elastic/go-elasticsearch/v7/esutil"

"github.com/elastic/apm-server/systemtest"
"github.com/elastic/apm-server/systemtest/apmservertest"
"github.com/elastic/apm-server/systemtest/estest"
Expand Down Expand Up @@ -53,3 +60,66 @@ func TestIngestPipeline(t *testing.T) {
assert.True(t, destinationIP.Exists())
assert.Equal(t, "::1", destinationIP.String())
}

func TestDataStreamMigrationIngestPipeline(t *testing.T) {
systemtest.CleanupElasticsearch(t)
srv := apmservertest.NewServer(t)

// Send a transaction, span, error, and metrics.
tracer := srv.Tracer()
tracer.RegisterMetricsGatherer(apm.GatherMetricsFunc(func(ctx context.Context, m *apm.Metrics) error {
m.Add("custom_metric", nil, 123)
return nil
}))
tx := tracer.StartTransaction("name", "type")
span := tx.StartSpan("name", "type", nil)
tracer.NewError(errors.New("boom")).Send()
span.End()
tx.End()
tracer.Flush(nil)
tracer.SendMetrics(nil)

// We expect at least 6 events:
// - onboarding
// - transaction
// - span
// - error
// - internal metricset
// - app metricset
for _, query := range []interface{}{
estest.TermQuery{Field: "processor.event", Value: "onboarding"},
estest.TermQuery{Field: "processor.event", Value: "transaction"},
estest.TermQuery{Field: "processor.event", Value: "span"},
estest.TermQuery{Field: "processor.event", Value: "error"},
estest.TermQuery{Field: "metricset.name", Value: "transaction_breakdown"},
estest.TermQuery{Field: "metricset.name", Value: "app"},
} {
systemtest.Elasticsearch.ExpectDocs(t, "apm-*", query)
}

refresh := true
_, err := systemtest.Elasticsearch.Do(context.Background(), &esapi.ReindexRequest{
Refresh: &refresh,
Body: esutil.NewJSONReader(map[string]interface{}{
"source": map[string]interface{}{
"index": "apm-*",
},
"dest": map[string]interface{}{
"index": "apm-migration",
"pipeline": "apm_data_stream_migration",
"op_type": "create",
},
}),
}, nil)
require.NoError(t, err)

// There should only be an onboarding doc in "apm-migration".
result := systemtest.Elasticsearch.ExpectDocs(t, "apm-migration", nil)
require.Len(t, result.Hits.Hits, 1)
assert.Equal(t, "onboarding", gjson.GetBytes(result.Hits.Hits[0].RawSource, "processor.event").String())

systemtest.Elasticsearch.ExpectMinDocs(t, 2, "traces-apm-migrated", nil) // transaction, span
systemtest.Elasticsearch.ExpectMinDocs(t, 1, "logs-apm.error-migrated", nil)
systemtest.Elasticsearch.ExpectMinDocs(t, 1, "metrics-apm.internal-migrated", nil)
systemtest.Elasticsearch.ExpectMinDocs(t, 1, "metrics-apm.app.systemtest-migrated", nil)
}

0 comments on commit 38ec1d5

Please sign in to comment.