Skip to content

Commit

Permalink
Add grpc response struct for all signals instead of returning interfa…
Browse files Browse the repository at this point in the history
…ce (open-telemetry#3437)

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Jun 14, 2021
1 parent fc413fc commit d5a1e43
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 45 deletions.
12 changes: 6 additions & 6 deletions exporter/otlpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ type mockTracesReceiver struct {
lastRequest pdata.Traces
}

func (r *mockTracesReceiver) Export(ctx context.Context, td pdata.Traces) (interface{}, error) {
func (r *mockTracesReceiver) Export(ctx context.Context, td pdata.Traces) (pdatagrpc.TracesResponse, error) {
atomic.AddInt32(&r.requestCount, 1)
atomic.AddInt32(&r.totalItems, int32(td.SpanCount()))
r.mux.Lock()
defer r.mux.Unlock()
r.lastRequest = td
r.metadata, _ = metadata.FromIncomingContext(ctx)
return nil, nil
return pdatagrpc.NewTracesResponse(), nil
}

func (r *mockTracesReceiver) GetLastRequest() pdata.Traces {
Expand Down Expand Up @@ -92,14 +92,14 @@ type mockLogsReceiver struct {
lastRequest pdata.Logs
}

func (r *mockLogsReceiver) Export(ctx context.Context, ld pdata.Logs) (interface{}, error) {
func (r *mockLogsReceiver) Export(ctx context.Context, ld pdata.Logs) (pdatagrpc.LogsResponse, error) {
atomic.AddInt32(&r.requestCount, 1)
atomic.AddInt32(&r.totalItems, int32(ld.LogRecordCount()))
r.mux.Lock()
defer r.mux.Unlock()
r.lastRequest = ld
r.metadata, _ = metadata.FromIncomingContext(ctx)
return nil, nil
return pdatagrpc.NewLogsResponse(), nil
}

func (r *mockLogsReceiver) GetLastRequest() pdata.Logs {
Expand Down Expand Up @@ -129,15 +129,15 @@ type mockMetricsReceiver struct {
lastRequest pdata.Metrics
}

func (r *mockMetricsReceiver) Export(ctx context.Context, md pdata.Metrics) (interface{}, error) {
func (r *mockMetricsReceiver) Export(ctx context.Context, md pdata.Metrics) (pdatagrpc.MetricsResponse, error) {
atomic.AddInt32(&r.requestCount, 1)
_, recordCount := md.MetricAndDataPointCount()
atomic.AddInt32(&r.totalItems, int32(recordCount))
r.mux.Lock()
defer r.mux.Unlock()
r.lastRequest = md
r.metadata, _ = metadata.FromIncomingContext(ctx)
return nil, nil
return pdatagrpc.NewMetricsResponse(), nil
}

func (r *mockMetricsReceiver) GetLastRequest() pdata.Metrics {
Expand Down
27 changes: 18 additions & 9 deletions internal/pdatagrpc/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ import (
otlpcollectorlogs "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
)

// TODO: Consider to add `LogsRequest` and `LogsResponse`. Right now the funcs return interface{},
// it would be better and future proof to create a LogsResponse empty struct and return that.
// So if we ever add things in the OTLP response I can deal with that. Similar for request if we add non pdata properties.
// TODO: Consider to add `LogsRequest`. If we add non pdata properties we can add them to the request.

// LogsResponse represents the response for gRPC client/server.
type LogsResponse struct {
orig *otlpcollectorlogs.ExportLogsServiceResponse
}

// NewLogsResponse returns an empty LogsResponse.
func NewLogsResponse() LogsResponse {
return LogsResponse{orig: &otlpcollectorlogs.ExportLogsServiceResponse{}}
}

// LogsClient is the client API for OTLP-GRPC Logs service.
//
Expand All @@ -36,7 +44,7 @@ type LogsClient interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(ctx context.Context, in pdata.Logs, opts ...grpc.CallOption) (interface{}, error)
Export(ctx context.Context, in pdata.Logs, opts ...grpc.CallOption) (LogsResponse, error)
}

type logsClient struct {
Expand All @@ -48,8 +56,9 @@ func NewLogsClient(cc *grpc.ClientConn) LogsClient {
return &logsClient{rawClient: otlpcollectorlogs.NewLogsServiceClient(cc)}
}

func (c *logsClient) Export(ctx context.Context, in pdata.Logs, opts ...grpc.CallOption) (interface{}, error) {
return c.rawClient.Export(ctx, internal.LogsToOtlp(in.InternalRep()), opts...)
func (c *logsClient) Export(ctx context.Context, in pdata.Logs, opts ...grpc.CallOption) (LogsResponse, error) {
rsp, err := c.rawClient.Export(ctx, internal.LogsToOtlp(in.InternalRep()), opts...)
return LogsResponse{orig: rsp}, err
}

// LogsServer is the server API for OTLP gRPC LogsService service.
Expand All @@ -58,7 +67,7 @@ type LogsServer interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(context.Context, pdata.Logs) (interface{}, error)
Export(context.Context, pdata.Logs) (LogsResponse, error)
}

// RegisterLogsServer registers the LogsServer to the grpc.Server.
Expand All @@ -71,6 +80,6 @@ type rawLogsServer struct {
}

func (s rawLogsServer) Export(ctx context.Context, request *otlpcollectorlogs.ExportLogsServiceRequest) (*otlpcollectorlogs.ExportLogsServiceResponse, error) {
_, err := s.srv.Export(ctx, pdata.LogsFromInternalRep(internal.LogsFromOtlp(request)))
return &otlpcollectorlogs.ExportLogsServiceResponse{}, err
rsp, err := s.srv.Export(ctx, pdata.LogsFromInternalRep(internal.LogsFromOtlp(request)))
return rsp.orig, err
}
27 changes: 18 additions & 9 deletions internal/pdatagrpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ import (
otlpcollectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
)

// TODO: Consider to add `MetricsRequest` and `MetricsResponse`. Right now the funcs return interface{},
// it would be better and future proof to create a MetricsResponse empty struct and return that.
// So if we ever add things in the OTLP response I can deal with that. Similar for request if we add non pdata properties.
// TODO: Consider to add `MetricsRequest`. If we add non pdata properties we can add them to the request.

// MetricsResponse represents the response for gRPC client/server.
type MetricsResponse struct {
orig *otlpcollectormetrics.ExportMetricsServiceResponse
}

// NewMetricsResponse returns an empty MetricsResponse.
func NewMetricsResponse() MetricsResponse {
return MetricsResponse{orig: &otlpcollectormetrics.ExportMetricsServiceResponse{}}
}

// MetricsClient is the client API for OTLP-GRPC Metrics service.
//
Expand All @@ -36,7 +44,7 @@ type MetricsClient interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(ctx context.Context, in pdata.Metrics, opts ...grpc.CallOption) (interface{}, error)
Export(ctx context.Context, in pdata.Metrics, opts ...grpc.CallOption) (MetricsResponse, error)
}

type metricsClient struct {
Expand All @@ -48,8 +56,9 @@ func NewMetricsClient(cc *grpc.ClientConn) MetricsClient {
return &metricsClient{rawClient: otlpcollectormetrics.NewMetricsServiceClient(cc)}
}

func (c *metricsClient) Export(ctx context.Context, in pdata.Metrics, opts ...grpc.CallOption) (interface{}, error) {
return c.rawClient.Export(ctx, internal.MetricsToOtlp(in.InternalRep()), opts...)
func (c *metricsClient) Export(ctx context.Context, in pdata.Metrics, opts ...grpc.CallOption) (MetricsResponse, error) {
rsp, err := c.rawClient.Export(ctx, internal.MetricsToOtlp(in.InternalRep()), opts...)
return MetricsResponse{orig: rsp}, err
}

// MetricsServer is the server API for OTLP gRPC MetricsService service.
Expand All @@ -58,7 +67,7 @@ type MetricsServer interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(context.Context, pdata.Metrics) (interface{}, error)
Export(context.Context, pdata.Metrics) (MetricsResponse, error)
}

// RegisterMetricsServer registers the MetricsServer to the grpc.Server.
Expand All @@ -71,6 +80,6 @@ type rawMetricsServer struct {
}

func (s rawMetricsServer) Export(ctx context.Context, request *otlpcollectormetrics.ExportMetricsServiceRequest) (*otlpcollectormetrics.ExportMetricsServiceResponse, error) {
_, err := s.srv.Export(ctx, pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(request)))
return &otlpcollectormetrics.ExportMetricsServiceResponse{}, err
rsp, err := s.srv.Export(ctx, pdata.MetricsFromInternalRep(internal.MetricsFromOtlp(request)))
return rsp.orig, err
}
27 changes: 18 additions & 9 deletions internal/pdatagrpc/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ import (
otlpcollectortraces "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
)

// TODO: Consider to add `TracesRequest` and `TracesResponse`. Right now the funcs return interface{},
// it would be better and future proof to create a TracesResponse empty struct and return that.
// So if we ever add things in the OTLP response I can deal with that. Similar for request if we add non pdata properties.
// TODO: Consider to add `TracesRequest`. If we add non pdata properties we can add them to the request.

// TracesResponse represents the response for gRPC client/server.
type TracesResponse struct {
orig *otlpcollectortraces.ExportTraceServiceResponse
}

// NewTracesResponse returns an empty TracesResponse.
func NewTracesResponse() TracesResponse {
return TracesResponse{orig: &otlpcollectortraces.ExportTraceServiceResponse{}}
}

// TracesClient is the client API for OTLP-GRPC Traces service.
//
Expand All @@ -36,7 +44,7 @@ type TracesClient interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(ctx context.Context, in pdata.Traces, opts ...grpc.CallOption) (interface{}, error)
Export(ctx context.Context, in pdata.Traces, opts ...grpc.CallOption) (TracesResponse, error)
}

type tracesClient struct {
Expand All @@ -49,8 +57,9 @@ func NewTracesClient(cc *grpc.ClientConn) TracesClient {
}

// Export implements the TracesClient interface.
func (c *tracesClient) Export(ctx context.Context, in pdata.Traces, opts ...grpc.CallOption) (interface{}, error) {
return c.rawClient.Export(ctx, internal.TracesToOtlp(in.InternalRep()), opts...)
func (c *tracesClient) Export(ctx context.Context, in pdata.Traces, opts ...grpc.CallOption) (TracesResponse, error) {
rsp, err := c.rawClient.Export(ctx, internal.TracesToOtlp(in.InternalRep()), opts...)
return TracesResponse{orig: rsp}, err
}

// TracesServer is the server API for OTLP gRPC TracesService service.
Expand All @@ -59,7 +68,7 @@ type TracesServer interface {
//
// For performance reasons, it is recommended to keep this RPC
// alive for the entire life of the application.
Export(context.Context, pdata.Traces) (interface{}, error)
Export(context.Context, pdata.Traces) (TracesResponse, error)
}

// RegisterTracesServer registers the TracesServer to the grpc.Server.
Expand All @@ -73,6 +82,6 @@ type rawTracesServer struct {

func (s rawTracesServer) Export(ctx context.Context, request *otlpcollectortraces.ExportTraceServiceRequest) (*otlpcollectortraces.ExportTraceServiceResponse, error) {
internal.TracesCompatibilityChanges(request)
_, err := s.srv.Export(ctx, pdata.TracesFromInternalRep(internal.TracesFromOtlp(request)))
return &otlpcollectortraces.ExportTraceServiceResponse{}, err
rsp, err := s.srv.Export(ctx, pdata.TracesFromInternalRep(internal.TracesFromOtlp(request)))
return rsp.orig, err
}
7 changes: 4 additions & 3 deletions receiver/otlpreceiver/internal/logs/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/obsreport"
)

Expand Down Expand Up @@ -53,15 +54,15 @@ const (
var receiverID = config.NewIDWithName("otlp", "log")

// Export implements the service Export logs func.
func (r *Receiver) Export(ctx context.Context, ld pdata.Logs) (interface{}, error) {
func (r *Receiver) Export(ctx context.Context, ld pdata.Logs) (pdatagrpc.LogsResponse, error) {
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport)
err := r.sendToNextConsumer(ctxWithReceiverName, ld)
if err != nil {
return nil, err
return pdatagrpc.LogsResponse{}, err
}

return nil, nil
return pdatagrpc.NewLogsResponse(), nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, ld pdata.Logs) error {
Expand Down
2 changes: 1 addition & 1 deletion receiver/otlpreceiver/internal/logs/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestExport_ErrorConsumer(t *testing.T) {

resp, err := logClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.Nil(t, resp)
assert.Equal(t, pdatagrpc.LogsResponse{}, resp)
}

func makeLogsServiceClient(addr net.Addr) (pdatagrpc.LogsClient, func(), error) {
Expand Down
7 changes: 4 additions & 3 deletions receiver/otlpreceiver/internal/metrics/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/obsreport"
)

Expand Down Expand Up @@ -52,14 +53,14 @@ const (
var receiverID = config.NewIDWithName("otlp", "metrics")

// Export implements the service Export metrics func.
func (r *Receiver) Export(ctx context.Context, md pdata.Metrics) (interface{}, error) {
func (r *Receiver) Export(ctx context.Context, md pdata.Metrics) (pdatagrpc.MetricsResponse, error) {
receiverCtx := obsreport.ReceiverContext(ctx, r.id, receiverTransport)
err := r.sendToNextConsumer(receiverCtx, md)
if err != nil {
return nil, err
return pdatagrpc.MetricsResponse{}, err
}

return nil, nil
return pdatagrpc.NewMetricsResponse(), nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, md pdata.Metrics) error {
Expand Down
2 changes: 1 addition & 1 deletion receiver/otlpreceiver/internal/metrics/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestExport_ErrorConsumer(t *testing.T) {

resp, err := metricsClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.Nil(t, resp)
assert.Equal(t, pdatagrpc.MetricsResponse{}, resp)
}

func makeMetricsServiceClient(addr net.Addr) (pdatagrpc.MetricsClient, func(), error) {
Expand Down
7 changes: 4 additions & 3 deletions receiver/otlpreceiver/internal/trace/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/pdatagrpc"
"go.opentelemetry.io/collector/obsreport"
)

Expand Down Expand Up @@ -53,15 +54,15 @@ const (
var receiverID = config.NewIDWithName("otlp", "trace")

// Export implements the service Export traces func.
func (r *Receiver) Export(ctx context.Context, td pdata.Traces) (interface{}, error) {
func (r *Receiver) Export(ctx context.Context, td pdata.Traces) (pdatagrpc.TracesResponse, error) {
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport)
err := r.sendToNextConsumer(ctxWithReceiverName, td)
if err != nil {
return nil, err
return pdatagrpc.TracesResponse{}, err
}

return nil, nil
return pdatagrpc.NewTracesResponse(), nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, td pdata.Traces) error {
Expand Down
2 changes: 1 addition & 1 deletion receiver/otlpreceiver/internal/trace/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestExport_ErrorConsumer(t *testing.T) {
req := testdata.GenerateTracesOneSpan()
resp, err := traceClient.Export(context.Background(), req)
assert.EqualError(t, err, "rpc error: code = Unknown desc = my error")
assert.Nil(t, resp)
assert.Equal(t, pdatagrpc.TracesResponse{}, resp)
}

func makeTraceServiceClient(addr net.Addr) (pdatagrpc.TracesClient, func(), error) {
Expand Down

0 comments on commit d5a1e43

Please sign in to comment.