From 6707cd46584e2d3d5c6528a2c671353c1f1b8427 Mon Sep 17 00:00:00 2001 From: soldierkam Date: Sat, 25 Mar 2017 00:12:57 +0100 Subject: [PATCH 1/7] Add read timeout to socket_listener --- plugins/inputs/socket_listener/README.md | 5 +++++ plugins/inputs/socket_listener/socket_listener.go | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/plugins/inputs/socket_listener/README.md b/plugins/inputs/socket_listener/README.md index e73296804b202..73d1c3f840db1 100644 --- a/plugins/inputs/socket_listener/README.md +++ b/plugins/inputs/socket_listener/README.md @@ -29,6 +29,11 @@ This is a sample configuration for the plugin. ## Only applies to stream sockets (e.g. TCP). ## 0 (default) is unlimited. # max_connections = 1024 + + ## Read deadline (timeout). + ## Only applies to stream sockets (e.g. TCP). + ## 0 (default) is unlimited. + # read_deadline = 30 ## Maximum socket buffer size in bytes. ## For stream sockets, once the buffer fills up, the sender will start backing up. diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index 9d3a8e1fe4ece..c06c9c6c05106 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -12,6 +12,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" + "time" ) type setReadBufferer interface { @@ -75,6 +76,9 @@ func (ssl *streamSocketListener) read(c net.Conn) { for _, m := range metrics { ssl.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) } + if ssl.ReadDeadline > 0 { + c.SetReadDeadline(time.Now().Add(time.Duration(ssl.ReadDeadline) * time.Second)) + } } if err := scnr.Err(); err != nil { @@ -112,6 +116,7 @@ type SocketListener struct { ServiceAddress string MaxConnections int ReadBufferSize int + ReadDeadline int parsers.Parser telegraf.Accumulator From aa801fd92c7ffe5ed8f898902fbade3a0e524f9c Mon Sep 17 00:00:00 2001 From: soldierkam Date: Thu, 6 Apr 2017 21:47:40 +0200 Subject: [PATCH 2/7] Read timeout cleanup -Rename timeout parameter -Use internal.Duration --- CHANGELOG.md | 1 + plugins/inputs/socket_listener/README.md | 4 ++-- plugins/inputs/socket_listener/socket_listener.go | 7 ++++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 509d6f2f1e7dc..9bc6f0bf38939 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ be deprecated eventually. - [#2244](https://github.com/influxdata/telegraf/pull/2244): Support ipmi_sensor plugin querying local ipmi sensors. - [#2339](https://github.com/influxdata/telegraf/pull/2339): Increment gather_errors for all errors emitted by inputs. - [#2071](https://github.com/influxdata/telegraf/issues/2071): Use official docker SDK. +- [#2571](https://github.com/influxdata/telegraf/pull/2571): Add read timeout to socket_listener ### Bugfixes diff --git a/plugins/inputs/socket_listener/README.md b/plugins/inputs/socket_listener/README.md index 73d1c3f840db1..a07b075d60d40 100644 --- a/plugins/inputs/socket_listener/README.md +++ b/plugins/inputs/socket_listener/README.md @@ -30,10 +30,10 @@ This is a sample configuration for the plugin. ## 0 (default) is unlimited. # max_connections = 1024 - ## Read deadline (timeout). + ## Read timeout. ## Only applies to stream sockets (e.g. TCP). ## 0 (default) is unlimited. - # read_deadline = 30 + # read_timeout = "30s" ## Maximum socket buffer size in bytes. ## For stream sockets, once the buffer fills up, the sender will start backing up. diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index c06c9c6c05106..e6372771934c3 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -10,6 +10,7 @@ import ( "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "time" @@ -76,8 +77,8 @@ func (ssl *streamSocketListener) read(c net.Conn) { for _, m := range metrics { ssl.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) } - if ssl.ReadDeadline > 0 { - c.SetReadDeadline(time.Now().Add(time.Duration(ssl.ReadDeadline) * time.Second)) + if ssl.ReadTimeout.Duration > 0 { + c.SetReadDeadline(time.Now().Add(ssl.ReadTimeout.Duration * time.Second)) } } @@ -116,7 +117,7 @@ type SocketListener struct { ServiceAddress string MaxConnections int ReadBufferSize int - ReadDeadline int + ReadTimeout internal.Duration parsers.Parser telegraf.Accumulator From d51dfab1cf8b3f5b07616932ee12541555b79e0d Mon Sep 17 00:00:00 2001 From: soldierkam Date: Fri, 7 Apr 2017 13:20:18 +0200 Subject: [PATCH 3/7] Setup read timeout at loop start --- plugins/inputs/socket_listener/socket_listener.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index 0ee587e50b82b..ebb7addfcf5e2 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -70,7 +70,13 @@ func (ssl *streamSocketListener) read(c net.Conn) { defer c.Close() scnr := bufio.NewScanner(c) - for scnr.Scan() { + for { + if ssl.ReadTimeout.Duration > 0 { + c.SetReadDeadline(time.Now().Add(ssl.ReadTimeout.Duration)) + } + if !scnr.Scan() { + break + } metrics, err := ssl.Parse(scnr.Bytes()) if err != nil { ssl.AddError(fmt.Errorf("unable to parse incoming line")) @@ -80,9 +86,6 @@ func (ssl *streamSocketListener) read(c net.Conn) { for _, m := range metrics { ssl.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) } - if ssl.ReadTimeout.Duration > 0 { - c.SetReadDeadline(time.Now().Add(ssl.ReadTimeout.Duration * time.Second)) - } } if err := scnr.Err(); err != nil { From f7d16d7d156688d6cf45cad4128523f8e18063f1 Mon Sep 17 00:00:00 2001 From: soldierkam Date: Wed, 14 Jun 2017 22:02:28 +0200 Subject: [PATCH 4/7] Log timeout error on debug level --- plugins/inputs/socket_listener/socket_listener.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index ebb7addfcf5e2..88c9adac2e738 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -89,7 +89,9 @@ func (ssl *streamSocketListener) read(c net.Conn) { } if err := scnr.Err(); err != nil { - if !strings.HasSuffix(err.Error(), ": use of closed network connection") { + if err, ok := err.(net.Error); ok && err.Timeout() { + log.Printf("D! Timeout in plugin [input.socket_listener]: %s", err) + } else if !strings.HasSuffix(err.Error(), ": use of closed network connection") { ssl.AddError(err) } } From c08f77aae6152a81e2ee7bead417678fc65a8292 Mon Sep 17 00:00:00 2001 From: soldierkam Date: Wed, 14 Jun 2017 22:44:47 +0200 Subject: [PATCH 5/7] Format source code and amend plugin description --- plugins/inputs/socket_listener/socket_listener.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index b1540c4d74094..97c713517fbbc 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -148,10 +148,10 @@ func (psl *packetSocketListener) listen() { } type SocketListener struct { - ServiceAddress string - MaxConnections int - ReadBufferSize int - ReadTimeout internal.Duration + ServiceAddress string + MaxConnections int + ReadBufferSize int + ReadTimeout *internal.Duration KeepAlivePeriod *internal.Duration parsers.Parser @@ -182,6 +182,11 @@ func (sl *SocketListener) SampleConfig() string { ## 0 (default) is unlimited. # max_connections = 1024 + ## Read timeout. + ## Only applies to stream sockets (e.g. TCP). + ## 0 (default) is unlimited. + # read_timeout = "30s" + ## Maximum socket buffer size in bytes. ## For stream sockets, once the buffer fills up, the sender will start backing up. ## For datagram sockets, once the buffer fills up, metrics will start dropping. From 49b9e95f6ce5c48dd6dd6a318c7bf92c82638b99 Mon Sep 17 00:00:00 2001 From: soldierkam Date: Wed, 14 Jun 2017 22:52:20 +0200 Subject: [PATCH 6/7] Update CHANGELOG --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b78eb6bb8b04d..4143d4b2a363b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ - [#2734](https://github.com/influxdata/telegraf/pull/2734): Add include/exclude filters for docker containers. - [#2602](https://github.com/influxdata/telegraf/pull/2602): Add secure connection support to graphite output. - [#2908](https://github.com/influxdata/telegraf/pull/2908): Add min/max response time on linux/darwin to ping. +- [#2571](https://github.com/influxdata/telegraf/pull/2571): Add read timeout to socket_listener ### Bugfixes @@ -125,7 +126,6 @@ be deprecated eventually. - [#2587](https://github.com/influxdata/telegraf/pull/2587): Add json timestamp units configurability - [#2597](https://github.com/influxdata/telegraf/issues/2597): Add support for Linux sysctl-fs metrics. - [#2425](https://github.com/influxdata/telegraf/pull/2425): Support to include/exclude docker container labels as tags -- [#2571](https://github.com/influxdata/telegraf/pull/2571): Add read timeout to socket_listener - [#1667](https://github.com/influxdata/telegraf/pull/1667): dmcache input plugin - [#2637](https://github.com/influxdata/telegraf/issues/2637): Add support for precision in http_listener - [#2636](https://github.com/influxdata/telegraf/pull/2636): Add `message_len_max` option to `kafka_consumer` input From aa1bb6dc3b552df5bfbf9084a534ab8dfb71bbd9 Mon Sep 17 00:00:00 2001 From: soldierkam Date: Wed, 14 Jun 2017 23:11:29 +0200 Subject: [PATCH 7/7] Fix nil pointer dereference --- plugins/inputs/socket_listener/socket_listener.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index 97c713517fbbc..6aefd96a697e9 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -93,7 +93,7 @@ func (ssl *streamSocketListener) read(c net.Conn) { scnr := bufio.NewScanner(c) for { - if ssl.ReadTimeout.Duration > 0 { + if ssl.ReadTimeout != nil && ssl.ReadTimeout.Duration > 0 { c.SetReadDeadline(time.Now().Add(ssl.ReadTimeout.Duration)) } if !scnr.Scan() {