Skip to content

Commit

Permalink
model: append events on transformation (#4926)
Browse files Browse the repository at this point in the history
Rather than having each model type create a slice
only to append it to another slice, append to that
slice directly.
  • Loading branch information
axw authored Mar 8, 2021
1 parent 91aeb6a commit 704e1d5
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 52 deletions.
10 changes: 5 additions & 5 deletions model/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,19 @@ func (b *Batch) Len() int {
func (b *Batch) Transform(ctx context.Context, cfg *transform.Config) []beat.Event {
events := make([]beat.Event, 0, b.Len())
for _, event := range b.Transactions {
events = append(events, event.Transform(ctx, cfg)...)
events = event.appendBeatEvents(cfg, events)
}
for _, event := range b.Spans {
events = append(events, event.Transform(ctx, cfg)...)
events = event.appendBeatEvents(ctx, cfg, events)
}
for _, event := range b.Metricsets {
events = append(events, event.Transform(ctx, cfg)...)
events = event.appendBeatEvents(cfg, events)
}
for _, event := range b.Errors {
events = append(events, event.Transform(ctx, cfg)...)
events = event.appendBeatEvents(ctx, cfg, events)
}
for _, event := range b.Profiles {
events = append(events, event.Transform(ctx, cfg)...)
events = event.appendBeatEvents(cfg, events)
}
return events
}
6 changes: 3 additions & 3 deletions model/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type Log struct {
Stacktrace Stacktrace
}

func (e *Error) Transform(ctx context.Context, cfg *transform.Config) []beat.Event {
func (e *Error) appendBeatEvents(ctx context.Context, cfg *transform.Config, events []beat.Event) []beat.Event {
errorTransformations.Inc()

if e.Exception != nil {
Expand Down Expand Up @@ -159,10 +159,10 @@ func (e *Error) Transform(ctx context.Context, cfg *transform.Config) []beat.Eve
fields.maybeSetMapStr("trace", common.MapStr(trace))
fields.maybeSetMapStr("timestamp", utility.TimeAsMicros(e.Timestamp))

return []beat.Event{{
return append(events, beat.Event{
Fields: common.MapStr(fields),
Timestamp: e.Timestamp,
}}
})
}

func (e *Error) fields(ctx context.Context, cfg *transform.Config) common.MapStr {
Expand Down
24 changes: 12 additions & 12 deletions model/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,9 @@ func TestEventFields(t *testing.T) {

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
output := tc.Error.Transform(context.Background(), &transform.Config{
output := tc.Error.appendBeatEvents(context.Background(), &transform.Config{
RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}},
})
}, nil)
require.Len(t, output, 1)
fields := output[0].Fields["error"]
assert.Equal(t, tc.Output, fields)
Expand Down Expand Up @@ -299,12 +299,12 @@ func TestEvents(t *testing.T) {
mdWithContext.UserAgent.Original = userAgent

for name, tc := range map[string]struct {
Transformable transform.Transformable
Output common.MapStr
Msg string
Error *Error
Output common.MapStr
Msg string
}{
"valid": {
Transformable: &Error{Timestamp: timestamp, Metadata: md},
Error: &Error{Timestamp: timestamp, Metadata: md},
Output: common.MapStr{
"data_stream.type": "logs",
"data_stream.dataset": "apm.error.myservice",
Expand All @@ -319,7 +319,7 @@ func TestEvents(t *testing.T) {
},
},
"notSampled": {
Transformable: &Error{Timestamp: timestamp, Metadata: md, TransactionSampled: &sampledFalse},
Error: &Error{Timestamp: timestamp, Metadata: md, TransactionSampled: &sampledFalse},
Output: common.MapStr{
"data_stream.type": "logs",
"data_stream.dataset": "apm.error.myservice",
Expand All @@ -335,7 +335,7 @@ func TestEvents(t *testing.T) {
},
},
"withMeta": {
Transformable: &Error{Timestamp: timestamp, Metadata: md, TransactionType: transactionType},
Error: &Error{Timestamp: timestamp, Metadata: md, TransactionType: transactionType},
Output: common.MapStr{
"data_stream.type": "logs",
"data_stream.dataset": "apm.error.myservice",
Expand All @@ -351,7 +351,7 @@ func TestEvents(t *testing.T) {
},
},
"withContext": {
Transformable: &Error{
Error: &Error{
Timestamp: timestamp,
Metadata: mdWithContext,
Log: baseLog(),
Expand Down Expand Up @@ -409,10 +409,10 @@ func TestEvents(t *testing.T) {
},
} {
t.Run(name, func(t *testing.T) {
outputEvents := tc.Transformable.Transform(context.Background(), &transform.Config{
outputEvents := tc.Error.appendBeatEvents(context.Background(), &transform.Config{
DataStreams: true,
RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}},
})
}, nil)
require.Len(t, outputEvents, 1)
outputEvent := outputEvents[0]
assert.Equal(t, tc.Output, outputEvent.Fields)
Expand Down Expand Up @@ -588,7 +588,7 @@ func TestErrorTransformPage(t *testing.T) {
}

for idx, test := range tests {
output := test.Error.Transform(context.Background(), &transform.Config{})
output := test.Error.appendBeatEvents(context.Background(), &transform.Config{}, nil)
assert.Equal(t, test.Output, output[0].Fields["url"], fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg))
}
}
Expand Down
7 changes: 3 additions & 4 deletions model/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package model

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -156,7 +155,7 @@ type MetricsetSpan struct {
DestinationService DestinationService
}

func (me *Metricset) Transform(ctx context.Context, cfg *transform.Config) []beat.Event {
func (me *Metricset) appendBeatEvents(cfg *transform.Config, events []beat.Event) []beat.Event {
metricsetTransformations.Inc()
if me == nil {
return nil
Expand Down Expand Up @@ -219,10 +218,10 @@ func (me *Metricset) Transform(ctx context.Context, cfg *transform.Config) []bea
fields[datastreams.TypeField] = datastreams.MetricsType
}

return []beat.Event{{
return append(events, beat.Event{
Fields: common.MapStr(fields),
Timestamp: me.Timestamp,
}}
})
}

func (e *MetricsetEventCategorization) fields() common.MapStr {
Expand Down
3 changes: 1 addition & 2 deletions model/metricset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package model

import (
"context"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -230,7 +229,7 @@ func TestTransform(t *testing.T) {
}

for idx, test := range tests {
outputEvents := test.Metricset.Transform(context.Background(), &transform.Config{DataStreams: true})
outputEvents := test.Metricset.appendBeatEvents(&transform.Config{DataStreams: true}, nil)

for j, outputEvent := range outputEvents {
assert.Equal(t, test.Output[j], outputEvent.Fields, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg))
Expand Down
16 changes: 7 additions & 9 deletions model/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package model

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -49,8 +48,9 @@ type PprofProfile struct {
Profile *profile.Profile
}

// Transform transforms a Profile into a sequence of beat.Events: one per profile sample.
func (pp PprofProfile) Transform(ctx context.Context, cfg *transform.Config) []beat.Event {
// appendBeatEvents transforms a Profile into a sequence of beat.Events (one per profile sample),
// and appends them to events.
func (pp PprofProfile) appendBeatEvents(cfg *transform.Config, events []beat.Event) []beat.Event {
// Precompute value field names for use in each event.
// TODO(axw) limit to well-known value names?
profileTimestamp := time.Unix(0, pp.Profile.TimeNanos)
Expand All @@ -77,9 +77,7 @@ func (pp PprofProfile) Transform(ctx context.Context, cfg *transform.Config) []b
// Profiles are stored in their own "metrics" data stream, with a data
// set per service. This enables managing retention of profiling data
// per-service, and indepedently of lower volume metrics.

samples := make([]beat.Event, len(pp.Profile.Sample))
for i, sample := range pp.Profile.Sample {
for _, sample := range pp.Profile.Sample {
profileFields := common.MapStr{}
if profileID != "" {
profileFields["id"] = profileID
Expand Down Expand Up @@ -141,12 +139,12 @@ func (pp PprofProfile) Transform(ctx context.Context, cfg *transform.Config) []b
}
}
pp.Metadata.set(&fields, profileLabels)
samples[i] = beat.Event{
events = append(events, beat.Event{
Timestamp: profileTimestamp,
Fields: common.MapStr(fields),
}
})
}
return samples
return events
}

func normalizeUnit(unit string) string {
Expand Down
3 changes: 2 additions & 1 deletion model/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ func TestPprofProfileTransform(t *testing.T) {
},
}

output := pp.Transform(context.Background(), &transform.Config{DataStreams: true})
batch := &model.Batch{Profiles: []*model.PprofProfile{&pp}}
output := batch.Transform(context.Background(), &transform.Config{DataStreams: true})
require.Len(t, output, 2)
assert.Equal(t, output[0], output[1])

Expand Down
6 changes: 3 additions & 3 deletions model/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (d *DestinationService) fields() common.MapStr {
return common.MapStr(fields)
}

func (e *Span) Transform(ctx context.Context, cfg *transform.Config) []beat.Event {
func (e *Span) appendBeatEvents(ctx context.Context, cfg *transform.Config, events []beat.Event) []beat.Event {
spanTransformations.Inc()
if frames := len(e.Stacktrace); frames > 0 {
spanStacktraceCounter.Inc()
Expand Down Expand Up @@ -239,10 +239,10 @@ func (e *Span) Transform(ctx context.Context, cfg *transform.Config) []beat.Even
}
common.MapStr(fields).Put("event.outcome", e.Outcome)

return []beat.Event{{
return append(events, beat.Event{
Fields: common.MapStr(fields),
Timestamp: e.Timestamp,
}}
})
}

func (e *Span) fields(ctx context.Context, cfg *transform.Config) common.MapStr {
Expand Down
4 changes: 2 additions & 2 deletions model/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,10 @@ func TestSpanTransform(t *testing.T) {
}

for _, test := range tests {
output := test.Span.Transform(context.Background(), &transform.Config{
output := test.Span.appendBeatEvents(context.Background(), &transform.Config{
DataStreams: true,
RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}},
})
}, nil)
fields := output[0].Fields
assert.Equal(t, test.Output, fields, test.Msg)
}
Expand Down
7 changes: 3 additions & 4 deletions model/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package model

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -111,7 +110,7 @@ func (e *Transaction) fields() common.MapStr {
return common.MapStr(fields)
}

func (e *Transaction) Transform(_ context.Context, cfg *transform.Config) []beat.Event {
func (e *Transaction) appendBeatEvents(cfg *transform.Config, events []beat.Event) []beat.Event {
transactionTransformations.Inc()

fields := mapStr{
Expand Down Expand Up @@ -155,10 +154,10 @@ func (e *Transaction) Transform(_ context.Context, cfg *transform.Config) []beat
}
common.MapStr(fields).Put("event.outcome", e.Outcome)

return []beat.Event{{
return append(events, beat.Event{
Timestamp: e.Timestamp,
Fields: common.MapStr(fields),
}}
})
}

type TransactionMarks map[string]TransactionMark
Expand Down
13 changes: 6 additions & 7 deletions model/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package model

import (
"context"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -126,14 +125,14 @@ func TestTransactionTransform(t *testing.T) {
}

for idx, test := range tests {
output := test.Transaction.Transform(context.Background(), &transform.Config{})
output := test.Transaction.appendBeatEvents(&transform.Config{}, nil)
assert.Equal(t, test.Output, output[0].Fields["transaction"], fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg))
}
}

func TestTransactionTransformOutcome(t *testing.T) {
tx := Transaction{Outcome: "success"}
events := tx.Transform(context.Background(), &transform.Config{})
events := tx.appendBeatEvents(&transform.Config{}, nil)
require.Len(t, events, 1)
assert.Equal(t, common.MapStr{"outcome": "success"}, events[0].Fields["event"])
}
Expand Down Expand Up @@ -177,7 +176,7 @@ func TestEventsTransformWithMetadata(t *testing.T) {
Custom: common.MapStr{"foo.bar": "baz"},
Message: &Message{QueueName: "routeUser"},
}
events := txWithContext.Transform(context.Background(), &transform.Config{DataStreams: true})
events := txWithContext.appendBeatEvents(&transform.Config{DataStreams: true}, nil)
require.Len(t, events, 1)
assert.Equal(t, events[0].Fields, common.MapStr{
"data_stream.type": "traces",
Expand Down Expand Up @@ -229,7 +228,7 @@ func TestTransformTransactionHTTP(t *testing.T) {
tx := Transaction{
HTTP: &Http{Request: &request},
}
events := tx.Transform(context.Background(), &transform.Config{})
events := tx.appendBeatEvents(&transform.Config{}, nil)
require.Len(t, events, 1)
assert.Equal(t, common.MapStr{
"request": common.MapStr{
Expand Down Expand Up @@ -292,7 +291,7 @@ func TestTransactionTransformPage(t *testing.T) {
}

for idx, test := range tests {
output := test.Transaction.Transform(context.Background(), &transform.Config{})
output := test.Transaction.appendBeatEvents(&transform.Config{}, nil)
assert.Equal(t, test.Output, output[0].Fields["url"], fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg))
}
}
Expand Down Expand Up @@ -321,7 +320,7 @@ func TestTransactionTransformMarks(t *testing.T) {
}

for idx, test := range tests {
output := test.Transaction.Transform(context.Background(), &transform.Config{})
output := test.Transaction.appendBeatEvents(&transform.Config{}, nil)
marks, _ := output[0].Fields.GetValue("transaction.marks")
assert.Equal(t, test.Output, marks, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg))
}
Expand Down

0 comments on commit 704e1d5

Please sign in to comment.