From 7f47042639b5d4b935dcd804d948f1283f61da36 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Fri, 20 Aug 2021 14:50:11 +0800 Subject: [PATCH 1/7] Introduce model.Processor Processor identifies the event kind: transaction, span, metricset, error, or profile. This is used for routing events to the appropriate index or data stream, and in queries for fetching events of a given kind. --- beater/api/profile/convert.go | 1 + model/apmevent.go | 5 ++- model/apmevent_test.go | 9 +++--- model/error.go | 10 +++--- model/metricset.go | 23 ++++++-------- model/metricset_test.go | 12 ++----- model/modeldecoder/rumv3/decoder.go | 25 +++++++++------ model/modeldecoder/rumv3/metadata_test.go | 1 + model/modeldecoder/v2/decoder.go | 4 +++ model/modeldecoder/v2/metadata_test.go | 3 ++ model/processor.go | 38 +++++++++++++++++++++++ model/profile.go | 15 +++------ model/profile_test.go | 1 - model/span.go | 12 +++---- model/span_test.go | 2 -- model/transaction.go | 13 +++----- model/transaction_test.go | 4 --- processor/otel/exceptions_test.go | 3 ++ processor/otel/metadata_test.go | 1 + processor/otel/metrics.go | 1 + processor/otel/metrics_test.go | 9 ++++++ processor/otel/traces.go | 3 ++ 22 files changed, 117 insertions(+), 78 deletions(-) create mode 100644 model/processor.go diff --git a/beater/api/profile/convert.go b/beater/api/profile/convert.go index 3c0248a5c39..15c1f58de09 100644 --- a/beater/api/profile/convert.go +++ b/beater/api/profile/convert.go @@ -87,6 +87,7 @@ func appendProfileSampleBatch(pp *profile.Profile, baseEvent model.APMEvent, out } event := baseEvent + event.Processor = model.ProfileProcessor event.Labels = event.Labels.Clone() if n := len(sample.Label); n > 0 { for k, v := range sample.Label { diff --git a/model/apmevent.go b/model/apmevent.go index 8c06713fc7e..004f4139f18 100644 --- a/model/apmevent.go +++ b/model/apmevent.go @@ -53,6 +53,7 @@ type APMEvent struct { Network Network Session Session URL URL + Processor Processor // Timestamp holds the event timestamp. // @@ -90,7 +91,8 @@ func (e *APMEvent) BeatEvent(ctx context.Context) beat.Event { event.Fields = e.Error.fields() case e.ProfileSample != nil: event.Fields = e.ProfileSample.fields() - default: + } + if event.Fields == nil { event.Fields = make(common.MapStr) } @@ -132,6 +134,7 @@ func (e *APMEvent) BeatEvent(ctx context.Context) beat.Event { fields.maybeSetMapStr("event", e.Event.fields()) fields.maybeSetMapStr("url", e.URL.fields()) fields.maybeSetMapStr("session", e.Session.fields()) + fields.maybeSetMapStr("processor", e.Processor.fields()) fields.maybeSetString("message", e.Message) return event } diff --git a/model/apmevent_test.go b/model/apmevent_test.go index a071a292d63..c9e4f44b9b9 100644 --- a/model/apmevent_test.go +++ b/model/apmevent_test.go @@ -73,6 +73,7 @@ func TestAPMEventFields(t *testing.T) { Message: "bottle", Transaction: &Transaction{}, Timestamp: time.Date(2019, 1, 3, 15, 17, 4, 908.596*1e6, time.FixedZone("+0100", 3600)), + Processor: Processor{Name: "processor_name", Event: "processor_event"}, }, output: common.MapStr{ // common fields @@ -102,12 +103,12 @@ func TestAPMEventFields(t *testing.T) { "c": 123, }, "message": "bottle", - - // fields related to APMEvent.Transaction "processor": common.MapStr{ - "name": "transaction", - "event": "transaction", + "name": "processor_name", + "event": "processor_event", }, + + // fields related to APMEvent.Transaction "timestamp": common.MapStr{"us": int64(1546525024908596)}, "transaction": common.MapStr{ "duration": common.MapStr{"us": 0}, diff --git a/model/error.go b/model/error.go index 1ef6b47bf0d..d3603ca2c01 100644 --- a/model/error.go +++ b/model/error.go @@ -27,13 +27,13 @@ var ( errorTransformations = monitoring.NewInt(errorMetrics, "transformations") errorStacktraceCounter = monitoring.NewInt(errorMetrics, "stacktraces") errorFrameCounter = monitoring.NewInt(errorMetrics, "frames") - errorProcessorEntry = common.MapStr{"name": errorProcessorName, "event": errorDocType} + + // ErrorProcessor is the Processor value that should be assigned to error events. + ErrorProcessor = Processor{Name: "error", Event: "error"} ) const ( - errorProcessorName = "error" - errorDocType = "error" - ErrorsDataset = "apm.error" + ErrorsDataset = "apm.error" ) type Error struct { @@ -86,7 +86,7 @@ func (e *Error) fields() common.MapStr { addStacktraceCounter(e.Log.Stacktrace) } - fields := mapStr{"processor": errorProcessorEntry} + var fields mapStr if e.HTTP != nil { fields.maybeSetMapStr("http", e.HTTP.transactionTopLevelFields()) } diff --git a/model/metricset.go b/model/metricset.go index 6a44b633605..4e78c516cd4 100644 --- a/model/metricset.go +++ b/model/metricset.go @@ -23,19 +23,16 @@ import ( ) const ( - metricsetProcessorName = "metric" - metricsetDocType = "metric" - metricsetEventKey = "event" - metricsetTransactionKey = "transaction" - metricsetSpanKey = "span" - AppMetricsDataset = "apm.app" - InternalMetricsDataset = "apm.internal" + AppMetricsDataset = "apm.app" + InternalMetricsDataset = "apm.internal" ) var ( metricsetMetrics = monitoring.Default.NewRegistry("apm-server.processor.metric") metricsetTransformations = monitoring.NewInt(metricsetMetrics, "transformations") - metricsetProcessorEntry = common.MapStr{"name": metricsetProcessorName, "event": metricsetDocType} + + // MetricsetProcessor is the Processor value that should be assigned to metricset events. + MetricsetProcessor = Processor{Name: "metric", Event: "metric"} ) // MetricType describes the type of a metric: gauge, counter, or histogram. @@ -144,10 +141,8 @@ func (me *Metricset) fields() common.MapStr { metricsetTransformations.Inc() var fields mapStr - fields.set("processor", metricsetProcessorEntry) - - fields.maybeSetMapStr(metricsetTransactionKey, me.Transaction.fields()) - fields.maybeSetMapStr(metricsetSpanKey, me.Span.fields()) + fields.maybeSetMapStr("transaction", me.Transaction.fields()) + fields.maybeSetMapStr("span", me.Span.fields()) if me.TimeseriesInstanceID != "" { fields.set("timeseries", common.MapStr{"instance": me.TimeseriesInstanceID}) } @@ -158,7 +153,7 @@ func (me *Metricset) fields() common.MapStr { var metricDescriptions mapStr for name, sample := range me.Samples { - sample.set(name, fields) + sample.set(name, &fields) var md mapStr md.maybeSetString("type", string(sample.Type)) @@ -190,7 +185,7 @@ func (s *MetricsetSpan) fields() common.MapStr { return common.MapStr(fields) } -func (s *MetricsetSample) set(name string, fields mapStr) { +func (s *MetricsetSample) set(name string, fields *mapStr) { if s.Type == MetricTypeHistogram { fields.set(name, common.MapStr{ "counts": s.Counts, diff --git a/model/metricset_test.go b/model/metricset_test.go index 28ceb2dcbeb..38476984ce4 100644 --- a/model/metricset_test.go +++ b/model/metricset_test.go @@ -46,15 +46,12 @@ func TestMetricset(t *testing.T) { }{ { Metricset: &Metricset{}, - Output: common.MapStr{ - "processor": common.MapStr{"event": "metric", "name": "metric"}, - }, - Msg: "Payload with empty metric.", + Output: common.MapStr{}, + Msg: "Payload with empty metric.", }, { Metricset: &Metricset{Name: "raj"}, Output: common.MapStr{ - "processor": common.MapStr{"event": "metric", "name": "metric"}, "metricset.name": "raj", }, Msg: "Payload with metricset name.", @@ -67,7 +64,6 @@ func TestMetricset(t *testing.T) { }, }, Output: common.MapStr{ - "processor": common.MapStr{"event": "metric", "name": "metric"}, "a.counter": 612.0, "some.gauge": 9.16, }, @@ -82,7 +78,6 @@ func TestMetricset(t *testing.T) { }, }, Output: common.MapStr{ - "processor": common.MapStr{"event": "metric", "name": "metric"}, "transaction": common.MapStr{"type": trType, "name": trName}, "span": common.MapStr{ "type": spType, "subtype": spSubtype, @@ -111,7 +106,6 @@ func TestMetricset(t *testing.T) { DocCount: 6, }, Output: common.MapStr{ - "processor": common.MapStr{"event": "metric", "name": "metric"}, "timeseries": common.MapStr{"instance": "foo"}, "transaction": common.MapStr{ "type": trType, @@ -143,7 +137,6 @@ func TestMetricset(t *testing.T) { }, }, Output: common.MapStr{ - "processor": common.MapStr{"event": "metric", "name": "metric"}, "span": common.MapStr{ "type": spType, "subtype": spSubtype, "destination": common.MapStr{"service": common.MapStr{"resource": resource}}, @@ -173,7 +166,6 @@ func TestMetricset(t *testing.T) { }, }, Output: common.MapStr{ - "processor": common.MapStr{"event": "metric", "name": "metric"}, "latency_histogram": common.MapStr{ "counts": []int64{1, 2, 3}, "values": []float64{1.1, 2.2, 3.3}, diff --git a/model/modeldecoder/rumv3/decoder.go b/model/modeldecoder/rumv3/decoder.go index 7d59d2825d6..a5caad0325d 100644 --- a/model/modeldecoder/rumv3/decoder.go +++ b/model/modeldecoder/rumv3/decoder.go @@ -161,20 +161,22 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch *batch = append(*batch, transaction) for _, m := range root.Transaction.Metricsets { - metricset := input.Base - mapToMetricsetModel(&m, &metricset) - metricset.Metricset.Transaction.Name = transaction.Transaction.Name - metricset.Metricset.Transaction.Type = transaction.Transaction.Type - *batch = append(*batch, metricset) + event := input.Base + mapToMetricsetModel(&m, &event) + event.Processor = model.MetricsetProcessor + event.Metricset.Transaction.Name = transaction.Transaction.Name + event.Metricset.Transaction.Type = transaction.Transaction.Type + *batch = append(*batch, event) } offset := len(*batch) for _, s := range root.Transaction.Spans { - span := input.Base - mapToSpanModel(&s, &span) - span.Span.TransactionID = transaction.Transaction.ID - span.Span.TraceID = transaction.Transaction.TraceID - *batch = append(*batch, span) + event := input.Base + mapToSpanModel(&s, &event) + event.Processor = model.SpanProcessor + event.Span.TransactionID = transaction.Transaction.ID + event.Span.TraceID = transaction.Transaction.TraceID + *batch = append(*batch, event) } spans := (*batch)[offset:] for i, s := range root.Transaction.Spans { @@ -190,6 +192,7 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch func mapToErrorModel(from *errorEvent, event *model.APMEvent) { out := &model.Error{} event.Error = out + event.Processor = model.ErrorProcessor // overwrite metadata with event specific information mapToServiceModel(from.Context.Service, &event.Service) @@ -391,6 +394,7 @@ func mapToMetadataModel(m *metadata, out *model.APMEvent) { func mapToMetricsetModel(from *metricset, event *model.APMEvent) { out := &model.Metricset{} event.Metricset = out + event.Processor = model.MetricsetProcessor // map samples information if from.Samples.IsSet() { @@ -690,6 +694,7 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) { func mapToTransactionModel(from *transaction, event *model.APMEvent) { out := &model.Transaction{} event.Transaction = out + event.Processor = model.TransactionProcessor // overwrite metadata with event specific information mapToServiceModel(from.Context.Service, &event.Service) diff --git a/model/modeldecoder/rumv3/metadata_test.go b/model/modeldecoder/rumv3/metadata_test.go index 22ee257d201..effcccbe898 100644 --- a/model/modeldecoder/rumv3/metadata_test.go +++ b/model/modeldecoder/rumv3/metadata_test.go @@ -60,6 +60,7 @@ func metadataExceptions(keys ...string) func(key string) bool { "Network", "Observer", "Process", + "Processor", "Service.Node", "Service.Agent.EphemeralID", "Host", diff --git a/model/modeldecoder/v2/decoder.go b/model/modeldecoder/v2/decoder.go index 23149dae4b9..08276ed38e4 100644 --- a/model/modeldecoder/v2/decoder.go +++ b/model/modeldecoder/v2/decoder.go @@ -240,6 +240,7 @@ func mapToClientModel(from contextRequest, out *model.Client) { func mapToErrorModel(from *errorEvent, config modeldecoder.Config, event *model.APMEvent) { out := &model.Error{} event.Error = out + event.Processor = model.ErrorProcessor // overwrite metadata with event specific information mapToServiceModel(from.Context.Service, &event.Service) @@ -538,6 +539,7 @@ func mapToMetadataModel(from *metadata, out *model.APMEvent) { func mapToMetricsetModel(from *metricset, config modeldecoder.Config, event *model.APMEvent) { out := &model.Metricset{} event.Metricset = out + event.Processor = model.MetricsetProcessor if !from.Timestamp.Val.IsZero() { event.Timestamp = from.Timestamp.Val @@ -725,6 +727,7 @@ func mapToAgentModel(from contextServiceAgent, out *model.Agent) { func mapToSpanModel(from *span, config modeldecoder.Config, event *model.APMEvent) { out := &model.Span{} event.Span = out + event.Processor = model.SpanProcessor // map span specific data if !from.Action.IsSet() && !from.Subtype.IsSet() { @@ -987,6 +990,7 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) { func mapToTransactionModel(from *transaction, config modeldecoder.Config, event *model.APMEvent) { out := &model.Transaction{} + event.Processor = model.TransactionProcessor event.Transaction = out // overwrite metadata with event specific information diff --git a/model/modeldecoder/v2/metadata_test.go b/model/modeldecoder/v2/metadata_test.go index 3f4caf17216..4feddae63ef 100644 --- a/model/modeldecoder/v2/metadata_test.go +++ b/model/modeldecoder/v2/metadata_test.go @@ -74,6 +74,9 @@ func isUnmappedMetadataField(key string) bool { "Observer.VersionMajor", "Process.CommandLine", "Process.Executable", + "Processor", + "Processor.Event", + "Processor.Name", "Host.OS.Full", "Host.OS.Type", "Host.ID", diff --git a/model/processor.go b/model/processor.go new file mode 100644 index 00000000000..fb5116b25e1 --- /dev/null +++ b/model/processor.go @@ -0,0 +1,38 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package model + +import ( + "github.com/elastic/beats/v7/libbeat/common" +) + +// Processor identifies an event type, and is used for routing events +// to the appropriate data stream or index. +// +// TODO(axw) this should be replaced with ECS event categorisation fields. +type Processor struct { + Name string + Event string +} + +func (p *Processor) fields() common.MapStr { + var fields mapStr + fields.maybeSetString("name", p.Name) + fields.maybeSetString("event", p.Event) + return common.MapStr(fields) +} diff --git a/model/profile.go b/model/profile.go index bb2d4a6fbc4..b89e3766b20 100644 --- a/model/profile.go +++ b/model/profile.go @@ -24,15 +24,11 @@ import ( ) const ( - profileProcessorName = "profile" - profileDocType = "profile" - ProfilesDataset = "apm.profiling" + ProfilesDataset = "apm.profiling" ) -var profileProcessorEntry = common.MapStr{ - "name": profileProcessorName, - "event": profileDocType, -} +// ProfileProcessor is the Processor value that should be assigned to profile events. +var ProfileProcessor = Processor{Name: "profile", Event: "profile"} // ProfileSample holds a profiling sample. type ProfileSample struct { @@ -78,8 +74,5 @@ func (p *ProfileSample) fields() common.MapStr { profileFields.set(k, v) } - return common.MapStr{ - "processor": profileProcessorEntry, - profileDocType: common.MapStr(profileFields), - } + return common.MapStr{"profile": common.MapStr(profileFields)} } diff --git a/model/profile_test.go b/model/profile_test.go index dda914e53dc..3d24f11a3a9 100644 --- a/model/profile_test.go +++ b/model/profile_test.go @@ -67,7 +67,6 @@ func TestProfileSampleTransform(t *testing.T) { assert.Equal(t, beat.Event{ Timestamp: timestamp, Fields: common.MapStr{ - "processor": common.MapStr{"event": "profile", "name": "profile"}, "profile": common.MapStr{ "id": "profile_id", "duration": int64(10 * time.Second), diff --git a/model/span.go b/model/span.go index a69b2f8630a..c1852f8452d 100644 --- a/model/span.go +++ b/model/span.go @@ -24,16 +24,14 @@ import ( "github.com/elastic/apm-server/utility" ) -const ( - spanDocType = "span" -) - var ( spanMetrics = monitoring.Default.NewRegistry("apm-server.processor.span") spanTransformations = monitoring.NewInt(spanMetrics, "transformations") spanStacktraceCounter = monitoring.NewInt(spanMetrics, "stacktraces") spanFrameCounter = monitoring.NewInt(spanMetrics, "frames") - spanProcessorEntry = common.MapStr{"name": "transaction", "event": spanDocType} + + // SpanProcessor is the Processor value that should be assigned to span events. + SpanProcessor = Processor{Name: "transaction", Event: "span"} ) type Span struct { @@ -139,8 +137,7 @@ func (e *Span) fields(apmEvent *APMEvent) common.MapStr { spanFrameCounter.Add(int64(frames)) } - fields := mapStr{"processor": spanProcessorEntry} - + var fields mapStr var trace, transaction, parent mapStr if trace.maybeSetString("id", e.TraceID) { fields.set("trace", common.MapStr(trace)) @@ -191,6 +188,5 @@ func (e *Span) fields(apmEvent *APMEvent) common.MapStr { span.set("stacktrace", st) } fields.set("span", common.MapStr(span)) - return common.MapStr(fields) } diff --git a/model/span_test.go b/model/span_test.go index 85a3feaf336..125de30e0d2 100644 --- a/model/span_test.go +++ b/model/span_test.go @@ -49,7 +49,6 @@ func TestSpanTransform(t *testing.T) { Msg: "Span without a Stacktrace", Span: Span{}, Output: common.MapStr{ - "processor": common.MapStr{"event": "span", "name": "transaction"}, "span": common.MapStr{ "duration": common.MapStr{"us": 0}, "name": "", @@ -133,7 +132,6 @@ func TestSpanTransform(t *testing.T) { "compression_strategy": "exact_match", }, }, - "processor": common.MapStr{"event": "span", "name": "transaction"}, "timestamp": common.MapStr{"us": timestampUs}, "trace": common.MapStr{"id": traceID}, "parent": common.MapStr{"id": parentID}, diff --git a/model/transaction.go b/model/transaction.go index c62d77ad1cb..9bd84dee562 100644 --- a/model/transaction.go +++ b/model/transaction.go @@ -25,15 +25,15 @@ import ( ) const ( - transactionProcessorName = "transaction" - transactionDocType = "transaction" - TracesDataset = "apm" + TracesDataset = "apm" ) var ( transactionMetrics = monitoring.Default.NewRegistry("apm-server.processor.transaction") transactionTransformations = monitoring.NewInt(transactionMetrics, "transformations") - transactionProcessorEntry = common.MapStr{"name": transactionProcessorName, "event": transactionDocType} + + // TransactionProcessor is the Processor value that should be assigned to transaction events. + TransactionProcessor = Processor{Name: "transaction", Event: "transaction"} ) type Transaction struct { @@ -70,10 +70,7 @@ type SpanCount struct { func (e *Transaction) fields() common.MapStr { transactionTransformations.Inc() - fields := mapStr{ - "processor": transactionProcessorEntry, - } - + var fields mapStr var parent, trace mapStr parent.maybeSetString("id", e.ParentID) trace.maybeSetString("id", e.TraceID) diff --git a/model/transaction_test.go b/model/transaction_test.go index 4a8eea22e09..1df41514813 100644 --- a/model/transaction_test.go +++ b/model/transaction_test.go @@ -172,10 +172,6 @@ func TestEventsTransformWithMetadata(t *testing.T) { "platform": "x64", }, }, - "processor": common.MapStr{ - "event": "transaction", - "name": "transaction", - }, "service": common.MapStr{ "name": serviceName, "version": serviceVersion, diff --git a/processor/otel/exceptions_test.go b/processor/otel/exceptions_test.go index 93f2f515510..fb4edfa9dd8 100644 --- a/processor/otel/exceptions_test.go +++ b/processor/otel/exceptions_test.go @@ -114,6 +114,7 @@ Caused by: LowLevelException Service: service, Agent: agent, Timestamp: timestamp, + Processor: model.ErrorProcessor, Error: &model.Error{ TraceID: transactionEvent.Transaction.TraceID, ParentID: transactionEvent.Transaction.ID, @@ -161,6 +162,7 @@ Caused by: LowLevelException Service: service, Agent: agent, Timestamp: timestamp, + Processor: model.ErrorProcessor, Error: &model.Error{ TraceID: transactionEvent.Transaction.TraceID, ParentID: transactionEvent.Transaction.ID, @@ -317,6 +319,7 @@ func TestEncodeSpanEventsNonJavaExceptions(t *testing.T) { Service: service, Agent: agent, Timestamp: timestamp, + Processor: model.ErrorProcessor, Error: &model.Error{ TraceID: transactionEvent.Transaction.TraceID, ParentID: transactionEvent.Transaction.ID, diff --git a/processor/otel/metadata_test.go b/processor/otel/metadata_test.go index 0fcf584a0d3..178e47fdb7a 100644 --- a/processor/otel/metadata_test.go +++ b/processor/otel/metadata_test.go @@ -236,5 +236,6 @@ func transformResourceMetadata(t *testing.T, resourceAttrs map[string]pdata.Attr events[0].Transaction = nil events[0].Event.Outcome = "" events[0].Timestamp = time.Time{} + events[0].Processor = model.Processor{} return events[0] } diff --git a/processor/otel/metrics.go b/processor/otel/metrics.go index d8d58f1d903..184aa45f187 100644 --- a/processor/otel/metrics.go +++ b/processor/otel/metrics.go @@ -105,6 +105,7 @@ func (c *Consumer) convertInstrumentationLibraryMetrics( } for _, m := range ms { event := baseEvent + event.Processor = model.MetricsetProcessor event.Metricset = m.Metricset event.Timestamp = m.timestamp.Add(timeDelta) if n := len(m.labels); n > 0 { diff --git a/processor/otel/metrics_test.go b/processor/otel/metrics_test.go index 453760a1e47..ec1b75d897d 100644 --- a/processor/otel/metrics_test.go +++ b/processor/otel/metrics_test.go @@ -164,6 +164,7 @@ func TestConsumeMetrics(t *testing.T) { Agent: agent, Service: service, Timestamp: timestamp0, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "int_gauge_metric": {Value: 1, Type: "gauge"}, @@ -186,6 +187,7 @@ func TestConsumeMetrics(t *testing.T) { Agent: agent, Service: service, Timestamp: timestamp1, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "int_gauge_metric": {Value: 3, Type: "gauge"}, @@ -197,6 +199,7 @@ func TestConsumeMetrics(t *testing.T) { Service: service, Labels: common.MapStr{"k": "v"}, Timestamp: timestamp1, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "int_gauge_metric": {Value: 2, Type: "gauge"}, @@ -210,6 +213,7 @@ func TestConsumeMetrics(t *testing.T) { Service: service, Labels: common.MapStr{"k": "v2"}, Timestamp: timestamp1, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "int_gauge_metric": {Value: 4, Type: "gauge"}, @@ -221,6 +225,7 @@ func TestConsumeMetrics(t *testing.T) { Service: service, Labels: common.MapStr{"k2": "v"}, Timestamp: timestamp1, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "int_sum_metric": {Value: 11, Type: "counter"}, @@ -270,6 +275,7 @@ func TestConsumeMetrics_JVM(t *testing.T) { Agent: agent, Service: service, Timestamp: timestamp, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "jvm.memory.heap.used": { @@ -283,6 +289,7 @@ func TestConsumeMetrics_JVM(t *testing.T) { Service: service, Labels: common.MapStr{"gc": "G1 Young Generation"}, Timestamp: timestamp, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "runtime.jvm.gc.time": { @@ -300,6 +307,7 @@ func TestConsumeMetrics_JVM(t *testing.T) { Service: service, Labels: common.MapStr{"name": "G1 Young Generation"}, Timestamp: timestamp, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "jvm.gc.time": { @@ -317,6 +325,7 @@ func TestConsumeMetrics_JVM(t *testing.T) { Service: service, Labels: common.MapStr{"area": "heap", "type": "used"}, Timestamp: timestamp, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Samples: map[string]model.MetricsetSample{ "runtime.jvm.memory.area": { diff --git a/processor/otel/traces.go b/processor/otel/traces.go index 4601b63f2f3..64ef4228fed 100644 --- a/processor/otel/traces.go +++ b/processor/otel/traces.go @@ -211,6 +211,7 @@ func (c *Consumer) convertSpan( event.Timestamp = startTime.Add(timeDelta) event.Event.Outcome = spanStatusOutcome(otelSpan.Status()) if root || otelSpan.Kind() == pdata.SpanKindServer || otelSpan.Kind() == pdata.SpanKindConsumer { + event.Processor = model.TransactionProcessor event.Transaction = &model.Transaction{ ID: spanID, ParentID: parentID, @@ -221,6 +222,7 @@ func (c *Consumer) convertSpan( } translateTransaction(otelSpan, otelLibrary, &event) } else { + event.Processor = model.SpanProcessor event.Span = &model.Span{ ID: spanID, ParentID: parentID, @@ -877,6 +879,7 @@ func convertSpanEvent( } if e != nil { event := parent + event.Processor = model.ErrorProcessor event.Error = e event.Timestamp = spanEvent.Timestamp().AsTime().Add(timeDelta) if parent.Transaction != nil { From dee6a251cc6ab43f36467f936d4c02e6f1f1d65c Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Fri, 20 Aug 2021 14:55:20 +0800 Subject: [PATCH 2/7] Update SetDataStream to use APMEvent.Processor --- model/modelprocessor/datastream.go | 10 +++++----- model/modelprocessor/datastream_test.go | 15 ++++++++++----- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/model/modelprocessor/datastream.go b/model/modelprocessor/datastream.go index 96af159c376..375ed68e999 100644 --- a/model/modelprocessor/datastream.go +++ b/model/modelprocessor/datastream.go @@ -39,14 +39,14 @@ func (s *SetDataStream) ProcessBatch(ctx context.Context, b *model.Batch) error } func (s *SetDataStream) setDataStream(event *model.APMEvent) { - switch { - case event.Transaction != nil || event.Span != nil: + switch event.Processor { + case model.SpanProcessor, model.TransactionProcessor: event.DataStream.Type = datastreams.TracesType event.DataStream.Dataset = model.TracesDataset - case event.Error != nil: + case model.ErrorProcessor: event.DataStream.Type = datastreams.LogsType event.DataStream.Dataset = model.ErrorsDataset - case event.Metricset != nil: + case model.MetricsetProcessor: event.DataStream.Type = datastreams.MetricsType // Metrics that include well-defined transaction/span fields // (i.e. breakdown metrics, transaction and span metrics) will @@ -58,7 +58,7 @@ func (s *SetDataStream) setDataStream(event *model.APMEvent) { datastreams.NormalizeServiceName(event.Service.Name), ) } - case event.ProfileSample != nil: + case model.ProfileProcessor: event.DataStream.Type = datastreams.MetricsType event.DataStream.Dataset = model.ProfilesDataset } diff --git a/model/modelprocessor/datastream_test.go b/model/modelprocessor/datastream_test.go index 01d151c0e0c..eef40765741 100644 --- a/model/modelprocessor/datastream_test.go +++ b/model/modelprocessor/datastream_test.go @@ -35,17 +35,18 @@ func TestSetDataStream(t *testing.T) { input: model.APMEvent{}, output: model.DataStream{Namespace: "custom"}, }, { - input: model.APMEvent{Transaction: &model.Transaction{}}, + input: model.APMEvent{Processor: model.TransactionProcessor}, output: model.DataStream{Type: "traces", Dataset: "apm", Namespace: "custom"}, }, { - input: model.APMEvent{Span: &model.Span{}}, + input: model.APMEvent{Processor: model.SpanProcessor}, output: model.DataStream{Type: "traces", Dataset: "apm", Namespace: "custom"}, }, { - input: model.APMEvent{Error: &model.Error{}}, + input: model.APMEvent{Processor: model.ErrorProcessor}, output: model.DataStream{Type: "logs", Dataset: "apm.error", Namespace: "custom"}, }, { input: model.APMEvent{ - Service: model.Service{Name: "service-name"}, + Processor: model.MetricsetProcessor, + Service: model.Service{Name: "service-name"}, Metricset: &model.Metricset{ Transaction: model.MetricsetTransaction{Name: "foo"}, }, @@ -53,12 +54,16 @@ func TestSetDataStream(t *testing.T) { output: model.DataStream{Type: "metrics", Dataset: "apm.internal", Namespace: "custom"}, }, { input: model.APMEvent{ + Processor: model.MetricsetProcessor, Service: model.Service{Name: "service-name"}, Metricset: &model.Metricset{}, }, output: model.DataStream{Type: "metrics", Dataset: "apm.app.service_name", Namespace: "custom"}, }, { - input: model.APMEvent{ProfileSample: &model.ProfileSample{}}, + input: model.APMEvent{ + Processor: model.ProfileProcessor, + ProfileSample: &model.ProfileSample{}, + }, output: model.DataStream{Type: "metrics", Dataset: "apm.profiling", Namespace: "custom"}, }} From d2ee7d2929842103618d5f8c8ec87b1557be6c33 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Fri, 20 Aug 2021 16:40:50 +0800 Subject: [PATCH 3/7] Move monitoring event counters to model processor Also, remove unused metrics. --- beater/beater.go | 2 + model/error.go | 22 ------- model/metricset.go | 6 -- model/modelprocessor/eventcounter.go | 79 +++++++++++++++++++++++ model/modelprocessor/eventcounter_test.go | 51 +++++++++++++++ model/span.go | 12 ---- model/transaction.go | 6 -- 7 files changed, 132 insertions(+), 46 deletions(-) create mode 100644 model/modelprocessor/eventcounter.go create mode 100644 model/modelprocessor/eventcounter_test.go diff --git a/beater/beater.go b/beater/beater.go index 89e364fb2d1..1114ef37fe2 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/transport" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" + "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/go-ucfg" "github.com/pkg/errors" @@ -479,6 +480,7 @@ func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) R modelprocessor.SetErrorMessage{}, newObserverBatchProcessor(s.beat.Info), model.ProcessBatchFunc(ecsVersionBatchProcessor), + modelprocessor.NewEventCounter(monitoring.Default.GetRegistry("apm-server")), } if s.config.DefaultServiceEnvironment != "" { processors = append(processors, &modelprocessor.SetDefaultServiceEnvironment{ diff --git a/model/error.go b/model/error.go index d3603ca2c01..16b42a0049b 100644 --- a/model/error.go +++ b/model/error.go @@ -19,15 +19,9 @@ package model import ( "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/monitoring" ) var ( - errorMetrics = monitoring.Default.NewRegistry("apm-server.processor.error") - errorTransformations = monitoring.NewInt(errorMetrics, "transformations") - errorStacktraceCounter = monitoring.NewInt(errorMetrics, "stacktraces") - errorFrameCounter = monitoring.NewInt(errorMetrics, "frames") - // ErrorProcessor is the Processor value that should be assigned to error events. ErrorProcessor = Processor{Name: "error", Event: "error"} ) @@ -77,15 +71,6 @@ type Log struct { } func (e *Error) fields() common.MapStr { - errorTransformations.Inc() - - if e.Exception != nil { - addStacktraceCounter(e.Exception.Stacktrace) - } - if e.Log != nil { - addStacktraceCounter(e.Log.Stacktrace) - } - var fields mapStr if e.HTTP != nil { fields.maybeSetMapStr("http", e.HTTP.transactionTopLevelFields()) @@ -164,13 +149,6 @@ func (e *Error) logFields() common.MapStr { return common.MapStr(log) } -func addStacktraceCounter(st Stacktrace) { - if frames := len(st); frames > 0 { - errorStacktraceCounter.Inc() - errorFrameCounter.Add(int64(frames)) - } -} - // flattenExceptionTree recursively traverses the causes of an exception to return a slice of exceptions. // Tree traversal is Depth First. // The parent of a exception in the resulting slice is at the position indicated by the `parent` property diff --git a/model/metricset.go b/model/metricset.go index 4e78c516cd4..94f9724a771 100644 --- a/model/metricset.go +++ b/model/metricset.go @@ -19,7 +19,6 @@ package model import ( "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/monitoring" ) const ( @@ -28,9 +27,6 @@ const ( ) var ( - metricsetMetrics = monitoring.Default.NewRegistry("apm-server.processor.metric") - metricsetTransformations = monitoring.NewInt(metricsetMetrics, "transformations") - // MetricsetProcessor is the Processor value that should be assigned to metricset events. MetricsetProcessor = Processor{Name: "metric", Event: "metric"} ) @@ -138,8 +134,6 @@ type MetricsetSpan struct { } func (me *Metricset) fields() common.MapStr { - metricsetTransformations.Inc() - var fields mapStr fields.maybeSetMapStr("transaction", me.Transaction.fields()) fields.maybeSetMapStr("span", me.Span.fields()) diff --git a/model/modelprocessor/eventcounter.go b/model/modelprocessor/eventcounter.go new file mode 100644 index 00000000000..5f02e915a54 --- /dev/null +++ b/model/modelprocessor/eventcounter.go @@ -0,0 +1,79 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package modelprocessor + +import ( + "context" + "sync" + + "github.com/elastic/beats/v7/libbeat/monitoring" + + "github.com/elastic/apm-server/model" +) + +// EventCounter is a model.BatchProcessor that counts the number of events processed, +// recording the counts as metrics in a monitoring.Registry. +// +// Metrics are named after the event type: `processor..transformations`. +// These metrics are used to populate the "Processed Events" graphs in Stack Monitoring. +type EventCounter struct { + registry *monitoring.Registry + + mu sync.RWMutex + eventCounters map[string]*monitoring.Int +} + +// NewEventCounter returns an EventCounter that counts events processed, recording +// them as `.transformations` under the given registry. +func NewEventCounter(registry *monitoring.Registry) *EventCounter { + return &EventCounter{ + registry: registry, + eventCounters: make(map[string]*monitoring.Int), + } +} + +// ProcessBatch counts events in b, grouping by APMEvent.Processor.Event. +func (c *EventCounter) ProcessBatch(ctx context.Context, b *model.Batch) error { + for _, event := range *b { + pe := event.Processor.Event + if pe == "" { + continue + } + c.mu.RLock() + eventCounter := c.eventCounters[pe] + c.mu.RUnlock() + if eventCounter == nil { + c.mu.Lock() + eventCounter = c.eventCounters[pe] + if eventCounter == nil { + // Metric may exist in the registry but not in our map, + // so first check if it exists before attempting to create. + name := "processor." + pe + ".transformations" + var ok bool + eventCounter, ok = c.registry.Get(name).(*monitoring.Int) + if !ok { + eventCounter = monitoring.NewInt(c.registry, name) + } + c.eventCounters[pe] = eventCounter + } + c.mu.Unlock() + } + eventCounter.Inc() + } + return nil +} diff --git a/model/modelprocessor/eventcounter_test.go b/model/modelprocessor/eventcounter_test.go new file mode 100644 index 00000000000..92b073e79af --- /dev/null +++ b/model/modelprocessor/eventcounter_test.go @@ -0,0 +1,51 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package modelprocessor_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/monitoring" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/model/modelprocessor" +) + +func TestEventCounter(t *testing.T) { + batch := model.Batch{ + {}, + {Processor: model.TransactionProcessor}, + {Processor: model.SpanProcessor}, + {Processor: model.TransactionProcessor}, + } + + expected := monitoring.MakeFlatSnapshot() + expected.Ints["processor.span.transformations"] = 1 + expected.Ints["processor.transaction.transformations"] = 2 + + registry := monitoring.NewRegistry() + processor := modelprocessor.NewEventCounter(registry) + err := processor.ProcessBatch(context.Background(), &batch) + assert.NoError(t, err) + snapshot := monitoring.CollectFlatSnapshot(registry, monitoring.Full, false) + assert.Equal(t, expected, snapshot) + +} diff --git a/model/span.go b/model/span.go index c1852f8452d..af0f7ab91d8 100644 --- a/model/span.go +++ b/model/span.go @@ -19,17 +19,11 @@ package model import ( "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/apm-server/utility" ) var ( - spanMetrics = monitoring.Default.NewRegistry("apm-server.processor.span") - spanTransformations = monitoring.NewInt(spanMetrics, "transformations") - spanStacktraceCounter = monitoring.NewInt(spanMetrics, "stacktraces") - spanFrameCounter = monitoring.NewInt(spanMetrics, "frames") - // SpanProcessor is the Processor value that should be assigned to span events. SpanProcessor = Processor{Name: "transaction", Event: "span"} ) @@ -131,12 +125,6 @@ func (c *Composite) fields() common.MapStr { } func (e *Span) fields(apmEvent *APMEvent) common.MapStr { - spanTransformations.Inc() - if frames := len(e.Stacktrace); frames > 0 { - spanStacktraceCounter.Inc() - spanFrameCounter.Add(int64(frames)) - } - var fields mapStr var trace, transaction, parent mapStr if trace.maybeSetString("id", e.TraceID) { diff --git a/model/transaction.go b/model/transaction.go index 9bd84dee562..879e979ae01 100644 --- a/model/transaction.go +++ b/model/transaction.go @@ -19,7 +19,6 @@ package model import ( "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/apm-server/utility" ) @@ -29,9 +28,6 @@ const ( ) var ( - transactionMetrics = monitoring.Default.NewRegistry("apm-server.processor.transaction") - transactionTransformations = monitoring.NewInt(transactionMetrics, "transformations") - // TransactionProcessor is the Processor value that should be assigned to transaction events. TransactionProcessor = Processor{Name: "transaction", Event: "transaction"} ) @@ -68,8 +64,6 @@ type SpanCount struct { } func (e *Transaction) fields() common.MapStr { - transactionTransformations.Inc() - var fields mapStr var parent, trace mapStr parent.maybeSetString("id", e.ParentID) From b8c703b0d6587c68a49218ce87aea412a4f5ca24 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Sat, 21 Aug 2021 10:23:03 +0800 Subject: [PATCH 4/7] Update aggregators to use Processor --- .../aggregation/spanmetrics/aggregator.go | 3 +- .../spanmetrics/aggregator_test.go | 49 +++++++++++-------- .../aggregation/txmetrics/aggregator.go | 3 +- .../aggregation/txmetrics/aggregator_test.go | 48 ++++++++++++++---- 4 files changed, 70 insertions(+), 33 deletions(-) diff --git a/x-pack/apm-server/aggregation/spanmetrics/aggregator.go b/x-pack/apm-server/aggregation/spanmetrics/aggregator.go index 8eef44523fe..dc80be5e347 100644 --- a/x-pack/apm-server/aggregation/spanmetrics/aggregator.go +++ b/x-pack/apm-server/aggregation/spanmetrics/aggregator.go @@ -177,7 +177,7 @@ func (a *Aggregator) ProcessBatch(ctx context.Context, b *model.Batch) error { a.mu.RLock() defer a.mu.RUnlock() for _, event := range *b { - if event.Span == nil { + if event.Processor != model.SpanProcessor { continue } if metricsetEvent := a.processSpan(&event); metricsetEvent.Metricset != nil { @@ -277,6 +277,7 @@ func makeMetricset(timestamp time.Time, key aggregationKey, metrics spanMetrics, Event: model.Event{ Outcome: key.outcome, }, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: metricsetName, Span: model.MetricsetSpan{ diff --git a/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go b/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go index e0a750fce1f..f3a52206d26 100644 --- a/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go +++ b/x-pack/apm-server/aggregation/spanmetrics/aggregator_test.go @@ -115,9 +115,10 @@ func TestAggregatorRun(t *testing.T) { metricsets := batchMetricsets(t, batch) assert.ElementsMatch(t, []model.APMEvent{{ - Agent: model.Agent{Name: "java"}, - Service: model.Service{Name: "service-A"}, - Event: model.Event{Outcome: "success"}, + Agent: model.Agent{Name: "java"}, + Service: model.Service{Name: "service-A"}, + Event: model.Event{Outcome: "success"}, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: "service_destination", Span: model.MetricsetSpan{ @@ -130,9 +131,10 @@ func TestAggregatorRun(t *testing.T) { }, }, }, { - Agent: model.Agent{Name: "java"}, - Service: model.Service{Name: "service-A"}, - Event: model.Event{Outcome: "failure"}, + Agent: model.Agent{Name: "java"}, + Service: model.Service{Name: "service-A"}, + Event: model.Event{Outcome: "failure"}, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: "service_destination", Span: model.MetricsetSpan{ @@ -145,9 +147,10 @@ func TestAggregatorRun(t *testing.T) { }, }, }, { - Agent: model.Agent{Name: "java"}, - Service: model.Service{Name: "service-A"}, - Event: model.Event{Outcome: "success"}, + Agent: model.Agent{Name: "java"}, + Service: model.Service{Name: "service-A"}, + Event: model.Event{Outcome: "success"}, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: "service_destination", Span: model.MetricsetSpan{ @@ -160,9 +163,10 @@ func TestAggregatorRun(t *testing.T) { }, }, }, { - Agent: model.Agent{Name: "python"}, - Service: model.Service{Name: "service-B"}, - Event: model.Event{Outcome: "success"}, + Agent: model.Agent{Name: "python"}, + Service: model.Service{Name: "service-B"}, + Event: model.Event{Outcome: "success"}, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: "service_destination", Span: model.MetricsetSpan{ @@ -205,9 +209,10 @@ func TestAggregateCompositeSpan(t *testing.T) { metricsets := batchMetricsets(t, batch) assert.Equal(t, []model.APMEvent{{ - Agent: model.Agent{Name: "java"}, - Service: model.Service{Name: "service-A"}, - Event: model.Event{Outcome: "success"}, + Agent: model.Agent{Name: "java"}, + Service: model.Service{Name: "service-A"}, + Event: model.Event{Outcome: "success"}, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: "service_destination", Span: model.MetricsetSpan{ @@ -254,9 +259,10 @@ func TestAggregatorOverflow(t *testing.T) { for _, m := range metricsets { assert.Equal(t, model.APMEvent{ - Agent: model.Agent{Name: "agent"}, - Service: model.Service{Name: "service"}, - Event: model.Event{Outcome: "success"}, + Agent: model.Agent{Name: "agent"}, + Service: model.Service{Name: "service"}, + Event: model.Event{Outcome: "success"}, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: "service_destination", Span: model.MetricsetSpan{ @@ -278,9 +284,10 @@ func makeSpan( count float64, ) model.APMEvent { event := model.APMEvent{ - Agent: model.Agent{Name: agentName}, - Service: model.Service{Name: serviceName}, - Event: model.Event{Outcome: outcome}, + Agent: model.Agent{Name: agentName}, + Service: model.Service{Name: serviceName}, + Event: model.Event{Outcome: outcome}, + Processor: model.SpanProcessor, Span: &model.Span{ Name: serviceName + ":" + destinationServiceResource, Duration: duration.Seconds() * 1000, diff --git a/x-pack/apm-server/aggregation/txmetrics/aggregator.go b/x-pack/apm-server/aggregation/txmetrics/aggregator.go index 4b48adfdb8f..d3f8973d9e2 100644 --- a/x-pack/apm-server/aggregation/txmetrics/aggregator.go +++ b/x-pack/apm-server/aggregation/txmetrics/aggregator.go @@ -239,7 +239,7 @@ func (a *Aggregator) publish(ctx context.Context) error { // included in the same batch. func (a *Aggregator) ProcessBatch(ctx context.Context, b *model.Batch) error { for _, event := range *b { - if event.Transaction == nil { + if event.Processor != model.TransactionProcessor { continue } if metricsetEvent := a.AggregateTransaction(event); metricsetEvent.Metricset != nil { @@ -384,6 +384,7 @@ func makeMetricset( Event: model.Event{ Outcome: key.eventOutcome, }, + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: metricsetName, Transaction: model.MetricsetTransaction{ diff --git a/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go b/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go index 486cd8d1a4e..fd82c9b9bc9 100644 --- a/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go +++ b/x-pack/apm-server/aggregation/txmetrics/aggregator_test.go @@ -83,8 +83,14 @@ func TestProcessTransformablesOverflow(t *testing.T) { // as we have configured the txmetrics with a maximum of two buckets. batch := make(model.Batch, 20) for i := 0; i < len(batch); i += 2 { - batch[i].Transaction = &model.Transaction{Name: "foo", RepresentativeCount: 1} - batch[i+1].Transaction = &model.Transaction{Name: "bar", RepresentativeCount: 1} + batch[i] = model.APMEvent{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{Name: "foo", RepresentativeCount: 1}, + } + batch[i+1] = model.APMEvent{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{Name: "bar", RepresentativeCount: 1}, + } } err = agg.ProcessBatch(context.Background(), &batch) require.NoError(t, err) @@ -92,11 +98,14 @@ func TestProcessTransformablesOverflow(t *testing.T) { // The third transaction group will return a metricset for immediate publication. for i := 0; i < 2; i++ { - batch = append(batch, model.APMEvent{Transaction: &model.Transaction{ - Name: "baz", - Duration: float64(time.Minute / time.Millisecond), - RepresentativeCount: 1, - }}) + batch = append(batch, model.APMEvent{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{ + Name: "baz", + Duration: float64(time.Minute / time.Millisecond), + RepresentativeCount: 1, + }, + }) } err = agg.ProcessBatch(context.Background(), &batch) require.NoError(t, err) @@ -105,6 +114,7 @@ func TestProcessTransformablesOverflow(t *testing.T) { for _, m := range metricsets { assert.Equal(t, model.APMEvent{ + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: "transaction", Transaction: model.MetricsetTransaction{ @@ -152,6 +162,7 @@ func TestAggregatorRun(t *testing.T) { for i := 0; i < 1000; i++ { metricset := agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "T-1000", RepresentativeCount: 1, @@ -161,6 +172,7 @@ func TestAggregatorRun(t *testing.T) { } for i := 0; i < 800; i++ { metricset := agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "T-800", RepresentativeCount: 1, @@ -221,6 +233,7 @@ func TestAggregatorRunPublishErrors(t *testing.T) { for i := 0; i < 2; i++ { metricset := agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "T-1000", RepresentativeCount: 1, @@ -254,12 +267,19 @@ func TestAggregateRepresentativeCount(t *testing.T) { // Record a transaction group so subsequent calls yield immediate metricsets, // and to demonstrate that fractional transaction counts are accumulated. - agg.AggregateTransaction(model.APMEvent{Transaction: &model.Transaction{Name: "fnord", RepresentativeCount: 1}}) - agg.AggregateTransaction(model.APMEvent{Transaction: &model.Transaction{Name: "fnord", RepresentativeCount: 1.5}}) + agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{Name: "fnord", RepresentativeCount: 1}, + }) + agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{Name: "fnord", RepresentativeCount: 1.5}, + }) // For non-positive RepresentativeCounts, no metrics will be accumulated. for _, representativeCount := range []float64{-1, 0} { m := agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "foo", RepresentativeCount: representativeCount, @@ -282,6 +302,7 @@ func TestAggregateRepresentativeCount(t *testing.T) { expectedCount: 2, }} { m := agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "foo", RepresentativeCount: test.representativeCount, @@ -291,6 +312,7 @@ func TestAggregateRepresentativeCount(t *testing.T) { m.Timestamp = time.Time{} assert.Equal(t, model.APMEvent{ + Processor: model.MetricsetProcessor, Metricset: &model.Metricset{ Name: "transaction", TimeseriesInstanceID: ":foo:1db641f187113b17", @@ -357,6 +379,7 @@ func testHDRHistogramSignificantFigures(t *testing.T, sigfigs int) { 101111 * time.Microsecond, } { metricset := agg.AggregateTransaction(model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "T-1000", Duration: durationMillis(duration), @@ -393,7 +416,10 @@ func TestAggregationFields(t *testing.T) { go agg.Run() defer agg.Stop(context.Background()) - input := model.APMEvent{Transaction: &model.Transaction{RepresentativeCount: 1}} + input := model.APMEvent{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{RepresentativeCount: 1}, + } inputFields := []*string{ &input.Transaction.Name, &input.Transaction.Result, @@ -412,6 +438,7 @@ func TestAggregationFields(t *testing.T) { expectedEvent := input expectedEvent.Transaction = nil expectedEvent.Event.Outcome = input.Event.Outcome + expectedEvent.Processor = model.MetricsetProcessor expectedEvent.Metricset = &model.Metricset{ Name: "transaction", Transaction: model.MetricsetTransaction{ @@ -480,6 +507,7 @@ func BenchmarkAggregateTransaction(b *testing.B) { require.NoError(b, err) event := model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "T-1000", Duration: 1, From 9a201290ddd286b4d281993dba805b67be5da38e Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Sat, 21 Aug 2021 10:29:44 +0800 Subject: [PATCH 5/7] sampling: use Processor --- x-pack/apm-server/sampling/groups_test.go | 12 +++ x-pack/apm-server/sampling/processor.go | 10 +- x-pack/apm-server/sampling/processor_test.go | 98 +++++++++++--------- 3 files changed, 74 insertions(+), 46 deletions(-) diff --git a/x-pack/apm-server/sampling/groups_test.go b/x-pack/apm-server/sampling/groups_test.go index 7b29c41253e..579c07c521e 100644 --- a/x-pack/apm-server/sampling/groups_test.go +++ b/x-pack/apm-server/sampling/groups_test.go @@ -25,6 +25,7 @@ func TestTraceGroupsPolicies(t *testing.T) { Event: model.Event{ Outcome: traceOutcome, }, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: traceName, TraceID: uuid.Must(uuid.NewV4()).String(), @@ -98,6 +99,7 @@ func TestTraceGroupsMax(t *testing.T) { Service: model.Service{ Name: serviceName, }, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "whatever", TraceID: uuid.Must(uuid.NewV4()).String(), @@ -110,6 +112,7 @@ func TestTraceGroupsMax(t *testing.T) { } admitted, err := groups.sampleTrace(&model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "overflow", TraceID: uuid.Must(uuid.NewV4()).String(), @@ -131,6 +134,7 @@ func TestTraceGroupReservoirResize(t *testing.T) { sendTransactions := func(n int) { for i := 0; i < n; i++ { groups.sampleTrace(&model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ TraceID: "0102030405060708090a0b0c0d0e0f10", ID: "0102030405060708", @@ -171,6 +175,7 @@ func TestTraceGroupReservoirResizeMinimum(t *testing.T) { sendTransactions := func(n int) { for i := 0; i < n; i++ { groups.sampleTrace(&model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ TraceID: "0102030405060708090a0b0c0d0e0f10", ID: "0102030405060708", @@ -206,18 +211,21 @@ func TestTraceGroupsRemoval(t *testing.T) { for i := 0; i < 10000; i++ { _, err := groups.sampleTrace(&model.APMEvent{ Service: model.Service{Name: "many"}, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{}, }) assert.NoError(t, err) } _, err := groups.sampleTrace(&model.APMEvent{ Service: model.Service{Name: "few"}, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{}, }) assert.NoError(t, err) _, err = groups.sampleTrace(&model.APMEvent{ Service: model.Service{Name: "another"}, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{}, }) assert.Equal(t, errTooManyTraceGroups, err) @@ -226,6 +234,7 @@ func TestTraceGroupsRemoval(t *testing.T) { // will not be affected by the limit... _, err = groups.sampleTrace(&model.APMEvent{ Service: model.Service{Name: "defined"}, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{}, }) assert.NoError(t, err) @@ -234,6 +243,7 @@ func TestTraceGroupsRemoval(t *testing.T) { // a matching dynamic policy. _, err = groups.sampleTrace(&model.APMEvent{ Service: model.Service{Name: "defined_later"}, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{}, }) assert.Equal(t, errTooManyTraceGroups, err) @@ -245,6 +255,7 @@ func TestTraceGroupsRemoval(t *testing.T) { // We should now be able to add another trace group. _, err = groups.sampleTrace(&model.APMEvent{ Service: model.Service{Name: "another"}, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{}, }) assert.NoError(t, err) @@ -265,6 +276,7 @@ func BenchmarkTraceGroups(b *testing.B) { // Duration is non-zero to ensure transactions have a non-zero chance of // being sampled. tx := model.APMEvent{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Duration: 1000, Name: uuid.Must(uuid.NewV4()).String(), diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go index 94750d3ae92..8ca85ddf040 100644 --- a/x-pack/apm-server/sampling/processor.go +++ b/x-pack/apm-server/sampling/processor.go @@ -156,14 +156,15 @@ func (p *Processor) ProcessBatch(ctx context.Context, batch *model.Batch) error for i := 0; i < len(events); i++ { event := &events[i] var report, stored bool - if event.Transaction != nil { + switch event.Processor { + case model.TransactionProcessor: var err error atomic.AddInt64(&p.eventMetrics.processed, 1) report, stored, err = p.processTransaction(event) if err != nil { return err } - } else if event.Span != nil { + case model.SpanProcessor: var err error atomic.AddInt64(&p.eventMetrics.processed, 1) report, stored, err = p.processSpan(event) @@ -464,11 +465,12 @@ func (p *Processor) Run() error { // we don't publish duplicates; delivery is therefore // at-most-once, not guaranteed. for _, event := range events { - if event.Transaction != nil { + switch event.Processor { + case model.TransactionProcessor: if err := p.storage.DeleteTraceEvent(event.Transaction.TraceID, event.Transaction.ID); err != nil { return errors.Wrap(err, "failed to delete transaction from local storage") } - } else if event.Span != nil { + case model.SpanProcessor: if err := p.storage.DeleteTraceEvent(event.Span.TraceID, event.Span.ID); err != nil { return errors.Wrap(err, "failed to delete span from local storage") } diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go index f36889a69b2..77deb281388 100644 --- a/x-pack/apm-server/sampling/processor_test.go +++ b/x-pack/apm-server/sampling/processor_test.go @@ -37,6 +37,7 @@ func TestProcessUnsampled(t *testing.T) { defer processor.Stop(context.Background()) in := model.Batch{{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ TraceID: "0102030405060708090a0b0c0d0e0f10", ID: "0102030405060708", @@ -77,41 +78,45 @@ func TestProcessAlreadyTailSampled(t *testing.T) { go processor.Run() defer processor.Stop(context.Background()) - transaction1 := &model.Transaction{ - TraceID: traceID1, - ID: "0102030405060708", - Sampled: true, + transaction1 := model.APMEvent{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{ + TraceID: traceID1, + ID: "0102030405060708", + Sampled: true, + }, } - span1 := &model.Span{ - TraceID: traceID1, - ID: "0102030405060709", + span1 := model.APMEvent{ + Processor: model.SpanProcessor, + Span: &model.Span{ + TraceID: traceID1, + ID: "0102030405060709", + }, } - transaction2 := &model.Transaction{ - TraceID: traceID2, - ID: "0102030405060710", - Sampled: true, + transaction2 := model.APMEvent{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{ + TraceID: traceID2, + ID: "0102030405060710", + Sampled: true, + }, } - span2 := &model.Span{ - TraceID: traceID2, - ID: "0102030405060711", + span2 := model.APMEvent{ + Processor: model.SpanProcessor, + Span: &model.Span{ + TraceID: traceID2, + ID: "0102030405060711", + }, } - batch := model.Batch{ - {Transaction: transaction1}, - {Transaction: transaction2}, - {Span: span1}, - {Span: span2}, - } + batch := model.Batch{transaction1, transaction2, span1, span2} err = processor.ProcessBatch(context.Background(), &batch) require.NoError(t, err) // Tail sampling decision already made. The first transaction and span should be // reported immediately, whereas the second ones should be written storage since // they were received after the trace sampling entry expired. - assert.Equal(t, model.Batch{ - {Transaction: transaction1}, - {Span: span1}, - }, batch) + assert.Equal(t, model.Batch{transaction1, span1}, batch) expectedMonitoring := monitoring.MakeFlatSnapshot() expectedMonitoring.Ints["sampling.events.processed"] = 4 @@ -133,10 +138,7 @@ func TestProcessAlreadyTailSampled(t *testing.T) { err = reader.ReadTraceEvents(traceID2, &batch) assert.NoError(t, err) - assert.Equal(t, model.Batch{ - {Transaction: transaction2}, - {Span: span2}, - }, batch) + assert.Equal(t, model.Batch{transaction2, span2}, batch) }) } @@ -152,32 +154,38 @@ func TestProcessLocalTailSampling(t *testing.T) { traceID1 := "0102030405060708090a0b0c0d0e0f10" traceID2 := "0102030405060708090a0b0c0d0e0f11" - trace1Events := model.Batch{ - {Transaction: &model.Transaction{ + trace1Events := model.Batch{{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{ TraceID: traceID1, ID: "0102030405060708", Duration: 123, Sampled: true, - }}, - {Span: &model.Span{ + }, + }, { + Processor: model.SpanProcessor, + Span: &model.Span{ TraceID: traceID1, ID: "0102030405060709", Duration: 123, - }}, - } - trace2Events := model.Batch{ - {Transaction: &model.Transaction{ + }, + }} + trace2Events := model.Batch{{ + Processor: model.TransactionProcessor, + Transaction: &model.Transaction{ TraceID: traceID2, ID: "0102030405060710", Duration: 456, Sampled: true, - }}, - {Span: &model.Span{ + }, + }, { + Processor: model.SpanProcessor, + Span: &model.Span{ TraceID: traceID2, ID: "0102030405060711", Duration: 456, - }}, - } + }, + }} in := append(trace1Events[:], trace2Events...) err = processor.ProcessBatch(context.Background(), &in) @@ -266,6 +274,7 @@ func TestProcessLocalTailSamplingUnsampled(t *testing.T) { traceID := uuid.Must(uuid.NewV4()).String() traceIDs[i] = traceID batch := model.Batch{{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ TraceID: traceID, ID: traceID, @@ -330,7 +339,8 @@ func TestProcessLocalTailSamplingPolicyOrder(t *testing.T) { _, err := rng.Read(traceIDBytes[:]) require.NoError(t, err) events[i] = model.APMEvent{ - Service: service, + Service: service, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ Name: "trace_name", TraceID: fmt.Sprintf("%x", traceIDBytes[:]), @@ -399,6 +409,7 @@ func TestProcessRemoteTailSampling(t *testing.T) { traceID1 := "0102030405060708090a0b0c0d0e0f10" traceID2 := "0102030405060708090a0b0c0d0e0f11" trace1Events := model.Batch{{ + Processor: model.SpanProcessor, Span: &model.Span{ TraceID: traceID1, ID: "0102030405060709", @@ -480,7 +491,8 @@ func TestGroupsMonitoring(t *testing.T) { for i := 0; i < config.MaxDynamicServices+1; i++ { err := processor.ProcessBatch(context.Background(), &model.Batch{{ - Service: model.Service{Name: fmt.Sprintf("service_%d", i)}, + Service: model.Service{Name: fmt.Sprintf("service_%d", i)}, + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ TraceID: uuid.Must(uuid.NewV4()).String(), ID: "0102030405060709", @@ -509,6 +521,7 @@ func TestStorageMonitoring(t *testing.T) { for i := 0; i < 100; i++ { traceID := uuid.Must(uuid.NewV4()).String() batch := model.Batch{{ + Processor: model.TransactionProcessor, Transaction: &model.Transaction{ TraceID: traceID, ID: traceID, @@ -552,6 +565,7 @@ func TestStorageGC(t *testing.T) { for i := 0; i < n; i++ { traceID := uuid.Must(uuid.NewV4()).String() batch := model.Batch{{ + Processor: model.SpanProcessor, Span: &model.Span{ TraceID: traceID, ID: traceID, From 5de2075df1da83be6b336f6e4692e5b5758a740d Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Tue, 24 Aug 2021 08:18:17 +0800 Subject: [PATCH 6/7] Update changelog --- changelogs/head.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 4790d09b362..68060c1b8a4 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -9,6 +9,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits] - `transaction.page` and `error.page` no longer recorded {pull}5872[5872] - experimental:["This breaking change applies to the experimental tail-based sampling feature."] `apm-server.sampling.tail` now requires `apm-server.data_streams.enabled` {pull}5952[5952] - beta:["This breaking change applies to the beta <>."] The `traces-sampled-*` data stream is now `traces-apm.sampled-*` {pull}5952[5952] +- Removed unused stacktrace/frame monitoring counters {pull}5984[5984] [float] ==== Bug fixes From 69dbf45444ee9f9b5e43537977f1ce025859f792 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Tue, 24 Aug 2021 08:23:40 +0800 Subject: [PATCH 7/7] Remove redundant setting of event.Processor --- model/modeldecoder/rumv3/decoder.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/model/modeldecoder/rumv3/decoder.go b/model/modeldecoder/rumv3/decoder.go index 3ef4b0cf8fe..ad4001871ad 100644 --- a/model/modeldecoder/rumv3/decoder.go +++ b/model/modeldecoder/rumv3/decoder.go @@ -163,7 +163,6 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch for _, m := range root.Transaction.Metricsets { event := input.Base mapToMetricsetModel(&m, &event) - event.Processor = model.MetricsetProcessor event.Metricset.Transaction.Name = transaction.Transaction.Name event.Metricset.Transaction.Type = transaction.Transaction.Type *batch = append(*batch, event) @@ -173,7 +172,6 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch for _, s := range root.Transaction.Spans { event := input.Base mapToSpanModel(&s, &event) - event.Processor = model.SpanProcessor event.Span.TransactionID = transaction.Transaction.ID event.Trace = transaction.Trace *batch = append(*batch, event) @@ -516,6 +514,7 @@ func mapToAgentModel(from contextServiceAgent, out *model.Agent) { func mapToSpanModel(from *span, event *model.APMEvent) { out := &model.Span{} event.Span = out + event.Processor = model.SpanProcessor // map span specific data if !from.Action.IsSet() && !from.Subtype.IsSet() {