Fix OTLP receiver Shutdown() bug #2555

Closed
13 changes: 13 additions & 0 deletions receiver/otlpreceiver/logs/otlp.go
@@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/collector/internal"
collectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/receiver/otlpreceiver/shutdownhelper"
)

const (
@@ -33,13 +34,15 @@ const (
type Receiver struct {
instanceName string
nextConsumer consumer.LogsConsumer
helper shutdownhelper.Helper
}

// New creates a new Receiver reference.
func New(instanceName string, nextConsumer consumer.LogsConsumer) *Receiver {
r := &Receiver{
instanceName: instanceName,
nextConsumer: nextConsumer,
helper: shutdownhelper.NewHelper(),
}

return r
@@ -50,7 +53,17 @@ const (
receiverTransport = "grpc"
)

func (r *Receiver) Shutdown(ctx context.Context) error {
return r.helper.Shutdown(ctx)
}

func (r *Receiver) Export(ctx context.Context, req *collectorlog.ExportLogsServiceRequest) (*collectorlog.ExportLogsServiceResponse, error) {
// Protect Export from being interrupted by Shutdown().
if err := r.helper.BeginOperation(); err != nil {
return nil, err
}
defer r.helper.EndOperation()

// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.instanceName, receiverTransport)

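Note: the shutdownhelper package imported above is not included in this diff, so the code below is an assumption inferred purely from how the receivers call it (NewHelper, BeginOperation, EndOperation, Shutdown). A minimal sketch of those semantics could look like this:

// Hypothetical sketch of receiver/otlpreceiver/shutdownhelper; inferred from usage, not taken from this PR.
package shutdownhelper

import (
    "context"
    "errors"
    "sync"
)

// Helper coordinates in-flight operations with Shutdown.
type Helper interface {
    // BeginOperation registers an in-flight operation, or fails once Shutdown has started.
    BeginOperation() error
    // EndOperation marks a previously begun operation as finished.
    EndOperation()
    // Shutdown blocks new operations and waits for in-flight ones to finish or for ctx to expire.
    Shutdown(ctx context.Context) error
}

// NewHelper returns a Helper backed by a mutex and a wait group.
func NewHelper() Helper {
    return &helper{}
}

type helper struct {
    mu       sync.Mutex
    stopped  bool
    inFlight sync.WaitGroup
}

func (h *helper) BeginOperation() error {
    h.mu.Lock()
    defer h.mu.Unlock()
    if h.stopped {
        return errors.New("receiver is shutting down")
    }
    h.inFlight.Add(1)
    return nil
}

func (h *helper) EndOperation() {
    h.inFlight.Done()
}

func (h *helper) Shutdown(ctx context.Context) error {
    h.mu.Lock()
    h.stopped = true
    h.mu.Unlock()

    done := make(chan struct{})
    go func() {
        h.inFlight.Wait()
        close(done)
    }()

    select {
    case <-done:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

The real package may differ in its details; the key contract the receivers rely on is that BeginOperation fails after Shutdown starts and Shutdown waits for in-flight Export calls to drain.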
13 changes: 13 additions & 0 deletions receiver/otlpreceiver/metrics/otlp.go
@@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
collectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/receiver/otlpreceiver/shutdownhelper"
)

const (
@@ -32,13 +33,15 @@ const (
type Receiver struct {
instanceName string
nextConsumer consumer.MetricsConsumer
helper shutdownhelper.Helper
}

// New creates a new Receiver reference.
func New(instanceName string, nextConsumer consumer.MetricsConsumer) *Receiver {
r := &Receiver{
instanceName: instanceName,
nextConsumer: nextConsumer,
helper: shutdownhelper.NewHelper(),
}
return r
}
@@ -48,7 +51,17 @@ const (
receiverTransport = "grpc"
)

func (r *Receiver) Shutdown(ctx context.Context) error {
return r.helper.Shutdown(ctx)
}

func (r *Receiver) Export(ctx context.Context, req *collectormetrics.ExportMetricsServiceRequest) (*collectormetrics.ExportMetricsServiceResponse, error) {
// Protect Export from being interrupted by Shutdown().
if err := r.helper.BeginOperation(); err != nil {
return nil, err
}
defer r.helper.EndOperation()

receiverCtx := obsreport.ReceiverContext(ctx, r.instanceName, receiverTransport)

md := pdata.MetricsFromOtlp(req.ResourceMetrics)
23 changes: 22 additions & 1 deletion receiver/otlpreceiver/otlp.go
@@ -168,7 +168,7 @@ func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {
}

// Shutdown is a method to turn off receiving.
func (r *otlpReceiver) Shutdown(context.Context) error {
func (r *otlpReceiver) Shutdown(ctx context.Context) error {
var err error
r.stopOnce.Do(func() {
err = nil
@@ -180,6 +180,27 @@ func (r *otlpReceiver) Shutdown(context.Context) error {
if r.serverGRPC != nil {
r.serverGRPC.Stop()
}

if r.traceReceiver != nil {
err2 := r.traceReceiver.Shutdown(ctx)
if err == nil && err2 != nil {
err = err2
}
}

if r.metricsReceiver != nil {
err2 := r.metricsReceiver.Shutdown(ctx)
if err == nil && err2 != nil {
err = err2
}
}

if r.logReceiver != nil {
err2 := r.logReceiver.Shutdown(ctx)
if err == nil && err2 != nil {
err = err2
}
}
})
return err
}
174 changes: 145 additions & 29 deletions receiver/otlpreceiver/otlp_test.go
@@ -370,15 +370,13 @@ func TestProtoHttp(t *testing.T) {
}
}
}
func testHTTPProtobufRequest(

func createHTTPProtobufRequest(
t *testing.T,
url string,
tSink *consumertest.TracesSink,
encoding string,
traceBytes []byte,
expectedErr error,
wantOtlp []*otlptrace.ResourceSpans,
) {
) *http.Request {
var buf *bytes.Buffer
var err error
switch encoding {
@@ -388,11 +386,25 @@ func testHTTPProtobufRequest(
default:
buf = bytes.NewBuffer(traceBytes)
}
tSink.SetConsumeError(expectedErr)
req, err := http.NewRequest("POST", url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", encoding)
return req
}

func testHTTPProtobufRequest(
t *testing.T,
url string,
tSink *consumertest.TracesSink,
encoding string,
traceBytes []byte,
expectedErr error,
wantOtlp []*otlptrace.ResourceSpans,
) {
tSink.SetConsumeError(expectedErr)

req := createHTTPProtobufRequest(t, url, encoding, traceBytes)

client := &http.Client{}
resp, err := client.Do(req)
@@ -402,14 +414,14 @@ func testHTTPProtobufRequest(
require.NoError(t, err, "Error reading response from trace grpc-gateway")
require.NoError(t, resp.Body.Close(), "Error closing response body")

allTraces := tSink.AllTraces()

require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type")

allTraces := tSink.AllTraces()

if expectedErr == nil {
require.Equal(t, 200, resp.StatusCode, "Unexpected return status")
tmp := &collectortrace.ExportTraceServiceResponse{}
err = tmp.Unmarshal(respBytes)
err := tmp.Unmarshal(respBytes)
require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto")

require.Len(t, allTraces, 1)
@@ -556,6 +568,29 @@ func TestHTTPStartWithoutConsumers(t *testing.T) {
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
}

func createSingleSpanTrace() *collectortrace.ExportTraceServiceRequest {
return &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{
{
TraceId: data.NewTraceID(
[16]byte{
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10,
},
),
},
},
},
},
},
},
}
}

// TestOTLPReceiverTrace_HandleNextConsumerResponse checks if the trace receiver
// is returning the proper response (return and metrics) when the next consumer
// in the pipeline reports error. The test changes the responses returned by the
@@ -595,26 +630,7 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) {
}

addr := testutil.GetAvailableLocalAddress(t)
req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{
{
TraceId: data.NewTraceID(
[16]byte{
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10,
},
),
},
},
},
},
},
},
}
req := createSingleSpanTrace()

exportBidiFn := func(
t *testing.T,
@@ -775,3 +791,103 @@ func compressGzip(body []byte) (*bytes.Buffer, error) {

return &buf, nil
}

type senderFunc func(msg *collectortrace.ExportTraceServiceRequest)

func TestShutdown(t *testing.T) {
endpointGrpc := testutil.GetAvailableLocalAddress(t)
endpointHTTP := testutil.GetAvailableLocalAddress(t)

nextSink := new(consumertest.TracesSink)

// Create OTLP receiver with gRPC and HTTP protocols.
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.SetName(otlpReceiverName)
cfg.GRPC.NetAddr.Endpoint = endpointGrpc
cfg.HTTP.Endpoint = endpointHTTP
ocr := newReceiver(t, factory, cfg, nextSink, nil)
require.NotNil(t, ocr)
require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()))

conn, err := grpc.Dial(endpointGrpc, grpc.WithInsecure(), grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()

doneSignalGrpc := make(chan bool)
doneSignalHTTP := make(chan bool)

senderGrpc := func(msg *collectortrace.ExportTraceServiceRequest) {
// Send request via OTLP/gRPC.
client := collectortrace.NewTraceServiceClient(conn)
client.Export(context.Background(), msg)
}
senderHTTP := func(msg *collectortrace.ExportTraceServiceRequest) {
// Send request via OTLP/HTTP.
traceBytes, err2 := msg.Marshal()
if err2 != nil {
t.Errorf("Error marshaling protobuf: %v", err2)
}
url := fmt.Sprintf("http://%s/v1/traces", endpointHTTP)
req := createHTTPProtobufRequest(t, url, "", traceBytes)
client := &http.Client{}
resp, err2 := client.Do(req)
if err2 == nil {
// Drain and close the body so the HTTP connection can be reused.
ioutil.ReadAll(resp.Body)
resp.Body.Close()
}
}

// Send traces to the receiver until we signal via the done channel, and then
// send one more trace after that.
go generateTraces(senderGrpc, doneSignalGrpc)
go generateTraces(senderHTTP, doneSignalHTTP)

// Wait until the receiver outputs something to the sink.
assert.Eventually(t, func() bool {
return nextSink.SpansCount() > 0
}, time.Second, 1*time.Millisecond)

// Now shutdown the receiver, while continuing sending traces to it.
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFn()
err = ocr.Shutdown(ctx)
assert.NoError(t, err)

// Remember how many spans the sink received. This number should not change after this
// point because after Shutdown() returns the component is not allowed to produce
// any more data.
sinkSpanCountAfterShutdown := nextSink.SpansCount()

// Now signal to generateTraces to exit the main generation loop, then send
// one more trace and stop.
doneSignalGrpc <- true
doneSignalHTTP <- true

// Wait until all follow-up traces are sent.
<-doneSignalGrpc
<-doneSignalHTTP

// The last, additional trace should not be received by the sink, so the number of spans in
// the sink should not change.
assert.EqualValues(t, sinkSpanCountAfterShutdown, nextSink.SpansCount())
}

func generateTraces(senderFn senderFunc, doneSignal chan bool) {
// Continuously generate spans until signaled to stop.
loop:
for {
select {
case <-doneSignal:
break loop
default:
}
senderFn(createSingleSpanTrace())
}

// After getting the signal to stop, send one more span and then
// finally stop. We should never receive this last span.
senderFn(createSingleSpanTrace())

// Indicate that we are done.
close(doneSignal)
}
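If the shutdownhelper contract sketched earlier holds, it could also be exercised directly. The following test sketch is hypothetical: it assumes the inferred BeginOperation/EndOperation/Shutdown API and the usual testing imports (testing, context, time, testify's require), and it is not part of this PR.

func TestShutdownHelperWaitsForOperations(t *testing.T) {
    h := shutdownhelper.NewHelper()

    // An operation that is in flight before Shutdown starts must be allowed to finish.
    require.NoError(t, h.BeginOperation())

    shutdownDone := make(chan error, 1)
    go func() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        shutdownDone <- h.Shutdown(ctx)
    }()

    // Shutdown must not return while the operation is still in flight.
    select {
    case err := <-shutdownDone:
        t.Fatalf("Shutdown returned before EndOperation: %v", err)
    case <-time.After(50 * time.Millisecond):
    }

    h.EndOperation()
    require.NoError(t, <-shutdownDone)

    // Once Shutdown has completed, new operations must be rejected.
    require.Error(t, h.BeginOperation())
}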