Skip to content

Commit

Permalink
feat: Add option to override shipping method for writing items
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Jun 28, 2024
1 parent 5582734 commit c78dccc
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
15 changes: 14 additions & 1 deletion pkg/output/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"context"
"errors"
"strings"

"github.com/ethpandaops/xatu/pkg/processor"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
Expand Down Expand Up @@ -106,5 +107,17 @@ func (h *HTTP) HandleNewDecoratedEvents(ctx context.Context, events []*xatu.Deco
}
}

return h.proc.Write(ctx, filtered)
if len(filtered) == 0 {
return nil
}

shippingMethod := processor.ShippingMethodSync

if strings.Contains(filtered[0].Meta.Client.Name, "sentry") || strings.Contains(filtered[0].Meta.Client.Name, "cl-mimicry") {
shippingMethod = processor.ShippingMethodAsync
}

return h.proc.Write(ctx, filtered, processor.WriteOptions{
OverrideShippingMethod: &shippingMethod,
})
}
15 changes: 12 additions & 3 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,19 +205,28 @@ func (bvp *BatchItemProcessor[T]) Start(ctx context.Context) {
}()
}

type WriteOptions struct {
OverrideShippingMethod *ShippingMethod
}

// Write writes items to the queue. If the Processor is configured to use
// the sync shipping method, the items will be written to the queue and this
// function will return when all items have been processed. If the Processor is
// configured to use the async shipping method, the items will be written to
// the queue and this function will return immediately.
func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T, opts ...WriteOptions) error {
_, span := observability.Tracer().Start(ctx, "BatchItemProcessor.Write")
defer span.End()

if bvp.e == nil {
return errors.New("exporter is nil")
}

shippingMethod := bvp.o.ShippingMethod
if len(opts) > 0 {
shippingMethod = *opts[0].OverrideShippingMethod
}

// Break our items up in to chunks that can be processed at
// one time by our workers. This is to prevent wasting
// resources sending items if we've failed an earlier
Expand All @@ -236,7 +245,7 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
item: i,
}

if bvp.o.ShippingMethod == ShippingMethodSync {
if shippingMethod == ShippingMethodSync {
item.errCh = make(chan error, 1)
item.completedCh = make(chan struct{}, 1)
}
Expand All @@ -250,7 +259,7 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
}
}

if bvp.o.ShippingMethod == ShippingMethodSync {
if shippingMethod == ShippingMethodSync {
if err := bvp.waitForBatchCompletion(ctx, prepared); err != nil {
return err
}
Expand Down

0 comments on commit c78dccc

Please sign in to comment.