Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#8295 Initial Yandex.Cloud monitoring #8296

Merged
merged 4 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -446,3 +446,4 @@ For documentation on the latest development code see the [documentation index][d
* [warp10](./plugins/outputs/warp10)
* [wavefront](./plugins/outputs/wavefront)
* [sumologic](./plugins/outputs/sumologic)
* [yandex_cloud_monitoring](./plugins/outputs/yandex_cloud_monitoring)
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/timestream"
_ "github.com/influxdata/telegraf/plugins/outputs/warp10"
_ "github.com/influxdata/telegraf/plugins/outputs/wavefront"
_ "github.com/influxdata/telegraf/plugins/outputs/yandex_cloud_monitoring"
)
26 changes: 26 additions & 0 deletions plugins/outputs/yandex_cloud_monitoring/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Yandex Cloud Monitoring

This plugin will send custom metrics to Yandex Cloud Monitoring.
https://cloud.yandex.com/services/monitoring

### Configuration:

```toml
[[outputs.yandex_cloud_monitoring]]
## Timeout for HTTP writes.
# timeout = "20s"

## Yandex.Cloud monitoring API endpoint. Normally should not be changed
# endpoint_url = "https://monitoring.api.cloud.yandex.net/monitoring/v2/data/write"

## All user metrics should be sent with "custom" service specified. Normally should not be changed
# service = "custom"
```
peter-volkov marked this conversation as resolved.
Show resolved Hide resolved

### Authentication

This plugin currently support only YC.Compute metadata based authentication.

When plugin is working inside a YC.Compute instance it will take IAM token and Folder ID from instance metadata.

Other authentication methods will be added later.
259 changes: 259 additions & 0 deletions plugins/outputs/yandex_cloud_monitoring/yandex_cloud_monitoring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
package yandex_cloud_monitoring

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/selfstat"
)

// YandexCloudMonitoring allows publishing of metrics to the Yandex Cloud Monitoring custom metrics
// service
type YandexCloudMonitoring struct {
Timeout internal.Duration `toml:"timeout"`
EndpointUrl string `toml:"endpoint_url"`
Service string `toml:"service"`

Log telegraf.Logger

MetadataTokenURL string
MetadataFolderURL string
FolderID string
IAMToken string
IamTokenExpirationTime time.Time

client *http.Client

timeFunc func() time.Time

MetricOutsideWindow selfstat.Stat
}

type yandexCloudMonitoringMessage struct {
TS string `json:"ts,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
Metrics []yandexCloudMonitoringMetric `json:"metrics"`
}

type yandexCloudMonitoringMetric struct {
Name string `json:"name"`
Labels map[string]string `json:"labels"`
MetricType string `json:"type,omitempty"` // DGAUGE|IGAUGE|COUNTER|RATE. Default: DGAUGE
TS string `json:"ts,omitempty"`
Value float64 `json:"value"`
}

type MetadataIamToken struct {
AccessToken string `json:"access_token"`
ExpiresIn int64 `json:"expires_in"`
TokenType string `json:"token_type"`
}

const (
defaultRequestTimeout = time.Second * 20
defaultEndpointUrl = "https://monitoring.api.cloud.yandex.net/monitoring/v2/data/write"
defaultMetadataTokenUrl = "http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token"
defaultMetadataFolderUrl = "http://169.254.169.254/computeMetadata/v1/instance/attributes/folder-id"
)

var sampleConfig = `
## Timeout for HTTP writes.
# timeout = "20s"

## Yandex.Cloud monitoring API endpoint. Normally should not be changed
# endpoint_url = "https://monitoring.api.cloud.yandex.net/monitoring/v2/data/write"

## All user metrics should be sent with "custom" service specified. Normally should not be changed
# service = "custom"
`

// Description provides a description of the plugin
func (a *YandexCloudMonitoring) Description() string {
return "Send aggregated metrics to Yandex.Cloud Monitoring"
}

// SampleConfig provides a sample configuration for the plugin
func (a *YandexCloudMonitoring) SampleConfig() string {
return sampleConfig
}

// Connect initializes the plugin and validates connectivity
func (a *YandexCloudMonitoring) Connect() error {
if a.Timeout.Duration <= 0 {
a.Timeout.Duration = defaultRequestTimeout
}
if a.EndpointUrl == "" {
a.EndpointUrl = defaultEndpointUrl
}
if a.Service == "" {
a.Service = "custom"
}
if a.MetadataTokenURL == "" {
a.MetadataTokenURL = defaultMetadataTokenUrl
}
if a.MetadataFolderURL == "" {
a.MetadataFolderURL = defaultMetadataFolderUrl
}

a.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
},
Timeout: a.Timeout.Duration,
}

var err error
a.FolderID, err = a.getFolderIDFromMetadata()
if err != nil {
return err
}

a.Log.Infof("Writing to Yandex.Cloud Monitoring URL: %s", a.EndpointUrl)
ssoroka marked this conversation as resolved.
Show resolved Hide resolved

tags := map[string]string{}
a.MetricOutsideWindow = selfstat.Register("yandex_cloud_monitoring", "metric_outside_window", tags)

return nil
}

// Close shuts down an any active connections
func (a *YandexCloudMonitoring) Close() error {
a.client = nil
return nil
}

// Write writes metrics to the remote endpoint
func (a *YandexCloudMonitoring) Write(metrics []telegraf.Metric) error {
var yandexCloudMonitoringMetrics []yandexCloudMonitoringMetric
for _, m := range metrics {
for _, field := range m.FieldList() {
yandexCloudMonitoringMetrics = append(
yandexCloudMonitoringMetrics,
yandexCloudMonitoringMetric{
Name: field.Key,
Labels: m.Tags(),
TS: fmt.Sprint(m.Time().Format(time.RFC3339)),
Value: field.Value.(float64),
},
)
}
}

var body []byte
jsonBytes, err := json.Marshal(
yandexCloudMonitoringMessage{
Metrics: yandexCloudMonitoringMetrics,
},
)

if err != nil {
return err
}
body = append(body, jsonBytes...)
body = append(jsonBytes, '\n')
return a.send(body)
}

func getResponseFromMetadata(c *http.Client, metadataUrl string) ([]byte, error) {
req, err := http.NewRequest("GET", metadataUrl, nil)
if err != nil {
return nil, fmt.Errorf("error creating request: %v", err)
}
req.Header.Set("Metadata-Flavor", "Google")
resp, err := c.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
return nil, fmt.Errorf("unable to fetch instance metadata: [%s] %d",
metadataUrl, resp.StatusCode)
}
return body, nil
}

func (a *YandexCloudMonitoring) getFolderIDFromMetadata() (string, error) {
a.Log.Infof("getting folder ID in %s", a.MetadataFolderURL)
body, err := getResponseFromMetadata(a.client, a.MetadataFolderURL)
if err != nil {
return "", err
}
folderID := string(body)
if folderID == "" {
return "", fmt.Errorf("unable to fetch folder id from URL %s: %v", a.MetadataFolderURL, err)
}
return folderID, nil
}

func (a *YandexCloudMonitoring) getIAMTokenFromMetadata() (string, int, error) {
a.Log.Debugf("getting new IAM token in %s", a.MetadataTokenURL)
body, err := getResponseFromMetadata(a.client, a.MetadataTokenURL)
if err != nil {
return "", 0, err
}
var metadata MetadataIamToken
if err := json.Unmarshal(body, &metadata); err != nil {
return "", 0, err
}
if metadata.AccessToken == "" || metadata.ExpiresIn == 0 {
return "", 0, fmt.Errorf("unable to fetch authentication credentials %s: %v", a.MetadataTokenURL, err)
}
return metadata.AccessToken, int(metadata.ExpiresIn), nil
}

func (a *YandexCloudMonitoring) send(body []byte) error {
req, err := http.NewRequest("POST", a.EndpointUrl, bytes.NewBuffer(body))
if err != nil {
return err
}
q := req.URL.Query()
q.Add("folderId", a.FolderID)
q.Add("service", a.Service)
req.URL.RawQuery = q.Encode()

req.Header.Set("Content-Type", "application/json")
isTokenExpired := !a.IamTokenExpirationTime.After(time.Now())
if a.IAMToken == "" || isTokenExpired {
token, expiresIn, err := a.getIAMTokenFromMetadata()
if err != nil {
return err
}
a.IamTokenExpirationTime = time.Now().Add(time.Duration(expiresIn) * time.Second)
a.IAMToken = token
}
req.Header.Set("Authorization", "Bearer "+a.IAMToken)

a.Log.Debugf("sending metrics to %s", req.URL.String())
resp, err := a.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

_, err = ioutil.ReadAll(resp.Body)
if err != nil || resp.StatusCode < 200 || resp.StatusCode > 299 {
return fmt.Errorf("failed to write batch: [%v] %s", resp.StatusCode, resp.Status)
}

return nil
}

func init() {
outputs.Add("yandex_cloud_monitoring", func() telegraf.Output {
return &YandexCloudMonitoring{
timeFunc: time.Now,
}
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package yandex_cloud_monitoring

import (
"encoding/json"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
)

func TestWrite(t *testing.T) {
readBody := func(r *http.Request) (yandexCloudMonitoringMessage, error) {
decoder := json.NewDecoder(r.Body)
var message yandexCloudMonitoringMessage
err := decoder.Decode(&message)
require.NoError(t, err)
return message, nil
}

testMetadataHttpServer := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasSuffix(r.URL.Path, "/token") {
token := MetadataIamToken{
AccessToken: "token1",
ExpiresIn: 123,
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
err := json.NewEncoder(w).Encode(token)
require.NoError(t, err)
} else if strings.HasSuffix(r.URL.Path, "/folder") {
_, err := io.WriteString(w, "folder1")
require.NoError(t, err)
}
w.WriteHeader(http.StatusOK)
}),
)
defer testMetadataHttpServer.Close()
metadataTokenUrl := "http://" + testMetadataHttpServer.Listener.Addr().String() + "/token"
metadataFolderUrl := "http://" + testMetadataHttpServer.Listener.Addr().String() + "/folder"

ts := httptest.NewServer(http.NotFoundHandler())
defer ts.Close()
url := "http://" + ts.Listener.Addr().String() + "/metrics"

tests := []struct {
name string
plugin *YandexCloudMonitoring
metrics []telegraf.Metric
handler func(t *testing.T, w http.ResponseWriter, r *http.Request)
}{
{
name: "metric is converted to json value",
plugin: &YandexCloudMonitoring{},
metrics: []telegraf.Metric{
testutil.MustMetric(
"cluster",
map[string]string{},
map[string]interface{}{
"cpu": 42.0,
},
time.Unix(0, 0),
),
},
handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
message, err := readBody(r)
require.NoError(t, err)
require.Len(t, message.Metrics, 1)
require.Equal(t, "cpu", message.Metrics[0].Name)
require.Equal(t, 42.0, message.Metrics[0].Value)
w.WriteHeader(http.StatusOK)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tt.handler(t, w, r)
})
tt.plugin.Log = testutil.Logger{}
tt.plugin.EndpointUrl = url
tt.plugin.MetadataTokenURL = metadataTokenUrl
tt.plugin.MetadataFolderURL = metadataFolderUrl
err := tt.plugin.Connect()
require.NoError(t, err)

err = tt.plugin.Write(tt.metrics)

require.NoError(t, err)
})
}
}