Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SignalFx exporter: Add host metadata synchronization #1039

Merged
merged 1 commit into from
Sep 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions exporter/signalfxexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ The following configuration options can also be configured:
- `translation_rules`: Set of rules on how to translate metrics to a SignalFx
compatible format. Rules defined in `translation/constants.go` are used by
default. Applicable only when `send_compatible_metrics` set to `true`.
- `sync_host_metadata`: Defines whether the exporter should scrape host metadata
and send it as property updates to SignalFx backend. Disabled by default.
IMPORTANT: Host metadata synchronization relies on `resourcedetection`
processor. If this option is enabled make sure that `resourcedetection`
processor is enabled in the pipeline with one of the cloud provider detectors
or environment variable detector setting a unique value to `host.name` attribute
within your k8s cluster. And keep `override=true` in resourcedetection config.

Example:

Expand Down
10 changes: 10 additions & 0 deletions exporter/signalfxexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ type Config struct {
// DeltaTranslationTTL specifies in seconds the max duration to keep the most recent datapoint for any
// `delta_metric` specified in TranslationRules. Default is 3600s.
DeltaTranslationTTL int64 `mapstructure:"delta_translation_ttl"`

// SyncHostMetadata defines if the exporter should scrape host metadata and
// sends it as property updates to SignalFx backend.
// IMPORTANT: Host metadata synchronization relies on `resourcedetection` processor.
// If this option is enabled make sure that `resourcedetection` processor
// is enabled in the pipeline with one of the cloud provider detectors
// or environment variable detector setting a unique value to
// `host.name` attribute within your k8s cluster. Also keep override
// And keep `override=true` in resourcedetection config.
SyncHostMetadata bool `mapstructure:"sync_host_metadata"`
}

func (cfg *Config) getOptionsFromConfig() (*exporterOptions, error) {
Expand Down
2 changes: 2 additions & 0 deletions exporter/signalfxexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func TestConfig_getOptionsFromConfig(t *testing.T) {
Headers map[string]string
SendCompatibleMetrics bool
TranslationRules []translation.Rule
SyncHostMetadata bool
}
tests := []struct {
name string
Expand Down Expand Up @@ -215,6 +216,7 @@ func TestConfig_getOptionsFromConfig(t *testing.T) {
Headers: tt.fields.Headers,
SendCompatibleMetrics: tt.fields.SendCompatibleMetrics,
TranslationRules: tt.fields.TranslationRules,
SyncHostMetadata: tt.fields.SyncHostMetadata,
}
got, err := cfg.getOptionsFromConfig()
if (err != nil) != tt.wantErr {
Expand Down
5 changes: 5 additions & 0 deletions exporter/signalfxexporter/dimensions/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/collection"
)

// MetadataUpdateClient is an interface for pushing metadata updates
type MetadataUpdateClient interface {
PushMetadata([]*collection.MetadataUpdate) error
}

var propNameSanitizer = strings.NewReplacer(
".", "_",
"/", "_")
Expand Down
25 changes: 18 additions & 7 deletions exporter/signalfxexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,17 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/dimensions"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/hostmetadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/translation"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/collection"
)

type signalfxExporter struct {
logger *zap.Logger
pushMetricsData func(ctx context.Context, md pdata.Metrics) (droppedTimeSeries int, err error)
pushMetadata func(metadata []*collection.MetadataUpdate) error
pushResourceLogs func(ctx context.Context, ld pdata.ResourceLogs) (droppedLogRecords int, err error)
logger *zap.Logger
pushMetricsData func(ctx context.Context, md pdata.Metrics) (droppedTimeSeries int, err error)
pushMetadata func(metadata []*collection.MetadataUpdate) error
pushResourceLogs func(ctx context.Context, ld pdata.ResourceLogs) (droppedLogRecords int, err error)
hostMetadataSyncer *hostmetadata.Syncer
}

type exporterOptions struct {
Expand Down Expand Up @@ -102,10 +104,16 @@ func newSignalFxExporter(
})
dimClient.Start()

var hms *hostmetadata.Syncer
if config.SyncHostMetadata {
hms = hostmetadata.NewSyncer(logger, dimClient)
}

return signalfxExporter{
logger: logger,
pushMetricsData: dpClient.pushMetricsData,
pushMetadata: dimClient.PushMetadata,
logger: logger,
pushMetricsData: dpClient.pushMetricsData,
pushMetadata: dimClient.PushMetadata,
hostMetadataSyncer: hms,
}, nil
}

Expand Down Expand Up @@ -160,6 +168,9 @@ func (se signalfxExporter) Shutdown(context.Context) error {
func (se signalfxExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
ctx = obsreport.StartMetricsExportOp(ctx, typeStr)
numDroppedTimeSeries, err := se.pushMetricsData(ctx, md)
if err == nil && se.hostMetadataSyncer != nil {
se.hostMetadataSyncer.Sync(md)
}
numReceivedTimeSeries, numPoints := md.MetricAndDataPointCount()

obsreport.EndMetricsExportOp(ctx, numPoints, numReceivedTimeSeries, numDroppedTimeSeries, err)
Expand Down
10 changes: 10 additions & 0 deletions exporter/signalfxexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ func TestNew(t *testing.T) {
Headers: nil,
},
},
{
name: "create exporter with host metadata syncer",
config: &Config{
AccessToken: "someToken",
Realm: "xyz",
Timeout: 1 * time.Second,
Headers: nil,
SyncHostMetadata: true,
},
},
}

for _, tt := range tests {
Expand Down
2 changes: 1 addition & 1 deletion exporter/signalfxexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestCreateMetricsExporterWithDefaultTranslaitonRules(t *testing.T) {
// Expected values has to be updated once default config changed
assert.Equal(t, 48, len(config.TranslationRules))
assert.Equal(t, translation.ActionRenameDimensionKeys, config.TranslationRules[0].Action)
assert.Equal(t, 32, len(config.TranslationRules[0].Mapping))
assert.Equal(t, 33, len(config.TranslationRules[0].Mapping))
}

func TestCreateMetricsExporterWithSpecifiedTranslaitonRules(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions exporter/signalfxexporter/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,7 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opentelemetry.io/collector v0.11.0/go.mod h1:tJNTr3RWiwUyYKI6dtlHY+G/jfKa/+Ewv6MvmwLiqoE=
go.opentelemetry.io/collector v0.11.1-0.20200924160956-8690937037da h1:W990SgXqeewmIaj1I53yr253Kdl7+7yu6wPB/jUlxIo=
go.opentelemetry.io/collector v0.11.1-0.20200924160956-8690937037da/go.mod h1:tJNTr3RWiwUyYKI6dtlHY+G/jfKa/+Ewv6MvmwLiqoE=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down
147 changes: 147 additions & 0 deletions exporter/signalfxexporter/hostmetadata/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package hostmetadata

import (
"sync"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/dimensions"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/splunk"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/collection"
)

var (
// Resource attribute that used as fallback if there are no cloud provider attributes.
extraHostIDAttribute = conventions.AttributeHostName
)

// Syncer is a config structure for host metadata syncer.
type Syncer struct {
logger *zap.Logger
dimClient dimensions.MetadataUpdateClient
once sync.Once
}

// NewSyncer creates new instance of host metadata syncer.
func NewSyncer(logger *zap.Logger, dimClient dimensions.MetadataUpdateClient) *Syncer {
return &Syncer{
logger: logger,
dimClient: dimClient,
}
}

func (s *Syncer) Sync(md pdata.Metrics) {
// skip if already synced or if metrics data is empty
if md.ResourceMetrics().Len() == 0 {
return
}
s.once.Do(func() {
s.syncOnResource(md.ResourceMetrics().At(0).Resource())
})
}

func (s *Syncer) syncOnResource(res pdata.Resource) {
// If resourcedetection processor is enabled, all the metrics should have resource attributes
// that can be used to update host metadata.
// Based of this assumption we check just one ResourceMetrics object,
hostID, ok := getHostIDFromResource(res)
if !ok {
// if no attributes found, we assume that resourcedetection is not enabled or
// it doesn't set right attributes, and we do not retry.
s.logger.Error("Not found any host attributes. Host metadata synchronization skipped. " +
"Make sure that \"resourcedetection\" processor is enabled in the pipeline with one of " +
"the cloud provider detectors or environment variable detector setting \"host.name\" attribute")
return
}

props := s.scrapeHostProperties()
if len(props) == 0 {
// do not retry if scraping failed.
s.logger.Error("Failed to fetch system properties. Host metadata synchronization skipped")
return
}

metadataUpdate := s.prepareMetadataUpdate(props, hostID)
err := s.dimClient.PushMetadata([]*collection.MetadataUpdate{metadataUpdate})
if err != nil {
s.logger.Error("Failed to push host metadata update", zap.Error(err))
return
}
dmitryax marked this conversation as resolved.
Show resolved Hide resolved

s.logger.Info("Host metadata synchronized")
}

func (s *Syncer) prepareMetadataUpdate(props map[string]string, hostID splunk.HostID) *collection.MetadataUpdate {
return &collection.MetadataUpdate{
ResourceIDKey: string(hostID.Key),
ResourceID: collection.ResourceID(hostID.ID),
MetadataDelta: collection.MetadataDelta{
MetadataToUpdate: props,
},
}
}

func (s *Syncer) scrapeHostProperties() map[string]string {
props := make(map[string]string)

cpu, err := getCPU()
if err == nil {
for k, v := range cpu.toStringMap() {
props[k] = v
}
} else {
s.logger.Warn("Failed to scrape host hostCPU metadata", zap.Error(err))
}

mem, err := getMemory()
if err == nil {
for k, v := range mem.toStringMap() {
props[k] = v
}
} else {
s.logger.Warn("Failed to scrape host memory metadata", zap.Error(err))
}

os, err := getOS()
if err == nil {
for k, v := range os.toStringMap() {
props[k] = v
}
} else {
s.logger.Warn("Failed to scrape host hostOS metadata", zap.Error(err))
}

return props
}

func getHostIDFromResource(res pdata.Resource) (splunk.HostID, bool) {
hostID, ok := splunk.ResourceToHostID(res)
if ok {
return hostID, ok
}

if attr, ok := res.Attributes().Get(extraHostIDAttribute); ok {
return splunk.HostID{
Key: splunk.HostIDKey(extraHostIDAttribute),
ID: attr.StringVal(),
}, true
}

return splunk.HostID{}, false
}
24 changes: 24 additions & 0 deletions exporter/signalfxexporter/hostmetadata/metadata_linux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package hostmetadata

import "syscall"

func mockSyscallUname() {
syscallUname = func(in *syscall.Utsname) error {
in.Machine = [65]int8{}
return nil
}
}
19 changes: 19 additions & 0 deletions exporter/signalfxexporter/hostmetadata/metadata_others_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !linux

package hostmetadata

func mockSyscallUname() {}
Loading