Only add data_stream.* fields when enabled
Only add data_stream.* fields when apm-server.data_streams.enabled
is true. This avoids allocations when the feature is disabled, which
will remain the default for some time yet.
axw committed Nov 25, 2020
1 parent 8feaead commit a0857f9
Showing 17 changed files with 68 additions and 111 deletions.
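Every model type below gets the same treatment: the event's common fields are built unconditionally, and the data_stream.* keys are added only behind the cfg.DataStreams flag, so the dataset string is never even formatted when the feature is off. A minimal, self-contained sketch of the pattern (identifiers here are illustrative, not the apm-server API):

```go
package main

import "fmt"

type mapStr map[string]interface{}

// eventFields mirrors the shape of the change: add the data_stream.*
// keys, and allocate the dataset string, only when the flag is enabled.
func eventFields(serviceName string, dataStreams bool) mapStr {
	fields := mapStr{
		"processor": mapStr{"name": "transaction", "event": "transaction"},
	}
	if dataStreams {
		// The fmt.Sprintf allocation is skipped entirely when disabled.
		fields["data_stream.type"] = "traces"
		fields["data_stream.dataset"] = fmt.Sprintf("apm.%s", serviceName)
	}
	return fields
}

func main() {
	fmt.Println(eventFields("opbeans", false)) // no data_stream.* keys
	fmt.Println(eventFields("opbeans", true))  // data_stream.type and .dataset set
}
```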
21 changes: 1 addition & 20 deletions beater/beater.go
@@ -36,10 +36,8 @@ import (
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/logp"
esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/processors"

"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/datastreams"
"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/ingest/pipeline"
logs "github.com/elastic/apm-server/log"
@@ -372,29 +370,12 @@ func newPublisher(b *beat.Beat, cfg *config.Config, tracer *apm.Tracer) (*publis
Pipeline: cfg.Pipeline,
TransformConfig: transformConfig,
}
if !cfg.DataStreams.Enabled {
// Remove data_stream.* fields during publishing when data streams are disabled.
processors, err := processors.New(processors.PluginConfig{common.MustNewConfigFrom(
map[string]interface{}{
"drop_fields": map[string]interface{}{
"fields": []interface{}{
datastreams.TypeField,
datastreams.DatasetField,
datastreams.NamespaceField,
},
},
},
)})
if err != nil {
return nil, err
}
publisherConfig.Processor = processors
}
return publish.NewPublisher(b.Publisher, tracer, publisherConfig)
}

func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Config, error) {
transformConfig := &transform.Config{
DataStreams: cfg.DataStreams.Enabled,
RUM: transform.RUMConfig{
LibraryPattern: regexp.MustCompile(cfg.RumConfig.LibraryPattern),
ExcludeFromGrouping: regexp.MustCompile(cfg.RumConfig.ExcludeFromGrouping),
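Note the direction of the change in beater.go: previously the data_stream.* fields were always populated during transformation and then stripped at publish time by the drop_fields processor removed above; now the fields are never created in the first place when data streams are disabled, eliminating both the processor and the per-event allocations.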
17 changes: 9 additions & 8 deletions model/error.go
@@ -110,19 +110,20 @@ func (e *Error) Transform(ctx context.Context, cfg *transform.Config) []beat.Eve
addStacktraceCounter(e.Log.Stacktrace)
}

// Errors are stored in an APM errors-specific "logs" data stream, per service.
// By storing errors in a "logs" data stream, they can be viewed in the Logs app
// in Kibana.
dataset := fmt.Sprintf("apm.error.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name))

fields := common.MapStr{
datastreams.TypeField: datastreams.LogsType,
datastreams.DatasetField: dataset,

"error": e.fields(ctx, cfg),
"processor": errorProcessorEntry,
}

if cfg.DataStreams {
// Errors are stored in an APM errors-specific "logs" data stream, per service.
// By storing errors in a "logs" data stream, they can be viewed in the Logs app
// in Kibana.
dataset := fmt.Sprintf("apm.error.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name))
fields[datastreams.TypeField] = datastreams.LogsType
fields[datastreams.DatasetField] = dataset
}

// first set the generic metadata (order is relevant)
e.Metadata.Set(fields)
utility.Set(fields, "source", fields["client"])
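The dataset name embeds the service name via datastreams.NormalizeServiceName, whose implementation is not part of this diff. A hypothetical sketch, assuming it folds the name to lower case and replaces characters that are not allowed in Elasticsearch data stream names with underscores:

```go
import "strings"

// normalizeServiceName is a hypothetical stand-in for
// datastreams.NormalizeServiceName: fold to lower case and replace
// anything outside a conservative character set with '_'.
func normalizeServiceName(s string) string {
	return strings.Map(func(r rune) rune {
		switch {
		case r >= 'a' && r <= 'z', r >= '0' && r <= '9', r == '.', r == '-':
			return r
		case r >= 'A' && r <= 'Z':
			return r + 'a' - 'A' // lower-case ASCII letters
		default:
			return '_' // e.g. "My Service" -> "my_service"
		}
	}, s)
}
```

Under that assumption, an error from service "My Service" would land in dataset "apm.error.my_service".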
3 changes: 2 additions & 1 deletion model/error_test.go
@@ -411,7 +411,8 @@ func TestEvents(t *testing.T) {
} {
t.Run(name, func(t *testing.T) {
outputEvents := tc.Transformable.Transform(context.Background(), &transform.Config{
RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}},
DataStreams: true,
RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}},
})
require.Len(t, outputEvents, 1)
outputEvent := outputEvents[0]
30 changes: 16 additions & 14 deletions model/metricset.go
@@ -148,13 +148,13 @@ type MetricsetSpan struct {
DestinationService DestinationService
}

func (me *Metricset) Transform(ctx context.Context, _ *transform.Config) []beat.Event {
func (me *Metricset) Transform(ctx context.Context, cfg *transform.Config) []beat.Event {
metricsetTransformations.Inc()
if me == nil {
return nil
}

fields := common.MapStr{}
fields := make(common.MapStr, 5)
for _, sample := range me.Samples {
if err := sample.set(fields); err != nil {
logp.NewLogger(logs.Transform).Warnf("failed to transform sample %#v", sample)
@@ -185,19 +185,21 @@ func (me *Metricset) Transform(ctx context.Context, _ *transform.Config) []beat.
fields["timeseries"] = common.MapStr{"instance": me.TimeseriesInstanceID}
}

// Metrics are stored in "metrics" data streams.
dataset := "apm."
if isInternal {
// Metrics that include well-defined transaction/span fields
// (i.e. breakdown metrics, transaction and span metrics) will
// be stored separately from application and runtime metrics.
dataset += "internal."
}
dataset += datastreams.NormalizeServiceName(me.Metadata.Service.Name)

fields["processor"] = metricsetProcessorEntry
fields[datastreams.TypeField] = datastreams.MetricsType
fields[datastreams.DatasetField] = dataset

if cfg.DataStreams {
// Metrics are stored in "metrics" data streams.
dataset := "apm."
if isInternal {
// Metrics that include well-defined transaction/span fields
// (i.e. breakdown metrics, transaction and span metrics) will
// be stored separately from application and runtime metrics.
dataset = "apm.internal."
}
dataset += datastreams.NormalizeServiceName(me.Metadata.Service.Name)
fields[datastreams.TypeField] = datastreams.MetricsType
fields[datastreams.DatasetField] = dataset
}

return []beat.Event{{
Fields: fields,
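The branching above gives each service two metrics datasets. A condensed sketch of just the naming logic, reusing the hypothetical normalizeServiceName from earlier:

```go
// metricsetDataset condenses the dataset selection above: breakdown,
// transaction and span metrics ("internal") are kept separate from
// application and runtime metrics.
func metricsetDataset(service string, isInternal bool) string {
	dataset := "apm."
	if isInternal {
		dataset = "apm.internal."
	}
	return dataset + normalizeServiceName(service)
}

// metricsetDataset("opbeans-go", false) == "apm.opbeans-go"
// metricsetDataset("opbeans-go", true)  == "apm.internal.opbeans-go"
```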
2 changes: 1 addition & 1 deletion model/metricset_test.go
@@ -214,7 +214,7 @@ func TestTransform(t *testing.T) {
}

for idx, test := range tests {
outputEvents := test.Metricset.Transform(context.Background(), &transform.Config{})
outputEvents := test.Metricset.Transform(context.Background(), &transform.Config{DataStreams: true})

for j, outputEvent := range outputEvents {
assert.Equal(t, test.Output[j], outputEvent.Fields, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg))
17 changes: 11 additions & 6 deletions model/profile.go
@@ -50,7 +50,7 @@ type PprofProfile struct {
}

// Transform transforms a Profile into a sequence of beat.Events: one per profile sample.
func (pp PprofProfile) Transform(ctx context.Context, _ *transform.Config) []beat.Event {
func (pp PprofProfile) Transform(ctx context.Context, cfg *transform.Config) []beat.Event {
// Precompute value field names for use in each event.
// TODO(axw) limit to well-known value names?
profileTimestamp := time.Unix(0, pp.Profile.TimeNanos)
@@ -70,7 +70,10 @@ func (pp PprofProfile) Transform(ctx context.Context, _ *transform.Config) []bea
// Profiles are stored in their own "metrics" data stream, with a data
// set per service. This enables managing retention of profiling data
// per-service, and independently of lower-volume metrics.
dataset := fmt.Sprintf("apm.profiling.%s", datastreams.NormalizeServiceName(pp.Metadata.Service.Name))
var dataset string
if cfg.DataStreams {
dataset = fmt.Sprintf("apm.profiling.%s", datastreams.NormalizeServiceName(pp.Metadata.Service.Name))
}

samples := make([]beat.Event, len(pp.Profile.Sample))
for i, sample := range pp.Profile.Sample {
@@ -122,12 +125,14 @@ func (pp PprofProfile) Transform(ctx context.Context, _ *transform.Config) []bea
event := beat.Event{
Timestamp: profileTimestamp,
Fields: common.MapStr{
datastreams.TypeField: datastreams.MetricsType,
datastreams.DatasetField: dataset,
"processor": profileProcessorEntry,
profileDocType: profileFields,
"processor": profileProcessorEntry,
profileDocType: profileFields,
},
}
if cfg.DataStreams {
event.Fields[datastreams.TypeField] = datastreams.MetricsType
event.Fields[datastreams.DatasetField] = dataset
}
pp.Metadata.Set(event.Fields)
if len(sample.Label) > 0 {
labels := make(common.MapStr)
2 changes: 1 addition & 1 deletion model/profile_test.go
@@ -85,7 +85,7 @@ func TestPprofProfileTransform(t *testing.T) {
},
}

output := pp.Transform(context.Background(), &transform.Config{})
output := pp.Transform(context.Background(), &transform.Config{DataStreams: true})
require.Len(t, output, 2)
assert.Equal(t, output[0], output[1])

13 changes: 7 additions & 6 deletions model/span.go
@@ -191,17 +191,18 @@ func (e *Span) Transform(ctx context.Context, cfg *transform.Config) []beat.Even
spanFrameCounter.Add(int64(frames))
}

// Spans are stored in a "traces" data stream along with transactions.
dataset := fmt.Sprintf("apm.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name))

fields := common.MapStr{
datastreams.TypeField: datastreams.TracesType,
datastreams.DatasetField: dataset,

"processor": spanProcessorEntry,
spanDocType: e.fields(ctx, cfg),
}

if cfg.DataStreams {
// Spans are stored in a "traces" data stream along with transactions.
dataset := fmt.Sprintf("apm.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name))
fields[datastreams.TypeField] = datastreams.TracesType
fields[datastreams.DatasetField] = dataset
}

// first set the generic metadata
e.Metadata.Set(fields)

3 changes: 2 additions & 1 deletion model/span_test.go
@@ -177,7 +177,8 @@ func TestSpanTransform(t *testing.T) {

for _, test := range tests {
output := test.Span.Transform(context.Background(), &transform.Config{
RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}},
DataStreams: true,
RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}},
})
fields := output[0].Fields
assert.Equal(t, test.Output, fields, test.Msg)
15 changes: 8 additions & 7 deletions model/transaction.go
@@ -110,20 +110,21 @@ func (e *Transaction) fields() common.MapStr {
return common.MapStr(fields)
}

func (e *Transaction) Transform(_ context.Context, _ *transform.Config) []beat.Event {
func (e *Transaction) Transform(_ context.Context, cfg *transform.Config) []beat.Event {
transactionTransformations.Inc()

// Transactions are stored in a "traces" data stream along with spans.
dataset := fmt.Sprintf("apm.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name))

fields := common.MapStr{
datastreams.TypeField: datastreams.TracesType,
datastreams.DatasetField: dataset,

"processor": transactionProcessorEntry,
transactionDocType: e.fields(),
}

if cfg.DataStreams {
// Transactions are stored in a "traces" data stream along with spans.
dataset := fmt.Sprintf("apm.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name))
fields[datastreams.TypeField] = datastreams.TracesType
fields[datastreams.DatasetField] = dataset
}

// first set generic metadata (order is relevant)
e.Metadata.Set(fields)
utility.Set(fields, "source", fields["client"])
2 changes: 1 addition & 1 deletion model/transaction_test.go
@@ -178,7 +178,7 @@ func TestEventsTransformWithMetadata(t *testing.T) {
Custom: &Custom{"foo": "bar"},
Message: &Message{QueueName: tests.StringPtr("routeUser")},
}
events := txWithContext.Transform(context.Background(), &transform.Config{})
events := txWithContext.Transform(context.Background(), &transform.Config{DataStreams: true})
require.Len(t, events, 1)
assert.Equal(t, events[0].Fields, common.MapStr{
"data_stream.type": "traces",
2 changes: 1 addition & 1 deletion processor/otel/consumer_test.go
@@ -504,7 +504,7 @@ func testAttributeStringValue(s string) *tracepb.AttributeValue {
func transformAll(ctx context.Context, p publish.PendingReq) []beat.Event {
var events []beat.Event
for _, transformable := range p.Transformables {
events = append(events, transformable.Transform(ctx, &transform.Config{})...)
events = append(events, transformable.Transform(ctx, &transform.Config{DataStreams: true})...)
}
return events
}
2 changes: 1 addition & 1 deletion processor/stream/processor_test.go
@@ -227,7 +227,7 @@ func makeApproveEventsReporter(t *testing.T, name string) publish.Reporter {
return func(ctx context.Context, p publish.PendingReq) error {
var events []beat.Event
for _, transformable := range p.Transformables {
events = append(events, transformable.Transform(ctx, &transform.Config{})...)
events = append(events, transformable.Transform(ctx, &transform.Config{DataStreams: true})...)
}
docs := beatertest.EncodeEventDocs(events...)
approvaltest.ApproveEventDocs(t, name, docs)
4 changes: 3 additions & 1 deletion publish/pub.go
@@ -88,7 +88,6 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf

processingCfg := beat.ProcessingConfig{
Fields: common.MapStr{
datastreams.NamespaceField: "default",
"observer": common.MapStr{
"type": cfg.Info.Beat,
"hostname": cfg.Info.Hostname,
@@ -100,6 +99,9 @@
},
Processor: cfg.Processor,
}
if cfg.TransformConfig.DataStreams {
processingCfg.Fields[datastreams.NamespaceField] = "default"
}
if cfg.Pipeline != "" {
processingCfg.Meta = map[string]interface{}{"pipeline": cfg.Pipeline}
}
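The "default" namespace added here is the last component of the data stream name, which Elasticsearch composes as {type}-{dataset}-{namespace}. So, under the assumptions above, a transaction for service "opbeans" would target:

```go
// Illustrative composition of the final data stream name.
name := fmt.Sprintf("%s-%s-%s", "traces", "apm.opbeans", "default")
// name == "traces-apm.opbeans-default"
```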
8 changes: 0 additions & 8 deletions testdata/jaeger/batch_0.approved.json
@@ -7,8 +7,6 @@
"name": "Jaeger/Go",
"version": "2.20.1"
},
"data_stream.dataset": "apm.driver",
"data_stream.type": "traces",
"event": {
"outcome": "success"
},
@@ -62,8 +60,6 @@
"name": "Jaeger/Go",
"version": "2.20.1"
},
"data_stream.dataset": "apm.error.driver",
"data_stream.type": "logs",
"error": {
"exception": [
{
@@ -114,8 +110,6 @@
"name": "Jaeger/Go",
"version": "2.20.1"
},
"data_stream.dataset": "apm.error.driver",
"data_stream.type": "logs",
"error": {
"exception": [
{
@@ -166,8 +160,6 @@
"name": "Jaeger/Go",
"version": "2.20.1"
},
"data_stream.dataset": "apm.error.driver",
"data_stream.type": "logs",
"error": {
"exception": [
{