Skip to content

Commit

Permalink
splunkhec Exporter: Don't send 'zero' timestamps to Splunk HEC. (#1157)
Browse files Browse the repository at this point in the history
  • Loading branch information
nebffa authored Oct 29, 2020
1 parent 68d65d7 commit f305e19
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 40 deletions.
20 changes: 10 additions & 10 deletions exporter/splunkhecexporter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,11 @@ func createMetricsData(numberOfDataPoints int) pdata.Metrics {
keys := []string{"k0", "k1"}
values := []string{"v0", "v1"}

unixSecs := int64(1574092046)
unixNSecs := int64(11 * time.Millisecond)
doubleVal := 1234.5678
var metrics []*metricspb.Metric
for i := 0; i < numberOfDataPoints; i++ {
tsUnix := time.Unix(unixSecs+int64(i), unixNSecs)
tsUnix := time.Unix(int64(i), int64(i)*time.Millisecond.Nanoseconds())

doublePt := metricstestutil.Double(tsUnix, doubleVal)
metric := metricstestutil.Gauge("gauge_double_with_dims", keys, metricstestutil.Timeseries(tsUnix, values, doublePt))
metrics = append(metrics, metric)
Expand Down Expand Up @@ -112,8 +111,8 @@ func createLogData(numberOfLogs int) pdata.Logs {
ill.InitEmpty()
rl.InstrumentationLibraryLogs().Append(ill)

ts := pdata.TimestampUnixNano(123)
for i := 0; i < numberOfLogs; i++ {
ts := pdata.TimestampUnixNano(int64(i) * time.Millisecond.Nanoseconds())
logRecord := pdata.NewLogRecord()
logRecord.InitEmpty()
logRecord.Body().SetStringVal("mylog")
Expand All @@ -122,6 +121,7 @@ func createLogData(numberOfLogs int) pdata.Logs {
logRecord.Attributes().InsertString(conventions.AttributeHostHostname, "myhost")
logRecord.Attributes().InsertString("custom", "custom")
logRecord.SetTimestamp(ts)

ill.Logs().Append(logRecord)
}

Expand Down Expand Up @@ -280,23 +280,23 @@ func TestReceiveTraces(t *testing.T) {
func TestReceiveLogs(t *testing.T) {
actual, err := runLogExport(true, 3, t)
assert.NoError(t, err)
expected := `{"time":0,"host":"myhost","source":"myapp","sourcetype":"myapp-type","event":"mylog","fields":{"custom":"custom"}}`
expected := `{"host":"myhost","source":"myapp","sourcetype":"myapp-type","event":"mylog","fields":{"custom":"custom"}}`
expected += "\n\r\n\r\n"
expected += `{"time":0,"host":"myhost","source":"myapp","sourcetype":"myapp-type","event":"mylog","fields":{"custom":"custom"}}`
expected += `{"time":0.001,"host":"myhost","source":"myapp","sourcetype":"myapp-type","event":"mylog","fields":{"custom":"custom"}}`
expected += "\n\r\n\r\n"
expected += `{"time":0,"host":"myhost","source":"myapp","sourcetype":"myapp-type","event":"mylog","fields":{"custom":"custom"}}`
expected += `{"time":0.002,"host":"myhost","source":"myapp","sourcetype":"myapp-type","event":"mylog","fields":{"custom":"custom"}}`
expected += "\n\r\n\r\n"
assert.Equal(t, expected, actual)
}

func TestReceiveMetrics(t *testing.T) {
actual, err := runMetricsExport(true, 3, t)
assert.NoError(t, err)
expected := `{"time":1574092046.011,"host":"unknown","event":"metric","fields":{"k/n0":"vn0","k/n1":"vn1","k/r0":"vr0","k/r1":"vr1","k0":"v0","k1":"v1","metric_name:gauge_double_with_dims":1234.5678}}`
expected := `{"host":"unknown","event":"metric","fields":{"k/n0":"vn0","k/n1":"vn1","k/r0":"vr0","k/r1":"vr1","k0":"v0","k1":"v1","metric_name:gauge_double_with_dims":1234.5678}}`
expected += "\n\r\n\r\n"
expected += `{"time":1574092047.011,"host":"unknown","event":"metric","fields":{"k/n0":"vn0","k/n1":"vn1","k/r0":"vr0","k/r1":"vr1","k0":"v0","k1":"v1","metric_name:gauge_double_with_dims":1234.5678}}`
expected += `{"time":1.001,"host":"unknown","event":"metric","fields":{"k/n0":"vn0","k/n1":"vn1","k/r0":"vr0","k/r1":"vr1","k0":"v0","k1":"v1","metric_name:gauge_double_with_dims":1234.5678}}`
expected += "\n\r\n\r\n"
expected += `{"time":1574092048.011,"host":"unknown","event":"metric","fields":{"k/n0":"vn0","k/n1":"vn1","k/r0":"vr0","k/r1":"vr1","k0":"v0","k1":"v1","metric_name:gauge_double_with_dims":1234.5678}}`
expected += `{"time":2.002,"host":"unknown","event":"metric","fields":{"k/n0":"vn0","k/n1":"vn1","k/r0":"vr0","k/r1":"vr1","k0":"v0","k1":"v1","metric_name:gauge_double_with_dims":1234.5678}}`
expected += "\n\r\n\r\n"
assert.Equal(t, expected, actual)
}
Expand Down
14 changes: 12 additions & 2 deletions exporter/splunkhecexporter/logdata_to_splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@ func convertAttributeValue(value pdata.AttributeValue, logger *zap.Logger) inter
}

// nanoTimestampToEpochMilliseconds transforms nanoseconds into <sec>.<ms>. For example, 1433188255.500 indicates 1433188255 seconds and 500 milliseconds after epoch.
func nanoTimestampToEpochMilliseconds(ts pdata.TimestampUnixNano) float64 {
return time.Duration(ts).Round(time.Millisecond).Seconds()
func nanoTimestampToEpochMilliseconds(ts pdata.TimestampUnixNano) *float64 {
duration := time.Duration(ts)
if duration == 0 {
// some telemetry sources send data with timestamps set to 0 by design, as their original target destinations
// (i.e. before Open Telemetry) are setup with the know-how on how to consume them. In this case,
// we want to omit the time field when sending data to the Splunk HEC so that the HEC adds a timestamp
// at indexing time, which will be much more useful than a 0-epoch-time value.
return nil
}

val := duration.Round(time.Millisecond).Seconds()
return &val
}
6 changes: 4 additions & 2 deletions exporter/splunkhecexporter/logdata_to_splunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,9 @@ func Test_nilInstrumentationLogs(t *testing.T) {

func Test_nanoTimestampToEpochMilliseconds(t *testing.T) {
splunkTs := nanoTimestampToEpochMilliseconds(1001000000)
assert.Equal(t, 1.001, splunkTs)
assert.Equal(t, 1.001, *splunkTs)
splunkTs = nanoTimestampToEpochMilliseconds(1001990000)
assert.Equal(t, 1.002, splunkTs)
assert.Equal(t, 1.002, *splunkTs)
splunkTs = nanoTimestampToEpochMilliseconds(0)
assert.True(t, nil == splunkTs)
}
19 changes: 16 additions & 3 deletions exporter/splunkhecexporter/metricdata_to_splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,24 @@ func metricDataToSplunk(logger *zap.Logger, data pdata.Metrics, config *Config)
return splunkMetrics, numDroppedTimeSeries, nil
}

func timestampToEpochMilliseconds(ts *timestamppb.Timestamp) float64 {
func timestampToEpochMilliseconds(ts *timestamppb.Timestamp) *float64 {
if ts == nil {
return 0
return nil
}
return float64(ts.GetSeconds()) + math.Round(float64(ts.GetNanos())/1e6)/1e3

seconds := ts.GetSeconds()
nanos := ts.GetNanos()
if seconds == 0 && nanos == 0 {
// some telemetry sources send data with timestamps set to 0 by design, as their original target destinations
// (i.e. before Open Telemetry) are setup with the know-how on how to consume them. In this case,
// we want to omit the time field when sending data to the Splunk HEC so that the HEC adds a timestamp
// at indexing time, which will be much more useful than a 0-epoch-time value.
return nil
}

val := float64(seconds) + math.Round(float64(nanos)/1e6)/1e3

return &val
}

func mapValues(logger *zap.Logger, metric *metricspb.Metric, value interface{}) (map[string]interface{}, error) {
Expand Down
13 changes: 9 additions & 4 deletions exporter/splunkhecexporter/metricdata_to_splunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func getFieldValue(metric *splunk.Event) string {

func commonSplunkMetric(
metricName string,
ts float64,
ts *float64,
keys []string,
values []string,
val interface{},
Expand All @@ -213,7 +213,7 @@ func commonSplunkMetric(

func expectedFromDistribution(
metricName string,
ts float64,
ts *float64,
keys []string,
values []string,
distributionTimeSeries *metricspb.TimeSeries,
Expand Down Expand Up @@ -250,10 +250,15 @@ func expectedFromDistribution(

func TestTimestampFormat(t *testing.T) {
ts := timestamppb.Timestamp{Seconds: 32, Nanos: 1000345}
assert.Equal(t, 32.001, timestampToEpochMilliseconds(&ts))
assert.Equal(t, 32.001, *timestampToEpochMilliseconds(&ts))
}

func TestTimestampFormatRounding(t *testing.T) {
ts := timestamppb.Timestamp{Seconds: 32, Nanos: 1999345}
assert.Equal(t, 32.002, timestampToEpochMilliseconds(&ts))
assert.Equal(t, 32.002, *timestampToEpochMilliseconds(&ts))
}

func TestNilTimeWhenTimestampIsZero(t *testing.T) {
ts := timestamppb.Timestamp{Seconds: 0, Nanos: 0}
assert.True(t, nil == timestampToEpochMilliseconds(&ts))
}
2 changes: 1 addition & 1 deletion internal/splunk/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type AccessTokenPassthroughConfig struct {

// Event represents a metric in Splunk HEC format
type Event struct {
Time float64 `json:"time"` // epoch time
Time *float64 `json:"time,omitempty"` // optional epoch time - set to nil if the event timestamp is missing or unknown
Host string `json:"host"` // hostname
Source string `json:"source,omitempty"` // optional description of the source of the event; typically the app's name
SourceType string `json:"sourcetype,omitempty"` // optional name of a Splunk parsing configuration; this is usually inferred by Splunk
Expand Down
4 changes: 2 additions & 2 deletions receiver/splunkhecreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func Test_splunkhecReceiver_AccessTokenPassthrough(t *testing.T) {

func buildSplunkHecMetricsMsg(time float64, value int64, dimensions uint) *splunk.Event {
ev := &splunk.Event{
Time: time,
Time: &time,
Event: "metric",
Fields: map[string]interface{}{
"metric_name:foo": value,
Expand All @@ -565,7 +565,7 @@ func buildSplunkHecMetricsMsg(time float64, value int64, dimensions uint) *splun

func buildSplunkHecMsg(time float64, value string, dimensions uint) *splunk.Event {
ev := &splunk.Event{
Time: time,
Time: &time,
Event: value,
Fields: map[string]interface{}{},
}
Expand Down
4 changes: 3 additions & 1 deletion receiver/splunkhecreceiver/splunk_to_logdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ func SplunkHecToLogData(logger *zap.Logger, events []*splunk.Event, resourceCust

// Splunk timestamps are in seconds so convert to nanos by multiplying
// by 1 billion.
logRecord.SetTimestamp(pdata.TimestampUnixNano(event.Time * 1e9))
if event.Time != nil {
logRecord.SetTimestamp(pdata.TimestampUnixNano(*event.Time * 1e9))
}

rl.Resource().InitEmpty()
attrs := rl.Resource().Attributes()
Expand Down
41 changes: 31 additions & 10 deletions receiver/splunkhecreceiver/splunk_to_logdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (

func Test_SplunkHecToLogData(t *testing.T) {

time := 0.123
nanoseconds := 123000000

tests := []struct {
name string
event splunk.Event
Expand All @@ -35,7 +38,7 @@ func Test_SplunkHecToLogData(t *testing.T) {
{
name: "happy_path",
event: splunk.Event{
Time: 0.123,
Time: &time,
Host: "localhost",
Source: "mysource",
SourceType: "mysourcetype",
Expand All @@ -46,14 +49,14 @@ func Test_SplunkHecToLogData(t *testing.T) {
},
},
output: func() pdata.ResourceLogsSlice {
return createLogsSlice("value")
return createLogsSlice("value", nanoseconds)
}(),
wantErr: nil,
},
{
name: "double",
event: splunk.Event{
Time: 0.123,
Time: &time,
Host: "localhost",
Source: "mysource",
SourceType: "mysourcetype",
Expand All @@ -64,7 +67,7 @@ func Test_SplunkHecToLogData(t *testing.T) {
},
},
output: func() pdata.ResourceLogsSlice {
logsSlice := createLogsSlice("value")
logsSlice := createLogsSlice("value", nanoseconds)
logsSlice.At(0).InstrumentationLibraryLogs().At(0).Logs().At(0).Body().SetDoubleVal(12.3)
return logsSlice
}(),
Expand All @@ -73,7 +76,7 @@ func Test_SplunkHecToLogData(t *testing.T) {
{
name: "array",
event: splunk.Event{
Time: 0.123,
Time: &time,
Host: "localhost",
Source: "mysource",
SourceType: "mysourcetype",
Expand All @@ -84,7 +87,7 @@ func Test_SplunkHecToLogData(t *testing.T) {
},
},
output: func() pdata.ResourceLogsSlice {
logsSlice := createLogsSlice("value")
logsSlice := createLogsSlice("value", nanoseconds)
arr := pdata.NewAnyValueArray()
arr.Append(pdata.NewAttributeValueString("foo"))
arr.Append(pdata.NewAttributeValueString("bar"))
Expand All @@ -96,7 +99,7 @@ func Test_SplunkHecToLogData(t *testing.T) {
{
name: "complex_structure",
event: splunk.Event{
Time: 0.123,
Time: &time,
Host: "localhost",
Source: "mysource",
SourceType: "mysourcetype",
Expand All @@ -107,7 +110,7 @@ func Test_SplunkHecToLogData(t *testing.T) {
},
},
output: func() pdata.ResourceLogsSlice {
logsSlice := createLogsSlice("value")
logsSlice := createLogsSlice("value", nanoseconds)
attMap := pdata.NewAttributeMap()
foos := pdata.NewAnyValueArray()
foos.Append(pdata.NewAttributeValueString("foo"))
Expand All @@ -123,6 +126,24 @@ func Test_SplunkHecToLogData(t *testing.T) {
}(),
wantErr: nil,
},
{
name: "nil_timestamp",
event: splunk.Event{
Time: new(float64),
Host: "localhost",
Source: "mysource",
SourceType: "mysourcetype",
Index: "myindex",
Event: "value",
Fields: map[string]interface{}{
"foo": "bar",
},
},
output: func() pdata.ResourceLogsSlice {
return createLogsSlice("value", 0)
}(),
wantErr: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -134,7 +155,7 @@ func Test_SplunkHecToLogData(t *testing.T) {
}
}

func createLogsSlice(body string) pdata.ResourceLogsSlice {
func createLogsSlice(body string, nanoseconds int) pdata.ResourceLogsSlice {
lrs := pdata.NewResourceLogsSlice()
lrs.Resize(1)
lr := lrs.At(0)
Expand All @@ -145,7 +166,7 @@ func createLogsSlice(body string) pdata.ResourceLogsSlice {

logRecord.SetName("mysourcetype")
logRecord.Body().SetStringVal(body)
logRecord.SetTimestamp(pdata.TimestampUnixNano(123000000))
logRecord.SetTimestamp(pdata.TimestampUnixNano(nanoseconds))
lr.Resource().Attributes().InsertString("host.hostname", "localhost")
lr.Resource().Attributes().InsertString("service.name", "mysource")
lr.Resource().Attributes().InsertString("com.splunk.sourcetype", "mysourcetype")
Expand Down
7 changes: 4 additions & 3 deletions receiver/splunkhecreceiver/splunkhec_to_metricdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,12 @@ func addDoubleGauge(ts pdata.TimestampUnixNano, value float64, metric pdata.Metr
metric.DoubleGauge().DataPoints().Append(doublePt)
}

func convertTimestamp(sec float64) pdata.TimestampUnixNano {
if sec == 0 {
func convertTimestamp(sec *float64) pdata.TimestampUnixNano {
if sec == nil {
return 0
}
return pdata.TimestampUnixNano(sec * 1e9)

return pdata.TimestampUnixNano(*sec * 1e9)
}

// Extract dimensions from the Splunk event fields to populate metric data point labels.
Expand Down
4 changes: 2 additions & 2 deletions receiver/splunkhecreceiver/splunkhec_to_metricdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func Test_splunkV2ToMetricsData(t *testing.T) {

buildDefaultSplunkDataPt := func() *splunk.Event {
return &splunk.Event{
Time: sec,
Time: &sec,
Host: "localhost",
Source: "source",
SourceType: "sourcetype",
Expand Down Expand Up @@ -233,7 +233,7 @@ func Test_splunkV2ToMetricsData(t *testing.T) {
name: "zero_timestamp",
splunkDataPoint: func() *splunk.Event {
pt := buildDefaultSplunkDataPt()
pt.Time = 0
pt.Time = new(float64)
return pt
}(),
wantMetricsData: func() pdata.Metrics {
Expand Down

0 comments on commit f305e19

Please sign in to comment.