From 1a8b875e1a2e809cbdae474a909afc44e6b5719d Mon Sep 17 00:00:00 2001
From: Dmitrii Anoshin
Date: Thu, 1 Jun 2023 10:14:11 -0700
Subject: [PATCH] [chore] [exporter/splunk_hec] Remove redundant bufferState struct (#22986)

Most of the state has already been moved out of this struct, so it is no
longer needed.
---
 exporter/splunkhecexporter/buffer.go          |  77 ++----------
 exporter/splunkhecexporter/client.go          | 113 ++++++++++--------
 exporter/splunkhecexporter/hec_worker.go      |  10 +-
 exporter/splunkhecexporter/hec_worker_test.go |   2 +-
 4 files changed, 82 insertions(+), 120 deletions(-)

diff --git a/exporter/splunkhecexporter/buffer.go b/exporter/splunkhecexporter/buffer.go
index 7e06046a444c..cc9c4af65fa5 100644
--- a/exporter/splunkhecexporter/buffer.go
+++ b/exporter/splunkhecexporter/buffer.go
@@ -9,20 +9,12 @@ import (
 	"errors"
 	"io"
 	"sync"
-
-	jsoniter "github.com/json-iterator/go"
 )
 
 var (
 	errOverCapacity = errors.New("over capacity")
 )
 
-// bufferState encapsulates intermediate buffer state when pushing data
-type bufferState struct {
-	buf        buffer
-	jsonStream *jsoniter.Stream
-}
-
 type buffer interface {
 	io.Writer
 	io.Reader
@@ -32,39 +24,6 @@ type buffer interface {
 	Empty() bool
 }
 
-func (b *bufferState) compressionEnabled() bool {
-	_, ok := b.buf.(*cancellableGzipWriter)
-	return ok
-}
-
-func (b *bufferState) containsData() bool {
-	return !b.buf.Empty()
-}
-
-func (b *bufferState) reset() {
-	b.buf.Reset()
-}
-
-func (b *bufferState) Read(p []byte) (n int, err error) {
-	return b.buf.Read(p)
-}
-
-func (b *bufferState) Close() error {
-	return b.buf.Close()
-}
-
-// accept returns true if data is accepted by the buffer
-func (b *bufferState) accept(data []byte) (bool, error) {
-	_, err := b.buf.Write(data)
-	if err == nil {
-		return true, nil
-	}
-	if errors.Is(err, errOverCapacity) {
-		return false, nil
-	}
-	return false, err
-}
-
 type cancellableBytesWriter struct {
 	innerWriter *bytes.Buffer
 	maxCapacity uint
@@ -166,44 +125,34 @@ func (c *cancellableGzipWriter) Empty() bool {
 	return c.rawLen == 0
 }
 
-// bufferStatePool is a pool of bufferState objects.
-type bufferStatePool struct {
+// bufferPool is a pool of buffer objects.
+type bufferPool struct {
 	pool *sync.Pool
 }
 
-// get returns a bufferState from the pool.
-func (p bufferStatePool) get() *bufferState {
-	return p.pool.Get().(*bufferState)
+func (p bufferPool) get() buffer {
+	return p.pool.Get().(buffer)
 }
 
-// put returns a bufferState to the pool.
-func (p bufferStatePool) put(bf *bufferState) {
+func (p bufferPool) put(bf buffer) {
 	p.pool.Put(bf)
 }
 
-const initBufferCap = 512
-
-func newBufferStatePool(bufCap uint, compressionEnabled bool) bufferStatePool {
-	return bufferStatePool{
+func newBufferPool(bufCap uint, compressionEnabled bool) bufferPool {
+	return bufferPool{
 		&sync.Pool{
 			New: func() interface{} {
-				innerBuffer := bytes.NewBuffer(make([]byte, 0, initBufferCap))
-				var buf buffer
+				innerBuffer := &bytes.Buffer{}
 				if compressionEnabled {
-					buf = &cancellableGzipWriter{
+					return &cancellableGzipWriter{
 						innerBuffer: innerBuffer,
-						innerWriter: gzip.NewWriter(innerBuffer),
-						maxCapacity: bufCap,
-					}
-				} else {
-					buf = &cancellableBytesWriter{
-						innerWriter: innerBuffer,
+						innerWriter: gzip.NewWriter(innerBuffer),
 						maxCapacity: bufCap,
 					}
 				}
-				return &bufferState{
-					buf: buf,
-					jsonStream: jsoniter.NewStream(jsoniter.ConfigDefault, nil, initBufferCap),
+				return &cancellableBytesWriter{
+					innerWriter: innerBuffer,
+					maxCapacity: bufCap,
 				}
 			},
 		},
diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go
index f5903878c2e0..99e8fb01208a 100644
--- a/exporter/splunkhecexporter/client.go
+++ b/exporter/splunkhecexporter/client.go
@@ -6,6 +6,7 @@ package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-col
 import (
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"net/http"
 	"net/url"
@@ -50,7 +51,13 @@ type client struct {
 	hecWorker hecWorker
 	buildInfo component.BuildInfo
 	heartbeater *heartbeater
-	bufferStatePool bufferStatePool
+	bufferPool bufferPool
+}
+
+var jsonStreamPool = sync.Pool{
+	New: func() interface{} {
+		return jsoniter.NewStream(jsoniter.ConfigDefault, nil, 512)
+	},
 }
 
 func newClient(set exporter.CreateSettings, cfg *Config, maxContentLength uint) *client {
@@ -59,7 +66,7 @@ func newClient(set exporter.CreateSettings, cfg *Config, maxContentLength uint)
 		logger: set.Logger,
 		telemetrySettings: set.TelemetrySettings,
 		buildInfo: set.BuildInfo,
-		bufferStatePool: newBufferStatePool(maxContentLength, !cfg.DisableCompression),
+		bufferPool: newBufferPool(maxContentLength, !cfg.DisableCompression),
 	}
 }
 
@@ -156,17 +163,17 @@ func isProfilingData(sl plog.ScopeLogs) bool {
 // The batch content length is restricted to MaxContentLengthLogs.
 // ld log records are parsed to Splunk events.
 func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, headers map[string]string) error {
-	bufState := c.bufferStatePool.get()
-	defer c.bufferStatePool.put(bufState)
+	buf := c.bufferPool.get()
+	defer c.bufferPool.put(buf)
 	is := iterState{}
 	var permanentErrors []error
 
 	for !is.done {
-		bufState.reset()
-		latestIterState, batchPermanentErrors := c.fillLogsBuffer(ld, bufState, is)
+		buf.Reset()
+		latestIterState, batchPermanentErrors := c.fillLogsBuffer(ld, buf, is)
 		permanentErrors = append(permanentErrors, batchPermanentErrors...)
-		if bufState.containsData() {
-			if err := c.postEvents(ctx, bufState, headers); err != nil {
+		if !buf.Empty() {
+			if err := c.postEvents(ctx, buf, headers); err != nil {
 				return consumererror.NewLogs(err, subLogs(ld, is))
 			}
 		}
@@ -177,9 +184,11 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, headers
 }
 
 // fillLogsBuffer fills the buffer with Splunk events until the buffer is full or all logs are processed.
-func (c *client) fillLogsBuffer(logs plog.Logs, bs *bufferState, is iterState) (iterState, []error) {
+func (c *client) fillLogsBuffer(logs plog.Logs, buf buffer, is iterState) (iterState, []error) {
 	var b []byte
 	var permanentErrors []error
+	jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
+	defer jsonStreamPool.Put(jsonStream)
 
 	for i := is.resource; i < logs.ResourceLogs().Len(); i++ {
 		rl := logs.ResourceLogs().At(i)
@@ -198,7 +207,7 @@ func (c *client) fillLogsBuffer(logs plog.Logs, bs *bufferState, is iterState) (
 
 				// JSON encoding event and writing to buffer.
 				var err error
-				b, err = marshalEvent(event, c.config.MaxEventSize, bs.jsonStream)
+				b, err = marshalEvent(event, c.config.MaxEventSize, jsonStream)
 				if err != nil {
 					permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
 						"dropped log event: %v, error: %w", event, err)))
@@ -207,14 +216,12 @@ func (c *client) fillLogsBuffer(logs plog.Logs, bs *bufferState, is iterState) (
 				}
 
 				// Continue adding events to buffer up to capacity.
-				accepted, e := bs.accept(b)
-				if e != nil {
-					permanentErrors = append(permanentErrors,
-						consumererror.NewPermanent(fmt.Errorf("error writing the event: %w", e)))
+				_, err = buf.Write(b)
+				if err == nil {
 					continue
 				}
-				if !accepted {
-					if bs.containsData() {
+				if errors.Is(err, errOverCapacity) {
+					if !buf.Empty() {
 						return iterState{i, j, k, false}, permanentErrors
 					}
 					permanentErrors = append(permanentErrors, consumererror.NewPermanent(
@@ -222,6 +229,8 @@ func (c *client) fillLogsBuffer(logs plog.Logs, bs *bufferState, is iterState) (
 						" content length %d bytes", len(b), c.config.MaxContentLengthLogs)))
 					return iterState{i, j, k + 1, false}, permanentErrors
 				}
+				permanentErrors = append(permanentErrors,
+					consumererror.NewPermanent(fmt.Errorf("error writing the event: %w", err)))
 			}
 		}
 	}
@@ -229,8 +238,10 @@ func (c *client) fillLogsBuffer(logs plog.Logs, bs *bufferState, is iterState) (
 	return iterState{done: true}, permanentErrors
 }
 
-func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, bs *bufferState, is iterState) (iterState, []error) {
+func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterState) (iterState, []error) {
 	var permanentErrors []error
+	jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
+	defer jsonStreamPool.Put(jsonStream)
 
 	for i := is.resource; i < metrics.ResourceMetrics().Len(); i++ {
 		rm := metrics.ResourceMetrics().At(i)
@@ -243,7 +254,7 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, bs *bufferState, is
 
 				// Parsing metric record to Splunk event.
 				events := mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)
-				buf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics))
+				tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics))
 				if c.config.UseMultiMetricFormat {
 					merged, err := mergeEventsToMultiMetricFormat(events)
 					if err != nil {
@@ -255,24 +266,22 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, bs *bufferState, is
 				}
 				for _, event := range events {
 					// JSON encoding event and writing to buffer.
-					b, err := marshalEvent(event, c.config.MaxEventSize, bs.jsonStream)
+					b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream)
 					if err != nil {
 						permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, err)))
 						continue
 					}
-					buf.Write(b)
+					tempBuf.Write(b)
 				}
 
 				// Continue adding events to buffer up to capacity.
- b := buf.Bytes() - accepted, e := bs.accept(b) - if e != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( - "error writing the event: %w", e))) + b := tempBuf.Bytes() + _, err := buf.Write(b) + if err == nil { continue } - if !accepted { - if bs.containsData() { + if errors.Is(err, errOverCapacity) { + if !buf.Empty() { return iterState{i, j, k, false}, permanentErrors } permanentErrors = append(permanentErrors, consumererror.NewPermanent( @@ -280,6 +289,8 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, bs *bufferState, is " content length %d bytes", len(b), c.config.MaxContentLengthMetrics))) return iterState{i, j, k + 1, false}, permanentErrors } + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( + "error writing the event: %w", err))) } } } @@ -287,8 +298,10 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, bs *bufferState, is return iterState{done: true}, permanentErrors } -func (c *client) fillTracesBuffer(traces ptrace.Traces, bs *bufferState, is iterState) (iterState, []error) { +func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState) (iterState, []error) { var permanentErrors []error + jsonStream := jsonStreamPool.Get().(*jsoniter.Stream) + defer jsonStreamPool.Put(jsonStream) for i := is.resource; i < traces.ResourceSpans().Len(); i++ { rs := traces.ResourceSpans().At(i) @@ -303,21 +316,19 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, bs *bufferState, is iter event := mapSpanToSplunkEvent(rs.Resource(), span, c.config) // JSON encoding event and writing to buffer. - b, err := marshalEvent(event, c.config.MaxEventSize, bs.jsonStream) + b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream) if err != nil { permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped span events: %v, error: %w", event, err))) continue } // Continue adding events to buffer up to capacity. - accepted, e := bs.accept(b) - if e != nil { - permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( - "error writing the event: %w", e))) + _, err = buf.Write(b) + if err == nil { continue } - if !accepted { - if bs.containsData() { + if errors.Is(err, errOverCapacity) { + if !buf.Empty() { return iterState{i, j, k, false}, permanentErrors } permanentErrors = append(permanentErrors, consumererror.NewPermanent( @@ -325,6 +336,8 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, bs *bufferState, is iter " content length %d bytes", len(b), c.config.MaxContentLengthTraces))) return iterState{i, j, k + 1, false}, permanentErrors } + permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf( + "error writing the event: %w", err))) } } } @@ -336,17 +349,17 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, bs *bufferState, is iter // The batch content length is restricted to MaxContentLengthMetrics. // md metrics are parsed to Splunk events. 
 func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metrics, headers map[string]string) error {
-	bufState := c.bufferStatePool.get()
-	defer c.bufferStatePool.put(bufState)
+	buf := c.bufferPool.get()
+	defer c.bufferPool.put(buf)
 	is := iterState{}
 	var permanentErrors []error
 
 	for !is.done {
-		bufState.reset()
-		latestIterState, batchPermanentErrors := c.fillMetricsBuffer(md, bufState, is)
+		buf.Reset()
+		latestIterState, batchPermanentErrors := c.fillMetricsBuffer(md, buf, is)
 		permanentErrors = append(permanentErrors, batchPermanentErrors...)
-		if bufState.containsData() {
-			if err := c.postEvents(ctx, bufState, headers); err != nil {
+		if !buf.Empty() {
+			if err := c.postEvents(ctx, buf, headers); err != nil {
 				return consumererror.NewMetrics(err, subMetrics(md, is))
 			}
 		}
@@ -360,17 +373,17 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric
 // The batch content length is restricted to MaxContentLengthTraces.
 // td traces are parsed to Splunk events.
 func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces, headers map[string]string) error {
-	bufState := c.bufferStatePool.get()
-	defer c.bufferStatePool.put(bufState)
+	buf := c.bufferPool.get()
+	defer c.bufferPool.put(buf)
 	is := iterState{}
 	var permanentErrors []error
 
 	for !is.done {
-		bufState.reset()
-		latestIterState, batchPermanentErrors := c.fillTracesBuffer(td, bufState, is)
+		buf.Reset()
+		latestIterState, batchPermanentErrors := c.fillTracesBuffer(td, buf, is)
 		permanentErrors = append(permanentErrors, batchPermanentErrors...)
-		if bufState.containsData() {
-			if err := c.postEvents(ctx, bufState, headers); err != nil {
+		if !buf.Empty() {
+			if err := c.postEvents(ctx, buf, headers); err != nil {
 				return consumererror.NewTraces(err, subTraces(td, is))
 			}
 		}
@@ -380,11 +393,11 @@ func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces,
 	return multierr.Combine(permanentErrors...)
 }
 
-func (c *client) postEvents(ctx context.Context, bufState *bufferState, headers map[string]string) error {
-	if err := bufState.buf.Close(); err != nil {
+func (c *client) postEvents(ctx context.Context, buf buffer, headers map[string]string) error {
+	if err := buf.Close(); err != nil {
 		return err
 	}
-	return c.hecWorker.send(ctx, bufState, headers)
+	return c.hecWorker.send(ctx, buf, headers)
 }
 
 // subLogs returns a subset of logs starting from the state.
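The mechanics of the client.go changes are easy to lose in diff form: the client now borrows a plain buffer from bufferPool, writes events until Write reports errOverCapacity, posts the full batch, and resumes from the saved iterState; the jsoniter streams get the same treatment via jsonStreamPool, since a stream is only needed while a fill*Buffer call is encoding. The standalone sketch below mirrors that write-until-over-capacity loop under simplifying assumptions: boundedBuffer is a hypothetical stand-in for the exporter's cancellable writers, and the tiny capacity exists only to force an overflow. It is not the exporter's real implementation.

package main

import (
	"bytes"
	"errors"
	"fmt"
	"sync"
)

// errOverCapacity mirrors the sentinel error in buffer.go.
var errOverCapacity = errors.New("over capacity")

// boundedBuffer is a hypothetical stand-in for the exporter's
// cancellableBytesWriter: Write fails with errOverCapacity instead of
// growing past maxCapacity.
type boundedBuffer struct {
	buf         bytes.Buffer
	maxCapacity int
}

func (b *boundedBuffer) Write(p []byte) (int, error) {
	if b.buf.Len()+len(p) > b.maxCapacity {
		return 0, errOverCapacity
	}
	return b.buf.Write(p)
}

func (b *boundedBuffer) Reset()   { b.buf.Reset() }
func (b *boundedBuffer) Len() int { return b.buf.Len() }

// pool mirrors bufferPool: New hands out a ready-to-use buffer, and callers
// return it with Put once the batch has been flushed.
var pool = sync.Pool{
	New: func() interface{} { return &boundedBuffer{maxCapacity: 16} },
}

func main() {
	events := [][]byte{[]byte("event-1"), []byte("event-2"), []byte("event-3")}

	buf := pool.Get().(*boundedBuffer)
	defer pool.Put(buf)
	buf.Reset()

	for _, e := range events {
		_, err := buf.Write(e)
		if err == nil {
			continue
		}
		if errors.Is(err, errOverCapacity) {
			// The batch is full: flush it, reset, and retry the event,
			// loosely as the push*DataInBatches loops resume from iterState.
			fmt.Printf("flushing batch of %d bytes\n", buf.Len())
			buf.Reset()
			if _, err := buf.Write(e); err != nil {
				fmt.Println("event larger than max capacity; dropped as a permanent error")
			}
		}
	}
	fmt.Printf("final batch holds %d bytes\n", buf.Len())
}

Pooling the buffer value directly, rather than a wrapper struct, is what lets the patch delete bufferState: the only state worth reusing is the allocation itself.
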
diff --git a/exporter/splunkhecexporter/hec_worker.go b/exporter/splunkhecexporter/hec_worker.go index 35a8280cc901..e24278c7e26c 100644 --- a/exporter/splunkhecexporter/hec_worker.go +++ b/exporter/splunkhecexporter/hec_worker.go @@ -16,7 +16,7 @@ import ( ) type hecWorker interface { - send(context.Context, *bufferState, map[string]string) error + send(context.Context, buffer, map[string]string) error } type defaultHecWorker struct { @@ -25,12 +25,12 @@ type defaultHecWorker struct { headers map[string]string } -func (hec *defaultHecWorker) send(ctx context.Context, bufferState *bufferState, headers map[string]string) error { - req, err := http.NewRequestWithContext(ctx, "POST", hec.url.String(), bufferState) +func (hec *defaultHecWorker) send(ctx context.Context, buf buffer, headers map[string]string) error { + req, err := http.NewRequestWithContext(ctx, "POST", hec.url.String(), buf) if err != nil { return consumererror.NewPermanent(err) } - req.ContentLength = int64(bufferState.buf.Len()) + req.ContentLength = int64(buf.Len()) // Set the headers configured for the client for k, v := range hec.headers { @@ -42,7 +42,7 @@ func (hec *defaultHecWorker) send(ctx context.Context, bufferState *bufferState, req.Header.Set(k, v) } - if bufferState.compressionEnabled() { + if _, ok := buf.(*cancellableGzipWriter); ok { req.Header.Set("Content-Encoding", "gzip") } diff --git a/exporter/splunkhecexporter/hec_worker_test.go b/exporter/splunkhecexporter/hec_worker_test.go index 5ed7de05ebe3..17cacae0465e 100644 --- a/exporter/splunkhecexporter/hec_worker_test.go +++ b/exporter/splunkhecexporter/hec_worker_test.go @@ -14,7 +14,7 @@ type mockHecWorker struct { failSend bool } -func (m *mockHecWorker) send(_ context.Context, _ *bufferState, _ map[string]string) error { +func (m *mockHecWorker) send(_ context.Context, _ buffer, _ map[string]string) error { if m.failSend { return errHecSendFailed }
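A closing note on the hec_worker.go change: with bufferState gone, compression is no longer tracked as a flag; the worker infers the Content-Encoding header from the buffer's concrete type. Below is a minimal sketch of that type-assertion idiom, with gzipBuffer and plainBuffer as hypothetical stand-ins for cancellableGzipWriter and cancellableBytesWriter, not the exporter's real types.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// gzipBuffer is a hypothetical gzip-backed writer: payloads pass through
// the gzip.Writer into the underlying bytes.Buffer.
type gzipBuffer struct {
	inner bytes.Buffer
	zw    *gzip.Writer
}

func (g *gzipBuffer) Write(p []byte) (int, error) { return g.zw.Write(p) }

// plainBuffer is a hypothetical uncompressed counterpart.
type plainBuffer struct {
	inner bytes.Buffer
}

func (b *plainBuffer) Write(p []byte) (int, error) { return b.inner.Write(p) }

// contentEncoding mirrors the worker's check: instead of carrying a separate
// compressionEnabled flag, it derives the header value from the concrete type.
func contentEncoding(w io.Writer) string {
	if _, ok := w.(*gzipBuffer); ok {
		return "gzip"
	}
	return ""
}

func main() {
	g := &gzipBuffer{}
	g.zw = gzip.NewWriter(&g.inner)

	fmt.Printf("gzip-backed buffer: %q\n", contentEncoding(g))
	fmt.Printf("plain buffer: %q\n", contentEncoding(&plainBuffer{}))
}

This couples the worker to a concrete writer type, which seems acceptable here since both live in the same package, and the alternative was keeping the whole bufferState wrapper alive just to answer one boolean question.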