Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.influxdb): Add metrics for build, crypto and commandline #15438

Merged
merged 1 commit into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 128 additions & 143 deletions plugins/inputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"io"
"net/http"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -87,48 +88,6 @@ func (i *InfluxDB) Gather(acc telegraf.Accumulator) error {
return nil
}

type point struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Values map[string]interface{} `json:"values"`
}

type memstats struct {
Alloc int64 `json:"Alloc"`
TotalAlloc int64 `json:"TotalAlloc"`
Sys int64 `json:"Sys"`
Lookups int64 `json:"Lookups"`
Mallocs int64 `json:"Mallocs"`
Frees int64 `json:"Frees"`
HeapAlloc int64 `json:"HeapAlloc"`
HeapSys int64 `json:"HeapSys"`
HeapIdle int64 `json:"HeapIdle"`
HeapInuse int64 `json:"HeapInuse"`
HeapReleased int64 `json:"HeapReleased"`
HeapObjects int64 `json:"HeapObjects"`
StackInuse int64 `json:"StackInuse"`
StackSys int64 `json:"StackSys"`
MSpanInuse int64 `json:"MSpanInuse"`
MSpanSys int64 `json:"MSpanSys"`
MCacheInuse int64 `json:"MCacheInuse"`
MCacheSys int64 `json:"MCacheSys"`
BuckHashSys int64 `json:"BuckHashSys"`
GCSys int64 `json:"GCSys"`
OtherSys int64 `json:"OtherSys"`
NextGC int64 `json:"NextGC"`
LastGC int64 `json:"LastGC"`
PauseTotalNs int64 `json:"PauseTotalNs"`
PauseNs [256]int64 `json:"PauseNs"`
NumGC int64 `json:"NumGC"`
GCCPUFraction float64 `json:"GCCPUFraction"`
}

type system struct {
CurrentTime string `json:"currentTime"`
Started string `json:"started"`
Uptime uint64 `json:"uptime"`
}

// Gathers data from a particular URL
// Parameters:
//
Expand All @@ -138,13 +97,11 @@ type system struct {
// Returns:
//
// error: Any error that may have occurred
func (i *InfluxDB) gatherURL(
acc telegraf.Accumulator,
url string,
) error {
func (i *InfluxDB) gatherURL(acc telegraf.Accumulator, url string) error {
shardCounter := 0
now := time.Now()

// Get the data
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return err
Expand Down Expand Up @@ -181,119 +138,147 @@ func (i *InfluxDB) gatherURL(
}

// Loop through rest of object
for {
// Nothing left in this object, we're done
if !dec.More() {
break
}

for dec.More() {
// Read in a string key. We don't do anything with the top-level keys,
// so it's discarded.
key, err := dec.Token()
rawKey, err := dec.Token()
if err != nil {
return err
}

if keyStr, ok := key.(string); ok {
if keyStr == "system" {
var p system
if err := dec.Decode(&p); err != nil {
continue
}

acc.AddFields("influxdb_system",
map[string]interface{}{
"current_time": p.CurrentTime,
"started": p.Started,
"uptime": p.Uptime,
},
map[string]string{
"url": url,
},
)
}

if keyStr == "memstats" {
var m memstats
if err := dec.Decode(&m); err != nil {
continue
}
acc.AddFields("influxdb_memstats",
map[string]interface{}{
"alloc": m.Alloc,
"total_alloc": m.TotalAlloc,
"sys": m.Sys,
"lookups": m.Lookups,
"mallocs": m.Mallocs,
"frees": m.Frees,
"heap_alloc": m.HeapAlloc,
"heap_sys": m.HeapSys,
"heap_idle": m.HeapIdle,
"heap_inuse": m.HeapInuse,
"heap_released": m.HeapReleased,
"heap_objects": m.HeapObjects,
"stack_inuse": m.StackInuse,
"stack_sys": m.StackSys,
"mspan_inuse": m.MSpanInuse,
"mspan_sys": m.MSpanSys,
"mcache_inuse": m.MCacheInuse,
"mcache_sys": m.MCacheSys,
"buck_hash_sys": m.BuckHashSys,
"gc_sys": m.GCSys,
"other_sys": m.OtherSys,
"next_gc": m.NextGC,
"last_gc": m.LastGC,
"pause_total_ns": m.PauseTotalNs,
"pause_ns": m.PauseNs[(m.NumGC+255)%256],
"num_gc": m.NumGC,
"gc_cpu_fraction": m.GCCPUFraction,
},
map[string]string{
"url": url,
})
}
}

// Attempt to parse a whole object into a point.
// It might be a non-object, like a string or array.
// If we fail to decode it into a point, ignore it and move on.
var p point
if err := dec.Decode(&p); err != nil {
// All variables should be keyed
key, ok := rawKey.(string)
if !ok {
continue
}

if p.Tags == nil {
p.Tags = make(map[string]string)
}
// Try to decode known special structs
switch key {
case "system":
var p system
if err := dec.Decode(&p); err != nil {
continue
}

// If the object was a point, but was not fully initialized,
// ignore it and move on.
if p.Name == "" || p.Values == nil || len(p.Values) == 0 {
acc.AddFields("influxdb_system",
map[string]interface{}{
"current_time": p.CurrentTime,
"started": p.Started,
"uptime": p.Uptime,
},
map[string]string{"url": url},
now,
)
continue
}
case "memstats":
var m memstats
if err := dec.Decode(&m); err != nil {
continue
}
acc.AddFields("influxdb_memstats",
map[string]interface{}{
"alloc": m.Alloc,
"total_alloc": m.TotalAlloc,
"sys": m.Sys,
"lookups": m.Lookups,
"mallocs": m.Mallocs,
"frees": m.Frees,
"heap_alloc": m.HeapAlloc,
"heap_sys": m.HeapSys,
"heap_idle": m.HeapIdle,
"heap_inuse": m.HeapInuse,
"heap_released": m.HeapReleased,
"heap_objects": m.HeapObjects,
"stack_inuse": m.StackInuse,
"stack_sys": m.StackSys,
"mspan_inuse": m.MSpanInuse,
"mspan_sys": m.MSpanSys,
"mcache_inuse": m.MCacheInuse,
"mcache_sys": m.MCacheSys,
"buck_hash_sys": m.BuckHashSys,
"gc_sys": m.GCSys,
"other_sys": m.OtherSys,
"next_gc": m.NextGC,
"last_gc": m.LastGC,
"pause_total_ns": m.PauseTotalNs,
"pause_ns": m.PauseNs[(m.NumGC+255)%256],
"num_gc": m.NumGC,
"gc_cpu_fraction": m.GCCPUFraction,
},
map[string]string{"url": url},
now,
)
case "build":
var d build
if err := dec.Decode(&d); err != nil {
continue
}
acc.AddFields("influxdb_build",
map[string]interface{}{
"branch": d.Branch,
"build_time": d.BuildTime,
"commit": d.Commit,
"version": d.Version,
},
map[string]string{"url": url},
now,
)
case "cmdline":
var d []string
if err := dec.Decode(&d); err != nil {
continue
}
acc.AddFields("influxdb_cmdline",
map[string]interface{}{"value": strings.Join(d, " ")},
map[string]string{"url": url},
now,
)
case "crypto":
var d crypto
if err := dec.Decode(&d); err != nil {
continue
}
acc.AddFields("influxdb_crypto",
map[string]interface{}{
"fips": d.FIPS,
"ensure_fips": d.EnsureFIPS,
"implementation": d.Implementation,
"password_hash": d.PasswordHash,
},
map[string]string{"url": url},
now,
)
default:
// Attempt to parse all other entires as an object into a point.
// Ignore all non-object entries (like a string or array) or
// entries not conforming to a "point" structure and move on.
var p point
if err := dec.Decode(&p); err != nil {
continue
}

if p.Name == "shard" {
shardCounter++
}
// If the object was a point, but was not fully initialized,
// ignore it and move on.
if p.Name == "" || p.Values == nil || len(p.Values) == 0 {
continue
}

if p.Name == "shard" {
shardCounter++
}

// Add a tag to indicate the source of the data.
p.Tags["url"] = url
// Add a tag to indicate the source of the data.
if p.Tags == nil {
p.Tags = make(map[string]string)
}
p.Tags["url"] = url

acc.AddFields(
"influxdb_"+p.Name,
p.Values,
p.Tags,
now,
)
acc.AddFields("influxdb_"+p.Name, p.Values, p.Tags, now)
}
}

acc.AddFields("influxdb",
map[string]interface{}{
"n_shards": shardCounter,
},
nil,
now,
)
// Add a metric for the number of shards
acc.AddFields("influxdb", map[string]interface{}{"n_shards": shardCounter}, nil, now)

return nil
}
Expand Down
45 changes: 43 additions & 2 deletions plugins/inputs/influxdb/influxdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"os"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf/plugins/inputs/influxdb"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
)

Expand Down Expand Up @@ -82,7 +84,7 @@ func TestInfluxDB(t *testing.T) {
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(plugin.Gather))

require.Len(t, acc.Metrics, 35)
require.Len(t, acc.Metrics, 36)

fields := map[string]interface{}{
"heap_inuse": int64(18046976),
Expand Down Expand Up @@ -154,7 +156,7 @@ func TestInfluxDB2(t *testing.T) {
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(plugin.Gather))

require.Len(t, acc.Metrics, 35)
require.Len(t, acc.Metrics, 36)

acc.AssertContainsTaggedFields(t, "influxdb",
map[string]interface{}{
Expand All @@ -172,6 +174,45 @@ func TestInfluxDB2(t *testing.T) {
acc.AssertContainsTaggedFields(t, "influxdb_system", fields, tags)
}

func TestCloud1(t *testing.T) {
// Setup a fake endpoint with the input data
input, err := os.ReadFile("./testdata/cloud1.json")
require.NoError(t, err)

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/endpoint" {
_, err := w.Write(input)
require.NoError(t, err)
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
defer server.Close()

// Setup the plugin
plugin := &influxdb.InfluxDB{
URLs: []string{server.URL + "/endpoint"},
}

// Gather the data
var acc testutil.Accumulator
require.NoError(t, acc.GatherError(plugin.Gather))

// Read the expected data
parser := &influx.Parser{}
require.NoError(t, parser.Init())

buf, err := os.ReadFile("./testdata/cloud1.influx")
require.NoError(t, err)
expected, err := parser.Parse(buf)
require.NoError(t, err)

// Check the output
opts := []cmp.Option{testutil.IgnoreTags("url"), testutil.IgnoreTime()}
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, opts...)
}

func TestErrorHandling(t *testing.T) {
badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/endpoint" {
Expand Down
Loading
Loading