Skip to content

Commit

Permalink
Add ResourceAttributes to Splunk Event (open-telemetry#2843)
Browse files Browse the repository at this point in the history
**Description:** 
Fixes open-telemetry#2799

**Testing:** 
```
ResourceLog #6
Resource labels:
     -> k8s.container.name: STRING(loggen)
     -> k8s.pod.uid: STRING(f7880852-38f9-4158-b37a-41b3436a3b95)
     -> k8s.namespace.name: STRING(default)
     -> k8s.pod.startTime: STRING(2021-03-24 23:18:27 +0000 UTC)
     -> k8s.node.name: STRING(ip-192-168-68-130.us-west-1.compute.internal)
     -> k8s.pod.labels.hello: STRING(world)
     -> k8s.pod.labels.app: STRING(appName)
     -> k8s.pod.annotations.splunk.com/index: STRING(main)
     -> host.name: STRING(ip-192-168-68-130.us-west-1.compute.internal)
     -> com.splunk.sourcetype: STRING(loggen)
     -> com.splunk.index: STRING(main)
InstrumentationLibraryLogs #0
InstrumentationLibrary
LogRecord #0
Timestamp: 1616627909498109658
Severity: Undefined
ShortName:
Body: {
     -> log: STRING(EPS: 99
)
}
Attributes:
     -> k8s.namespace.name: STRING(default)
     -> k8s.pod.name: STRING(loggen-65dvk)
     -> run_id: STRING(0)
     -> k8s.container.name: STRING(loggen)
```

![image](https://user-images.githubusercontent.com/12387289/112396117-d7f79780-8cbc-11eb-879f-07bb1eb4413a.png)
  • Loading branch information
rockb1017 authored and pmatyjasek-sumo committed Apr 28, 2021
1 parent c9b874b commit 54b70bc
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 4 deletions.
3 changes: 2 additions & 1 deletion exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send f

var rls = ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
res := rls.At(i).Resource()
ills := rls.At(i).InstrumentationLibraryLogs()
for j := 0; j < ills.Len(); j++ {
logs := ills.At(j).Logs()
Expand All @@ -194,7 +195,7 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld pdata.Logs, send f
}

// Parsing log record to Splunk event.
event := mapLogRecordToSplunkEvent(logs.At(k), c.config, c.logger)
event := mapLogRecordToSplunkEvent(res, logs.At(k), c.config, c.logger)
// JSON encoding event and writing to buffer.
if err = encoder.Encode(event); err != nil {
permanentErrors = append(permanentErrors, consumererror.Permanent(fmt.Errorf("dropped log event: %v, error: %v", event, err)))
Expand Down
18 changes: 17 additions & 1 deletion exporter/splunkhecexporter/logdata_to_splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,28 @@ func (i *logIndex) zero() bool {
return i.resource == 0 && i.library == 0 && i.record == 0
}

func mapLogRecordToSplunkEvent(lr pdata.LogRecord, config *Config, logger *zap.Logger) *splunk.Event {
func mapLogRecordToSplunkEvent(res pdata.Resource, lr pdata.LogRecord, config *Config, logger *zap.Logger) *splunk.Event {
host := unknownHostName
source := config.Source
sourcetype := config.SourceType
index := config.Index
fields := map[string]interface{}{}
res.Attributes().ForEach(func(k string, v pdata.AttributeValue) {
switch k {
case conventions.AttributeHostName:
host = v.StringVal()
fields[k] = v.StringVal()
case conventions.AttributeServiceName:
source = v.StringVal()
fields[k] = v.StringVal()
case splunk.SourcetypeLabel:
sourcetype = v.StringVal()
case splunk.IndexLabel:
index = v.StringVal()
default:
fields[k] = convertAttributeValue(v, logger)
}
})
lr.Attributes().ForEach(func(k string, v pdata.AttributeValue) {
switch k {
case conventions.AttributeHostName:
Expand Down
48 changes: 46 additions & 2 deletions exporter/splunkhecexporter/logdata_to_splunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func Test_mapLogRecordToSplunkEvent(t *testing.T) {
tests := []struct {
name string
logRecordFn func() pdata.LogRecord
logResourceFn func() pdata.Resource
configDataFn func() *Config
wantSplunkEvents []*splunk.Event
}{
Expand All @@ -47,6 +48,7 @@ func Test_mapLogRecordToSplunkEvent(t *testing.T) {
logRecord.SetTimestamp(ts)
return logRecord
},
logResourceFn: pdata.NewResource,
configDataFn: func() *Config {
return &Config{
Source: "source",
Expand All @@ -70,6 +72,7 @@ func Test_mapLogRecordToSplunkEvent(t *testing.T) {
logRecord.SetTimestamp(ts)
return logRecord
},
logResourceFn: pdata.NewResource,
configDataFn: func() *Config {
return &Config{
Source: "source",
Expand All @@ -89,6 +92,7 @@ func Test_mapLogRecordToSplunkEvent(t *testing.T) {
logRecord.SetTimestamp(ts)
return logRecord
},
logResourceFn: pdata.NewResource,
configDataFn: func() *Config {
return &Config{
Source: "source",
Expand All @@ -105,6 +109,7 @@ func Test_mapLogRecordToSplunkEvent(t *testing.T) {
logRecord := pdata.NewLogRecord()
return logRecord
},
logResourceFn: pdata.NewResource,
configDataFn: func() *Config {
return &Config{
Source: "source",
Expand All @@ -127,6 +132,7 @@ func Test_mapLogRecordToSplunkEvent(t *testing.T) {
logRecord.SetTimestamp(ts)
return logRecord
},
logResourceFn: pdata.NewResource,
configDataFn: func() *Config {
return &Config{
Source: "source",
Expand All @@ -149,6 +155,7 @@ func Test_mapLogRecordToSplunkEvent(t *testing.T) {
logRecord.SetTimestamp(ts)
return logRecord
},
logResourceFn: pdata.NewResource,
configDataFn: func() *Config {
return &Config{
Source: "source",
Expand All @@ -171,6 +178,7 @@ func Test_mapLogRecordToSplunkEvent(t *testing.T) {
logRecord.SetTimestamp(ts)
return logRecord
},
logResourceFn: pdata.NewResource,
configDataFn: func() *Config {
return &Config{
Source: "source",
Expand All @@ -197,6 +205,7 @@ func Test_mapLogRecordToSplunkEvent(t *testing.T) {
logRecord.SetTimestamp(ts)
return logRecord
},
logResourceFn: pdata.NewResource,
configDataFn: func() *Config {
return &Config{
Source: "source",
Expand All @@ -220,6 +229,7 @@ func Test_mapLogRecordToSplunkEvent(t *testing.T) {
logRecord.SetTimestamp(ts)
return logRecord
},
logResourceFn: pdata.NewResource,
configDataFn: func() *Config {
return &Config{
Source: "source",
Expand All @@ -246,6 +256,7 @@ func Test_mapLogRecordToSplunkEvent(t *testing.T) {
logRecord.SetTimestamp(ts)
return logRecord
},
logResourceFn: pdata.NewResource,
configDataFn: func() *Config {
return &Config{
Source: "source",
Expand All @@ -257,11 +268,44 @@ func Test_mapLogRecordToSplunkEvent(t *testing.T) {
"myhost", "myapp", "myapp-type"),
},
},
{
name: "log resource attribute",
logRecordFn: func() pdata.LogRecord {
logRecord := pdata.NewLogRecord()
logRecord.Body().SetStringVal("mylog")
logRecord.SetTimestamp(ts)
return logRecord
},
logResourceFn: func() pdata.Resource {
attr := map[string]pdata.AttributeValue{
"resourceAttr1": pdata.NewAttributeValueString("some_string"),
splunk.SourcetypeLabel: pdata.NewAttributeValueString("myapp-type-from-resource-attr"),
splunk.IndexLabel: pdata.NewAttributeValueString("index-resource"),
conventions.AttributeServiceName: pdata.NewAttributeValueString("myapp-resource"),
conventions.AttributeHostName: pdata.NewAttributeValueString("myhost-resource"),
}
resource := pdata.NewResource()
resource.Attributes().InitFromMap(attr)
return resource
},
configDataFn: func() *Config {
return &Config{}
},
wantSplunkEvents: func() []*splunk.Event {
event := commonLogSplunkEvent("mylog", ts, map[string]interface{}{
"service.name": "myapp-resource", "host.name": "myhost-resource", "resourceAttr1": "some_string",
}, "myhost-resource", "myapp-resource", "myapp-type-from-resource-attr")
event.Index = "index-resource"
return []*splunk.Event{
event,
}
}(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
for _, want := range tt.wantSplunkEvents {
got := mapLogRecordToSplunkEvent(tt.logRecordFn(), tt.configDataFn(), logger)
got := mapLogRecordToSplunkEvent(tt.logResourceFn(), tt.logRecordFn(), tt.configDataFn(), logger)
assert.EqualValues(t, want, got)
}
})
Expand Down Expand Up @@ -297,7 +341,7 @@ func commonLogSplunkEvent(
}

func Test_emptyLogRecord(t *testing.T) {
event := mapLogRecordToSplunkEvent(pdata.NewLogRecord(), &Config{}, zap.NewNop())
event := mapLogRecordToSplunkEvent(pdata.NewResource(), pdata.NewLogRecord(), &Config{}, zap.NewNop())
assert.Nil(t, event.Time)
assert.Equal(t, event.Host, "unknown")
assert.Zero(t, event.Source)
Expand Down

0 comments on commit 54b70bc

Please sign in to comment.