
Commit

[7.x] Fix pipeline name refs and remove service datasets (#4489, #4491) (#4537)

* Fix pipeline name refs (#4489)

Update the pipeline names referenced in default.json files to match the ones generated by Fleet (see the naming sketch below).

* Remove service datasets (#4491)

Also rename the data streams and fix the referenced profile pipeline names.
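
For background, Fleet installs each ingest pipeline shipped in a package under a qualified name of the form <data stream type>-<dataset>-<package version>-<pipeline name>. Below is a minimal sketch of that convention, using values taken from this diff; the fleetPipelineName helper is illustrative only, not part of this repository.

package main

import "fmt"

// fleetPipelineName composes the qualified name Fleet gives a pipeline
// when installing a package. Illustrative helper, not apm-server code.
func fleetPipelineName(streamPrefix, version, localName string) string {
    return streamPrefix + "-" + version + "-" + localName
}

func main() {
    // Prints "metrics-apm-0.1.0-apm_user_agent", the qualified form this
    // commit substitutes for the bare "apm_user_agent" reference.
    fmt.Println(fleetPipelineName("metrics-apm", "0.1.0", "apm_user_agent"))
}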
jalvz authored Dec 15, 2020
1 parent 4c687c6 commit 2f3eb92
Showing 87 changed files with 266 additions and 206 deletions.
@@ -3,22 +3,22 @@
   "processors": [
     {
       "pipeline": {
-        "name": "apm_user_agent"
+        "name": "metrics-apm-0.1.0-apm_user_agent"
       }
     },
     {
       "pipeline": {
-        "name": "apm_user_geo"
+        "name": "metrics-apm-0.1.0-apm_user_geo"
       }
     },
     {
       "pipeline": {
-        "name": "apm_ingest_timestamp"
+        "name": "metrics-apm-0.1.0-apm_ingest_timestamp"
       }
     },
     {
       "pipeline": {
-        "name": "apm_remove_span_metadata"
+        "name": "metrics-apm-0.1.0-apm_remove_span_metadata"
       }
     }
   ]
@@ -1,4 +1,3 @@
 title: APM application metrics
 type: metrics
 dataset: apm
-ingest_pipeline: apm
@@ -3,22 +3,22 @@
   "processors": [
     {
       "pipeline": {
-        "name": "apm_user_agent"
+        "name": "logs-apm.error-0.1.0-apm_user_agent"
       }
     },
     {
       "pipeline": {
-        "name": "apm_user_geo"
+        "name": "logs-apm.error-0.1.0-apm_user_geo"
       }
     },
     {
       "pipeline": {
-        "name": "apm_ingest_timestamp"
+        "name": "logs-apm.error-0.1.0-apm_ingest_timestamp"
       }
     },
     {
       "pipeline": {
-        "name": "apm_remove_span_metadata"
+        "name": "logs-apm.error-0.1.0-apm_remove_span_metadata"
       }
     }
   ]
@@ -0,0 +1,25 @@
+{
+  "description": "Default enrichment for APM events",
+  "processors": [
+    {
+      "pipeline": {
+        "name": "metrics-apm.internal-0.1.0-apm_user_agent"
+      }
+    },
+    {
+      "pipeline": {
+        "name": "metrics-apm.internal-0.1.0-apm_user_geo"
+      }
+    },
+    {
+      "pipeline": {
+        "name": "metrics-apm.internal-0.1.0-apm_ingest_timestamp"
+      }
+    },
+    {
+      "pipeline": {
+        "name": "metrics-apm.internal-0.1.0-apm_remove_span_metadata"
+      }
+    }
+  ]
+}
@@ -1,4 +1,3 @@
 title: APM internal metrics
 type: metrics
 dataset: apm.internal
-ingest_pipeline: apm
@@ -0,0 +1,25 @@
+{
+  "description": "Default enrichment for APM events",
+  "processors": [
+    {
+      "pipeline": {
+        "name": "metrics-apm.profiling-0.1.0-apm_user_agent"
+      }
+    },
+    {
+      "pipeline": {
+        "name": "metrics-apm.profiling-0.1.0-apm_user_geo"
+      }
+    },
+    {
+      "pipeline": {
+        "name": "metrics-apm.profiling-0.1.0-apm_ingest_timestamp"
+      }
+    },
+    {
+      "pipeline": {
+        "name": "metrics-apm.profiling-0.1.0-apm_remove_span_metadata"
+      }
+    }
+  ]
+}
@@ -1,4 +1,3 @@
 title: APM profiles
 type: metrics
 dataset: apm.profiling
-ingest_pipeline: apm

This file was deleted.

This file was deleted.

@@ -3,22 +3,22 @@
   "processors": [
     {
       "pipeline": {
-        "name": "apm_user_agent"
+        "name": "traces-apm-0.1.0-apm_user_agent"
       }
     },
     {
       "pipeline": {
-        "name": "apm_user_geo"
+        "name": "traces-apm-0.1.0-apm_user_geo"
       }
     },
     {
       "pipeline": {
-        "name": "apm_ingest_timestamp"
+        "name": "traces-apm-0.1.0-apm_ingest_timestamp"
       }
     },
     {
       "pipeline": {
-        "name": "apm_remove_span_metadata"
+        "name": "traces-apm-0.1.0-apm_remove_span_metadata"
       }
     }
   ]
1 change: 0 additions & 1 deletion apmpackage/apm/0.1.0/data_stream/traces/manifest.yml
@@ -1,4 +1,3 @@
 title: APM traces
 type: traces
 dataset: apm
-ingest_pipeline: apm
4 changes: 2 additions & 2 deletions apmpackage/apm/0.1.0/manifest.yml
@@ -4,9 +4,9 @@ title: Elastic APM
 version: 0.1.0
 license: basic
 description: Ingest APM data
-type: solution # integration / solution
+type: integration
 categories:
-  - monitoring # TODO do we need a new category?
+  - monitoring
 release: experimental # experimental / beta / ga
 conditions:
   kibana.version: '^7.11.0'
10 changes: 5 additions & 5 deletions apmpackage/cmd/gen-package/gendocs.go
@@ -29,8 +29,8 @@ import (
 func generateDocs(inputFields map[string][]field, version string) {
     data := docsData{
         Traces:             prepareFields(inputFields, version, "traces"),
-        Metrics:            prepareFields(inputFields, version, "metrics"),
-        Logs:               prepareFields(inputFields, version, "logs"),
+        Metrics:            prepareFields(inputFields, version, "app_metrics"),
+        Logs:               prepareFields(inputFields, version, "error_logs"),
         TransactionExample: loadExample("transactions.json"),
         SpanExample:        loadExample("spans.json"),
         MetricsExample:     loadExample("metricsets.json"),
@@ -65,17 +65,17 @@ type docsData struct {
     ErrorExample string
 }

-func prepareFields(inputFields map[string][]field, version, streamType string) []field {
+func prepareFields(inputFields map[string][]field, version, stream string) []field {
     extend := func(fs []field) []field {
         var baseFields []field
-        for _, f := range loadFieldsFile(baseFieldsFilePath(version, streamType)) {
+        for _, f := range loadFieldsFile(baseFieldsFilePath(version, stream)) {
             f.IsECS = true
             baseFields = append(baseFields, f)
         }
         fs = append(baseFields, fs...)
         return fs
     }
-    return extend(inputFields[streamType])
+    return extend(inputFields[stream])
 }

 func loadExample(file string) string {
6 changes: 3 additions & 3 deletions apmpackage/cmd/gen-package/genfields.go
@@ -34,12 +34,12 @@ func generateFields(version string) map[string][]field {
     ecsFlatFields := loadECSFields()

     inputFieldsFiles := map[string][]field{
-        "logs":             format("model/error/_meta/fields.yml"),
+        "error_logs":       format("model/error/_meta/fields.yml"),
         "internal_metrics": format("model/metricset/_meta/fields.yml", "x-pack/apm-server/fields/_meta/fields.yml"),
-        "profiles":         format("model/profile/_meta/fields.yml"),
+        "profile_metrics":  format("model/profile/_meta/fields.yml"),
         "traces":           format("model/transaction/_meta/fields.yml", "model/span/_meta/fields.yml"),
     }
-    inputFieldsFiles["metrics"] = filterInternalMetrics(inputFieldsFiles["internal_metrics"])
+    inputFieldsFiles["app_metrics"] = filterInternalMetrics(inputFieldsFiles["internal_metrics"])

     for streamType, inputFields := range inputFieldsFiles {
         var ecsFields []field
89 changes: 80 additions & 9 deletions apmpackage/cmd/gen-package/genpipelines.go
@@ -19,11 +19,70 @@ package main

 import (
     "encoding/json"
+    "errors"
     "io/ioutil"
     "os"
     "path/filepath"
+
+    "github.com/elastic/apm-server/model"
 )

+var streamMappings = map[string]string{
+    "error_logs":       "logs-" + model.ErrorsDataset,
+    "traces":           "traces-" + model.TracesDataset,
+    "app_metrics":      "metrics-" + model.AppMetricsDataset,
+    "internal_metrics": "metrics-" + model.InternalMetricsDataset,
+    "profile_metrics":  "metrics-" + model.ProfilesDataset,
+}
+
+type PipelineDef struct {
+    ID   string       `json:"id"`
+    Body PipelineBody `json:"body"`
+}
+
+type PipelineBody struct {
+    Description string      `json:"description"`
+    Processors  []Processor `json:"processors"`
+}
+
+type Processor struct {
+    Pipeline *Pipeline `json:"pipeline,omitempty"`
+    m        map[string]interface{}
+}
+
+type Pipeline struct {
+    Name string `json:"name"`
+}
+
+type _Processor Processor
+
+func (p *Processor) UnmarshalJSON(bytes []byte) error {
+    aux := _Processor{}
+    err := json.Unmarshal(bytes, &aux)
+    if err != nil {
+        return err
+    }
+
+    *p = Processor(aux)
+    m := make(map[string]interface{})
+
+    err = json.Unmarshal(bytes, &m)
+    if err != nil {
+        return err
+    }
+    delete(m, "pipeline")
+    p.m = m
+    return nil
+}
+
+func (p *Processor) MarshalJSON() ([]byte, error) {
+    aux := _Processor(*p)
+    if p.Pipeline != nil {
+        return json.Marshal(aux)
+    }
+    return json.Marshal(p.m)
+}
+
 func generatePipelines(version, dataStream string) {
     pipelines, err := os.Open("ingest/pipeline/definition.json")
     if err != nil {
@@ -36,29 +95,41 @@ func generatePipelines(version, dataStream string) {
         panic(err)
     }

-    var definitions = make([]map[string]interface{}, 0)
+    var definitions = make([]PipelineDef, 0)
     err = json.Unmarshal(bytes, &definitions)
     if err != nil {
         panic(err)
     }

     os.MkdirAll(pipelinesPath(version, dataStream), 0755)

+    var apmPipeline PipelineBody
     for _, definition := range definitions {
-        pipeline, ok := definition["body"]
-        if !ok {
-            continue
-        }
-        id, ok := definition["id"]
-        if !ok {
+        pipeline := definition.Body
+        if definition.ID == "apm" {
+            apmPipeline = pipeline
             continue
         }

         out, err := json.MarshalIndent(pipeline, "", "  ")
         if err != nil {
             panic(err)
         }
-        fName := filepath.Join(pipelinesPath(version, dataStream), id.(string)+".json")
+        fName := filepath.Join(pipelinesPath(version, dataStream), definition.ID+".json")
         ioutil.WriteFile(fName, out, 0644)
     }
+
+    for _, p := range apmPipeline.Processors {
+        if p.Pipeline == nil {
+            // should not happen, let's panic loudly
+            panic(errors.New("expected pipeline processor"))
+        }
+        // the name is updated to match the one generated by Fleet when it installs the pipelines
+        p.Pipeline.Name = streamMappings[dataStream] + "-" + version + "-" + p.Pipeline.Name
+    }
+    out, err := json.MarshalIndent(apmPipeline, "", "  ")
+    if err != nil {
+        panic(err)
+    }
+    fName := filepath.Join(pipelinesPath(version, dataStream), "default.json")
+    ioutil.WriteFile(fName, out, 0644)
 }
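
A note on the marshalling code added above: _Processor has Processor's fields but none of its methods, so json.Unmarshal can populate it without recursing back into Processor.UnmarshalJSON, while the unexported map m keeps every key other than "pipeline" so non-pipeline processors round-trip unchanged. Below is a self-contained sketch of the same pattern under hypothetical names (Node, _Node, extra).

package main

import (
    "encoding/json"
    "fmt"
)

// Node models one known key ("name") and preserves all other keys.
type Node struct {
    Name  string `json:"name,omitempty"`
    extra map[string]interface{}
}

// _Node has Node's fields but not its methods, so unmarshalling into it
// cannot recurse back into Node.UnmarshalJSON.
type _Node Node

func (n *Node) UnmarshalJSON(b []byte) error {
    var aux _Node
    if err := json.Unmarshal(b, &aux); err != nil {
        return err
    }
    *n = Node(aux)
    m := make(map[string]interface{})
    if err := json.Unmarshal(b, &m); err != nil {
        return err
    }
    delete(m, "name") // keep only the keys not modelled explicitly
    n.extra = m
    return nil
}

func (n *Node) MarshalJSON() ([]byte, error) {
    if n.Name != "" {
        return json.Marshal(_Node(*n))
    }
    return json.Marshal(n.extra)
}

func main() {
    var n Node
    if err := json.Unmarshal([]byte(`{"name":"a","weight":2}`), &n); err != nil {
        panic(err)
    }
    out, _ := json.Marshal(&n)
    fmt.Println(string(out)) // prints {"name":"a"}; unknown keys live on in n.extra
}

One consequence mirrored from the diff: when the known field is set, MarshalJSON emits only that field and drops unknown siblings, which is harmless for pipeline processors since they carry a single pipeline key.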