Skip to content

Commit

Permalink
TLS and MTLS enhancements to HTTPListener input plugin (#3191)
Browse files Browse the repository at this point in the history
  • Loading branch information
allingeek authored and danielnelson committed Sep 8, 2017
1 parent 247c2e7 commit c809deb
Show file tree
Hide file tree
Showing 3 changed files with 357 additions and 41 deletions.
11 changes: 11 additions & 0 deletions plugins/inputs/http_listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ The `/write` endpoint supports the `precision` query parameter and can be set to

When chaining Telegraf instances using this plugin, CREATE DATABASE requests receive a 200 OK response with message body `{"results":[]}` but they are not relayed. The output configuration of the Telegraf instance which ultimately submits data to InfluxDB determines the destination database.

Enable TLS by specifying the file names of a service TLS certificate and key.

Enable mutually authenticated TLS and authorize client connections by signing certificate authority by including a list of allowed CA certificate file names in ````tls_allowed_cacerts````.

See: [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md#influx).

**Example:**
Expand All @@ -28,4 +32,11 @@ This is a sample configuration for the plugin.
## timeouts
read_timeout = "10s"
write_timeout = "10s"

## HTTPS
tls_cert= "/etc/telegraf/cert.pem"
tls_key = "/etc/telegraf/key.pem"

## MTLS
tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
```
95 changes: 72 additions & 23 deletions plugins/inputs/http_listener/http_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package http_listener
import (
"bytes"
"compress/gzip"
"crypto/tls"
"crypto/x509"
"io"
"io/ioutil"
"log"
"net"
"net/http"
Expand Down Expand Up @@ -37,6 +40,10 @@ type HTTPListener struct {
MaxLineSize int
Port int

TlsAllowedCacerts []string
TlsCert string
TlsKey string

mu sync.Mutex
wg sync.WaitGroup

Expand Down Expand Up @@ -75,6 +82,14 @@ const sampleConfig = `
## Maximum line size allowed to be sent in bytes.
## 0 means to use the default of 65536 bytes (64 kibibytes)
max_line_size = 0
## Set one or more allowed client CA certificate file names to
## enable mutually authenticated TLS connections
tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
## Add service certificate and key
tls_cert = "/etc/telegraf/cert.pem"
tls_key = "/etc/telegraf/key.pem"
`

func (h *HTTPListener) SampleConfig() string {
Expand Down Expand Up @@ -117,10 +132,33 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error {
h.MaxLineSize = DEFAULT_MAX_LINE_SIZE
}

if h.ReadTimeout.Duration < time.Second {
h.ReadTimeout.Duration = time.Second * 10
}
if h.WriteTimeout.Duration < time.Second {
h.WriteTimeout.Duration = time.Second * 10
}

h.acc = acc
h.pool = NewPool(200, h.MaxLineSize)

var listener, err = net.Listen("tcp", h.ServiceAddress)
tlsConf := h.getTLSConfig()

server := &http.Server{
Addr: h.ServiceAddress,
Handler: h,
ReadTimeout: h.ReadTimeout.Duration,
WriteTimeout: h.WriteTimeout.Duration,
TLSConfig: tlsConf,
}

var err error
var listener net.Listener
if tlsConf != nil {
listener, err = tls.Listen("tcp", h.ServiceAddress, tlsConf)
} else {
listener, err = net.Listen("tcp", h.ServiceAddress)
}
if err != nil {
return err
}
Expand All @@ -130,7 +168,7 @@ func (h *HTTPListener) Start(acc telegraf.Accumulator) error {
h.wg.Add(1)
go func() {
defer h.wg.Done()
h.httpListen()
server.Serve(h.listener)
}()

log.Printf("I! Started HTTP listener service on %s\n", h.ServiceAddress)
Expand All @@ -149,27 +187,6 @@ func (h *HTTPListener) Stop() {
log.Println("I! Stopped HTTP listener service on ", h.ServiceAddress)
}

// httpListen sets up an http.Server and calls server.Serve.
// like server.Serve, httpListen will always return a non-nil error, for this
// reason, the error returned should probably be ignored.
// see https://golang.org/pkg/net/http/#Server.Serve
func (h *HTTPListener) httpListen() error {
if h.ReadTimeout.Duration < time.Second {
h.ReadTimeout.Duration = time.Second * 10
}
if h.WriteTimeout.Duration < time.Second {
h.WriteTimeout.Duration = time.Second * 10
}

var server = http.Server{
Handler: h,
ReadTimeout: h.ReadTimeout.Duration,
WriteTimeout: h.WriteTimeout.Duration,
}

return server.Serve(h.listener)
}

func (h *HTTPListener) ServeHTTP(res http.ResponseWriter, req *http.Request) {
h.RequestsRecv.Incr(1)
defer h.RequestsServed.Incr(1)
Expand Down Expand Up @@ -327,6 +344,38 @@ func badRequest(res http.ResponseWriter) {
res.Write([]byte(`{"error":"http: bad request"}`))
}

func (h *HTTPListener) getTLSConfig() *tls.Config {
tlsConf := &tls.Config{
InsecureSkipVerify: false,
Renegotiation: tls.RenegotiateNever,
}

if len(h.TlsCert) == 0 || len(h.TlsKey) == 0 {
return nil
}

cert, err := tls.LoadX509KeyPair(h.TlsCert, h.TlsKey)
if err != nil {
return nil
}
tlsConf.Certificates = []tls.Certificate{cert}

if h.TlsAllowedCacerts != nil {
tlsConf.ClientAuth = tls.RequireAndVerifyClientCert
clientPool := x509.NewCertPool()
for _, ca := range h.TlsAllowedCacerts {
c, err := ioutil.ReadFile(ca)
if err != nil {
continue
}
clientPool.AppendCertsFromPEM(c)
}
tlsConf.ClientCAs = clientPool
}

return tlsConf
}

func init() {
inputs.Add("http_listener", func() telegraf.Input {
return &HTTPListener{
Expand Down
Loading

0 comments on commit c809deb

Please sign in to comment.