diff --git a/beater/beater.go b/beater/beater.go index fd413382396..ab25c4aa6c3 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -101,6 +101,7 @@ type beater struct { rawConfig *common.Config config *config.Config logger *logp.Logger + namespace string wrapRunServer func(RunServerFunc) RunServerFunc mutex sync.Mutex // guards stopServer and stopped @@ -120,14 +121,27 @@ func (bt *beater) Run(b *beat.Beat) error { // during startup. This might change when APM Server is included in Fleet reloadOnce.Do(func() { defer close(done) - // TODO(axw) config received from Fleet should be modified to set data_streams.enabled. + + integrationConfig, err := config.NewIntegrationConfig(ucfg.Config) + if err != nil { + bt.logger.Error("Could not parse integration configuration from Elastic Agent", err) + return + } + var cfg *config.Config - cfg, err = config.NewConfig(ucfg.Config, elasticsearchOutputConfig(b)) + apmServerCommonConfig := integrationConfig.APMServer + apmServerCommonConfig.Merge(common.MustNewConfigFrom(`{"data_streams.enabled": true}`)) + cfg, err = config.NewConfig(apmServerCommonConfig, elasticsearchOutputConfig(b)) if err != nil { - bt.logger.Warn("Could not parse configuration from Elastic Agent ", err) + bt.logger.Error("Could not parse apm-server configuration from Elastic Agent ", err) + return } + bt.config = cfg - bt.rawConfig = ucfg.Config + bt.rawConfig = apmServerCommonConfig + if integrationConfig.DataStream != nil { + bt.namespace = integrationConfig.DataStream.Namespace + } bt.logger.Info("Applying configuration from Elastic Agent... ") }) return err @@ -158,7 +172,7 @@ func (bt *beater) Run(b *beat.Beat) error { runServer = bt.wrapRunServer(runServer) } - publisher, err := newPublisher(b, bt.config, tracer) + publisher, err := newPublisher(b, bt.config, bt.namespace, tracer) if err != nil { return err } @@ -381,7 +395,7 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ } } -func newPublisher(b *beat.Beat, cfg *config.Config, tracer *apm.Tracer) (*publish.Publisher, error) { +func newPublisher(b *beat.Beat, cfg *config.Config, namespace string, tracer *apm.Tracer) (*publish.Publisher, error) { transformConfig, err := newTransformConfig(b.Info, cfg) if err != nil { return nil, err @@ -389,6 +403,7 @@ func newPublisher(b *beat.Beat, cfg *config.Config, tracer *apm.Tracer) (*publis publisherConfig := &publish.PublisherConfig{ Info: b.Info, Pipeline: cfg.Pipeline, + Namespace: namespace, TransformConfig: transformConfig, } return publish.NewPublisher(b.Publisher, tracer, publisherConfig) diff --git a/beater/config/integration.go b/beater/config/integration.go new file mode 100644 index 00000000000..f31e835434b --- /dev/null +++ b/beater/config/integration.go @@ -0,0 +1,57 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package config + +import ( + "github.com/elastic/beats/v7/libbeat/common" +) + +func NewIntegrationConfig(rootConfig *common.Config) (*IntegrationConfig, error) { + config := &IntegrationConfig{ + DataStream: &DataStream{ + Namespace: "default", + }, + } + err := rootConfig.Unpack(config) + return config, err +} + +// IntegrationConfig that comes from Elastic Agent +type IntegrationConfig struct { + ID string `config:"id"` + Name string `config:"name"` + Revision int `config:"revision"` + Type string `config:"type"` + UseOutput string `config:"use_output"` + Meta *Meta `config:"meta"` + DataStream *DataStream `config:"data_stream"` + APMServer *common.Config `config:"apm-server"` +} + +type DataStream struct { + Namespace string `config:"namespace"` +} + +type Meta struct { + Package *Package `config:"package"` +} + +type Package struct { + Name string `config:"name"` + Version string `config:"version"` +} diff --git a/publish/pub.go b/publish/pub.go index 16b04f7759a..84733ad3031 100644 --- a/publish/pub.go +++ b/publish/pub.go @@ -62,6 +62,7 @@ type PendingReq struct { type PublisherConfig struct { Info beat.Info Pipeline string + Namespace string Processor beat.ProcessorList TransformConfig *transform.Config } @@ -100,7 +101,7 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf Processor: cfg.Processor, } if cfg.TransformConfig.DataStreams { - processingCfg.Fields[datastreams.NamespaceField] = "default" + processingCfg.Fields[datastreams.NamespaceField] = cfg.Namespace } if cfg.Pipeline != "" { processingCfg.Meta = map[string]interface{}{"pipeline": cfg.Pipeline} diff --git a/systemtest/approvals/TestDataStreamsEnabled/true.approved.json b/systemtest/approvals/TestDataStreamsEnabled/true.approved.json index c9a6191a011..c648c1bfbc5 100644 --- a/systemtest/approvals/TestDataStreamsEnabled/true.approved.json +++ b/systemtest/approvals/TestDataStreamsEnabled/true.approved.json @@ -7,7 +7,7 @@ "version": "0.0.0" }, "data_stream.dataset": "apm.systemtest", - "data_stream.namespace": "default", + "data_stream.namespace": "", "data_stream.type": "traces", "ecs": { "version": "dynamic"