Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(batchprocessor): Multi/single-shard consistent start method and field names #11314

Merged
merged 2 commits into from
Oct 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 33 additions & 14 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,19 @@ type batchProcessor struct {

telemetry *batchProcessorTelemetry

// batcher will be either *singletonBatcher or *multiBatcher
// batcher will be either *singletonBatcher or *multiBatcher
batcher batcher
}

// batcher is describes a *singletonBatcher or *multiBatcher.
type batcher interface {
// start initializes background resources used by this batcher.
start(ctx context.Context) error

// consume incorporates a new item of data into the pending batch.
consume(ctx context.Context, data any) error

// currentMetadataCardinality returns the number of shards.
currentMetadataCardinality() int
}

Expand Down Expand Up @@ -135,12 +142,13 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat
metadataLimit: int(cfg.MetadataCardinalityLimit),
}
if len(bp.metadataKeys) == 0 {
s := bp.newShard(nil)
s.start()
bp.batcher = &singleShardBatcher{batcher: s}
bp.batcher = &singleShardBatcher{
processor: bp,
single: nil, // created in start
}
} else {
bp.batcher = &multiShardBatcher{
batchProcessor: bp,
processor: bp,
}
}

Expand Down Expand Up @@ -172,8 +180,8 @@ func (bp *batchProcessor) Capabilities() consumer.Capabilities {
}

// Start is invoked during service startup.
func (bp *batchProcessor) Start(context.Context, component.Host) error {
return nil
func (bp *batchProcessor) Start(ctx context.Context, _ component.Host) error {
return bp.batcher.start(ctx)
}

// Shutdown is invoked during service shutdown.
Expand Down Expand Up @@ -281,11 +289,18 @@ func (b *shard) sendItems(trigger trigger) {
// singleShardBatcher is used when metadataKeys is empty, to avoid the
// additional lock and map operations used in multiBatcher.
type singleShardBatcher struct {
batcher *shard
processor *batchProcessor
single *shard
}

func (sb *singleShardBatcher) start(context.Context) error {
sb.single = sb.processor.newShard(nil)
sb.single.start()
return nil
}

func (sb *singleShardBatcher) consume(_ context.Context, data any) error {
sb.batcher.newItem <- data
sb.single.newItem <- data
return nil
}

Expand All @@ -295,22 +310,26 @@ func (sb *singleShardBatcher) currentMetadataCardinality() int {

// multiBatcher is used when metadataKeys is not empty.
type multiShardBatcher struct {
*batchProcessor
batchers sync.Map
processor *batchProcessor
batchers sync.Map

// Guards the size and the storing logic to ensure no more than limit items are stored.
// If we are willing to allow "some" extra items than the limit this can be removed and size can be made atomic.
lock sync.Mutex
size int
}

func (mb *multiShardBatcher) start(context.Context) error {
return nil
}

func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
// Get each metadata key value, form the corresponding
// attribute set for use as a map lookup key.
info := client.FromContext(ctx)
md := map[string][]string{}
var attrs []attribute.KeyValue
for _, k := range mb.metadataKeys {
for _, k := range mb.processor.metadataKeys {
// Lookup the value in the incoming metadata, copy it
// into the outgoing metadata, and create a unique
// value for the attributeSet.
Expand All @@ -327,15 +346,15 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
b, ok := mb.batchers.Load(aset)
if !ok {
mb.lock.Lock()
if mb.metadataLimit != 0 && mb.size >= mb.metadataLimit {
if mb.processor.metadataLimit != 0 && mb.size >= mb.processor.metadataLimit {
mb.lock.Unlock()
return errTooManyBatchers
}

// aset.ToSlice() returns the sorted, deduplicated,
// and name-downcased list of attributes.
var loaded bool
b, loaded = mb.batchers.LoadOrStore(aset, mb.newShard(md))
b, loaded = mb.batchers.LoadOrStore(aset, mb.processor.newShard(md))
if !loaded {
// Start the goroutine only if we added the object to the map, otherwise is already started.
b.(*shard).start()
Expand Down
Loading