Skip to content

Commit

Permalink
Wire agent config (#4486) (#4538)
Browse files Browse the repository at this point in the history
Read configuration from Elastic Agent and use the namespace for the indices
  • Loading branch information
jalvz authored Dec 15, 2020
1 parent 6c551ce commit 4c687c6
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 8 deletions.
27 changes: 21 additions & 6 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type beater struct {
rawConfig *common.Config
config *config.Config
logger *logp.Logger
namespace string
wrapRunServer func(RunServerFunc) RunServerFunc

mutex sync.Mutex // guards stopServer and stopped
Expand All @@ -120,14 +121,27 @@ func (bt *beater) Run(b *beat.Beat) error {
// during startup. This might change when APM Server is included in Fleet
reloadOnce.Do(func() {
defer close(done)
// TODO(axw) config received from Fleet should be modified to set data_streams.enabled.

integrationConfig, err := config.NewIntegrationConfig(ucfg.Config)
if err != nil {
bt.logger.Error("Could not parse integration configuration from Elastic Agent", err)
return
}

var cfg *config.Config
cfg, err = config.NewConfig(ucfg.Config, elasticsearchOutputConfig(b))
apmServerCommonConfig := integrationConfig.APMServer
apmServerCommonConfig.Merge(common.MustNewConfigFrom(`{"data_streams.enabled": true}`))
cfg, err = config.NewConfig(apmServerCommonConfig, elasticsearchOutputConfig(b))
if err != nil {
bt.logger.Warn("Could not parse configuration from Elastic Agent ", err)
bt.logger.Error("Could not parse apm-server configuration from Elastic Agent ", err)
return
}

bt.config = cfg
bt.rawConfig = ucfg.Config
bt.rawConfig = apmServerCommonConfig
if integrationConfig.DataStream != nil {
bt.namespace = integrationConfig.DataStream.Namespace
}
bt.logger.Info("Applying configuration from Elastic Agent... ")
})
return err
Expand Down Expand Up @@ -158,7 +172,7 @@ func (bt *beater) Run(b *beat.Beat) error {
runServer = bt.wrapRunServer(runServer)
}

publisher, err := newPublisher(b, bt.config, tracer)
publisher, err := newPublisher(b, bt.config, bt.namespace, tracer)
if err != nil {
return err
}
Expand Down Expand Up @@ -381,14 +395,15 @@ func runServerWithTracerServer(runServer RunServerFunc, tracerServer *tracerServ
}
}

func newPublisher(b *beat.Beat, cfg *config.Config, tracer *apm.Tracer) (*publish.Publisher, error) {
func newPublisher(b *beat.Beat, cfg *config.Config, namespace string, tracer *apm.Tracer) (*publish.Publisher, error) {
transformConfig, err := newTransformConfig(b.Info, cfg)
if err != nil {
return nil, err
}
publisherConfig := &publish.PublisherConfig{
Info: b.Info,
Pipeline: cfg.Pipeline,
Namespace: namespace,
TransformConfig: transformConfig,
}
return publish.NewPublisher(b.Publisher, tracer, publisherConfig)
Expand Down
57 changes: 57 additions & 0 deletions beater/config/integration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package config

import (
"github.com/elastic/beats/v7/libbeat/common"
)

func NewIntegrationConfig(rootConfig *common.Config) (*IntegrationConfig, error) {
config := &IntegrationConfig{
DataStream: &DataStream{
Namespace: "default",
},
}
err := rootConfig.Unpack(config)
return config, err
}

// IntegrationConfig that comes from Elastic Agent
type IntegrationConfig struct {
ID string `config:"id"`
Name string `config:"name"`
Revision int `config:"revision"`
Type string `config:"type"`
UseOutput string `config:"use_output"`
Meta *Meta `config:"meta"`
DataStream *DataStream `config:"data_stream"`
APMServer *common.Config `config:"apm-server"`
}

type DataStream struct {
Namespace string `config:"namespace"`
}

type Meta struct {
Package *Package `config:"package"`
}

type Package struct {
Name string `config:"name"`
Version string `config:"version"`
}
3 changes: 2 additions & 1 deletion publish/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type PendingReq struct {
type PublisherConfig struct {
Info beat.Info
Pipeline string
Namespace string
Processor beat.ProcessorList
TransformConfig *transform.Config
}
Expand Down Expand Up @@ -100,7 +101,7 @@ func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg *PublisherConf
Processor: cfg.Processor,
}
if cfg.TransformConfig.DataStreams {
processingCfg.Fields[datastreams.NamespaceField] = "default"
processingCfg.Fields[datastreams.NamespaceField] = cfg.Namespace
}
if cfg.Pipeline != "" {
processingCfg.Meta = map[string]interface{}{"pipeline": cfg.Pipeline}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"version": "0.0.0"
},
"data_stream.dataset": "apm.systemtest",
"data_stream.namespace": "default",
"data_stream.namespace": "",
"data_stream.type": "traces",
"ecs": {
"version": "dynamic"
Expand Down

0 comments on commit 4c687c6

Please sign in to comment.