Skip to content

Commit

Permalink
[receiver/carbon] Fix memory leak (#24276)
Browse files Browse the repository at this point in the history
**Description:** 
The carbonreceiver has a memory leak where it will repeatedly open new
obsrecv operations but not close them afterwards. Those operations
eventually create a burden.
  
The fix is to make sure the receiver only creates an operation per
interaction over TCP.

**Link to tracking Issue:**
Fixes #24275

**Testing:**
Added a test showing TCP reconnections no longer start additional spans.
  • Loading branch information
atoulme authored Aug 8, 2023
1 parent f70c6b0 commit e314b70
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 13 deletions.
23 changes: 23 additions & 0 deletions .chloggen/carbon-receiver-obsrecv-memory-leak.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: carbonreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix Carbon receiver obsrecv operations memory leak

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [24275]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The carbonreceiver has a memory leak where it will repeatedly open new obsrecv operations but not close them afterwards. Those operations eventually create a burden.
The fix is to make sure the receiver only creates an operation per interaction over TCP.
2 changes: 1 addition & 1 deletion receiver/carbonreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
go.opentelemetry.io/collector/consumer v0.82.0
go.opentelemetry.io/collector/pdata v1.0.0-rcv0014
go.opentelemetry.io/collector/receiver v0.82.0
go.opentelemetry.io/otel/sdk v1.16.0
go.uber.org/zap v1.25.0
)

Expand Down Expand Up @@ -50,7 +51,6 @@ require (
go.opentelemetry.io/otel v1.16.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.39.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/sdk v1.16.0 // indirect
go.opentelemetry.io/otel/sdk/metric v0.39.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
41 changes: 31 additions & 10 deletions receiver/carbonreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/protocol"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/transport/client"
)

Expand Down Expand Up @@ -170,17 +171,28 @@ func Test_carbonreceiver_EndToEnd(t *testing.T) {
tests := []struct {
name string
configFn func() *Config
clientFn func(t *testing.T) *client.Graphite
clientFn func(t *testing.T) func(client.Metric) error
}{
{
name: "default_config",
configFn: func() *Config {
return createDefaultConfig().(*Config)
},
clientFn: func(t *testing.T) *client.Graphite {
clientFn: func(t *testing.T) func(client.Metric) error {
c, err := client.NewGraphite(client.TCP, addr)
require.NoError(t, err)
return c
return c.SendMetric
},
},
{
name: "tcp_reconnect",
configFn: func() *Config {
return createDefaultConfig().(*Config)
},
clientFn: func(t *testing.T) func(client.Metric) error {
c, err := client.NewGraphite(client.TCP, addr)
require.NoError(t, err)
return c.SputterThenSendMetric
},
},
{
Expand All @@ -190,10 +202,10 @@ func Test_carbonreceiver_EndToEnd(t *testing.T) {
cfg.Transport = "udp"
return cfg
},
clientFn: func(t *testing.T) *client.Graphite {
clientFn: func(t *testing.T) func(client.Metric) error {
c, err := client.NewGraphite(client.UDP, addr)
require.NoError(t, err)
return c
return c.SendMetric
},
},
}
Expand All @@ -202,11 +214,16 @@ func Test_carbonreceiver_EndToEnd(t *testing.T) {
cfg := tt.configFn()
cfg.Endpoint = addr
sink := new(consumertest.MetricsSink)
rcv, err := New(receivertest.NewNopCreateSettings(), *cfg, sink)
recorder := tracetest.NewSpanRecorder()
rt := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(recorder))
cs := receivertest.NewNopCreateSettings()
cs.TracerProvider = rt
rcv, err := New(cs, *cfg, sink)
require.NoError(t, err)
r := rcv.(*carbonReceiver)

mr := transport.NewMockReporter(1)
mr, err := newReporter(cs)
require.NoError(t, err)
r.reporter = mr

require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -223,17 +240,21 @@ func Test_carbonreceiver_EndToEnd(t *testing.T) {
Value: 1.23,
Timestamp: ts,
}
err = snd.SendMetric(carbonMetric)

err = snd(carbonMetric)
require.NoError(t, err)

mr.WaitAllOnMetricsProcessedCalls()
require.Eventually(t, func() bool {
return len(recorder.Ended()) == 1
}, 30*time.Second, 100*time.Millisecond)

mdd := sink.AllMetrics()
require.Len(t, mdd, 1)
require.Equal(t, 1, mdd[0].MetricCount())
m := mdd[0].ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)
assert.Equal(t, carbonMetric.Name, m.Name())
require.Equal(t, 1, m.Gauge().DataPoints().Len())
require.Equal(t, len(recorder.Ended()), len(recorder.Started()))
})
}
}
22 changes: 22 additions & 0 deletions receiver/carbonreceiver/transport/client/plaintext_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,28 @@ func (g *Graphite) SendMetric(metric Metric) error {
return nil
}

// SputterThenSendMetric method sends a bad partial metric, then the whole metric across.
func (g *Graphite) SputterThenSendMetric(metric Metric) error {
str := metric.String()
for i := 0; i < 5; i++ {
if _, err := fmt.Fprint(g.Conn, ""); err != nil {
return err
}
if err := g.Disconnect(); err != nil {
return err
}
if err := g.connect(TCP); err != nil {
return err
}
}

if _, err := fmt.Fprint(g.Conn, str); err != nil {
return err
}

return nil
}

// SendMetrics method can be used to pass a set of metrics and
// have it be sent to the Graphite host
func (g *Graphite) SendMetrics(metrics []Metric) error {
Expand Down
16 changes: 14 additions & 2 deletions receiver/carbonreceiver/transport/tcp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ func (t *tcpServer) handleConnection(
defer conn.Close()
var span *trace.Span
reader := bufio.NewReader(conn)
reporterActive := false
var ctx context.Context
for {
if span != nil {
span.End()
Expand All @@ -161,11 +163,13 @@ func (t *tcpServer) handleConnection(
// this case).
bytes, err := reader.ReadBytes((byte)('\n'))

// It is possible to have new data in bytes and err to be io.EOF
ctx := t.reporter.OnDataReceived(context.Background())
var numReceivedMetricPoints int
line := strings.TrimSpace(string(bytes))
if line != "" {
if !reporterActive {
ctx = t.reporter.OnDataReceived(context.Background())
reporterActive = true
}
numReceivedMetricPoints++
var metric pmetric.Metric
metric, err = p.Parse(line)
Expand All @@ -178,6 +182,7 @@ func (t *tcpServer) handleConnection(
metric.MoveTo(newMetric)
err = nextConsumer.ConsumeMetrics(ctx, metrics)
t.reporter.OnMetricsProcessed(ctx, numReceivedMetricPoints, err)
reporterActive = false
if err != nil {
// The protocol doesn't account for returning errors.
// Since this is a TCP connection it seems reasonable to close the
Expand All @@ -193,6 +198,9 @@ func (t *tcpServer) handleConnection(
t.reporter.OnDebugf("TCP Transport (%s) - net.OpError: %v", t.ln.Addr(), netErr)
if netErr.Timeout() {
// We want to end on timeout so idle connections are purged.
if reporterActive {
t.reporter.OnMetricsProcessed(ctx, 0, err)
}
span.End()
return
}
Expand All @@ -203,6 +211,10 @@ func (t *tcpServer) handleConnection(
"TCP Transport (%s) - error: %v",
t.ln.Addr(),
err)

if reporterActive {
t.reporter.OnMetricsProcessed(ctx, 0, err)
}
span.End()
return
}
Expand Down

0 comments on commit e314b70

Please sign in to comment.