Skip to content

Commit

Permalink
[exporter] [chore] Initialize batchSender and queueSender after confi…
Browse files Browse the repository at this point in the history
…guration - #2 (open-telemetry#11240)

This PR follows
open-telemetry#11041.

The previous PR changed the initialization of `batchSender` and
`queueSender` to AFTER configuration, because that enables us to access
`queueConfig` and `batcherConfig` in the same place.

I noticed since then that there is another API for queue configuration,
and this PR takes care of that other API

#### Link to tracking issue

open-telemetry#10368

#### Testing

Ran `opentelemetry-collector$ make` to make sure all tests still pass.
  • Loading branch information
sfc-gh-sili authored Sep 26, 2024
1 parent 1bfe6c3 commit dd35137
Showing 1 changed file with 31 additions and 26 deletions.
57 changes: 31 additions & 26 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ type BaseExporter struct {

ConsumerOptions []consumer.Option

QueueCfg exporterqueue.Config
QueueFactory exporterqueue.Factory[internal.Request]
BatcherCfg exporterbatcher.Config
BatcherOpts []BatcherOption
QueueCfg QueueConfig
ExporterQueueCfg exporterqueue.Config
QueueFactory exporterqueue.Factory[internal.Request]
BatcherCfg exporterbatcher.Config
BatcherOpts []BatcherOption
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) {
Expand Down Expand Up @@ -93,28 +94,40 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
return nil, err
}

if be.BatcherCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set, be.BatchMergeFunc, be.BatchMergeSplitfunc)
for _, opt := range be.BatcherOpts {
err = multierr.Append(err, opt(bs))
}
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters"))
}
be.BatchSender = bs
if be.QueueCfg.Enabled {
q := be.QueueFactory(context.Background(), exporterqueue.Settings{
Signal: be.Signal,
ExporterSettings: be.Set,
}, exporterqueue.Config{
Enabled: be.QueueCfg.Enabled,
NumConsumers: be.QueueCfg.NumConsumers,
QueueSize: be.QueueCfg.QueueSize,
})
be.QueueSender = NewQueueSender(q, be.Set, be.QueueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep)
}

if be.QueueCfg.Enabled {
if be.ExporterQueueCfg.Enabled {
set := exporterqueue.Settings{
Signal: be.Signal,
ExporterSettings: be.Set,
}
be.QueueSender = NewQueueSender(be.QueueFactory(context.Background(), set, be.QueueCfg), be.Set, be.QueueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep)
be.QueueSender = NewQueueSender(be.QueueFactory(context.Background(), set, be.ExporterQueueCfg), be.Set, be.ExporterQueueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep)
for _, op := range options {
err = multierr.Append(err, op(be))
}
}

if be.BatcherCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set, be.BatchMergeFunc, be.BatchMergeSplitfunc)
for _, opt := range be.BatcherOpts {
err = multierr.Append(err, opt(bs))
}
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters"))
}
be.BatchSender = bs
}

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -230,19 +243,11 @@ func WithQueue(config QueueConfig) Option {
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
qf := exporterqueue.NewPersistentQueueFactory[internal.Request](config.StorageID, exporterqueue.PersistentQueueSettings[internal.Request]{
o.QueueCfg = config
o.QueueFactory = exporterqueue.NewPersistentQueueFactory[internal.Request](config.StorageID, exporterqueue.PersistentQueueSettings[internal.Request]{
Marshaler: o.Marshaler,
Unmarshaler: o.Unmarshaler,
})
q := qf(context.Background(), exporterqueue.Settings{
Signal: o.Signal,
ExporterSettings: o.Set,
}, exporterqueue.Config{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
})
o.QueueSender = NewQueueSender(q, o.Set, config.NumConsumers, o.ExportFailureMessage, o.Obsrep)
return nil
}
}
Expand All @@ -260,7 +265,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
o.QueueCfg = cfg
o.ExporterQueueCfg = cfg
o.QueueFactory = queueFactory
return nil
}
Expand Down

0 comments on commit dd35137

Please sign in to comment.