From c10543fea32b02ad405f1f51aab719e1a72df669 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 17 Aug 2020 14:30:36 -0700 Subject: [PATCH] Convert ocexporter to the new interfaces Signed-off-by: Bogdan Drutu --- consumer/pdatautil/pdatautil.go | 5 + exporter/opencensusexporter/config_test.go | 2 +- exporter/opencensusexporter/factory.go | 30 +++-- exporter/opencensusexporter/factory_test.go | 20 +--- exporter/opencensusexporter/opencensus.go | 103 ++++++++++-------- .../opencensusexporter/opencensus_test.go | 76 ++++++------- .../ocmetrics/opencensus_test.go | 10 +- .../octrace/opencensus_test.go | 9 +- service/builder/exporters_builder_test.go | 2 +- service/defaultcomponents/defaults.go | 2 +- testbed/testbed/senders.go | 26 +++-- 11 files changed, 146 insertions(+), 139 deletions(-) diff --git a/consumer/pdatautil/pdatautil.go b/consumer/pdatautil/pdatautil.go index 2fe2d521c6c..20316e07725 100644 --- a/consumer/pdatautil/pdatautil.go +++ b/consumer/pdatautil/pdatautil.go @@ -116,6 +116,11 @@ func MetricAndDataPointCount(md pdata.Metrics) (int, int) { panic("Unsupported metrics type.") } +func MetricPointCount(md pdata.Metrics) int { + _, points := MetricAndDataPointCount(md) + return points +} + // CloneMetricsDataOld copied from processors.cloneMetricsDataOld func CloneMetricsDataOld(md consumerdata.MetricsData) consumerdata.MetricsData { clone := consumerdata.MetricsData{ diff --git a/exporter/opencensusexporter/config_test.go b/exporter/opencensusexporter/config_test.go index 3dd2ed8269e..a61bb851c58 100644 --- a/exporter/opencensusexporter/config_test.go +++ b/exporter/opencensusexporter/config_test.go @@ -32,7 +32,7 @@ func TestLoadConfig(t *testing.T) { factories, err := componenttest.ExampleComponents() assert.NoError(t, err) - factory := &Factory{} + factory := NewFactory() factories.Exporters[typeStr] = factory cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories) diff --git a/exporter/opencensusexporter/factory.go b/exporter/opencensusexporter/factory.go index 7d7d6ba1544..24edceaafff 100644 --- a/exporter/opencensusexporter/factory.go +++ b/exporter/opencensusexporter/factory.go @@ -17,11 +17,10 @@ package opencensusexporter import ( "context" - "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/exporter/exporterhelper" ) const ( @@ -29,17 +28,16 @@ const ( typeStr = "opencensus" ) -// Factory is the factory for OpenCensus exporter. -type Factory struct { -} - -// Type gets the type of the Exporter config created by this factory. -func (f *Factory) Type() configmodels.Type { - return typeStr +// NewFactory creates a factory for OTLP exporter. +func NewFactory() component.ExporterFactory { + return exporterhelper.NewFactory( + typeStr, + createDefaultConfig, + exporterhelper.WithTraces(createTraceExporter), + exporterhelper.WithMetrics(createMetricsExporter)) } -// CreateDefaultConfig creates the default configuration for exporter. -func (f *Factory) CreateDefaultConfig() configmodels.Exporter { +func createDefaultConfig() configmodels.Exporter { return &Config{ ExporterSettings: configmodels.ExporterSettings{ TypeVal: typeStr, @@ -54,14 +52,12 @@ func (f *Factory) CreateDefaultConfig() configmodels.Exporter { } } -// CreateTraceExporter creates a trace exporter based on this config. -func (f *Factory) CreateTraceExporter(_ *zap.Logger, config configmodels.Exporter) (component.TraceExporterOld, error) { +func createTraceExporter(ctx context.Context, _ component.ExporterCreateParams, config configmodels.Exporter) (component.TraceExporter, error) { oCfg := config.(*Config) - return newTraceExporter(context.Background(), oCfg) + return newTraceExporter(ctx, oCfg) } -// CreateMetricsExporter creates a metrics exporter based on this config. -func (f *Factory) CreateMetricsExporter(_ *zap.Logger, config configmodels.Exporter) (component.MetricsExporterOld, error) { +func createMetricsExporter(ctx context.Context, _ component.ExporterCreateParams, config configmodels.Exporter) (component.MetricsExporter, error) { oCfg := config.(*Config) - return newMetricsExporter(context.Background(), oCfg) + return newMetricsExporter(ctx, oCfg) } diff --git a/exporter/opencensusexporter/factory_test.go b/exporter/opencensusexporter/factory_test.go index e534267f057..11160dad046 100644 --- a/exporter/opencensusexporter/factory_test.go +++ b/exporter/opencensusexporter/factory_test.go @@ -31,22 +31,11 @@ import ( ) func TestCreateDefaultConfig(t *testing.T) { - factory := Factory{} - cfg := factory.CreateDefaultConfig() + cfg := createDefaultConfig() assert.NotNil(t, cfg, "failed to create default config") assert.NoError(t, configcheck.ValidateConfig(cfg)) } -func TestCreateMetricsExporter(t *testing.T) { - factory := Factory{} - cfg := factory.CreateDefaultConfig().(*Config) - cfg.GRPCClientSettings.Endpoint = testutil.GetAvailableLocalAddress(t) - - oexp, err := factory.CreateMetricsExporter(zap.NewNop(), cfg) - require.Nil(t, err) - require.NotNil(t, oexp) -} - func TestCreateTraceExporter(t *testing.T) { endpoint := testutil.GetAvailableLocalAddress(t) tests := []struct { @@ -170,11 +159,10 @@ func TestCreateTraceExporter(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - factory := &Factory{} - - tReceiver, tErr := factory.CreateTraceExporter(zap.NewNop(), &tt.config) + params := component.ExporterCreateParams{Logger: zap.NewNop()} + tReceiver, tErr := createTraceExporter(context.Background(), params, &tt.config) checkErrorsAndShutdown(t, tReceiver, tErr, tt.mustFail) - mReceiver, mErr := factory.CreateMetricsExporter(zap.NewNop(), &tt.config) + mReceiver, mErr := createMetricsExporter(context.Background(), params, &tt.config) checkErrorsAndShutdown(t, mReceiver, mErr, tt.mustFail) }) } diff --git a/exporter/opencensusexporter/opencensus.go b/exporter/opencensusexporter/opencensus.go index 52eb1aa899b..ddac8ccb3b6 100644 --- a/exporter/opencensusexporter/opencensus.go +++ b/exporter/opencensusexporter/opencensus.go @@ -22,12 +22,15 @@ import ( commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer/consumerdata" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/translator/internaldata" ) // See https://godoc.org/google.golang.org/grpc#ClientConn.NewStream @@ -104,7 +107,7 @@ func (oce *ocExporter) shutdown(context.Context) error { return oce.grpcClientConn.Close() } -func newTraceExporter(ctx context.Context, cfg *Config) (component.TraceExporterOld, error) { +func newTraceExporter(ctx context.Context, cfg *Config) (component.TraceExporter, error) { oce, err := newOcExporter(ctx, cfg) if err != nil { return nil, err @@ -118,13 +121,13 @@ func newTraceExporter(ctx context.Context, cfg *Config) (component.TraceExporter oce.tracesClients <- nil } - return exporterhelper.NewTraceExporterOld( + return exporterhelper.NewTraceExporter( cfg, oce.pushTraceData, exporterhelper.WithShutdown(oce.shutdown)) } -func newMetricsExporter(ctx context.Context, cfg *Config) (component.MetricsExporterOld, error) { +func newMetricsExporter(ctx context.Context, cfg *Config) (component.MetricsExporter, error) { oce, err := newOcExporter(ctx, cfg) if err != nil { return nil, err @@ -138,18 +141,18 @@ func newMetricsExporter(ctx context.Context, cfg *Config) (component.MetricsExpo oce.metricsClients <- nil } - return exporterhelper.NewMetricsExporterOld( + return exporterhelper.NewMetricsExporter( cfg, oce.pushMetricsData, exporterhelper.WithShutdown(oce.shutdown)) } -func (oce *ocExporter) pushTraceData(_ context.Context, td consumerdata.TraceData) (int, error) { +func (oce *ocExporter) pushTraceData(_ context.Context, td pdata.Traces) (int, error) { // Get first available trace Client. tClient, ok := <-oce.tracesClients if !ok { err := errors.New("failed to push traces, OpenCensus exporter was already stopped") - return len(td.Spans), err + return td.SpanCount(), err } // In any of the metricsClients channel we keep always NumWorkers object (sometimes nil), @@ -162,37 +165,44 @@ func (oce *ocExporter) pushTraceData(_ context.Context, td consumerdata.TraceDat if err != nil { // Cannot create an RPC, put back nil to keep the number of workers constant. oce.tracesClients <- nil - return len(td.Spans), err + return td.SpanCount(), err } } - // This is a hack because OC protocol expects a Node for the initial message. - node := td.Node - if node == nil { - node = &commonpb.Node{} - } - req := &agenttracepb.ExportTraceServiceRequest{ - Spans: td.Spans, - Resource: td.Resource, - Node: node, - } - if err := tClient.tsec.Send(req); err != nil { - // Error received, cancel the context used to create the RPC to free all resources, - // put back nil to keep the number of workers constant. - tClient.cancel() - oce.tracesClients <- nil - return len(td.Spans), err + octds := internaldata.TraceDataToOC(td) + for _, octd := range octds { + // This is a hack because OC protocol expects a Node for the initial message. + node := octd.Node + if node == nil { + node = &commonpb.Node{} + } + resource := octd.Resource + if resource == nil { + resource = &resourcepb.Resource{} + } + req := &agenttracepb.ExportTraceServiceRequest{ + Spans: octd.Spans, + Resource: resource, + Node: node, + } + if err := tClient.tsec.Send(req); err != nil { + // Error received, cancel the context used to create the RPC to free all resources, + // put back nil to keep the number of workers constant. + tClient.cancel() + oce.tracesClients <- nil + return td.SpanCount(), err + } } oce.tracesClients <- tClient return 0, nil } -func (oce *ocExporter) pushMetricsData(_ context.Context, md consumerdata.MetricsData) (int, error) { +func (oce *ocExporter) pushMetricsData(_ context.Context, md pdata.Metrics) (int, error) { // Get first available mClient. mClient, ok := <-oce.metricsClients if !ok { err := errors.New("failed to push metrics, OpenCensus exporter was already stopped") - return exporterhelper.NumTimeSeries(md), err + return pdatautil.MetricPointCount(md), err } // In any of the metricsClients channel we keep always NumWorkers object (sometimes nil), @@ -205,26 +215,33 @@ func (oce *ocExporter) pushMetricsData(_ context.Context, md consumerdata.Metric if err != nil { // Cannot create an RPC, put back nil to keep the number of workers constant. oce.metricsClients <- nil - return exporterhelper.NumTimeSeries(md), err + return pdatautil.MetricPointCount(md), err } } - // This is a hack because OC protocol expects a Node for the initial message. - node := md.Node - if node == nil { - node = &commonpb.Node{} - } - req := &agentmetricspb.ExportMetricsServiceRequest{ - Metrics: md.Metrics, - Resource: md.Resource, - Node: node, - } - if err := mClient.msec.Send(req); err != nil { - // Error received, cancel the context used to create the RPC to free all resources, - // put back nil to keep the number of workers constant. - mClient.cancel() - oce.metricsClients <- nil - return exporterhelper.NumTimeSeries(md), err + ocmds := pdatautil.MetricsToMetricsData(md) + for _, ocmd := range ocmds { + // This is a hack because OC protocol expects a Node for the initial message. + node := ocmd.Node + if node == nil { + node = &commonpb.Node{} + } + resource := ocmd.Resource + if resource == nil { + resource = &resourcepb.Resource{} + } + req := &agentmetricspb.ExportMetricsServiceRequest{ + Metrics: ocmd.Metrics, + Resource: resource, + Node: node, + } + if err := mClient.msec.Send(req); err != nil { + // Error received, cancel the context used to create the RPC to free all resources, + // put back nil to keep the number of workers constant. + mClient.cancel() + oce.metricsClients <- nil + return pdatautil.MetricPointCount(md), err + } } oce.metricsClients <- mClient return 0, nil diff --git a/exporter/opencensusexporter/opencensus_test.go b/exporter/opencensusexporter/opencensus_test.go index 232e39671e2..6e34fe69345 100644 --- a/exporter/opencensusexporter/opencensus_test.go +++ b/exporter/opencensusexporter/opencensus_test.go @@ -26,12 +26,12 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/internal/data/testdata" "go.opentelemetry.io/collector/receiver/opencensusreceiver" "go.opentelemetry.io/collector/testutil" - "go.opentelemetry.io/collector/translator/internaldata" ) func TestSendTraces(t *testing.T) { @@ -48,7 +48,7 @@ func TestSendTraces(t *testing.T) { assert.NoError(t, recv.Shutdown(context.Background())) }) - factory := &Factory{} + factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ Endpoint: endpoint, @@ -57,7 +57,7 @@ func TestSendTraces(t *testing.T) { }, } cfg.NumWorkers = 1 - exp, err := factory.CreateTraceExporter(zap.NewNop(), cfg) + exp, err := factory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg) require.NoError(t, err) require.NotNil(t, exp) host := componenttest.NewNopHost() @@ -67,9 +67,7 @@ func TestSendTraces(t *testing.T) { }) td := testdata.GenerateTraceDataOneSpan() - octd := internaldata.TraceDataToOC(testdata.GenerateTraceDataOneSpan()) - require.Len(t, octd, 1) - assert.NoError(t, exp.ConsumeTraceData(context.Background(), octd[0])) + assert.NoError(t, exp.ConsumeTraces(context.Background(), td)) testutil.WaitFor(t, func() bool { return len(sink.AllTraces()) == 1 }) @@ -77,19 +75,22 @@ func TestSendTraces(t *testing.T) { require.Len(t, traces, 1) assert.Equal(t, td, traces[0]) + sink.Reset() // Sending data no Node. - octd[0].Node = nil - assert.NoError(t, exp.ConsumeTraceData(context.Background(), octd[0])) + pdata.NewResource().CopyTo(td.ResourceSpans().At(0).Resource()) + assert.NoError(t, exp.ConsumeTraces(context.Background(), td)) testutil.WaitFor(t, func() bool { - return len(sink.AllTraces()) == 2 + return len(sink.AllTraces()) == 1 }) traces = sink.AllTraces() - require.Len(t, traces, 2) - assert.Equal(t, td, traces[1]) + require.Len(t, traces, 1) + // The conversion will initialize the Resource + td.ResourceSpans().At(0).Resource().InitEmpty() + assert.Equal(t, td, traces[0]) } func TestSendTraces_NoBackend(t *testing.T) { - factory := &Factory{} + factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ Endpoint: "localhost:56569", @@ -97,7 +98,7 @@ func TestSendTraces_NoBackend(t *testing.T) { Insecure: true, }, } - exp, err := factory.CreateTraceExporter(zap.NewNop(), cfg) + exp, err := factory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg) require.NoError(t, err) require.NotNil(t, exp) host := componenttest.NewNopHost() @@ -106,14 +107,14 @@ func TestSendTraces_NoBackend(t *testing.T) { assert.NoError(t, exp.Shutdown(context.Background())) }) - td := internaldata.TraceDataToOC(testdata.GenerateTraceDataOneSpan()) + td := testdata.GenerateTraceDataOneSpan() for i := 0; i < 10000; i++ { - assert.Error(t, exp.ConsumeTraceData(context.Background(), td[0])) + assert.Error(t, exp.ConsumeTraces(context.Background(), td)) } } func TestSendTraces_AfterStop(t *testing.T) { - factory := &Factory{} + factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ Endpoint: "localhost:56569", @@ -121,15 +122,15 @@ func TestSendTraces_AfterStop(t *testing.T) { Insecure: true, }, } - exp, err := factory.CreateTraceExporter(zap.NewNop(), cfg) + exp, err := factory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg) require.NoError(t, err) require.NotNil(t, exp) host := componenttest.NewNopHost() require.NoError(t, exp.Start(context.Background(), host)) assert.NoError(t, exp.Shutdown(context.Background())) - td := internaldata.TraceDataToOC(testdata.GenerateTraceDataOneSpan()) - assert.Error(t, exp.ConsumeTraceData(context.Background(), td[0])) + td := testdata.GenerateTraceDataOneSpan() + assert.Error(t, exp.ConsumeTraces(context.Background(), td)) } func TestSendMetrics(t *testing.T) { @@ -146,7 +147,7 @@ func TestSendMetrics(t *testing.T) { assert.NoError(t, recv.Shutdown(context.Background())) }) - factory := &Factory{} + factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ Endpoint: endpoint, @@ -155,7 +156,7 @@ func TestSendMetrics(t *testing.T) { }, } cfg.NumWorkers = 1 - exp, err := factory.CreateMetricsExporter(zap.NewNop(), cfg) + exp, err := factory.CreateMetricsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg) require.NoError(t, err) require.NotNil(t, exp) host := componenttest.NewNopHost() @@ -165,9 +166,7 @@ func TestSendMetrics(t *testing.T) { }) md := testdata.GenerateMetricDataOneMetric() - ocmd := internaldata.MetricDataToOC(testdata.GenerateMetricDataOneMetric()) - require.Len(t, ocmd, 1) - assert.NoError(t, exp.ConsumeMetricsData(context.Background(), ocmd[0])) + assert.NoError(t, exp.ConsumeMetrics(context.Background(), pdatautil.MetricsFromInternalMetrics(md))) testutil.WaitFor(t, func() bool { return len(sink.AllMetrics()) == 1 }) @@ -176,18 +175,21 @@ func TestSendMetrics(t *testing.T) { assert.Equal(t, md, pdatautil.MetricsToInternalMetrics(metrics[0])) // Sending data no node. - ocmd[0].Node = nil - assert.NoError(t, exp.ConsumeMetricsData(context.Background(), ocmd[0])) + sink.Reset() + pdata.NewResource().CopyTo(md.ResourceMetrics().At(0).Resource()) + assert.NoError(t, exp.ConsumeMetrics(context.Background(), pdatautil.MetricsFromInternalMetrics(md))) testutil.WaitFor(t, func() bool { - return len(sink.AllMetrics()) == 2 + return len(sink.AllMetrics()) == 1 }) metrics = sink.AllMetrics() - require.Len(t, metrics, 2) - assert.Equal(t, md, pdatautil.MetricsToInternalMetrics(metrics[1])) + require.Len(t, metrics, 1) + // The conversion will initialize the Resource + md.ResourceMetrics().At(0).Resource().InitEmpty() + assert.Equal(t, md, pdatautil.MetricsToInternalMetrics(metrics[0])) } func TestSendMetrics_NoBackend(t *testing.T) { - factory := &Factory{} + factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ Endpoint: "localhost:56569", @@ -195,7 +197,7 @@ func TestSendMetrics_NoBackend(t *testing.T) { Insecure: true, }, } - exp, err := factory.CreateMetricsExporter(zap.NewNop(), cfg) + exp, err := factory.CreateMetricsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg) require.NoError(t, err) require.NotNil(t, exp) host := componenttest.NewNopHost() @@ -204,14 +206,14 @@ func TestSendMetrics_NoBackend(t *testing.T) { assert.NoError(t, exp.Shutdown(context.Background())) }) - md := internaldata.MetricDataToOC(testdata.GenerateMetricDataOneMetric()) + md := pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataOneMetric()) for i := 0; i < 10000; i++ { - assert.Error(t, exp.ConsumeMetricsData(context.Background(), md[0])) + assert.Error(t, exp.ConsumeMetrics(context.Background(), md)) } } func TestSendMetrics_AfterStop(t *testing.T) { - factory := &Factory{} + factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.GRPCClientSettings = configgrpc.GRPCClientSettings{ Endpoint: "localhost:56569", @@ -219,13 +221,13 @@ func TestSendMetrics_AfterStop(t *testing.T) { Insecure: true, }, } - exp, err := factory.CreateMetricsExporter(zap.NewNop(), cfg) + exp, err := factory.CreateMetricsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg) require.NoError(t, err) require.NotNil(t, exp) host := componenttest.NewNopHost() require.NoError(t, exp.Start(context.Background(), host)) assert.NoError(t, exp.Shutdown(context.Background())) - md := internaldata.MetricDataToOC(testdata.GenerateMetricDataOneMetric()) - assert.Error(t, exp.ConsumeMetricsData(context.Background(), md[0])) + md := pdatautil.MetricsFromInternalMetrics(testdata.GenerateMetricDataOneMetric()) + assert.Error(t, exp.ConsumeMetrics(context.Background(), md)) } diff --git a/receiver/opencensusreceiver/ocmetrics/opencensus_test.go b/receiver/opencensusreceiver/ocmetrics/opencensus_test.go index 1a51065f2cc..74dbb91510e 100644 --- a/receiver/opencensusreceiver/ocmetrics/opencensus_test.go +++ b/receiver/opencensusreceiver/ocmetrics/opencensus_test.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc" "google.golang.org/protobuf/proto" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdatautil" @@ -44,7 +45,6 @@ import ( "go.opentelemetry.io/collector/internal/data/testdata" "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/testutil" - "go.opentelemetry.io/collector/translator/internaldata" ) func TestReceiver_endToEnd(t *testing.T) { @@ -54,12 +54,12 @@ func TestReceiver_endToEnd(t *testing.T) { defer doneFn() address := fmt.Sprintf("localhost:%d", port) - expFactory := &opencensusexporter.Factory{} + expFactory := opencensusexporter.NewFactory() expCfg := expFactory.CreateDefaultConfig().(*opencensusexporter.Config) expCfg.GRPCClientSettings.TLSSetting.Insecure = true expCfg.Endpoint = address expCfg.WaitForReady = true - oce, err := expFactory.CreateMetricsExporter(zap.NewNop(), expCfg) + oce, err := expFactory.CreateMetricsExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, expCfg) require.NoError(t, err) err = oce.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) @@ -69,9 +69,7 @@ func TestReceiver_endToEnd(t *testing.T) { }() md := testdata.GenerateMetricDataOneMetric() - ocmd := internaldata.MetricDataToOC(md) - require.Len(t, ocmd, 1) - assert.NoError(t, oce.ConsumeMetricsData(context.Background(), ocmd[0])) + assert.NoError(t, oce.ConsumeMetrics(context.Background(), pdatautil.MetricsFromInternalMetrics(md))) testutil.WaitFor(t, func() bool { return len(metricSink.AllMetrics()) != 0 diff --git a/receiver/opencensusreceiver/octrace/opencensus_test.go b/receiver/opencensusreceiver/octrace/opencensus_test.go index d9c72271616..888b1f26978 100644 --- a/receiver/opencensusreceiver/octrace/opencensus_test.go +++ b/receiver/opencensusreceiver/octrace/opencensus_test.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc" "google.golang.org/protobuf/proto" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter/exportertest" @@ -51,12 +52,12 @@ func TestReceiver_endToEnd(t *testing.T) { defer doneFn() address := fmt.Sprintf("localhost:%d", port) - expFactory := &opencensusexporter.Factory{} + expFactory := opencensusexporter.NewFactory() expCfg := expFactory.CreateDefaultConfig().(*opencensusexporter.Config) expCfg.GRPCClientSettings.TLSSetting.Insecure = true expCfg.Endpoint = address expCfg.WaitForReady = true - oce, err := expFactory.CreateTraceExporter(zap.NewNop(), expCfg) + oce, err := expFactory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, expCfg) require.NoError(t, err) err = oce.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) @@ -66,9 +67,7 @@ func TestReceiver_endToEnd(t *testing.T) { }() td := testdata.GenerateTraceDataOneSpan() - octd := internaldata.TraceDataToOC(td) - require.Len(t, octd, 1) - assert.NoError(t, oce.ConsumeTraceData(context.Background(), octd[0])) + assert.NoError(t, oce.ConsumeTraces(context.Background(), td)) testutil.WaitFor(t, func() bool { return len(spanSink.AllTraces()) != 0 diff --git a/service/builder/exporters_builder_test.go b/service/builder/exporters_builder_test.go index cc166df037e..c1b318b94a0 100644 --- a/service/builder/exporters_builder_test.go +++ b/service/builder/exporters_builder_test.go @@ -33,7 +33,7 @@ func TestExportersBuilder_Build(t *testing.T) { factories, err := componenttest.ExampleComponents() assert.NoError(t, err) - oceFactory := &opencensusexporter.Factory{} + oceFactory := opencensusexporter.NewFactory() factories.Exporters[oceFactory.Type()] = oceFactory cfg := &configmodels.Config{ Exporters: map[string]configmodels.Exporter{ diff --git a/service/defaultcomponents/defaults.go b/service/defaultcomponents/defaults.go index 11dea1e9ff3..4b8b85b3059 100644 --- a/service/defaultcomponents/defaults.go +++ b/service/defaultcomponents/defaults.go @@ -83,7 +83,7 @@ func Components() ( } exporters, err := component.MakeExporterFactoryMap( - &opencensusexporter.Factory{}, + opencensusexporter.NewFactory(), prometheusexporter.NewFactory(), loggingexporter.NewFactory(), zipkinexporter.NewFactory(), diff --git a/testbed/testbed/senders.go b/testbed/testbed/senders.go index 890a19d7b80..dbb806132e6 100644 --- a/testbed/testbed/senders.go +++ b/testbed/testbed/senders.go @@ -70,13 +70,13 @@ type MetricDataSenderOld interface { // DataSenderOverTraceExporter partially implements TraceDataSender via a TraceExporter. type DataSenderOverTraceExporterOld struct { - exporter component.TraceExporterOld + exporter component.TraceExporter Host string Port int } -func (ds *DataSenderOverTraceExporterOld) SendSpans(traces consumerdata.TraceData) error { - return ds.exporter.ConsumeTraceData(context.Background(), traces) +func (ds *DataSenderOverTraceExporterOld) SendSpans(td pdata.Traces) error { + return ds.exporter.ConsumeTraces(context.Background(), td) } func (ds *DataSenderOverTraceExporterOld) Flush() { @@ -175,7 +175,7 @@ type OCTraceDataSender struct { } // Ensure OCTraceDataSender implements TraceDataSender. -var _ TraceDataSenderOld = (*OCTraceDataSender)(nil) +var _ TraceDataSender = (*OCTraceDataSender)(nil) // NewOCTraceDataSender creates a new OCTraceDataSender that will send // to the specified port after Start is called. @@ -187,14 +187,15 @@ func NewOCTraceDataSender(host string, port int) *OCTraceDataSender { } func (ote *OCTraceDataSender) Start() error { - factory := opencensusexporter.Factory{} + factory := opencensusexporter.NewFactory() cfg := factory.CreateDefaultConfig().(*opencensusexporter.Config) cfg.Endpoint = fmt.Sprintf("%s:%d", ote.Host, ote.Port) cfg.TLSSetting = configtls.TLSClientSetting{ Insecure: true, } - exporter, err := factory.CreateTraceExporter(zap.L(), cfg) + params := component.ExporterCreateParams{Logger: zap.L()} + exporter, err := factory.CreateTraceExporter(context.Background(), params, cfg) if err != nil { return err } @@ -216,13 +217,13 @@ func (ote *OCTraceDataSender) ProtocolName() string { // OCMetricsDataSender implements MetricDataSender for OpenCensus metrics protocol. type OCMetricsDataSender struct { - exporter component.MetricsExporterOld + exporter component.MetricsExporter host string port int } // Ensure OCMetricsDataSender implements MetricDataSender. -var _ MetricDataSenderOld = (*OCMetricsDataSender)(nil) +var _ MetricDataSender = (*OCMetricsDataSender)(nil) // NewOCMetricDataSender creates a new OpenCensus metric protocol sender that will send // to the specified port after Start is called. @@ -234,14 +235,15 @@ func NewOCMetricDataSender(host string, port int) *OCMetricsDataSender { } func (ome *OCMetricsDataSender) Start() error { - factory := opencensusexporter.Factory{} + factory := opencensusexporter.NewFactory() cfg := factory.CreateDefaultConfig().(*opencensusexporter.Config) cfg.Endpoint = fmt.Sprintf("%s:%d", ome.host, ome.port) cfg.TLSSetting = configtls.TLSClientSetting{ Insecure: true, } - exporter, err := factory.CreateMetricsExporter(zap.L(), cfg) + params := component.ExporterCreateParams{Logger: zap.L()} + exporter, err := factory.CreateMetricsExporter(context.Background(), params, cfg) if err != nil { return err } @@ -250,8 +252,8 @@ func (ome *OCMetricsDataSender) Start() error { return nil } -func (ome *OCMetricsDataSender) SendMetrics(metrics consumerdata.MetricsData) error { - return ome.exporter.ConsumeMetricsData(context.Background(), metrics) +func (ome *OCMetricsDataSender) SendMetrics(md pdata.Metrics) error { + return ome.exporter.ConsumeMetrics(context.Background(), md) } func (ome *OCMetricsDataSender) Flush() {