Skip to content

Commit

Permalink
Fix OTLP receiver Shutdown() bug
Browse files Browse the repository at this point in the history
OTLP receiver had a bug due to which it could generate data after Shutdown()
was returned.

It was visible in TestShutdown, with the following symptoms (it was a racy bug):

=== RUN   TestShutdown
    otlp_test.go:871:
        	Error Trace:	otlp_test.go:871
        	Error:      	Not equal:
        	            	expected: 4
        	            	actual  : 5
        	Test:       	TestShutdown
    otlp_test.go:871:
        	Error Trace:	otlp_test.go:871
        	Error:      	Not equal:
        	            	expected: 6
        	            	actual  : 7
        	Test:       	TestShutdown
--- FAIL: TestShutdown (0.49s)

I added a Helper which allows to wait until all in-flight operations are complete
before the Shutdown() function returns.

The Helper can be used by other components too in the future.
  • Loading branch information
tigrannajaryan committed Feb 26, 2021
1 parent e6319ac commit 30ca26c
Show file tree
Hide file tree
Showing 8 changed files with 7,265 additions and 30 deletions.
12 changes: 12 additions & 0 deletions receiver/otlpreceiver/logs/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/collector/internal"
collectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/receiver/otlpreceiver/shutdownhelper"
)

const (
Expand All @@ -33,13 +34,15 @@ const (
type Receiver struct {
instanceName string
nextConsumer consumer.LogsConsumer
helper shutdownhelper.Helper
}

// New creates a new Receiver reference.
func New(instanceName string, nextConsumer consumer.LogsConsumer) *Receiver {
r := &Receiver{
instanceName: instanceName,
nextConsumer: nextConsumer,
helper: shutdownhelper.NewHelper(),
}

return r
Expand All @@ -50,7 +53,16 @@ const (
receiverTransport = "grpc"
)

func (r *Receiver) Shutdown(ctx context.Context) error {
return r.helper.Shutdown(ctx)
}

func (r *Receiver) Export(ctx context.Context, req *collectorlog.ExportLogsServiceRequest) (*collectorlog.ExportLogsServiceResponse, error) {
if err := r.helper.BeginOperation(); err != nil {
return nil, err
}
defer r.helper.EndOperation()

// We need to ensure that it propagates the receiver name as a tag
ctxWithReceiverName := obsreport.ReceiverContext(ctx, r.instanceName, receiverTransport)

Expand Down
12 changes: 12 additions & 0 deletions receiver/otlpreceiver/metrics/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
collectormetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/receiver/otlpreceiver/shutdownhelper"
)

const (
Expand All @@ -32,13 +33,15 @@ const (
type Receiver struct {
instanceName string
nextConsumer consumer.MetricsConsumer
helper shutdownhelper.Helper
}

// New creates a new Receiver reference.
func New(instanceName string, nextConsumer consumer.MetricsConsumer) *Receiver {
r := &Receiver{
instanceName: instanceName,
nextConsumer: nextConsumer,
helper: shutdownhelper.NewHelper(),
}
return r
}
Expand All @@ -48,7 +51,16 @@ const (
receiverTransport = "grpc"
)

func (r *Receiver) Shutdown(ctx context.Context) error {
return r.helper.Shutdown(ctx)
}

func (r *Receiver) Export(ctx context.Context, req *collectormetrics.ExportMetricsServiceRequest) (*collectormetrics.ExportMetricsServiceResponse, error) {
if err := r.helper.BeginOperation(); err != nil {
return nil, err
}
defer r.helper.EndOperation()

receiverCtx := obsreport.ReceiverContext(ctx, r.instanceName, receiverTransport)

md := pdata.MetricsFromOtlp(req.ResourceMetrics)
Expand Down
23 changes: 22 additions & 1 deletion receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {
}

// Shutdown is a method to turn off receiving.
func (r *otlpReceiver) Shutdown(context.Context) error {
func (r *otlpReceiver) Shutdown(ctx context.Context) error {
var err error
r.stopOnce.Do(func() {
err = nil
Expand All @@ -180,6 +180,27 @@ func (r *otlpReceiver) Shutdown(context.Context) error {
if r.serverGRPC != nil {
r.serverGRPC.Stop()
}

if r.traceReceiver != nil {
err2 := r.traceReceiver.Shutdown(ctx)
if err == nil && err2 != nil {
err = err2
}
}

if r.metricsReceiver != nil {
err2 := r.metricsReceiver.Shutdown(ctx)
if err == nil && err2 != nil {
err = err2
}
}

if r.logReceiver != nil {
err2 := r.logReceiver.Shutdown(ctx)
if err == nil && err2 != nil {
err = err2
}
}
})
return err
}
Expand Down
174 changes: 145 additions & 29 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,15 +370,13 @@ func TestProtoHttp(t *testing.T) {
}
}
}
func testHTTPProtobufRequest(

func createHTTPProtobufRequest(
t *testing.T,
url string,
tSink *consumertest.TracesSink,
encoding string,
traceBytes []byte,
expectedErr error,
wantOtlp []*otlptrace.ResourceSpans,
) {
) *http.Request {
var buf *bytes.Buffer
var err error
switch encoding {
Expand All @@ -388,11 +386,25 @@ func testHTTPProtobufRequest(
default:
buf = bytes.NewBuffer(traceBytes)
}
tSink.SetConsumeError(expectedErr)
req, err := http.NewRequest("POST", url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", encoding)
return req
}

func testHTTPProtobufRequest(
t *testing.T,
url string,
tSink *consumertest.TracesSink,
encoding string,
traceBytes []byte,
expectedErr error,
wantOtlp []*otlptrace.ResourceSpans,
) {
tSink.SetConsumeError(expectedErr)

req := createHTTPProtobufRequest(t, url, encoding, traceBytes)

client := &http.Client{}
resp, err := client.Do(req)
Expand All @@ -402,14 +414,14 @@ func testHTTPProtobufRequest(
require.NoError(t, err, "Error reading response from trace grpc-gateway")
require.NoError(t, resp.Body.Close(), "Error closing response body")

allTraces := tSink.AllTraces()

require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type")

allTraces := tSink.AllTraces()

if expectedErr == nil {
require.Equal(t, 200, resp.StatusCode, "Unexpected return status")
tmp := &collectortrace.ExportTraceServiceResponse{}
err = tmp.Unmarshal(respBytes)
err := tmp.Unmarshal(respBytes)
require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto")

require.Len(t, allTraces, 1)
Expand Down Expand Up @@ -556,6 +568,29 @@ func TestHTTPStartWithoutConsumers(t *testing.T) {
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
}

func createSingleSpanTrace() *collectortrace.ExportTraceServiceRequest {
return &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{
{
TraceId: data.NewTraceID(
[16]byte{
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10,
},
),
},
},
},
},
},
},
}
}

// TestOTLPReceiverTrace_HandleNextConsumerResponse checks if the trace receiver
// is returning the proper response (return and metrics) when the next consumer
// in the pipeline reports error. The test changes the responses returned by the
Expand Down Expand Up @@ -595,26 +630,7 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) {
}

addr := testutil.GetAvailableLocalAddress(t)
req := &collectortrace.ExportTraceServiceRequest{
ResourceSpans: []*otlptrace.ResourceSpans{
{
InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{
{
Spans: []*otlptrace.Span{
{
TraceId: data.NewTraceID(
[16]byte{
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,
0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10,
},
),
},
},
},
},
},
},
}
req := createSingleSpanTrace()

exportBidiFn := func(
t *testing.T,
Expand Down Expand Up @@ -775,3 +791,103 @@ func compressGzip(body []byte) (*bytes.Buffer, error) {

return &buf, nil
}

type senderFunc func(msg *collectortrace.ExportTraceServiceRequest)

func TestShutdown(t *testing.T) {
endpointGrpc := testutil.GetAvailableLocalAddress(t)
endpointHTTP := testutil.GetAvailableLocalAddress(t)

nextSink := new(consumertest.TracesSink)

// Create OTLP receiver with gRPC and HTTP protocols.
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.SetName(otlpReceiverName)
cfg.GRPC.NetAddr.Endpoint = endpointGrpc
cfg.HTTP.Endpoint = endpointHTTP
ocr := newReceiver(t, factory, cfg, nextSink, nil)
require.NotNil(t, ocr)
require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()))

conn, err := grpc.Dial(endpointGrpc, grpc.WithInsecure(), grpc.WithBlock())
require.NoError(t, err)
defer conn.Close()

doneSignalGrpc := make(chan bool)
doneSignalHTTP := make(chan bool)

senderGrpc := func(msg *collectortrace.ExportTraceServiceRequest) {
// Send request via OTLP/gRPC.
client := collectortrace.NewTraceServiceClient(conn)
client.Export(context.Background(), msg)
}
senderHTTP := func(msg *collectortrace.ExportTraceServiceRequest) {
// Send request via OTLP/HTTP.
traceBytes, err2 := msg.Marshal()
if err2 != nil {
t.Errorf("Error marshaling protobuf: %v", err2)
}
url := fmt.Sprintf("http://%s/v1/traces", endpointHTTP)
req := createHTTPProtobufRequest(t, url, "", traceBytes)
client := &http.Client{}
resp, err2 := client.Do(req)
if err2 == nil {
ioutil.ReadAll(resp.Body)
}
}

// Send traces to the receiver until we signal via done channel, and then continue
// sending some more traces after that.
go generateTraces(senderGrpc, doneSignalGrpc)
go generateTraces(senderHTTP, doneSignalHTTP)

// Wait until the receiver outputs anything to the sink.
assert.Eventually(t, func() bool {
return nextSink.SpansCount() > 0
}, time.Second, 1*time.Millisecond)

// Now shutdown the receiver, while continuing sending traces to it.
ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
defer cancelFn()
err = ocr.Shutdown(ctx)
assert.NoError(t, err)

// Remember how many spans the sink received. This number should not change after this
// point because after Shutdown() returns the component is not allowed to produce
// any more data.
sinkSpanCountAfterShutdown := nextSink.SpansCount()

// Now signal to generateTraces to exit the main generation loop, then send
// one more trace and stop.
doneSignalGrpc <- true
doneSignalHTTP <- true

// Wait until all follow up traces are sent.
<-doneSignalGrpc
<-doneSignalHTTP

// The follow up traces should not be received by sink, so the number of spans in
// the sink should not change.
assert.EqualValues(t, sinkSpanCountAfterShutdown, nextSink.SpansCount())
}

func generateTraces(senderFn senderFunc, doneSignal chan bool) {
// Continuously generate spans until signaled to stop.
loop:
for {
select {
case <-doneSignal:
break loop
default:
}
senderFn(createSingleSpanTrace())
}

// After getting the signal to stop, send one more span and then
// finally stop. We should never receive this last span.
senderFn(createSingleSpanTrace())

// Indicate that we are done.
close(doneSignal)
}
Loading

0 comments on commit 30ca26c

Please sign in to comment.