Add ability to configure maxIdleConnection for CarbonExporter #30109

Merged: 1 commit merged on Dec 19, 2023
22 changes: 22 additions & 0 deletions .chloggen/carbonexportermax.yaml
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: carbonexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add ability to configure max_idle_conns"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30109]

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
14 changes: 8 additions & 6 deletions exporter/carbonexporter/config.go
@@ -14,15 +14,13 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
)

// Defaults for configuration settings that are not specified.
const (
defaultEndpoint = "localhost:2003"
)

// Config defines configuration for Carbon exporter.
type Config struct {
// Specifies the connection endpoint config. The default value is "localhost:2003".
confignet.TCPAddr `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
// MaxIdleConns limits the maximum number of idle TCP connections the client can keep open. The default value is 100.
// If `sending_queue` is enabled, it is recommended to use the same value as `sending_queue::num_consumers`.
MaxIdleConns int `mapstructure:"max_idle_conns"`

// Timeout is the maximum duration allowed for connecting and sending the
// data to the Carbon/Graphite backend. The default value is 5s.
@@ -43,7 +41,11 @@ func (cfg *Config) Validate() error {

// Negative timeouts are not acceptable, since all sends will fail.
if cfg.Timeout < 0 {
return errors.New("exporter requires a positive timeout")
return errors.New("'timeout' must be non-negative")
}

if cfg.MaxIdleConns < 0 {
return errors.New("'max_idle_conns' must be non-negative")
}

return nil
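For illustration, a hedged sketch (not part of this PR) of exercising the new field from a test inside the carbonexporter package. The test name and the value 10 are made up; the struct fields, `Validate` behavior, and error conditions are the ones visible in this diff:

```go
package carbonexporter

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"go.opentelemetry.io/collector/config/confignet"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Illustrative only: valid and invalid max_idle_conns values.
func TestMaxIdleConnsValidation(t *testing.T) {
	valid := &Config{
		TCPAddr:         confignet.TCPAddr{Endpoint: "localhost:2003"},
		MaxIdleConns:    10, // hypothetical value; the documented default is 100
		TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 5 * time.Second},
	}
	assert.NoError(t, valid.Validate())

	invalid := &Config{
		TCPAddr:      confignet.TCPAddr{Endpoint: "localhost:2003"},
		MaxIdleConns: -1, // rejected: 'max_idle_conns' must be non-negative
	}
	assert.Error(t, invalid.Validate())
}
```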
10 changes: 10 additions & 0 deletions exporter/carbonexporter/config_test.go
@@ -41,6 +41,7 @@ func TestLoadConfig(t *testing.T) {
TCPAddr: confignet.TCPAddr{
Endpoint: "localhost:8080",
},
MaxIdleConns: 15,
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 10 * time.Second,
},
@@ -101,12 +102,21 @@ func TestValidateConfig(t *testing.T) {
{
name: "invalid_timeout",
config: &Config{
TCPAddr: confignet.TCPAddr{Endpoint: defaultEndpoint},
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: -5 * time.Second,
},
},
wantErr: true,
},
{
name: "invalid_max_idle_conns",
config: &Config{
TCPAddr: confignet.TCPAddr{Endpoint: defaultEndpoint},
MaxIdleConns: -1,
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
195 changes: 116 additions & 79 deletions exporter/carbonexporter/exporter.go
@@ -5,11 +5,11 @@ package carbonexporter // import "github.com/open-telemetry/opentelemetry-collec

import (
"context"
"io"
"net"
"sync"
"time"

"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -21,7 +21,8 @@ import (
// newCarbonExporter returns a new Carbon exporter.
func newCarbonExporter(ctx context.Context, cfg *Config, set exporter.CreateSettings) (exporter.Metrics, error) {
sender := carbonSender{
writer: newTCPConnPool(cfg.Endpoint, cfg.Timeout),
writeTimeout: cfg.Timeout,
conns: newConnPool(cfg.TCPAddr, cfg.Timeout, cfg.MaxIdleConns),
}

exp, err := exporterhelper.NewMetricsExporter(
@@ -44,80 +45,13 @@ func newCarbonExporter(ctx context.Context, cfg *Config, set exporter.CreateSett
connections into an implementation of exporterhelper.PushMetricsData so
// the exporter can leverage the helper and get consistent observability.
type carbonSender struct {
writer io.WriteCloser
writeTimeout time.Duration
conns connPool
}

func (cs *carbonSender) pushMetricsData(_ context.Context, md pmetric.Metrics) error {
lines := metricDataToPlaintext(md)

if _, err := cs.writer.Write([]byte(lines)); err != nil {
// Use the sum of converted and dropped since the write failed for all.
return err
}

return nil
}

func (cs *carbonSender) Shutdown(context.Context) error {
return cs.writer.Close()
}

// connPool is a very simple implementation of a pool of net.TCPConn instances.
// The implementation hides the pool and exposes Write and Close methods.
// It leverages the prior art from SignalFx Gateway (see
// https://github.com/signalfx/gateway/blob/master/protocol/carbon/conn_pool.go
// but not its implementation).
//
// It keeps an unbounded "stack" of TCPConn instances, always "popping" the most
// recently returned to the pool. There is no accounting for terminating old
// unused connections, as was the case in the prior art mentioned above.
type connPool struct {
mtx sync.Mutex
conns []net.Conn
endpoint string
timeout time.Duration
}

func newTCPConnPool(
endpoint string,
timeout time.Duration,
) io.WriteCloser {
return &connPool{
endpoint: endpoint,
timeout: timeout,
}
}

func (cp *connPool) Write(bytes []byte) (int, error) {
var conn net.Conn
var err error

start := time.Now()
cp.mtx.Lock()
lastIdx := len(cp.conns) - 1
if lastIdx >= 0 {
conn = cp.conns[lastIdx]
cp.conns = cp.conns[0:lastIdx]
}
cp.mtx.Unlock()
if conn == nil {
if conn, err = cp.createTCPConn(); err != nil {
return 0, err
}
}

// The deferred function below puts connections back into the pool when there was no error.
defer func() {
if err != nil {
// err is already non-nil, so the error from closing the connection will not influence retry logic.
err = multierr.Append(err, conn.Close())
return
}
cp.mtx.Lock()
cp.conns = append(cp.conns, conn)
cp.mtx.Unlock()
}()

// There is no way to do a call equivalent to recvfrom with an empty buffer
// to check if the connection was terminated (if the size of the buffer is
// 0, the Read call doesn't reach the lower level). So due to buffer sizes it is
@@ -136,17 +70,120 @@ func (cp *connPool) Write(bytes []byte) (int, error) {
// facts this "workaround" is not being added at this moment. If it is
// needed in some scenarios the workaround should be validated on other
// platforms and offered as a configuration setting.
conn, err := cs.conns.get()
if err != nil {
return err
}

if err = conn.SetWriteDeadline(time.Now().Add(cs.writeTimeout)); err != nil {
// Do not re-enqueue the connection since it failed to set a deadline.
return multierr.Append(err, conn.Close())
}

// If we did not write all the bytes, Write will return an error, so there is no need to check the written count.
_, err = conn.Write([]byte(lines))
if err != nil {
// Do not re-enqueue the connection since it failed to write.
return multierr.Append(err, conn.Close())
}

// Even if put ends up closing the connection because the pool is already at
// its max idle connections limit, that is not an error for this write.
cs.conns.put(conn)
return nil
}
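As an aside on the long comment above about detecting terminated connections: below is a hypothetical sketch of the kind of probe it alludes to, not part of this PR. It assumes the code lives in this package with io and errors imported, and, as the comment itself warns, it is only plausible for protocols like Carbon where the server never sends data back, and would still need per-platform validation:

```go
// Hypothetical helper, not in this PR: best-effort check whether the peer
// closed an idle connection. Safe only when the server is never expected
// to send data, since the read would otherwise consume payload bytes.
func connLooksClosed(conn net.Conn) bool {
	// Set an already-expired read deadline so the Read returns immediately.
	if err := conn.SetReadDeadline(time.Now()); err != nil {
		return false
	}
	var probe [1]byte
	_, err := conn.Read(probe[:])
	// Restore the zero read deadline so future I/O is unaffected.
	_ = conn.SetReadDeadline(time.Time{})
	// A healthy idle connection times out; a closed one returns io.EOF.
	return errors.Is(err, io.EOF)
}
```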

func (cs *carbonSender) Shutdown(context.Context) error {
return cs.conns.close()
}

// connPool is a simple abstraction for a pool of net.Conn instances.
type connPool interface {
get() (net.Conn, error)
put(conn net.Conn)
close() error
}

func newConnPool(
tcpConfig confignet.TCPAddr,
timeout time.Duration,
maxIdleConns int,
) connPool {
if maxIdleConns == 0 {
return &nopConnPool{
timeout: timeout,
tcpConfig: tcpConfig,
}
}
return &connPoolWithIdle{
timeout: timeout,
tcpConfig: tcpConfig,
maxIdleConns: maxIdleConns,
}
}
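To make the factory's branching concrete, a hypothetical usage sketch inside this package (the endpoint and timeout values are made up):

```go
// Hypothetical usage of newConnPool, illustrating the two branches above.
func exampleNewConnPool() {
	// maxIdleConns == 0 selects nopConnPool: every get dials a fresh
	// connection and put closes it immediately.
	noPool := newConnPool(confignet.TCPAddr{Endpoint: "localhost:2003"}, 5*time.Second, 0)

	// maxIdleConns > 0 selects connPoolWithIdle: put caches connections,
	// up to the limit, for reuse by later get calls.
	pooled := newConnPool(confignet.TCPAddr{Endpoint: "localhost:2003"}, 5*time.Second, 100)

	_ = noPool
	_ = pooled
}
```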

// nopConnPool is a very simple implementation that does not cache any net.Conn.
type nopConnPool struct {
timeout time.Duration
tcpConfig confignet.TCPAddr
}

func (cp *nopConnPool) get() (net.Conn, error) {
return createTCPConn(cp.tcpConfig, cp.timeout)
}

func (cp *nopConnPool) put(conn net.Conn) {
_ = conn.Close()
}

if err = conn.SetWriteDeadline(start.Add(cp.timeout)); err != nil {
return 0, err
}

func (cp *nopConnPool) close() error {
return nil
}

// connPoolWithIdle is a very simple implementation of a pool of net.Conn
// instances.
//
// It keeps at most maxIdleConns net.Conn instances and always "pops" the one
// most recently returned to the pool. There is no accounting for terminating
// old unused connections.
type connPoolWithIdle struct {
timeout time.Duration
maxIdleConns int
mtx sync.Mutex
conns []net.Conn
tcpConfig confignet.TCPAddr
}

func (cp *connPoolWithIdle) get() (net.Conn, error) {
if conn := cp.getFromCache(); conn != nil {
return conn, nil
}

return createTCPConn(cp.tcpConfig, cp.timeout)
}

func (cp *connPoolWithIdle) put(conn net.Conn) {
cp.mtx.Lock()
defer cp.mtx.Unlock()
// Do not cache the connection if the pool is already at the max idle limit.
if len(cp.conns) >= cp.maxIdleConns {
_ = conn.Close()
return
}
cp.conns = append(cp.conns, conn)
}

var n int
n, err = conn.Write(bytes)
return n, err
}

func (cp *connPoolWithIdle) getFromCache() net.Conn {
cp.mtx.Lock()
defer cp.mtx.Unlock()
lastIdx := len(cp.conns) - 1
if lastIdx < 0 {
return nil
}
conn := cp.conns[lastIdx]
cp.conns = cp.conns[0:lastIdx]
return conn
}
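Taken together, put and getFromCache give LIFO reuse of connections. An illustrative sequence, assuming a reachable Carbon endpoint and made-up values:

```go
// Illustrative only: a returned connection is reused by the next get.
func exampleConnReuse() error {
	pool := newConnPool(confignet.TCPAddr{Endpoint: "localhost:2003"}, 5*time.Second, 10)

	c1, err := pool.get() // cache is empty: dials a new TCP connection
	if err != nil {
		return err
	}
	pool.put(c1) // cached rather than closed

	c2, err := pool.get() // pops c1 back off the stack; no new dial
	if err != nil {
		return err
	}
	pool.put(c2)
	return pool.close()
}
```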

func (cp *connPool) Close() error {
func (cp *connPoolWithIdle) close() error {
cp.mtx.Lock()
defer cp.mtx.Unlock()

@@ -158,8 +195,8 @@ func (cp *connPool) Close() error {
return errs
}

func (cp *connPool) createTCPConn() (net.Conn, error) {
c, err := net.DialTimeout("tcp", cp.endpoint, cp.timeout)
func createTCPConn(tcpConfig confignet.TCPAddr, timeout time.Duration) (net.Conn, error) {
c, err := net.DialTimeout("tcp", tcpConfig.Endpoint, timeout)
if err != nil {
return nil, err
}