diff --git a/model/batch.go b/model/batch.go index 84b5f50e455..9cffc65cf3b 100644 --- a/model/batch.go +++ b/model/batch.go @@ -69,19 +69,19 @@ func (b *Batch) Len() int { func (b *Batch) Transform(ctx context.Context, cfg *transform.Config) []beat.Event { events := make([]beat.Event, 0, b.Len()) for _, event := range b.Transactions { - events = append(events, event.Transform(ctx, cfg)...) + events = event.appendBeatEvents(cfg, events) } for _, event := range b.Spans { - events = append(events, event.Transform(ctx, cfg)...) + events = event.appendBeatEvents(ctx, cfg, events) } for _, event := range b.Metricsets { - events = append(events, event.Transform(ctx, cfg)...) + events = event.appendBeatEvents(cfg, events) } for _, event := range b.Errors { - events = append(events, event.Transform(ctx, cfg)...) + events = event.appendBeatEvents(ctx, cfg, events) } for _, event := range b.Profiles { - events = append(events, event.Transform(ctx, cfg)...) + events = event.appendBeatEvents(cfg, events) } return events } diff --git a/model/error.go b/model/error.go index 87718083962..71447cb34d2 100644 --- a/model/error.go +++ b/model/error.go @@ -100,7 +100,7 @@ type Log struct { Stacktrace Stacktrace } -func (e *Error) Transform(ctx context.Context, cfg *transform.Config) []beat.Event { +func (e *Error) appendBeatEvents(ctx context.Context, cfg *transform.Config, events []beat.Event) []beat.Event { errorTransformations.Inc() if e.Exception != nil { @@ -159,10 +159,10 @@ func (e *Error) Transform(ctx context.Context, cfg *transform.Config) []beat.Eve fields.maybeSetMapStr("trace", common.MapStr(trace)) fields.maybeSetMapStr("timestamp", utility.TimeAsMicros(e.Timestamp)) - return []beat.Event{{ + return append(events, beat.Event{ Fields: common.MapStr(fields), Timestamp: e.Timestamp, - }} + }) } func (e *Error) fields(ctx context.Context, cfg *transform.Config) common.MapStr { diff --git a/model/error_test.go b/model/error_test.go index bc937895e2c..7406259cb2e 100644 --- a/model/error_test.go +++ b/model/error_test.go @@ -259,9 +259,9 @@ func TestEventFields(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { - output := tc.Error.Transform(context.Background(), &transform.Config{ + output := tc.Error.appendBeatEvents(context.Background(), &transform.Config{ RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}}, - }) + }, nil) require.Len(t, output, 1) fields := output[0].Fields["error"] assert.Equal(t, tc.Output, fields) @@ -299,12 +299,12 @@ func TestEvents(t *testing.T) { mdWithContext.UserAgent.Original = userAgent for name, tc := range map[string]struct { - Transformable transform.Transformable - Output common.MapStr - Msg string + Error *Error + Output common.MapStr + Msg string }{ "valid": { - Transformable: &Error{Timestamp: timestamp, Metadata: md}, + Error: &Error{Timestamp: timestamp, Metadata: md}, Output: common.MapStr{ "data_stream.type": "logs", "data_stream.dataset": "apm.error.myservice", @@ -319,7 +319,7 @@ func TestEvents(t *testing.T) { }, }, "notSampled": { - Transformable: &Error{Timestamp: timestamp, Metadata: md, TransactionSampled: &sampledFalse}, + Error: &Error{Timestamp: timestamp, Metadata: md, TransactionSampled: &sampledFalse}, Output: common.MapStr{ "data_stream.type": "logs", "data_stream.dataset": "apm.error.myservice", @@ -335,7 +335,7 @@ func TestEvents(t *testing.T) { }, }, "withMeta": { - Transformable: &Error{Timestamp: timestamp, Metadata: md, TransactionType: transactionType}, + Error: &Error{Timestamp: timestamp, Metadata: md, TransactionType: transactionType}, Output: common.MapStr{ "data_stream.type": "logs", "data_stream.dataset": "apm.error.myservice", @@ -351,7 +351,7 @@ func TestEvents(t *testing.T) { }, }, "withContext": { - Transformable: &Error{ + Error: &Error{ Timestamp: timestamp, Metadata: mdWithContext, Log: baseLog(), @@ -409,10 +409,10 @@ func TestEvents(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - outputEvents := tc.Transformable.Transform(context.Background(), &transform.Config{ + outputEvents := tc.Error.appendBeatEvents(context.Background(), &transform.Config{ DataStreams: true, RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}}, - }) + }, nil) require.Len(t, outputEvents, 1) outputEvent := outputEvents[0] assert.Equal(t, tc.Output, outputEvent.Fields) @@ -588,7 +588,7 @@ func TestErrorTransformPage(t *testing.T) { } for idx, test := range tests { - output := test.Error.Transform(context.Background(), &transform.Config{}) + output := test.Error.appendBeatEvents(context.Background(), &transform.Config{}, nil) assert.Equal(t, test.Output, output[0].Fields["url"], fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) } } diff --git a/model/metricset.go b/model/metricset.go index 72377902769..71324f5a99f 100644 --- a/model/metricset.go +++ b/model/metricset.go @@ -18,7 +18,6 @@ package model import ( - "context" "fmt" "time" @@ -156,7 +155,7 @@ type MetricsetSpan struct { DestinationService DestinationService } -func (me *Metricset) Transform(ctx context.Context, cfg *transform.Config) []beat.Event { +func (me *Metricset) appendBeatEvents(cfg *transform.Config, events []beat.Event) []beat.Event { metricsetTransformations.Inc() if me == nil { return nil @@ -219,10 +218,10 @@ func (me *Metricset) Transform(ctx context.Context, cfg *transform.Config) []bea fields[datastreams.TypeField] = datastreams.MetricsType } - return []beat.Event{{ + return append(events, beat.Event{ Fields: common.MapStr(fields), Timestamp: me.Timestamp, - }} + }) } func (e *MetricsetEventCategorization) fields() common.MapStr { diff --git a/model/metricset_test.go b/model/metricset_test.go index 2f798cf5298..41582ab0cf1 100644 --- a/model/metricset_test.go +++ b/model/metricset_test.go @@ -18,7 +18,6 @@ package model import ( - "context" "fmt" "testing" "time" @@ -230,7 +229,7 @@ func TestTransform(t *testing.T) { } for idx, test := range tests { - outputEvents := test.Metricset.Transform(context.Background(), &transform.Config{DataStreams: true}) + outputEvents := test.Metricset.appendBeatEvents(&transform.Config{DataStreams: true}, nil) for j, outputEvent := range outputEvents { assert.Equal(t, test.Output[j], outputEvent.Fields, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) diff --git a/model/profile.go b/model/profile.go index cfefb355919..8b2d7b32e14 100644 --- a/model/profile.go +++ b/model/profile.go @@ -18,7 +18,6 @@ package model import ( - "context" "fmt" "time" @@ -49,8 +48,9 @@ type PprofProfile struct { Profile *profile.Profile } -// Transform transforms a Profile into a sequence of beat.Events: one per profile sample. -func (pp PprofProfile) Transform(ctx context.Context, cfg *transform.Config) []beat.Event { +// appendBeatEvents transforms a Profile into a sequence of beat.Events (one per profile sample), +// and appends them to events. +func (pp PprofProfile) appendBeatEvents(cfg *transform.Config, events []beat.Event) []beat.Event { // Precompute value field names for use in each event. // TODO(axw) limit to well-known value names? profileTimestamp := time.Unix(0, pp.Profile.TimeNanos) @@ -77,9 +77,7 @@ func (pp PprofProfile) Transform(ctx context.Context, cfg *transform.Config) []b // Profiles are stored in their own "metrics" data stream, with a data // set per service. This enables managing retention of profiling data // per-service, and indepedently of lower volume metrics. - - samples := make([]beat.Event, len(pp.Profile.Sample)) - for i, sample := range pp.Profile.Sample { + for _, sample := range pp.Profile.Sample { profileFields := common.MapStr{} if profileID != "" { profileFields["id"] = profileID @@ -141,12 +139,12 @@ func (pp PprofProfile) Transform(ctx context.Context, cfg *transform.Config) []b } } pp.Metadata.set(&fields, profileLabels) - samples[i] = beat.Event{ + events = append(events, beat.Event{ Timestamp: profileTimestamp, Fields: common.MapStr(fields), - } + }) } - return samples + return events } func normalizeUnit(unit string) string { diff --git a/model/profile_test.go b/model/profile_test.go index ada105c6b13..b10bb916c53 100644 --- a/model/profile_test.go +++ b/model/profile_test.go @@ -87,7 +87,8 @@ func TestPprofProfileTransform(t *testing.T) { }, } - output := pp.Transform(context.Background(), &transform.Config{DataStreams: true}) + batch := &model.Batch{Profiles: []*model.PprofProfile{&pp}} + output := batch.Transform(context.Background(), &transform.Config{DataStreams: true}) require.Len(t, output, 2) assert.Equal(t, output[0], output[1]) diff --git a/model/span.go b/model/span.go index 63716376127..1cd09f8d9c0 100644 --- a/model/span.go +++ b/model/span.go @@ -185,7 +185,7 @@ func (d *DestinationService) fields() common.MapStr { return common.MapStr(fields) } -func (e *Span) Transform(ctx context.Context, cfg *transform.Config) []beat.Event { +func (e *Span) appendBeatEvents(ctx context.Context, cfg *transform.Config, events []beat.Event) []beat.Event { spanTransformations.Inc() if frames := len(e.Stacktrace); frames > 0 { spanStacktraceCounter.Inc() @@ -239,10 +239,10 @@ func (e *Span) Transform(ctx context.Context, cfg *transform.Config) []beat.Even } common.MapStr(fields).Put("event.outcome", e.Outcome) - return []beat.Event{{ + return append(events, beat.Event{ Fields: common.MapStr(fields), Timestamp: e.Timestamp, - }} + }) } func (e *Span) fields(ctx context.Context, cfg *transform.Config) common.MapStr { diff --git a/model/span_test.go b/model/span_test.go index db9b3ce27b2..5d44e7df5f7 100644 --- a/model/span_test.go +++ b/model/span_test.go @@ -175,10 +175,10 @@ func TestSpanTransform(t *testing.T) { } for _, test := range tests { - output := test.Span.Transform(context.Background(), &transform.Config{ + output := test.Span.appendBeatEvents(context.Background(), &transform.Config{ DataStreams: true, RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}}, - }) + }, nil) fields := output[0].Fields assert.Equal(t, test.Output, fields, test.Msg) } diff --git a/model/transaction.go b/model/transaction.go index b2cb36dc783..fdcc22459a7 100644 --- a/model/transaction.go +++ b/model/transaction.go @@ -18,7 +18,6 @@ package model import ( - "context" "fmt" "time" @@ -111,7 +110,7 @@ func (e *Transaction) fields() common.MapStr { return common.MapStr(fields) } -func (e *Transaction) Transform(_ context.Context, cfg *transform.Config) []beat.Event { +func (e *Transaction) appendBeatEvents(cfg *transform.Config, events []beat.Event) []beat.Event { transactionTransformations.Inc() fields := mapStr{ @@ -155,10 +154,10 @@ func (e *Transaction) Transform(_ context.Context, cfg *transform.Config) []beat } common.MapStr(fields).Put("event.outcome", e.Outcome) - return []beat.Event{{ + return append(events, beat.Event{ Timestamp: e.Timestamp, Fields: common.MapStr(fields), - }} + }) } type TransactionMarks map[string]TransactionMark diff --git a/model/transaction_test.go b/model/transaction_test.go index 38ca0eda090..73febe3762f 100644 --- a/model/transaction_test.go +++ b/model/transaction_test.go @@ -18,7 +18,6 @@ package model import ( - "context" "fmt" "net" "net/http" @@ -126,14 +125,14 @@ func TestTransactionTransform(t *testing.T) { } for idx, test := range tests { - output := test.Transaction.Transform(context.Background(), &transform.Config{}) + output := test.Transaction.appendBeatEvents(&transform.Config{}, nil) assert.Equal(t, test.Output, output[0].Fields["transaction"], fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) } } func TestTransactionTransformOutcome(t *testing.T) { tx := Transaction{Outcome: "success"} - events := tx.Transform(context.Background(), &transform.Config{}) + events := tx.appendBeatEvents(&transform.Config{}, nil) require.Len(t, events, 1) assert.Equal(t, common.MapStr{"outcome": "success"}, events[0].Fields["event"]) } @@ -177,7 +176,7 @@ func TestEventsTransformWithMetadata(t *testing.T) { Custom: common.MapStr{"foo.bar": "baz"}, Message: &Message{QueueName: "routeUser"}, } - events := txWithContext.Transform(context.Background(), &transform.Config{DataStreams: true}) + events := txWithContext.appendBeatEvents(&transform.Config{DataStreams: true}, nil) require.Len(t, events, 1) assert.Equal(t, events[0].Fields, common.MapStr{ "data_stream.type": "traces", @@ -229,7 +228,7 @@ func TestTransformTransactionHTTP(t *testing.T) { tx := Transaction{ HTTP: &Http{Request: &request}, } - events := tx.Transform(context.Background(), &transform.Config{}) + events := tx.appendBeatEvents(&transform.Config{}, nil) require.Len(t, events, 1) assert.Equal(t, common.MapStr{ "request": common.MapStr{ @@ -292,7 +291,7 @@ func TestTransactionTransformPage(t *testing.T) { } for idx, test := range tests { - output := test.Transaction.Transform(context.Background(), &transform.Config{}) + output := test.Transaction.appendBeatEvents(&transform.Config{}, nil) assert.Equal(t, test.Output, output[0].Fields["url"], fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) } } @@ -321,7 +320,7 @@ func TestTransactionTransformMarks(t *testing.T) { } for idx, test := range tests { - output := test.Transaction.Transform(context.Background(), &transform.Config{}) + output := test.Transaction.appendBeatEvents(&transform.Config{}, nil) marks, _ := output[0].Fields.GetValue("transaction.marks") assert.Equal(t, test.Output, marks, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg)) }