Skip to content

Commit

Permalink
Add heartbeating in SplunkHECExporter
Browse files Browse the repository at this point in the history
  • Loading branch information
splunkericl committed Apr 18, 2023
1 parent 6d35a19 commit 514419e
Show file tree
Hide file tree
Showing 12 changed files with 487 additions and 0 deletions.
16 changes: 16 additions & 0 deletions .chloggen/splunkhecexporter-add-heartbeat.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: splunkhecexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adding an option in splunkhecexporter to enable heartbeat. A heartbeat is a metadata event about the current environment and build information. If heartbeat is enabled, splunkhecexporter will periodically send heartbeat to the destination in given time intervals from configurations.

# One or more tracking issues related to the change
issues: [20225]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
14 changes: 14 additions & 0 deletions exporter/splunkhecexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ The following configuration options can also be configured:
- `otel_to_hec_fields/severity_text` (default = `otel.log.severity.text`): Specifies the name of the field to map the severity text field of log events.
- `otel_to_hec_fields/severity_number` (default = `otel.log.severity.number`): Specifies the name of the field to map the severity number field of log events.
- `otel_to_hec_fields/name` (default = `"otel.log.name`): Specifies the name of the field to map the name field of log events.
- `hec_heartbeat/interval` (no default): Specifies the interval of sending hec heartbeat to the destination. If not specified, heartbeat is not enabled.
- `telemetry/enabled` (default: false): Specifies whether to enable telemetry inside splunk hec exporter.
- `telemetry/override_metrics_names` (default: empty map): Specifies the metrics name to overrides in splunk hec exporter.
- `telemetry/extra_attributes` (default: empty map): Specifies the extra metrics attributes in splunk hec exporter.

In addition, this exporter offers queued retry which is enabled by default.
Information about queued retry configuration parameters can be found
Expand Down Expand Up @@ -115,6 +119,16 @@ exporters:
splunk_app_name: "OpenTelemetry-Collector Splunk Exporter"
# Application version is used to track telemetry information for Splunk App's using HEC by App version.
splunk_app_version: "v0.0.1"
hec_heartbeat:
interval: 30s
telemetry:
enabled: true
override_metrics_names:
otelcol_exporter_splunkhec_heartbeat_sent: app_heartbeats_success_total
otelcol_exporter_splunkhec_heartbeat_failed: app_heartbeats_failed_total
extra_attributes:
dataset_name: SplunkCloudBeaverStack
custom_key: custom_value
```
The full list of settings exposed for this exporter are documented [here](config.go)
Expand Down
13 changes: 13 additions & 0 deletions exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"
)

// allow monkey patching for injecting pushLogData function in test
var getPushLogFn = func(c *client) func(ctx context.Context, ld plog.Logs) error {
return c.pushLogData
}

// client sends the data to the splunk backend.
type client struct {
config *Config
Expand All @@ -42,6 +47,7 @@ type client struct {
telemetrySettings component.TelemetrySettings
hecWorker hecWorker
buildInfo component.BuildInfo
heartbeater *heartbeater
}

func (c *client) pushMetricsData(
Expand Down Expand Up @@ -631,6 +637,9 @@ func subTracesByType(src ptrace.Traces, from *index, dst ptrace.Traces) {

func (c *client) stop(context.Context) error {
c.wg.Wait()
if c.heartbeater != nil {
c.heartbeater.shutdown()
}
return nil
}

Expand All @@ -650,6 +659,10 @@ func (c *client) start(ctx context.Context, host component.Host) (err error) {
}
url, _ := c.config.getURL()
c.hecWorker = &defaultHecWorker{url, httpClient, buildHTTPHeaders(c.config, c.buildInfo)}
c.heartbeater = newHeartbeater(c.config, getPushLogFn(c))
if c.heartbeater != nil {
c.heartbeater.initHeartbeat(c.buildInfo)
}
return nil
}

Expand Down
38 changes: 38 additions & 0 deletions exporter/splunkhecexporter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,44 @@ func TestAllowedLogDataTypes(t *testing.T) {
}
}

func Test_heartbeat_success(t *testing.T) {
config := NewFactory().CreateDefaultConfig().(*Config)
config.HecHeartbeat = HecHeartbeat{
Interval: 10 * time.Millisecond,
}

consumeLogsChan := make(chan plog.Logs, 10)
fnBefore := getPushLogFn
t.Cleanup(func() {
getPushLogFn = fnBefore
})
getPushLogFn = func(c *client) func(ctx context.Context, ld plog.Logs) error {
return func(ctx context.Context, ld plog.Logs) error {
consumeLogsChan <- ld
return nil
}
}

c := client{
config: config,
logger: zaptest.NewLogger(t),
hecWorker: &defaultHecWorker{&url.URL{Scheme: "http", Host: "splunk"}, http.DefaultClient, buildHTTPHeaders(config, component.NewDefaultBuildInfo())},
}

err := c.start(context.Background(), componenttest.NewNopHost())
t.Cleanup(func() {
_ = c.stop(context.Background())
})
assert.NoError(t, err)

assert.Eventually(t, func() bool {
return len(consumeLogsChan) != 0
}, time.Second, 10*time.Millisecond)

logs := <-consumeLogsChan
assertHeartbeatInfoLog(t, logs)
}

func TestSubLogs(t *testing.T) {
// Creating 12 logs (2 resources x 2 libraries x 3 records)
logs := createLogData(2, 2, 3)
Expand Down
27 changes: 27 additions & 0 deletions exporter/splunkhecexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net/url"
"path"
"time"

"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configopaque"
Expand Down Expand Up @@ -48,6 +49,26 @@ type OtelToHecFields struct {
SeverityNumber string `mapstructure:"severity_number"`
}

// HecHeartbeat defines the heartbeat information for the exporter
type HecHeartbeat struct {
// Interval represents the time interval for the heartbeat interval. If nothing or 0 is set,
// heartbeat is not enabled.
// A heartbeat is an event sent to _internal index with metadata for the current collector/host.
Interval time.Duration `mapstructure:"interval"`
}

// HecTelemetry defines the telemetry configuration for the exporter
type HecTelemetry struct {
// Enabled is the bool to enable telemetry inside splunk hec exporter
Enabled bool `mapstructure:"enabled"`

// OverrideMetricsNames is the map to override metrics for internal metrics in splunk hec exporter
OverrideMetricsNames map[string]string `mapstructure:"override_metrics_names"`

// ExtraAttributes is the extra attributes for metrics inside splunk hex exporter
ExtraAttributes map[string]string `mapstructure:"extra_attributes"`
}

// Config defines configuration for Splunk exporter.
type Config struct {
confighttp.HTTPClientSettings `mapstructure:",squash"`
Expand Down Expand Up @@ -117,6 +138,12 @@ type Config struct {

// UseMultiMetricFormat combines metric events to save space during ingestion.
UseMultiMetricFormat bool `mapstructure:"use_multi_metric_format"`

// HecHeartbeat is the configuration to enable heartbeat
HecHeartbeat HecHeartbeat `mapstructure:"hec_heartbeat"`

// HecTelemetry is the configuration for splunk hec exporter telemetry
HecTelemetry HecTelemetry `mapstructure:"telemetry"`
}

func (cfg *Config) getURL() (out *url.URL, err error) {
Expand Down
13 changes: 13 additions & 0 deletions exporter/splunkhecexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ func TestLoadConfig(t *testing.T) {
},
HealthPath: "/services/collector/health",
HecHealthCheckEnabled: false,
HecHeartbeat: HecHeartbeat{
Interval: 30 * time.Second,
},
HecTelemetry: HecTelemetry{
Enabled: true,
OverrideMetricsNames: map[string]string{
"otelcol_exporter_splunkhec_heartbeat_sent": "app_heartbeats_success_total",
"otelcol_exporter_splunkhec_heartbeat_failed": "app_heartbeats_failed_total",
},
ExtraAttributes: map[string]string{
"customKey": "customVal",
},
},
},
},
}
Expand Down
5 changes: 5 additions & 0 deletions exporter/splunkhecexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func createDefaultConfig() component.Config {
HealthPath: splunk.DefaultHealthPath,
HecHealthCheckEnabled: false,
ExportRaw: false,
HecTelemetry: HecTelemetry{
Enabled: false,
OverrideMetricsNames: map[string]string{},
ExtraAttributes: map[string]string{},
},
}
}

Expand Down
1 change: 1 addition & 0 deletions exporter/splunkhecexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestCreateMetricsExporter(t *testing.T) {
params := exportertest.NewNopCreateSettings()
_, err := createMetricsExporter(context.Background(), params, cfg)
assert.NoError(t, err)
assert.NoError(t, err)
}

func TestCreateTracesExporter(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions exporter/splunkhecexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.75.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.75.0
github.com/stretchr/testify v1.8.2
go.opencensus.io v0.24.0
go.opentelemetry.io/collector v0.75.0
go.opentelemetry.io/collector/component v0.75.0
go.opentelemetry.io/collector/confmap v0.75.0
Expand Down
Loading

0 comments on commit 514419e

Please sign in to comment.