diff --git a/receiver/otlpreceiver/logs/otlp_test.go b/receiver/otlpreceiver/logs/otlp_test.go index d583a1c3d49..b1e95ebe502 100644 --- a/receiver/otlpreceiver/logs/otlp_test.go +++ b/receiver/otlpreceiver/logs/otlp_test.go @@ -27,10 +27,9 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal" - "go.opentelemetry.io/collector/internal/data" collectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1" - otlplog "go.opentelemetry.io/collector/internal/data/protogen/logs/v1" + "go.opentelemetry.io/collector/internal/pdatagrpc" + "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/obsreport" ) @@ -48,43 +47,18 @@ func TestExport(t *testing.T) { require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err) defer traceClientDoneFn() - // when - - unixnanos := uint64(12578940000000012345) - traceID := [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 8, 7, 6, 5, 4, 3, 2, 1} - spanID := [8]byte{8, 7, 6, 5, 4, 3, 2, 1} - otlp := &collectorlog.ExportLogsServiceRequest{ - ResourceLogs: []*otlplog.ResourceLogs{ - { - InstrumentationLibraryLogs: []*otlplog.InstrumentationLibraryLogs{ - { - Logs: []*otlplog.LogRecord{ - { - TraceId: data.NewTraceID(traceID), - SpanId: data.NewSpanID(spanID), - Name: "operationB", - TimeUnixNano: unixnanos, - }, - }, - }, - }, - }, - }, - } - + req := testdata.GenerateLogsOneLogRecord() // Keep log data to compare the test result against it // Clone needed because OTLP proto XXX_ fields are altered in the GRPC downstream - ld := pdata.LogsFromInternalRep(internal.LogsFromOtlp(otlp)).Clone() + logData := req.Clone() - resp, err := traceClient.Export(context.Background(), otlp) + resp, err := traceClient.Export(context.Background(), req) require.NoError(t, err, "Failed to export trace: %v", err) require.NotNil(t, resp, "The response is missing") - // assert - - require.Equal(t, 1, len(logSink.AllLogs()), "unexpected length: %v", len(logSink.AllLogs())) - - assert.EqualValues(t, ld, logSink.AllLogs()[0]) + lds := logSink.AllLogs() + require.Len(t, lds, 1) + assert.EqualValues(t, logData, lds[0]) } func TestExport_EmptyRequest(t *testing.T) { @@ -97,7 +71,7 @@ func TestExport_EmptyRequest(t *testing.T) { require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err) defer logClientDoneFn() - resp, err := logClient.Export(context.Background(), &collectorlog.ExportLogsServiceRequest{}) + resp, err := logClient.Export(context.Background(), pdata.NewLogs()) assert.NoError(t, err, "Failed to export trace: %v", err) assert.NotNil(t, resp, "The response is missing") } @@ -110,34 +84,20 @@ func TestExport_ErrorConsumer(t *testing.T) { require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err) defer logClientDoneFn() - req := &collectorlog.ExportLogsServiceRequest{ - ResourceLogs: []*otlplog.ResourceLogs{ - { - InstrumentationLibraryLogs: []*otlplog.InstrumentationLibraryLogs{ - { - Logs: []*otlplog.LogRecord{ - { - Name: "operationB", - }, - }, - }, - }, - }, - }, - } + req := testdata.GenerateLogsOneLogRecord() resp, err := logClient.Export(context.Background(), req) assert.EqualError(t, err, "rpc error: code = Unknown desc = my error") assert.Nil(t, resp) } -func makeLogsServiceClient(addr net.Addr) (collectorlog.LogsServiceClient, func(), error) { +func makeLogsServiceClient(addr net.Addr) (pdatagrpc.LogsClient, func(), error) { cc, err := grpc.Dial(addr.String(), grpc.WithInsecure(), grpc.WithBlock()) if err != nil { return nil, nil, err } - logClient := collectorlog.NewLogsServiceClient(cc) + logClient := pdatagrpc.NewLogsClient(cc) doneFn := func() { _ = cc.Close() } return logClient, doneFn, nil diff --git a/receiver/otlpreceiver/metrics/otlp_test.go b/receiver/otlpreceiver/metrics/otlp_test.go index 965d8edb69f..51dde638406 100644 --- a/receiver/otlpreceiver/metrics/otlp_test.go +++ b/receiver/otlpreceiver/metrics/otlp_test.go @@ -27,10 +27,9 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal" collectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1" - otlpcommon "go.opentelemetry.io/collector/internal/data/protogen/common/v1" - otlpmetrics "go.opentelemetry.io/collector/internal/data/protogen/metrics/v1" + "go.opentelemetry.io/collector/internal/pdatagrpc" + "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/obsreport" ) @@ -50,60 +49,11 @@ func TestExport(t *testing.T) { // when - unixnanos1 := uint64(12578940000000012345) - unixnanos2 := uint64(12578940000000054321) - - req := &collectormetrics.ExportMetricsServiceRequest{ - ResourceMetrics: []*otlpmetrics.ResourceMetrics{ - { - InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{ - { - Metrics: []*otlpmetrics.Metric{ - { - Name: "mymetric", - Description: "My metric", - Unit: "ms", - Data: &otlpmetrics.Metric_IntSum{ - IntSum: &otlpmetrics.IntSum{ - IsMonotonic: true, - AggregationTemporality: otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, - DataPoints: []*otlpmetrics.IntDataPoint{ - { - Labels: []otlpcommon.StringKeyValue{ - { - Key: "key1", - Value: "value1", - }, - }, - StartTimeUnixNano: unixnanos1, - TimeUnixNano: unixnanos2, - Value: 123, - }, - { - Labels: []otlpcommon.StringKeyValue{ - { - Key: "key2", - Value: "value2", - }, - }, - StartTimeUnixNano: unixnanos1, - TimeUnixNano: unixnanos2, - Value: 456, - }, - }, - }, - }, - }, - }, - }, - }, - }, - }, - } + req := testdata.GenerateMetricsOneMetric() // Keep metric data to compare the test result against it // Clone needed because OTLP proto XXX_ fields are altered in the GRPC downstream - metricData := pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(req)).Clone() + metricData := req.Clone() resp, err := metricsClient.Export(context.Background(), req) require.NoError(t, err, "Failed to export metrics: %v", err) @@ -111,10 +61,9 @@ func TestExport(t *testing.T) { // assert - require.Equal(t, 1, len(metricSink.AllMetrics()), - "unexpected length: %v", len(metricSink.AllMetrics())) - - assert.EqualValues(t, metricData, metricSink.AllMetrics()[0]) + mds := metricSink.AllMetrics() + require.Len(t, mds, 1) + assert.EqualValues(t, metricData, mds[0]) } func TestExport_EmptyRequest(t *testing.T) { @@ -129,7 +78,7 @@ func TestExport_EmptyRequest(t *testing.T) { require.NoError(t, err, "Failed to create the MetricsServiceClient: %v", err) defer metricsClientDoneFn() - resp, err := metricsClient.Export(context.Background(), &collectormetrics.ExportMetricsServiceRequest{}) + resp, err := metricsClient.Export(context.Background(), pdata.NewMetrics()) require.NoError(t, err) require.NotNil(t, resp) } @@ -144,47 +93,20 @@ func TestExport_ErrorConsumer(t *testing.T) { require.NoError(t, err, "Failed to create the MetricsServiceClient: %v", err) defer metricsClientDoneFn() - req := &collectormetrics.ExportMetricsServiceRequest{ResourceMetrics: []*otlpmetrics.ResourceMetrics{ - { - InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{ - { - Metrics: []*otlpmetrics.Metric{ - { - Name: "mymetric", - Description: "My metric", - Unit: "ms", - Data: &otlpmetrics.Metric_IntSum{ - IntSum: &otlpmetrics.IntSum{ - IsMonotonic: true, - AggregationTemporality: otlpmetrics.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, - DataPoints: []*otlpmetrics.IntDataPoint{ - { - Value: 123, - }, - { - Value: 456, - }, - }, - }, - }, - }, - }, - }, - }, - }, - }} + req := testdata.GenerateMetricsOneMetric() + resp, err := metricsClient.Export(context.Background(), req) assert.EqualError(t, err, "rpc error: code = Unknown desc = my error") assert.Nil(t, resp) } -func makeMetricsServiceClient(addr net.Addr) (collectormetrics.MetricsServiceClient, func(), error) { +func makeMetricsServiceClient(addr net.Addr) (pdatagrpc.MetricsClient, func(), error) { cc, err := grpc.Dial(addr.String(), grpc.WithInsecure(), grpc.WithBlock()) if err != nil { return nil, nil, err } - metricsClient := collectormetrics.NewMetricsServiceClient(cc) + metricsClient := pdatagrpc.NewMetricsClient(cc) doneFn := func() { _ = cc.Close() } return metricsClient, doneFn, nil diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 7242344dbcf..fa9ed432b0f 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -54,6 +54,7 @@ import ( otlpresource "go.opentelemetry.io/collector/internal/data/protogen/resource/v1" otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1" "go.opentelemetry.io/collector/internal/internalconsumertest" + "go.opentelemetry.io/collector/internal/pdatagrpc" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/obsreport/obsreporttest" "go.opentelemetry.io/collector/testutil" @@ -98,7 +99,6 @@ var traceJSON = []byte(` }`) var resourceSpansOtlp = otlptrace.ResourceSpans{ - Resource: otlpresource.Resource{ Attributes: []otlpcommon.KeyValue{ { @@ -348,8 +348,8 @@ func TestProtoHttp(t *testing.T) { // Wait for the servers to start <-time.After(10 * time.Millisecond) - traceProto := internal.TracesToOtlp(testdata.GenerateTracesOneSpan().InternalRep()) - traceBytes, err := traceProto.Marshal() + traceData := testdata.GenerateTracesOneSpan() + traceBytes, err := traceData.ToOtlpProtoBytes() if err != nil { t.Errorf("Error marshaling protobuf: %v", err) } @@ -365,7 +365,7 @@ func TestProtoHttp(t *testing.T) { t.Run(test.name+targetURLPath, func(t *testing.T) { url := fmt.Sprintf("http://%s%s", addr, targetURLPath) tSink.Reset() - testHTTPProtobufRequest(t, url, tSink, test.encoding, traceBytes, test.err, traceProto) + testHTTPProtobufRequest(t, url, tSink, test.encoding, traceBytes, test.err, traceData) }) } } @@ -400,7 +400,7 @@ func testHTTPProtobufRequest( encoding string, traceBytes []byte, expectedErr error, - wantOtlp *collectortrace.ExportTraceServiceRequest, + wantData pdata.Traces, ) { tSink.SetConsumeError(expectedErr) @@ -425,9 +425,7 @@ func testHTTPProtobufRequest( require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto") require.Len(t, allTraces, 1) - - gotOtlp := internal.TracesToOtlp(allTraces[0].InternalRep()) - assert.EqualValues(t, gotOtlp, wantOtlp) + assert.EqualValues(t, allTraces[0], wantData) } else { errStatus := &spb.Status{} assert.NoError(t, proto.Unmarshal(respBytes, errStatus)) @@ -542,8 +540,8 @@ func TestHTTPNewPortAlreadyUsed(t *testing.T) { require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } -func createSingleSpanTrace() *collectortrace.ExportTraceServiceRequest { - return internal.TracesToOtlp(testdata.GenerateTracesOneSpan().InternalRep()) +func createSingleSpanTrace() pdata.Traces { + return testdata.GenerateTracesOneSpan() } // TestOTLPReceiverTrace_HandleNextConsumerResponse checks if the trace receiver @@ -587,27 +585,15 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) req := createSingleSpanTrace() - exportBidiFn := func( - t *testing.T, - cc *grpc.ClientConn, - msg *collectortrace.ExportTraceServiceRequest) error { - - acc := collectortrace.NewTraceServiceClient(cc) - _, err := acc.Export(context.Background(), req) - - return err - } - exporters := []struct { receiverTag string exportFn func( - t *testing.T, cc *grpc.ClientConn, - msg *collectortrace.ExportTraceServiceRequest) error + td pdata.Traces) error }{ { receiverTag: "trace", - exportFn: exportBidiFn, + exportFn: exportTraces, }, } for _, exporter := range exporters { @@ -635,7 +621,7 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) { sink.SetConsumeError(fmt.Errorf("%q: consumer error", tt.name)) } - err = exporter.exportFn(t, cc, req) + err = exporter.exportFn(cc, req) status, ok := status.FromError(err) require.True(t, ok) @@ -755,7 +741,7 @@ func compressGzip(body []byte) (*bytes.Buffer, error) { return &buf, nil } -type senderFunc func(msg *collectortrace.ExportTraceServiceRequest) +type senderFunc func(td pdata.Traces) func TestShutdown(t *testing.T) { endpointGrpc := testutil.GetAvailableLocalAddress(t) @@ -785,14 +771,13 @@ func TestShutdown(t *testing.T) { doneSignalGrpc := make(chan bool) doneSignalHTTP := make(chan bool) - senderGrpc := func(msg *collectortrace.ExportTraceServiceRequest) { - // Send request via OTLP/gRPC. - client := collectortrace.NewTraceServiceClient(conn) - client.Export(context.Background(), msg) //nolint: errcheck + senderGrpc := func(td pdata.Traces) { + // Ignore error, may be executed after the receiver shutdown. + _ = exportTraces(conn, td) } - senderHTTP := func(msg *collectortrace.ExportTraceServiceRequest) { + senderHTTP := func(td pdata.Traces) { // Send request via OTLP/HTTP. - traceBytes, err2 := msg.Marshal() + traceBytes, err2 := td.ToOtlpProtoBytes() if err2 != nil { t.Errorf("Error marshaling protobuf: %v", err2) } @@ -859,3 +844,10 @@ loop: // Indicate that we are done. close(doneSignal) } + +func exportTraces(cc *grpc.ClientConn, td pdata.Traces) error { + acc := pdatagrpc.NewTracesClient(cc) + _, err := acc.Export(context.Background(), td) + + return err +} diff --git a/receiver/otlpreceiver/trace/otlp_test.go b/receiver/otlpreceiver/trace/otlp_test.go index 62ef349a810..46e20076a8d 100644 --- a/receiver/otlpreceiver/trace/otlp_test.go +++ b/receiver/otlpreceiver/trace/otlp_test.go @@ -27,10 +27,9 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal" - "go.opentelemetry.io/collector/internal/data" collectortrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1" - otlptrace "go.opentelemetry.io/collector/internal/data/protogen/trace/v1" + "go.opentelemetry.io/collector/internal/pdatagrpc" + "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/obsreport" ) @@ -50,44 +49,18 @@ func TestExport(t *testing.T) { // when - unixnanos := uint64(12578940000000012345) - traceID := [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 8, 7, 6, 5, 4, 3, 2, 1} - spanID := [8]byte{8, 7, 6, 5, 4, 3, 2, 1} - req := &collectortrace.ExportTraceServiceRequest{ - ResourceSpans: []*otlptrace.ResourceSpans{ - { - InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ - { - Spans: []*otlptrace.Span{ - { - TraceId: data.NewTraceID(traceID), - SpanId: data.NewSpanID(spanID), - Name: "operationB", - Kind: otlptrace.Span_SPAN_KIND_SERVER, - StartTimeUnixNano: unixnanos, - EndTimeUnixNano: unixnanos, - Status: otlptrace.Status{Message: "status-cancelled", Code: otlptrace.Status_STATUS_CODE_ERROR}, - TraceState: "a=text,b=123", - }, - }, - }, - }, - }, - }, - } + req := testdata.GenerateTracesOneSpan() // Keep trace data to compare the test result against it // Clone needed because OTLP proto XXX_ fields are altered in the GRPC downstream - traceData := pdata.TracesFromInternalRep(internal.TracesFromOtlp(req)).Clone() + traceData := req.Clone() resp, err := traceClient.Export(context.Background(), req) require.NoError(t, err, "Failed to export trace: %v", err) require.NotNil(t, resp, "The response is missing") // assert - - require.Equal(t, 1, len(traceSink.AllTraces()), "unexpected length: %v", len(traceSink.AllTraces())) - + require.Len(t, traceSink.AllTraces(), 1) assert.EqualValues(t, traceData, traceSink.AllTraces()[0]) } @@ -101,7 +74,7 @@ func TestExport_EmptyRequest(t *testing.T) { require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err) defer traceClientDoneFn() - resp, err := traceClient.Export(context.Background(), &collectortrace.ExportTraceServiceRequest{}) + resp, err := traceClient.Export(context.Background(), pdata.NewTraces()) assert.NoError(t, err, "Failed to export trace: %v", err) assert.NotNil(t, resp, "The response is missing") } @@ -114,34 +87,19 @@ func TestExport_ErrorConsumer(t *testing.T) { require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err) defer traceClientDoneFn() - req := &collectortrace.ExportTraceServiceRequest{ - ResourceSpans: []*otlptrace.ResourceSpans{ - { - InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ - { - Spans: []*otlptrace.Span{ - { - Name: "operationB", - }, - }, - }, - }, - }, - }, - } - + req := testdata.GenerateTracesOneSpan() resp, err := traceClient.Export(context.Background(), req) assert.EqualError(t, err, "rpc error: code = Unknown desc = my error") assert.Nil(t, resp) } -func makeTraceServiceClient(addr net.Addr) (collectortrace.TraceServiceClient, func(), error) { +func makeTraceServiceClient(addr net.Addr) (pdatagrpc.TracesClient, func(), error) { cc, err := grpc.Dial(addr.String(), grpc.WithInsecure(), grpc.WithBlock()) if err != nil { return nil, nil, err } - metricsClient := collectortrace.NewTraceServiceClient(cc) + metricsClient := pdatagrpc.NewTracesClient(cc) doneFn := func() { _ = cc.Close() } return metricsClient, doneFn, nil