Skip to content

Commit

Permalink
Convert ocexporter to the new interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Aug 18, 2020
1 parent ed65294 commit c016ec5
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 127 deletions.
5 changes: 5 additions & 0 deletions consumer/pdatautil/pdatautil.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func MetricAndDataPointCount(md pdata.Metrics) (int, int) {
panic("Unsupported metrics type.")
}

func MetricPointCount(md pdata.Metrics) int {
_, points := MetricAndDataPointCount(md)
return points
}

// CloneMetricsDataOld copied from processors.cloneMetricsDataOld
func CloneMetricsDataOld(md consumerdata.MetricsData) consumerdata.MetricsData {
clone := consumerdata.MetricsData{
Expand Down
2 changes: 1 addition & 1 deletion exporter/opencensusexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factory := &Factory{}
factory := NewFactory()
factories.Exporters[typeStr] = factory
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)

Expand Down
30 changes: 13 additions & 17 deletions exporter/opencensusexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,27 @@ package opencensusexporter
import (
"context"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

const (
// The value of "type" key in configuration.
typeStr = "opencensus"
)

// Factory is the factory for OpenCensus exporter.
type Factory struct {
}

// Type gets the type of the Exporter config created by this factory.
func (f *Factory) Type() configmodels.Type {
return typeStr
// NewFactory creates a factory for OTLP exporter.
func NewFactory() component.ExporterFactory {
return exporterhelper.NewFactory(
typeStr,
createDefaultConfig,
exporterhelper.WithTraces(createTraceExporter),
exporterhelper.WithMetrics(createMetricsExporter))
}

// CreateDefaultConfig creates the default configuration for exporter.
func (f *Factory) CreateDefaultConfig() configmodels.Exporter {
func createDefaultConfig() configmodels.Exporter {
return &Config{
ExporterSettings: configmodels.ExporterSettings{
TypeVal: typeStr,
Expand All @@ -54,14 +52,12 @@ func (f *Factory) CreateDefaultConfig() configmodels.Exporter {
}
}

// CreateTraceExporter creates a trace exporter based on this config.
func (f *Factory) CreateTraceExporter(_ *zap.Logger, config configmodels.Exporter) (component.TraceExporterOld, error) {
func createTraceExporter(ctx context.Context, _ component.ExporterCreateParams, config configmodels.Exporter) (component.TraceExporter, error) {
oCfg := config.(*Config)
return newTraceExporter(context.Background(), oCfg)
return newTraceExporter(ctx, oCfg)
}

// CreateMetricsExporter creates a metrics exporter based on this config.
func (f *Factory) CreateMetricsExporter(_ *zap.Logger, config configmodels.Exporter) (component.MetricsExporterOld, error) {
func createMetricsExporter(ctx context.Context, _ component.ExporterCreateParams, config configmodels.Exporter) (component.MetricsExporter, error) {
oCfg := config.(*Config)
return newMetricsExporter(context.Background(), oCfg)
return newMetricsExporter(ctx, oCfg)
}
20 changes: 4 additions & 16 deletions exporter/opencensusexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,11 @@ import (
)

func TestCreateDefaultConfig(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig()
cfg := createDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateMetricsExporter(t *testing.T) {
factory := Factory{}
cfg := factory.CreateDefaultConfig().(*Config)
cfg.GRPCClientSettings.Endpoint = testutil.GetAvailableLocalAddress(t)

oexp, err := factory.CreateMetricsExporter(zap.NewNop(), cfg)
require.Nil(t, err)
require.NotNil(t, oexp)
}

func TestCreateTraceExporter(t *testing.T) {
endpoint := testutil.GetAvailableLocalAddress(t)
tests := []struct {
Expand Down Expand Up @@ -170,11 +159,10 @@ func TestCreateTraceExporter(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := &Factory{}

tReceiver, tErr := factory.CreateTraceExporter(zap.NewNop(), &tt.config)
params := component.ExporterCreateParams{Logger: zap.NewNop()}
tReceiver, tErr := createTraceExporter(context.Background(), params, &tt.config)
checkErrorsAndShutdown(t, tReceiver, tErr, tt.mustFail)
mReceiver, mErr := factory.CreateMetricsExporter(zap.NewNop(), &tt.config)
mReceiver, mErr := createMetricsExporter(context.Background(), params, &tt.config)
checkErrorsAndShutdown(t, mReceiver, mErr, tt.mustFail)
})
}
Expand Down
103 changes: 60 additions & 43 deletions exporter/opencensusexporter/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/translator/internaldata"
)

// See https://godoc.org/google.golang.org/grpc#ClientConn.NewStream
Expand Down Expand Up @@ -104,7 +107,7 @@ func (oce *ocExporter) shutdown(context.Context) error {
return oce.grpcClientConn.Close()
}

func newTraceExporter(ctx context.Context, cfg *Config) (component.TraceExporterOld, error) {
func newTraceExporter(ctx context.Context, cfg *Config) (component.TraceExporter, error) {
oce, err := newOcExporter(ctx, cfg)
if err != nil {
return nil, err
Expand All @@ -118,13 +121,13 @@ func newTraceExporter(ctx context.Context, cfg *Config) (component.TraceExporter
oce.tracesClients <- nil
}

return exporterhelper.NewTraceExporterOld(
return exporterhelper.NewTraceExporter(
cfg,
oce.pushTraceData,
exporterhelper.WithShutdown(oce.shutdown))
}

func newMetricsExporter(ctx context.Context, cfg *Config) (component.MetricsExporterOld, error) {
func newMetricsExporter(ctx context.Context, cfg *Config) (component.MetricsExporter, error) {
oce, err := newOcExporter(ctx, cfg)
if err != nil {
return nil, err
Expand All @@ -138,18 +141,18 @@ func newMetricsExporter(ctx context.Context, cfg *Config) (component.MetricsExpo
oce.metricsClients <- nil
}

return exporterhelper.NewMetricsExporterOld(
return exporterhelper.NewMetricsExporter(
cfg,
oce.pushMetricsData,
exporterhelper.WithShutdown(oce.shutdown))
}

func (oce *ocExporter) pushTraceData(_ context.Context, td consumerdata.TraceData) (int, error) {
func (oce *ocExporter) pushTraceData(_ context.Context, td pdata.Traces) (int, error) {
// Get first available trace Client.
tClient, ok := <-oce.tracesClients
if !ok {
err := errors.New("failed to push traces, OpenCensus exporter was already stopped")
return len(td.Spans), err
return td.SpanCount(), err
}

// In any of the metricsClients channel we keep always NumWorkers object (sometimes nil),
Expand All @@ -162,37 +165,44 @@ func (oce *ocExporter) pushTraceData(_ context.Context, td consumerdata.TraceDat
if err != nil {
// Cannot create an RPC, put back nil to keep the number of workers constant.
oce.tracesClients <- nil
return len(td.Spans), err
return td.SpanCount(), err
}
}

// This is a hack because OC protocol expects a Node for the initial message.
node := td.Node
if node == nil {
node = &commonpb.Node{}
}
req := &agenttracepb.ExportTraceServiceRequest{
Spans: td.Spans,
Resource: td.Resource,
Node: node,
}
if err := tClient.tsec.Send(req); err != nil {
// Error received, cancel the context used to create the RPC to free all resources,
// put back nil to keep the number of workers constant.
tClient.cancel()
oce.tracesClients <- nil
return len(td.Spans), err
octds := internaldata.TraceDataToOC(td)
for _, octd := range octds {
// This is a hack because OC protocol expects a Node for the initial message.
node := octd.Node
if node == nil {
node = &commonpb.Node{}
}
resource := octd.Resource
if resource == nil {
resource = &resourcepb.Resource{}
}
req := &agenttracepb.ExportTraceServiceRequest{
Spans: octd.Spans,
Resource: resource,
Node: node,
}
if err := tClient.tsec.Send(req); err != nil {
// Error received, cancel the context used to create the RPC to free all resources,
// put back nil to keep the number of workers constant.
tClient.cancel()
oce.tracesClients <- nil
return td.SpanCount(), err
}
}
oce.tracesClients <- tClient
return 0, nil
}

func (oce *ocExporter) pushMetricsData(_ context.Context, md consumerdata.MetricsData) (int, error) {
func (oce *ocExporter) pushMetricsData(_ context.Context, md pdata.Metrics) (int, error) {
// Get first available mClient.
mClient, ok := <-oce.metricsClients
if !ok {
err := errors.New("failed to push metrics, OpenCensus exporter was already stopped")
return exporterhelper.NumTimeSeries(md), err
return pdatautil.MetricPointCount(md), err
}

// In any of the metricsClients channel we keep always NumWorkers object (sometimes nil),
Expand All @@ -205,26 +215,33 @@ func (oce *ocExporter) pushMetricsData(_ context.Context, md consumerdata.Metric
if err != nil {
// Cannot create an RPC, put back nil to keep the number of workers constant.
oce.metricsClients <- nil
return exporterhelper.NumTimeSeries(md), err
return pdatautil.MetricPointCount(md), err
}
}

// This is a hack because OC protocol expects a Node for the initial message.
node := md.Node
if node == nil {
node = &commonpb.Node{}
}
req := &agentmetricspb.ExportMetricsServiceRequest{
Metrics: md.Metrics,
Resource: md.Resource,
Node: node,
}
if err := mClient.msec.Send(req); err != nil {
// Error received, cancel the context used to create the RPC to free all resources,
// put back nil to keep the number of workers constant.
mClient.cancel()
oce.metricsClients <- nil
return exporterhelper.NumTimeSeries(md), err
ocmds := pdatautil.MetricsToMetricsData(md)
for _, ocmd := range ocmds {
// This is a hack because OC protocol expects a Node for the initial message.
node := ocmd.Node
if node == nil {
node = &commonpb.Node{}
}
resource := ocmd.Resource
if resource == nil {
resource = &resourcepb.Resource{}
}
req := &agentmetricspb.ExportMetricsServiceRequest{
Metrics: ocmd.Metrics,
Resource: resource,
Node: node,
}
if err := mClient.msec.Send(req); err != nil {
// Error received, cancel the context used to create the RPC to free all resources,
// put back nil to keep the number of workers constant.
mClient.cancel()
oce.metricsClients <- nil
return pdatautil.MetricPointCount(md), err
}
}
oce.metricsClients <- mClient
return 0, nil
Expand Down
Loading

0 comments on commit c016ec5

Please sign in to comment.