Skip to content

Commit

Permalink
Introduce model.Processor (#5984) (#6033)
Browse files Browse the repository at this point in the history
* Introduce model.Processor

Processor identifies the event kind: transaction, span,
metricset, error, or profile. This is used for routing
events to the appropriate index or data stream, and in
queries for fetching events of a given kind.

* Update SetDataStream to use APMEvent.Processor

* Move monitoring event counters to model processor

Also, remove unused metrics.

* Update aggregators to use Processor

* sampling: use Processor

* Update changelog

* Remove redundant setting of event.Processor

Co-authored-by: stuart nelson <[email protected]>
# Conflicts:
#	changelogs/head.asciidoc
  • Loading branch information
axw authored Aug 24, 2021
1 parent 664bbf3 commit 11ea652
Show file tree
Hide file tree
Showing 34 changed files with 408 additions and 238 deletions.
1 change: 1 addition & 0 deletions beater/api/profile/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func appendProfileSampleBatch(pp *profile.Profile, baseEvent model.APMEvent, out
}

event := baseEvent
event.Processor = model.ProfileProcessor
event.Labels = event.Labels.Clone()
if n := len(sample.Label); n > 0 {
for k, v := range sample.Label {
Expand Down
2 changes: 2 additions & 0 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/go-ucfg"

"github.com/pkg/errors"
Expand Down Expand Up @@ -479,6 +480,7 @@ func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) R
modelprocessor.SetErrorMessage{},
newObserverBatchProcessor(s.beat.Info),
model.ProcessBatchFunc(ecsVersionBatchProcessor),
modelprocessor.NewEventCounter(monitoring.Default.GetRegistry("apm-server")),
}
if s.config.DefaultServiceEnvironment != "" {
processors = append(processors, &modelprocessor.SetDefaultServiceEnvironment{
Expand Down
5 changes: 4 additions & 1 deletion model/apmevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type APMEvent struct {
Network Network
Session Session
URL URL
Processor Processor
Trace Trace

// Timestamp holds the event timestamp.
Expand Down Expand Up @@ -91,7 +92,8 @@ func (e *APMEvent) BeatEvent(ctx context.Context) beat.Event {
event.Fields = e.Error.fields()
case e.ProfileSample != nil:
event.Fields = e.ProfileSample.fields()
default:
}
if event.Fields == nil {
event.Fields = make(common.MapStr)
}

Expand Down Expand Up @@ -133,6 +135,7 @@ func (e *APMEvent) BeatEvent(ctx context.Context) beat.Event {
fields.maybeSetMapStr("event", e.Event.fields())
fields.maybeSetMapStr("url", e.URL.fields())
fields.maybeSetMapStr("session", e.Session.fields())
fields.maybeSetMapStr("processor", e.Processor.fields())
fields.maybeSetMapStr("trace", e.Trace.fields())
fields.maybeSetString("message", e.Message)
return event
Expand Down
9 changes: 5 additions & 4 deletions model/apmevent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestAPMEventFields(t *testing.T) {
Message: "bottle",
Transaction: &Transaction{},
Timestamp: time.Date(2019, 1, 3, 15, 17, 4, 908.596*1e6, time.FixedZone("+0100", 3600)),
Processor: Processor{Name: "processor_name", Event: "processor_event"},
Trace: Trace{ID: traceID},
},
output: common.MapStr{
Expand Down Expand Up @@ -107,12 +108,12 @@ func TestAPMEventFields(t *testing.T) {
"trace": common.MapStr{
"id": traceID,
},

// fields related to APMEvent.Transaction
"processor": common.MapStr{
"name": "transaction",
"event": "transaction",
"name": "processor_name",
"event": "processor_event",
},

// fields related to APMEvent.Transaction
"timestamp": common.MapStr{"us": int64(1546525024908596)},
"transaction": common.MapStr{
"duration": common.MapStr{"us": 0},
Expand Down
30 changes: 4 additions & 26 deletions model/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,15 @@ package model

import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"
)

var (
errorMetrics = monitoring.Default.NewRegistry("apm-server.processor.error")
errorTransformations = monitoring.NewInt(errorMetrics, "transformations")
errorStacktraceCounter = monitoring.NewInt(errorMetrics, "stacktraces")
errorFrameCounter = monitoring.NewInt(errorMetrics, "frames")
errorProcessorEntry = common.MapStr{"name": errorProcessorName, "event": errorDocType}
// ErrorProcessor is the Processor value that should be assigned to error events.
ErrorProcessor = Processor{Name: "error", Event: "error"}
)

const (
errorProcessorName = "error"
errorDocType = "error"
ErrorsDataset = "apm.error"
ErrorsDataset = "apm.error"
)

type Error struct {
Expand Down Expand Up @@ -76,16 +70,7 @@ type Log struct {
}

func (e *Error) fields() common.MapStr {
errorTransformations.Inc()

if e.Exception != nil {
addStacktraceCounter(e.Exception.Stacktrace)
}
if e.Log != nil {
addStacktraceCounter(e.Log.Stacktrace)
}

fields := mapStr{"processor": errorProcessorEntry}
var fields mapStr
if e.HTTP != nil {
fields.maybeSetMapStr("http", e.HTTP.transactionTopLevelFields())
}
Expand Down Expand Up @@ -161,13 +146,6 @@ func (e *Error) logFields() common.MapStr {
return common.MapStr(log)
}

func addStacktraceCounter(st Stacktrace) {
if frames := len(st); frames > 0 {
errorStacktraceCounter.Inc()
errorFrameCounter.Add(int64(frames))
}
}

// flattenExceptionTree recursively traverses the causes of an exception to return a slice of exceptions.
// Tree traversal is Depth First.
// The parent of a exception in the resulting slice is at the position indicated by the `parent` property
Expand Down
27 changes: 8 additions & 19 deletions model/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,16 @@ package model

import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"
)

const (
metricsetProcessorName = "metric"
metricsetDocType = "metric"
metricsetEventKey = "event"
metricsetTransactionKey = "transaction"
metricsetSpanKey = "span"
AppMetricsDataset = "apm.app"
InternalMetricsDataset = "apm.internal"
AppMetricsDataset = "apm.app"
InternalMetricsDataset = "apm.internal"
)

var (
metricsetMetrics = monitoring.Default.NewRegistry("apm-server.processor.metric")
metricsetTransformations = monitoring.NewInt(metricsetMetrics, "transformations")
metricsetProcessorEntry = common.MapStr{"name": metricsetProcessorName, "event": metricsetDocType}
// MetricsetProcessor is the Processor value that should be assigned to metricset events.
MetricsetProcessor = Processor{Name: "metric", Event: "metric"}
)

// MetricType describes the type of a metric: gauge, counter, or histogram.
Expand Down Expand Up @@ -141,13 +134,9 @@ type MetricsetSpan struct {
}

func (me *Metricset) fields() common.MapStr {
metricsetTransformations.Inc()

var fields mapStr
fields.set("processor", metricsetProcessorEntry)

fields.maybeSetMapStr(metricsetTransactionKey, me.Transaction.fields())
fields.maybeSetMapStr(metricsetSpanKey, me.Span.fields())
fields.maybeSetMapStr("transaction", me.Transaction.fields())
fields.maybeSetMapStr("span", me.Span.fields())
if me.TimeseriesInstanceID != "" {
fields.set("timeseries", common.MapStr{"instance": me.TimeseriesInstanceID})
}
Expand All @@ -158,7 +147,7 @@ func (me *Metricset) fields() common.MapStr {

var metricDescriptions mapStr
for name, sample := range me.Samples {
sample.set(name, fields)
sample.set(name, &fields)

var md mapStr
md.maybeSetString("type", string(sample.Type))
Expand Down Expand Up @@ -190,7 +179,7 @@ func (s *MetricsetSpan) fields() common.MapStr {
return common.MapStr(fields)
}

func (s *MetricsetSample) set(name string, fields mapStr) {
func (s *MetricsetSample) set(name string, fields *mapStr) {
if s.Type == MetricTypeHistogram {
fields.set(name, common.MapStr{
"counts": s.Counts,
Expand Down
12 changes: 2 additions & 10 deletions model/metricset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,12 @@ func TestMetricset(t *testing.T) {
}{
{
Metricset: &Metricset{},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
},
Msg: "Payload with empty metric.",
Output: common.MapStr{},
Msg: "Payload with empty metric.",
},
{
Metricset: &Metricset{Name: "raj"},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"metricset.name": "raj",
},
Msg: "Payload with metricset name.",
Expand All @@ -67,7 +64,6 @@ func TestMetricset(t *testing.T) {
},
},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"a.counter": 612.0,
"some.gauge": 9.16,
},
Expand All @@ -82,7 +78,6 @@ func TestMetricset(t *testing.T) {
},
},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"transaction": common.MapStr{"type": trType, "name": trName},
"span": common.MapStr{
"type": spType, "subtype": spSubtype,
Expand Down Expand Up @@ -111,7 +106,6 @@ func TestMetricset(t *testing.T) {
DocCount: 6,
},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"timeseries": common.MapStr{"instance": "foo"},
"transaction": common.MapStr{
"type": trType,
Expand Down Expand Up @@ -143,7 +137,6 @@ func TestMetricset(t *testing.T) {
},
},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"span": common.MapStr{
"type": spType, "subtype": spSubtype,
"destination": common.MapStr{"service": common.MapStr{"resource": resource}},
Expand Down Expand Up @@ -173,7 +166,6 @@ func TestMetricset(t *testing.T) {
},
},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"latency_histogram": common.MapStr{
"counts": []int64{1, 2, 3},
"values": []float64{1.1, 2.2, 3.3},
Expand Down
24 changes: 14 additions & 10 deletions model/modeldecoder/rumv3/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,20 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch
*batch = append(*batch, transaction)

for _, m := range root.Transaction.Metricsets {
metricset := input.Base
mapToMetricsetModel(&m, &metricset)
metricset.Metricset.Transaction.Name = transaction.Transaction.Name
metricset.Metricset.Transaction.Type = transaction.Transaction.Type
*batch = append(*batch, metricset)
event := input.Base
mapToMetricsetModel(&m, &event)
event.Metricset.Transaction.Name = transaction.Transaction.Name
event.Metricset.Transaction.Type = transaction.Transaction.Type
*batch = append(*batch, event)
}

offset := len(*batch)
for _, s := range root.Transaction.Spans {
span := input.Base
mapToSpanModel(&s, &span)
span.Span.TransactionID = transaction.Transaction.ID
span.Trace = transaction.Trace
*batch = append(*batch, span)
event := input.Base
mapToSpanModel(&s, &event)
event.Span.TransactionID = transaction.Transaction.ID
event.Trace = transaction.Trace
*batch = append(*batch, event)
}
spans := (*batch)[offset:]
for i, s := range root.Transaction.Spans {
Expand All @@ -190,6 +190,7 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch
func mapToErrorModel(from *errorEvent, event *model.APMEvent) {
out := &model.Error{}
event.Error = out
event.Processor = model.ErrorProcessor

// overwrite metadata with event specific information
mapToServiceModel(from.Context.Service, &event.Service)
Expand Down Expand Up @@ -391,6 +392,7 @@ func mapToMetadataModel(m *metadata, out *model.APMEvent) {
func mapToMetricsetModel(from *metricset, event *model.APMEvent) {
out := &model.Metricset{}
event.Metricset = out
event.Processor = model.MetricsetProcessor

// map samples information
if from.Samples.IsSet() {
Expand Down Expand Up @@ -512,6 +514,7 @@ func mapToAgentModel(from contextServiceAgent, out *model.Agent) {
func mapToSpanModel(from *span, event *model.APMEvent) {
out := &model.Span{}
event.Span = out
event.Processor = model.SpanProcessor

// map span specific data
if !from.Action.IsSet() && !from.Subtype.IsSet() {
Expand Down Expand Up @@ -690,6 +693,7 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) {
func mapToTransactionModel(from *transaction, event *model.APMEvent) {
out := &model.Transaction{}
event.Transaction = out
event.Processor = model.TransactionProcessor

// overwrite metadata with event specific information
mapToServiceModel(from.Context.Service, &event.Service)
Expand Down
1 change: 1 addition & 0 deletions model/modeldecoder/rumv3/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func metadataExceptions(keys ...string) func(key string) bool {
"Network",
"Observer",
"Process",
"Processor",
"Service.Node",
"Service.Agent.EphemeralID",
"Host",
Expand Down
4 changes: 4 additions & 0 deletions model/modeldecoder/v2/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func mapToClientModel(from contextRequest, out *model.Client) {
func mapToErrorModel(from *errorEvent, config modeldecoder.Config, event *model.APMEvent) {
out := &model.Error{}
event.Error = out
event.Processor = model.ErrorProcessor

// overwrite metadata with event specific information
mapToServiceModel(from.Context.Service, &event.Service)
Expand Down Expand Up @@ -538,6 +539,7 @@ func mapToMetadataModel(from *metadata, out *model.APMEvent) {
func mapToMetricsetModel(from *metricset, config modeldecoder.Config, event *model.APMEvent) {
out := &model.Metricset{}
event.Metricset = out
event.Processor = model.MetricsetProcessor

if !from.Timestamp.Val.IsZero() {
event.Timestamp = from.Timestamp.Val
Expand Down Expand Up @@ -725,6 +727,7 @@ func mapToAgentModel(from contextServiceAgent, out *model.Agent) {
func mapToSpanModel(from *span, config modeldecoder.Config, event *model.APMEvent) {
out := &model.Span{}
event.Span = out
event.Processor = model.SpanProcessor

// map span specific data
if !from.Action.IsSet() && !from.Subtype.IsSet() {
Expand Down Expand Up @@ -987,6 +990,7 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) {

func mapToTransactionModel(from *transaction, config modeldecoder.Config, event *model.APMEvent) {
out := &model.Transaction{}
event.Processor = model.TransactionProcessor
event.Transaction = out

// overwrite metadata with event specific information
Expand Down
3 changes: 3 additions & 0 deletions model/modeldecoder/v2/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func isUnmappedMetadataField(key string) bool {
"Observer.VersionMajor",
"Process.CommandLine",
"Process.Executable",
"Processor",
"Processor.Event",
"Processor.Name",
"Host.OS.Full",
"Host.OS.Type",
"Host.ID",
Expand Down
10 changes: 5 additions & 5 deletions model/modelprocessor/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func (s *SetDataStream) ProcessBatch(ctx context.Context, b *model.Batch) error
}

func (s *SetDataStream) setDataStream(event *model.APMEvent) {
switch {
case event.Transaction != nil || event.Span != nil:
switch event.Processor {
case model.SpanProcessor, model.TransactionProcessor:
event.DataStream.Type = datastreams.TracesType
event.DataStream.Dataset = model.TracesDataset
case event.Error != nil:
case model.ErrorProcessor:
event.DataStream.Type = datastreams.LogsType
event.DataStream.Dataset = model.ErrorsDataset
case event.Metricset != nil:
case model.MetricsetProcessor:
event.DataStream.Type = datastreams.MetricsType
// Metrics that include well-defined transaction/span fields
// (i.e. breakdown metrics, transaction and span metrics) will
Expand All @@ -58,7 +58,7 @@ func (s *SetDataStream) setDataStream(event *model.APMEvent) {
datastreams.NormalizeServiceName(event.Service.Name),
)
}
case event.ProfileSample != nil:
case model.ProfileProcessor:
event.DataStream.Type = datastreams.MetricsType
event.DataStream.Dataset = model.ProfilesDataset
}
Expand Down
Loading

0 comments on commit 11ea652

Please sign in to comment.