Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial support for indexing to data streams #4409

Merged
merged 15 commits into from
Nov 19, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions _meta/fields.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,21 @@
description: >
Fields common to various APM events.
fields:
- name: data_stream.type
type: keyword
description: Data stream type, e.g. logs, metrics, or traces.
example: traces

- name: data_stream.dataset
type: keyword
description: Data stream dataset name.
example: backend_service

- name: data_stream.namespace
type: keyword
description: User-defined data stream namespace.
example: production

- name: processor.name
type: keyword
description: Processor name.
Expand Down
27 changes: 24 additions & 3 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ import (
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/logp"
esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/processors"

"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/datastreams"
"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/ingest/pipeline"
logs "github.com/elastic/apm-server/log"
Expand Down Expand Up @@ -117,7 +119,6 @@ type beater struct {
// Run runs the APM Server, blocking until the beater's Stop method is called,
// or a fatal error occurs.
func (bt *beater) Run(b *beat.Beat) error {

done := make(chan struct{})

var reloadOnce sync.Once
Expand All @@ -127,6 +128,7 @@ func (bt *beater) Run(b *beat.Beat) error {
// during startup. This might change when APM Server is included in Fleet
reloadOnce.Do(func() {
defer close(done)
// TODO(axw) config received from Fleet should be modified to set data_streams.enabled.
var cfg *config.Config
cfg, err = config.NewConfig(ucfg.Config, elasticsearchOutputConfig(b))
if err != nil {
Expand Down Expand Up @@ -365,11 +367,30 @@ func newPublisher(b *beat.Beat, cfg *config.Config, tracer *apm.Tracer) (*publis
if err != nil {
return nil, err
}
return publish.NewPublisher(b.Publisher, tracer, &publish.PublisherConfig{
publisherConfig := &publish.PublisherConfig{
Info: b.Info,
Pipeline: cfg.Pipeline,
TransformConfig: transformConfig,
})
}
if !cfg.DataStreams.Enabled {
// Remove data_stream.* fields during publishing when data streams are disabled.
processors, err := processors.New(processors.PluginConfig{common.MustNewConfigFrom(
map[string]interface{}{
"drop_fields": map[string]interface{}{
"fields": []interface{}{
datastreams.TypeField,
datastreams.DatasetField,
datastreams.NamespaceField,
},
},
},
)})
if err != nil {
return nil, err
}
publisherConfig.Processor = processors
}
return publish.NewPublisher(b.Publisher, tracer, publisherConfig)
}

func newTransformConfig(beatInfo beat.Info, cfg *config.Config) (*transform.Config, error) {
Expand Down
2 changes: 2 additions & 0 deletions beater/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type Config struct {
JaegerConfig JaegerConfig `config:"jaeger"`
Aggregation AggregationConfig `config:"aggregation"`
Sampling SamplingConfig `config:"sampling"`
DataStreams DataStreamsConfig `config:"data_streams"`

Pipeline string
}
Expand Down Expand Up @@ -189,5 +190,6 @@ func DefaultConfig() *Config {
JaegerConfig: defaultJaeger(),
Aggregation: defaultAggregationConfig(),
Sampling: defaultSamplingConfig(),
DataStreams: defaultDataStreamsConfig(),
}
}
27 changes: 27 additions & 0 deletions beater/config/data_streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package config

// DataStreamsConfig holds data streams configuration.
type DataStreamsConfig struct {
Enabled bool `config:"enabled"`
}

func defaultDataStreamsConfig() DataStreamsConfig {
return DataStreamsConfig{Enabled: false}
}
9 changes: 7 additions & 2 deletions beater/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,13 @@ func (h *httpServer) start() error {
h.logger.Infof("Connection limit set to: %d", h.cfg.MaxConnections)
}

// Create the "onboarding" document, which contains the server's listening address.
notifyListening(context.Background(), addr, h.reporter)
if !h.cfg.DataStreams.Enabled {
// Create the "onboarding" document, which contains the server's
// listening address. We only do this if data streams are not enabled,
// as onboarding documents are incompatible with data streams.
// Onboarding documents should be replaced by Fleet status later.
notifyListening(context.Background(), addr, h.reporter)
}

if h.TLSConfig != nil {
h.logger.Info("SSL enabled.")
Expand Down
3 changes: 3 additions & 0 deletions beater/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
var apmRegistry = monitoring.GetNamespace("state").GetRegistry().NewRegistry("apm-server")

type configTelemetry struct {
dataStreamsEnabled *monitoring.Bool
rumEnabled *monitoring.Bool
apiKeysEnabled *monitoring.Bool
kibanaEnabled *monitoring.Bool
Expand All @@ -49,6 +50,7 @@ type configTelemetry struct {
}

var configMonitors = &configTelemetry{
dataStreamsEnabled: monitoring.NewBool(apmRegistry, "data_streams.enabled"),
rumEnabled: monitoring.NewBool(apmRegistry, "rum.enabled"),
apiKeysEnabled: monitoring.NewBool(apmRegistry, "api_key.enabled"),
kibanaEnabled: monitoring.NewBool(apmRegistry, "kibana.enabled"),
Expand All @@ -73,6 +75,7 @@ func recordConfigs(info beat.Info, apmCfg *config.Config, rootCfg *common.Config
if err != nil {
return err
}
configMonitors.dataStreamsEnabled.Set(apmCfg.DataStreams.Enabled)
configMonitors.rumEnabled.Set(apmCfg.RumConfig.IsEnabled())
configMonitors.apiKeysEnabled.Set(apmCfg.APIKeyConfig.IsEnabled())
configMonitors.kibanaEnabled.Set(apmCfg.Kibana.Enabled)
Expand Down
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ https://github.com/elastic/apm-server/compare/7.10\...master[View commits]
* Log warnings in aggregation of transaction metrics when grouping limit is reached {pull}4313[4313]
* Configurable tail-based sampling policies {pull}4320[4320]
* Monitoring and telemetry for tail-based sampling {pull}4346[4346] {pull}4360[4360]
* Support for data streams {pull}4409[4409]
axw marked this conversation as resolved.
Show resolved Hide resolved
37 changes: 37 additions & 0 deletions datastreams/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package datastreams

// Constants for data stream types.
const (
LogsType = "logs"
MetricsType = "metrics"
TracesType = "traces"
)

// Cosntants for data stream event metadata fields.
const (
TypeField = "data_stream.type"
DatasetField = "data_stream.dataset"
NamespaceField = "data_stream.namespace"
)

// IndexFormat holds the variable "index" format to use for the libbeat Elasticsearch output.
// Each event the server publishes is expected to contain data_stream.* fields, which will
axw marked this conversation as resolved.
Show resolved Hide resolved
// added to the documents as well as be used for routing documents to the correct data stream.
const IndexFormat = "%{[data_stream.type]}-%{[data_stream.dataset]}-%{[data_stream.namespace]}"
39 changes: 39 additions & 0 deletions datastreams/servicename.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package datastreams

import "strings"

// NormalizeServiceName translates serviceName into a string suitable
// for inclusion in a data stream name.
//
// Concretely, this function will lowercase the string and replace any
// reserved characters with "_".
func NormalizeServiceName(s string) string {
s = strings.ToLower(s)
s = strings.Map(replaceReservedRune, s)
axw marked this conversation as resolved.
Show resolved Hide resolved
return s
}

func replaceReservedRune(r rune) rune {
switch r {
case '\\', '/', '*', '?', '"', '<', '>', '|', ' ', ',', '#', ':':
return '_'
axw marked this conversation as resolved.
Show resolved Hide resolved
}
return r
}
35 changes: 35 additions & 0 deletions datastreams/servicename_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package datastreams_test

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/apm-server/datastreams"
)

func TestNormalizeServiceName(t *testing.T) {
testNormalizeServiceName := func(expected, input string) {
t.Helper()
assert.Equal(t, expected, datastreams.NormalizeServiceName(input))
}
testNormalizeServiceName("uppercase", "UPPERCASE")
testNormalizeServiceName("____________", "\\/*?\"<>| ,#:")
}
33 changes: 33 additions & 0 deletions docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,39 @@ Fields common to various APM events.



*`data_stream.type`*::
+
--
Data stream type, e.g. logs, metrics, or traces.
axw marked this conversation as resolved.
Show resolved Hide resolved

type: keyword

example: traces

--

*`data_stream.dataset`*::
+
--
Data stream dataset name.

type: keyword

example: backend_service

--

*`data_stream.namespace`*::
+
--
User-defined data stream namespace.

type: keyword

example: production

--

*`processor.name`*::
+
--
Expand Down
Loading