Skip to content

Commit

Permalink
[chore] Move exporterhelper queue code to exporterqueue
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jan 7, 2025
1 parent 6c99ba0 commit 6765569
Show file tree
Hide file tree
Showing 24 changed files with 281 additions and 256 deletions.
1 change: 1 addition & 0 deletions exporter/exporterhelper/exporterhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"

import "go.opentelemetry.io/collector/exporter/internal"

// Request represents a single request that can be sent to an external endpoint.
Expand Down
18 changes: 12 additions & 6 deletions exporter/exporterhelper/internal/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/exporter/internal/storagetest"
"go.opentelemetry.io/collector/pipeline"
)

Expand Down Expand Up @@ -435,7 +435,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) {
require.NoError(t, err)

extensions := map[component.ID]component.Component{
storageID: queue.NewMockStorageExtension(nil),
storageID: storagetest.NewMockStorageExtension(nil),
}
host := &MockHost{Ext: extensions}

Expand Down Expand Up @@ -468,7 +468,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
require.NoError(t, err)

extensions := map[component.ID]component.Component{
storageID: queue.NewMockStorageExtension(storageError),
storageID: storagetest.NewMockStorageExtension(storageError),
}
host := &MockHost{Ext: extensions}

Expand Down Expand Up @@ -500,7 +500,7 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
require.NoError(t, err)

extensions := map[component.ID]component.Component{
storageID: queue.NewMockStorageExtension(nil),
storageID: storagetest.NewMockStorageExtension(nil),
}
host := &MockHost{Ext: extensions}

Expand Down Expand Up @@ -540,11 +540,17 @@ func TestQueueSenderNoStartShutdown(t *testing.T) {
runTest := func(testName string, enableQueueBatcher bool) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
queue := queue.NewBoundedMemoryQueue[internal.Request](queue.MemoryQueueSettings[internal.Request]{})
set := exportertest.NewNopSettings()
queue := exporterqueue.NewMemoryQueueFactory[internal.Request]()(
context.Background(),
exporterqueue.Settings{
Signal: pipeline.SignalTraces,
ExporterSettings: set,
},
exporterqueue.NewDefaultConfig())
obsrep, err := NewExporter(ObsReportSettings{
ExporterID: exporterID,
ExporterCreateSettings: exportertest.NewNopSettings(),
ExporterCreateSettings: set,
})
require.NoError(t, err)
qs := NewQueueSender(queue, set, 1, "", obsrep, exporterbatcher.NewDefaultConfig())
Expand Down
3 changes: 1 addition & 2 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pipeline"
)
Expand Down Expand Up @@ -135,7 +134,7 @@ func NewLogsRequest(
return consumererror.NewPermanent(cErr)
}
sErr := be.Send(ctx, req)
if errors.Is(sErr, queue.ErrQueueIsFull) {
if errors.Is(sErr, exporterqueue.ErrQueueIsFull) {
be.Obsrep.RecordEnqueueFailure(ctx, pipeline.SignalLogs, int64(req.ItemsCount()))
}
return sErr
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/exporter/internal/storagetest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/testdata"
)
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestLogs_WithPersistentQueue(t *testing.T) {
require.NoError(t, err)

host := &internal.MockHost{Ext: map[component.ID]component.Component{
storageID: queue.NewMockStorageExtension(nil),
storageID: storagetest.NewMockStorageExtension(nil),
}}
require.NoError(t, te.Start(context.Background(), host))
t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) })
Expand Down
3 changes: 1 addition & 2 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pipeline"
)
Expand Down Expand Up @@ -135,7 +134,7 @@ func NewMetricsRequest(
return consumererror.NewPermanent(cErr)
}
sErr := be.Send(ctx, req)
if errors.Is(sErr, queue.ErrQueueIsFull) {
if errors.Is(sErr, exporterqueue.ErrQueueIsFull) {
be.Obsrep.RecordEnqueueFailure(ctx, pipeline.SignalMetrics, int64(req.ItemsCount()))
}
return sErr
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/exporter/internal/storagetest"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/testdata"
)
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestMetrics_WithPersistentQueue(t *testing.T) {
require.NoError(t, err)

host := &internal.MockHost{Ext: map[component.ID]component.Component{
storageID: queue.NewMockStorageExtension(nil),
storageID: storagetest.NewMockStorageExtension(nil),
}}
require.NoError(t, te.Start(context.Background(), host))
t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) })
Expand Down
3 changes: 1 addition & 2 deletions exporter/exporterhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pipeline"
)
Expand Down Expand Up @@ -135,7 +134,7 @@ func NewTracesRequest(
return consumererror.NewPermanent(cErr)
}
sErr := be.Send(ctx, req)
if errors.Is(sErr, queue.ErrQueueIsFull) {
if errors.Is(sErr, exporterqueue.ErrQueueIsFull) {
be.Obsrep.RecordEnqueueFailure(ctx, pipeline.SignalTraces, int64(req.ItemsCount()))
}
return sErr
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/exporter/internal/storagetest"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/testdata"
)
Expand Down Expand Up @@ -167,7 +167,7 @@ func TestTraces_WithPersistentQueue(t *testing.T) {
require.NoError(t, err)

host := &internal.MockHost{Ext: map[component.ID]component.Component{
storageID: queue.NewMockStorageExtension(nil),
storageID: storagetest.NewMockStorageExtension(nil),
}}
require.NoError(t, te.Start(context.Background(), host))
t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) })
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/xexporterhelper/profiles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/exporter/internal/storagetest"
"go.opentelemetry.io/collector/exporter/xexporter"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/testdata"
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestProfilesExporter_WithPersistentQueue(t *testing.T) {
require.NoError(t, err)

host := &internal.MockHost{Ext: map[component.ID]component.Component{
storageID: queue.NewMockStorageExtension(nil),
storageID: storagetest.NewMockStorageExtension(nil),
}}
require.NoError(t, te.Start(context.Background(), host))
t.Cleanup(func() { require.NoError(t, te.Shutdown(context.Background())) })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Copyright (c) 2017 Uber Technologies, Inc.
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
package exporterqueue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
"context"
Expand All @@ -17,21 +17,21 @@ import (
type boundedMemoryQueue[T any] struct {
component.StartFunc
*sizedChannel[memQueueEl[T]]
sizer Sizer[T]
sizer sizer[T]
}

// MemoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
type MemoryQueueSettings[T any] struct {
Sizer Sizer[T]
Capacity int64
type memoryQueueSettings[T any] struct {
sizer sizer[T]
capacity int64
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// newBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue[T any](set MemoryQueueSettings[T]) Queue[T] {
func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] {
return &boundedMemoryQueue[T]{
sizedChannel: newSizedChannel[memQueueEl[T]](set.Capacity, nil, 0),
sizer: set.Sizer,
sizedChannel: newSizedChannel[memQueueEl[T]](set.capacity, nil, 0),
sizer: set.sizer,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
// Copyright (c) 2017 Uber Technologies, Inc.
// SPDX-License-Identifier: Apache-2.0

package queue
package exporterqueue

import (
"context"
"strconv"
"sync"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -21,7 +22,7 @@ import (
// We want to test the overflow behavior, so we block the consumer
// by holding a startLock before submitting items to the queue.
func TestBoundedQueue(t *testing.T) {
q := NewBoundedMemoryQueue[string](MemoryQueueSettings[string]{Sizer: &RequestSizer[string]{}, Capacity: 1})
q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 1})

require.NoError(t, q.Offer(context.Background(), "a"))

Expand Down Expand Up @@ -71,7 +72,7 @@ func TestBoundedQueue(t *testing.T) {
// only after Stop will mean the consumers are still locked while
// trying to perform the final consumptions.
func TestShutdownWhileNotEmpty(t *testing.T) {
q := NewBoundedMemoryQueue[string](MemoryQueueSettings[string]{Sizer: &RequestSizer[string]{}, Capacity: 1000})
q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 1000})

assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
for i := 0; i < 10; i++ {
Expand All @@ -97,11 +98,11 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
}

func Benchmark_QueueUsage_1000_requests(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 1000)
benchmarkQueueUsage(b, &requestSizer[fakeReq]{}, 1000)
}

func Benchmark_QueueUsage_100000_requests(b *testing.B) {
benchmarkQueueUsage(b, &RequestSizer[fakeReq]{}, 100000)
benchmarkQueueUsage(b, &requestSizer[fakeReq]{}, 100000)
}

func Benchmark_QueueUsage_10000_items(b *testing.B) {
Expand All @@ -116,40 +117,38 @@ func Benchmark_QueueUsage_1M_items(b *testing.B) {

func TestQueueUsage(t *testing.T) {
t.Run("requests_based", func(t *testing.T) {
queueUsage(t, &RequestSizer[fakeReq]{}, 10)
queueUsage(t, &requestSizer[fakeReq]{}, 10)
})
t.Run("items_based", func(t *testing.T) {
queueUsage(t, &itemsSizer[fakeReq]{}, 10)
})
}

func benchmarkQueueUsage(b *testing.B, sizer Sizer[fakeReq], requestsCount int) {
func benchmarkQueueUsage(b *testing.B, sizer sizer[fakeReq], requestsCount int) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
queueUsage(b, sizer, requestsCount)
}
}

func queueUsage(tb testing.TB, sizer Sizer[fakeReq], requestsCount int) {
var wg sync.WaitGroup
wg.Add(requestsCount)
q := NewBoundedMemoryQueue[fakeReq](MemoryQueueSettings[fakeReq]{Sizer: sizer, Capacity: int64(10 * requestsCount)})
consumers := NewQueueConsumers(q, 1, func(context.Context, fakeReq) error {
wg.Done()
func queueUsage(t testing.TB, sizer sizer[fakeReq], requestsCount int) {
q := newBoundedMemoryQueue[fakeReq](memoryQueueSettings[fakeReq]{sizer: sizer, capacity: int64(10 * requestsCount)})
consumed := &atomic.Int64{}
require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
ac := newAsyncConsumer(q, 1, func(ctx context.Context, req fakeReq) error {
consumed.Add(1)
return nil
})
require.NoError(tb, q.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(tb, consumers.Start(context.Background(), componenttest.NewNopHost()))
for j := 0; j < requestsCount; j++ {
require.NoError(tb, q.Offer(context.Background(), fakeReq{10}))
require.NoError(t, q.Offer(context.Background(), fakeReq{10}))
}
assert.NoError(tb, q.Shutdown(context.Background()))
assert.NoError(tb, consumers.Shutdown(context.Background()))
wg.Wait()
assert.NoError(t, q.Shutdown(context.Background()))
assert.NoError(t, ac.Shutdown(context.Background()))
assert.Equal(t, int64(requestsCount), consumed.Load())
}

func TestZeroSizeNoConsumers(t *testing.T) {
q := NewBoundedMemoryQueue[string](MemoryQueueSettings[string]{Sizer: &RequestSizer[string]{}, Capacity: 0})
q := newBoundedMemoryQueue[string](memoryQueueSettings[string]{sizer: &requestSizer[string]{}, capacity: 0})

err := q.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
Expand All @@ -166,3 +165,43 @@ type fakeReq struct {
func (r fakeReq) ItemsCount() int {
return r.itemsCount
}

func consume[T any](q Queue[T], consumeFunc func(context.Context, T) error) bool {
index, ctx, req, ok := q.Read(context.Background())
if !ok {
return false
}
consumeErr := consumeFunc(ctx, req)
q.OnProcessingFinished(index, consumeErr)
return true
}

type asyncConsumer struct {
stopWG sync.WaitGroup
}

func newAsyncConsumer[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *asyncConsumer {
ac := &asyncConsumer{}

ac.stopWG.Add(numConsumers)
for i := 0; i < numConsumers; i++ {
go func() {
defer ac.stopWG.Done()
for {
index, ctx, req, ok := q.Read(context.Background())
if !ok {
return
}
consumeErr := consumeFunc(ctx, req)
q.OnProcessingFinished(index, consumeErr)
}
}()
}
return ac
}

// Shutdown ensures that queue and all consumers are stopped.
func (qc *asyncConsumer) Shutdown(_ context.Context) error {
qc.stopWG.Wait()
return nil
}
Loading

0 comments on commit 6765569

Please sign in to comment.