Skip to content

Commit

Permalink
[exporter/splunkhec] Increase the performance of JSON marshaling
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Jul 10, 2024
1 parent 948fa91 commit 5feebb5
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 48 deletions.
27 changes: 27 additions & 0 deletions .chloggen/hec_json.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: splunkhecexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Increase the performance of JSON marshaling

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34011]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
42 changes: 13 additions & 29 deletions exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"net/url"
"sync"

jsoniter "github.com/json-iterator/go"
"github.com/goccy/go-json"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
Expand Down Expand Up @@ -58,12 +58,6 @@ type client struct {
meter metric.Meter
}

var jsonStreamPool = sync.Pool{
New: func() any {
return jsoniter.NewStream(jsoniter.ConfigDefault, nil, 512)
},
}

func newClient(set exporter.Settings, cfg *Config, maxContentLength uint) *client {
return &client{
config: cfg,
Expand Down Expand Up @@ -196,8 +190,6 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, headers
func (c *client) fillLogsBuffer(logs plog.Logs, buf buffer, is iterState) (iterState, []error) {
var b []byte
var permanentErrors []error
jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
defer jsonStreamPool.Put(jsonStream)

for i := is.resource; i < logs.ResourceLogs().Len(); i++ {
rl := logs.ResourceLogs().At(i)
Expand All @@ -216,7 +208,7 @@ func (c *client) fillLogsBuffer(logs plog.Logs, buf buffer, is iterState) (iterS

// JSON encoding event and writing to buffer.
var err error
b, err = marshalEvent(event, c.config.MaxEventSize, jsonStream)
b, err = marshalEvent(event, c.config.MaxEventSize)
if err != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
"dropped log event: %v, error: %w", event, err)))
Expand Down Expand Up @@ -249,8 +241,6 @@ func (c *client) fillLogsBuffer(logs plog.Logs, buf buffer, is iterState) (iterS

func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterState) (iterState, []error) {
var permanentErrors []error
jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
defer jsonStreamPool.Put(jsonStream)

tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics))
for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ {
Expand All @@ -267,7 +257,7 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS
tempBuf.Reset()
for _, event := range events {
// JSON encoding event and writing to buffer.
b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream)
b, err := marshalEvent(event, c.config.MaxEventSize)
if err != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err)))
continue
Expand Down Expand Up @@ -301,13 +291,11 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS

func (c *client) fillMetricsBufferMultiMetrics(events []*splunk.Event, buf buffer, is iterState) (iterState, []error) {
var permanentErrors []error
jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
defer jsonStreamPool.Put(jsonStream)

for i := is.record; i < len(events); i++ {
event := events[i]
// JSON encoding event and writing to buffer.
b, jsonErr := marshalEvent(event, c.config.MaxEventSize, jsonStream)
b, jsonErr := marshalEvent(event, c.config.MaxEventSize)
if jsonErr != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonErr)))
continue
Expand Down Expand Up @@ -338,8 +326,6 @@ func (c *client) fillMetricsBufferMultiMetrics(events []*splunk.Event, buf buffe

func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState) (iterState, []error) {
var permanentErrors []error
jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
defer jsonStreamPool.Put(jsonStream)

for i := is.resource; i < traces.ResourceSpans().Len(); i++ {
rs := traces.ResourceSpans().At(i)
Expand All @@ -354,7 +340,7 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState
event := mapSpanToSplunkEvent(rs.Resource(), span, c.config)

// JSON encoding event and writing to buffer.
b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream)
b, err := marshalEvent(event, c.config.MaxEventSize)
if err != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, err)))
continue
Expand Down Expand Up @@ -687,16 +673,14 @@ func buildHTTPHeaders(config *Config, buildInfo component.BuildInfo) map[string]
}
}

// marshalEvent marshals an event to JSON using a reusable jsoniter stream.
func marshalEvent(event *splunk.Event, sizeLimit uint, stream *jsoniter.Stream) ([]byte, error) {
stream.Reset(nil)
stream.Error = nil
stream.WriteVal(event)
if stream.Error != nil {
return nil, stream.Error
// marshalEvent marshals an event to JSON
func marshalEvent(event *splunk.Event, sizeLimit uint) ([]byte, error) {
b, err := json.Marshal(event)
if err != nil {
return nil, err
}
if uint(stream.Buffered()) > sizeLimit {
return nil, fmt.Errorf("event size %d exceeds limit %d", stream.Buffered(), sizeLimit)
if uint(len(b)) > sizeLimit {
return nil, fmt.Errorf("event size %d exceeds limit %d", len(b), sizeLimit)
}
return stream.Buffer(), nil
return b, nil
}
9 changes: 4 additions & 5 deletions exporter/splunkhecexporter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"testing"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/goccy/go-json"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -978,16 +978,15 @@ func TestReceiveSpanEvent(t *testing.T) {
}

// compareWithTestData compares hec output with a json file using maps instead of strings to avoid key ordering
// issues (jsoniter doesn't sort the keys).
func compareWithTestData(t *testing.T, actual []byte, file string) {
wantStr, err := os.ReadFile(file)
require.NoError(t, err)
wantMap := map[string]any{}
err = jsoniter.Unmarshal(wantStr, &wantMap)
err = json.Unmarshal(wantStr, &wantMap)
require.NoError(t, err)

gotMap := map[string]any{}
err = jsoniter.Unmarshal(actual, &gotMap)
err = json.Unmarshal(actual, &gotMap)
require.NoError(t, err)
assert.Equal(t, wantMap, gotMap)
}
Expand Down Expand Up @@ -1481,7 +1480,7 @@ func TestInvalidJson(t *testing.T) {
badEvent := badJSON{
Foo: math.Inf(1),
}
_, err := jsoniter.Marshal(badEvent)
_, err := json.Marshal(badEvent)
assert.Error(t, err)
}

Expand Down
3 changes: 2 additions & 1 deletion exporter/splunkhecexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21.0

require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/json-iterator/go v1.1.12
github.com/goccy/go-json v0.9.7
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.104.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.104.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.104.0
Expand Down Expand Up @@ -57,6 +57,7 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions exporter/splunkhecexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions exporter/splunkhecexporter/logdata_to_splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"fmt"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/goccy/go-json"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

Expand Down Expand Up @@ -110,8 +110,8 @@ func mergeValue(dst map[string]any, k string, v any) {
if isArrayFlat(element) {
dst[k] = v
} else {
jsonStr, _ := jsoniter.MarshalToString(element)
dst[k] = jsonStr
b, _ := json.Marshal(element)
dst[k] = string(b)
}
case map[string]any:
flattenAndMergeMap(element, dst, k)
Expand Down Expand Up @@ -141,8 +141,8 @@ func flattenAndMergeMap(src, dst map[string]any, key string) {
if isArrayFlat(element) {
dst[current] = element
} else {
jsonStr, _ := jsoniter.MarshalToString(element)
dst[current] = jsonStr
b, _ := json.Marshal(element)
dst[current] = string(b)
}

default:
Expand Down
5 changes: 2 additions & 3 deletions exporter/splunkhecexporter/metricdata_to_splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strconv"
"strings"

jsoniter "github.com/json-iterator/go"
"github.com/goccy/go-json"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
Expand Down Expand Up @@ -284,12 +284,11 @@ func mergeEventsToMultiMetricFormat(events []*splunk.Event) ([]*splunk.Event, er
hashes := map[uint32]*splunk.Event{}
hasher := fnv.New32a()
var merged []*splunk.Event
marshaler := jsoniter.ConfigCompatibleWithStandardLibrary

for _, e := range events {
cloned := copyEventWithoutValues(e)

data, err := marshaler.Marshal(cloned)
data, err := json.Marshal(cloned)
if err != nil {
return nil, err
}
Expand Down
9 changes: 4 additions & 5 deletions exporter/splunkhecexporter/metricdata_to_splunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
package splunkhecexporter

import (
"encoding/json"
"fmt"
"io"
"math"
"testing"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/goccy/go-json"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -706,16 +705,16 @@ func TestMergeEvents(t *testing.T) {
json1 := `{"event":"metric","fields":{"IF-Azure":"azure-env","k8s.cluster.name":"devops-uat","k8s.namespace.name":"splunk-collector-tests","k8s.node.name":"myk8snodename","k8s.pod.name":"my-otel-collector-pod","metric_type":"Gauge","metricsIndex":"test_metrics","metricsPlatform":"unset","resourceAttrs":"NO","testNumber":"number42","testRun":"42","metric_name:otel.collector.test":3411}}`
json2 := `{"event":"metric","fields":{"IF-Azure":"azure-env","k8s.cluster.name":"devops-uat","k8s.namespace.name":"splunk-collector-tests","k8s.node.name":"myk8snodename","k8s.pod.name":"my-otel-collector-pod","metric_type":"Gauge","metricsIndex":"test_metrics","metricsPlatform":"unset","resourceAttrs":"NO","testNumber":"number42","testRun":"42","metric_name:otel.collector.test2":26059}}`
ev1 := &splunk.Event{}
err := jsoniter.Unmarshal([]byte(json1), ev1)
err := json.Unmarshal([]byte(json1), ev1)
require.NoError(t, err)
ev2 := &splunk.Event{}
err = jsoniter.Unmarshal([]byte(json2), ev2)
err = json.Unmarshal([]byte(json2), ev2)
require.NoError(t, err)
events := []*splunk.Event{ev1, ev2}
merged, err := mergeEventsToMultiMetricFormat(events)
require.NoError(t, err)
require.Len(t, merged, 1)
b, err := jsoniter.ConfigCompatibleWithStandardLibrary.Marshal(merged[0])
b, err := json.Marshal(merged[0])
require.NoError(t, err)
require.Equal(t, `{"host":"","event":"metric","fields":{"IF-Azure":"azure-env","k8s.cluster.name":"devops-uat","k8s.namespace.name":"splunk-collector-tests","k8s.node.name":"myk8snodename","k8s.pod.name":"my-otel-collector-pod","metric_name:otel.collector.test":3411,"metric_name:otel.collector.test2":26059,"metric_type":"Gauge","metricsIndex":"test_metrics","metricsPlatform":"unset","resourceAttrs":"NO","testNumber":"number42","testRun":"42"}}`, string(b))
}
Expand Down
1 change: 1 addition & 0 deletions receiver/splunkhecreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect
github.com/goccy/go-json v0.9.7 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions receiver/splunkhecreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5feebb5

Please sign in to comment.