Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only add data_stream.* fields when enabled #4461

Merged
merged 2 commits into from
Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 1 addition & 20 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@ import (
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/logp"
esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/processors"

"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/datastreams"
"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/ingest/pipeline"
logs "github.com/elastic/apm-server/log"
Expand Down Expand Up @@ -372,29 +370,12 @@ func newPublisher(b *beat.Beat, cfg *config.Config, tracer *apm.Tracer) (*publis
Pipeline: cfg.Pipeline,
TransformConfig: transformConfig,
}
if !cfg.DataStreams.Enabled {
// Remove data_stream.* fields during publishing when data streams are disabled.
processors, err := processors.New(processors.PluginConfig{common.MustNewConfigFrom(
map[string]interface{}{
"drop_fields": map[string]interface{}{
"fields": []interface{}{
datastreams.TypeField,
datastreams.DatasetField,
datastreams.NamespaceField,
},
},
},
)})
if err != nil {
return nil, err
}
publisherConfig.Processor = processors
}
return publish.NewPublisher(b.Publisher, tracer, publisherConfig)
}

func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Config, error) {
transformConfig := &transform.Config{
DataStreams: cfg.DataStreams.Enabled,
RUM: transform.RUMConfig{
LibraryPattern: regexp.MustCompile(cfg.RumConfig.LibraryPattern),
ExcludeFromGrouping: regexp.MustCompile(cfg.RumConfig.ExcludeFromGrouping),
Expand Down
17 changes: 9 additions & 8 deletions model/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,20 @@ func (e *Error) Transform(ctx context.Context, cfg *transform.Config) []beat.Eve
addStacktraceCounter(e.Log.Stacktrace)
}

// Errors are stored in an APM errors-specific "logs" data stream, per service.
// By storing errors in a "logs" data stream, they can be viewed in the Logs app
// in Kibana.
dataset := fmt.Sprintf("apm.error.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name))

fields := common.MapStr{
datastreams.TypeField: datastreams.LogsType,
datastreams.DatasetField: dataset,

"error": e.fields(ctx, cfg),
"processor": errorProcessorEntry,
}

if cfg.DataStreams {
// Errors are stored in an APM errors-specific "logs" data stream, per service.
// By storing errors in a "logs" data stream, they can be viewed in the Logs app
// in Kibana.
dataset := fmt.Sprintf("apm.error.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name))
fields[datastreams.TypeField] = datastreams.LogsType
fields[datastreams.DatasetField] = dataset
}

// first set the generic metadata (order is relevant)
e.Metadata.Set(fields)
utility.Set(fields, "source", fields["client"])
Expand Down
3 changes: 2 additions & 1 deletion model/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,8 @@ func TestEvents(t *testing.T) {
} {
t.Run(name, func(t *testing.T) {
outputEvents := tc.Transformable.Transform(context.Background(), &transform.Config{
RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}},
DataStreams: true,
RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}},
})
require.Len(t, outputEvents, 1)
outputEvent := outputEvents[0]
Expand Down
28 changes: 15 additions & 13 deletions model/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ type MetricsetSpan struct {
DestinationService DestinationService
}

func (me *Metricset) Transform(ctx context.Context, _ *transform.Config) []beat.Event {
func (me *Metricset) Transform(ctx context.Context, cfg *transform.Config) []beat.Event {
metricsetTransformations.Inc()
if me == nil {
return nil
Expand Down Expand Up @@ -185,19 +185,21 @@ func (me *Metricset) Transform(ctx context.Context, _ *transform.Config) []beat.
fields["timeseries"] = common.MapStr{"instance": me.TimeseriesInstanceID}
}

// Metrics are stored in "metrics" data streams.
dataset := "apm."
if isInternal {
// Metrics that include well-defined transaction/span fields
// (i.e. breakdown metrics, transaction and span metrics) will
// be stored separately from application and runtime metrics.
dataset += "internal."
}
dataset += datastreams.NormalizeServiceName(me.Metadata.Service.Name)

fields["processor"] = metricsetProcessorEntry
fields[datastreams.TypeField] = datastreams.MetricsType
fields[datastreams.DatasetField] = dataset

if cfg.DataStreams {
// Metrics are stored in "metrics" data streams.
dataset := "apm."
if isInternal {
// Metrics that include well-defined transaction/span fields
// (i.e. breakdown metrics, transaction and span metrics) will
// be stored separately from application and runtime metrics.
dataset = "apm.internal."
}
dataset += datastreams.NormalizeServiceName(me.Metadata.Service.Name)
fields[datastreams.TypeField] = datastreams.MetricsType
fields[datastreams.DatasetField] = dataset
}

return []beat.Event{{
Fields: fields,
Expand Down
2 changes: 1 addition & 1 deletion model/metricset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func TestTransform(t *testing.T) {
}

for idx, test := range tests {
outputEvents := test.Metricset.Transform(context.Background(), &transform.Config{})
outputEvents := test.Metricset.Transform(context.Background(), &transform.Config{DataStreams: true})

for j, outputEvent := range outputEvents {
assert.Equal(t, test.Output[j], outputEvent.Fields, fmt.Sprintf("Failed at idx %v; %s", idx, test.Msg))
Expand Down
17 changes: 11 additions & 6 deletions model/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type PprofProfile struct {
}

// Transform transforms a Profile into a sequence of beat.Events: one per profile sample.
func (pp PprofProfile) Transform(ctx context.Context, _ *transform.Config) []beat.Event {
func (pp PprofProfile) Transform(ctx context.Context, cfg *transform.Config) []beat.Event {
// Precompute value field names for use in each event.
// TODO(axw) limit to well-known value names?
profileTimestamp := time.Unix(0, pp.Profile.TimeNanos)
Expand All @@ -70,7 +70,10 @@ func (pp PprofProfile) Transform(ctx context.Context, _ *transform.Config) []bea
// Profiles are stored in their own "metrics" data stream, with a data
// set per service. This enables managing retention of profiling data
// per-service, and indepedently of lower volume metrics.
dataset := fmt.Sprintf("apm.profiling.%s", datastreams.NormalizeServiceName(pp.Metadata.Service.Name))
var dataset string
if cfg.DataStreams {
dataset = fmt.Sprintf("apm.profiling.%s", datastreams.NormalizeServiceName(pp.Metadata.Service.Name))
}

samples := make([]beat.Event, len(pp.Profile.Sample))
for i, sample := range pp.Profile.Sample {
Expand Down Expand Up @@ -122,12 +125,14 @@ func (pp PprofProfile) Transform(ctx context.Context, _ *transform.Config) []bea
event := beat.Event{
Timestamp: profileTimestamp,
Fields: common.MapStr{
datastreams.TypeField: datastreams.MetricsType,
datastreams.DatasetField: dataset,
"processor": profileProcessorEntry,
profileDocType: profileFields,
"processor": profileProcessorEntry,
profileDocType: profileFields,
},
}
if cfg.DataStreams {
event.Fields[datastreams.TypeField] = datastreams.MetricsType
event.Fields[datastreams.DatasetField] = dataset
}
pp.Metadata.Set(event.Fields)
if len(sample.Label) > 0 {
labels := make(common.MapStr)
Expand Down
2 changes: 1 addition & 1 deletion model/profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestPprofProfileTransform(t *testing.T) {
},
}

output := pp.Transform(context.Background(), &transform.Config{})
output := pp.Transform(context.Background(), &transform.Config{DataStreams: true})
require.Len(t, output, 2)
assert.Equal(t, output[0], output[1])

Expand Down
13 changes: 7 additions & 6 deletions model/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,18 @@ func (e *Span) Transform(ctx context.Context, cfg *transform.Config) []beat.Even
spanFrameCounter.Add(int64(frames))
}

// Spans are stored in a "traces" data stream along with transactions.
dataset := fmt.Sprintf("apm.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name))

fields := common.MapStr{
datastreams.TypeField: datastreams.TracesType,
datastreams.DatasetField: dataset,

"processor": spanProcessorEntry,
spanDocType: e.fields(ctx, cfg),
}

if cfg.DataStreams {
// Spans are stored in a "traces" data stream along with transactions.
dataset := fmt.Sprintf("apm.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name))
fields[datastreams.TypeField] = datastreams.TracesType
fields[datastreams.DatasetField] = dataset
}

// first set the generic metadata
e.Metadata.Set(fields)

Expand Down
3 changes: 2 additions & 1 deletion model/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ func TestSpanTransform(t *testing.T) {

for _, test := range tests {
output := test.Span.Transform(context.Background(), &transform.Config{
RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}},
DataStreams: true,
RUM: transform.RUMConfig{SourcemapStore: &sourcemap.Store{}},
})
fields := output[0].Fields
assert.Equal(t, test.Output, fields, test.Msg)
Expand Down
15 changes: 8 additions & 7 deletions model/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,21 @@ func (e *Transaction) fields() common.MapStr {
return common.MapStr(fields)
}

func (e *Transaction) Transform(_ context.Context, _ *transform.Config) []beat.Event {
func (e *Transaction) Transform(_ context.Context, cfg *transform.Config) []beat.Event {
transactionTransformations.Inc()

// Transactions are stored in a "traces" data stream along with spans.
dataset := fmt.Sprintf("apm.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name))

fields := common.MapStr{
datastreams.TypeField: datastreams.TracesType,
datastreams.DatasetField: dataset,

"processor": transactionProcessorEntry,
transactionDocType: e.fields(),
}

if cfg.DataStreams {
// Transactions are stored in a "traces" data stream along with spans.
dataset := fmt.Sprintf("apm.%s", datastreams.NormalizeServiceName(e.Metadata.Service.Name))
fields[datastreams.TypeField] = datastreams.TracesType
fields[datastreams.DatasetField] = dataset
}

// first set generic metadata (order is relevant)
e.Metadata.Set(fields)
utility.Set(fields, "source", fields["client"])
Expand Down
2 changes: 1 addition & 1 deletion model/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestEventsTransformWithMetadata(t *testing.T) {
Custom: &Custom{"foo": "bar"},
Message: &Message{QueueName: tests.StringPtr("routeUser")},
}
events := txWithContext.Transform(context.Background(), &transform.Config{})
events := txWithContext.Transform(context.Background(), &transform.Config{DataStreams: true})
require.Len(t, events, 1)
assert.Equal(t, events[0].Fields, common.MapStr{
"data_stream.type": "traces",
Expand Down
2 changes: 1 addition & 1 deletion processor/otel/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func testAttributeStringValue(s string) *tracepb.AttributeValue {
func transformAll(ctx context.Context, p publish.PendingReq) []beat.Event {
var events []beat.Event
for _, transformable := range p.Transformables {
events = append(events, transformable.Transform(ctx, &transform.Config{})...)
events = append(events, transformable.Transform(ctx, &transform.Config{DataStreams: true})...)
}
return events
}
Expand Down
2 changes: 1 addition & 1 deletion processor/stream/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func makeApproveEventsReporter(t *testing.T, name string) publish.Reporter {
return func(ctx context.Context, p publish.PendingReq) error {
var events []beat.Event
for _, transformable := range p.Transformables {
events = append(events, transformable.Transform(ctx, &transform.Config{})...)
events = append(events, transformable.Transform(ctx, &transform.Config{DataStreams: true})...)
}
docs := beatertest.EncodeEventDocs(events...)
approvaltest.ApproveEventDocs(t, name, docs)
Expand Down
4 changes: 3 additions & 1 deletion publish/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf

processingCfg := beat.ProcessingConfig{
Fields: common.MapStr{
datastreams.NamespaceField: "default",
"observer": common.MapStr{
"type": cfg.Info.Beat,
"hostname": cfg.Info.Hostname,
Expand All @@ -100,6 +99,9 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf
},
Processor: cfg.Processor,
}
if cfg.TransformConfig.DataStreams {
processingCfg.Fields[datastreams.NamespaceField] = "default"
}
if cfg.Pipeline != "" {
processingCfg.Meta = map[string]interface{}{"pipeline": cfg.Pipeline}
}
Expand Down
8 changes: 0 additions & 8 deletions testdata/jaeger/batch_0.approved.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
"name": "Jaeger/Go",
"version": "2.20.1"
},
"data_stream.dataset": "apm.driver",
"data_stream.type": "traces",
"event": {
"outcome": "success"
},
Expand Down Expand Up @@ -62,8 +60,6 @@
"name": "Jaeger/Go",
"version": "2.20.1"
},
"data_stream.dataset": "apm.error.driver",
"data_stream.type": "logs",
"error": {
"exception": [
{
Expand Down Expand Up @@ -114,8 +110,6 @@
"name": "Jaeger/Go",
"version": "2.20.1"
},
"data_stream.dataset": "apm.error.driver",
"data_stream.type": "logs",
"error": {
"exception": [
{
Expand Down Expand Up @@ -166,8 +160,6 @@
"name": "Jaeger/Go",
"version": "2.20.1"
},
"data_stream.dataset": "apm.error.driver",
"data_stream.type": "logs",
"error": {
"exception": [
{
Expand Down
Loading