Remove ingest pipeline registration (#6575)
Also, stop blocking indexing of logs when data
streams are disabled. Along with the removal of
explicitly named pipelines, this allows us to
simplify the publisher package.
axw authored Nov 15, 2021
1 parent 1c6db52 commit 642aaee
Showing 22 changed files with 42 additions and 1,024 deletions.
Makefile (1 addition, 1 deletion)

@@ -100,7 +100,7 @@ apm-server.yml apm-server.docker.yml: $(MAGE) magefile.go _meta/beat.yml
 
 .PHONY: go-generate
 go-generate:
-    @$(GO) generate . ./ingest/pipeline
+    @$(GO) generate .
 
 notice: NOTICE.txt
 NOTICE.txt: $(PYTHON) go.mod tools/go.mod
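
The argument dropped from the go-generate target is the removed ingest/pipeline package, whose generate step produced the embedded copy of the default pipeline definition that the server registered at startup. A rough sketch of the idea (an assumption for illustration only; the real package relied on beats' asset-generation tooling rather than go:embed):

// Hypothetical sketch: make definition.json available to the binary.
// The removed ingest/pipeline package used beats' asset-generation
// tooling via go:generate; go:embed is shown here as a stand-in.
package pipeline

import _ "embed"

// definition.json must sit next to this file at build time.
//
//go:embed definition.json
var definitionJSON []byte

With the package gone there is no embedded definition left to register, which is what lets the Makefile target shrink to the root package.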
_meta/beat.yml (21 deletions)

@@ -88,21 +88,6 @@ apm-server:
 # Url to expose expvar.
 #url: "/debug/vars"
 
-# A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch.
-# Using pipelines involves two steps:
-# (1) registering a pipeline
-# (2) applying a pipeline during data ingestion (see `output.elasticsearch.pipeline`)
-#
-# You can manually register a pipeline, or use this configuration option to ensure
-# the pipeline is loaded and registered at the configured Elasticsearch instances.
-# Find the default pipeline configuration at `ingest/pipeline/definition.json`.
-# Automatic pipeline registration requires the `output.elasticsearch` to be enabled and configured.
-#register.ingest.pipeline:
-  # Registers APM pipeline definition in Elasticsearch on APM Server startup. Defaults to true.
-  #enabled: true
-  # Overwrites existing APM pipeline definition in Elasticsearch. Defaults to false.
-  #overwrite: false
-
 
 #---------------------------- APM Server - Secure Communication with Agents ----------------------------
 
@@ -497,12 +482,6 @@ output.elasticsearch:
 # when.contains:
 #   processor.event: "onboarding"
 
-# A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch.
-# APM Server comes with a default pipeline definition, located at `ingest/pipeline/definition.json`, which is
-# loaded to Elasticsearch by default (see `apm-server.register.ingest.pipeline`).
-# APM pipeline is enabled by default. To disable it, set `pipeline: _none`.
-#pipeline: "apm"
-
 # Optional HTTP Path.
 #path: "/elasticsearch"
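
The comment block deleted above described the two steps APM Server used to automate: (1) register a pipeline in Elasticsearch, and (2) reference it at ingest time via output.elasticsearch.pipeline. Both steps now happen externally, normally by installing the 'apm' integration package. A minimal sketch of performing them by hand, assuming the go-elasticsearch client and a placeholder pipeline body (neither is part of this commit):

package main

import (
    "context"
    "log"
    "strings"

    "github.com/elastic/go-elasticsearch/v8"
    "github.com/elastic/go-elasticsearch/v8/esapi"
)

func main() {
    es, err := elasticsearch.NewDefaultClient() // reads ELASTICSEARCH_URL
    if err != nil {
        log.Fatal(err)
    }

    // Step (1): register the pipeline (PUT /_ingest/pipeline/apm).
    // The body here is a placeholder, not the real APM definition.
    put := esapi.IngestPutPipelineRequest{
        PipelineID: "apm",
        Body:       strings.NewReader(`{"description":"placeholder","processors":[]}`),
    }
    res, err := put.Do(context.Background(), es)
    if err != nil {
        log.Fatal(err)
    }
    res.Body.Close()

    // Step (2): apply the pipeline at ingest time; this is what the
    // removed output.elasticsearch.pipeline setting used to arrange.
    idx := esapi.IndexRequest{
        Index:    "my-index",
        Body:     strings.NewReader(`{"message":"hello"}`),
        Pipeline: "apm",
    }
    res, err = idx.Do(context.Background(), es)
    if err != nil {
        log.Fatal(err)
    }
    res.Body.Close()
}

Installing the integration package performs step (1) as part of its setup, which is why the server-side registration code further down could be deleted.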
apm-server.docker.yml (21 deletions)

@@ -88,21 +88,6 @@ apm-server:
 # Url to expose expvar.
 #url: "/debug/vars"
 
-# A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch.
-# Using pipelines involves two steps:
-# (1) registering a pipeline
-# (2) applying a pipeline during data ingestion (see `output.elasticsearch.pipeline`)
-#
-# You can manually register a pipeline, or use this configuration option to ensure
-# the pipeline is loaded and registered at the configured Elasticsearch instances.
-# Find the default pipeline configuration at `ingest/pipeline/definition.json`.
-# Automatic pipeline registration requires the `output.elasticsearch` to be enabled and configured.
-#register.ingest.pipeline:
-  # Registers APM pipeline definition in Elasticsearch on APM Server startup. Defaults to true.
-  #enabled: true
-  # Overwrites existing APM pipeline definition in Elasticsearch. Defaults to false.
-  #overwrite: false
-
 
 #---------------------------- APM Server - Secure Communication with Agents ----------------------------
 
@@ -497,12 +482,6 @@ output.elasticsearch:
 # when.contains:
 #   processor.event: "onboarding"
 
-# A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch.
-# APM Server comes with a default pipeline definition, located at `ingest/pipeline/definition.json`, which is
-# loaded to Elasticsearch by default (see `apm-server.register.ingest.pipeline`).
-# APM pipeline is enabled by default. To disable it, set `pipeline: _none`.
-#pipeline: "apm"
-
 # Optional HTTP Path.
 #path: "/elasticsearch"
apm-server.yml (21 deletions)

@@ -88,21 +88,6 @@ apm-server:
 # Url to expose expvar.
 #url: "/debug/vars"
 
-# A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch.
-# Using pipelines involves two steps:
-# (1) registering a pipeline
-# (2) applying a pipeline during data ingestion (see `output.elasticsearch.pipeline`)
-#
-# You can manually register a pipeline, or use this configuration option to ensure
-# the pipeline is loaded and registered at the configured Elasticsearch instances.
-# Find the default pipeline configuration at `ingest/pipeline/definition.json`.
-# Automatic pipeline registration requires the `output.elasticsearch` to be enabled and configured.
-#register.ingest.pipeline:
-  # Registers APM pipeline definition in Elasticsearch on APM Server startup. Defaults to true.
-  #enabled: true
-  # Overwrites existing APM pipeline definition in Elasticsearch. Defaults to false.
-  #overwrite: false
-
 
 #---------------------------- APM Server - Secure Communication with Agents ----------------------------
 
@@ -497,12 +482,6 @@ output.elasticsearch:
 # when.contains:
 #   processor.event: "onboarding"
 
-# A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch.
-# APM Server comes with a default pipeline definition, located at `ingest/pipeline/definition.json`, which is
-# loaded to Elasticsearch by default (see `apm-server.register.ingest.pipeline`).
-# APM pipeline is enabled by default. To disable it, set `pipeline: _none`.
-#pipeline: "apm"
-
 # Optional HTTP Path.
 #path: "/elasticsearch"
apmpackage/README.md (1 addition, 1 deletion)

@@ -95,7 +95,7 @@ Most of the work here is done in `beats/x-pack/elastic-agent`
 # tar and compress
 cp build/fields/fields.yml .
-tar cvf apm-server-<stack-version>-<platform>.tar apm-server LICENSE.txt NOTICE.txt README.md apm-server.yml ingest fields.yml
+tar cvf apm-server-<stack-version>-<platform>.tar apm-server LICENSE.txt NOTICE.txt README.md apm-server.yml fields.yml
 gzip apm-server-<stack-version>-<platform>.tar
 sha512sum apm-server-<stack-version>-<platform>.tar.gz | tee apm-server-<stack-version>-<platform>.tar.gz.sha512
beater/beater.go (1 addition, 90 deletions)

@@ -46,12 +46,10 @@ import (
     "github.com/elastic/beats/v7/libbeat/licenser"
     "github.com/elastic/beats/v7/libbeat/logp"
     esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
-    "github.com/elastic/beats/v7/libbeat/processors"
     "github.com/elastic/beats/v7/libbeat/publisher/pipetool"
 
     "github.com/elastic/apm-server/beater/config"
     "github.com/elastic/apm-server/elasticsearch"
-    "github.com/elastic/apm-server/ingest/pipeline"
     "github.com/elastic/apm-server/kibana"
     logs "github.com/elastic/apm-server/log"
     "github.com/elastic/apm-server/model"
@@ -131,7 +129,6 @@ func NewCreator(args CreatorParams) beat.Creator {
             b.OutputConfigReloader = bt.outputConfigReloader
         }
 
-        bt.registerPipelineSetupCallback(b)
         return bt, nil
     }
 }
@@ -421,17 +418,6 @@ func (s *serverRunner) run(listener net.Listener) error {
     // Send config to telemetry.
     recordAPMServerConfig(s.config)
 
-    publisherConfig := publish.PublisherConfig{Pipeline: s.config.Pipeline}
-    if !s.config.DataStreams.Enabled {
-        // Logs are only supported with data streams;
-        // add a beat.Processor which drops them.
-        dropLogsProcessor, err := newDropLogsBeatProcessor()
-        if err != nil {
-            return err
-        }
-        publisherConfig.Processor = dropLogsProcessor
-    }
-
     var kibanaClient kibana.Client
     if s.config.Kibana.Enabled {
         kibanaClient = kibana.NewConnectingClient(&s.config.Kibana)
@@ -483,16 +469,6 @@ func (s *serverRunner) run(listener net.Listener) error {
         })
     }
 
-    // Register a libbeat elasticsearch output connect callback which
-    // ensures the pipeline is installed. The callback does nothing
-    // when data streams are in use.
-    pipelineCallback := newPipelineElasticsearchConnectCallback(s.config)
-    callbackUUID, err = esoutput.RegisterConnectCallback(pipelineCallback)
-    if err != nil {
-        return err
-    }
-    defer esoutput.DeregisterConnectCallback(callbackUUID)
-
     var sourcemapFetcher sourcemap.Fetcher
     if s.config.RumConfig.Enabled && s.config.RumConfig.SourceMapping.Enabled {
         fetcher, err := newSourcemapFetcher(
@@ -517,7 +493,7 @@ func (s *serverRunner) run(listener net.Listener) error {
     // be closed at shutdown time.
     s.acker.Open()
     pipeline := pipetool.WithACKer(s.pipeline, s.acker)
-    publisher, err := publish.NewPublisher(pipeline, s.tracer, publisherConfig)
+    publisher, err := publish.NewPublisher(pipeline, s.tracer)
     if err != nil {
         return err
     }
@@ -735,57 +711,6 @@ func hasElasticsearchOutput(b *beat.Beat) bool {
     return b.Config != nil && b.Config.Output.Name() == "elasticsearch"
 }
 
-// registerPipelineSetupCallback registers a callback which is invoked when
-// `setup --pipelines` is called, to either register pipelines or return
-// an error depending on the configuration.
-func (bt *beater) registerPipelineSetupCallback(b *beat.Beat) {
-    if !hasElasticsearchOutput(b) {
-        bt.logger.Info("Output is not Elasticsearch: pipeline registration disabled")
-        return
-    }
-
-    if bt.config.DataStreams.Enabled {
-        bt.logger.Info("Data streams enabled: pipeline registration disabled")
-        b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
-            return errors.New("index pipeline setup must be performed externally when using data streams, by installing the 'apm' integration package")
-        }
-        return
-    }
-
-    if !bt.config.Register.Ingest.Pipeline.Enabled {
-        bt.logger.Info("Pipeline registration disabled")
-        return
-    }
-
-    bt.logger.Info("Registering pipeline callback")
-    overwrite := bt.config.Register.Ingest.Pipeline.Overwrite
-    path := bt.config.Register.Ingest.Pipeline.Path
-
-    // ensure setup cmd is working properly
-    b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
-        conn, err := eslegclient.NewConnectedClient(esConfig, b.Info.Beat)
-        if err != nil {
-            return err
-        }
-        return pipeline.RegisterPipelines(conn, overwrite, path)
-    }
-}
-
-// newPipelineElasticsearchConnectCallback returns an Elasticsearch connect
-// callback that ensures the configured pipeline is installed, if configured
-// to do so. If data streams are enabled, then pipeline registration is always
-// disabled.
-func newPipelineElasticsearchConnectCallback(cfg *config.Config) esoutput.ConnectCallback {
-    return func(conn *eslegclient.Connection) error {
-        if cfg.DataStreams.Enabled || !cfg.Register.Ingest.Pipeline.Enabled {
-            return nil
-        }
-        overwrite := cfg.Register.Ingest.Pipeline.Overwrite
-        path := cfg.Register.Ingest.Pipeline.Path
-        return pipeline.RegisterPipelines(conn, overwrite, path)
-    }
-}
-
 func initTracing(b *beat.Beat, cfg *config.Config, logger *logp.Logger) (*apm.Tracer, *tracerServer, error) {
     tracer := b.Instrumentation.Tracer()
     listener := b.Instrumentation.Listener()
@@ -897,20 +822,6 @@ func WrapRunServerWithProcessors(runServer RunServerFunc, processors ...model.Ba
     }
 }
 
-func newDropLogsBeatProcessor() (beat.ProcessorList, error) {
-    return processors.New(processors.PluginConfig{
-        common.MustNewConfigFrom(map[string]interface{}{
-            "drop_event": map[string]interface{}{
-                "when": map[string]interface{}{
-                    "contains": map[string]interface{}{
-                        "processor.event": "log",
-                    },
-                },
-            },
-        }),
-    })
-}
-
 // chanReloader implements libbeat/common/reload.Reloadable, converting
 // Reload calls into requests send to a channel consumed by serve.
 type chanReloader struct {
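
Among the deletions above is newDropLogsBeatProcessor, which attached a drop_event processor so that log events were silently discarded whenever data streams were disabled; with that guard gone, logs are published unconditionally. For clarity, a hand-rolled sketch of what the guard did (not the libbeat implementation; the original used libbeat's "contains" condition, i.e. a substring match, where plain equality is shown here for simplicity):

package example

import (
    "github.com/elastic/beats/v7/libbeat/beat"
)

// dropLogs mirrors the removed guard: events whose processor.event
// field is "log" are dropped (nil event); everything else passes
// through unchanged.
func dropLogs(event *beat.Event) (*beat.Event, error) {
    v, err := event.Fields.GetValue("processor.event")
    if err == nil && v == "log" {
        return nil, nil
    }
    return event, nil
}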
beater/beater_test.go (18 deletions)

@@ -343,21 +343,3 @@ func TestFleetStoreUsed(t *testing.T) {
 
     assert.True(t, called)
 }
-
-func Test_newDropLogsBeatProcessor(t *testing.T) {
-    dropLogsProcessor, err := newDropLogsBeatProcessor()
-    require.NoError(t, err)
-
-    event := beat.Event{
-        Timestamp: time.Now(),
-        Fields: common.MapStr{
-            "processor": common.MapStr{
-                "event": "log",
-                "name":  "log",
-            },
-        },
-    }
-    result, err := dropLogsProcessor.Run(&event)
-    require.NoError(t, err)
-    require.Nil(t, result)
-}
beater/config/config.go (8 deletions)

@@ -63,7 +63,6 @@ type Config struct {
     Pprof             PprofConfig       `config:"pprof"`
     AugmentEnabled    bool              `config:"capture_personal_data"`
     RumConfig         RumConfig         `config:"rum"`
-    Register          RegisterConfig    `config:"register"`
     Kibana            KibanaConfig      `config:"kibana"`
     KibanaAgentConfig KibanaAgentConfig `config:"agent.config"`
     Aggregation       AggregationConfig `config:"aggregation"`
@@ -72,8 +71,6 @@ type Config struct {
     DefaultServiceEnvironment string             `config:"default_service_environment"`
     JavaAttacherConfig        JavaAttacherConfig `config:"java_attacher"`
 
-    Pipeline string
-
     AgentConfigs []AgentConfig `config:"agent_config"`
 
     // WaitReadyInterval holds the interval for checks when waiting for
@@ -116,9 +113,6 @@ func NewConfig(ucfg *common.Config, outputESCfg *common.Config) (*Config, error)
         return nil, err
     }
 
-    if c.DataStreams.Enabled || (outputESCfg != nil && (outputESCfg.HasField("pipeline") || outputESCfg.HasField("pipelines"))) {
-        c.Pipeline = ""
-    }
     return c, nil
 }
 
@@ -140,10 +134,8 @@ func DefaultConfig() *Config {
         },
         Pprof:             PprofConfig{Enabled: false},
         RumConfig:         defaultRum(),
-        Register:          defaultRegisterConfig(),
         Kibana:            defaultKibanaConfig(),
         KibanaAgentConfig: defaultKibanaAgentConfig(),
-        Pipeline:          defaultAPMPipeline,
         Aggregation:       defaultAggregationConfig(),
         Sampling:          defaultSamplingConfig(),
         DataStreams:       defaultDataStreamsConfig(),
beater/config/config_test.go (19 deletions)

@@ -218,21 +218,11 @@ func TestUnpackConfig(t *testing.T) {
                 LibraryPattern:      "^custom",
                 ExcludeFromGrouping: "^grouping",
             },
-            Register: RegisterConfig{
-                Ingest: IngestConfig{
-                    Pipeline: PipelineConfig{
-                        Enabled:   true,
-                        Overwrite: false,
-                        Path:      filepath.Join("tmp", "definition.json"),
-                    },
-                },
-            },
             Kibana: KibanaConfig{
                 Enabled:      true,
                 ClientConfig: defaultDecodedKibanaClientConfig,
             },
             KibanaAgentConfig: KibanaAgentConfig{Cache: Cache{Expiration: 2 * time.Minute}},
-            Pipeline:          defaultAPMPipeline,
             Aggregation: AggregationConfig{
                 Transactions: TransactionAggregationConfig{
                     Interval: time.Second,
@@ -375,17 +365,8 @@ func TestUnpackConfig(t *testing.T) {
                 LibraryPattern:      "rum",
                 ExcludeFromGrouping: "^/webpack",
             },
-            Register: RegisterConfig{
-                Ingest: IngestConfig{
-                    Pipeline: PipelineConfig{
-                        Enabled: false,
-                        Path:    filepath.Join("ingest", "pipeline", "definition.json"),
-                    },
-                },
-            },
             Kibana:            defaultKibanaConfig(),
             KibanaAgentConfig: KibanaAgentConfig{Cache: Cache{Expiration: 30 * time.Second}},
-            Pipeline:          defaultAPMPipeline,
             Aggregation: AggregationConfig{
                 Transactions: TransactionAggregationConfig{
                     Interval: time.Minute,
(13 more changed files not shown.)
