[chore] [exporter/splunk_hec] Remove redundant bufState struct (open-telemetry#22986)

Most of the functionality was moved out of this struct, so it is no longer needed. A minimal sketch of the resulting pooling pattern follows the file summary below.
dmitryax authored and Caleb-Hurshman committed Jul 6, 2023
1 parent 50244c4 commit 1a8b875
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 120 deletions.
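
The commit replaces the pooled bufferState wrapper with two independent pools: bufferPool hands out the buffer implementations themselves, and a package-level jsonStreamPool in client.go hands out jsoniter streams on demand. Below is a minimal, self-contained sketch of that pattern, not the exporter's actual code: it substitutes a plain *bytes.Buffer for the exporter's buffer interface (which in the diff is a cancellableBytesWriter or cancellableGzipWriter), and the example event payload is made up.

package main

import (
	"bytes"
	"fmt"
	"sync"

	jsoniter "github.com/json-iterator/go"
)

// bufferPool mirrors the pool in buffer.go, but pools plain *bytes.Buffer
// values so the sketch stays self-contained.
type bufferPool struct {
	pool *sync.Pool
}

func newBufferPool() bufferPool {
	return bufferPool{pool: &sync.Pool{
		New: func() interface{} { return &bytes.Buffer{} },
	}}
}

func (p bufferPool) get() *bytes.Buffer  { return p.pool.Get().(*bytes.Buffer) }
func (p bufferPool) put(b *bytes.Buffer) { p.pool.Put(b) }

// jsonStreamPool mirrors the package-level pool added to client.go: JSON
// streams are now pooled on their own instead of riding inside bufferState.
var jsonStreamPool = sync.Pool{
	New: func() interface{} {
		return jsoniter.NewStream(jsoniter.ConfigDefault, nil, 512)
	},
}

func main() {
	bp := newBufferPool()

	// Borrow a buffer for the duration of one push, as the exporter does.
	buf := bp.get()
	defer bp.put(buf)
	buf.Reset()

	// Borrow a JSON stream only while encoding events.
	stream := jsonStreamPool.Get().(*jsoniter.Stream)
	defer jsonStreamPool.Put(stream)
	stream.Reset(nil)
	stream.WriteVal(map[string]string{"event": "hello"}) // hypothetical event payload
	stream.WriteRaw("\n")

	buf.Write(stream.Buffer())
	fmt.Print(buf.String())
}

Decoupling the two pools means a borrowed buffer no longer carries a JSON stream it may never use, which is presumably why the wrapper struct could be dropped.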
77 changes: 13 additions & 64 deletions exporter/splunkhecexporter/buffer.go
@@ -9,20 +9,12 @@ import (
"errors"
"io"
"sync"

jsoniter "github.com/json-iterator/go"
)

var (
errOverCapacity = errors.New("over capacity")
)

// bufferState encapsulates intermediate buffer state when pushing data
type bufferState struct {
buf buffer
jsonStream *jsoniter.Stream
}

type buffer interface {
io.Writer
io.Reader
@@ -32,39 +24,6 @@ type buffer interface {
Empty() bool
}

func (b *bufferState) compressionEnabled() bool {
_, ok := b.buf.(*cancellableGzipWriter)
return ok
}

func (b *bufferState) containsData() bool {
return !b.buf.Empty()
}

func (b *bufferState) reset() {
b.buf.Reset()
}

func (b *bufferState) Read(p []byte) (n int, err error) {
return b.buf.Read(p)
}

func (b *bufferState) Close() error {
return b.buf.Close()
}

// accept returns true if data is accepted by the buffer
func (b *bufferState) accept(data []byte) (bool, error) {
_, err := b.buf.Write(data)
if err == nil {
return true, nil
}
if errors.Is(err, errOverCapacity) {
return false, nil
}
return false, err
}

type cancellableBytesWriter struct {
innerWriter *bytes.Buffer
maxCapacity uint
@@ -166,44 +125,34 @@ func (c *cancellableGzipWriter) Empty() bool {
return c.rawLen == 0
}

// bufferStatePool is a pool of bufferState objects.
type bufferStatePool struct {
// bufferPool is a pool of buffer objects.
type bufferPool struct {
pool *sync.Pool
}

// get returns a bufferState from the pool.
func (p bufferStatePool) get() *bufferState {
return p.pool.Get().(*bufferState)
func (p bufferPool) get() buffer {
return p.pool.Get().(buffer)
}

// put returns a bufferState to the pool.
func (p bufferStatePool) put(bf *bufferState) {
func (p bufferPool) put(bf buffer) {
p.pool.Put(bf)
}

const initBufferCap = 512

func newBufferStatePool(bufCap uint, compressionEnabled bool) bufferStatePool {
return bufferStatePool{
func newBufferPool(bufCap uint, compressionEnabled bool) bufferPool {
return bufferPool{
&sync.Pool{
New: func() interface{} {
innerBuffer := bytes.NewBuffer(make([]byte, 0, initBufferCap))
var buf buffer
innerBuffer := &bytes.Buffer{}
if compressionEnabled {
buf = &cancellableGzipWriter{
return &cancellableGzipWriter{
innerBuffer: innerBuffer,
innerWriter: gzip.NewWriter(buf),
maxCapacity: bufCap,
}
} else {
buf = &cancellableBytesWriter{
innerWriter: innerBuffer,
innerWriter: gzip.NewWriter(innerBuffer),
maxCapacity: bufCap,
}
}
return &bufferState{
buf: buf,
jsonStream: jsoniter.NewStream(jsoniter.ConfigDefault, nil, initBufferCap),
return &cancellableBytesWriter{
innerWriter: innerBuffer,
maxCapacity: bufCap,
}
},
},
113 changes: 63 additions & 50 deletions exporter/splunkhecexporter/client.go
@@ -6,6 +6,7 @@ package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-col
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"net/url"
@@ -50,7 +51,13 @@ type client struct {
hecWorker hecWorker
buildInfo component.BuildInfo
heartbeater *heartbeater
bufferStatePool bufferStatePool
bufferPool bufferPool
}

var jsonStreamPool = sync.Pool{
New: func() interface{} {
return jsoniter.NewStream(jsoniter.ConfigDefault, nil, 512)
},
}

func newClient(set exporter.CreateSettings, cfg *Config, maxContentLength uint) *client {
Expand All @@ -59,7 +66,7 @@ func newClient(set exporter.CreateSettings, cfg *Config, maxContentLength uint)
logger: set.Logger,
telemetrySettings: set.TelemetrySettings,
buildInfo: set.BuildInfo,
bufferStatePool: newBufferStatePool(maxContentLength, !cfg.DisableCompression),
bufferPool: newBufferPool(maxContentLength, !cfg.DisableCompression),
}
}

@@ -156,17 +163,17 @@ func isProfilingData(sl plog.ScopeLogs) bool {
// The batch content length is restricted to MaxContentLengthLogs.
// ld log records are parsed to Splunk events.
func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, headers map[string]string) error {
bufState := c.bufferStatePool.get()
defer c.bufferStatePool.put(bufState)
buf := c.bufferPool.get()
defer c.bufferPool.put(buf)
is := iterState{}
var permanentErrors []error

for !is.done {
bufState.reset()
latestIterState, batchPermanentErrors := c.fillLogsBuffer(ld, bufState, is)
buf.Reset()
latestIterState, batchPermanentErrors := c.fillLogsBuffer(ld, buf, is)
permanentErrors = append(permanentErrors, batchPermanentErrors...)
if bufState.containsData() {
if err := c.postEvents(ctx, bufState, headers); err != nil {
if !buf.Empty() {
if err := c.postEvents(ctx, buf, headers); err != nil {
return consumererror.NewLogs(err, subLogs(ld, is))
}
}
@@ -177,9 +184,11 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, headers
}

// fillLogsBuffer fills the buffer with Splunk events until the buffer is full or all logs are processed.
func (c *client) fillLogsBuffer(logs plog.Logs, bs *bufferState, is iterState) (iterState, []error) {
func (c *client) fillLogsBuffer(logs plog.Logs, buf buffer, is iterState) (iterState, []error) {
var b []byte
var permanentErrors []error
jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
defer jsonStreamPool.Put(jsonStream)

for i := is.resource; i < logs.ResourceLogs().Len(); i++ {
rl := logs.ResourceLogs().At(i)
@@ -198,7 +207,7 @@ func (c *client) fillLogsBuffer(logs plog.Logs, bs *bufferState, is iterState) (

// JSON encoding event and writing to buffer.
var err error
b, err = marshalEvent(event, c.config.MaxEventSize, bs.jsonStream)
b, err = marshalEvent(event, c.config.MaxEventSize, jsonStream)
if err != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
"dropped log event: %v, error: %w", event, err)))
@@ -207,30 +216,32 @@ func (c *client) fillLogsBuffer(logs plog.Logs, bs *bufferState, is iterState) (
}

// Continue adding events to buffer up to capacity.
accepted, e := bs.accept(b)
if e != nil {
permanentErrors = append(permanentErrors,
consumererror.NewPermanent(fmt.Errorf("error writing the event: %w", e)))
_, err := buf.Write(b)
if err == nil {
continue
}
if !accepted {
if bs.containsData() {
if errors.Is(err, errOverCapacity) {
if !buf.Empty() {
return iterState{i, j, k, false}, permanentErrors
}
permanentErrors = append(permanentErrors, consumererror.NewPermanent(
fmt.Errorf("dropped log event: error: event size %d bytes larger than configured max"+
" content length %d bytes", len(b), c.config.MaxContentLengthLogs)))
return iterState{i, j, k + 1, false}, permanentErrors
}
permanentErrors = append(permanentErrors,
consumererror.NewPermanent(fmt.Errorf("error writing the event: %w", err)))
}
}
}

return iterState{done: true}, permanentErrors
}

func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, bs *bufferState, is iterState) (iterState, []error) {
func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterState) (iterState, []error) {
var permanentErrors []error
jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
defer jsonStreamPool.Put(jsonStream)

for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ {
rm := metrics.ResourceMetrics().At(i)
@@ -243,7 +254,7 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, bs *bufferState, is

// Parsing metric record to Splunk event.
events := mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)
buf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics))
tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics))
if c.config.UseMultiMetricFormat {
merged, err := mergeEventsToMultiMetricFormat(events)
if err != nil {
@@ -255,40 +266,42 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, bs *bufferState, is
}
for _, event := range events {
// JSON encoding event and writing to buffer.
b, err := marshalEvent(event, c.config.MaxEventSize, bs.jsonStream)
b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream)
if err != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err)))
continue
}
buf.Write(b)
tempBuf.Write(b)
}

// Continue adding events to buffer up to capacity.
b := buf.Bytes()
accepted, e := bs.accept(b)
if e != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
"error writing the event: %w", e)))
b := tempBuf.Bytes()
_, err := buf.Write(b)
if err == nil {
continue
}
if !accepted {
if bs.containsData() {
if errors.Is(err, errOverCapacity) {
if !buf.Empty() {
return iterState{i, j, k, false}, permanentErrors
}
permanentErrors = append(permanentErrors, consumererror.NewPermanent(
fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+
" content length %d bytes", len(b), c.config.MaxContentLengthMetrics)))
return iterState{i, j, k + 1, false}, permanentErrors
}
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
"error writing the event: %w", err)))
}
}
}

return iterState{done: true}, permanentErrors
}

func (c *client) fillTracesBuffer(traces ptrace.Traces, bs *bufferState, is iterState) (iterState, []error) {
func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState) (iterState, []error) {
var permanentErrors []error
jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
defer jsonStreamPool.Put(jsonStream)

for i := is.resource; i < traces.ResourceSpans().Len(); i++ {
rs := traces.ResourceSpans().At(i)
@@ -303,28 +316,28 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, bs *bufferState, is iter
event := mapSpanToSplunkEvent(rs.Resource(), span, c.config)

// JSON encoding event and writing to buffer.
b, err := marshalEvent(event, c.config.MaxEventSize, bs.jsonStream)
b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream)
if err != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, err)))
continue
}

// Continue adding events to buffer up to capacity.
accepted, e := bs.accept(b)
if e != nil {
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
"error writing the event: %w", e)))
_, err = buf.Write(b)
if err == nil {
continue
}
if !accepted {
if bs.containsData() {
if errors.Is(err, errOverCapacity) {
if !buf.Empty() {
return iterState{i, j, k, false}, permanentErrors
}
permanentErrors = append(permanentErrors, consumererror.NewPermanent(
fmt.Errorf("dropped span event: error: event size %d bytes larger than configured max"+
" content length %d bytes", len(b), c.config.MaxContentLengthTraces)))
return iterState{i, j, k + 1, false}, permanentErrors
}
permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
"error writing the event: %w", err)))
}
}
}
@@ -336,17 +349,17 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, bs *bufferState, is iter
// The batch content length is restricted to MaxContentLengthMetrics.
// md metrics are parsed to Splunk events.
func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metrics, headers map[string]string) error {
bufState := c.bufferStatePool.get()
defer c.bufferStatePool.put(bufState)
buf := c.bufferPool.get()
defer c.bufferPool.put(buf)
is := iterState{}
var permanentErrors []error

for !is.done {
bufState.reset()
latestIterState, batchPermanentErrors := c.fillMetricsBuffer(md, bufState, is)
buf.Reset()
latestIterState, batchPermanentErrors := c.fillMetricsBuffer(md, buf, is)
permanentErrors = append(permanentErrors, batchPermanentErrors...)
if bufState.containsData() {
if err := c.postEvents(ctx, bufState, headers); err != nil {
if !buf.Empty() {
if err := c.postEvents(ctx, buf, headers); err != nil {
return consumererror.NewMetrics(err, subMetrics(md, is))
}
}
@@ -360,17 +373,17 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric
// The batch content length is restricted to MaxContentLengthMetrics.
// td traces are parsed to Splunk events.
func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces, headers map[string]string) error {
bufState := c.bufferStatePool.get()
defer c.bufferStatePool.put(bufState)
buf := c.bufferPool.get()
defer c.bufferPool.put(buf)
is := iterState{}
var permanentErrors []error

for !is.done {
bufState.reset()
latestIterState, batchPermanentErrors := c.fillTracesBuffer(td, bufState, is)
buf.Reset()
latestIterState, batchPermanentErrors := c.fillTracesBuffer(td, buf, is)
permanentErrors = append(permanentErrors, batchPermanentErrors...)
if bufState.containsData() {
if err := c.postEvents(ctx, bufState, headers); err != nil {
if !buf.Empty() {
if err := c.postEvents(ctx, buf, headers); err != nil {
return consumererror.NewTraces(err, subTraces(td, is))
}
}
@@ -380,11 +393,11 @@ func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces,
return multierr.Combine(permanentErrors...)
}

func (c *client) postEvents(ctx context.Context, bufState *bufferState, headers map[string]string) error {
if err := bufState.buf.Close(); err != nil {
func (c *client) postEvents(ctx context.Context, buf buffer, headers map[string]string) error {
if err := buf.Close(); err != nil {
return err
}
return c.hecWorker.send(ctx, bufState, headers)
return c.hecWorker.send(ctx, buf, headers)
}

// subLogs returns a subset of logs starting from the state.
