Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add read timeout to socket_listener #2571

Closed
wants to merge 9 commits into from
Closed
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- [#2734](https://github.com/influxdata/telegraf/pull/2734): Add include/exclude filters for docker containers.
- [#2602](https://github.com/influxdata/telegraf/pull/2602): Add secure connection support to graphite output.
- [#2908](https://github.com/influxdata/telegraf/pull/2908): Add min/max response time on linux/darwin to ping.
- [#2571](https://github.com/influxdata/telegraf/pull/2571): Add read timeout to socket_listener

### Bugfixes

Expand Down
5 changes: 5 additions & 0 deletions plugins/inputs/socket_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ This is a sample configuration for the plugin.
## Only applies to stream sockets (e.g. TCP).
## 0 (default) is unlimited.
# max_connections = 1024

## Read timeout.
## Only applies to stream sockets (e.g. TCP).
## 0 (default) is unlimited.
# read_timeout = "30s"

## Maximum socket buffer size in bytes.
## For stream sockets, once the buffer fills up, the sender will start backing up.
Expand Down
19 changes: 17 additions & 2 deletions plugins/inputs/socket_listener/socket_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"time"
)

type setReadBufferer interface {
Expand Down Expand Up @@ -91,7 +92,13 @@ func (ssl *streamSocketListener) read(c net.Conn) {
defer c.Close()

scnr := bufio.NewScanner(c)
for scnr.Scan() {
for {
if ssl.ReadTimeout != nil && ssl.ReadTimeout.Duration > 0 {
c.SetReadDeadline(time.Now().Add(ssl.ReadTimeout.Duration))
}
if !scnr.Scan() {
break
}
metrics, err := ssl.Parse(scnr.Bytes())
if err != nil {
ssl.AddError(fmt.Errorf("unable to parse incoming line: %s", err))
Expand All @@ -104,7 +111,9 @@ func (ssl *streamSocketListener) read(c net.Conn) {
}

if err := scnr.Err(); err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
if err, ok := err.(net.Error); ok && err.Timeout() {
log.Printf("D! Timeout in plugin [input.socket_listener]: %s", err)
} else if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
ssl.AddError(err)
}
}
Expand Down Expand Up @@ -142,6 +151,7 @@ type SocketListener struct {
ServiceAddress string
MaxConnections int
ReadBufferSize int
ReadTimeout *internal.Duration
KeepAlivePeriod *internal.Duration

parsers.Parser
Expand Down Expand Up @@ -172,6 +182,11 @@ func (sl *SocketListener) SampleConfig() string {
## 0 (default) is unlimited.
# max_connections = 1024

## Read timeout.
## Only applies to stream sockets (e.g. TCP).
## 0 (default) is unlimited.
# read_timeout = "30s"

## Maximum socket buffer size in bytes.
## For stream sockets, once the buffer fills up, the sender will start backing up.
## For datagram sockets, once the buffer fills up, metrics will start dropping.
Expand Down