Skip to content

Commit

Permalink
Fix OTLP receiver Shutdown() bug
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Feb 26, 2021
1 parent e6319ac commit 5ad19b5
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 31 deletions.
4 changes: 2 additions & 2 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,13 @@ func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {
}

// Shutdown is a method to turn off receiving.
func (r *otlpReceiver) Shutdown(context.Context) error {
func (r *otlpReceiver) Shutdown(ctx context.Context) error {
var err error
r.stopOnce.Do(func() {
err = nil

if r.serverHTTP != nil {
err = r.serverHTTP.Close()
err = r.serverHTTP.Shutdown(ctx)
}

if r.serverGRPC != nil {
Expand Down
174 changes: 145 additions & 29 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,15 +370,13 @@ func TestProtoHttp(t *testing.T) {
}
}
}
func testHTTPProtobufRequest(

func createHTTPProtobufRequest(
t *testing.T,
url string,
tSink *consumertest.TracesSink,
encoding string,
traceBytes []byte,
expectedErr error,
wantOtlp []*otlptrace.ResourceSpans,
) {
) *http.Request {
var buf *bytes.Buffer
var err error
switch encoding {
Expand All @@ -388,11 +386,25 @@ func testHTTPProtobufRequest(
default:
buf = bytes.NewBuffer(traceBytes)
}
tSink.SetConsumeError(expectedErr)
req, err := http.NewRequest("POST", url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", encoding)
return req
}

func testHTTPProtobufRequest(
t *testing.T,
url string,
tSink *consumertest.TracesSink,
encoding string,
traceBytes []byte,
expectedErr error,
wantOtlp []*otlptrace.ResourceSpans,
) {
tSink.SetConsumeError(expectedErr)

req := createHTTPProtobufRequest(t, url, encoding, traceBytes)

client := &http.Client{}
resp, err := client.Do(req)
Expand All @@ -402,14 +414,14 @@ func testHTTPProtobufRequest(
require.NoError(t, err, "Error reading response from trace grpc-gateway")
require.NoError(t, resp.Body.Close(), "Error closing response body")

allTraces := tSink.AllTraces()

require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type")

allTraces := tSink.AllTraces()

if expectedErr == nil {
require.Equal(t, 200, resp.StatusCode, "Unexpected return status")
tmp := &collectortrace.ExportTraceServiceResponse{}
err = tmp.Unmarshal(respBytes)
err := tmp.Unmarshal(respBytes)
require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto")

require.Len(t, allTraces, 1)
Expand Down Expand Up @@ -556,6 +568,29 @@ func TestHTTPStartWithoutConsumers(t *testing.T) {
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
}

func createSingleSpanTrace() *collectortrace.ExportTraceServiceRequest {
return &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{
{
TraceId: data.NewTraceID(
[16]byte{
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10,
},
),
},
},
},
},
},
},
}
}

// TestOTLPReceiverTrace_HandleNextConsumerResponse checks if the trace receiver
// is returning the proper response (return and metrics) when the next consumer
// in the pipeline reports error. The test changes the responses returned by the
Expand Down Expand Up @@ -595,26 +630,7 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) {
}

addr := testutil.GetAvailableLocalAddress(t)
req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{
{
TraceId: data.NewTraceID(
[16]byte{
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10,
},
),
},
},
},
},
},
},
}
req := createSingleSpanTrace()

exportBidiFn := func(
t *testing.T,
Expand Down Expand Up @@ -775,3 +791,103 @@ func compressGzip(body []byte) (*bytes.Buffer, error) {

return &buf, nil
}

type senderFunc func(msg *collectortrace.ExportTraceServiceRequest)

func TestShutdown(t *testing.T) {
endpointGrpc := testutil.GetAvailableLocalAddress(t)
endpointHTTP := testutil.GetAvailableLocalAddress(t)

nextSink := new(consumertest.TracesSink)

// Create OTLP receiver with gRPC and HTTP protocols.
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.SetName(otlpReceiverName)
cfg.GRPC.NetAddr.Endpoint = endpointGrpc
cfg.HTTP.Endpoint = endpointHTTP
ocr := newReceiver(t, factory, cfg, nextSink, nil)
require.NotNil(t, ocr)
require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()))

conn, err := grpc.Dial(endpointGrpc, grpc.WithInsecure(), grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()

doneSignalGrpc := make(chan bool)
doneSignalHTTP := make(chan bool)

senderGrpc := func(msg *collectortrace.ExportTraceServiceRequest) {
// Send request via OTLP/gRPC.
client := collectortrace.NewTraceServiceClient(conn)
client.Export(context.Background(), msg)
}
senderHTTP := func(msg *collectortrace.ExportTraceServiceRequest) {
// Send request via OTLP/HTTP.
traceBytes, err2 := msg.Marshal()
if err2 != nil {
t.Errorf("Error marshaling protobuf: %v", err2)
}
url := fmt.Sprintf("http://%s/v1/traces", endpointHTTP)
req := createHTTPProtobufRequest(t, url, "", traceBytes)
client := &http.Client{}
resp, err2 := client.Do(req)
if err2 == nil {
ioutil.ReadAll(resp.Body)
}
}

// Send traces to the receiver until we signal via done channel, and then
// send one more trace after that.
go generateTraces(senderGrpc, doneSignalGrpc)
go generateTraces(senderHTTP, doneSignalHTTP)

// Wait until the receiver outputs anything to the sink.
assert.Eventually(t, func() bool {
return nextSink.SpansCount() > 0
}, time.Second, 1*time.Millisecond)

// Now shutdown the receiver, while continuing sending traces to it.
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFn()
err = ocr.Shutdown(ctx)
assert.NoError(t, err)

// Remember how many spans the sink received. This number should not change after this
// point because after Shutdown() returns the component is not allowed to produce
// any more data.
sinkSpanCountAfterShutdown := nextSink.SpansCount()

// Now signal to generateTraces to exit the main generation loop, then send
// one more trace and stop.
doneSignalGrpc <- true
doneSignalHTTP <- true

// Wait until all follow up traces are sent.
<-doneSignalGrpc
<-doneSignalHTTP

// The last, additional trace should not be received by sink, so the number of spans in
// the sink should not change.
assert.EqualValues(t, sinkSpanCountAfterShutdown, nextSink.SpansCount())
}

func generateTraces(senderFn senderFunc, doneSignal chan bool) {
// Continuously generate spans until signaled to stop.
loop:
for {
select {
case <-doneSignal:
break loop
default:
}
senderFn(createSingleSpanTrace())
}

// After getting the signal to stop, send one more span and then
// finally stop. We should never receive this last span.
senderFn(createSingleSpanTrace())

// Indicate that we are done.
close(doneSignal)
}

0 comments on commit 5ad19b5

Please sign in to comment.