Rely on gRPC to batch and loadbalance between connections instead of custom logic

Signed-off-by: Bogdan Drutu <[email protected]>
bogdandrutu committed Jun 26, 2020
1 parent c448b1c commit d8f4143
Showing 1 changed file with 17 additions and 89 deletions.
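The diff below drops the exporter's hand-rolled pool of gRPC clients and relies on gRPC itself to spread requests across connections. As a rough illustration of that idea only (not the exporter's actual wiring; the endpoint, port, and dial options here are assumptions), a single ClientConn can be told to balance RPCs across all resolved backend addresses through a default service config:

```go
package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// One ClientConn for the whole exporter; the round_robin balancer picks a
	// ready sub-connection for each RPC when the target resolves to several
	// addresses (e.g. via the dns resolver).
	conn, err := grpc.DialContext(ctx,
		"dns:///otlp-backend.example:55680", // hypothetical endpoint
		grpc.WithInsecure(),
		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
	)
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()

	// All OTLP service clients can share this one conn; HTTP/2 multiplexes
	// concurrent RPCs as streams, so no per-worker client pool is needed.
	log.Println("connection state:", conn.GetState())
}
```

Every RPC issued on such a connection is routed by the configured picker, which is what makes the per-call "check an exporter out of a channel" dance removed below unnecessary.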
106 changes: 17 additions & 89 deletions exporter/otlpexporter/otlp.go
@@ -20,7 +20,6 @@ import (
     "sync"

     "go.opentelemetry.io/collector/component"
-    "go.opentelemetry.io/collector/component/componenterror"
     "go.opentelemetry.io/collector/config/configmodels"
     "go.opentelemetry.io/collector/consumer/pdata"
     "go.opentelemetry.io/collector/consumer/pdatautil"
@@ -32,7 +31,8 @@ import (
 )

 type otlpExporter struct {
-    exporters chan *exporterImp
+    exporter *exporterImp
+    stopOnce sync.Once
 }

 type exporterErrorCode int
@@ -48,8 +48,6 @@ func (e *exporterError) Error() string {
 }

 const (
-    defaultNumWorkers int = 8
-
     _ exporterErrorCode = iota // skip 0
     // errEndpointRequired indicates that this exporter was not provided with an endpoint in its config.
     errEndpointRequired
@@ -133,77 +131,32 @@ func createOTLPExporter(config configmodels.Exporter) (*otlpExporter, error) {
         }
     }

-    numWorkers := defaultNumWorkers
-    if oCfg.NumWorkers > 0 {
-        numWorkers = oCfg.NumWorkers
+    exporter, serr := newExporter(oCfg)
+    if serr != nil {
+        return nil, fmt.Errorf("cannot configure OTLP exporter: %v", serr)
     }

-    exportersChan := make(chan *exporterImp, numWorkers)
-    for exporterIndex := 0; exporterIndex < numWorkers; exporterIndex++ {
-        // TODO: newExporter blocks for connection. Now that we have ability
-        // to report errors asynchronously using Host.ReportFatalError we can move this
-        // code to Start() and do it in background to avoid blocking Collector startup
-        // as we do now.
-        exporter, serr := newExporter(oCfg)
-        if serr != nil {
-            return nil, fmt.Errorf("cannot configure OTLP exporter: %v", serr)
-        }
-        exportersChan <- exporter
-    }
-    oce := &otlpExporter{exporters: exportersChan}
+    oce := &otlpExporter{exporter: exporter}
     return oce, nil
 }

 func (oce *otlpExporter) Shutdown(context.Context) error {
-    // Stop all exporters. Will wait until all are stopped.
-    wg := &sync.WaitGroup{}
-    var errors []error
-    var errorsMu sync.Mutex
-    visitedCnt := 0
-    for currExporter := range oce.exporters {
-        wg.Add(1)
-        go func(exporter *exporterImp) {
-            defer wg.Done()
-            err := exporter.stop()
-            if err != nil {
-                errorsMu.Lock()
-                errors = append(errors, err)
-                errorsMu.Unlock()
-            }
-        }(currExporter)
-        visitedCnt++
-        if visitedCnt == cap(oce.exporters) {
-            // Visited and concurrently executed stop() on all exporters.
-            break
-        }
-    }
-
-    // Wait for all stop() calls to finish.
-    wg.Wait()
-    close(oce.exporters)
-
-    return componenterror.CombineErrors(errors)
+    err := error(&exporterError{
+        code: errAlreadyStopped,
+        msg: "OpenTelemetry exporter was already stopped.",
+    })
+    oce.stopOnce.Do(func() {
+        err = oce.exporter.stop()
+    })
+    return err
 }

 func (oce *otlpExporter) pushTraceData(ctx context.Context, td pdata.Traces) (int, error) {
-    // Get first available exporter.
-    exporter, ok := <-oce.exporters
-    if !ok {
-        err := &exporterError{
-            code: errAlreadyStopped,
-            msg: "OpenTelemetry exporter was already stopped.",
-        }
-        return td.SpanCount(), err
-    }
-
     // Perform the request.
     request := &otlptrace.ExportTraceServiceRequest{
         ResourceSpans: pdata.TracesToOtlp(td),
     }
-    err := exporter.exportTrace(ctx, request)
+    err := oce.exporter.exportTrace(ctx, request)
-
-    // Return the exporter to the pool.
-    oce.exporters <- exporter
     if err != nil {
         return td.SpanCount(), err
     }
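The rewritten Shutdown above replaces the WaitGroup that drained the pool with a sync.Once guard: the first call stops the single underlying exporter, and any later call falls through to the pre-set errAlreadyStopped error. A self-contained sketch of that idiom, with made-up names rather than the collector's types:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errAlreadyStopped = errors.New("exporter was already stopped")

// client stands in for the real gRPC-backed exporterImp.
type client struct{}

func (c *client) stop() error { return nil }

type exporter struct {
	client   *client
	stopOnce sync.Once
}

// Shutdown stops the client exactly once; repeated calls report the sentinel error.
func (e *exporter) Shutdown() error {
	err := errAlreadyStopped // returned when Do is a no-op
	e.stopOnce.Do(func() {
		err = e.client.stop()
	})
	return err
}

func main() {
	e := &exporter{client: &client{}}
	fmt.Println(e.Shutdown()) // <nil>
	fmt.Println(e.Shutdown()) // exporter was already stopped
}
```

The zero value of sync.Once needs no initialization, which is why the struct change earlier in this diff only adds the field.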
@@ -212,48 +165,23 @@ func (oce *otlpExporter) pushTraceData(ctx context.Context, td pdata.Traces) (int, error) {

 func (oce *otlpExporter) pushMetricsData(ctx context.Context, md pdata.Metrics) (int, error) {
     imd := pdatautil.MetricsToInternalMetrics(md)
-    // Get first available exporter.
-    exporter, ok := <-oce.exporters
-    if !ok {
-        err := &exporterError{
-            code: errAlreadyStopped,
-            msg: "OpenTelemetry exporter was already stopped.",
-        }
-        return imd.MetricCount(), err
-    }

     // Perform the request.
     request := &otlpmetrics.ExportMetricsServiceRequest{
         ResourceMetrics: data.MetricDataToOtlp(imd),
     }
-    err := exporter.exportMetrics(ctx, request)
+    err := oce.exporter.exportMetrics(ctx, request)
-
-    // Return the exporter to the pool.
-    oce.exporters <- exporter
     if err != nil {
         return imd.MetricCount(), err
     }
     return 0, nil
 }

 func (oce *otlpExporter) pushLogData(ctx context.Context, logs data.Logs) (int, error) {
-    // Get first available exporter.
-    exporter, ok := <-oce.exporters
-    if !ok {
-        err := &exporterError{
-            code: errAlreadyStopped,
-            msg: "OpenTelemetry exporter was already stopped.",
-        }
-        return logs.LogRecordCount(), err
-    }
-
     request := &otlplogs.ExportLogServiceRequest{
         ResourceLogs: data.LogsToProto(logs),
     }
-    err := exporter.exportLogs(ctx, request)
+    err := oce.exporter.exportLogs(ctx, request)
-
-    // Return the exporter to the pool.
-    oce.exporters <- exporter
     if err != nil {
         return logs.LogRecordCount(), err
     }
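The pushMetricsData and pushLogData changes mirror pushTraceData: each call now goes straight to the single shared exporter instead of receiving one from a channel and sending it back afterwards. That is safe because a gRPC ClientConn may be used concurrently from many goroutines, with simultaneous RPCs multiplexed as HTTP/2 streams over one connection. A minimal sketch of that property (hypothetical endpoint; the bundled health-check client stands in for the OTLP trace/metrics/logs clients):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"google.golang.org/grpc"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	// Single connection shared by every goroutine, as the exporter now does.
	conn, err := grpc.Dial("localhost:55680", grpc.WithInsecure()) // hypothetical endpoint
	if err != nil {
		fmt.Println("dial:", err)
		return
	}
	defer conn.Close()

	client := healthpb.NewHealthClient(conn)

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ { // roughly the concurrency the removed worker pool provided
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
			defer cancel()
			_, err := client.Check(ctx, &healthpb.HealthCheckRequest{})
			fmt.Printf("rpc %d finished, err=%v\n", id, err)
		}(i)
	}
	wg.Wait()
}
```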