Skip to content

Commit

Permalink
Small nits in otlp/receiver (#3511)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Jun 29, 2021
1 parent 6986310 commit 590f240
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 69 deletions.
28 changes: 5 additions & 23 deletions receiver/otlpreceiver/internal/logs/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

const (
dataFormatProtobuf = "protobuf"
receiverTransport = "grpc"
)

// Receiver is the type used to handle spans from OpenTelemetry exporters.
Expand All @@ -38,46 +39,27 @@ type Receiver struct {

// New creates a new Receiver reference.
func New(id config.ComponentID, nextConsumer consumer.Logs) *Receiver {
r := &Receiver{
return &Receiver{
id: id,
nextConsumer: nextConsumer,
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: id, Transport: receiverTransport}),
}

return r
}

const (
receiverTransport = "grpc"
)

var receiverID = config.NewIDWithName("otlp", "log")

// Export implements the service Export logs func.
func (r *Receiver) Export(ctx context.Context, ld pdata.Logs) (otlpgrpc.LogsResponse, error) {
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport)
err := r.sendToNextConsumer(ctxWithReceiverName, ld)
if err != nil {
return otlpgrpc.LogsResponse{}, err
}

return otlpgrpc.NewLogsResponse(), nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, ld pdata.Logs) error {
numSpans := ld.LogRecordCount()
if numSpans == 0 {
return nil
return otlpgrpc.NewLogsResponse(), nil
}

if c, ok := client.FromGRPC(ctx); ok {
ctx = client.NewContext(ctx, c)
}

ctx = r.obsrecv.StartLogsOp(ctx)
ctx = r.obsrecv.StartLogsOp(obsreport.ReceiverContext(ctx, r.id, receiverTransport))
err := r.nextConsumer.ConsumeLogs(ctx, ld)
r.obsrecv.EndLogsOp(ctx, dataFormatProtobuf, numSpans, err)

return err
return otlpgrpc.NewLogsResponse(), err
}
3 changes: 2 additions & 1 deletion receiver/otlpreceiver/internal/logs/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
Expand Down Expand Up @@ -110,7 +111,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Logs) (net.Addr, func())
}
}

r := New(receiverID, tc)
r := New(config.NewIDWithName("otlp", "log"), tc)
require.NoError(t, err)

// Now run it as a gRPC server
Expand Down
26 changes: 5 additions & 21 deletions receiver/otlpreceiver/internal/metrics/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

const (
dataFormatProtobuf = "protobuf"
receiverTransport = "grpc"
)

// Receiver is the type used to handle metrics from OpenTelemetry exporters.
Expand All @@ -38,44 +39,27 @@ type Receiver struct {

// New creates a new Receiver reference.
func New(id config.ComponentID, nextConsumer consumer.Metrics) *Receiver {
r := &Receiver{
return &Receiver{
id: id,
nextConsumer: nextConsumer,
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: id, Transport: receiverTransport}),
}
return r
}

const (
receiverTransport = "grpc"
)

var receiverID = config.NewIDWithName("otlp", "metrics")

// Export implements the service Export metrics func.
func (r *Receiver) Export(ctx context.Context, md pdata.Metrics) (otlpgrpc.MetricsResponse, error) {
receiverCtx := obsreport.ReceiverContext(ctx, r.id, receiverTransport)
err := r.sendToNextConsumer(receiverCtx, md)
if err != nil {
return otlpgrpc.MetricsResponse{}, err
}

return otlpgrpc.NewMetricsResponse(), nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, md pdata.Metrics) error {
metricCount, dataPointCount := md.MetricAndDataPointCount()
if metricCount == 0 {
return nil
return otlpgrpc.NewMetricsResponse(), nil
}

if c, ok := client.FromGRPC(ctx); ok {
ctx = client.NewContext(ctx, c)
}

ctx = r.obsrecv.StartMetricsOp(ctx)
ctx = r.obsrecv.StartMetricsOp(obsreport.ReceiverContext(ctx, r.id, receiverTransport))
err := r.nextConsumer.ConsumeMetrics(ctx, md)
r.obsrecv.EndMetricsOp(ctx, dataFormatProtobuf, dataPointCount, err)

return err
return otlpgrpc.NewMetricsResponse(), err
}
3 changes: 2 additions & 1 deletion receiver/otlpreceiver/internal/metrics/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
Expand Down Expand Up @@ -119,7 +120,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) (net.Addr, func
}
}

r := New(receiverID, mc)
r := New(config.NewIDWithName("otlp", "metrics"), mc)
// Now run it as a gRPC server
srv := grpc.NewServer()
otlpgrpc.RegisterMetricsServer(srv, r)
Expand Down
27 changes: 5 additions & 22 deletions receiver/otlpreceiver/internal/trace/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

const (
dataFormatProtobuf = "protobuf"
receiverTransport = "grpc"
)

// Receiver is the type used to handle spans from OpenTelemetry exporters.
Expand All @@ -38,46 +39,28 @@ type Receiver struct {

// New creates a new Receiver reference.
func New(id config.ComponentID, nextConsumer consumer.Traces) *Receiver {
r := &Receiver{
return &Receiver{
id: id,
nextConsumer: nextConsumer,
obsrecv: obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: id, Transport: receiverTransport}),
}

return r
}

const (
receiverTransport = "grpc"
)

var receiverID = config.NewIDWithName("otlp", "trace")

// Export implements the service Export traces func.
func (r *Receiver) Export(ctx context.Context, td pdata.Traces) (otlpgrpc.TracesResponse, error) {
// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.id, receiverTransport)
err := r.sendToNextConsumer(ctxWithReceiverName, td)
if err != nil {
return otlpgrpc.TracesResponse{}, err
}

return otlpgrpc.NewTracesResponse(), nil
}

func (r *Receiver) sendToNextConsumer(ctx context.Context, td pdata.Traces) error {
numSpans := td.SpanCount()
if numSpans == 0 {
return nil
return otlpgrpc.NewTracesResponse(), nil
}

if c, ok := client.FromGRPC(ctx); ok {
ctx = client.NewContext(ctx, c)
}

ctx = r.obsrecv.StartTracesOp(ctx)
ctx = r.obsrecv.StartTracesOp(obsreport.ReceiverContext(ctx, r.id, receiverTransport))
err := r.nextConsumer.ConsumeTraces(ctx, td)
r.obsrecv.EndTracesOp(ctx, dataFormatProtobuf, numSpans, err)

return err
return otlpgrpc.NewTracesResponse(), err
}
3 changes: 2 additions & 1 deletion receiver/otlpreceiver/internal/trace/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/testdata"
Expand Down Expand Up @@ -112,7 +113,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) (net.Addr, func(
}
}

r := New(receiverID, tc)
r := New(config.NewIDWithName("otlp", "trace"), tc)
require.NoError(t, err)

// Now run it as a gRPC server
Expand Down

0 comments on commit 590f240

Please sign in to comment.