Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce model.Processor #5984

Merged
merged 11 commits into from
Aug 24, 2021
1 change: 1 addition & 0 deletions beater/api/profile/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func appendProfileSampleBatch(pp *profile.Profile, baseEvent model.APMEvent, out
}

event := baseEvent
event.Processor = model.ProfileProcessor
event.Labels = event.Labels.Clone()
if n := len(sample.Label); n > 0 {
for k, v := range sample.Label {
Expand Down
2 changes: 2 additions & 0 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/go-ucfg"

"github.com/pkg/errors"
Expand Down Expand Up @@ -479,6 +480,7 @@ func (s *serverRunner) wrapRunServerWithPreprocessors(runServer RunServerFunc) R
modelprocessor.SetErrorMessage{},
newObserverBatchProcessor(s.beat.Info),
model.ProcessBatchFunc(ecsVersionBatchProcessor),
modelprocessor.NewEventCounter(monitoring.Default.GetRegistry("apm-server")),
}
if s.config.DefaultServiceEnvironment != "" {
processors = append(processors, &modelprocessor.SetDefaultServiceEnvironment{
Expand Down
5 changes: 4 additions & 1 deletion model/apmevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type APMEvent struct {
Network Network
Session Session
URL URL
Processor Processor

// Timestamp holds the event timestamp.
//
Expand Down Expand Up @@ -90,7 +91,8 @@ func (e *APMEvent) BeatEvent(ctx context.Context) beat.Event {
event.Fields = e.Error.fields()
case e.ProfileSample != nil:
event.Fields = e.ProfileSample.fields()
default:
}
if event.Fields == nil {
event.Fields = make(common.MapStr)
}

Expand Down Expand Up @@ -132,6 +134,7 @@ func (e *APMEvent) BeatEvent(ctx context.Context) beat.Event {
fields.maybeSetMapStr("event", e.Event.fields())
fields.maybeSetMapStr("url", e.URL.fields())
fields.maybeSetMapStr("session", e.Session.fields())
fields.maybeSetMapStr("processor", e.Processor.fields())
fields.maybeSetString("message", e.Message)
return event
}
9 changes: 5 additions & 4 deletions model/apmevent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func TestAPMEventFields(t *testing.T) {
Message: "bottle",
Transaction: &Transaction{},
Timestamp: time.Date(2019, 1, 3, 15, 17, 4, 908.596*1e6, time.FixedZone("+0100", 3600)),
Processor: Processor{Name: "processor_name", Event: "processor_event"},
},
output: common.MapStr{
// common fields
Expand Down Expand Up @@ -102,12 +103,12 @@ func TestAPMEventFields(t *testing.T) {
"c": 123,
},
"message": "bottle",

// fields related to APMEvent.Transaction
"processor": common.MapStr{
"name": "transaction",
"event": "transaction",
"name": "processor_name",
"event": "processor_event",
},

// fields related to APMEvent.Transaction
"timestamp": common.MapStr{"us": int64(1546525024908596)},
"transaction": common.MapStr{
"duration": common.MapStr{"us": 0},
Expand Down
30 changes: 4 additions & 26 deletions model/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,15 @@ package model

import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"
)

var (
errorMetrics = monitoring.Default.NewRegistry("apm-server.processor.error")
errorTransformations = monitoring.NewInt(errorMetrics, "transformations")
errorStacktraceCounter = monitoring.NewInt(errorMetrics, "stacktraces")
errorFrameCounter = monitoring.NewInt(errorMetrics, "frames")
errorProcessorEntry = common.MapStr{"name": errorProcessorName, "event": errorDocType}
// ErrorProcessor is the Processor value that should be assigned to error events.
ErrorProcessor = Processor{Name: "error", Event: "error"}
)

const (
errorProcessorName = "error"
errorDocType = "error"
ErrorsDataset = "apm.error"
ErrorsDataset = "apm.error"
)

type Error struct {
Expand Down Expand Up @@ -77,16 +71,7 @@ type Log struct {
}

func (e *Error) fields() common.MapStr {
errorTransformations.Inc()

if e.Exception != nil {
addStacktraceCounter(e.Exception.Stacktrace)
}
if e.Log != nil {
addStacktraceCounter(e.Log.Stacktrace)
}

fields := mapStr{"processor": errorProcessorEntry}
var fields mapStr
if e.HTTP != nil {
fields.maybeSetMapStr("http", e.HTTP.transactionTopLevelFields())
}
Expand Down Expand Up @@ -164,13 +149,6 @@ func (e *Error) logFields() common.MapStr {
return common.MapStr(log)
}

func addStacktraceCounter(st Stacktrace) {
if frames := len(st); frames > 0 {
errorStacktraceCounter.Inc()
errorFrameCounter.Add(int64(frames))
}
}

// flattenExceptionTree recursively traverses the causes of an exception to return a slice of exceptions.
// Tree traversal is Depth First.
// The parent of a exception in the resulting slice is at the position indicated by the `parent` property
Expand Down
27 changes: 8 additions & 19 deletions model/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,16 @@ package model

import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"
)

const (
metricsetProcessorName = "metric"
metricsetDocType = "metric"
metricsetEventKey = "event"
metricsetTransactionKey = "transaction"
metricsetSpanKey = "span"
AppMetricsDataset = "apm.app"
InternalMetricsDataset = "apm.internal"
AppMetricsDataset = "apm.app"
InternalMetricsDataset = "apm.internal"
)

var (
metricsetMetrics = monitoring.Default.NewRegistry("apm-server.processor.metric")
metricsetTransformations = monitoring.NewInt(metricsetMetrics, "transformations")
metricsetProcessorEntry = common.MapStr{"name": metricsetProcessorName, "event": metricsetDocType}
// MetricsetProcessor is the Processor value that should be assigned to metricset events.
MetricsetProcessor = Processor{Name: "metric", Event: "metric"}
)

// MetricType describes the type of a metric: gauge, counter, or histogram.
Expand Down Expand Up @@ -141,13 +134,9 @@ type MetricsetSpan struct {
}

func (me *Metricset) fields() common.MapStr {
metricsetTransformations.Inc()

var fields mapStr
fields.set("processor", metricsetProcessorEntry)

fields.maybeSetMapStr(metricsetTransactionKey, me.Transaction.fields())
fields.maybeSetMapStr(metricsetSpanKey, me.Span.fields())
fields.maybeSetMapStr("transaction", me.Transaction.fields())
fields.maybeSetMapStr("span", me.Span.fields())
if me.TimeseriesInstanceID != "" {
fields.set("timeseries", common.MapStr{"instance": me.TimeseriesInstanceID})
}
Expand All @@ -158,7 +147,7 @@ func (me *Metricset) fields() common.MapStr {

var metricDescriptions mapStr
for name, sample := range me.Samples {
sample.set(name, fields)
sample.set(name, &fields)

var md mapStr
md.maybeSetString("type", string(sample.Type))
Expand Down Expand Up @@ -190,7 +179,7 @@ func (s *MetricsetSpan) fields() common.MapStr {
return common.MapStr(fields)
}

func (s *MetricsetSample) set(name string, fields mapStr) {
func (s *MetricsetSample) set(name string, fields *mapStr) {
if s.Type == MetricTypeHistogram {
fields.set(name, common.MapStr{
"counts": s.Counts,
Expand Down
12 changes: 2 additions & 10 deletions model/metricset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,12 @@ func TestMetricset(t *testing.T) {
}{
{
Metricset: &Metricset{},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
},
Msg: "Payload with empty metric.",
Output: common.MapStr{},
Msg: "Payload with empty metric.",
},
{
Metricset: &Metricset{Name: "raj"},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"metricset.name": "raj",
},
Msg: "Payload with metricset name.",
Expand All @@ -67,7 +64,6 @@ func TestMetricset(t *testing.T) {
},
},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"a.counter": 612.0,
"some.gauge": 9.16,
},
Expand All @@ -82,7 +78,6 @@ func TestMetricset(t *testing.T) {
},
},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"transaction": common.MapStr{"type": trType, "name": trName},
"span": common.MapStr{
"type": spType, "subtype": spSubtype,
Expand Down Expand Up @@ -111,7 +106,6 @@ func TestMetricset(t *testing.T) {
DocCount: 6,
},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"timeseries": common.MapStr{"instance": "foo"},
"transaction": common.MapStr{
"type": trType,
Expand Down Expand Up @@ -143,7 +137,6 @@ func TestMetricset(t *testing.T) {
},
},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"span": common.MapStr{
"type": spType, "subtype": spSubtype,
"destination": common.MapStr{"service": common.MapStr{"resource": resource}},
Expand Down Expand Up @@ -173,7 +166,6 @@ func TestMetricset(t *testing.T) {
},
},
Output: common.MapStr{
"processor": common.MapStr{"event": "metric", "name": "metric"},
"latency_histogram": common.MapStr{
"counts": []int64{1, 2, 3},
"values": []float64{1.1, 2.2, 3.3},
Expand Down
25 changes: 15 additions & 10 deletions model/modeldecoder/rumv3/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,22 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch
*batch = append(*batch, transaction)

for _, m := range root.Transaction.Metricsets {
metricset := input.Base
mapToMetricsetModel(&m, &metricset)
metricset.Metricset.Transaction.Name = transaction.Transaction.Name
metricset.Metricset.Transaction.Type = transaction.Transaction.Type
*batch = append(*batch, metricset)
event := input.Base
mapToMetricsetModel(&m, &event)
event.Processor = model.MetricsetProcessor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears this event.Processor = model.MetricsetProcessor is also being set in mapToMetricsetModel? Is that intentional?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, I've removed these ones and consolidated on setting it in mapToFooModel

event.Metricset.Transaction.Name = transaction.Transaction.Name
event.Metricset.Transaction.Type = transaction.Transaction.Type
*batch = append(*batch, event)
}

offset := len(*batch)
for _, s := range root.Transaction.Spans {
span := input.Base
mapToSpanModel(&s, &span)
span.Span.TransactionID = transaction.Transaction.ID
span.Span.TraceID = transaction.Transaction.TraceID
*batch = append(*batch, span)
event := input.Base
mapToSpanModel(&s, &event)
event.Processor = model.SpanProcessor
event.Span.TransactionID = transaction.Transaction.ID
event.Span.TraceID = transaction.Transaction.TraceID
*batch = append(*batch, event)
}
spans := (*batch)[offset:]
for i, s := range root.Transaction.Spans {
Expand All @@ -190,6 +192,7 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch
func mapToErrorModel(from *errorEvent, event *model.APMEvent) {
out := &model.Error{}
event.Error = out
event.Processor = model.ErrorProcessor

// overwrite metadata with event specific information
mapToServiceModel(from.Context.Service, &event.Service)
Expand Down Expand Up @@ -391,6 +394,7 @@ func mapToMetadataModel(m *metadata, out *model.APMEvent) {
func mapToMetricsetModel(from *metricset, event *model.APMEvent) {
out := &model.Metricset{}
event.Metricset = out
event.Processor = model.MetricsetProcessor

// map samples information
if from.Samples.IsSet() {
Expand Down Expand Up @@ -690,6 +694,7 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) {
func mapToTransactionModel(from *transaction, event *model.APMEvent) {
out := &model.Transaction{}
event.Transaction = out
event.Processor = model.TransactionProcessor

// overwrite metadata with event specific information
mapToServiceModel(from.Context.Service, &event.Service)
Expand Down
1 change: 1 addition & 0 deletions model/modeldecoder/rumv3/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func metadataExceptions(keys ...string) func(key string) bool {
"Network",
"Observer",
"Process",
"Processor",
"Service.Node",
"Service.Agent.EphemeralID",
"Host",
Expand Down
4 changes: 4 additions & 0 deletions model/modeldecoder/v2/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func mapToClientModel(from contextRequest, out *model.Client) {
func mapToErrorModel(from *errorEvent, config modeldecoder.Config, event *model.APMEvent) {
out := &model.Error{}
event.Error = out
event.Processor = model.ErrorProcessor

// overwrite metadata with event specific information
mapToServiceModel(from.Context.Service, &event.Service)
Expand Down Expand Up @@ -538,6 +539,7 @@ func mapToMetadataModel(from *metadata, out *model.APMEvent) {
func mapToMetricsetModel(from *metricset, config modeldecoder.Config, event *model.APMEvent) {
out := &model.Metricset{}
event.Metricset = out
event.Processor = model.MetricsetProcessor

if !from.Timestamp.Val.IsZero() {
event.Timestamp = from.Timestamp.Val
Expand Down Expand Up @@ -725,6 +727,7 @@ func mapToAgentModel(from contextServiceAgent, out *model.Agent) {
func mapToSpanModel(from *span, config modeldecoder.Config, event *model.APMEvent) {
out := &model.Span{}
event.Span = out
event.Processor = model.SpanProcessor

// map span specific data
if !from.Action.IsSet() && !from.Subtype.IsSet() {
Expand Down Expand Up @@ -987,6 +990,7 @@ func mapToStracktraceModel(from []stacktraceFrame, out model.Stacktrace) {

func mapToTransactionModel(from *transaction, config modeldecoder.Config, event *model.APMEvent) {
out := &model.Transaction{}
event.Processor = model.TransactionProcessor
event.Transaction = out

// overwrite metadata with event specific information
Expand Down
3 changes: 3 additions & 0 deletions model/modeldecoder/v2/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ func isUnmappedMetadataField(key string) bool {
"Observer.VersionMajor",
"Process.CommandLine",
"Process.Executable",
"Processor",
"Processor.Event",
"Processor.Name",
"Host.OS.Full",
"Host.OS.Type",
"Host.ID",
Expand Down
10 changes: 5 additions & 5 deletions model/modelprocessor/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func (s *SetDataStream) ProcessBatch(ctx context.Context, b *model.Batch) error
}

func (s *SetDataStream) setDataStream(event *model.APMEvent) {
switch {
case event.Transaction != nil || event.Span != nil:
switch event.Processor {
case model.SpanProcessor, model.TransactionProcessor:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥳

event.DataStream.Type = datastreams.TracesType
event.DataStream.Dataset = model.TracesDataset
case event.Error != nil:
case model.ErrorProcessor:
event.DataStream.Type = datastreams.LogsType
event.DataStream.Dataset = model.ErrorsDataset
case event.Metricset != nil:
case model.MetricsetProcessor:
event.DataStream.Type = datastreams.MetricsType
// Metrics that include well-defined transaction/span fields
// (i.e. breakdown metrics, transaction and span metrics) will
Expand All @@ -58,7 +58,7 @@ func (s *SetDataStream) setDataStream(event *model.APMEvent) {
datastreams.NormalizeServiceName(event.Service.Name),
)
}
case event.ProfileSample != nil:
case model.ProfileProcessor:
event.DataStream.Type = datastreams.MetricsType
event.DataStream.Dataset = model.ProfilesDataset
}
Expand Down
Loading