Skip to content

Commit

Permalink
[exporter/awss3] add marshaller for Sumo Logic Installed Collector fo…
Browse files Browse the repository at this point in the history
…rmat

Signed-off-by: Katarzyna Kujawa <[email protected]>
  • Loading branch information
kasia-kujawa committed Jun 14, 2023
1 parent 5a582d9 commit 042f4fb
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 0 deletions.
1 change: 1 addition & 0 deletions exporter/awss3exporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type MarshalerType string

const (
OtlpJSON MarshalerType = "otlp_json"
Sumo MarshalerType = "sumo"
)

// Config contains the main configuration options for the s3 exporter
Expand Down
6 changes: 6 additions & 0 deletions exporter/awss3exporter/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ func NewMarshaler(mType MarshalerType, logger *zap.Logger) (marshaler, error) {
marshaler.tracesMarshaler = &ptrace.JSONMarshaler{}
marshaler.metricsMarshaler = &pmetric.JSONMarshaler{}
marshaler.fileFormat = "json"
case Sumo:
sumomarshaler := NewSumoMarshaler()
marshaler.logsMarshaler = &sumomarshaler
marshaler.tracesMarshaler = &sumomarshaler
marshaler.metricsMarshaler = &sumomarshaler
marshaler.fileFormat = "json"
default:
return nil, ErrUnknownMarshaler
}
Expand Down
139 changes: 139 additions & 0 deletions exporter/awss3exporter/sumo_marshaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2022, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package awss3exporter

import (
"bytes"
"errors"
"fmt"
"strconv"
"strings"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

type SumoMarshaler struct {
format string
}

func (marshaler *SumoMarshaler) Format() string {
return marshaler.format
}

func NewSumoMarshaler() SumoMarshaler {
return SumoMarshaler{}
}

func logEntry(buf *bytes.Buffer, format string, a ...interface{}) {
buf.WriteString(fmt.Sprintf(format, a...))
buf.WriteString("\n")
}

func attributeValueToString(v pcommon.Value) string {
// TODO: chceck all types and add missing
switch v.Type() {
case pcommon.ValueTypeStr:
return v.Str()
case pcommon.ValueTypeBool:
return strconv.FormatBool(v.Bool())
case pcommon.ValueTypeDouble:
return strconv.FormatFloat(v.Double(), 'f', -1, 64)
case pcommon.ValueTypeInt:
return strconv.FormatInt(v.Int(), 10)
case pcommon.ValueTypeSlice:
return sliceToString(v.Slice())
case pcommon.ValueTypeMap:
return mapToString(v.Map())
default:
return fmt.Sprintf("<Unknown OpenTelemetry attribute value type %q>", v.Type())
}
}

func sliceToString(s pcommon.Slice) string {
var b strings.Builder
b.WriteByte('[')
for i := 0; i < s.Len(); i++ {
if i < s.Len()-1 {
fmt.Fprintf(&b, "%s, ", attributeValueToString(s.At(i)))
} else {
b.WriteString(attributeValueToString(s.At(i)))
}
}

b.WriteByte(']')
return b.String()
}

func mapToString(m pcommon.Map) string {
var b strings.Builder
b.WriteString("{\n")

m.Range(func(k string, v pcommon.Value) bool {
fmt.Fprintf(&b, " -> %s: %s(%s)\n", k, v.Type(), v.AsString())
return true
})
b.WriteByte('}')
return b.String()
}

const (
SourceCategoryKey = "_sourceCategory"
SourceHostKey = "_sourceHost"
SourceNameKey = "log.file.path_resolved"
)

func (SumoMarshaler) MarshalLogs(ld plog.Logs) ([]byte, error) {
buf := bytes.Buffer{}
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rl := rls.At(i)
sourceCategory, exists := rl.Resource().Attributes().Get(SourceCategoryKey)
if !exists {
return nil, errors.New("_sourceCategory attribute does not exists")
}
sourceHost, exists := rl.Resource().Attributes().Get(SourceHostKey)
if !exists {
return nil, errors.New("_sourceHost attribute does not exists")
}
ills := rl.ScopeLogs()
for j := 0; j < ills.Len(); j++ {
ils := ills.At(j)
logs := ils.LogRecords()
for k := 0; k < logs.Len(); k++ {
lr := logs.At(k)
dateVal := lr.ObservedTimestamp()
body := attributeValueToString(lr.Body())
sourceName, exists := lr.Attributes().Get(SourceNameKey)
if !exists {
return nil, errors.New("_sourceName attribute does not exists")
}
logEntry(&buf, "{\"data\": \"%s\",\"sourceName\":\"%s\",\"sourceHost\":\"%s\",\"sourceCategory\":\"%s\",\"fields\":{},\"message\":\"%s\"}",
dateVal, attributeValueToString(sourceName), attributeValueToString(sourceHost), attributeValueToString(sourceCategory), body)
}
}
}
return buf.Bytes(), nil
}

func (SumoMarshaler) MarshalTraces(traces ptrace.Traces) ([]byte, error) {
return nil, nil
}

func (SumoMarshaler) MarshalMetrics(md pmetric.Metrics) ([]byte, error) {
return nil, nil
}
102 changes: 102 additions & 0 deletions exporter/awss3exporter/sumo_marshaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2022, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package awss3exporter

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
)

func TestMarshalerMissingAttributes(t *testing.T) {
logs := plog.NewLogs()
rl := logs.ResourceLogs().AppendEmpty()
rl.ScopeLogs().AppendEmpty()
marshaler := &SumoMarshaler{"txt"}
require.NotNil(t, marshaler)
_, err := marshaler.MarshalLogs(logs)
assert.Error(t, err)
}

func TestMarshalerMissingSourceHost(t *testing.T) {
logs := plog.NewLogs()
rls := logs.ResourceLogs().AppendEmpty()
rls.Resource().Attributes().PutStr("_sourceCategory", "testcategory")

marshaler := &SumoMarshaler{"txt"}
require.NotNil(t, marshaler)
_, err := marshaler.MarshalLogs(logs)
assert.Error(t, err)
}

func TestMarshalerMissingScopedLogs(t *testing.T) {
logs := plog.NewLogs()
rls := logs.ResourceLogs().AppendEmpty()
rls.Resource().Attributes().PutStr("_sourceCategory", "testcategory")
rls.Resource().Attributes().PutStr("_sourceHost", "testHost")

marshaler := &SumoMarshaler{"txt"}
require.NotNil(t, marshaler)
_, err := marshaler.MarshalLogs(logs)
assert.NoError(t, err)
}

func TestMarshalerMissingSourceName(t *testing.T) {
logs := plog.NewLogs()
rls := logs.ResourceLogs().AppendEmpty()
rls.Resource().Attributes().PutStr("_sourceCategory", "testcategory")
rls.Resource().Attributes().PutStr("_sourceHost", "testHost")

sl := rls.ScopeLogs().AppendEmpty()
const recordNum = 0

ts := pcommon.Timestamp(int64(recordNum) * time.Millisecond.Nanoseconds())
logRecord := sl.LogRecords().AppendEmpty()
logRecord.Body().SetStr("entry1")
logRecord.SetTimestamp(ts)

marshaler := &SumoMarshaler{"txt"}
require.NotNil(t, marshaler)
_, err := marshaler.MarshalLogs(logs)
assert.Error(t, err)
}

func TestMarshalerOkStructure(t *testing.T) {
logs := plog.NewLogs()
rls := logs.ResourceLogs().AppendEmpty()
rls.Resource().Attributes().PutStr("_sourceCategory", "testcategory")
rls.Resource().Attributes().PutStr("_sourceHost", "testHost")

sl := rls.ScopeLogs().AppendEmpty()
const recordNum = 0

ts := pcommon.Timestamp(int64(recordNum) * time.Millisecond.Nanoseconds())
logRecord := sl.LogRecords().AppendEmpty()
logRecord.Body().SetStr("entry1")
logRecord.Attributes().PutStr("log.file.path_resolved", "testSourceName")
logRecord.SetTimestamp(ts)

marshaler := &SumoMarshaler{"txt"}
require.NotNil(t, marshaler)
buf, err := marshaler.MarshalLogs(logs)
assert.NoError(t, err)
expectedEntry := "{\"data\": \"1970-01-01 00:00:00 +0000 UTC\",\"sourceName\":\"testSourceName\",\"sourceHost\":\"testHost\""
expectedEntry = expectedEntry + ",\"sourceCategory\":\"testcategory\",\"fields\":{},\"message\":\"entry1\"}\n"
assert.Equal(t, string(buf), expectedEntry)
}

0 comments on commit 042f4fb

Please sign in to comment.