Skip to content

Commit

Permalink
Report kibana_settings to X-Pack Monitoring (#7664) (#7943)
Browse files Browse the repository at this point in the history
Resolves #7621.

Depends on elastic/kibana#21100.

X-Pack Monitoring of Kibana requires two types of documents in the `.monitoring-kibana-*` indices: `kibana_stats` and `kibana_settings`. We made Metricbeat's `kibana/stats` metricset index `kibana_stats` documents into `.monitoring-kibana-*` in #7525. This PR makes the same metricset index `kibana_settings` documents into `.monitoring-kibana-*`.

(cherry picked from commit 2af5ab9)
  • Loading branch information
ycombinator authored and ruflin committed Aug 13, 2018
1 parent cf18dd3 commit 9e9434e
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 39 deletions.
32 changes: 32 additions & 0 deletions metricbeat/module/kibana/kibana.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ import (
"github.com/elastic/beats/metricbeat/mb"
)

const (
// StatsAPIAvailableVersion is the version of Kibana since when the stats API is available
StatsAPIAvailableVersion = "6.4.0"

// SettingsAPIAvailableVersion is the version of Kibana since when the settings API is available
SettingsAPIAvailableVersion = "6.5.0"
)

// ReportErrorForMissingField reports and returns an error message for the given
// field being missing in API response received from Kibana
func ReportErrorForMissingField(field string, r mb.ReporterV2) error {
Expand Down Expand Up @@ -64,6 +72,30 @@ func GetVersion(http *helper.HTTP, currentPath string) (string, error) {
return versionStr, nil
}

func isKibanaAPIAvailable(currentKibanaVersion, apiAvailableInKibanaVersion string) (bool, error) {
currentVersion, err := common.NewVersion(currentKibanaVersion)
if err != nil {
return false, err
}

wantVersion, err := common.NewVersion(apiAvailableInKibanaVersion)
if err != nil {
return false, err
}

return !currentVersion.LessThan(wantVersion), nil
}

// IsStatsAPIAvailable returns whether the stats API is available in the given version of Kibana
func IsStatsAPIAvailable(currentKibanaVersion string) (bool, error) {
return isKibanaAPIAvailable(currentKibanaVersion, StatsAPIAvailableVersion)
}

// IsSettingsAPIAvailable returns whether the settings API is available in the given version of Kibana
func IsSettingsAPIAvailable(currentKibanaVersion string) (bool, error) {
return isKibanaAPIAvailable(currentKibanaVersion, SettingsAPIAvailableVersion)
}

func fetchPath(http *helper.HTTP, currentPath, newPath string) ([]byte, error) {
currentURI := http.GetURI()
defer http.SetURI(currentURI) // Reset after this request
Expand Down
18 changes: 18 additions & 0 deletions metricbeat/module/kibana/stats/_meta/test/settings.700.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"cluster_uuid":"u5ii0pnQRka_P0gimfmthg",
"settings":{
"xpack":{
"default_admin_email":"[email protected]"
},
"kibana":{
"uuid":"5b2de169-2785-441b-ae8c-186a1936b17d",
"name":"Janes-MBP-2",
"index":".kibana",
"host":"localhost",
"transport_address":"localhost:5601",
"version":"7.0.0-alpha1",
"snapshot":false,
"status":"green"
}
}
}
75 changes: 52 additions & 23 deletions metricbeat/module/kibana/stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

var (
schemaXPackMonitoring = s.Schema{
schemaXPackMonitoringStats = s.Schema{
"concurrent_connections": c.Int("concurrent_connections"),
"os": c.Dict("os", s.Schema{
"load": c.Dict("load", s.Schema{
Expand Down Expand Up @@ -105,61 +105,90 @@ var (
}
)

func eventMappingXPack(r mb.ReporterV2, intervalMs int64, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
r.Error(err)
return err
type dataParser func(mb.ReporterV2, common.MapStr, time.Time) (string, string, common.MapStr, error)

func statsDataParser(r mb.ReporterV2, data common.MapStr, now time.Time) (string, string, common.MapStr, error) {
clusterUUID, ok := data["clusterUuid"].(string)
if !ok {
return "", "", nil, elastic.ReportErrorForMissingField("clusterUuid", elastic.Kibana, r)
}

kibanaStatsFields, err := schemaXPackMonitoring.Apply(data)
kibanaStatsFields, err := schemaXPackMonitoringStats.Apply(data)
if err != nil {
r.Error(err)
return err
return "", "", nil, err
}

process, ok := data["process"].(map[string]interface{})
if !ok {
return elastic.ReportErrorForMissingField("process", elastic.Kibana, r)
return "", "", nil, elastic.ReportErrorForMissingField("process", elastic.Kibana, r)
}
memory, ok := process["memory"].(map[string]interface{})
if !ok {
return elastic.ReportErrorForMissingField("process.memory", elastic.Kibana, r)
return "", "", nil, elastic.ReportErrorForMissingField("process.memory", elastic.Kibana, r)
}

rss, ok := memory["resident_set_size_bytes"].(float64)
if !ok {
return elastic.ReportErrorForMissingField("process.memory.resident_set_size_bytes", elastic.Kibana, r)
return "", "", nil, elastic.ReportErrorForMissingField("process.memory.resident_set_size_bytes", elastic.Kibana, r)
}
kibanaStatsFields.Put("process.memory.resident_set_size_in_bytes", int64(rss))

timestamp := time.Now()
kibanaStatsFields.Put("timestamp", timestamp)
kibanaStatsFields.Put("timestamp", now)

// Make usage field passthrough as-is
usage, ok := data["usage"].(map[string]interface{})
if !ok {
return elastic.ReportErrorForMissingField("usage", elastic.Kibana, r)
return "", "", nil, elastic.ReportErrorForMissingField("usage", elastic.Kibana, r)
}
kibanaStatsFields.Put("usage", usage)

clusterUUID, ok := data["clusterUuid"].(string)
return "kibana_stats", clusterUUID, kibanaStatsFields, nil
}

func settingsDataParser(r mb.ReporterV2, data common.MapStr, now time.Time) (string, string, common.MapStr, error) {
clusterUUID, ok := data["cluster_uuid"].(string)
if !ok {
return "", "", nil, elastic.ReportErrorForMissingField("cluster_uuid", elastic.Kibana, r)
}

kibanaSettingsFields, ok := data["settings"]
if !ok {
return elastic.ReportErrorForMissingField("clusterUuid", elastic.Kibana, r)
return "", "", nil, elastic.ReportErrorForMissingField("settings", elastic.Kibana, r)
}

return "kibana_settings", clusterUUID, kibanaSettingsFields.(map[string]interface{}), nil
}

func eventMappingXPack(r mb.ReporterV2, intervalMs int64, now time.Time, content []byte, dataParserFunc dataParser) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
return err
}

t, clusterUUID, fields, err := dataParserFunc(r, data, now)
if err != nil {
return err
}

var event mb.Event
event.RootFields = common.MapStr{
"cluster_uuid": clusterUUID,
"timestamp": timestamp,
"timestamp": now,
"interval_ms": intervalMs,
"type": "kibana_stats",
"kibana_stats": kibanaStatsFields,
"type": t,
t: fields,
}

event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Kibana)
r.Event(event)

r.Event(event)
return nil
}

func eventMappingStatsXPack(r mb.ReporterV2, intervalMs int64, now time.Time, content []byte) error {
return eventMappingXPack(r, intervalMs, now, content, statsDataParser)
}

func eventMappingSettingsXPack(r mb.ReporterV2, intervalMs int64, now time.Time, content []byte) error {
return eventMappingXPack(r, intervalMs, now, content, settingsDataParser)
}
26 changes: 24 additions & 2 deletions metricbeat/module/kibana/stats/data_xpack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ import (
"io/ioutil"
"path/filepath"
"testing"
"time"

mbtest "github.com/elastic/beats/metricbeat/mb/testing"

"github.com/stretchr/testify/assert"
)

func TestEventMappingXPack(t *testing.T) {
func TestEventMappingStatsXPack(t *testing.T) {

files, err := filepath.Glob("./_meta/test/stats-legacy.*.json")
assert.NoError(t, err)
Expand All @@ -39,7 +40,28 @@ func TestEventMappingXPack(t *testing.T) {
assert.NoError(t, err)

reporter := &mbtest.CapturingReporterV2{}
err = eventMappingXPack(reporter, 10000, input)
now := time.Now()

err = eventMappingStatsXPack(reporter, 10000, now, input)
assert.NoError(t, err, f)
assert.True(t, len(reporter.GetEvents()) >= 1, f)
assert.Equal(t, 0, len(reporter.GetErrors()), f)
}
}

func TestEventMappingSettingsXPack(t *testing.T) {

files, err := filepath.Glob("./_meta/test/settings.*.json")
assert.NoError(t, err)

for _, f := range files {
input, err := ioutil.ReadFile(f)
assert.NoError(t, err)

reporter := &mbtest.CapturingReporterV2{}
now := time.Now()

err = eventMappingSettingsXPack(reporter, 10000, now, input)
assert.NoError(t, err, f)
assert.True(t, len(reporter.GetEvents()) >= 1, f)
assert.Equal(t, 0, len(reporter.GetErrors()), f)
Expand Down
79 changes: 65 additions & 14 deletions metricbeat/module/kibana/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package stats

import (
"fmt"
"strings"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
Expand All @@ -37,8 +39,8 @@ func init() {
}

const (
statsPath = "api/stats"
kibanaStatsAPIAvailableVersion = "6.4.0"
statsPath = "api/stats"
settingsPath = "api/settings"
)

var (
Expand All @@ -52,7 +54,8 @@ var (
// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
statsHTTP *helper.HTTP
settingsHTTP *helper.HTTP
xPackEnabled bool
}

Expand All @@ -62,7 +65,7 @@ func isKibanaStatsAPIAvailable(kibanaVersion string) (bool, error) {
return false, err
}

wantVersion, err := common.NewVersion(kibanaStatsAPIAvailableVersion)
wantVersion, err := common.NewVersion(kibana.StatsAPIAvailableVersion)
if err != nil {
return false, err
}
Expand All @@ -79,36 +82,62 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, err
}

http, err := helper.NewHTTP(base)
statsHTTP, err := helper.NewHTTP(base)
if err != nil {
return nil, err
}

kibanaVersion, err := kibana.GetVersion(http, statsPath)
kibanaVersion, err := kibana.GetVersion(statsHTTP, statsPath)
if err != nil {
return nil, err
}

isAPIAvailable, err := isKibanaStatsAPIAvailable(kibanaVersion)
isStatsAPIAvailable, err := kibana.IsStatsAPIAvailable(kibanaVersion)
if err != nil {
return nil, err
}

if !isAPIAvailable {
if !isStatsAPIAvailable {
const errorMsg = "The kibana stats metricset is only supported with Kibana >= %v. You are currently running Kibana %v"
return nil, fmt.Errorf(errorMsg, kibanaStatsAPIAvailableVersion, kibanaVersion)
return nil, fmt.Errorf(errorMsg, kibana.StatsAPIAvailableVersion, kibanaVersion)
}

if config.XPackEnabled {
cfgwarn.Experimental("The experimental xpack.enabled flag in kibana/stats metricset is enabled.")

// Use legacy API response so we can passthru usage as-is
http.SetURI(http.GetURI() + "&legacy=true")
statsHTTP.SetURI(statsHTTP.GetURI() + "&legacy=true")
}

var settingsHTTP *helper.HTTP
if config.XPackEnabled {
cfgwarn.Experimental("The experimental xpack.enabled flag in kibana/stats metricset is enabled.")

isSettingsAPIAvailable, err := kibana.IsSettingsAPIAvailable(kibanaVersion)
if err != nil {
return nil, err
}

if !isSettingsAPIAvailable {
const errorMsg = "The kibana stats metricset with X-Pack enabled is only supported with Kibana >= %v. You are currently running Kibana %v"
return nil, fmt.Errorf(errorMsg, kibana.SettingsAPIAvailableVersion, kibanaVersion)
}

settingsHTTP, err = helper.NewHTTP(base)
if err != nil {
return nil, err
}

// HACK! We need to do this because there might be a basepath involved, so we
// only search/replace the actual API paths
settingsURI := strings.Replace(statsHTTP.GetURI(), statsPath, settingsPath, 1)
settingsHTTP.SetURI(settingsURI)
}

return &MetricSet{
base,
http,
statsHTTP,
settingsHTTP,
config.XPackEnabled,
}, nil
}
Expand All @@ -117,17 +146,39 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// It returns the event which is then forward to the output. In case of an error, a
// descriptive error must be returned.
func (m *MetricSet) Fetch(r mb.ReporterV2) {
content, err := m.http.FetchContent()
now := time.Now()

m.fetchStats(r, now)
if m.xPackEnabled {
m.fetchSettings(r, now)
}
}

func (m *MetricSet) fetchStats(r mb.ReporterV2, now time.Time) {
content, err := m.statsHTTP.FetchContent()
if err != nil {
r.Error(err)
return
}

if m.xPackEnabled {
intervalMs := m.Module().Config().Period.Nanoseconds() / 1000 / 1000
eventMappingXPack(r, intervalMs, content)
intervalMs := m.calculateIntervalMs()
eventMappingStatsXPack(r, intervalMs, now, content)
} else {
eventMapping(r, content)
}
}

func (m *MetricSet) fetchSettings(r mb.ReporterV2, now time.Time) {
content, err := m.settingsHTTP.FetchContent()
if err != nil {
return
}

intervalMs := m.calculateIntervalMs()
eventMappingSettingsXPack(r, intervalMs, now, content)
}

func (m *MetricSet) calculateIntervalMs() int64 {
return m.Module().Config().Period.Nanoseconds() / 1000 / 1000
}

0 comments on commit 9e9434e

Please sign in to comment.