From 642aaee1c2b87618770a412aa99ccda57a237311 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 15 Nov 2021 16:23:32 +0800 Subject: [PATCH] Remove ingest pipeline registration (#6575) Also, stop blocking indexing of logs when data streams are disabled. Along with the removal of explicitly named pipelines, this allows us to simplify the publisher package. --- Makefile | 2 +- _meta/beat.yml | 21 ---- apm-server.docker.yml | 21 ---- apm-server.yml | 21 ---- apmpackage/README.md | 2 +- beater/beater.go | 91 +------------- beater/beater_test.go | 18 --- beater/config/config.go | 8 -- beater/config/config_test.go | 19 --- beater/config/data_streams_test.go | 33 ----- beater/config/register.go | 58 --------- beater/telemetry.go | 6 - beater/telemetry_test.go | 4 - changelogs/head.asciidoc | 1 + ingest/pipeline/definition.json | 194 ----------------------------- ingest/pipeline/definition.yml | 134 -------------------- ingest/pipeline/generate.go | 132 -------------------- ingest/pipeline/register.go | 75 ----------- ingest/pipeline/register_test.go | 76 ----------- magefile.go | 106 +++++----------- publish/pub.go | 40 ++---- publish/pub_test.go | 4 +- 22 files changed, 42 insertions(+), 1024 deletions(-) delete mode 100644 beater/config/data_streams_test.go delete mode 100644 beater/config/register.go delete mode 100644 ingest/pipeline/definition.json delete mode 100644 ingest/pipeline/definition.yml delete mode 100644 ingest/pipeline/generate.go delete mode 100644 ingest/pipeline/register.go delete mode 100644 ingest/pipeline/register_test.go diff --git a/Makefile b/Makefile index 229197769d1..4777b6d6173 100644 --- a/Makefile +++ b/Makefile @@ -100,7 +100,7 @@ apm-server.yml apm-server.docker.yml: $(MAGE) magefile.go _meta/beat.yml .PHONY: go-generate go-generate: - @$(GO) generate . ./ingest/pipeline + @$(GO) generate . notice: NOTICE.txt NOTICE.txt: $(PYTHON) go.mod tools/go.mod diff --git a/_meta/beat.yml b/_meta/beat.yml index 3bce17a3206..1c445145ff3 100644 --- a/_meta/beat.yml +++ b/_meta/beat.yml @@ -88,21 +88,6 @@ apm-server: # Url to expose expvar. #url: "/debug/vars" - # A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch. - # Using pipelines involves two steps: - # (1) registering a pipeline - # (2) applying a pipeline during data ingestion (see `output.elasticsearch.pipeline`) - # - # You can manually register a pipeline, or use this configuration option to ensure - # the pipeline is loaded and registered at the configured Elasticsearch instances. - # Find the default pipeline configuration at `ingest/pipeline/definition.json`. - # Automatic pipeline registration requires the `output.elasticsearch` to be enabled and configured. - #register.ingest.pipeline: - # Registers APM pipeline definition in Elasticsearch on APM Server startup. Defaults to true. - #enabled: true - # Overwrites existing APM pipeline definition in Elasticsearch. Defaults to false. - #overwrite: false - #---------------------------- APM Server - Secure Communication with Agents ---------------------------- @@ -497,12 +482,6 @@ output.elasticsearch: # when.contains: # processor.event: "onboarding" - # A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch. - # APM Server comes with a default pipeline definition, located at `ingest/pipeline/definition.json`, which is - # loaded to Elasticsearch by default (see `apm-server.register.ingest.pipeline`). - # APM pipeline is enabled by default. 
To disable it, set `pipeline: _none`. - #pipeline: "apm" - # Optional HTTP Path. #path: "/elasticsearch" diff --git a/apm-server.docker.yml b/apm-server.docker.yml index 9103bc41b36..91623715023 100644 --- a/apm-server.docker.yml +++ b/apm-server.docker.yml @@ -88,21 +88,6 @@ apm-server: # Url to expose expvar. #url: "/debug/vars" - # A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch. - # Using pipelines involves two steps: - # (1) registering a pipeline - # (2) applying a pipeline during data ingestion (see `output.elasticsearch.pipeline`) - # - # You can manually register a pipeline, or use this configuration option to ensure - # the pipeline is loaded and registered at the configured Elasticsearch instances. - # Find the default pipeline configuration at `ingest/pipeline/definition.json`. - # Automatic pipeline registration requires the `output.elasticsearch` to be enabled and configured. - #register.ingest.pipeline: - # Registers APM pipeline definition in Elasticsearch on APM Server startup. Defaults to true. - #enabled: true - # Overwrites existing APM pipeline definition in Elasticsearch. Defaults to false. - #overwrite: false - #---------------------------- APM Server - Secure Communication with Agents ---------------------------- @@ -497,12 +482,6 @@ output.elasticsearch: # when.contains: # processor.event: "onboarding" - # A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch. - # APM Server comes with a default pipeline definition, located at `ingest/pipeline/definition.json`, which is - # loaded to Elasticsearch by default (see `apm-server.register.ingest.pipeline`). - # APM pipeline is enabled by default. To disable it, set `pipeline: _none`. - #pipeline: "apm" - # Optional HTTP Path. #path: "/elasticsearch" diff --git a/apm-server.yml b/apm-server.yml index 720b986b042..5463b70a161 100644 --- a/apm-server.yml +++ b/apm-server.yml @@ -88,21 +88,6 @@ apm-server: # Url to expose expvar. #url: "/debug/vars" - # A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch. - # Using pipelines involves two steps: - # (1) registering a pipeline - # (2) applying a pipeline during data ingestion (see `output.elasticsearch.pipeline`) - # - # You can manually register a pipeline, or use this configuration option to ensure - # the pipeline is loaded and registered at the configured Elasticsearch instances. - # Find the default pipeline configuration at `ingest/pipeline/definition.json`. - # Automatic pipeline registration requires the `output.elasticsearch` to be enabled and configured. - #register.ingest.pipeline: - # Registers APM pipeline definition in Elasticsearch on APM Server startup. Defaults to true. - #enabled: true - # Overwrites existing APM pipeline definition in Elasticsearch. Defaults to false. - #overwrite: false - #---------------------------- APM Server - Secure Communication with Agents ---------------------------- @@ -497,12 +482,6 @@ output.elasticsearch: # when.contains: # processor.event: "onboarding" - # A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch. - # APM Server comes with a default pipeline definition, located at `ingest/pipeline/definition.json`, which is - # loaded to Elasticsearch by default (see `apm-server.register.ingest.pipeline`). - # APM pipeline is enabled by default. To disable it, set `pipeline: _none`. - #pipeline: "apm" - # Optional HTTP Path. 
#path: "/elasticsearch" diff --git a/apmpackage/README.md b/apmpackage/README.md index c969186cf37..a527711636a 100644 --- a/apmpackage/README.md +++ b/apmpackage/README.md @@ -95,7 +95,7 @@ Most of the work here is done in `beats/x-pack/elastic-agent` # tar and compress cp build/fields/fields.yml . - tar cvf apm-server--.tar apm-server LICENSE.txt NOTICE.txt README.md apm-server.yml ingest fields.yml + tar cvf apm-server--.tar apm-server LICENSE.txt NOTICE.txt README.md apm-server.yml fields.yml gzip apm-server--.tar sha512sum apm-server--.tar.gz | tee apm-server--.tar.gz.sha512 diff --git a/beater/beater.go b/beater/beater.go index 38cd5832548..ab0aa4b85ae 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -46,12 +46,10 @@ import ( "github.com/elastic/beats/v7/libbeat/licenser" "github.com/elastic/beats/v7/libbeat/logp" esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" - "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/elasticsearch" - "github.com/elastic/apm-server/ingest/pipeline" "github.com/elastic/apm-server/kibana" logs "github.com/elastic/apm-server/log" "github.com/elastic/apm-server/model" @@ -131,7 +129,6 @@ func NewCreator(args CreatorParams) beat.Creator { b.OutputConfigReloader = bt.outputConfigReloader } - bt.registerPipelineSetupCallback(b) return bt, nil } } @@ -421,17 +418,6 @@ func (s *serverRunner) run(listener net.Listener) error { // Send config to telemetry. recordAPMServerConfig(s.config) - publisherConfig := publish.PublisherConfig{Pipeline: s.config.Pipeline} - if !s.config.DataStreams.Enabled { - // Logs are only supported with data streams; - // add a beat.Processor which drops them. - dropLogsProcessor, err := newDropLogsBeatProcessor() - if err != nil { - return err - } - publisherConfig.Processor = dropLogsProcessor - } - var kibanaClient kibana.Client if s.config.Kibana.Enabled { kibanaClient = kibana.NewConnectingClient(&s.config.Kibana) @@ -483,16 +469,6 @@ func (s *serverRunner) run(listener net.Listener) error { }) } - // Register a libbeat elasticsearch output connect callback which - // ensures the pipeline is installed. The callback does nothing - // when data streams are in use. - pipelineCallback := newPipelineElasticsearchConnectCallback(s.config) - callbackUUID, err = esoutput.RegisterConnectCallback(pipelineCallback) - if err != nil { - return err - } - defer esoutput.DeregisterConnectCallback(callbackUUID) - var sourcemapFetcher sourcemap.Fetcher if s.config.RumConfig.Enabled && s.config.RumConfig.SourceMapping.Enabled { fetcher, err := newSourcemapFetcher( @@ -517,7 +493,7 @@ func (s *serverRunner) run(listener net.Listener) error { // be closed at shutdown time. s.acker.Open() pipeline := pipetool.WithACKer(s.pipeline, s.acker) - publisher, err := publish.NewPublisher(pipeline, s.tracer, publisherConfig) + publisher, err := publish.NewPublisher(pipeline, s.tracer) if err != nil { return err } @@ -735,57 +711,6 @@ func hasElasticsearchOutput(b *beat.Beat) bool { return b.Config != nil && b.Config.Output.Name() == "elasticsearch" } -// registerPipelineCallback registers a callback which is invoked when -// `setup --pipelines` is called, to either register pipelines or return -// an error depending on the configuration. 
-func (bt *beater) registerPipelineSetupCallback(b *beat.Beat) { - if !hasElasticsearchOutput(b) { - bt.logger.Info("Output is not Elasticsearch: pipeline registration disabled") - return - } - - if bt.config.DataStreams.Enabled { - bt.logger.Info("Data streams enabled: pipeline registration disabled") - b.OverwritePipelinesCallback = func(esConfig *common.Config) error { - return errors.New("index pipeline setup must be performed externally when using data streams, by installing the 'apm' integration package") - } - return - } - - if !bt.config.Register.Ingest.Pipeline.Enabled { - bt.logger.Info("Pipeline registration disabled") - return - } - - bt.logger.Info("Registering pipeline callback") - overwrite := bt.config.Register.Ingest.Pipeline.Overwrite - path := bt.config.Register.Ingest.Pipeline.Path - - // ensure setup cmd is working properly - b.OverwritePipelinesCallback = func(esConfig *common.Config) error { - conn, err := eslegclient.NewConnectedClient(esConfig, b.Info.Beat) - if err != nil { - return err - } - return pipeline.RegisterPipelines(conn, overwrite, path) - } -} - -// newPipelineElasticsearchConnectCallback returns an Elasticsearch connect -// callback that ensures the configured pipeline is installed, if configured -// to do so. If data streams are enabled, then pipeline registration is always -// disabled. -func newPipelineElasticsearchConnectCallback(cfg *config.Config) esoutput.ConnectCallback { - return func(conn *eslegclient.Connection) error { - if cfg.DataStreams.Enabled || !cfg.Register.Ingest.Pipeline.Enabled { - return nil - } - overwrite := cfg.Register.Ingest.Pipeline.Overwrite - path := cfg.Register.Ingest.Pipeline.Path - return pipeline.RegisterPipelines(conn, overwrite, path) - } -} - func initTracing(b *beat.Beat, cfg *config.Config, logger *logp.Logger) (*apm.Tracer, *tracerServer, error) { tracer := b.Instrumentation.Tracer() listener := b.Instrumentation.Listener() @@ -897,20 +822,6 @@ func WrapRunServerWithProcessors(runServer RunServerFunc, processors ...model.Ba } } -func newDropLogsBeatProcessor() (beat.ProcessorList, error) { - return processors.New(processors.PluginConfig{ - common.MustNewConfigFrom(map[string]interface{}{ - "drop_event": map[string]interface{}{ - "when": map[string]interface{}{ - "contains": map[string]interface{}{ - "processor.event": "log", - }, - }, - }, - }), - }) -} - // chanReloader implements libbeat/common/reload.Reloadable, converting // Reload calls into requests send to a channel consumed by serve. 
type chanReloader struct { diff --git a/beater/beater_test.go b/beater/beater_test.go index a9c894ac501..ac1c90643a7 100644 --- a/beater/beater_test.go +++ b/beater/beater_test.go @@ -343,21 +343,3 @@ func TestFleetStoreUsed(t *testing.T) { assert.True(t, called) } - -func Test_newDropLogsBeatProcessor(t *testing.T) { - dropLogsProcessor, err := newDropLogsBeatProcessor() - require.NoError(t, err) - - event := beat.Event{ - Timestamp: time.Now(), - Fields: common.MapStr{ - "processor": common.MapStr{ - "event": "log", - "name": "log", - }, - }, - } - result, err := dropLogsProcessor.Run(&event) - require.NoError(t, err) - require.Nil(t, result) -} diff --git a/beater/config/config.go b/beater/config/config.go index 380ff3c738e..cdedd037304 100644 --- a/beater/config/config.go +++ b/beater/config/config.go @@ -63,7 +63,6 @@ type Config struct { Pprof PprofConfig `config:"pprof"` AugmentEnabled bool `config:"capture_personal_data"` RumConfig RumConfig `config:"rum"` - Register RegisterConfig `config:"register"` Kibana KibanaConfig `config:"kibana"` KibanaAgentConfig KibanaAgentConfig `config:"agent.config"` Aggregation AggregationConfig `config:"aggregation"` @@ -72,8 +71,6 @@ type Config struct { DefaultServiceEnvironment string `config:"default_service_environment"` JavaAttacherConfig JavaAttacherConfig `config:"java_attacher"` - Pipeline string - AgentConfigs []AgentConfig `config:"agent_config"` // WaitReadyInterval holds the interval for checks when waiting for @@ -116,9 +113,6 @@ func NewConfig(ucfg *common.Config, outputESCfg *common.Config) (*Config, error) return nil, err } - if c.DataStreams.Enabled || (outputESCfg != nil && (outputESCfg.HasField("pipeline") || outputESCfg.HasField("pipelines"))) { - c.Pipeline = "" - } return c, nil } @@ -140,10 +134,8 @@ func DefaultConfig() *Config { }, Pprof: PprofConfig{Enabled: false}, RumConfig: defaultRum(), - Register: defaultRegisterConfig(), Kibana: defaultKibanaConfig(), KibanaAgentConfig: defaultKibanaAgentConfig(), - Pipeline: defaultAPMPipeline, Aggregation: defaultAggregationConfig(), Sampling: defaultSamplingConfig(), DataStreams: defaultDataStreamsConfig(), diff --git a/beater/config/config_test.go b/beater/config/config_test.go index f3a00a80d84..827c932fcac 100644 --- a/beater/config/config_test.go +++ b/beater/config/config_test.go @@ -218,21 +218,11 @@ func TestUnpackConfig(t *testing.T) { LibraryPattern: "^custom", ExcludeFromGrouping: "^grouping", }, - Register: RegisterConfig{ - Ingest: IngestConfig{ - Pipeline: PipelineConfig{ - Enabled: true, - Overwrite: false, - Path: filepath.Join("tmp", "definition.json"), - }, - }, - }, Kibana: KibanaConfig{ Enabled: true, ClientConfig: defaultDecodedKibanaClientConfig, }, KibanaAgentConfig: KibanaAgentConfig{Cache: Cache{Expiration: 2 * time.Minute}}, - Pipeline: defaultAPMPipeline, Aggregation: AggregationConfig{ Transactions: TransactionAggregationConfig{ Interval: time.Second, @@ -375,17 +365,8 @@ func TestUnpackConfig(t *testing.T) { LibraryPattern: "rum", ExcludeFromGrouping: "^/webpack", }, - Register: RegisterConfig{ - Ingest: IngestConfig{ - Pipeline: PipelineConfig{ - Enabled: false, - Path: filepath.Join("ingest", "pipeline", "definition.json"), - }, - }, - }, Kibana: defaultKibanaConfig(), KibanaAgentConfig: KibanaAgentConfig{Cache: Cache{Expiration: 30 * time.Second}}, - Pipeline: defaultAPMPipeline, Aggregation: AggregationConfig{ Transactions: TransactionAggregationConfig{ Interval: time.Minute, diff --git a/beater/config/data_streams_test.go 
b/beater/config/data_streams_test.go deleted file mode 100644 index ec2cca967ae..00000000000 --- a/beater/config/data_streams_test.go +++ /dev/null @@ -1,33 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package config - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/beats/v7/libbeat/common" -) - -func TestDataStreamsPipeline(t *testing.T) { - cfg, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{"data_streams.enabled": true}), nil) - require.NoError(t, err) - assert.Equal(t, "", cfg.Pipeline) // enabling data streams disables use of the pipeline -} diff --git a/beater/config/register.go b/beater/config/register.go deleted file mode 100644 index e9d3a7423b6..00000000000 --- a/beater/config/register.go +++ /dev/null @@ -1,58 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package config - -import ( - "path/filepath" - - "github.com/elastic/beats/v7/libbeat/paths" -) - -const ( - defaultAPMPipeline = "apm" -) - -// RegisterConfig holds ingest config information -type RegisterConfig struct { - Ingest IngestConfig `config:"ingest"` -} - -// IngestConfig holds config pipeline ingest information -type IngestConfig struct { - Pipeline PipelineConfig `config:"pipeline"` -} - -// PipelineConfig holds config information about registering ingest pipelines -type PipelineConfig struct { - Enabled bool `config:"enabled"` - Overwrite bool `config:"overwrite"` - Path string -} - -func defaultRegisterConfig() RegisterConfig { - return RegisterConfig{ - Ingest: IngestConfig{ - Pipeline: PipelineConfig{ - Enabled: true, - Path: paths.Resolve( - paths.Home, filepath.Join("ingest", "pipeline", "definition.json"), - ), - }, - }, - } -} diff --git a/beater/telemetry.go b/beater/telemetry.go index d9a5926a28e..ae003f032cf 100644 --- a/beater/telemetry.go +++ b/beater/telemetry.go @@ -32,8 +32,6 @@ type configTelemetry struct { rumEnabled *monitoring.Bool apiKeysEnabled *monitoring.Bool kibanaEnabled *monitoring.Bool - pipelinesEnabled *monitoring.Bool - pipelinesOverwrite *monitoring.Bool setupTemplateEnabled *monitoring.Bool setupTemplateOverwrite *monitoring.Bool setupTemplateAppendFields *monitoring.Bool @@ -51,8 +49,6 @@ var configMonitors = &configTelemetry{ rumEnabled: monitoring.NewBool(apmRegistry, "rum.enabled"), apiKeysEnabled: monitoring.NewBool(apmRegistry, "api_key.enabled"), kibanaEnabled: monitoring.NewBool(apmRegistry, "kibana.enabled"), - pipelinesEnabled: monitoring.NewBool(apmRegistry, "register.ingest.pipeline.enabled"), - pipelinesOverwrite: monitoring.NewBool(apmRegistry, "register.ingest.pipeline.overwrite"), setupTemplateEnabled: monitoring.NewBool(apmRegistry, "setup.template.enabled"), setupTemplateOverwrite: monitoring.NewBool(apmRegistry, "setup.template.overwrite"), setupTemplateAppendFields: monitoring.NewBool(apmRegistry, "setup.template.append_fields"), @@ -90,8 +86,6 @@ func recordAPMServerConfig(cfg *config.Config) { configMonitors.apiKeysEnabled.Set(cfg.AgentAuth.APIKey.Enabled) configMonitors.kibanaEnabled.Set(cfg.Kibana.Enabled) configMonitors.sslEnabled.Set(cfg.TLS.IsEnabled()) - configMonitors.pipelinesEnabled.Set(cfg.Register.Ingest.Pipeline.Enabled) - configMonitors.pipelinesOverwrite.Set(cfg.Register.Ingest.Pipeline.Overwrite) configMonitors.tailSamplingEnabled.Set(cfg.Sampling.Tail.Enabled) configMonitors.tailSamplingPolicies.Set(int64(len(cfg.Sampling.Tail.Policies))) } diff --git a/beater/telemetry_test.go b/beater/telemetry_test.go index 5f0ea64d190..9154650bbbb 100644 --- a/beater/telemetry_test.go +++ b/beater/telemetry_test.go @@ -59,8 +59,6 @@ func TestRecordConfigs(t *testing.T) { assert.Equal(t, configMonitors.rumEnabled.Get(), false) assert.Equal(t, configMonitors.apiKeysEnabled.Get(), true) assert.Equal(t, configMonitors.kibanaEnabled.Get(), true) - assert.Equal(t, configMonitors.pipelinesEnabled.Get(), true) - assert.Equal(t, configMonitors.pipelinesOverwrite.Get(), false) assert.Equal(t, configMonitors.setupTemplateEnabled.Get(), true) assert.Equal(t, configMonitors.setupTemplateOverwrite.Get(), true) assert.Equal(t, configMonitors.setupTemplateAppendFields.Get(), false) @@ -76,8 +74,6 @@ func resetCounters() { configMonitors.apiKeysEnabled.Set(false) configMonitors.kibanaEnabled.Set(false) configMonitors.sslEnabled.Set(false) - configMonitors.pipelinesEnabled.Set(false) - configMonitors.pipelinesOverwrite.Set(false) 
configMonitors.setupTemplateEnabled.Set(false) configMonitors.setupTemplateOverwrite.Set(false) configMonitors.setupTemplateAppendFields.Set(false) diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 364649414a9..279f802a6a0 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -29,6 +29,7 @@ https://github.com/elastic/apm-server/compare/7.15\...master[View commits] - Removed `apm-server.rum.{allowed_service,event_rate}` configuration option in favor of `apm-server.auth.anonymous.{allow_service,rate_limit}` {pull}6560[6560] - Removed `apm-server.{api_key,secret_token}` configuration options in favor of `apm-server.auth.{api_key,secret_token}` {pull}6560[6560] - Onboarding documents are no longer indexed {pull}6431[6431] +- Removed `apm-server.register.ingest.pipeline` and `output.elasticsearch.pipeline` configuration options {pull}6575[6575] - Removed unused `span.start.us` field, and deprecated `span.http.*` fields {pull}6602[6602] [float] diff --git a/ingest/pipeline/definition.json b/ingest/pipeline/definition.json deleted file mode 100644 index c60c0294cbb..00000000000 --- a/ingest/pipeline/definition.json +++ /dev/null @@ -1,194 +0,0 @@ -[ - { - "id": "apm", - "body": { - "description": "Default enrichment for APM events", - "processors": [ - { - "pipeline": { - "name": "apm_ingest_timestamp" - } - }, - { - "pipeline": { - "name": "apm_user_agent" - } - }, - { - "pipeline": { - "name": "apm_user_geo" - } - }, - { - "pipeline": { - "name": "apm_remove_span_metadata" - } - }, - { - "pipeline": { - "name": "apm_error_grouping_name", - "if": "ctx.processor?.event == 'error'" - } - }, - { - "pipeline": { - "name": "apm_metrics_dynamic_template", - "if": "ctx.processor?.event == 'metric'" - } - } - ] - } - }, - { - "id": "apm_data_stream_migration", - "body": { - "description": "Migrate APM events to data streams", - "processors": [ - { - "script": { - "if": "ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction'", - "source": "ctx.data_stream = [\"type\": \"traces\", \"dataset\": \"apm\", \"namespace\": \"migrated\"]\n" - } - }, - { - "script": { - "if": "ctx.processor?.event == 'error'", - "source": "ctx.data_stream = [\"type\": \"logs\", \"dataset\": \"apm.error\", \"namespace\": \"migrated\"]\n" - } - }, - { - "script": { - "if": "ctx.processor?.event == 'metric'", - "source": "String dataset;\nif (ctx[\"metricset.name\"] != \"app\") {\n dataset = \"apm.internal\";\n} else {\n String serviceName = ctx.service.name;\n serviceName = serviceName.toLowerCase();\n serviceName = /[\\\\\\/*?\"<>| ,#:-]/.matcher(serviceName).replaceAll('_');\n dataset = \"apm.app.\" + serviceName;\n}\nctx.data_stream = [\"type\": \"metrics\", \"dataset\": dataset, \"namespace\": \"migrated\"];\n" - } - }, - { - "set": { - "if": "ctx.data_stream != null", - "field": "_index", - "value": "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}" - } - } - ] - } - }, - { - "id": "apm_user_agent", - "body": { - "description": "Add user agent information for APM events", - "processors": [ - { - "user_agent": { - "field": "user_agent.original", - "target_field": "user_agent", - "ignore_missing": true, - "ignore_failure": true - } - } - ] - } - }, - { - "id": "apm_user_geo", - "body": { - "description": "Add user geo information for APM events", - "processors": [ - { - "geoip": { - "database_file": "GeoLite2-City.mmdb", - "field": "client.ip", - "target_field": "client.geo", - "ignore_missing": true, - "on_failure": [ - { - "remove": { - "field": 
"client.ip", - "ignore_missing": true, - "ignore_failure": true - } - } - ] - } - } - ] - } - }, - { - "id": "apm_ingest_timestamp", - "body": { - "description": "Add an ingest timestamp for APM events", - "processors": [ - { - "set": { - "if": "ctx.processor?.event != 'span'", - "field": "event.ingested", - "value": "{{_ingest.timestamp}}" - } - } - ] - } - }, - { - "id": "apm_remove_span_metadata", - "body": { - "description": "Removes metadata fields available already on the parent transaction, to save storage", - "processors": [ - { - "remove": { - "if": "ctx.processor?.event == 'span'", - "field": [ - "host", - "process", - "user", - "user_agent", - "container", - "kubernetes", - "service.node", - "service.version", - "service.language", - "service.runtime", - "service.framework" - ], - "ignore_missing": true, - "ignore_failure": true - } - } - ] - } - }, - { - "id": "apm_error_grouping_name", - "body": { - "description": "Set error.grouping_name for APM error events", - "processors": [ - { - "script": { - "source": "ctx.error.grouping_name = ctx.error.exception[0].message", - "if": "ctx.error?.exception?.length != null && ctx.error?.exception?.length > 0" - } - }, - { - "set": { - "field": "error.grouping_name", - "value": "{{error.log.message}}", - "if": "ctx.error?.log?.message != null" - } - } - ] - } - }, - { - "id": "apm_metrics_dynamic_template", - "body": { - "description": "Set dynamic_templates for application metrics", - "processors": [ - { - "script": { - "if": "ctx._metric_descriptions != null", - "source": "Map dynamic_templates = new HashMap();\nfor (entry in ctx._metric_descriptions.entrySet()) {\n String name = entry.getKey();\n Map description = entry.getValue();\n String metric_type = description.type;\n if (metric_type == \"histogram\") {\n dynamic_templates[name] = \"histogram\";\n }\n}\nctx._dynamic_templates = dynamic_templates;\nctx.remove(\"_metric_descriptions\");\n" - } - } - ] - } - } -] \ No newline at end of file diff --git a/ingest/pipeline/definition.yml b/ingest/pipeline/definition.yml deleted file mode 100644 index d18f614b64a..00000000000 --- a/ingest/pipeline/definition.yml +++ /dev/null @@ -1,134 +0,0 @@ -apm: - description: Default enrichment for APM events - processors: - - pipeline: - # apm_ingest_timestamp should always come first, - # ensuring `event.ingested` is set as early as - # possible. - name: apm_ingest_timestamp - - pipeline: - name: apm_user_agent - - pipeline: - name: apm_user_geo - - pipeline: - name: apm_remove_span_metadata - - pipeline: - name: apm_error_grouping_name - if: ctx.processor?.event == 'error' - - pipeline: - name: apm_metrics_dynamic_template - if: ctx.processor?.event == 'metric' - -# apm_data_stream_migration is not used in the main apm pipeline, -# it is installed for migrating legacy indices to data streams, -# e.g. using the Kibana Upgrade Assistant. 
-apm_data_stream_migration: - description: Migrate APM events to data streams - processors: - - script: - if: ctx.processor?.event == 'span' || ctx.processor?.event == 'transaction' - source: | - ctx.data_stream = ["type": "traces", "dataset": "apm", "namespace": "migrated"] - - script: - if: ctx.processor?.event == 'error' - source: | - ctx.data_stream = ["type": "logs", "dataset": "apm.error", "namespace": "migrated"] - - script: - if: ctx.processor?.event == 'metric' - source: | - String dataset; - if (ctx["metricset.name"] != "app") { - dataset = "apm.internal"; - } else { - String serviceName = ctx.service.name; - serviceName = serviceName.toLowerCase(); - serviceName = /[\\\/*?"<>| ,#:-]/.matcher(serviceName).replaceAll('_'); - dataset = "apm.app." + serviceName; - } - ctx.data_stream = ["type": "metrics", "dataset": dataset, "namespace": "migrated"]; - - set: - if: ctx.data_stream != null - field: _index - value: "{{data_stream.type}}-{{data_stream.dataset}}-{{data_stream.namespace}}" - -apm_user_agent: - description: Add user agent information for APM events - processors: - - user_agent: - field: user_agent.original - target_field: user_agent - ignore_missing: true - ignore_failure: true - -apm_user_geo: - description: Add user geo information for APM events - processors: - - geoip: - database_file: GeoLite2-City.mmdb - field: client.ip - target_field: client.geo - ignore_missing: true - on_failure: - - remove: - field: client.ip - ignore_missing: true - ignore_failure: true - -apm_ingest_timestamp: - description: Add an ingest timestamp for APM events - processors: - - set: - if: ctx.processor?.event != 'span' - field: event.ingested - value: "{{_ingest.timestamp}}" - -apm_remove_span_metadata: - description: Removes metadata fields available already on the parent transaction, to save storage - processors: - - remove: - if: ctx.processor?.event == 'span' - field: - - host - - process - - user - - user_agent - - container - - kubernetes - - service.node - - service.version - - service.language - - service.runtime - - service.framework - ignore_missing: true - ignore_failure: true - -apm_error_grouping_name: - description: Set error.grouping_name for APM error events - processors: - - script: - source: ctx.error.grouping_name = ctx.error.exception[0].message - if: ctx.error?.exception?.length != null && ctx.error?.exception?.length > 0 - - set: - field: error.grouping_name - value: "{{error.log.message}}" - if: ctx.error?.log?.message != null - -# TODO(axw) handle unit in metric descriptions. -# See https://github.com/elastic/elasticsearch/issues/72536 -apm_metrics_dynamic_template: - description: Set dynamic_templates for application metrics - processors: - - script: - if: ctx._metric_descriptions != null - source: | - Map dynamic_templates = new HashMap(); - for (entry in ctx._metric_descriptions.entrySet()) { - String name = entry.getKey(); - Map description = entry.getValue(); - String metric_type = description.type; - if (metric_type == "histogram") { - dynamic_templates[name] = "histogram"; - } - } - ctx._dynamic_templates = dynamic_templates; - ctx.remove("_metric_descriptions"); diff --git a/ingest/pipeline/generate.go b/ingest/pipeline/generate.go deleted file mode 100644 index 6d1290fe9af..00000000000 --- a/ingest/pipeline/generate.go +++ /dev/null @@ -1,132 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. 
Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//go:build ignore -// +build ignore - -package main - -import ( - "bytes" - "encoding/json" - "fmt" - "io/ioutil" - "log" - "os" - - "gopkg.in/yaml.v3" -) - -type pipelineDefinition struct { - ID interface{} `json:"id"` - Body interface{} `json:"body"` -} - -func main() { - var doc yaml.Node - fin, err := os.Open("definition.yml") - if err != nil { - log.Fatal(err) - } - yamlDecoder := yaml.NewDecoder(fin) - if err := yamlDecoder.Decode(&doc); err != nil { - log.Fatal(err) - } - - // Convert the document structure into the one expected by libbeat. - // e.g. convert {a: 1, b: 2, ...} to [{id: a, body: 1}, {id: b, body: 2}, ...] - if n := len(doc.Content); n != 1 { - log.Fatalf("expected 1 document, got %d", n) - } - mappingNode := doc.Content[0] - sequenceNode := &yaml.Node{Kind: yaml.SequenceNode, Content: make([]*yaml.Node, len(mappingNode.Content)/2)} - for i := 0; i < len(mappingNode.Content); i += 2 { - idNode := mappingNode.Content[i] - bodyNode := mappingNode.Content[i+1] - sequenceNode.Content[i/2] = &yaml.Node{ - Kind: yaml.MappingNode, - Content: []*yaml.Node{ - {Kind: yaml.ScalarNode, Tag: "!!str", Value: "id"}, - idNode, - {Kind: yaml.ScalarNode, Tag: "!!str", Value: "body"}, - bodyNode, - }, - } - } - doc.Content[0] = sequenceNode - - var buf bytes.Buffer - if err := encodeJSON(&buf, &doc); err != nil { - log.Fatal(err) - } - var indented bytes.Buffer - if err := json.Indent(&indented, buf.Bytes(), "", " "); err != nil { - log.Fatal(err) - } - if err := ioutil.WriteFile("definition.json", indented.Bytes(), 0644); err != nil { - log.Fatal(err) - } -} - -func encodeJSON(buf *bytes.Buffer, node *yaml.Node) error { - switch node.Kind { - case yaml.DocumentNode: - return encodeJSON(buf, node.Content[0]) - case yaml.SequenceNode: - buf.WriteByte('[') - for i, node := range node.Content { - if i > 0 { - buf.WriteByte(',') - } - if err := encodeJSON(buf, node); err != nil { - return err - } - } - buf.WriteByte(']') - return nil - case yaml.MappingNode: - buf.WriteByte('{') - for i := 0; i < len(node.Content); i += 2 { - if i > 0 { - buf.WriteByte(',') - } - if err := encodeJSON(buf, node.Content[i]); err != nil { - return err - } - buf.WriteByte(':') - if err := encodeJSON(buf, node.Content[i+1]); err != nil { - return err - } - } - buf.WriteByte('}') - return nil - case yaml.ScalarNode: - switch node.Tag { - case "!!str": - enc := json.NewEncoder(buf) - enc.SetEscapeHTML(false) - return enc.Encode(node.Value) - case "!!bool", "!!int": - buf.WriteString(node.Value) - return nil - default: - return fmt.Errorf("unexpected tag %q at %d:%d", node.Tag, node.Line, node.Column) - } - default: - return fmt.Errorf("unexpected kind %d at %d:%d", node.Kind, node.Line, node.Column) - } -} diff --git a/ingest/pipeline/register.go b/ingest/pipeline/register.go deleted file mode 100644 index 72ef26bc909..00000000000 --- a/ingest/pipeline/register.go +++ /dev/null @@ -1,75 +0,0 @@ -// Licensed to 
Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//go:generate go run generate.go - -package pipeline - -import ( - "encoding/json" - "io/ioutil" - - logs "github.com/elastic/apm-server/log" - - "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" - "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/libbeat/paths" -) - -func RegisterPipelines(conn *eslegclient.Connection, overwrite bool, path string) error { - logger := logp.NewLogger(logs.Pipelines) - pipelines, err := loadPipelinesFromJSON(path) - if err != nil { - return err - } - var exists bool - for _, p := range pipelines { - if !overwrite { - exists, err = conn.PipelineExists(p.Id) - if err != nil { - return err - } - } - if overwrite || !exists { - _, _, err := conn.CreatePipeline(p.Id, nil, p.Body) - if err != nil { - logger.Errorf("Pipeline registration failed for %s.", p.Id) - return err - } - logger.Infof("Pipeline successfully registered: %s", p.Id) - } else { - logger.Infof("Pipeline already registered: %s", p.Id) - } - } - logger.Info("Registered Ingest Pipelines successfully.") - return nil -} - -type pipeline struct { - Id string `json:"id"` - Body map[string]interface{} `json:"body"` -} - -func loadPipelinesFromJSON(path string) ([]pipeline, error) { - pipelineDef, err := ioutil.ReadFile(paths.Resolve(paths.Home, path)) - if err != nil { - return nil, err - } - var pipelines []pipeline - err = json.Unmarshal(pipelineDef, &pipelines) - return pipelines, err -} diff --git a/ingest/pipeline/register_test.go b/ingest/pipeline/register_test.go deleted file mode 100644 index c8e285e3fff..00000000000 --- a/ingest/pipeline/register_test.go +++ /dev/null @@ -1,76 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package pipeline - -import ( - "fmt" - "path/filepath" - "strings" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" -) - -func TestRegisterPipelines(t *testing.T) { - esClients, err := eslegclient.NewClients(getFakeESConfig(9200), "apm-server") - require.NoError(t, err) - esClient := &esClients[0] - path, err := filepath.Abs("definition.json") - require.NoError(t, err) - - // pipeline loading goes wrong - err = RegisterPipelines(esClient, true, "non-existing") - assert.Error(t, err) - assertContainsErrMsg(t, err.Error(), []string{"cannot find the file", "no such file or directory"}) - - // pipeline definition empty - emptyPath, err := filepath.Abs(filepath.FromSlash("../../testdata/ingest/pipeline/empty.json")) - require.NoError(t, err) - err = RegisterPipelines(esClient, true, emptyPath) - assert.NoError(t, err) - - // invalid esClient - invalidClients, err := eslegclient.NewClients(getFakeESConfig(1234), "apm-server") - require.NoError(t, err) - err = RegisterPipelines(&invalidClients[0], true, path) - assert.Error(t, err) - assertContainsErrMsg(t, err.Error(), []string{"connect: cannot assign requested address", "connection refused"}) -} - -func getFakeESConfig(port int) *common.Config { - cfg := map[string]interface{}{ - "hosts": []string{fmt.Sprintf("http://localhost:%v", port)}, - } - c, _ := common.NewConfigFrom(cfg) - return c -} - -func assertContainsErrMsg(t *testing.T, errMsg string, msgs []string) { - var found bool - for _, msg := range msgs { - if strings.Contains(errMsg, msg) { - found = true - break - } - } - assert.NotNil(t, found) -} diff --git a/magefile.go b/magefile.go index f5550102e81..416514de5d6 100644 --- a/magefile.go +++ b/magefile.go @@ -33,7 +33,6 @@ import ( "time" "github.com/magefile/mage/mg" - "github.com/magefile/mage/sh" "github.com/pkg/errors" "github.com/elastic/beats/v7/dev-tools/mage" @@ -173,7 +172,7 @@ func Package() error { } if os.Getenv("SKIP_BUILD") != "true" { - mg.Deps(Update, prepareIngestPackaging) + mg.Deps(Update) mg.Deps(CrossBuild, CrossBuildXPack, CrossBuildGoDaemon) } return mage.Package() @@ -312,109 +311,64 @@ func PythonUnitTest() error { // ----------------------------------------------------------------------------- -// Customizations specific to apm-server. -// - readme.md.tmpl used in packages is customized. -// - apm-server.reference.yml is not included in packages. -// - ingest .json files are included in packaging -// - fields.yml is sourced from the build directory - -var emptyDir = filepath.Clean("build/empty") -var ingestDirGenerated = filepath.Clean("build/packaging/ingest") - func customizePackaging() { + const emptyDir = "build/empty" if err := os.MkdirAll(emptyDir, 0750); err != nil { panic(errors.Wrapf(err, "failed to create dir %v", emptyDir)) } - var ( - readmeTemplate = mage.PackageFile{ - Mode: 0644, - Template: "packaging/files/README.md.tmpl", - } - ingestTarget = "ingest" - ingest = mage.PackageFile{ - Mode: 0644, - Source: ingestDirGenerated, - } - ) for idx := len(mage.Packages) - 1; idx >= 0; idx-- { args := &mage.Packages[idx] + pkgType := args.Types[0] + if pkgType == mage.DMG { + // We do not build macOS packages. + mage.Packages = append(mage.Packages[:idx], mage.Packages[idx+1:]...) + continue + } + + // Replace the generic Beats README.md with an APM specific one, and remove files unused by apm-server. 
+ for filename, filespec := range args.Spec.Files { + switch filespec.Source { + case "{{ elastic_beats_dir }}/dev-tools/packaging/templates/common/README.md.tmpl": + args.Spec.Files[filename] = mage.PackageFile{Mode: 0644, Template: "packaging/files/README.md.tmpl"} + case "_meta/kibana.generated", "{{.BeatName}}.reference.yml": + delete(args.Spec.Files, filename) + case "fields.yml": + // Source fields.yml from the build directory. + if args.Spec.License == "Elastic License" || args.Spec.License == "Elastic License 2.0" { + filespec.Source = mage.FieldsAllYML + } else { + filespec.Source = mage.FieldsYML + } + args.Spec.Files[filename] = filespec + } + } switch pkgType := args.Types[0]; pkgType { case mage.Zip, mage.TarGz: - // Remove the reference config file from packages. - delete(args.Spec.Files, "{{.BeatName}}.reference.yml") - - // Replace the README.md with an APM specific file. - args.Spec.ReplaceFile("README.md", readmeTemplate) - args.Spec.Files[ingestTarget] = ingest + // No changes required. case mage.Docker: - delete(args.Spec.Files, "{{.BeatName}}.reference.yml") - args.Spec.ReplaceFile("README.md", readmeTemplate) - args.Spec.Files[ingestTarget] = ingest args.Spec.ExtraVars["expose_ports"] = config.DefaultPort args.Spec.ExtraVars["repository"] = "docker.elastic.co/apm" case mage.Deb, mage.RPM: - delete(args.Spec.Files, "/etc/{{.BeatName}}/{{.BeatName}}.reference.yml") - args.Spec.ReplaceFile("/usr/share/{{.BeatName}}/README.md", readmeTemplate) - args.Spec.Files["/usr/share/{{.BeatName}}/"+ingestTarget] = ingest - - // update config file Owner + // Update config file owner. pf := args.Spec.Files["/etc/{{.BeatName}}/{{.BeatName}}.yml"] pf.Owner = mage.BeatUser args.Spec.Files["/etc/{{.BeatName}}/{{.BeatName}}.yml"] = pf - - args.Spec.Files["/var/lib/{{.BeatName}}"] = mage.PackageFile{Mode: 0750, Source: emptyDir, Owner: mage.BeatUser} args.Spec.Files["/var/log/{{.BeatName}}"] = mage.PackageFile{Mode: 0750, Source: emptyDir, Owner: mage.BeatUser} + + // Customise the pre-install and post-install scripts. args.Spec.PreInstallScript = "packaging/files/linux/pre-install.sh.tmpl" if pkgType == mage.Deb { args.Spec.PostInstallScript = "packaging/files/linux/deb-post-install.sh.tmpl" } - case mage.DMG: - // We do not build macOS packages. - mage.Packages = append(mage.Packages[:idx], mage.Packages[idx+1:]...) - continue - default: panic(errors.Errorf("unhandled package type: %v", pkgType)) } - - for filename, filespec := range args.Spec.Files { - switch { - case strings.HasPrefix(filespec.Source, "_meta/kibana"): - // Remove Kibana dashboard files. - delete(args.Spec.Files, filename) - - case filespec.Source == "fields.yml": - // Source fields.yml from the build directory. - if args.Spec.License == "Elastic License" || args.Spec.License == "Elastic License 2.0" { - filespec.Source = mage.FieldsAllYML - } else { - filespec.Source = mage.FieldsYML - } - args.Spec.Files[filename] = filespec - } - } - } -} - -func prepareIngestPackaging() error { - if err := sh.Rm(ingestDirGenerated); err != nil { - return err } - - copy := &mage.CopyTask{ - Source: "ingest", - Dest: ingestDirGenerated, - Mode: 0644, - DirMode: 0755, - Exclude: []string{".*.go"}, - } - return copy.Execute() - } // DumpVariables writes the template variables and values to stdout. 
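[Editor's note] The publish package changes below delete `PublisherConfig` altogether, so call sites now pass only the pipeline and the tracer. A minimal migration sketch, using only names that appear in this patch — the `beat.Pipeline` and `*apm.Tracer` are assumed to be constructed elsewhere, as in `serverRunner.run`:

```go
package main

import (
	"go.elastic.co/apm"

	"github.com/elastic/beats/v7/libbeat/beat"

	"github.com/elastic/apm-server/publish"
)

// newPublisher shows the call-site migration for this patch.
// Before #6575:
//
//	publish.NewPublisher(pipeline, tracer, publish.PublisherConfig{
//	    Pipeline: cfg.Pipeline, // "" when data streams were enabled
//	})
//
// After #6575 the config struct, and the per-event pipeline name it
// carried, is gone:
func newPublisher(pipeline beat.Pipeline, tracer *apm.Tracer) (*publish.Publisher, error) {
	return publish.NewPublisher(pipeline, tracer)
}

func main() {} // placeholder; newPublisher is called from server setup
```

The same change is visible mechanically in `pub_test.go` below, where the `publish.PublisherConfig{}` argument is dropped from every call.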
diff --git a/publish/pub.go b/publish/pub.go index 446009b5796..5dd2a9c6068 100644 --- a/publish/pub.go +++ b/publish/pub.go @@ -27,7 +27,6 @@ import ( "go.elastic.co/apm" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/apm-server/model" ) @@ -44,10 +43,9 @@ type Reporter func(context.Context, PendingReq) error // number of events active in the system can exceed the queue size. Only the number of // concurrent HTTP requests trying to publish at the same time is limited. type Publisher struct { - stopped chan struct{} - tracer *apm.Tracer - client beat.Client - pipeline string + stopped chan struct{} + tracer *apm.Tracer + client beat.Client mu sync.RWMutex stopping bool @@ -63,17 +61,6 @@ type Transformer interface { Transform(context.Context) []beat.Event } -// PublisherConfig is a struct holding configuration information for the publisher. -type PublisherConfig struct { - Info beat.Info - Pipeline string - Processor beat.ProcessorList -} - -func (cfg *PublisherConfig) Validate() error { - return nil -} - var ( ErrFull = errors.New("queue is full") ErrChannelClosed = errors.New("can't send batch, publisher is being stopped") @@ -83,16 +70,11 @@ var ( // // GOMAXPROCS goroutines are started for forwarding events to libbeat. // Stop must be called to close the beat.Client and free resources. -func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer, cfg PublisherConfig) (*Publisher, error) { - if err := cfg.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid config") - } - - processingCfg := beat.ProcessingConfig{Processor: cfg.Processor} +func NewPublisher(pipeline beat.Pipeline, tracer *apm.Tracer) (*Publisher, error) { + processingCfg := beat.ProcessingConfig{} p := &Publisher{ - tracer: tracer, - stopped: make(chan struct{}), - pipeline: cfg.Pipeline, + tracer: tracer, + stopped: make(chan struct{}), // One request will be actively processed by the // worker, while the other concurrent requests will be buffered in the queue. @@ -184,17 +166,9 @@ func (p *Publisher) Send(ctx context.Context, req PendingReq) error { } func (p *Publisher) run() { - var meta common.MapStr - if p.pipeline != "" { - meta = common.MapStr{"pipeline": p.pipeline} - } - ctx := context.Background() for req := range p.pendingRequests { events := req.Transformable.Transform(ctx) - for i := range events { - events[i].Meta = meta - } p.client.PublishAll(events) } } diff --git a/publish/pub_test.go b/publish/pub_test.go index cbeb2a0af74..6e1309f2887 100644 --- a/publish/pub_test.go +++ b/publish/pub_test.go @@ -53,7 +53,7 @@ func TestPublisherStop(t *testing.T) { // so we can simulate a pipeline that blocks indefinitely. pipeline := newBlockingPipeline(t) publisher, err := publish.NewPublisher( - pipeline, apmtest.DiscardTracer, publish.PublisherConfig{}, + pipeline, apmtest.DiscardTracer, ) require.NoError(t, err) defer func() { @@ -96,7 +96,6 @@ func TestPublisherStopShutdownInactive(t *testing.T) { publisher, err := publish.NewPublisher( newBlockingPipeline(t), apmtest.DiscardTracer, - publish.PublisherConfig{}, ) require.NoError(t, err) @@ -156,7 +155,6 @@ func BenchmarkPublisher(b *testing.B) { publisher, err := publish.NewPublisher( pipetool.WithACKer(pipeline, acker), apmtest.DiscardTracer, - publish.PublisherConfig{}, ) require.NoError(b, err)
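[Editor's note] For context on the behaviour removed from `Publisher.run` above: each event's metadata was stamped with the configured pipeline name, which libbeat's Elasticsearch output reads (as `@metadata.pipeline`) to choose the ingest pipeline at index time. With data streams, that routing is instead configured server-side by the `apm` integration package's index templates. A self-contained sketch of the deleted mechanism — the helper name `withIngestPipeline` is illustrative, not from the codebase:

```go
package main

import (
	"fmt"
	"time"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common"
)

// withIngestPipeline reproduces what Publisher.run did before this
// patch: tag every event's metadata with an ingest pipeline name so
// that the Elasticsearch output applies that pipeline on indexing.
func withIngestPipeline(events []beat.Event, pipeline string) {
	if pipeline == "" {
		return // mirrors the old `if p.pipeline != ""` guard
	}
	meta := common.MapStr{"pipeline": pipeline}
	for i := range events {
		events[i].Meta = meta
	}
}

func main() {
	events := []beat.Event{{
		Timestamp: time.Now(),
		Fields:    common.MapStr{"processor": common.MapStr{"event": "transaction"}},
	}}
	withIngestPipeline(events, "apm") // "apm" was the old defaultAPMPipeline
	fmt.Println(events[0].Meta)       // map[pipeline:apm]
}
```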
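[Editor's note] Similarly, the deleted `newDropLogsBeatProcessor` and its test can be stitched into one runnable sketch of the other behaviour going away: log events are no longer dropped when data streams are disabled. One detail the diff hides is that, in a standalone program, the `drop_event` processor must be linked in via a blank import so its `init` registers the plugin — inside apm-server this happens through libbeat's default includes:

```go
package main

import (
	"fmt"
	"time"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/common"
	"github.com/elastic/beats/v7/libbeat/processors"

	// Registers the drop_event processor used below.
	_ "github.com/elastic/beats/v7/libbeat/processors/actions"
)

func main() {
	// The exact processor the patch removes: drop any event whose
	// processor.event field contains "log".
	dropLogs, err := processors.New(processors.PluginConfig{
		common.MustNewConfigFrom(map[string]interface{}{
			"drop_event": map[string]interface{}{
				"when": map[string]interface{}{
					"contains": map[string]interface{}{
						"processor.event": "log",
					},
				},
			},
		}),
	})
	if err != nil {
		panic(err)
	}

	event := beat.Event{
		Timestamp: time.Now(),
		Fields:    common.MapStr{"processor": common.MapStr{"event": "log"}},
	}
	result, err := dropLogs.Run(&event)
	fmt.Println(result, err) // <nil> <nil>: the log event is dropped
}
```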