Skip to content

Commit

Permalink
Verify OTLP receiver Shutdown()
Browse files Browse the repository at this point in the history
OTLP receiver had a bug when it would generate traces
after Shutdown() was returned.

It was visible in the test, with the following symptoms (it was a racy bug):

=== RUN   TestShutdown
    otlp_test.go:871:
        	Error Trace:	otlp_test.go:871
        	Error:      	Not equal:
        	            	expected: 4
        	            	actual  : 5
        	Test:       	TestShutdown
    otlp_test.go:871:
        	Error Trace:	otlp_test.go:871
        	Error:      	Not equal:
        	            	expected: 6
        	            	actual  : 7
        	Test:       	TestShutdown
--- FAIL: TestShutdown (0.49s)
  • Loading branch information
tigrannajaryan committed Feb 25, 2021
1 parent e6319ac commit 3163ee0
Show file tree
Hide file tree
Showing 2 changed files with 6,905 additions and 29 deletions.
177 changes: 148 additions & 29 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,15 +370,13 @@ func TestProtoHttp(t *testing.T) {
}
}
}
func testHTTPProtobufRequest(

func createHTTPProtobufRequest(
t *testing.T,
url string,
tSink *consumertest.TracesSink,
encoding string,
traceBytes []byte,
expectedErr error,
wantOtlp []*otlptrace.ResourceSpans,
) {
) *http.Request {
var buf *bytes.Buffer
var err error
switch encoding {
Expand All @@ -388,11 +386,25 @@ func testHTTPProtobufRequest(
default:
buf = bytes.NewBuffer(traceBytes)
}
tSink.SetConsumeError(expectedErr)
req, err := http.NewRequest("POST", url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", encoding)
return req
}

func testHTTPProtobufRequest(
t *testing.T,
url string,
tSink *consumertest.TracesSink,
encoding string,
traceBytes []byte,
expectedErr error,
wantOtlp []*otlptrace.ResourceSpans,
) {
tSink.SetConsumeError(expectedErr)

req := createHTTPProtobufRequest(t, url, encoding, traceBytes)

client := &http.Client{}
resp, err := client.Do(req)
Expand All @@ -402,14 +414,14 @@ func testHTTPProtobufRequest(
require.NoError(t, err, "Error reading response from trace grpc-gateway")
require.NoError(t, resp.Body.Close(), "Error closing response body")

allTraces := tSink.AllTraces()

require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type")

allTraces := tSink.AllTraces()

if expectedErr == nil {
require.Equal(t, 200, resp.StatusCode, "Unexpected return status")
tmp := &collectortrace.ExportTraceServiceResponse{}
err = tmp.Unmarshal(respBytes)
err := tmp.Unmarshal(respBytes)
require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto")

require.Len(t, allTraces, 1)
Expand Down Expand Up @@ -556,6 +568,29 @@ func TestHTTPStartWithoutConsumers(t *testing.T) {
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
}

func createSingleSpanTrace() *collectortrace.ExportTraceServiceRequest {
return &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{
{
TraceId: data.NewTraceID(
[16]byte{
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10,
},
),
},
},
},
},
},
},
}
}

// TestOTLPReceiverTrace_HandleNextConsumerResponse checks if the trace receiver
// is returning the proper response (return and metrics) when the next consumer
// in the pipeline reports error. The test changes the responses returned by the
Expand Down Expand Up @@ -595,26 +630,7 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) {
}

addr := testutil.GetAvailableLocalAddress(t)
req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{
{
TraceId: data.NewTraceID(
[16]byte{
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10,
},
),
},
},
},
},
},
},
}
req := createSingleSpanTrace()

exportBidiFn := func(
t *testing.T,
Expand Down Expand Up @@ -775,3 +791,106 @@ func compressGzip(body []byte) (*bytes.Buffer, error) {

return &buf, nil
}

type senderFunc func(msg *collectortrace.ExportTraceServiceRequest)

func TestShutdown(t *testing.T) {
for i := 0; i < 100; i++ {
endpointGrpc := testutil.GetAvailableLocalAddress(t)
endpointHTTP := testutil.GetAvailableLocalAddress(t)

nextSink := new(consumertest.TracesSink)

// Create OTLP receiver with gRPC and HTTP protocols.
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.SetName(otlpReceiverName)
cfg.GRPC.NetAddr.Endpoint = endpointGrpc
cfg.HTTP.Endpoint = endpointHTTP
ocr := newReceiver(t, factory, cfg, nextSink, nil)
require.NotNil(t, ocr)
require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()))

conn, err := grpc.Dial(endpointGrpc, grpc.WithInsecure(), grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()

doneSignalGrpc := make(chan bool)
doneSignalHTTP := make(chan bool)

senderGrpc := func(msg *collectortrace.ExportTraceServiceRequest) {
// Send request via OTLP/gRPC.
client := collectortrace.NewTraceServiceClient(conn)
client.Export(context.Background(), msg)
}
senderHTTP := func(msg *collectortrace.ExportTraceServiceRequest) {
// Send request via OTLP/HTTP.
traceBytes, err2 := msg.Marshal()
if err2 != nil {
t.Errorf("Error marshaling protobuf: %v", err2)
}
url := fmt.Sprintf("http://%s/v1/traces", endpointHTTP)
req := createHTTPProtobufRequest(t, url, "", traceBytes)
client := &http.Client{}
resp, err2 := client.Do(req)
if err2 == nil {
ioutil.ReadAll(resp.Body)
}
}

// Send traces to the receiver until we signal via done channel, and then continue
// sending some more traces after that.
go generateTraces(senderGrpc, doneSignalGrpc)
go generateTraces(senderHTTP, doneSignalHTTP)

// Wait until the receiver outputs anything to the sink.
assert.Eventually(t, func() bool {
return nextSink.SpansCount() > 0
}, time.Second, 1*time.Millisecond)

// Now shutdown the receiver, while continuing sending traces to it.
err = ocr.Shutdown(context.Background())
assert.NoError(t, err)

// Remember how many spans the sink received. This number should not change after this
// point because after Shutdown() returns the component is not allowed to produce
// any more data.
sinkSpanCountAfterShutdown := nextSink.SpansCount()

// Now signal to generateTraces to exit the main generation loop, then send
// a number of follow up traces and stop.
doneSignalGrpc <- true
doneSignalHTTP <- true

// Wait until all follow up traces are sent.
<-doneSignalGrpc
<-doneSignalHTTP

// The follow up traces should not be received by sink, so the number of spans in
// the sink should not change.
assert.EqualValues(t, sinkSpanCountAfterShutdown, nextSink.SpansCount())
}
}

func generateTraces(senderFn senderFunc, doneSignal chan bool) {
// Continuously generate spans until signaled to stop.
loop:
for {
select {
case <-doneSignal:
break loop
default:
}
senderFn(createSingleSpanTrace())
}

// After getting the signal to stop generate another 10 spans and then
// finally stop.
const afterDoneSpanCount = 10
for i := 0; i < afterDoneSpanCount; i++ {
senderFn(createSingleSpanTrace())
}

// Indicate that we are done.
close(doneSignal)
}
Loading

0 comments on commit 3163ee0

Please sign in to comment.