Skip to content

Commit

Permalink
feat: Add metrics tracking to batch processor
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Jun 27, 2024
1 parent 99a2ccb commit a833e32
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 21 deletions.
48 changes: 30 additions & 18 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type BatchItemProcessorOptions struct {
// Workers is the number of workers to process batches.
// The default value of Workers is 5.
Workers int
// Metrics is the metrics instance to use.
Metrics *Metrics
}

func (o *BatchItemProcessorOptions) Validate() error {
Expand All @@ -104,8 +106,8 @@ type BatchItemProcessor[T any] struct {

log logrus.FieldLogger

queue chan traceableItem[T]
batchCh chan []traceableItem[T]
queue chan *TraceableItem[T]
batchCh chan []*TraceableItem[T]
name string

timer *time.Timer
Expand All @@ -118,12 +120,10 @@ type BatchItemProcessor[T any] struct {
metrics *Metrics
}

type traceableItem[T any] struct {
type TraceableItem[T any] struct {
item *T
errCh chan error
completedCh chan struct{}
//nolint:containedctx // we need to pass the context to the workers
ctx context.Context
}

// NewBatchItemProcessor creates a new batch item processor.
Expand Down Expand Up @@ -155,7 +155,10 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log
return nil, fmt.Errorf("invalid batch item processor options: %w: %s", err, name)
}

metrics := DefaultMetrics
metrics := o.Metrics
if metrics == nil {
metrics = DefaultMetrics
}

bvp := BatchItemProcessor[T]{
e: exporter,
Expand All @@ -164,8 +167,8 @@ func NewBatchItemProcessor[T any](exporter ItemExporter[T], name string, log log
name: name,
metrics: metrics,
timer: time.NewTimer(o.BatchTimeout),
queue: make(chan traceableItem[T], o.MaxQueueSize),
batchCh: make(chan []traceableItem[T], o.Workers),
queue: make(chan *TraceableItem[T], o.MaxQueueSize),
batchCh: make(chan []*TraceableItem[T], o.Workers),
stopCh: make(chan struct{}),
stopWorkersCh: make(chan struct{}),
}
Expand Down Expand Up @@ -224,12 +227,11 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
end = len(s)
}

prepared := []traceableItem[T]{}
prepared := []*TraceableItem[T]{}

for _, i := range s[start:end] {
item := traceableItem[T]{
item := &TraceableItem[T]{
item: i,
ctx: ctx,
}

if bvp.o.ShippingMethod == ShippingMethodSync {
Expand Down Expand Up @@ -257,7 +259,11 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
}

// exportWithTimeout exports items with a timeout.
func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []traceableItem[T]) error {
func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBatch []*TraceableItem[T]) error {
if len(itemsBatch) == 0 {
return nil
}

_, span := observability.Tracer().Start(ctx, "BatchItemProcessor.exportWithTimeout")
defer span.End()

Expand Down Expand Up @@ -379,7 +385,13 @@ func WithWorkers(workers int) BatchItemProcessorOption {
}
}

func (bvp *BatchItemProcessor[T]) waitForBatchCompletion(ctx context.Context, items []traceableItem[T]) error {
// WithMetrics returns a BatchItemProcessorOption that wires the given
// Metrics instance into the processor's options. Passing nil leaves the
// option unset, in which case the processor falls back to DefaultMetrics.
func WithMetrics(m *Metrics) BatchItemProcessorOption {
	return func(opts *BatchItemProcessorOptions) {
		opts.Metrics = m
	}
}

func (bvp *BatchItemProcessor[T]) waitForBatchCompletion(ctx context.Context, items []*TraceableItem[T]) error {
for _, item := range items {
select {
case err := <-item.errCh:
Expand All @@ -399,7 +411,7 @@ func (bvp *BatchItemProcessor[T]) waitForBatchCompletion(ctx context.Context, it
func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) {
log := bvp.log.WithField("module", "batch_builder")

var batch []traceableItem[T]
var batch []*TraceableItem[T]

for {
select {
Expand All @@ -413,19 +425,19 @@ func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) {
if len(batch) >= bvp.o.MaxExportBatchSize {
bvp.sendBatch(batch, "max_export_batch_size")

batch = []traceableItem[T]{}
batch = []*TraceableItem[T]{}
}
case <-bvp.timer.C:
if len(batch) > 0 {
bvp.sendBatch(batch, "timer")
batch = []traceableItem[T]{}
batch = []*TraceableItem[T]{}
} else {
bvp.timer.Reset(bvp.o.BatchTimeout)
}
}
}
}
func (bvp *BatchItemProcessor[T]) sendBatch(batch []traceableItem[T], reason string) {
func (bvp *BatchItemProcessor[T]) sendBatch(batch []*TraceableItem[T], reason string) {
log := bvp.log.WithField("reason", reason)
log.Tracef("Creating a batch of %d items", len(batch))

Expand Down Expand Up @@ -488,7 +500,7 @@ func recoverSendOnClosedChan() {
panic(x)
}

func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, item traceableItem[T]) error {
func (bvp *BatchItemProcessor[T]) enqueueOrDrop(ctx context.Context, item *TraceableItem[T]) error {
// This ensures the bvp.queue<- below does not panic as the
// processor shuts down.
defer recoverSendOnClosedChan()
Expand Down
59 changes: 56 additions & 3 deletions pkg/processor/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func TestNewBatchItemProcessorWithNilExporter(t *testing.T) {
bsp, err := NewBatchItemProcessor[TestItem](nil, "processor", nullLogger())
require.NoError(t, err)

bsp.Start(context.Background())

err = bsp.Write(context.Background(), []*TestItem{{
name: "test",
}})
Expand Down Expand Up @@ -237,6 +239,8 @@ func TestBatchItemProcessorExportTimeout(t *testing.T) {
)
require.NoError(t, err)

bvp.Start(context.Background())

if err := bvp.Write(context.Background(), []*TestItem{{
name: "test",
}}); err != nil {
Expand All @@ -251,14 +255,23 @@ func TestBatchItemProcessorExportTimeout(t *testing.T) {
}

func createAndRegisterBatchSP[T TestItem](options []BatchItemProcessorOption, te *testBatchExporter[T]) (*BatchItemProcessor[T], error) {
return NewBatchItemProcessor[T](te, "processor", nullLogger(), options...)
bvp, err := NewBatchItemProcessor[T](te, "processor", nullLogger(), options...)
if err != nil {
return nil, err
}

bvp.Start(context.Background())

return bvp, nil
}

func TestBatchItemProcessorShutdown(t *testing.T) {
var bp testBatchExporter[TestItem]
bvp, err := NewBatchItemProcessor[TestItem](&bp, "processor", nullLogger())
require.NoError(t, err)

bvp.Start(context.Background())

err = bvp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the BatchItemProcessor down\n")
Expand All @@ -281,6 +294,8 @@ func TestBatchItemProcessorDrainQueue(t *testing.T) {
bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", log, WithMaxExportBatchSize(5), WithBatchTimeout(1*time.Second), WithWorkers(2), WithShippingMethod(ShippingMethodAsync))
require.NoError(t, err)

bsp.Start(context.Background())

itemsToExport := 5000

for i := 0; i < itemsToExport; i++ {
Expand All @@ -301,6 +316,8 @@ func TestBatchItemProcessorPostShutdown(t *testing.T) {
bsp, err := NewBatchItemProcessor[TestItem](&be, "processor", nullLogger(), WithMaxExportBatchSize(50), WithBatchTimeout(5*time.Millisecond))
require.NoError(t, err)

bsp.Start(context.Background())

for i := 0; i < 60; i++ {
if err := bsp.Write(context.Background(), []*TestItem{{
name: strconv.Itoa(i),
Expand Down Expand Up @@ -343,6 +360,8 @@ func TestMultipleWorkersConsumeConcurrently(t *testing.T) {
bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithBatchTimeout(5*time.Minute), WithWorkers(20))
require.NoError(t, err)

bsp.Start(context.Background())

itemsToExport := 100

for i := 0; i < itemsToExport; i++ {
Expand All @@ -363,6 +382,8 @@ func TestWorkersProcessBatches(t *testing.T) {
bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5))
require.NoError(t, err)

bsp.Start(context.Background())

itemsToExport := 50

for i := 0; i < itemsToExport; i++ {
Expand All @@ -387,6 +408,8 @@ func TestDrainQueueWithMultipleWorkers(t *testing.T) {
bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5))
require.NoError(t, err)

bsp.Start(context.Background())

itemsToExport := 100

for i := 0; i < itemsToExport; i++ {
Expand All @@ -410,6 +433,8 @@ func TestBatchItemProcessorTimerFunctionality(t *testing.T) {
bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(50), WithBatchTimeout(batchTimeout), WithWorkers(5))
require.NoError(t, err)

bsp.Start(context.Background())

// Add items less than the max batch size
itemsToExport := 25

Expand Down Expand Up @@ -453,6 +478,8 @@ func TestBatchItemProcessorTimeout(t *testing.T) {
t.Fatalf("failed to create batch processor: %v", err)
}

bsp.Start(context.Background())

if got, want := bsp.Write(ctx, []*TestItem{{}}), context.DeadlineExceeded; !errors.Is(got, want) {
t.Errorf("expected %q error, got %v", want, got)
}
Expand All @@ -472,6 +499,8 @@ func TestBatchItemProcessorCancellation(t *testing.T) {
t.Fatalf("failed to create batch processor: %v", err)
}

bsp.Start(context.Background())

if got, want := bsp.Write(ctx, []*TestItem{{}}), context.Canceled; !errors.Is(got, want) {
t.Errorf("expected %q error, got %v", want, got)
}
Expand Down Expand Up @@ -500,6 +529,8 @@ func TestBatchItemProcessorWithSyncErrorExporter(t *testing.T) {
t.Fatalf("failed to create batch processor: %v", err)
}

bsp.Start(context.Background())

err = bsp.Write(context.Background(), []*TestItem{{name: "test"}})
if err == nil {
t.Errorf("Expected write to fail")
Expand Down Expand Up @@ -528,6 +559,8 @@ func TestBatchItemProcessorSyncShipping(t *testing.T) {
)
require.NoError(t, err)

bsp.Start(context.Background())

items := make([]*TestItem, itemsToExport)
for i := 0; i < itemsToExport; i++ {
items[i] = &TestItem{name: strconv.Itoa(i)}
Expand Down Expand Up @@ -566,6 +599,8 @@ func TestBatchItemProcessorExportCancellationOnFailure(t *testing.T) {
bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(maxBatchSize), WithWorkers(workers), WithShippingMethod(ShippingMethodSync))
require.NoError(t, err)

bsp.Start(context.Background())

items := make([]*TestItem, itemsToExport)
for i := 0; i < itemsToExport; i++ {
items[i] = &TestItem{name: strconv.Itoa(i)}
Expand Down Expand Up @@ -627,6 +662,8 @@ func TestBatchItemProcessorExportWithTimeout(t *testing.T) {
bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5), WithExportTimeout(1*time.Second), WithShippingMethod(ShippingMethodSync))
require.NoError(t, err)

bsp.Start(context.Background())

itemsToExport := 10
items := make([]*TestItem, itemsToExport)

Expand All @@ -644,6 +681,8 @@ func TestBatchItemProcessorWithBatchTimeout(t *testing.T) {
bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithMaxExportBatchSize(10), WithWorkers(5), WithBatchTimeout(1*time.Second))
require.NoError(t, err)

bsp.Start(context.Background())

itemsToExport := 5
items := make([]*TestItem, itemsToExport)

Expand All @@ -662,11 +701,25 @@ func TestBatchItemProcessorWithBatchTimeout(t *testing.T) {

func TestBatchItemProcessorQueueSize(t *testing.T) {
te := indefiniteExporter[TestItem]{}

metrics := NewMetrics("test")
maxQueueSize := 5
bsp, err := NewBatchItemProcessor[TestItem](&te, "processor", nullLogger(), WithBatchTimeout(10*time.Minute), WithMaxQueueSize(maxQueueSize), WithMaxExportBatchSize(maxQueueSize), WithWorkers(1), WithShippingMethod(ShippingMethodAsync))
bsp, err := NewBatchItemProcessor[TestItem](
&te,
"processor",
nullLogger(),
WithBatchTimeout(10*time.Minute),
WithMaxQueueSize(maxQueueSize),
WithMaxExportBatchSize(maxQueueSize),
WithWorkers(1),
WithShippingMethod(ShippingMethodSync),
WithMetrics(metrics),
)
require.NoError(t, err)

itemsToExport := 10
bsp.Start(context.Background())

itemsToExport := 5
items := make([]*TestItem, itemsToExport)

for i := 0; i < itemsToExport; i++ {
Expand Down

0 comments on commit a833e32

Please sign in to comment.