Skip to content

Commit

Permalink
feat(inputs.influxdb): Add metrics for build, crypto and commandline (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored Jun 4, 2024
1 parent 3d85f53 commit ceba179
Show file tree
Hide file tree
Showing 5 changed files with 5,078 additions and 145 deletions.
271 changes: 128 additions & 143 deletions plugins/inputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"io"
"net/http"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -87,48 +88,6 @@ func (i *InfluxDB) Gather(acc telegraf.Accumulator) error {
return nil
}

type point struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Values map[string]interface{} `json:"values"`
}

type memstats struct {
Alloc int64 `json:"Alloc"`
TotalAlloc int64 `json:"TotalAlloc"`
Sys int64 `json:"Sys"`
Lookups int64 `json:"Lookups"`
Mallocs int64 `json:"Mallocs"`
Frees int64 `json:"Frees"`
HeapAlloc int64 `json:"HeapAlloc"`
HeapSys int64 `json:"HeapSys"`
HeapIdle int64 `json:"HeapIdle"`
HeapInuse int64 `json:"HeapInuse"`
HeapReleased int64 `json:"HeapReleased"`
HeapObjects int64 `json:"HeapObjects"`
StackInuse int64 `json:"StackInuse"`
StackSys int64 `json:"StackSys"`
MSpanInuse int64 `json:"MSpanInuse"`
MSpanSys int64 `json:"MSpanSys"`
MCacheInuse int64 `json:"MCacheInuse"`
MCacheSys int64 `json:"MCacheSys"`
BuckHashSys int64 `json:"BuckHashSys"`
GCSys int64 `json:"GCSys"`
OtherSys int64 `json:"OtherSys"`
NextGC int64 `json:"NextGC"`
LastGC int64 `json:"LastGC"`
PauseTotalNs int64 `json:"PauseTotalNs"`
PauseNs [256]int64 `json:"PauseNs"`
NumGC int64 `json:"NumGC"`
GCCPUFraction float64 `json:"GCCPUFraction"`
}

type system struct {
CurrentTime string `json:"currentTime"`
Started string `json:"started"`
Uptime uint64 `json:"uptime"`
}

// Gathers data from a particular URL
// Parameters:
//
Expand All @@ -138,13 +97,11 @@ type system struct {
// Returns:
//
// error: Any error that may have occurred
func (i *InfluxDB) gatherURL(
acc telegraf.Accumulator,
url string,
) error {
func (i *InfluxDB) gatherURL(acc telegraf.Accumulator, url string) error {
shardCounter := 0
now := time.Now()

// Get the data
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return err
Expand Down Expand Up @@ -181,119 +138,147 @@ func (i *InfluxDB) gatherURL(
}

// Loop through rest of object
for {
// Nothing left in this object, we're done
if !dec.More() {
break
}

for dec.More() {
// Read in a string key. We don't do anything with the top-level keys,
// so it's discarded.
key, err := dec.Token()
rawKey, err := dec.Token()
if err != nil {
return err
}

if keyStr, ok := key.(string); ok {
if keyStr == "system" {
var p system
if err := dec.Decode(&p); err != nil {
continue
}

acc.AddFields("influxdb_system",
map[string]interface{}{
"current_time": p.CurrentTime,
"started": p.Started,
"uptime": p.Uptime,
},
map[string]string{
"url": url,
},
)
}

if keyStr == "memstats" {
var m memstats
if err := dec.Decode(&m); err != nil {
continue
}
acc.AddFields("influxdb_memstats",
map[string]interface{}{
"alloc": m.Alloc,
"total_alloc": m.TotalAlloc,
"sys": m.Sys,
"lookups": m.Lookups,
"mallocs": m.Mallocs,
"frees": m.Frees,
"heap_alloc": m.HeapAlloc,
"heap_sys": m.HeapSys,
"heap_idle": m.HeapIdle,
"heap_inuse": m.HeapInuse,
"heap_released": m.HeapReleased,
"heap_objects": m.HeapObjects,
"stack_inuse": m.StackInuse,
"stack_sys": m.StackSys,
"mspan_inuse": m.MSpanInuse,
"mspan_sys": m.MSpanSys,
"mcache_inuse": m.MCacheInuse,
"mcache_sys": m.MCacheSys,
"buck_hash_sys": m.BuckHashSys,
"gc_sys": m.GCSys,
"other_sys": m.OtherSys,
"next_gc": m.NextGC,
"last_gc": m.LastGC,
"pause_total_ns": m.PauseTotalNs,
"pause_ns": m.PauseNs[(m.NumGC+255)%256],
"num_gc": m.NumGC,
"gc_cpu_fraction": m.GCCPUFraction,
},
map[string]string{
"url": url,
})
}
}

// Attempt to parse a whole object into a point.
// It might be a non-object, like a string or array.
// If we fail to decode it into a point, ignore it and move on.
var p point
if err := dec.Decode(&p); err != nil {
// All variables should be keyed
key, ok := rawKey.(string)
if !ok {
continue
}

if p.Tags == nil {
p.Tags = make(map[string]string)
}
// Try to decode known special structs
switch key {
case "system":
var p system
if err := dec.Decode(&p); err != nil {
continue
}

// If the object was a point, but was not fully initialized,
// ignore it and move on.
if p.Name == "" || p.Values == nil || len(p.Values) == 0 {
acc.AddFields("influxdb_system",
map[string]interface{}{
"current_time": p.CurrentTime,
"started": p.Started,
"uptime": p.Uptime,
},
map[string]string{"url": url},
now,
)
continue
}
case "memstats":
var m memstats
if err := dec.Decode(&m); err != nil {
continue
}
acc.AddFields("influxdb_memstats",
map[string]interface{}{
"alloc": m.Alloc,
"total_alloc": m.TotalAlloc,
"sys": m.Sys,
"lookups": m.Lookups,
"mallocs": m.Mallocs,
"frees": m.Frees,
"heap_alloc": m.HeapAlloc,
"heap_sys": m.HeapSys,
"heap_idle": m.HeapIdle,
"heap_inuse": m.HeapInuse,
"heap_released": m.HeapReleased,
"heap_objects": m.HeapObjects,
"stack_inuse": m.StackInuse,
"stack_sys": m.StackSys,
"mspan_inuse": m.MSpanInuse,
"mspan_sys": m.MSpanSys,
"mcache_inuse": m.MCacheInuse,
"mcache_sys": m.MCacheSys,
"buck_hash_sys": m.BuckHashSys,
"gc_sys": m.GCSys,
"other_sys": m.OtherSys,
"next_gc": m.NextGC,
"last_gc": m.LastGC,
"pause_total_ns": m.PauseTotalNs,
"pause_ns": m.PauseNs[(m.NumGC+255)%256],
"num_gc": m.NumGC,
"gc_cpu_fraction": m.GCCPUFraction,
},
map[string]string{"url": url},
now,
)
case "build":
var d build
if err := dec.Decode(&d); err != nil {
continue
}
acc.AddFields("influxdb_build",
map[string]interface{}{
"branch": d.Branch,
"build_time": d.BuildTime,
"commit": d.Commit,
"version": d.Version,
},
map[string]string{"url": url},
now,
)
case "cmdline":
var d []string
if err := dec.Decode(&d); err != nil {
continue
}
acc.AddFields("influxdb_cmdline",
map[string]interface{}{"value": strings.Join(d, " ")},
map[string]string{"url": url},
now,
)
case "crypto":
var d crypto
if err := dec.Decode(&d); err != nil {
continue
}
acc.AddFields("influxdb_crypto",
map[string]interface{}{
"fips": d.FIPS,
"ensure_fips": d.EnsureFIPS,
"implementation": d.Implementation,
"password_hash": d.PasswordHash,
},
map[string]string{"url": url},
now,
)
default:
// Attempt to parse all other entires as an object into a point.
// Ignore all non-object entries (like a string or array) or
// entries not conforming to a "point" structure and move on.
var p point
if err := dec.Decode(&p); err != nil {
continue
}

if p.Name == "shard" {
shardCounter++
}
// If the object was a point, but was not fully initialized,
// ignore it and move on.
if p.Name == "" || p.Values == nil || len(p.Values) == 0 {
continue
}

if p.Name == "shard" {
shardCounter++
}

// Add a tag to indicate the source of the data.
p.Tags["url"] = url
// Add a tag to indicate the source of the data.
if p.Tags == nil {
p.Tags = make(map[string]string)
}
p.Tags["url"] = url

acc.AddFields(
"influxdb_"+p.Name,
p.Values,
p.Tags,
now,
)
acc.AddFields("influxdb_"+p.Name, p.Values, p.Tags, now)
}
}

acc.AddFields("influxdb",
map[string]interface{}{
"n_shards": shardCounter,
},
nil,
now,
)
// Add a metric for the number of shards
acc.AddFields("influxdb", map[string]interface{}{"n_shards": shardCounter}, nil, now)

return nil
}
Expand Down
45 changes: 43 additions & 2 deletions plugins/inputs/influxdb/influxdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"os"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/plugins/inputs/influxdb"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)

Expand Down Expand Up @@ -82,7 +84,7 @@ func TestInfluxDB(t *testing.T) {
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(plugin.Gather))

require.Len(t, acc.Metrics, 35)
require.Len(t, acc.Metrics, 36)

fields := map[string]interface{}{
"heap_inuse": int64(18046976),
Expand Down Expand Up @@ -154,7 +156,7 @@ func TestInfluxDB2(t *testing.T) {
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(plugin.Gather))

require.Len(t, acc.Metrics, 35)
require.Len(t, acc.Metrics, 36)

acc.AssertContainsTaggedFields(t, "influxdb",
map[string]interface{}{
Expand All @@ -172,6 +174,45 @@ func TestInfluxDB2(t *testing.T) {
acc.AssertContainsTaggedFields(t, "influxdb_system", fields, tags)
}

func TestCloud1(t *testing.T) {
// Setup a fake endpoint with the input data
input, err := os.ReadFile("./testdata/cloud1.json")
require.NoError(t, err)

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/endpoint" {
_, err := w.Write(input)
require.NoError(t, err)
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()

// Setup the plugin
plugin := &influxdb.InfluxDB{
URLs: []string{server.URL + "/endpoint"},
}

// Gather the data
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(plugin.Gather))

// Read the expected data
parser := &influx.Parser{}
require.NoError(t, parser.Init())

buf, err := os.ReadFile("./testdata/cloud1.influx")
require.NoError(t, err)
expected, err := parser.Parse(buf)
require.NoError(t, err)

// Check the output
opts := []cmp.Option{testutil.IgnoreTags("url"), testutil.IgnoreTime()}
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, opts...)
}

func TestErrorHandling(t *testing.T) {
badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/endpoint" {
Expand Down
Loading

0 comments on commit ceba179

Please sign in to comment.