Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add generic http input plugin which supports setting an input data format #3546

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ configuration options.
* [graylog](./plugins/inputs/graylog)
* [haproxy](./plugins/inputs/haproxy)
* [hddtemp](./plugins/inputs/hddtemp)
* [http](./plugins/inputs/http) (generic HTTP plugin, supports using input data formats)
* [http_response](./plugins/inputs/http_response)
* [httpjson](./plugins/inputs/httpjson) (generic JSON-emitting http service plugin)
* [internal](./plugins/inputs/internal)
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/graylog"
_ "github.com/influxdata/telegraf/plugins/inputs/haproxy"
_ "github.com/influxdata/telegraf/plugins/inputs/hddtemp"
_ "github.com/influxdata/telegraf/plugins/inputs/http"
_ "github.com/influxdata/telegraf/plugins/inputs/http_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/http_response"
_ "github.com/influxdata/telegraf/plugins/inputs/httpjson"
Expand Down
42 changes: 42 additions & 0 deletions plugins/inputs/http/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# HTTP Input Plugin

The HTTP input plugin collects metrics from one or more HTTP(S) endpoints. The metrics need to be formatted in one of the supported data formats. Each data format has its own unique set of configuration options, read more about them here:
https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md


### Configuration:

This section contains the default TOML to configure the plugin. You can
generate it using `telegraf --usage http`.

```toml
# Read formatted metrics from one or more HTTP endpoints
[[inputs.http]]
## One or more URLs from which to read formatted metrics
urls = [
"http://localhost/metrics"
]

## Optional HTTP Basic Auth Credentials
# username = "username"
# password = "pa$$word"

## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false

# timeout = "5s"

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"
```

### Metrics:

The metrics collected by this input plugin will depend on the configurated `data_format` and the payload returned by the HTTP endpoint(s).
200 changes: 200 additions & 0 deletions plugins/inputs/http/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package http

import (
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)

type HTTP struct {
URLs []string `toml:"urls"`

Headers map[string]string

// HTTP Basic Auth Credentials
Username string
Password string

// Option to add "url" tag to each metric
TagURL bool `toml:"tag_url"`

// Path to CA file
SSLCA string `toml:"ssl_ca"`
// Path to host cert file
SSLCert string `toml:"ssl_cert"`
// Path to cert key file
SSLKey string `toml:"ssl_key"`
// Use SSL but skip chain & host verification
InsecureSkipVerify bool

Timeout internal.Duration

client *http.Client

// The parser will automatically be set by Telegraf core code because
// this plugin implements the ParserInput interface (i.e. the SetParser method)
parser parsers.Parser
}

var sampleConfig = `
## One or more URLs from which to read formatted metrics
urls = [
"http://localhost/metrics"
]

## Optional HTTP headers
# headers = {"X-Special-Header" = "Special-Value"}

## Optional HTTP Basic Auth Credentials
# username = "username"
# password = "pa$$word"

## Tag all metrics with the url
# tag_url = true

## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false

# timeout = "5s"

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"
`

// SampleConfig returns the default configuration of the Input
func (*HTTP) SampleConfig() string {
return sampleConfig
}

// Description returns a one-sentence description on the Input
func (*HTTP) Description() string {
return "Read formatted metrics from one or more HTTP endpoints"
}

// Gather takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval"
func (h *HTTP) Gather(acc telegraf.Accumulator) error {
if h.client == nil {
tlsCfg, err := internal.GetTLSConfig(
h.SSLCert, h.SSLKey, h.SSLCA, h.InsecureSkipVerify)
if err != nil {
return err
}
h.client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
},
Timeout: h.Timeout.Duration,
}
}

var wg sync.WaitGroup
for _, u := range h.URLs {
wg.Add(1)
go func(url string) {
defer wg.Done()
if err := h.gatherURL(acc, url); err != nil {
acc.AddError(fmt.Errorf("[url=%s]: %s", url, err))
}
}(u)
}

wg.Wait()

return nil
}

// SetParser takes the data_format from the config and finds the right parser for that format
func (h *HTTP) SetParser(parser parsers.Parser) {
h.parser = parser
}

// Gathers data from a particular URL
// Parameters:
// acc : The telegraf Accumulator to use
// url : endpoint to send request to
//
// Returns:
// error: Any error that may have occurred
func (h *HTTP) gatherURL(
acc telegraf.Accumulator,
url string,
) error {
request, err := http.NewRequest("GET", url, nil)
if err != nil {
return err
}

for k, v := range h.Headers {
if strings.ToLower(k) == "host" {
request.Host = v
} else {
request.Header.Add(k, v)
}
}

if h.Username != "" {
request.SetBasicAuth(h.Username, h.Password)
}

resp, err := h.client.Do(request)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Received status code %d (%s), expected %d (%s)",
resp.StatusCode,
http.StatusText(resp.StatusCode),
http.StatusOK,
http.StatusText(http.StatusOK))
}

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}

if h.parser == nil {
return errors.New("Parser is not set")
}

metrics, err := h.parser.Parse(b)
if err != nil {
return err
}

for _, metric := range metrics {
if h.TagURL {
metric.AddTag("url", url)
}
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}

return nil
}

func init() {
inputs.Add("http", func() telegraf.Input {
return &HTTP{
Timeout: internal.Duration{Duration: time.Second * 5},
}
})
}
117 changes: 117 additions & 0 deletions plugins/inputs/http/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package http_test

import (
"net/http"
"net/http/httptest"
"testing"

plugin "github.com/influxdata/telegraf/plugins/inputs/http"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

func TestHTTPwithJSONFormat(t *testing.T) {
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/endpoint" {
_, _ = w.Write([]byte(simpleJSON))
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer fakeServer.Close()

url := fakeServer.URL + "/endpoint"
plugin := &plugin.HTTP{
URLs: []string{url},
TagURL: true,
}
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
plugin.SetParser(p)

var acc testutil.Accumulator
require.NoError(t, acc.GatherError(plugin.Gather))

require.Len(t, acc.Metrics, 1)

// basic check to see if we got the right field, value and tag
var metric = acc.Metrics[0]
require.Equal(t, metric.Measurement, metricName)
require.Len(t, acc.Metrics[0].Fields, 1)
require.Equal(t, acc.Metrics[0].Fields["a"], 1.2)
require.Equal(t, acc.Metrics[0].Tags["url"], url)
}

func TestHTTPHeaders(t *testing.T) {
header := "X-Special-Header"
headerValue := "Special-Value"
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/endpoint" {
if r.Header.Get(header) == headerValue {
_, _ = w.Write([]byte(simpleJSON))
} else {
w.WriteHeader(http.StatusForbidden)
}
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer fakeServer.Close()

url := fakeServer.URL + "/endpoint"
plugin := &plugin.HTTP{
URLs: []string{url},
Headers: map[string]string{header: headerValue},
}
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
plugin.SetParser(p)

var acc testutil.Accumulator
require.NoError(t, acc.GatherError(plugin.Gather))
}

func TestInvalidStatusCode(t *testing.T) {
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
defer fakeServer.Close()

url := fakeServer.URL + "/endpoint"
plugin := &plugin.HTTP{
URLs: []string{url},
}

metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
plugin.SetParser(p)

var acc testutil.Accumulator
require.Error(t, acc.GatherError(plugin.Gather))
}

func TestParserNotSet(t *testing.T) {
fakeServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/endpoint" {
_, _ = w.Write([]byte(simpleJSON))
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer fakeServer.Close()

url := fakeServer.URL + "/endpoint"
plugin := &plugin.HTTP{
URLs: []string{url},
}

var acc testutil.Accumulator
require.Error(t, acc.GatherError(plugin.Gather))
}

const simpleJSON = `
{
"a": 1.2
}
`