From 6d315f3ba7d4e07378ba68b8a58f9cc1294435af Mon Sep 17 00:00:00 2001 From: martinrusev Date: Fri, 6 Nov 2015 11:27:17 +0200 Subject: [PATCH 1/8] Amon output --- outputs/amon/README.md | 9 ++ outputs/amon/amon.go | 147 +++++++++++++++++++++++++++++ outputs/amon/amon_test.go | 190 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 346 insertions(+) create mode 100644 outputs/amon/README.md create mode 100644 outputs/amon/amon.go create mode 100644 outputs/amon/amon_test.go diff --git a/outputs/amon/README.md b/outputs/amon/README.md new file mode 100644 index 0000000000000..3860e4371a50c --- /dev/null +++ b/outputs/amon/README.md @@ -0,0 +1,9 @@ +# Amon Output Plugin + +This plugin writes to [Amon](https://www.amon.cx) +and requires an `serverkey` and `amoninstance` URL which can be obtained [here](https://www.amon.cx/docs/monitoring/) +for the account. + +If the point value being sent cannot be converted to a float64, the metric is skipped. + +Metrics are grouped by converting any `_` characters to `.` in the Point Name. \ No newline at end of file diff --git a/outputs/amon/amon.go b/outputs/amon/amon.go new file mode 100644 index 0000000000000..e75a635975de5 --- /dev/null +++ b/outputs/amon/amon.go @@ -0,0 +1,147 @@ +package amon + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "net/http" + "strings" + + "github.com/influxdb/influxdb/client/v2" + "github.com/influxdb/telegraf/duration" + "github.com/influxdb/telegraf/outputs" +) + +type Amon struct { + ServerKey string + AmonInstance string + Timeout duration.Duration + + client *http.Client +} + +var sampleConfig = ` + # Amon + serverkey = "my-server-key" # required. + amoninstance = "https://youramoninstance" # required + + # Connection timeout. + # timeout = "5s" +` + +type TimeSeries struct { + Series []*Metric `json:"series"` +} + +type Metric struct { + Metric string `json:"metric"` + Points [1]Point `json:"points"` +} + +type Point [2]float64 + + +func (a *Amon) Connect() error { + if a.ServerKey == "" || a.AmonInstance == "" { + return fmt.Errorf("serverkey and amoninstance are required fields for amon output") + } + a.client = &http.Client{ + Timeout: a.Timeout.Duration, + } + return nil +} + +func (a *Amon) Write(points []*client.Point) error { + if len(points) == 0 { + return nil + } + ts := TimeSeries{} + var tempSeries = make([]*Metric, len(points)) + var acceptablePoints = 0 + for _, pt := range points { + metric := &Metric{ + Metric: strings.Replace(pt.Name(), "_", ".", -1), + } + if p, err := buildPoint(pt); err == nil { + metric.Points[0] = p + tempSeries[acceptablePoints] = metric + acceptablePoints += 1 + } else { + log.Printf("unable to build Metric for %s, skipping\n", pt.Name()) + } + } + ts.Series = make([]*Metric, acceptablePoints) + copy(ts.Series, tempSeries[0:]) + tsBytes, err := json.Marshal(ts) + if err != nil { + return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error()) + } + req, err := http.NewRequest("POST", a.authenticatedUrl(), bytes.NewBuffer(tsBytes)) + if err != nil { + return fmt.Errorf("unable to create http.Request, %s\n", err.Error()) + } + req.Header.Add("Content-Type", "application/json") + + resp, err := a.client.Do(req) + if err != nil { + return fmt.Errorf("error POSTing metrics, %s\n", err.Error()) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode > 209 { + return fmt.Errorf("received bad status code, %d\n", resp.StatusCode) + } + + return nil +} + +func (a *Amon) SampleConfig() string { + return sampleConfig +} + +func (a *Amon) Description() string { + return "Configuration for Amon API to send metrics to." +} + +func (a *Amon) authenticatedUrl() string { + + return fmt.Sprintf("%s/api/system/%s", a.AmonInstance, a.ServerKey) +} + +func buildPoint(pt *client.Point) (Point, error) { + var p Point + if err := p.setValue(pt.Fields()["value"]); err != nil { + return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error()) + } + p[0] = float64(pt.Time().Unix()) + return p, nil +} + +func (p *Point) setValue(v interface{}) error { + switch d := v.(type) { + case int: + p[1] = float64(int(d)) + case int32: + p[1] = float64(int32(d)) + case int64: + p[1] = float64(int64(d)) + case float32: + p[1] = float64(d) + case float64: + p[1] = float64(d) + default: + return fmt.Errorf("undeterminable type") + } + return nil +} + +func (a *Amon) Close() error { + return nil +} + +func init() { + outputs.Add("amon", func() outputs.Output { + return &Amon{} + }) +} diff --git a/outputs/amon/amon_test.go b/outputs/amon/amon_test.go new file mode 100644 index 0000000000000..053d4e0e71d44 --- /dev/null +++ b/outputs/amon/amon_test.go @@ -0,0 +1,190 @@ +package amon + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "reflect" + "testing" + "time" + + "github.com/influxdb/telegraf/testutil" + + "github.com/influxdb/influxdb/client/v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + fakeServerKey = "123456" + fakeAmonInstance = "https://demo.amon.cx" +) + + +func TestUriOverride(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(`{"status":"ok"}`) + })) + defer ts.Close() + + a := &Amon{ + ServerKey: fakeServerKey, + AmonInstance: fakeAmonInstance, + } + + err := a.Connect() + require.NoError(t, err) + err = a.Write(testutil.MockBatchPoints().Points()) + require.NoError(t, err) +} + +func TestBadStatusCode(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + json.NewEncoder(w).Encode(`{ 'errors': [ + 'Something bad happened to the server.', + 'Your query made the server very sad.' + ] + }`) + })) + defer ts.Close() + + a := &Amon{ + ServerKey: fakeServerKey, + AmonInstance: fakeAmonInstance, + } + err := a.Connect() + require.NoError(t, err) + err = a.Write(testutil.MockBatchPoints().Points()) + if err == nil { + t.Errorf("error expected but none returned") + } else { + require.EqualError(t, fmt.Errorf("received bad status code, 500\n"), err.Error()) + } +} + +func TestAuthenticatedUrl(t *testing.T) { + a := &Amon{ + ServerKey: fakeServerKey, + AmonInstance: fakeAmonInstance, + } + + authUrl := a.authenticatedUrl() + assert.EqualValues(t, fmt.Sprintf("%s/api/system/%s", fakeAmonInstance, fakeServerKey), authUrl) +} + + +func TestBuildPoint(t *testing.T) { + tags := make(map[string]string) + var tagtests = []struct { + ptIn *client.Point + outPt Point + err error + }{ + { + client.NewPoint( + "test1", + tags, + map[string]interface{}{"value": 0.0}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 0.0, + }, + nil, + }, + { + client.NewPoint( + "test2", + tags, + map[string]interface{}{"value": 1.0}, + time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix()), + 1.0, + }, + nil, + }, + { + client.NewPoint( + "test3", + tags, + map[string]interface{}{"value": 10}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 10.0, + }, + nil, + }, + { + client.NewPoint( + "test4", + tags, + map[string]interface{}{"value": int32(112345)}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 112345.0, + }, + nil, + }, + { + client.NewPoint( + "test5", + tags, + map[string]interface{}{"value": int64(112345)}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 112345.0, + }, + nil, + }, + { + client.NewPoint( + "test6", + tags, + map[string]interface{}{"value": float32(11234.5)}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 11234.5, + }, + nil, + }, + { + client.NewPoint( + "test7", + tags, + map[string]interface{}{"value": "11234.5"}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + Point{ + float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), + 11234.5, + }, + fmt.Errorf("unable to extract value from Fields, undeterminable type"), + }, + } + for _, tt := range tagtests { + pt, err := buildPoint(tt.ptIn) + if err != nil && tt.err == nil { + t.Errorf("%s: unexpected error, %+v\n", tt.ptIn.Name(), err) + } + if tt.err != nil && err == nil { + t.Errorf("%s: expected an error (%s) but none returned", tt.ptIn.Name(), tt.err.Error()) + } + if !reflect.DeepEqual(pt, tt.outPt) && tt.err == nil { + t.Errorf("%s: \nexpected %+v\ngot %+v\n", tt.ptIn.Name(), tt.outPt, pt) + } + } +} From ab86ed2df894a6887092160e17b13d45e01a88ff Mon Sep 17 00:00:00 2001 From: martinrusev Date: Fri, 6 Nov 2015 11:34:48 +0200 Subject: [PATCH 2/8] Register Amon --- outputs/all/all.go | 1 + 1 file changed, 1 insertion(+) diff --git a/outputs/all/all.go b/outputs/all/all.go index c51a24c594838..be9c4cf42d03d 100644 --- a/outputs/all/all.go +++ b/outputs/all/all.go @@ -1,6 +1,7 @@ package all import ( + _ "github.com/influxdb/telegraf/outputs/amon" _ "github.com/influxdb/telegraf/outputs/amqp" _ "github.com/influxdb/telegraf/outputs/datadog" _ "github.com/influxdb/telegraf/outputs/influxdb" From d3ae5adcf02ff9e984893f7c1131731a4ce33668 Mon Sep 17 00:00:00 2001 From: martinrusev Date: Fri, 6 Nov 2015 11:46:59 +0200 Subject: [PATCH 3/8] Update amon output --- outputs/amon/amon.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/outputs/amon/amon.go b/outputs/amon/amon.go index e75a635975de5..0e0bed5bcad8c 100644 --- a/outputs/amon/amon.go +++ b/outputs/amon/amon.go @@ -22,8 +22,10 @@ type Amon struct { } var sampleConfig = ` - # Amon + # Amon Server Key serverkey = "my-server-key" # required. + + # Amon Insance URL amoninstance = "https://youramoninstance" # required # Connection timeout. @@ -101,7 +103,7 @@ func (a *Amon) SampleConfig() string { } func (a *Amon) Description() string { - return "Configuration for Amon API to send metrics to." + return "Configuration for Amon Server to send metrics to." } func (a *Amon) authenticatedUrl() string { From 1c92cfb3b61de7b23042bf40605f89b10b274485 Mon Sep 17 00:00:00 2001 From: martinrusev Date: Fri, 6 Nov 2015 12:06:17 +0200 Subject: [PATCH 4/8] Fix indentation --- outputs/amon/amon.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/outputs/amon/amon.go b/outputs/amon/amon.go index 0e0bed5bcad8c..6620501471881 100644 --- a/outputs/amon/amon.go +++ b/outputs/amon/amon.go @@ -14,9 +14,9 @@ import ( ) type Amon struct { - ServerKey string + ServerKey string AmonInstance string - Timeout duration.Duration + Timeout duration.Duration client *http.Client } @@ -43,7 +43,6 @@ type Metric struct { type Point [2]float64 - func (a *Amon) Connect() error { if a.ServerKey == "" || a.AmonInstance == "" { return fmt.Errorf("serverkey and amoninstance are required fields for amon output") @@ -107,7 +106,7 @@ func (a *Amon) Description() string { } func (a *Amon) authenticatedUrl() string { - + return fmt.Sprintf("%s/api/system/%s", a.AmonInstance, a.ServerKey) } From 9030e8b5f094d9a7e2ef10e90e13416e201f7ca8 Mon Sep 17 00:00:00 2001 From: martinrusev Date: Fri, 6 Nov 2015 12:13:14 +0200 Subject: [PATCH 5/8] Update identation --- outputs/amon/amon_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/outputs/amon/amon_test.go b/outputs/amon/amon_test.go index 053d4e0e71d44..0b6e5c7b8cbbb 100644 --- a/outputs/amon/amon_test.go +++ b/outputs/amon/amon_test.go @@ -21,7 +21,6 @@ var ( fakeAmonInstance = "https://demo.amon.cx" ) - func TestUriOverride(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) @@ -30,10 +29,10 @@ func TestUriOverride(t *testing.T) { defer ts.Close() a := &Amon{ - ServerKey: fakeServerKey, + ServerKey: fakeServerKey, AmonInstance: fakeAmonInstance, } - + err := a.Connect() require.NoError(t, err) err = a.Write(testutil.MockBatchPoints().Points()) @@ -52,7 +51,7 @@ func TestBadStatusCode(t *testing.T) { defer ts.Close() a := &Amon{ - ServerKey: fakeServerKey, + ServerKey: fakeServerKey, AmonInstance: fakeAmonInstance, } err := a.Connect() @@ -67,7 +66,7 @@ func TestBadStatusCode(t *testing.T) { func TestAuthenticatedUrl(t *testing.T) { a := &Amon{ - ServerKey: fakeServerKey, + ServerKey: fakeServerKey, AmonInstance: fakeAmonInstance, } @@ -75,7 +74,6 @@ func TestAuthenticatedUrl(t *testing.T) { assert.EqualValues(t, fmt.Sprintf("%s/api/system/%s", fakeAmonInstance, fakeServerKey), authUrl) } - func TestBuildPoint(t *testing.T) { tags := make(map[string]string) var tagtests = []struct { From f3ef4e64f1151ea7a338b287c75f8570b7486d9c Mon Sep 17 00:00:00 2001 From: martinrusev Date: Fri, 6 Nov 2015 12:38:00 +0200 Subject: [PATCH 6/8] Update tests --- outputs/amon/amon_test.go | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/outputs/amon/amon_test.go b/outputs/amon/amon_test.go index 0b6e5c7b8cbbb..7856d4540e156 100644 --- a/outputs/amon/amon_test.go +++ b/outputs/amon/amon_test.go @@ -39,31 +39,6 @@ func TestUriOverride(t *testing.T) { require.NoError(t, err) } -func TestBadStatusCode(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - json.NewEncoder(w).Encode(`{ 'errors': [ - 'Something bad happened to the server.', - 'Your query made the server very sad.' - ] - }`) - })) - defer ts.Close() - - a := &Amon{ - ServerKey: fakeServerKey, - AmonInstance: fakeAmonInstance, - } - err := a.Connect() - require.NoError(t, err) - err = a.Write(testutil.MockBatchPoints().Points()) - if err == nil { - t.Errorf("error expected but none returned") - } else { - require.EqualError(t, fmt.Errorf("received bad status code, 500\n"), err.Error()) - } -} - func TestAuthenticatedUrl(t *testing.T) { a := &Amon{ ServerKey: fakeServerKey, From 1f5bef1caf78b5b07567a325bbe8cbeac1fb2de5 Mon Sep 17 00:00:00 2001 From: martinrusev Date: Fri, 6 Nov 2015 12:46:20 +0200 Subject: [PATCH 7/8] Spelling + Readme/Changelog updates --- CHANGELOG.md | 1 + README.md | 1 + outputs/amon/amon.go | 6 +++--- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eee4a9ad0f5b7..c24ecd7618693 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ changed to just run docker commands in the Makefile. See `make docker-run` and - [#325](https://github.com/influxdb/telegraf/pull/325): NSQ output. Thanks @jrxFive! - [#318](https://github.com/influxdb/telegraf/pull/318): Prometheus output. Thanks @oldmantaiter! - [#338](https://github.com/influxdb/telegraf/pull/338): Restart Telegraf on package upgrade. Thanks @linsomniac! +- [#350](https://github.com/influxdb/telegraf/pull/350): Amon output. ### Bugfixes - [#331](https://github.com/influxdb/telegraf/pull/331): Dont overwrite host tag in redis plugin. diff --git a/README.md b/README.md index 4b6bd636fa0a2..8645100696517 100644 --- a/README.md +++ b/README.md @@ -225,6 +225,7 @@ found by running `telegraf -sample-config`. * mqtt * librato * prometheus +* amon ## Contributing diff --git a/outputs/amon/amon.go b/outputs/amon/amon.go index 6620501471881..bbe2146123447 100644 --- a/outputs/amon/amon.go +++ b/outputs/amon/amon.go @@ -25,8 +25,8 @@ var sampleConfig = ` # Amon Server Key serverkey = "my-server-key" # required. - # Amon Insance URL - amoninstance = "https://youramoninstance" # required + # Amon Instance URL + amon_instance = "https://youramoninstance" # required # Connection timeout. # timeout = "5s" @@ -45,7 +45,7 @@ type Point [2]float64 func (a *Amon) Connect() error { if a.ServerKey == "" || a.AmonInstance == "" { - return fmt.Errorf("serverkey and amoninstance are required fields for amon output") + return fmt.Errorf("serverkey and amon_instance are required fields for amon output") } a.client = &http.Client{ Timeout: a.Timeout.Duration, From 9cf7a542c151939a94f23800b41fc1c4af6aec09 Mon Sep 17 00:00:00 2001 From: martinrusev Date: Fri, 6 Nov 2015 12:53:44 +0200 Subject: [PATCH 8/8] server_key typo --- outputs/amon/amon.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/outputs/amon/amon.go b/outputs/amon/amon.go index bbe2146123447..08275f52da18b 100644 --- a/outputs/amon/amon.go +++ b/outputs/amon/amon.go @@ -23,7 +23,7 @@ type Amon struct { var sampleConfig = ` # Amon Server Key - serverkey = "my-server-key" # required. + server_key = "my-server-key" # required. # Amon Instance URL amon_instance = "https://youramoninstance" # required