Skip to content

Commit

Permalink
sumologicexporter: refactor sender
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Rosiek <[email protected]>
  • Loading branch information
Dominik Rosiek committed Apr 22, 2024
1 parent ce2d146 commit 907a13a
Show file tree
Hide file tree
Showing 8 changed files with 486 additions and 165 deletions.
6 changes: 6 additions & 0 deletions exporter/sumologicexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type sumologicexporter struct {

foundSumologicExtension bool
sumologicExtension *sumologicextension.SumologicExtension

id component.ID
}

func initExporter(cfg *Config, settings component.TelemetrySettings) (*sumologicexporter, error) {
Expand Down Expand Up @@ -295,6 +297,7 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) err
}
logsURL, metricsURL, tracesURL := se.getDataURLs()
sdr := newSender(
se.logger,
se.config,
se.getHTTPClient(),
se.filter,
Expand All @@ -305,6 +308,7 @@ func (se *sumologicexporter) pushLogsData(ctx context.Context, ld plog.Logs) err
logsURL,
tracesURL,
se.graphiteFormatter,
se.id,
)

// Iterate over ResourceLogs
Expand Down Expand Up @@ -392,6 +396,7 @@ func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pmetric.Met
}
logsURL, metricsURL, tracesURL := se.getDataURLs()
sdr := newSender(
se.logger,
se.config,
se.getHTTPClient(),
se.filter,
Expand All @@ -402,6 +407,7 @@ func (se *sumologicexporter) pushMetricsData(ctx context.Context, md pmetric.Met
logsURL,
tracesURL,
se.graphiteFormatter,
se.id,
)

// Iterate over ResourceMetrics
Expand Down
14 changes: 7 additions & 7 deletions exporter/sumologicexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestAllFailed(t *testing.T) {
logs := logRecordsToLogs(exampleTwoLogs())

err := test.exp.pushLogsData(context.Background(), logs)
assert.EqualError(t, err, "error during sending data: 500 Internal Server Error")
assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error")

var partial consumererror.Logs
require.True(t, errors.As(err, &partial))
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestPartiallyFailed(t *testing.T) {
expected := logRecordsToLogs(records[:1])

err = test.exp.pushLogsData(context.Background(), logs)
assert.EqualError(t, err, "error during sending data: 500 Internal Server Error")
assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error")

var partial consumererror.Logs
require.True(t, errors.As(err, &partial))
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestPushFailedBatch(t *testing.T) {
}

err := test.exp.pushLogsData(context.Background(), logs)
assert.EqualError(t, err, "error during sending data: 500 Internal Server Error")
assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error")
}

func TestAllMetricsSuccess(t *testing.T) {
Expand Down Expand Up @@ -272,7 +272,7 @@ gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1
})

err := test.exp.pushMetricsData(context.Background(), metrics)
assert.EqualError(t, err, "error during sending data: 500 Internal Server Error")
assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error")

var partial consumererror.Metrics
require.True(t, errors.As(err, &partial))
Expand Down Expand Up @@ -309,7 +309,7 @@ gauge_metric_name{foo="bar",remote_name="156955",url="http://another_url"} 245 1
expected := metricPairToMetrics(records[:1])

err := test.exp.pushMetricsData(context.Background(), metrics)
assert.EqualError(t, err, "error during sending data: 500 Internal Server Error")
assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error")

var partial consumererror.Metrics
require.True(t, errors.As(err, &partial))
Expand Down Expand Up @@ -375,7 +375,7 @@ gauge_metric_name{foo="bar",key2="value2",remote_name="156955",url="http://anoth
expected := metricPairToMetrics(records[:1])

err = test.exp.pushMetricsData(context.Background(), metrics)
assert.EqualError(t, err, "error during sending data: 500 Internal Server Error")
assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error")

var partial consumererror.Metrics
require.True(t, errors.As(err, &partial))
Expand Down Expand Up @@ -418,7 +418,7 @@ func TestPushMetricsFailedBatch(t *testing.T) {
}

err := test.exp.pushMetricsData(context.Background(), metrics)
assert.EqualError(t, err, "error during sending data: 500 Internal Server Error")
assert.EqualError(t, err, "failed sending data: status: 500 Internal Server Error")
}

func TestGetSignalUrl(t *testing.T) {
Expand Down
63 changes: 49 additions & 14 deletions exporter/sumologicexporter/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,81 @@
package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter"

import (
"fmt"
"sort"
"strings"

"go.opentelemetry.io/collector/pdata/pcommon"
"golang.org/x/exp/slices"
)

// fields represents metadata
type fields struct {
orig pcommon.Map
replacer *strings.Replacer
orig pcommon.Map
initialized bool
}

func newFields(attrMap pcommon.Map) fields {
return fields{
orig: attrMap,
replacer: strings.NewReplacer(",", "_", "=", ":", "\n", "_"),
orig: attrMap,
initialized: true,
}
}

// string returns fields as ordered key=value string with `, ` as separator
func (f fields) string() string {
if !f.initialized {
return ""
}

returnValue := make([]string, 0, f.orig.Len())

f.orig.Range(func(k string, v pcommon.Value) bool {
// Don't add source related attributes to fields as they are handled separately
// and are added to the payload either as special HTTP headers or as resources
// attributes.
if k == attributeKeySourceCategory || k == attributeKeySourceHost || k == attributeKeySourceName {
return true
}

sv := v.AsString()

// Skip empty field
if len(sv) == 0 {
return true
}

key := []byte(k)
f.sanitizeField(key)
value := []byte(sv)
f.sanitizeField(value)
sb := strings.Builder{}
sb.Grow(len(key) + len(value) + 1)
sb.Write(key)
sb.WriteRune('=')
sb.Write(value)

returnValue = append(
returnValue,
fmt.Sprintf(
"%s=%s",
f.sanitizeField(k),
f.sanitizeField(v.AsString()),
),
sb.String(),
)
return true
})
sort.Strings(returnValue)
slices.Sort(returnValue)

return strings.Join(returnValue, ", ")
}

// sanitizeFields sanitize field (key or value) to be correctly parsed by sumologic receiver
func (f fields) sanitizeField(fld string) string {
return f.replacer.Replace(fld)
// It modifies the field in place.
func (f fields) sanitizeField(fld []byte) {
for i := 0; i < len(fld); i++ {
switch fld[i] {
case ',':
fld[i] = '_'
case '=':
fld[i] = ':'
case '\n':
fld[i] = '_'
default:
}
}
}
90 changes: 73 additions & 17 deletions exporter/sumologicexporter/fields_test.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,88 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sumologicexporter
package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter"

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
)

func TestFieldsAsString(t *testing.T) {
expected := "key1=value1, key2=value2, key3=value3"
flds := fieldsFromMap(map[string]string{
"key1": "value1",
"key3": "value3",
"key2": "value2",
})
func TestFields(t *testing.T) {
testcases := []struct {
name string
fields map[string]string
expected string
}{
{
name: "string",
fields: map[string]string{
"key1": "value1",
"key3": "value3",
"key2": "value2",
},
expected: "key1=value1, key2=value2, key3=value3",
},
{
name: "sanitization",
fields: map[string]string{
"key1": "value,1",
"key3": "value\n3",
"key=,2": "valu,e=2",
},
expected: "key1=value_1, key3=value_3, key:_2=valu_e:2",
},
{
name: "empty element",
fields: map[string]string{
"key1": "value1",
"key3": "value3",
"key2": "",
},
expected: "key1=value1, key3=value3",
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
flds := fieldsFromMap(tc.fields)

assert.Equal(t, expected, flds.string())
assert.Equal(t, tc.expected, flds.string())
})
}
}

func TestFieldsSanitization(t *testing.T) {
expected := "key1=value_1, key3=value_3, key:_2=valu_e:2"
flds := fieldsFromMap(map[string]string{
"key1": "value,1",
"key3": "value\n3",
"key=,2": "valu,e=2",
})
func BenchmarkFields(b *testing.B) {
attrMap := pcommon.NewMap()
flds := map[string]any{
"key1": "value1",
"key3": "value3",
"key2": "",
"map": map[string]string{
"key1": "value1",
"key3": "value3",
"key2": "",
},
}
for k, v := range flds {
switch v := v.(type) {
case string:
attrMap.PutStr(k, v)
case map[string]string:
m := pcommon.NewValueMap()
mm := m.Map().AsRaw()
for kk, vv := range v {
mm[kk] = vv
}
m.CopyTo(attrMap.PutEmpty(k))
}
}
sut := newFields(attrMap)

assert.Equal(t, expected, flds.string())
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = sut.string()
}
}
1 change: 1 addition & 0 deletions exporter/sumologicexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
go.opentelemetry.io/otel/metric v1.25.0
go.opentelemetry.io/otel/trace v1.25.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f
)

require (
Expand Down
2 changes: 2 additions & 0 deletions exporter/sumologicexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 907a13a

Please sign in to comment.