Skip to content

Commit

Permalink
[receiver/opencensus]: check for ErrListenerClosed when reporting mul…
Browse files Browse the repository at this point in the history
…tiplexer status (open-telemetry#34093)

Fixes open-telemetry#33865

---------

Signed-off-by: odubajDT <[email protected]>
  • Loading branch information
odubajDT authored Jul 19, 2024
1 parent c389bb2 commit aab1424
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 1 deletion.
27 changes: 27 additions & 0 deletions .chloggen/correctness-traces.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opencensusreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Do not report an error into resource status during receiver shutdown when the listener connection was closed.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33865]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 3 additions & 1 deletion receiver/opencensusreceiver/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,12 @@ func (ocr *ocReceiver) Start(ctx context.Context, host component.Host) error {
if !hasConsumer {
return errors.New("cannot start receiver: no consumers were specified")
}

ocr.ln, err = net.Listen(string(ocr.cfg.NetAddr.Transport), ocr.cfg.NetAddr.Endpoint)
if err != nil {
return fmt.Errorf("failed to bind to address %q: %w", ocr.cfg.NetAddr.Endpoint, err)
}

// Register the grpc-gateway on the HTTP server mux
var c context.Context
c, ocr.cancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -155,7 +157,7 @@ func (ocr *ocReceiver) Start(ctx context.Context, host component.Host) error {
}()
go func() {
startWG.Done()
if err := ocr.multiplexer.Serve(); !errors.Is(err, cmux.ErrServerClosed) && err != nil {
if err := ocr.multiplexer.Serve(); !errors.Is(err, cmux.ErrServerClosed) && !errors.Is(err, cmux.ErrListenerClosed) && err != nil {
ocr.settings.TelemetrySettings.ReportStatus(component.NewFatalErrorEvent(err))
}
}()
Expand Down
83 changes: 83 additions & 0 deletions receiver/opencensusreceiver/opencensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,89 @@ func TestStartWithoutConsumersShouldFail(t *testing.T) {
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
}

func TestStartListenerClosed(t *testing.T) {

addr := testutil.GetAvailableLocalAddress(t)

// Set the buffer count to 1 to make it flush the test span immediately.
sink := new(consumertest.TracesSink)
cfg := &Config{
ServerConfig: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: addr,
Transport: "tcp",
},
},
}
ocr := newOpenCensusReceiver(cfg, sink, nil, receivertest.NewNopSettings())

ctx := context.Background()

// start receiver
err := ocr.Start(ctx, componenttest.NewNopHost())
require.NoError(t, err, "Failed to start trace receiver: %v", err)

url := fmt.Sprintf("http://%s/v1/trace", addr)

traceJSON := []byte(`
{
"node":{"identifier":{"hostName":"testHost"}},
"spans":[
{
"traceId":"W47/95gDgQPSabYzgT/GDA==",
"spanId":"7uGbfsPBsXM=",
"name":{"value":"testSpan"},
"startTime":"2018-12-13T14:51:00Z",
"endTime":"2018-12-13T14:51:01Z",
"attributes": {
"attributeMap": {
"attr1": {"intValue": "55"}
}
}
}
]
}`)

// send request to verify listener is working
req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(traceJSON))
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/json")

client := &http.Client{}
defer client.CloseIdleConnections()
resp, err := client.Do(req)
require.NoError(t, err, "Error posting trace to grpc-gateway server: %v", err)

defer func() {
require.NoError(t, resp.Body.Close())
}()

respBytes, err := io.ReadAll(resp.Body)
require.NoError(t, err)
respStr := string(respBytes)

assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Empty(t, respStr)

// stop the listener
ocr.ln.Close()

// verify trace was sent
got := sink.AllTraces()
require.Len(t, got, 1)
require.Equal(t, 1, got[0].ResourceSpans().Len())
gotNode, gotResource, gotSpans := opencensus.ResourceSpansToOC(got[0].ResourceSpans().At(0))

wantNode := &commonpb.Node{Identifier: &commonpb.ProcessIdentifier{HostName: "testHost"}}
wantResource := &resourcepb.Resource{}
assert.True(t, proto.Equal(wantNode, gotNode))
assert.True(t, proto.Equal(wantResource, gotResource))
require.Len(t, gotSpans, 1)

// stop the receiver to verify it's not blocked by the closed listener
require.NoError(t, ocr.Shutdown(ctx))
}

func tempSocketName(t *testing.T) string {
tmpfile, err := os.CreateTemp("", "sock")
require.NoError(t, err)
Expand Down

0 comments on commit aab1424

Please sign in to comment.