Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus Input Plugin: Ability to gather metrics from all Kubernetes pods behind a service #3236

Merged
merged 7 commits into from
Sep 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions plugins/inputs/prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ exposing metrics with Prometheus format

### Configuration:

Example for Kubernetes apiserver
Example for Kubernetes API server
```toml
# Get all metrics from Kube-apiserver
# Get all metrics from Kubernetes API server
[[inputs.prometheus]]
# An array of urls to scrape metrics from.
urls = ["http://my-kube-apiserver:8080/metrics"]
```

Specify a 10 second timeout for slower/over-loaded clients
```toml
# Get all metrics from Kube-apiserver
# Get all metrics from Kubernetes API server
[[inputs.prometheus]]
# An array of urls to scrape metrics from.
urls = ["http://my-kube-apiserver:8080/metrics"]
Expand All @@ -28,7 +28,7 @@ You can use more complex configuration
to filter and some tags

```toml
# Get all metrics from Kube-apiserver
# Get all metrics from Kubernetes API server
[[inputs.prometheus]]
# An array of urls to scrape metrics from.
urls = ["http://my-kube-apiserver:8080/metrics"]
Expand Down Expand Up @@ -61,6 +61,18 @@ to filter and some tags
ssl_key = '/path/to/keyfile'
```

```toml
# Use with [Kubernetes headless services](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services)
[[inputs.prometheus]]
# An array of urls to scrape metrics from.
urls = ["https://my-kube-apiserver:8080/metrics"]

# An array of Kubernetes services to scrape metrics from.
# The IP addresses of all pods behind these services will be resolved and
# then scraped
kubernetes_services = ["https://my-headless-service.my-namespace:8080/metrics"]
```

### Usage for Caddy HTTP server

If you want to monitor Caddy, you need to use Caddy with its Prometheus plugin:
Expand Down
84 changes: 74 additions & 10 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"errors"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"sync"
"time"

Expand All @@ -16,8 +19,12 @@ import (
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3`

type Prometheus struct {
// An array of urls to scrape metrics from.
Urls []string

// An array of Kubernetes services to scrape metrics from.
KubernetesServices []string

// Bearer Token authorization file path
BearerToken string `toml:"bearer_token"`

Expand All @@ -39,6 +46,9 @@ var sampleConfig = `
## An array of urls to scrape metrics from.
urls = ["http://localhost:9100/metrics"]

## An array of Kubernetes services to scrape metrics from.
kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]

## Use bearer token for authorization
# bearer_token = /path/to/bearer/token

Expand All @@ -63,6 +73,53 @@ func (p *Prometheus) Description() string {

var ErrProtocolError = errors.New("prometheus protocol error")

func (p *Prometheus) AddressToURL(u *url.URL, address string) string {
host := address
if u.Port() != "" {
host = address + ":" + u.Port()
}
reconstructedUrl := url.URL{
Scheme: u.Scheme,
Opaque: u.Opaque,
User: u.User,
Path: u.Path,
RawPath: u.RawPath,
ForceQuery: u.ForceQuery,
RawQuery: u.RawQuery,
Fragment: u.Fragment,
Host: host,
}
return reconstructedUrl.String()
}

type UrlAndAddress struct {
Url string
Address string
}

func (p *Prometheus) GetAllURLs() ([]UrlAndAddress, error) {
allUrls := make([]UrlAndAddress, 0)
for _, url := range p.Urls {
allUrls = append(allUrls, UrlAndAddress{Url: url})
}
for _, service := range p.KubernetesServices {
u, err := url.Parse(service)
if err != nil {
return nil, err
}
resolvedAddresses, err := net.LookupHost(u.Hostname())
if err != nil {
log.Printf("prometheus: Could not resolve %s, skipping it. Error: %s", u.Host, err)
continue
}
for _, resolved := range resolvedAddresses {
serviceUrl := p.AddressToURL(u, resolved)
allUrls = append(allUrls, UrlAndAddress{Url: serviceUrl, Address: resolved})
}
}
return allUrls, nil
}

// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (p *Prometheus) Gather(acc telegraf.Accumulator) error {
Expand All @@ -76,12 +133,16 @@ func (p *Prometheus) Gather(acc telegraf.Accumulator) error {

var wg sync.WaitGroup

for _, serv := range p.Urls {
allUrls, err := p.GetAllURLs()
if err != nil {
return err
}
for _, url := range allUrls {
wg.Add(1)
go func(serv string) {
go func(serviceUrl UrlAndAddress) {
defer wg.Done()
acc.AddError(p.gatherURL(serv, acc))
}(serv)
acc.AddError(p.gatherURL(serviceUrl, acc))
}(url)
}

wg.Wait()
Expand Down Expand Up @@ -116,8 +177,8 @@ func (p *Prometheus) createHttpClient() (*http.Client, error) {
return client, nil
}

func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
var req, err = http.NewRequest("GET", url, nil)
func (p *Prometheus) gatherURL(url UrlAndAddress, acc telegraf.Accumulator) error {
var req, err = http.NewRequest("GET", url.Url, nil)
req.Header.Add("Accept", acceptHeader)
var token []byte
var resp *http.Response
Expand All @@ -132,11 +193,11 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {

resp, err = p.client.Do(req)
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", url, err)
return fmt.Errorf("error making HTTP request to %s: %s", url.Url, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", url, resp.Status)
return fmt.Errorf("%s returned HTTP status %s", url.Url, resp.Status)
}

body, err := ioutil.ReadAll(resp.Body)
Expand All @@ -147,12 +208,15 @@ func (p *Prometheus) gatherURL(url string, acc telegraf.Accumulator) error {
metrics, err := Parse(body, resp.Header)
if err != nil {
return fmt.Errorf("error reading metrics for %s: %s",
url, err)
url.Url, err)
}
// Add (or not) collected metrics
for _, metric := range metrics {
tags := metric.Tags()
tags["url"] = url
tags["url"] = url.Url
if url.Address != "" {
tags["address"] = url.Address
}
acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
}

Expand Down
51 changes: 51 additions & 0 deletions plugins/inputs/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"

Expand Down Expand Up @@ -48,5 +49,55 @@ func TestPrometheusGeneratesMetrics(t *testing.T) {
assert.True(t, acc.HasFloatField("go_goroutines", "gauge"))
assert.True(t, acc.HasFloatField("test_metric", "value"))
assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0)))
assert.False(t, acc.HasTag("test_metric", "address"))
}

func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, sampleTextFormat)
}))
defer ts.Close()

p := &Prometheus{
KubernetesServices: []string{ts.URL},
}
u, _ := url.Parse(ts.URL)
tsAddress := u.Hostname()

var acc testutil.Accumulator

err := acc.GatherError(p.Gather)
require.NoError(t, err)

assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count"))
assert.True(t, acc.HasFloatField("go_goroutines", "gauge"))
assert.True(t, acc.HasFloatField("test_metric", "value"))
assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0)))
assert.True(t, acc.TagValue("test_metric", "address") == tsAddress)
}

func TestPrometheusGeneratesMetricsAlthoughFirstDNSFails(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, sampleTextFormat)
}))
defer ts.Close()

p := &Prometheus{
Urls: []string{ts.URL},
KubernetesServices: []string{"http://random.telegraf.local:88/metrics"},
}

var acc testutil.Accumulator

err := acc.GatherError(p.Gather)
require.NoError(t, err)

assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count"))
assert.True(t, acc.HasFloatField("go_goroutines", "gauge"))
assert.True(t, acc.HasFloatField("test_metric", "value"))
assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0)))
}