diff --git a/beater/api/profile/convert.go b/beater/api/profile/convert.go index 3c0248a5c39..15c1f58de09 100644 --- a/beater/api/profile/convert.go +++ b/beater/api/profile/convert.go @@ -87,6 +87,7 @@ func appendProfileSampleBatch(pp *profile.Profile, baseEvent model.APMEvent, out } event := baseEvent + event.Processor = model.ProfileProcessor event.Labels = event.Labels.Clone() if n := len(sample.Label); n > 0 { for k, v := range sample.Label { diff --git a/beater/beater.go b/beater/beater.go index 296dbbc2856..5d2a84cbdad 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/transport" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/go-ucfg" "github.com/pkg/errors" @@ -479,6 +480,7 @@ func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) R modelprocessor.SetErrorMessage{}, newObserverBatchProcessor(s.beat.Info), model.ProcessBatchFunc(ecsVersionBatchProcessor), + modelprocessor.NewEventCounter(monitoring.Default.GetRegistry("apm-server")), } if s.config.DefaultServiceEnvironment != "" { processors = append(processors, &modelprocessor.SetDefaultServiceEnvironment{ diff --git a/model/apmevent.go b/model/apmevent.go index 7715585a41d..10abbedef97 100644 --- a/model/apmevent.go +++ b/model/apmevent.go @@ -53,6 +53,7 @@ type APMEvent struct { Network Network Session Session URL URL + Processor Processor Trace Trace // Timestamp holds the event timestamp. @@ -91,7 +92,8 @@ func (e *APMEvent) BeatEvent(ctx context.Context) beat.Event { event.Fields = e.Error.fields() case e.ProfileSample != nil: event.Fields = e.ProfileSample.fields() - default: + } + if event.Fields == nil { event.Fields = make(common.MapStr) } @@ -133,6 +135,7 @@ func (e *APMEvent) BeatEvent(ctx context.Context) beat.Event { fields.maybeSetMapStr("event", e.Event.fields()) fields.maybeSetMapStr("url", e.URL.fields()) fields.maybeSetMapStr("session", e.Session.fields()) + fields.maybeSetMapStr("processor", e.Processor.fields()) fields.maybeSetMapStr("trace", e.Trace.fields()) fields.maybeSetString("message", e.Message) return event diff --git a/model/apmevent_test.go b/model/apmevent_test.go index 456689b0372..0900fb41c1d 100644 --- a/model/apmevent_test.go +++ b/model/apmevent_test.go @@ -74,6 +74,7 @@ func TestAPMEventFields(t *testing.T) { Message: "bottle", Transaction: &Transaction{}, Timestamp: time.Date(2019, 1, 3, 15, 17, 4, 908.596*1e6, time.FixedZone("+0100", 3600)), + Processor: Processor{Name: "processor_name", Event: "processor_event"}, Trace: Trace{ID: traceID}, }, output: common.MapStr{ @@ -107,12 +108,12 @@ func TestAPMEventFields(t *testing.T) { "trace": common.MapStr{ "id": traceID, }, - - // fields related to APMEvent.Transaction "processor": common.MapStr{ - "name": "transaction", - "event": "transaction", + "name": "processor_name", + "event": "processor_event", }, + + // fields related to APMEvent.Transaction "timestamp": common.MapStr{"us": int64(1546525024908596)}, "transaction": common.MapStr{ "duration": common.MapStr{"us": 0}, diff --git a/model/error.go b/model/error.go index 2458d6e1fe9..0fe6fabe84e 100644 --- a/model/error.go +++ b/model/error.go @@ -19,21 +19,15 @@ package model import ( "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/monitoring" ) var ( - errorMetrics = monitoring.Default.NewRegistry("apm-server.processor.error") - errorTransformations = 
monitoring.NewInt(errorMetrics, "transformations") - errorStacktraceCounter = monitoring.NewInt(errorMetrics, "stacktraces") - errorFrameCounter = monitoring.NewInt(errorMetrics, "frames") - errorProcessorEntry = common.MapStr{"name": errorProcessorName, "event": errorDocType} + // ErrorProcessor is the Processor value that should be assigned to error events. + ErrorProcessor = Processor{Name: "error", Event: "error"} ) const ( - errorProcessorName = "error" - errorDocType = "error" - ErrorsDataset = "apm.error" + ErrorsDataset = "apm.error" ) type Error struct { @@ -76,16 +70,7 @@ type Log struct { } func (e *Error) fields() common.MapStr { - errorTransformations.Inc() - - if e.Exception != nil { - addStacktraceCounter(e.Exception.Stacktrace) - } - if e.Log != nil { - addStacktraceCounter(e.Log.Stacktrace) - } - - fields := mapStr{"processor": errorProcessorEntry} + var fields mapStr if e.HTTP != nil { fields.maybeSetMapStr("http", e.HTTP.transactionTopLevelFields()) } @@ -161,13 +146,6 @@ func (e *Error) logFields() common.MapStr { return common.MapStr(log) } -func addStacktraceCounter(st Stacktrace) { - if frames := len(st); frames > 0 { - errorStacktraceCounter.Inc() - errorFrameCounter.Add(int64(frames)) - } -} - // flattenExceptionTree recursively traverses the causes of an exception to return a slice of exceptions. // Tree traversal is Depth First. // The parent of a exception in the resulting slice is at the position indicated by the `parent` property diff --git a/model/metricset.go b/model/metricset.go index 6a44b633605..94f9724a771 100644 --- a/model/metricset.go +++ b/model/metricset.go @@ -19,23 +19,16 @@ package model import ( "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/monitoring" ) const ( - metricsetProcessorName = "metric" - metricsetDocType = "metric" - metricsetEventKey = "event" - metricsetTransactionKey = "transaction" - metricsetSpanKey = "span" - AppMetricsDataset = "apm.app" - InternalMetricsDataset = "apm.internal" + AppMetricsDataset = "apm.app" + InternalMetricsDataset = "apm.internal" ) var ( - metricsetMetrics = monitoring.Default.NewRegistry("apm-server.processor.metric") - metricsetTransformations = monitoring.NewInt(metricsetMetrics, "transformations") - metricsetProcessorEntry = common.MapStr{"name": metricsetProcessorName, "event": metricsetDocType} + // MetricsetProcessor is the Processor value that should be assigned to metricset events. + MetricsetProcessor = Processor{Name: "metric", Event: "metric"} ) // MetricType describes the type of a metric: gauge, counter, or histogram. 
@@ -141,13 +134,9 @@ type MetricsetSpan struct { } func (me *Metricset) fields() common.MapStr { - metricsetTransformations.Inc() - var fields mapStr - fields.set("processor", metricsetProcessorEntry) - - fields.maybeSetMapStr(metricsetTransactionKey, me.Transaction.fields()) - fields.maybeSetMapStr(metricsetSpanKey, me.Span.fields()) + fields.maybeSetMapStr("transaction", me.Transaction.fields()) + fields.maybeSetMapStr("span", me.Span.fields()) if me.TimeseriesInstanceID != "" { fields.set("timeseries", common.MapStr{"instance": me.TimeseriesInstanceID}) } @@ -158,7 +147,7 @@ func (me *Metricset) fields() common.MapStr { var metricDescriptions mapStr for name, sample := range me.Samples { - sample.set(name, fields) + sample.set(name, &fields) var md mapStr md.maybeSetString("type", string(sample.Type)) @@ -190,7 +179,7 @@ func (s *MetricsetSpan) fields() common.MapStr { return common.MapStr(fields) } -func (s *MetricsetSample) set(name string, fields mapStr) { +func (s *MetricsetSample) set(name string, fields *mapStr) { if s.Type == MetricTypeHistogram { fields.set(name, common.MapStr{ "counts": s.Counts, diff --git a/model/metricset_test.go b/model/metricset_test.go index 28ceb2dcbeb..38476984ce4 100644 --- a/model/metricset_test.go +++ b/model/metricset_test.go @@ -46,15 +46,12 @@ func TestMetricset(t *testing.T) { }{ { Metricset: &Metricset{}, - Output: common.MapStr{ - "processor": common.MapStr{"event": "metric", "name": "metric"}, - }, - Msg: "Payload with empty metric.", + Output: common.MapStr{}, + Msg: "Payload with empty metric.", }, { Metricset: &Metricset{Name: "raj"}, Output: common.MapStr{ - "processor": common.MapStr{"event": "metric", "name": "metric"}, "metricset.name": "raj", }, Msg: "Payload with metricset name.", @@ -67,7 +64,6 @@ func TestMetricset(t *testing.T) { }, }, Output: common.MapStr{ - "processor": common.MapStr{"event": "metric", "name": "metric"}, "a.counter": 612.0, "some.gauge": 9.16, }, @@ -82,7 +78,6 @@ func TestMetricset(t *testing.T) { }, }, Output: common.MapStr{ - "processor": common.MapStr{"event": "metric", "name": "metric"}, "transaction": common.MapStr{"type": trType, "name": trName}, "span": common.MapStr{ "type": spType, "subtype": spSubtype, @@ -111,7 +106,6 @@ func TestMetricset(t *testing.T) { DocCount: 6, }, Output: common.MapStr{ - "processor": common.MapStr{"event": "metric", "name": "metric"}, "timeseries": common.MapStr{"instance": "foo"}, "transaction": common.MapStr{ "type": trType, @@ -143,7 +137,6 @@ func TestMetricset(t *testing.T) { }, }, Output: common.MapStr{ - "processor": common.MapStr{"event": "metric", "name": "metric"}, "span": common.MapStr{ "type": spType, "subtype": spSubtype, "destination": common.MapStr{"service": common.MapStr{"resource": resource}}, @@ -173,7 +166,6 @@ func TestMetricset(t *testing.T) { }, }, Output: common.MapStr{ - "processor": common.MapStr{"event": "metric", "name": "metric"}, "latency_histogram": common.MapStr{ "counts": []int64{1, 2, 3}, "values": []float64{1.1, 2.2, 3.3}, diff --git a/model/modeldecoder/rumv3/decoder.go b/model/modeldecoder/rumv3/decoder.go index 8be60dca7ce..ad4001871ad 100644 --- a/model/modeldecoder/rumv3/decoder.go +++ b/model/modeldecoder/rumv3/decoder.go @@ -161,20 +161,20 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch *batch = append(*batch, transaction) for _, m := range root.Transaction.Metricsets { - metricset := input.Base - mapToMetricsetModel(&m, &metricset) - metricset.Metricset.Transaction.Name = 
transaction.Transaction.Name - metricset.Metricset.Transaction.Type = transaction.Transaction.Type - *batch = append(*batch, metricset) + event := input.Base + mapToMetricsetModel(&m, &event) + event.Metricset.Transaction.Name = transaction.Transaction.Name + event.Metricset.Transaction.Type = transaction.Transaction.Type + *batch = append(*batch, event) } offset := len(*batch) for _, s := range root.Transaction.Spans { - span := input.Base - mapToSpanModel(&s, &span) - span.Span.TransactionID = transaction.Transaction.ID - span.Trace = transaction.Trace - *batch = append(*batch, span) + event := input.Base + mapToSpanModel(&s, &event) + event.Span.TransactionID = transaction.Transaction.ID + event.Trace = transaction.Trace + *batch = append(*batch, event) } spans := (*batch)[offset:] for i, s := range root.Transaction.Spans { @@ -190,6 +190,7 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch func mapToErrorModel(from *errorEvent, event *model.APMEvent) { out := &model.Error{} event.Error = out + event.Processor = model.ErrorProcessor // overwrite metadata with event specific information mapToServiceModel(from.Context.Service, &event.Service) @@ -391,6 +392,7 @@ func mapToMetadataModel(m *metadata, out *model.APMEvent) { func mapToMetricsetModel(from *metricset, event *model.APMEvent) { out := &model.Metricset{} event.Metricset = out + event.Processor = model.MetricsetProcessor // map samples information if from.Samples.IsSet() { @@ -512,6 +514,7 @@ func mapToAgentModel(from contextServiceAgent, out *model.Agent) { func mapToSpanModel(from *span, event *model.APMEvent) { out := &model.Span{} event.Span = out + event.Processor = model.SpanProcessor // map span specific data if !from.Action.IsSet() && !from.Subtype.IsSet() { @@ -690,6 +693,7 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) { func mapToTransactionModel(from *transaction, event *model.APMEvent) { out := &model.Transaction{} event.Transaction = out + event.Processor = model.TransactionProcessor // overwrite metadata with event specific information mapToServiceModel(from.Context.Service, &event.Service) diff --git a/model/modeldecoder/rumv3/metadata_test.go b/model/modeldecoder/rumv3/metadata_test.go index 14b8da2ab50..4157f3a99a2 100644 --- a/model/modeldecoder/rumv3/metadata_test.go +++ b/model/modeldecoder/rumv3/metadata_test.go @@ -60,6 +60,7 @@ func metadataExceptions(keys ...string) func(key string) bool { "Network", "Observer", "Process", + "Processor", "Service.Node", "Service.Agent.EphemeralID", "Host", diff --git a/model/modeldecoder/v2/decoder.go b/model/modeldecoder/v2/decoder.go index cb242bdaa58..120291c1692 100644 --- a/model/modeldecoder/v2/decoder.go +++ b/model/modeldecoder/v2/decoder.go @@ -240,6 +240,7 @@ func mapToClientModel(from contextRequest, out *model.Client) { func mapToErrorModel(from *errorEvent, config modeldecoder.Config, event *model.APMEvent) { out := &model.Error{} event.Error = out + event.Processor = model.ErrorProcessor // overwrite metadata with event specific information mapToServiceModel(from.Context.Service, &event.Service) @@ -538,6 +539,7 @@ func mapToMetadataModel(from *metadata, out *model.APMEvent) { func mapToMetricsetModel(from *metricset, config modeldecoder.Config, event *model.APMEvent) { out := &model.Metricset{} event.Metricset = out + event.Processor = model.MetricsetProcessor if !from.Timestamp.Val.IsZero() { event.Timestamp = from.Timestamp.Val @@ -725,6 +727,7 @@ func mapToAgentModel(from contextServiceAgent, 
out *model.Agent) { func mapToSpanModel(from *span, config modeldecoder.Config, event *model.APMEvent) { out := &model.Span{} event.Span = out + event.Processor = model.SpanProcessor // map span specific data if !from.Action.IsSet() && !from.Subtype.IsSet() { @@ -987,6 +990,7 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) { func mapToTransactionModel(from *transaction, config modeldecoder.Config, event *model.APMEvent) { out := &model.Transaction{} + event.Processor = model.TransactionProcessor event.Transaction = out // overwrite metadata with event specific information diff --git a/model/modeldecoder/v2/metadata_test.go b/model/modeldecoder/v2/metadata_test.go index 1c4c36007e8..666daa5c0eb 100644 --- a/model/modeldecoder/v2/metadata_test.go +++ b/model/modeldecoder/v2/metadata_test.go @@ -74,6 +74,9 @@ func isUnmappedMetadataField(key string) bool { "Observer.VersionMajor", "Process.CommandLine", "Process.Executable", + "Processor", + "Processor.Event", + "Processor.Name", "Host.OS.Full", "Host.OS.Type", "Host.ID", diff --git a/model/modelprocessor/datastream.go b/model/modelprocessor/datastream.go index 96af159c376..375ed68e999 100644 --- a/model/modelprocessor/datastream.go +++ b/model/modelprocessor/datastream.go @@ -39,14 +39,14 @@ func (s *SetDataStream) ProcessBatch(ctx context.Context, b *model.Batch) error } func (s *SetDataStream) setDataStream(event *model.APMEvent) { - switch { - case event.Transaction != nil || event.Span != nil: + switch event.Processor { + case model.SpanProcessor, model.TransactionProcessor: event.DataStream.Type = datastreams.TracesType event.DataStream.Dataset = model.TracesDataset - case event.Error != nil: + case model.ErrorProcessor: event.DataStream.Type = datastreams.LogsType event.DataStream.Dataset = model.ErrorsDataset - case event.Metricset != nil: + case model.MetricsetProcessor: event.DataStream.Type = datastreams.MetricsType // Metrics that include well-defined transaction/span fields // (i.e. 
breakdown metrics, transaction and span metrics) will @@ -58,7 +58,7 @@ func (s *SetDataStream) setDataStream(event *model.APMEvent) { datastreams.NormalizeServiceName(event.Service.Name), ) } - case event.ProfileSample != nil: + case model.ProfileProcessor: event.DataStream.Type = datastreams.MetricsType event.DataStream.Dataset = model.ProfilesDataset } diff --git a/model/modelprocessor/datastream_test.go b/model/modelprocessor/datastream_test.go index 01d151c0e0c..eef40765741 100644 --- a/model/modelprocessor/datastream_test.go +++ b/model/modelprocessor/datastream_test.go @@ -35,17 +35,18 @@ func TestSetDataStream(t *testing.T) { input: model.APMEvent{}, output: model.DataStream{Namespace: "custom"}, }, { - input: model.APMEvent{Transaction: &model.Transaction{}}, + input: model.APMEvent{Processor: model.TransactionProcessor}, output: model.DataStream{Type: "traces", Dataset: "apm", Namespace: "custom"}, }, { - input: model.APMEvent{Span: &model.Span{}}, + input: model.APMEvent{Processor: model.SpanProcessor}, output: model.DataStream{Type: "traces", Dataset: "apm", Namespace: "custom"}, }, { - input: model.APMEvent{Error: &model.Error{}}, + input: model.APMEvent{Processor: model.ErrorProcessor}, output: model.DataStream{Type: "logs", Dataset: "apm.error", Namespace: "custom"}, }, { input: model.APMEvent{ - Service: model.Service{Name: "service-name"}, + Processor: model.MetricsetProcessor, + Service: model.Service{Name: "service-name"}, Metricset: &model.Metricset{ Transaction: model.MetricsetTransaction{Name: "foo"}, }, @@ -53,12 +54,16 @@ func TestSetDataStream(t *testing.T) { output: model.DataStream{Type: "metrics", Dataset: "apm.internal", Namespace: "custom"}, }, { input: model.APMEvent{ + Processor: model.MetricsetProcessor, Service: model.Service{Name: "service-name"}, Metricset: &model.Metricset{}, }, output: model.DataStream{Type: "metrics", Dataset: "apm.app.service_name", Namespace: "custom"}, }, { - input: model.APMEvent{ProfileSample: &model.ProfileSample{}}, + input: model.APMEvent{ + Processor: model.ProfileProcessor, + ProfileSample: &model.ProfileSample{}, + }, output: model.DataStream{Type: "metrics", Dataset: "apm.profiling", Namespace: "custom"}, }} diff --git a/model/modelprocessor/eventcounter.go b/model/modelprocessor/eventcounter.go new file mode 100644 index 00000000000..5f02e915a54 --- /dev/null +++ b/model/modelprocessor/eventcounter.go @@ -0,0 +1,79 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package modelprocessor + +import ( + "context" + "sync" + + "github.com/elastic/beats/v7/libbeat/monitoring" + + "github.com/elastic/apm-server/model" +) + +// EventCounter is a model.BatchProcessor that counts the number of events processed, +// recording the counts as metrics in a monitoring.Registry. 
+//
+// Metrics are named after the event type: `processor.<processor.event>.transformations`.
+// These metrics are used to populate the "Processed Events" graphs in Stack Monitoring.
+type EventCounter struct {
+	registry *monitoring.Registry
+
+	mu            sync.RWMutex
+	eventCounters map[string]*monitoring.Int
+}
+
+// NewEventCounter returns an EventCounter that counts events processed, recording
+// them as `<processor.event>.transformations` under the given registry.
+func NewEventCounter(registry *monitoring.Registry) *EventCounter {
+	return &EventCounter{
+		registry:      registry,
+		eventCounters: make(map[string]*monitoring.Int),
+	}
+}
+
+// ProcessBatch counts events in b, grouping by APMEvent.Processor.Event.
+func (c *EventCounter) ProcessBatch(ctx context.Context, b *model.Batch) error {
+	for _, event := range *b {
+		pe := event.Processor.Event
+		if pe == "" {
+			continue
+		}
+		c.mu.RLock()
+		eventCounter := c.eventCounters[pe]
+		c.mu.RUnlock()
+		if eventCounter == nil {
+			c.mu.Lock()
+			eventCounter = c.eventCounters[pe]
+			if eventCounter == nil {
+				// Metric may exist in the registry but not in our map,
+				// so first check if it exists before attempting to create.
+				name := "processor." + pe + ".transformations"
+				var ok bool
+				eventCounter, ok = c.registry.Get(name).(*monitoring.Int)
+				if !ok {
+					eventCounter = monitoring.NewInt(c.registry, name)
+				}
+				c.eventCounters[pe] = eventCounter
+			}
+			c.mu.Unlock()
+		}
+		eventCounter.Inc()
+	}
+	return nil
+}
diff --git a/model/modelprocessor/eventcounter_test.go b/model/modelprocessor/eventcounter_test.go
new file mode 100644
index 00000000000..92b073e79af
--- /dev/null
+++ b/model/modelprocessor/eventcounter_test.go
@@ -0,0 +1,51 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package modelprocessor_test
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/elastic/beats/v7/libbeat/monitoring"
+
+	"github.com/elastic/apm-server/model"
+	"github.com/elastic/apm-server/model/modelprocessor"
+)
+
+func TestEventCounter(t *testing.T) {
+	batch := model.Batch{
+		{},
+		{Processor: model.TransactionProcessor},
+		{Processor: model.SpanProcessor},
+		{Processor: model.TransactionProcessor},
+	}
+
+	expected := monitoring.MakeFlatSnapshot()
+	expected.Ints["processor.span.transformations"] = 1
+	expected.Ints["processor.transaction.transformations"] = 2
+
+	registry := monitoring.NewRegistry()
+	processor := modelprocessor.NewEventCounter(registry)
+	err := processor.ProcessBatch(context.Background(), &batch)
+	assert.NoError(t, err)
+	snapshot := monitoring.CollectFlatSnapshot(registry, monitoring.Full, false)
+	assert.Equal(t, expected, snapshot)
+
+}
diff --git a/model/processor.go b/model/processor.go
new file mode 100644
index 00000000000..fb5116b25e1
--- /dev/null
+++ b/model/processor.go
@@ -0,0 +1,38 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package model
+
+import (
+	"github.com/elastic/beats/v7/libbeat/common"
+)
+
+// Processor identifies an event type, and is used for routing events
+// to the appropriate data stream or index.
+//
+// TODO(axw) this should be replaced with ECS event categorisation fields.
+type Processor struct {
+	Name  string
+	Event string
+}
+
+func (p *Processor) fields() common.MapStr {
+	var fields mapStr
+	fields.maybeSetString("name", p.Name)
+	fields.maybeSetString("event", p.Event)
+	return common.MapStr(fields)
+}
diff --git a/model/profile.go b/model/profile.go
index bb2d4a6fbc4..b89e3766b20 100644
--- a/model/profile.go
+++ b/model/profile.go
@@ -24,15 +24,11 @@ import (
 )
 
 const (
-	profileProcessorName = "profile"
-	profileDocType       = "profile"
-	ProfilesDataset      = "apm.profiling"
+	ProfilesDataset = "apm.profiling"
 )
 
-var profileProcessorEntry = common.MapStr{
-	"name":  profileProcessorName,
-	"event": profileDocType,
-}
+// ProfileProcessor is the Processor value that should be assigned to profile events.
+var ProfileProcessor = Processor{Name: "profile", Event: "profile"}
 
 // ProfileSample holds a profiling sample.
type ProfileSample struct { @@ -78,8 +74,5 @@ func (p *ProfileSample) fields() common.MapStr { profileFields.set(k, v) } - return common.MapStr{ - "processor": profileProcessorEntry, - profileDocType: common.MapStr(profileFields), - } + return common.MapStr{"profile": common.MapStr(profileFields)} } diff --git a/model/profile_test.go b/model/profile_test.go index dda914e53dc..3d24f11a3a9 100644 --- a/model/profile_test.go +++ b/model/profile_test.go @@ -67,7 +67,6 @@ func TestProfileSampleTransform(t *testing.T) { assert.Equal(t, beat.Event{ Timestamp: timestamp, Fields: common.MapStr{ - "processor": common.MapStr{"event": "profile", "name": "profile"}, "profile": common.MapStr{ "id": "profile_id", "duration": int64(10 * time.Second), diff --git a/model/span.go b/model/span.go index a3a9ce92e4b..0c90319259c 100644 --- a/model/span.go +++ b/model/span.go @@ -19,21 +19,13 @@ package model import ( "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/apm-server/utility" ) -const ( - spanDocType = "span" -) - var ( - spanMetrics = monitoring.Default.NewRegistry("apm-server.processor.span") - spanTransformations = monitoring.NewInt(spanMetrics, "transformations") - spanStacktraceCounter = monitoring.NewInt(spanMetrics, "stacktraces") - spanFrameCounter = monitoring.NewInt(spanMetrics, "frames") - spanProcessorEntry = common.MapStr{"name": "transaction", "event": spanDocType} + // SpanProcessor is the Processor value that should be assigned to span events. + SpanProcessor = Processor{Name: "transaction", Event: "span"} ) type Span struct { @@ -132,14 +124,7 @@ func (c *Composite) fields() common.MapStr { } func (e *Span) fields(apmEvent *APMEvent) common.MapStr { - spanTransformations.Inc() - if frames := len(e.Stacktrace); frames > 0 { - spanStacktraceCounter.Inc() - spanFrameCounter.Add(int64(frames)) - } - - fields := mapStr{"processor": spanProcessorEntry} - + var fields mapStr var transaction, parent mapStr if transaction.maybeSetString("id", e.TransactionID) { fields.set("transaction", common.MapStr(transaction)) @@ -187,6 +172,5 @@ func (e *Span) fields(apmEvent *APMEvent) common.MapStr { span.set("stacktrace", st) } fields.set("span", common.MapStr(span)) - return common.MapStr(fields) } diff --git a/model/span_test.go b/model/span_test.go index b1f4c8419ef..ec2c50554bd 100644 --- a/model/span_test.go +++ b/model/span_test.go @@ -49,7 +49,6 @@ func TestSpanTransform(t *testing.T) { Msg: "Span without a Stacktrace", Span: Span{}, Output: common.MapStr{ - "processor": common.MapStr{"event": "span", "name": "transaction"}, "span": common.MapStr{ "duration": common.MapStr{"us": 0}, "name": "", @@ -132,7 +131,6 @@ func TestSpanTransform(t *testing.T) { "compression_strategy": "exact_match", }, }, - "processor": common.MapStr{"event": "span", "name": "transaction"}, "timestamp": common.MapStr{"us": timestampUs}, "parent": common.MapStr{"id": parentID}, "http": common.MapStr{ diff --git a/model/transaction.go b/model/transaction.go index 9a7fd50765e..ea3935b0c34 100644 --- a/model/transaction.go +++ b/model/transaction.go @@ -19,21 +19,17 @@ package model import ( "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/apm-server/utility" ) const ( - transactionProcessorName = "transaction" - transactionDocType = "transaction" - TracesDataset = "apm" + TracesDataset = "apm" ) var ( - transactionMetrics = monitoring.Default.NewRegistry("apm-server.processor.transaction") - 
transactionTransformations = monitoring.NewInt(transactionMetrics, "transformations") - transactionProcessorEntry = common.MapStr{"name": transactionProcessorName, "event": transactionDocType} + // TransactionProcessor is the Processor value that should be assigned to transaction events. + TransactionProcessor = Processor{Name: "transaction", Event: "transaction"} ) type Transaction struct { @@ -67,12 +63,7 @@ type SpanCount struct { } func (e *Transaction) fields() common.MapStr { - transactionTransformations.Inc() - - fields := mapStr{ - "processor": transactionProcessorEntry, - } - + var fields mapStr var parent mapStr parent.maybeSetString("id", e.ParentID) fields.maybeSetMapStr("parent", common.MapStr(parent)) diff --git a/model/transaction_test.go b/model/transaction_test.go index 4a8eea22e09..1df41514813 100644 --- a/model/transaction_test.go +++ b/model/transaction_test.go @@ -172,10 +172,6 @@ func TestEventsTransformWithMetadata(t *testing.T) { "platform": "x64", }, }, - "processor": common.MapStr{ - "event": "transaction", - "name": "transaction", - }, "service": common.MapStr{ "name": serviceName, "version": serviceVersion, diff --git a/processor/otel/exceptions_test.go b/processor/otel/exceptions_test.go index 9a547613210..395e9009267 100644 --- a/processor/otel/exceptions_test.go +++ b/processor/otel/exceptions_test.go @@ -114,6 +114,7 @@ Caused by: LowLevelException Service: service, Agent: agent, Timestamp: timestamp, + Processor: model.ErrorProcessor, Trace: transactionEvent.Trace, Error: &model.Error{ ParentID: transactionEvent.Transaction.ID, @@ -161,6 +162,7 @@ Caused by: LowLevelException Service: service, Agent: agent, Timestamp: timestamp, + Processor: model.ErrorProcessor, Trace: transactionEvent.Trace, Error: &model.Error{ ParentID: transactionEvent.Transaction.ID, @@ -317,6 +319,7 @@ func TestEncodeSpanEventsNonJavaExceptions(t *testing.T) { Service: service, Agent: agent, Timestamp: timestamp, + Processor: model.ErrorProcessor, Trace: transactionEvent.Trace, Error: &model.Error{ ParentID: transactionEvent.Transaction.ID, diff --git a/processor/otel/metadata_test.go b/processor/otel/metadata_test.go index 9c9c66fe21b..2a43c9344d7 100644 --- a/processor/otel/metadata_test.go +++ b/processor/otel/metadata_test.go @@ -237,5 +237,6 @@ func transformResourceMetadata(t *testing.T, resourceAttrs map[string]pdata.Attr events[0].Trace = model.Trace{} events[0].Event.Outcome = "" events[0].Timestamp = time.Time{} + events[0].Processor = model.Processor{} return events[0] } diff --git a/processor/otel/metrics.go b/processor/otel/metrics.go index d8d58f1d903..184aa45f187 100644 --- a/processor/otel/metrics.go +++ b/processor/otel/metrics.go @@ -105,6 +105,7 @@ func (c *Consumer) convertInstrumentationLibraryMetrics( } for _, m := range ms { event := baseEvent + event.Processor = model.MetricsetProcessor event.Metricset = m.Metricset event.Timestamp = m.timestamp.Add(timeDelta) if n := len(m.labels); n > 0 { diff --git a/processor/otel/metrics_test.go b/processor/otel/metrics_test.go index 453760a1e47..ec1b75d897d 100644 --- a/processor/otel/metrics_test.go +++ b/processor/otel/metrics_test.go @@ -164,6 +164,7 @@ func TestConsumeMetrics(t *testing.T) { Agent: agent, Service: service, Timestamp: timestamp0, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "int_gauge_metric": {Value: 1, Type: "gauge"}, @@ -186,6 +187,7 @@ func TestConsumeMetrics(t *testing.T) { Agent: agent, Service: service, Timestamp: timestamp1, + 
Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "int_gauge_metric": {Value: 3, Type: "gauge"}, @@ -197,6 +199,7 @@ func TestConsumeMetrics(t *testing.T) { Service: service, Labels: common.MapStr{"k": "v"}, Timestamp: timestamp1, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "int_gauge_metric": {Value: 2, Type: "gauge"}, @@ -210,6 +213,7 @@ func TestConsumeMetrics(t *testing.T) { Service: service, Labels: common.MapStr{"k": "v2"}, Timestamp: timestamp1, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "int_gauge_metric": {Value: 4, Type: "gauge"}, @@ -221,6 +225,7 @@ func TestConsumeMetrics(t *testing.T) { Service: service, Labels: common.MapStr{"k2": "v"}, Timestamp: timestamp1, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "int_sum_metric": {Value: 11, Type: "counter"}, @@ -270,6 +275,7 @@ func TestConsumeMetrics_JVM(t *testing.T) { Agent: agent, Service: service, Timestamp: timestamp, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "jvm.memory.heap.used": { @@ -283,6 +289,7 @@ func TestConsumeMetrics_JVM(t *testing.T) { Service: service, Labels: common.MapStr{"gc": "G1 Young Generation"}, Timestamp: timestamp, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "runtime.jvm.gc.time": { @@ -300,6 +307,7 @@ func TestConsumeMetrics_JVM(t *testing.T) { Service: service, Labels: common.MapStr{"name": "G1 Young Generation"}, Timestamp: timestamp, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "jvm.gc.time": { @@ -317,6 +325,7 @@ func TestConsumeMetrics_JVM(t *testing.T) { Service: service, Labels: common.MapStr{"area": "heap", "type": "used"}, Timestamp: timestamp, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "runtime.jvm.memory.area": { diff --git a/processor/otel/traces.go b/processor/otel/traces.go index 7c010f4845b..dbd0391791d 100644 --- a/processor/otel/traces.go +++ b/processor/otel/traces.go @@ -210,6 +210,7 @@ func (c *Consumer) convertSpan( event.Trace.ID = otelSpan.TraceID().HexString() event.Event.Outcome = spanStatusOutcome(otelSpan.Status()) if root || otelSpan.Kind() == pdata.SpanKindServer || otelSpan.Kind() == pdata.SpanKindConsumer { + event.Processor = model.TransactionProcessor event.Transaction = &model.Transaction{ ID: spanID, ParentID: parentID, @@ -219,6 +220,7 @@ func (c *Consumer) convertSpan( } translateTransaction(otelSpan, otelLibrary, &event) } else { + event.Processor = model.SpanProcessor event.Span = &model.Span{ ID: spanID, ParentID: parentID, @@ -874,6 +876,7 @@ func convertSpanEvent( } if e != nil { event := parent + event.Processor = model.ErrorProcessor event.Error = e event.Timestamp = spanEvent.Timestamp().AsTime().Add(timeDelta) if parent.Transaction != nil { diff --git a/x-pack/apm-server/aggregation/spanmetrics/aggregator.go b/x-pack/apm-server/aggregation/spanmetrics/aggregator.go index 8eef44523fe..dc80be5e347 100644 --- a/x-pack/apm-server/aggregation/spanmetrics/aggregator.go +++ b/x-pack/apm-server/aggregation/spanmetrics/aggregator.go @@ -177,7 +177,7 @@ func (a *Aggregator) ProcessBatch(ctx context.Context, b *model.Batch) error { a.mu.RLock() defer 
a.mu.RUnlock() for _, event := range *b { - if event.Span == nil { + if event.Processor != model.SpanProcessor { continue } if metricsetEvent := a.processSpan(&event); metricsetEvent.Metricset != nil { @@ -277,6 +277,7 @@ func makeMetricset(timestamp time.Time, key aggregationKey, metrics spanMetrics, Event: model.Event{ Outcome: key.outcome, }, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: metricsetName, Span: model.MetricsetSpan{ diff --git a/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go b/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go index e0a750fce1f..f3a52206d26 100644 --- a/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go +++ b/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go @@ -115,9 +115,10 @@ func TestAggregatorRun(t *testing.T) { metricsets := batchMetricsets(t, batch) assert.ElementsMatch(t, []model.APMEvent{{ - Agent: model.Agent{Name: "java"}, - Service: model.Service{Name: "service-A"}, - Event: model.Event{Outcome: "success"}, + Agent: model.Agent{Name: "java"}, + Service: model.Service{Name: "service-A"}, + Event: model.Event{Outcome: "success"}, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: "service_destination", Span: model.MetricsetSpan{ @@ -130,9 +131,10 @@ func TestAggregatorRun(t *testing.T) { }, }, }, { - Agent: model.Agent{Name: "java"}, - Service: model.Service{Name: "service-A"}, - Event: model.Event{Outcome: "failure"}, + Agent: model.Agent{Name: "java"}, + Service: model.Service{Name: "service-A"}, + Event: model.Event{Outcome: "failure"}, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: "service_destination", Span: model.MetricsetSpan{ @@ -145,9 +147,10 @@ func TestAggregatorRun(t *testing.T) { }, }, }, { - Agent: model.Agent{Name: "java"}, - Service: model.Service{Name: "service-A"}, - Event: model.Event{Outcome: "success"}, + Agent: model.Agent{Name: "java"}, + Service: model.Service{Name: "service-A"}, + Event: model.Event{Outcome: "success"}, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: "service_destination", Span: model.MetricsetSpan{ @@ -160,9 +163,10 @@ func TestAggregatorRun(t *testing.T) { }, }, }, { - Agent: model.Agent{Name: "python"}, - Service: model.Service{Name: "service-B"}, - Event: model.Event{Outcome: "success"}, + Agent: model.Agent{Name: "python"}, + Service: model.Service{Name: "service-B"}, + Event: model.Event{Outcome: "success"}, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: "service_destination", Span: model.MetricsetSpan{ @@ -205,9 +209,10 @@ func TestAggregateCompositeSpan(t *testing.T) { metricsets := batchMetricsets(t, batch) assert.Equal(t, []model.APMEvent{{ - Agent: model.Agent{Name: "java"}, - Service: model.Service{Name: "service-A"}, - Event: model.Event{Outcome: "success"}, + Agent: model.Agent{Name: "java"}, + Service: model.Service{Name: "service-A"}, + Event: model.Event{Outcome: "success"}, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: "service_destination", Span: model.MetricsetSpan{ @@ -254,9 +259,10 @@ func TestAggregatorOverflow(t *testing.T) { for _, m := range metricsets { assert.Equal(t, model.APMEvent{ - Agent: model.Agent{Name: "agent"}, - Service: model.Service{Name: "service"}, - Event: model.Event{Outcome: "success"}, + Agent: model.Agent{Name: "agent"}, + Service: model.Service{Name: "service"}, + Event: model.Event{Outcome: "success"}, + Processor: model.MetricsetProcessor, Metricset: 
&model.Metricset{ Name: "service_destination", Span: model.MetricsetSpan{ @@ -278,9 +284,10 @@ func makeSpan( count float64, ) model.APMEvent { event := model.APMEvent{ - Agent: model.Agent{Name: agentName}, - Service: model.Service{Name: serviceName}, - Event: model.Event{Outcome: outcome}, + Agent: model.Agent{Name: agentName}, + Service: model.Service{Name: serviceName}, + Event: model.Event{Outcome: outcome}, + Processor: model.SpanProcessor, Span: &model.Span{ Name: serviceName + ":" + destinationServiceResource, Duration: duration.Seconds() * 1000, diff --git a/x-pack/apm-server/aggregation/txmetrics/aggregator.go b/x-pack/apm-server/aggregation/txmetrics/aggregator.go index 4b48adfdb8f..d3f8973d9e2 100644 --- a/x-pack/apm-server/aggregation/txmetrics/aggregator.go +++ b/x-pack/apm-server/aggregation/txmetrics/aggregator.go @@ -239,7 +239,7 @@ func (a *Aggregator) publish(ctx context.Context) error { // included in the same batch. func (a *Aggregator) ProcessBatch(ctx context.Context, b *model.Batch) error { for _, event := range *b { - if event.Transaction == nil { + if event.Processor != model.TransactionProcessor { continue } if metricsetEvent := a.AggregateTransaction(event); metricsetEvent.Metricset != nil { @@ -384,6 +384,7 @@ func makeMetricset( Event: model.Event{ Outcome: key.eventOutcome, }, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: metricsetName, Transaction: model.MetricsetTransaction{ diff --git a/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go b/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go index 486cd8d1a4e..fd82c9b9bc9 100644 --- a/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go +++ b/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go @@ -83,8 +83,14 @@ func TestProcessTransformablesOverflow(t *testing.T) { // as we have configured the txmetrics with a maximum of two buckets. batch := make(model.Batch, 20) for i := 0; i < len(batch); i += 2 { - batch[i].Transaction = &model.Transaction{Name: "foo", RepresentativeCount: 1} - batch[i+1].Transaction = &model.Transaction{Name: "bar", RepresentativeCount: 1} + batch[i] = model.APMEvent{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{Name: "foo", RepresentativeCount: 1}, + } + batch[i+1] = model.APMEvent{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{Name: "bar", RepresentativeCount: 1}, + } } err = agg.ProcessBatch(context.Background(), &batch) require.NoError(t, err) @@ -92,11 +98,14 @@ func TestProcessTransformablesOverflow(t *testing.T) { // The third transaction group will return a metricset for immediate publication. 
for i := 0; i < 2; i++ { - batch = append(batch, model.APMEvent{Transaction: &model.Transaction{ - Name: "baz", - Duration: float64(time.Minute / time.Millisecond), - RepresentativeCount: 1, - }}) + batch = append(batch, model.APMEvent{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{ + Name: "baz", + Duration: float64(time.Minute / time.Millisecond), + RepresentativeCount: 1, + }, + }) } err = agg.ProcessBatch(context.Background(), &batch) require.NoError(t, err) @@ -105,6 +114,7 @@ func TestProcessTransformablesOverflow(t *testing.T) { for _, m := range metricsets { assert.Equal(t, model.APMEvent{ + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: "transaction", Transaction: model.MetricsetTransaction{ @@ -152,6 +162,7 @@ func TestAggregatorRun(t *testing.T) { for i := 0; i < 1000; i++ { metricset := agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "T-1000", RepresentativeCount: 1, @@ -161,6 +172,7 @@ func TestAggregatorRun(t *testing.T) { } for i := 0; i < 800; i++ { metricset := agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "T-800", RepresentativeCount: 1, @@ -221,6 +233,7 @@ func TestAggregatorRunPublishErrors(t *testing.T) { for i := 0; i < 2; i++ { metricset := agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "T-1000", RepresentativeCount: 1, @@ -254,12 +267,19 @@ func TestAggregateRepresentativeCount(t *testing.T) { // Record a transaction group so subsequent calls yield immediate metricsets, // and to demonstrate that fractional transaction counts are accumulated. - agg.AggregateTransaction(model.APMEvent{Transaction: &model.Transaction{Name: "fnord", RepresentativeCount: 1}}) - agg.AggregateTransaction(model.APMEvent{Transaction: &model.Transaction{Name: "fnord", RepresentativeCount: 1.5}}) + agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{Name: "fnord", RepresentativeCount: 1}, + }) + agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{Name: "fnord", RepresentativeCount: 1.5}, + }) // For non-positive RepresentativeCounts, no metrics will be accumulated. 
for _, representativeCount := range []float64{-1, 0} { m := agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "foo", RepresentativeCount: representativeCount, @@ -282,6 +302,7 @@ func TestAggregateRepresentativeCount(t *testing.T) { expectedCount: 2, }} { m := agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "foo", RepresentativeCount: test.representativeCount, @@ -291,6 +312,7 @@ func TestAggregateRepresentativeCount(t *testing.T) { m.Timestamp = time.Time{} assert.Equal(t, model.APMEvent{ + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: "transaction", TimeseriesInstanceID: ":foo:1db641f187113b17", @@ -357,6 +379,7 @@ func testHDRHistogramSignificantFigures(t *testing.T, sigfigs int) { 101111 * time.Microsecond, } { metricset := agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "T-1000", Duration: durationMillis(duration), @@ -393,7 +416,10 @@ func TestAggregationFields(t *testing.T) { go agg.Run() defer agg.Stop(context.Background()) - input := model.APMEvent{Transaction: &model.Transaction{RepresentativeCount: 1}} + input := model.APMEvent{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{RepresentativeCount: 1}, + } inputFields := []*string{ &input.Transaction.Name, &input.Transaction.Result, @@ -412,6 +438,7 @@ func TestAggregationFields(t *testing.T) { expectedEvent := input expectedEvent.Transaction = nil expectedEvent.Event.Outcome = input.Event.Outcome + expectedEvent.Processor = model.MetricsetProcessor expectedEvent.Metricset = &model.Metricset{ Name: "transaction", Transaction: model.MetricsetTransaction{ @@ -480,6 +507,7 @@ func BenchmarkAggregateTransaction(b *testing.B) { require.NoError(b, err) event := model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "T-1000", Duration: 1, diff --git a/x-pack/apm-server/sampling/groups_test.go b/x-pack/apm-server/sampling/groups_test.go index 776b82a7ee5..d3c968631ae 100644 --- a/x-pack/apm-server/sampling/groups_test.go +++ b/x-pack/apm-server/sampling/groups_test.go @@ -25,9 +25,8 @@ func TestTraceGroupsPolicies(t *testing.T) { Event: model.Event{ Outcome: traceOutcome, }, - Trace: model.Trace{ - ID: uuid.Must(uuid.NewV4()).String(), - }, + Processor: model.TransactionProcessor, + Trace: model.Trace{ID: uuid.Must(uuid.NewV4()).String()}, Transaction: &model.Transaction{ Name: traceName, ID: uuid.Must(uuid.NewV4()).String(), @@ -100,9 +99,8 @@ func TestTraceGroupsMax(t *testing.T) { Service: model.Service{ Name: serviceName, }, - Trace: model.Trace{ - ID: uuid.Must(uuid.NewV4()).String(), - }, + Processor: model.TransactionProcessor, + Trace: model.Trace{ID: uuid.Must(uuid.NewV4()).String()}, Transaction: &model.Transaction{ Name: "whatever", ID: uuid.Must(uuid.NewV4()).String(), @@ -114,9 +112,8 @@ func TestTraceGroupsMax(t *testing.T) { } admitted, err := groups.sampleTrace(&model.APMEvent{ - Trace: model.Trace{ - ID: uuid.Must(uuid.NewV4()).String(), - }, + Processor: model.TransactionProcessor, + Trace: model.Trace{ID: uuid.Must(uuid.NewV4()).String()}, Transaction: &model.Transaction{ Name: "overflow", ID: uuid.Must(uuid.NewV4()).String(), @@ -137,12 +134,9 @@ func TestTraceGroupReservoirResize(t *testing.T) { sendTransactions := func(n int) { for i := 0; i < n; i++ { groups.sampleTrace(&model.APMEvent{ - Trace: model.Trace{ - 
ID: "0102030405060708090a0b0c0d0e0f10", - }, - Transaction: &model.Transaction{ - ID: "0102030405060708", - }, + Processor: model.TransactionProcessor, + Trace: model.Trace{ID: "0102030405060708090a0b0c0d0e0f10"}, + Transaction: &model.Transaction{ID: "0102030405060708"}, }) } } @@ -179,12 +173,9 @@ func TestTraceGroupReservoirResizeMinimum(t *testing.T) { sendTransactions := func(n int) { for i := 0; i < n; i++ { groups.sampleTrace(&model.APMEvent{ - Trace: model.Trace{ - ID: "0102030405060708090a0b0c0d0e0f10", - }, - Transaction: &model.Transaction{ - ID: "0102030405060708", - }, + Processor: model.TransactionProcessor, + Trace: model.Trace{ID: "0102030405060708090a0b0c0d0e0f10"}, + Transaction: &model.Transaction{ID: "0102030405060708"}, }) } } @@ -216,18 +207,21 @@ func TestTraceGroupsRemoval(t *testing.T) { for i := 0; i < 10000; i++ { _, err := groups.sampleTrace(&model.APMEvent{ Service: model.Service{Name: "many"}, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{}, }) assert.NoError(t, err) } _, err := groups.sampleTrace(&model.APMEvent{ Service: model.Service{Name: "few"}, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{}, }) assert.NoError(t, err) _, err = groups.sampleTrace(&model.APMEvent{ Service: model.Service{Name: "another"}, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{}, }) assert.Equal(t, errTooManyTraceGroups, err) @@ -236,6 +230,7 @@ func TestTraceGroupsRemoval(t *testing.T) { // will not be affected by the limit... _, err = groups.sampleTrace(&model.APMEvent{ Service: model.Service{Name: "defined"}, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{}, }) assert.NoError(t, err) @@ -244,6 +239,7 @@ func TestTraceGroupsRemoval(t *testing.T) { // a matching dynamic policy. _, err = groups.sampleTrace(&model.APMEvent{ Service: model.Service{Name: "defined_later"}, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{}, }) assert.Equal(t, errTooManyTraceGroups, err) @@ -255,6 +251,7 @@ func TestTraceGroupsRemoval(t *testing.T) { // We should now be able to add another trace group. _, err = groups.sampleTrace(&model.APMEvent{ Service: model.Service{Name: "another"}, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{}, }) assert.NoError(t, err) @@ -275,6 +272,7 @@ func BenchmarkTraceGroups(b *testing.B) { // Duration is non-zero to ensure transactions have a non-zero chance of // being sampled. 
tx := model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Duration: 1000, Name: uuid.Must(uuid.NewV4()).String(), diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index 5ca32aa753b..3b9bd54c78b 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -156,14 +156,15 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *model.Batch) error for i := 0; i < len(events); i++ { event := &events[i] var report, stored bool - if event.Transaction != nil { + switch event.Processor { + case model.TransactionProcessor: var err error atomic.AddInt64(&p.eventMetrics.processed, 1) report, stored, err = p.processTransaction(event) if err != nil { return err } - } else if event.Span != nil { + case model.SpanProcessor: var err error atomic.AddInt64(&p.eventMetrics.processed, 1) report, stored, err = p.processSpan(event) @@ -464,11 +465,12 @@ func (p *Processor) Run() error { // we don't publish duplicates; delivery is therefore // at-most-once, not guaranteed. for _, event := range events { - if event.Transaction != nil { + switch event.Processor { + case model.TransactionProcessor: if err := p.storage.DeleteTraceEvent(event.Trace.ID, event.Transaction.ID); err != nil { return errors.Wrap(err, "failed to delete transaction from local storage") } - } else if event.Span != nil { + case model.SpanProcessor: if err := p.storage.DeleteTraceEvent(event.Trace.ID, event.Span.ID); err != nil { return errors.Wrap(err, "failed to delete span from local storage") } diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go index 8e0dc4208bb..caa7a612b3d 100644 --- a/x-pack/apm-server/sampling/processor_test.go +++ b/x-pack/apm-server/sampling/processor_test.go @@ -37,6 +37,7 @@ func TestProcessUnsampled(t *testing.T) { defer processor.Stop(context.Background()) in := model.Batch{{ + Processor: model.TransactionProcessor, Trace: model.Trace{ ID: "0102030405060708090a0b0c0d0e0f10", }, @@ -79,37 +80,45 @@ func TestProcessAlreadyTailSampled(t *testing.T) { go processor.Run() defer processor.Stop(context.Background()) - transaction1 := &model.Transaction{ - ID: "0102030405060708", - Sampled: true, + transaction1 := model.APMEvent{ + Processor: model.TransactionProcessor, + Trace: trace1, + Transaction: &model.Transaction{ + ID: "0102030405060708", + Sampled: true, + }, } - span1 := &model.Span{ - ID: "0102030405060709", + span1 := model.APMEvent{ + Processor: model.SpanProcessor, + Trace: trace1, + Span: &model.Span{ + ID: "0102030405060709", + }, } - transaction2 := &model.Transaction{ - ID: "0102030405060710", - Sampled: true, + transaction2 := model.APMEvent{ + Processor: model.TransactionProcessor, + Trace: trace2, + Transaction: &model.Transaction{ + ID: "0102030405060710", + Sampled: true, + }, } - span2 := &model.Span{ - ID: "0102030405060711", + span2 := model.APMEvent{ + Processor: model.SpanProcessor, + Trace: trace2, + Span: &model.Span{ + ID: "0102030405060711", + }, } - batch := model.Batch{ - {Trace: trace1, Transaction: transaction1}, - {Trace: trace2, Transaction: transaction2}, - {Trace: trace1, Span: span1}, - {Trace: trace2, Span: span2}, - } + batch := model.Batch{transaction1, transaction2, span1, span2} err = processor.ProcessBatch(context.Background(), &batch) require.NoError(t, err) // Tail sampling decision already made. 
The first transaction and span should be // reported immediately, whereas the second ones should be written storage since // they were received after the trace sampling entry expired. - assert.Equal(t, model.Batch{ - {Trace: trace1, Transaction: transaction1}, - {Trace: trace1, Span: span1}, - }, batch) + assert.Equal(t, model.Batch{transaction1, span1}, batch) expectedMonitoring := monitoring.MakeFlatSnapshot() expectedMonitoring.Ints["sampling.events.processed"] = 4 @@ -131,10 +140,7 @@ func TestProcessAlreadyTailSampled(t *testing.T) { err = reader.ReadTraceEvents(trace2.ID, &batch) assert.NoError(t, err) - assert.Equal(t, model.Batch{ - {Trace: trace2, Transaction: transaction2}, - {Trace: trace2, Span: span2}, - }, batch) + assert.Equal(t, model.Batch{transaction2, span2}, batch) }) } @@ -151,33 +157,37 @@ func TestProcessLocalTailSampling(t *testing.T) { trace1 := model.Trace{ID: "0102030405060708090a0b0c0d0e0f10"} trace2 := model.Trace{ID: "0102030405060708090a0b0c0d0e0f11"} trace1Events := model.Batch{{ - Trace: trace1, + Processor: model.TransactionProcessor, + Trace: trace1, Transaction: &model.Transaction{ ID: "0102030405060708", Duration: 123, Sampled: true, }, }, { - Trace: trace1, + Processor: model.SpanProcessor, + Trace: trace1, Span: &model.Span{ ID: "0102030405060709", Duration: 123, }, }} trace2Events := model.Batch{{ - Trace: trace2, + Processor: model.TransactionProcessor, + Trace: trace2, Transaction: &model.Transaction{ ID: "0102030405060710", Duration: 456, Sampled: true, }, }, { - Trace: trace2, + Processor: model.SpanProcessor, + Trace: trace2, Span: &model.Span{ ID: "0102030405060711", Duration: 456, - }}, - } + }, + }} in := append(trace1Events[:], trace2Events...) err = processor.ProcessBatch(context.Background(), &in) @@ -266,9 +276,8 @@ func TestProcessLocalTailSamplingUnsampled(t *testing.T) { traceID := uuid.Must(uuid.NewV4()).String() traceIDs[i] = traceID batch := model.Batch{{ - Trace: model.Trace{ - ID: traceID, - }, + Processor: model.TransactionProcessor, + Trace: model.Trace{ID: traceID}, Transaction: &model.Transaction{ ID: traceID, Duration: 1, @@ -332,10 +341,9 @@ func TestProcessLocalTailSamplingPolicyOrder(t *testing.T) { _, err := rng.Read(traceIDBytes[:]) require.NoError(t, err) events[i] = model.APMEvent{ - Service: service, - Trace: model.Trace{ - ID: fmt.Sprintf("%x", traceIDBytes[:]), - }, + Service: service, + Processor: model.TransactionProcessor, + Trace: model.Trace{ID: fmt.Sprintf("%x", traceIDBytes[:])}, Transaction: &model.Transaction{ Name: "trace_name", ID: fmt.Sprintf("%x", traceIDBytes[8:]), @@ -403,9 +411,8 @@ func TestProcessRemoteTailSampling(t *testing.T) { traceID1 := "0102030405060708090a0b0c0d0e0f10" traceID2 := "0102030405060708090a0b0c0d0e0f11" trace1Events := model.Batch{{ - Trace: model.Trace{ - ID: traceID1, - }, + Processor: model.SpanProcessor, + Trace: model.Trace{ID: traceID1}, Span: &model.Span{ ID: "0102030405060709", Duration: 123, @@ -486,10 +493,9 @@ func TestGroupsMonitoring(t *testing.T) { for i := 0; i < config.MaxDynamicServices+1; i++ { err := processor.ProcessBatch(context.Background(), &model.Batch{{ - Service: model.Service{Name: fmt.Sprintf("service_%d", i)}, - Trace: model.Trace{ - ID: uuid.Must(uuid.NewV4()).String(), - }, + Service: model.Service{Name: fmt.Sprintf("service_%d", i)}, + Processor: model.TransactionProcessor, + Trace: model.Trace{ID: uuid.Must(uuid.NewV4()).String()}, Transaction: &model.Transaction{ ID: "0102030405060709", Duration: 123, @@ -517,9 +523,8 @@ func 
TestStorageMonitoring(t *testing.T) { for i := 0; i < 100; i++ { traceID := uuid.Must(uuid.NewV4()).String() batch := model.Batch{{ - Trace: model.Trace{ - ID: traceID, - }, + Processor: model.TransactionProcessor, + Trace: model.Trace{ID: traceID}, Transaction: &model.Transaction{ ID: traceID, Duration: 123, @@ -563,9 +568,8 @@ func TestStorageGC(t *testing.T) { for i := 0; i < n; i++ { traceID := uuid.Must(uuid.NewV4()).String() batch := model.Batch{{ - Trace: model.Trace{ - ID: traceID, - }, + Processor: model.SpanProcessor, + Trace: model.Trace{ID: traceID}, Span: &model.Span{ ID: traceID, Duration: 123,