diff --git a/.chloggen/splunkhecexporter-add-heartbeat.yaml b/.chloggen/splunkhecexporter-add-heartbeat.yaml new file mode 100644 index 000000000000..369e64cded95 --- /dev/null +++ b/.chloggen/splunkhecexporter-add-heartbeat.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: splunkhecexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adding an option in splunkhecexporter to enable heartbeat. A heartbeat is a metadata event about the current environment and build information. If heartbeat is enabled, splunkhecexporter will periodically send heartbeat to the destination in given time intervals from configurations. + +# One or more tracking issues related to the change +issues: [20225] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/exporter/splunkhecexporter/README.md b/exporter/splunkhecexporter/README.md index 6eefeafde718..bb8e967ef805 100644 --- a/exporter/splunkhecexporter/README.md +++ b/exporter/splunkhecexporter/README.md @@ -57,6 +57,10 @@ The following configuration options can also be configured: - `otel_to_hec_fields/severity_text` (default = `otel.log.severity.text`): Specifies the name of the field to map the severity text field of log events. - `otel_to_hec_fields/severity_number` (default = `otel.log.severity.number`): Specifies the name of the field to map the severity number field of log events. - `otel_to_hec_fields/name` (default = `"otel.log.name`): Specifies the name of the field to map the name field of log events. +- `hec_heartbeat/interval` (no default): Specifies the interval of sending hec heartbeat to the destination. If not specified, heartbeat is not enabled. +- `telemetry/enabled` (default: false): Specifies whether to enable telemetry inside splunk hec exporter. +- `telemetry/override_metrics_names` (default: empty map): Specifies the metrics name to overrides in splunk hec exporter. +- `telemetry/extra_attributes` (default: empty map): Specifies the extra metrics attributes in splunk hec exporter. In addition, this exporter offers queued retry which is enabled by default. Information about queued retry configuration parameters can be found @@ -115,6 +119,16 @@ exporters: splunk_app_name: "OpenTelemetry-Collector Splunk Exporter" # Application version is used to track telemetry information for Splunk App's using HEC by App version. splunk_app_version: "v0.0.1" + hec_heartbeat: + interval: 30s + telemetry: + enabled: true + override_metrics_names: + otelcol_exporter_splunkhec_heartbeat_sent: app_heartbeats_success_total + otelcol_exporter_splunkhec_heartbeat_failed: app_heartbeats_failed_total + extra_attributes: + dataset_name: SplunkCloudBeaverStack + custom_key: custom_value ``` The full list of settings exposed for this exporter are documented [here](config.go) diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index 6f81be61b4ee..b8cd4ff644a8 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -34,6 +34,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk" ) +// allow monkey patching for injecting pushLogData function in test +var getPushLogFn = func(c *client) func(ctx context.Context, ld plog.Logs) error { + return c.pushLogData +} + // client sends the data to the splunk backend. type client struct { config *Config @@ -42,6 +47,7 @@ type client struct { telemetrySettings component.TelemetrySettings hecWorker hecWorker buildInfo component.BuildInfo + heartbeater *heartbeater } func (c *client) pushMetricsData( @@ -631,6 +637,9 @@ func subTracesByType(src ptrace.Traces, from *index, dst ptrace.Traces) { func (c *client) stop(context.Context) error { c.wg.Wait() + if c.heartbeater != nil { + c.heartbeater.shutdown() + } return nil } @@ -650,6 +659,10 @@ func (c *client) start(ctx context.Context, host component.Host) (err error) { } url, _ := c.config.getURL() c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(c.config, c.buildInfo)} + c.heartbeater = newHeartbeater(c.config, getPushLogFn(c)) + if c.heartbeater != nil { + c.heartbeater.initHeartbeat(c.buildInfo) + } return nil } diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go index e5fd35f1f1bf..61589ea83a87 100644 --- a/exporter/splunkhecexporter/client_test.go +++ b/exporter/splunkhecexporter/client_test.go @@ -1513,6 +1513,44 @@ func TestAllowedLogDataTypes(t *testing.T) { } } +func Test_heartbeat_success(t *testing.T) { + config := NewFactory().CreateDefaultConfig().(*Config) + config.HecHeartbeat = HecHeartbeat{ + Interval: 10 * time.Millisecond, + } + + consumeLogsChan := make(chan plog.Logs, 10) + fnBefore := getPushLogFn + t.Cleanup(func() { + getPushLogFn = fnBefore + }) + getPushLogFn = func(c *client) func(ctx context.Context, ld plog.Logs) error { + return func(ctx context.Context, ld plog.Logs) error { + consumeLogsChan <- ld + return nil + } + } + + c := client{ + config: config, + logger: zaptest.NewLogger(t), + hecWorker: &defaultHecWorker{&url.URL{Scheme: "http", Host: "splunk"}, http.DefaultClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())}, + } + + err := c.start(context.Background(), componenttest.NewNopHost()) + t.Cleanup(func() { + _ = c.stop(context.Background()) + }) + assert.NoError(t, err) + + assert.Eventually(t, func() bool { + return len(consumeLogsChan) != 0 + }, time.Second, 10*time.Millisecond) + + logs := <-consumeLogsChan + assertHeartbeatInfoLog(t, logs) +} + func TestSubLogs(t *testing.T) { // Creating 12 logs (2 resources x 2 libraries x 3 records) logs := createLogData(2, 2, 3) diff --git a/exporter/splunkhecexporter/config.go b/exporter/splunkhecexporter/config.go index 7ed59137baff..5bcf5ebde10b 100644 --- a/exporter/splunkhecexporter/config.go +++ b/exporter/splunkhecexporter/config.go @@ -19,6 +19,7 @@ import ( "fmt" "net/url" "path" + "time" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" @@ -48,6 +49,26 @@ type OtelToHecFields struct { SeverityNumber string `mapstructure:"severity_number"` } +// HecHeartbeat defines the heartbeat information for the exporter +type HecHeartbeat struct { + // Interval represents the time interval for the heartbeat interval. If nothing or 0 is set, + // heartbeat is not enabled. + // A heartbeat is an event sent to _internal index with metadata for the current collector/host. + Interval time.Duration `mapstructure:"interval"` +} + +// HecTelemetry defines the telemetry configuration for the exporter +type HecTelemetry struct { + // Enabled is the bool to enable telemetry inside splunk hec exporter + Enabled bool `mapstructure:"enabled"` + + // OverrideMetricsNames is the map to override metrics for internal metrics in splunk hec exporter + OverrideMetricsNames map[string]string `mapstructure:"override_metrics_names"` + + // ExtraAttributes is the extra attributes for metrics inside splunk hex exporter + ExtraAttributes map[string]string `mapstructure:"extra_attributes"` +} + // Config defines configuration for Splunk exporter. type Config struct { confighttp.HTTPClientSettings `mapstructure:",squash"` @@ -117,6 +138,12 @@ type Config struct { // UseMultiMetricFormat combines metric events to save space during ingestion. UseMultiMetricFormat bool `mapstructure:"use_multi_metric_format"` + + // HecHeartbeat is the configuration to enable heartbeat + HecHeartbeat HecHeartbeat `mapstructure:"hec_heartbeat"` + + // HecTelemetry is the configuration for splunk hec exporter telemetry + HecTelemetry HecTelemetry `mapstructure:"telemetry"` } func (cfg *Config) getURL() (out *url.URL, err error) { diff --git a/exporter/splunkhecexporter/config_test.go b/exporter/splunkhecexporter/config_test.go index e789bc600bd0..36983472d7e5 100644 --- a/exporter/splunkhecexporter/config_test.go +++ b/exporter/splunkhecexporter/config_test.go @@ -109,6 +109,19 @@ func TestLoadConfig(t *testing.T) { }, HealthPath: "/services/collector/health", HecHealthCheckEnabled: false, + HecHeartbeat: HecHeartbeat{ + Interval: 30 * time.Second, + }, + HecTelemetry: HecTelemetry{ + Enabled: true, + OverrideMetricsNames: map[string]string{ + "otelcol_exporter_splunkhec_heartbeat_sent": "app_heartbeats_success_total", + "otelcol_exporter_splunkhec_heartbeat_failed": "app_heartbeats_failed_total", + }, + ExtraAttributes: map[string]string{ + "customKey": "customVal", + }, + }, }, }, } diff --git a/exporter/splunkhecexporter/factory.go b/exporter/splunkhecexporter/factory.go index 1d98e3b8ba35..c47a6272e2f9 100644 --- a/exporter/splunkhecexporter/factory.go +++ b/exporter/splunkhecexporter/factory.go @@ -94,6 +94,11 @@ func createDefaultConfig() component.Config { HealthPath: splunk.DefaultHealthPath, HecHealthCheckEnabled: false, ExportRaw: false, + HecTelemetry: HecTelemetry{ + Enabled: false, + OverrideMetricsNames: map[string]string{}, + ExtraAttributes: map[string]string{}, + }, } } diff --git a/exporter/splunkhecexporter/factory_test.go b/exporter/splunkhecexporter/factory_test.go index 4200988acaae..310d8737f9d2 100644 --- a/exporter/splunkhecexporter/factory_test.go +++ b/exporter/splunkhecexporter/factory_test.go @@ -39,6 +39,7 @@ func TestCreateMetricsExporter(t *testing.T) { params := exportertest.NewNopCreateSettings() _, err := createMetricsExporter(context.Background(), params, cfg) assert.NoError(t, err) + assert.NoError(t, err) } func TestCreateTracesExporter(t *testing.T) { diff --git a/exporter/splunkhecexporter/go.mod b/exporter/splunkhecexporter/go.mod index 5ce70f36f1b3..b194e2e03304 100644 --- a/exporter/splunkhecexporter/go.mod +++ b/exporter/splunkhecexporter/go.mod @@ -9,6 +9,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.75.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.75.0 github.com/stretchr/testify v1.8.2 + go.opencensus.io v0.24.0 go.opentelemetry.io/collector v0.75.0 go.opentelemetry.io/collector/component v0.75.0 go.opentelemetry.io/collector/confmap v0.75.0 diff --git a/exporter/splunkhecexporter/heartbeat.go b/exporter/splunkhecexporter/heartbeat.go new file mode 100644 index 000000000000..8df954707c63 --- /dev/null +++ b/exporter/splunkhecexporter/heartbeat.go @@ -0,0 +1,182 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter" + +import ( + "context" + "fmt" + "os" + "runtime" + "sync" + "time" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/plog" +) + +const ( + metricsPrefix = "otelcol_exporter_splunkhec_heartbeat" + defaultHBSentMetricsName = metricsPrefix + "_sent" + defaultHBFailedMetricsName = metricsPrefix + "_failed" +) + +type heartbeater struct { + config *Config + pushLogFn func(ctx context.Context, ld plog.Logs) error + hbRunOnce sync.Once + hbDoneChan chan struct{} + + // Observability + heartbeatSuccessTotal *stats.Int64Measure + heartbeatErrorTotal *stats.Int64Measure + tagMutators []tag.Mutator +} + +func getMetricsName(overrides map[string]string, metricName string) string { + if name, ok := overrides[metricName]; ok { + return name + } + return metricName +} + +func newHeartbeater(config *Config, pushLogFn func(ctx context.Context, ld plog.Logs) error) *heartbeater { + interval := config.HecHeartbeat.Interval + if interval == 0 { + return nil + } + + var heartbeatSentTotal, heartbeatFailedTotal *stats.Int64Measure + var tagMutators []tag.Mutator + if config.HecTelemetry.Enabled { + overrides := config.HecTelemetry.OverrideMetricsNames + extraAttributes := config.HecTelemetry.ExtraAttributes + var tags []tag.Key + tagMutators = []tag.Mutator{} + for key, val := range extraAttributes { + newTag, _ := tag.NewKey(key) + tags = append(tags, newTag) + tagMutators = append(tagMutators, tag.Insert(newTag, val)) + } + + heartbeatSentTotal = stats.Int64( + getMetricsName(overrides, defaultHBSentMetricsName), + "number of heartbeats made to the destination", + stats.UnitDimensionless) + + heartbeatSuccessTotalView := &view.View{ + Name: heartbeatSentTotal.Name(), + Description: heartbeatSentTotal.Description(), + TagKeys: tags, + Measure: heartbeatSentTotal, + Aggregation: view.Sum(), + } + + heartbeatFailedTotal = stats.Int64( + getMetricsName(overrides, defaultHBFailedMetricsName), + "number of heartbeats made to destination that failed", + stats.UnitDimensionless) + + heartbeatErrorTotalView := &view.View{ + Name: heartbeatFailedTotal.Name(), + Description: heartbeatFailedTotal.Description(), + TagKeys: tags, + Measure: heartbeatFailedTotal, + Aggregation: view.Sum(), + } + + if err := view.Register(heartbeatSuccessTotalView, heartbeatErrorTotalView); err != nil { + return nil + } + } + + return &heartbeater{ + config: config, + pushLogFn: pushLogFn, + hbDoneChan: make(chan struct{}), + heartbeatSuccessTotal: heartbeatSentTotal, + heartbeatErrorTotal: heartbeatFailedTotal, + tagMutators: tagMutators, + } +} + +func (h *heartbeater) shutdown() { + close(h.hbDoneChan) +} + +func (h *heartbeater) initHeartbeat(buildInfo component.BuildInfo) { + interval := h.config.HecHeartbeat.Interval + if interval == 0 { + return + } + + h.hbRunOnce.Do(func() { + heartbeatLog := h.generateHeartbeatLog(buildInfo) + go func() { + ticker := time.NewTicker(interval) + for { + select { + case <-h.hbDoneChan: + return + case <-ticker.C: + err := h.pushLogFn(context.Background(), heartbeatLog) + h.observe(err) + } + } + }() + }) +} + +// there is only use case for open census metrics recording for now. Extend to use open telemetry in the future. +func (h *heartbeater) observe(err error) { + if !h.config.HecTelemetry.Enabled { + return + } + + var counter *stats.Int64Measure + if err == nil { + counter = h.heartbeatSuccessTotal + } else { + counter = h.heartbeatErrorTotal + } + _ = stats.RecordWithTags(context.Background(), h.tagMutators, counter.M(1)) +} + +func (h *heartbeater) generateHeartbeatLog(buildInfo component.BuildInfo) plog.Logs { + host, err := os.Hostname() + if err != nil { + host = "unknownhost" + } + + ret := plog.NewLogs() + resourceLogs := ret.ResourceLogs().AppendEmpty() + + resourceAttrs := resourceLogs.Resource().Attributes() + resourceAttrs.PutStr(h.config.HecToOtelAttrs.Index, "_internal") + resourceAttrs.PutStr(h.config.HecToOtelAttrs.Source, "otelcol") + resourceAttrs.PutStr(h.config.HecToOtelAttrs.SourceType, "heartbeat") + resourceAttrs.PutStr(h.config.HecToOtelAttrs.Host, host) + + logRecord := resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + logRecord.Body().SetStr(fmt.Sprintf( + "HeartbeatInfo version=%s description=%s os=%s arch=%s", + buildInfo.Version, + buildInfo.Description, + runtime.GOOS, + runtime.GOARCH)) + return ret +} diff --git a/exporter/splunkhecexporter/heartbeat_test.go b/exporter/splunkhecexporter/heartbeat_test.go new file mode 100644 index 000000000000..0cdd81810c8f --- /dev/null +++ b/exporter/splunkhecexporter/heartbeat_test.go @@ -0,0 +1,168 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter" + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/plog" +) + +const ( + metricLabelKey = "customKey" + metricLabelVal = "customVal" +) + +func createTestConfig(metricsOverrides map[string]string, enableMetrics bool) *Config { + config := NewFactory().CreateDefaultConfig().(*Config) + config.HecHeartbeat = HecHeartbeat{ + Interval: 10 * time.Millisecond, + } + config.HecTelemetry = HecTelemetry{ + Enabled: enableMetrics, + OverrideMetricsNames: metricsOverrides, + ExtraAttributes: map[string]string{ + metricLabelKey: metricLabelVal, + }, + } + return config +} + +func initHeartbeater(t *testing.T, metricsOverrides map[string]string, enableMetrics bool, consumeFn func(ctx context.Context, ld plog.Logs) error) { + config := createTestConfig(metricsOverrides, enableMetrics) + hbter := newHeartbeater(config, consumeFn) + hbter.initHeartbeat(component.NewDefaultBuildInfo()) + t.Cleanup(func() { + hbter.shutdown() + }) +} + +func assertHeartbeatInfoLog(t *testing.T, l plog.Logs) { + assert.Equal(t, 1, l.ResourceLogs().Len()) + assert.Contains(t, l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().AsString(), "HeartbeatInfo") +} + +func getMetricValue(metricName string) []float64 { + viewData, _ := view.RetrieveData(metricName) + var ret []float64 + if len(viewData) > 0 { + for _, data := range viewData { + ret = append(ret, data.Data.(*view.SumData).Value) + } + } + return ret +} + +func getTags(metricName string) [][]tag.Tag { + viewData, _ := view.RetrieveData(metricName) + var ret [][]tag.Tag + if len(viewData) > 0 { + for _, data := range viewData { + ret = append(ret, data.Tags) + } + } + return ret +} + +func resetMetrics(metricsNames ...string) { + for _, metricsName := range metricsNames { + if v := view.Find(metricsName); v != nil { + view.Unregister(v) + } + } +} + +func Test_newHeartbeater_disabled(t *testing.T) { + config := createTestConfig(map[string]string{}, false) + config.HecHeartbeat.Interval = 0 + hb := newHeartbeater(config, func(ctx context.Context, ld plog.Logs) error { + return nil + }) + assert.Nil(t, hb) +} + +func Test_Heartbeat_success(t *testing.T) { + tests := []struct { + metricsOverrides map[string]string + enableMetrics bool + }{ + { + metricsOverrides: map[string]string{}, + enableMetrics: false, + }, + { + metricsOverrides: map[string]string{ + defaultHBSentMetricsName: "app_heartbeat_success_total", + }, + enableMetrics: true, + }, + { + metricsOverrides: map[string]string{}, + enableMetrics: false, + }, + } + + for _, tt := range tests { + consumeLogsChan := make(chan plog.Logs, 10) + consumeFn := func(ctx context.Context, ld plog.Logs) error { + consumeLogsChan <- ld + return nil + } + initHeartbeater(t, tt.metricsOverrides, true, consumeFn) + + assert.Eventually(t, func() bool { + return len(consumeLogsChan) != 0 + }, time.Second, 10*time.Millisecond) + + logs := <-consumeLogsChan + assertHeartbeatInfoLog(t, logs) + + if tt.enableMetrics { + sentMetricsName := getMetricsName(tt.metricsOverrides, defaultHBSentMetricsName) + failedMetricsName := getMetricsName(tt.metricsOverrides, defaultHBFailedMetricsName) + + assert.Eventually(t, func() bool { + return len(getMetricValue(sentMetricsName)) != 0 + }, time.Second, 10*time.Millisecond) + assert.Greater(t, getMetricValue(sentMetricsName)[0], float64(0), "there should be at least one success metric datapoint") + metricLabelKeyTag, _ := tag.NewKey(metricLabelKey) + assert.Equal(t, []tag.Tag{{Key: metricLabelKeyTag, Value: metricLabelVal}}, getTags(sentMetricsName)[0]) + + resetMetrics(sentMetricsName, failedMetricsName) + } + } +} + +func Test_Heartbeat_failure(t *testing.T) { + resetMetrics() + consumeFn := func(ctx context.Context, ld plog.Logs) error { + return errors.New("always error") + } + initHeartbeater(t, map[string]string{}, true, consumeFn) + + assert.Eventually(t, func() bool { + return len(getMetricValue(defaultHBFailedMetricsName)) != 0 + }, time.Second, 10*time.Millisecond) + assert.Greater(t, getMetricValue(defaultHBFailedMetricsName)[0], float64(0), "there should be at least one failure metric datapoint") + metricLabelKeyTag, _ := tag.NewKey(metricLabelKey) + assert.Equal(t, []tag.Tag{{Key: metricLabelKeyTag, Value: metricLabelVal}}, getTags(defaultHBFailedMetricsName)[0]) +} diff --git a/exporter/splunkhecexporter/testdata/config.yaml b/exporter/splunkhecexporter/testdata/config.yaml index 69533892b73f..f78def12c68b 100644 --- a/exporter/splunkhecexporter/testdata/config.yaml +++ b/exporter/splunkhecexporter/testdata/config.yaml @@ -36,3 +36,12 @@ splunk_hec/allsettings: otel_to_hec_fields: severity_text: "myseverityfield" severity_number: "myseveritynumfield" + hec_heartbeat: + interval: 30s + telemetry: + enabled: true + override_metrics_names: + otelcol_exporter_splunkhec_heartbeat_sent: app_heartbeats_success_total + otelcol_exporter_splunkhec_heartbeat_failed: app_heartbeats_failed_total + extra_attributes: + customKey: customVal