From 9e9434ef793a9130512b6afa630a619b4c7275e4 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 13 Aug 2018 01:56:01 -0700 Subject: [PATCH] Report kibana_settings to X-Pack Monitoring (#7664) (#7943) Resolves #7621. Depends on https://github.com/elastic/kibana/pull/21100. X-Pack Monitoring of Kibana requires two types of documents in the `.monitoring-kibana-*` indices: `kibana_stats` and `kibana_settings`. We made Metricbeat's `kibana/stats` metricset index `kibana_stats` documents into `.monitoring-kibana-*` in #7525. This PR makes the same metricset index `kibana_settings` documents into `.monitoring-kibana-*`. (cherry picked from commit 2af5ab9fc4825df68d399f57e6b16c9288a591c7) --- metricbeat/module/kibana/kibana.go | 32 ++++++++ .../kibana/stats/_meta/test/settings.700.json | 18 +++++ metricbeat/module/kibana/stats/data_xpack.go | 75 ++++++++++++------ .../module/kibana/stats/data_xpack_test.go | 26 +++++- metricbeat/module/kibana/stats/stats.go | 79 +++++++++++++++---- 5 files changed, 191 insertions(+), 39 deletions(-) create mode 100644 metricbeat/module/kibana/stats/_meta/test/settings.700.json diff --git a/metricbeat/module/kibana/kibana.go b/metricbeat/module/kibana/kibana.go index 2e643831e76d..34075259e3e6 100644 --- a/metricbeat/module/kibana/kibana.go +++ b/metricbeat/module/kibana/kibana.go @@ -29,6 +29,14 @@ import ( "github.com/elastic/beats/metricbeat/mb" ) +const ( + // StatsAPIAvailableVersion is the version of Kibana since when the stats API is available + StatsAPIAvailableVersion = "6.4.0" + + // SettingsAPIAvailableVersion is the version of Kibana since when the settings API is available + SettingsAPIAvailableVersion = "6.5.0" +) + // ReportErrorForMissingField reports and returns an error message for the given // field being missing in API response received from Kibana func ReportErrorForMissingField(field string, r mb.ReporterV2) error { @@ -64,6 +72,30 @@ func GetVersion(http *helper.HTTP, currentPath string) (string, error) { return versionStr, nil } +func isKibanaAPIAvailable(currentKibanaVersion, apiAvailableInKibanaVersion string) (bool, error) { + currentVersion, err := common.NewVersion(currentKibanaVersion) + if err != nil { + return false, err + } + + wantVersion, err := common.NewVersion(apiAvailableInKibanaVersion) + if err != nil { + return false, err + } + + return !currentVersion.LessThan(wantVersion), nil +} + +// IsStatsAPIAvailable returns whether the stats API is available in the given version of Kibana +func IsStatsAPIAvailable(currentKibanaVersion string) (bool, error) { + return isKibanaAPIAvailable(currentKibanaVersion, StatsAPIAvailableVersion) +} + +// IsSettingsAPIAvailable returns whether the settings API is available in the given version of Kibana +func IsSettingsAPIAvailable(currentKibanaVersion string) (bool, error) { + return isKibanaAPIAvailable(currentKibanaVersion, SettingsAPIAvailableVersion) +} + func fetchPath(http *helper.HTTP, currentPath, newPath string) ([]byte, error) { currentURI := http.GetURI() defer http.SetURI(currentURI) // Reset after this request diff --git a/metricbeat/module/kibana/stats/_meta/test/settings.700.json b/metricbeat/module/kibana/stats/_meta/test/settings.700.json new file mode 100644 index 000000000000..845ae9f6a155 --- /dev/null +++ b/metricbeat/module/kibana/stats/_meta/test/settings.700.json @@ -0,0 +1,18 @@ +{ + "cluster_uuid":"u5ii0pnQRka_P0gimfmthg", + "settings":{ + "xpack":{ + "default_admin_email":"jane@doe.com" + }, + "kibana":{ + "uuid":"5b2de169-2785-441b-ae8c-186a1936b17d", + "name":"Janes-MBP-2", + "index":".kibana", + "host":"localhost", + "transport_address":"localhost:5601", + "version":"7.0.0-alpha1", + "snapshot":false, + "status":"green" + } + } + } diff --git a/metricbeat/module/kibana/stats/data_xpack.go b/metricbeat/module/kibana/stats/data_xpack.go index d08dcaab5cfe..75594069c52b 100644 --- a/metricbeat/module/kibana/stats/data_xpack.go +++ b/metricbeat/module/kibana/stats/data_xpack.go @@ -29,7 +29,7 @@ import ( ) var ( - schemaXPackMonitoring = s.Schema{ + schemaXPackMonitoringStats = s.Schema{ "concurrent_connections": c.Int("concurrent_connections"), "os": c.Dict("os", s.Schema{ "load": c.Dict("load", s.Schema{ @@ -105,61 +105,90 @@ var ( } ) -func eventMappingXPack(r mb.ReporterV2, intervalMs int64, content []byte) error { - var data map[string]interface{} - err := json.Unmarshal(content, &data) - if err != nil { - r.Error(err) - return err +type dataParser func(mb.ReporterV2, common.MapStr, time.Time) (string, string, common.MapStr, error) + +func statsDataParser(r mb.ReporterV2, data common.MapStr, now time.Time) (string, string, common.MapStr, error) { + clusterUUID, ok := data["clusterUuid"].(string) + if !ok { + return "", "", nil, elastic.ReportErrorForMissingField("clusterUuid", elastic.Kibana, r) } - kibanaStatsFields, err := schemaXPackMonitoring.Apply(data) + kibanaStatsFields, err := schemaXPackMonitoringStats.Apply(data) if err != nil { - r.Error(err) - return err + return "", "", nil, err } process, ok := data["process"].(map[string]interface{}) if !ok { - return elastic.ReportErrorForMissingField("process", elastic.Kibana, r) + return "", "", nil, elastic.ReportErrorForMissingField("process", elastic.Kibana, r) } memory, ok := process["memory"].(map[string]interface{}) if !ok { - return elastic.ReportErrorForMissingField("process.memory", elastic.Kibana, r) + return "", "", nil, elastic.ReportErrorForMissingField("process.memory", elastic.Kibana, r) } - rss, ok := memory["resident_set_size_bytes"].(float64) if !ok { - return elastic.ReportErrorForMissingField("process.memory.resident_set_size_bytes", elastic.Kibana, r) + return "", "", nil, elastic.ReportErrorForMissingField("process.memory.resident_set_size_bytes", elastic.Kibana, r) } kibanaStatsFields.Put("process.memory.resident_set_size_in_bytes", int64(rss)) - timestamp := time.Now() - kibanaStatsFields.Put("timestamp", timestamp) + kibanaStatsFields.Put("timestamp", now) // Make usage field passthrough as-is usage, ok := data["usage"].(map[string]interface{}) if !ok { - return elastic.ReportErrorForMissingField("usage", elastic.Kibana, r) + return "", "", nil, elastic.ReportErrorForMissingField("usage", elastic.Kibana, r) } kibanaStatsFields.Put("usage", usage) - clusterUUID, ok := data["clusterUuid"].(string) + return "kibana_stats", clusterUUID, kibanaStatsFields, nil +} + +func settingsDataParser(r mb.ReporterV2, data common.MapStr, now time.Time) (string, string, common.MapStr, error) { + clusterUUID, ok := data["cluster_uuid"].(string) + if !ok { + return "", "", nil, elastic.ReportErrorForMissingField("cluster_uuid", elastic.Kibana, r) + } + + kibanaSettingsFields, ok := data["settings"] if !ok { - return elastic.ReportErrorForMissingField("clusterUuid", elastic.Kibana, r) + return "", "", nil, elastic.ReportErrorForMissingField("settings", elastic.Kibana, r) + } + + return "kibana_settings", clusterUUID, kibanaSettingsFields.(map[string]interface{}), nil +} + +func eventMappingXPack(r mb.ReporterV2, intervalMs int64, now time.Time, content []byte, dataParserFunc dataParser) error { + var data map[string]interface{} + err := json.Unmarshal(content, &data) + if err != nil { + return err + } + + t, clusterUUID, fields, err := dataParserFunc(r, data, now) + if err != nil { + return err } var event mb.Event event.RootFields = common.MapStr{ "cluster_uuid": clusterUUID, - "timestamp": timestamp, + "timestamp": now, "interval_ms": intervalMs, - "type": "kibana_stats", - "kibana_stats": kibanaStatsFields, + "type": t, + t: fields, } event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Kibana) - r.Event(event) + r.Event(event) return nil } + +func eventMappingStatsXPack(r mb.ReporterV2, intervalMs int64, now time.Time, content []byte) error { + return eventMappingXPack(r, intervalMs, now, content, statsDataParser) +} + +func eventMappingSettingsXPack(r mb.ReporterV2, intervalMs int64, now time.Time, content []byte) error { + return eventMappingXPack(r, intervalMs, now, content, settingsDataParser) +} diff --git a/metricbeat/module/kibana/stats/data_xpack_test.go b/metricbeat/module/kibana/stats/data_xpack_test.go index 0148c712fd04..9d38e0374417 100644 --- a/metricbeat/module/kibana/stats/data_xpack_test.go +++ b/metricbeat/module/kibana/stats/data_xpack_test.go @@ -23,13 +23,14 @@ import ( "io/ioutil" "path/filepath" "testing" + "time" mbtest "github.com/elastic/beats/metricbeat/mb/testing" "github.com/stretchr/testify/assert" ) -func TestEventMappingXPack(t *testing.T) { +func TestEventMappingStatsXPack(t *testing.T) { files, err := filepath.Glob("./_meta/test/stats-legacy.*.json") assert.NoError(t, err) @@ -39,7 +40,28 @@ func TestEventMappingXPack(t *testing.T) { assert.NoError(t, err) reporter := &mbtest.CapturingReporterV2{} - err = eventMappingXPack(reporter, 10000, input) + now := time.Now() + + err = eventMappingStatsXPack(reporter, 10000, now, input) + assert.NoError(t, err, f) + assert.True(t, len(reporter.GetEvents()) >= 1, f) + assert.Equal(t, 0, len(reporter.GetErrors()), f) + } +} + +func TestEventMappingSettingsXPack(t *testing.T) { + + files, err := filepath.Glob("./_meta/test/settings.*.json") + assert.NoError(t, err) + + for _, f := range files { + input, err := ioutil.ReadFile(f) + assert.NoError(t, err) + + reporter := &mbtest.CapturingReporterV2{} + now := time.Now() + + err = eventMappingSettingsXPack(reporter, 10000, now, input) assert.NoError(t, err, f) assert.True(t, len(reporter.GetEvents()) >= 1, f) assert.Equal(t, 0, len(reporter.GetErrors()), f) diff --git a/metricbeat/module/kibana/stats/stats.go b/metricbeat/module/kibana/stats/stats.go index 7c7d4c491084..16c4891c023a 100644 --- a/metricbeat/module/kibana/stats/stats.go +++ b/metricbeat/module/kibana/stats/stats.go @@ -19,6 +19,8 @@ package stats import ( "fmt" + "strings" + "time" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" @@ -37,8 +39,8 @@ func init() { } const ( - statsPath = "api/stats" - kibanaStatsAPIAvailableVersion = "6.4.0" + statsPath = "api/stats" + settingsPath = "api/settings" ) var ( @@ -52,7 +54,8 @@ var ( // MetricSet type defines all fields of the MetricSet type MetricSet struct { mb.BaseMetricSet - http *helper.HTTP + statsHTTP *helper.HTTP + settingsHTTP *helper.HTTP xPackEnabled bool } @@ -62,7 +65,7 @@ func isKibanaStatsAPIAvailable(kibanaVersion string) (bool, error) { return false, err } - wantVersion, err := common.NewVersion(kibanaStatsAPIAvailableVersion) + wantVersion, err := common.NewVersion(kibana.StatsAPIAvailableVersion) if err != nil { return false, err } @@ -79,36 +82,62 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return nil, err } - http, err := helper.NewHTTP(base) + statsHTTP, err := helper.NewHTTP(base) if err != nil { return nil, err } - kibanaVersion, err := kibana.GetVersion(http, statsPath) + kibanaVersion, err := kibana.GetVersion(statsHTTP, statsPath) if err != nil { return nil, err } - isAPIAvailable, err := isKibanaStatsAPIAvailable(kibanaVersion) + isStatsAPIAvailable, err := kibana.IsStatsAPIAvailable(kibanaVersion) if err != nil { return nil, err } - if !isAPIAvailable { + if !isStatsAPIAvailable { const errorMsg = "The kibana stats metricset is only supported with Kibana >= %v. You are currently running Kibana %v" - return nil, fmt.Errorf(errorMsg, kibanaStatsAPIAvailableVersion, kibanaVersion) + return nil, fmt.Errorf(errorMsg, kibana.StatsAPIAvailableVersion, kibanaVersion) } if config.XPackEnabled { cfgwarn.Experimental("The experimental xpack.enabled flag in kibana/stats metricset is enabled.") // Use legacy API response so we can passthru usage as-is - http.SetURI(http.GetURI() + "&legacy=true") + statsHTTP.SetURI(statsHTTP.GetURI() + "&legacy=true") + } + + var settingsHTTP *helper.HTTP + if config.XPackEnabled { + cfgwarn.Experimental("The experimental xpack.enabled flag in kibana/stats metricset is enabled.") + + isSettingsAPIAvailable, err := kibana.IsSettingsAPIAvailable(kibanaVersion) + if err != nil { + return nil, err + } + + if !isSettingsAPIAvailable { + const errorMsg = "The kibana stats metricset with X-Pack enabled is only supported with Kibana >= %v. You are currently running Kibana %v" + return nil, fmt.Errorf(errorMsg, kibana.SettingsAPIAvailableVersion, kibanaVersion) + } + + settingsHTTP, err = helper.NewHTTP(base) + if err != nil { + return nil, err + } + + // HACK! We need to do this because there might be a basepath involved, so we + // only search/replace the actual API paths + settingsURI := strings.Replace(statsHTTP.GetURI(), statsPath, settingsPath, 1) + settingsHTTP.SetURI(settingsURI) } return &MetricSet{ base, - http, + statsHTTP, + settingsHTTP, config.XPackEnabled, }, nil } @@ -117,17 +146,39 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch(r mb.ReporterV2) { - content, err := m.http.FetchContent() + now := time.Now() + + m.fetchStats(r, now) + if m.xPackEnabled { + m.fetchSettings(r, now) + } +} + +func (m *MetricSet) fetchStats(r mb.ReporterV2, now time.Time) { + content, err := m.statsHTTP.FetchContent() if err != nil { r.Error(err) return } if m.xPackEnabled { - intervalMs := m.Module().Config().Period.Nanoseconds() / 1000 / 1000 - eventMappingXPack(r, intervalMs, content) + intervalMs := m.calculateIntervalMs() + eventMappingStatsXPack(r, intervalMs, now, content) } else { eventMapping(r, content) } +} + +func (m *MetricSet) fetchSettings(r mb.ReporterV2, now time.Time) { + content, err := m.settingsHTTP.FetchContent() + if err != nil { + return + } + + intervalMs := m.calculateIntervalMs() + eventMappingSettingsXPack(r, intervalMs, now, content) +} +func (m *MetricSet) calculateIntervalMs() int64 { + return m.Module().Config().Period.Nanoseconds() / 1000 / 1000 }