From 772c6b8f322a6d26034941171f87d3380f048892 Mon Sep 17 00:00:00 2001 From: Ahmad Bamieh Date: Sat, 27 Apr 2019 03:38:05 +0300 Subject: [PATCH] Monitoring telemetry collection interval (#34609) * Only collect usage data once a day from kibana monitoring * isUsageCollector * bulk uploader tests * linting * condition filter * checkout yarn.lock from master * update mappings for functional tests * debugging for refactoring * support legacy mappings * self code review * isUsageCollector * fix mock * live coding session with pickypg * self code review * Update x-pack/plugins/xpack_main/server/lib/telemetry/monitoring/get_high_level_stats.js Co-Authored-By: Bamieh * update mappings --- .../usage/classes/__tests__/collector_set.js | 21 ++++++ src/server/usage/classes/collector_set.js | 5 ++ .../__tests__/bulk_uploader.js | 68 +++++++++++++++++++ .../server/kibana_monitoring/bulk_uploader.js | 29 ++++++-- .../xpack_main/server/lib/setup_xpack_main.js | 2 + .../monitoring/get_high_level_stats.js | 42 ++++++++---- .../monitoring/multicluster/mappings.json | 1 + 7 files changed, 149 insertions(+), 19 deletions(-) diff --git a/src/server/usage/classes/__tests__/collector_set.js b/src/server/usage/classes/__tests__/collector_set.js index 3d1a23b8dc5d6..9c8a48b36256e 100644 --- a/src/server/usage/classes/__tests__/collector_set.js +++ b/src/server/usage/classes/__tests__/collector_set.js @@ -22,6 +22,7 @@ import sinon from 'sinon'; import expect from 'expect.js'; import { Collector } from '../collector'; import { CollectorSet } from '../collector_set'; +import { UsageCollector } from '../usage_collector'; describe('CollectorSet', () => { describe('registers a collector set and runs lifecycle events', () => { @@ -140,6 +141,26 @@ describe('CollectorSet', () => { }); }); }); + + describe('isUsageCollector', () => { + const server = { }; + const collectorOptions = { type: 'MY_TEST_COLLECTOR', fetch: () => {} }; + + it('returns true only for UsageCollector instances', () => { + const collectors = new CollectorSet(server); + + const usageCollector = new UsageCollector(server, collectorOptions); + const collector = new Collector(server, collectorOptions); + const randomClass = new (class Random {}); + expect(collectors.isUsageCollector(usageCollector)).to.be(true); + expect(collectors.isUsageCollector(collector)).to.be(false); + expect(collectors.isUsageCollector(randomClass)).to.be(false); + expect(collectors.isUsageCollector({})).to.be(false); + expect(collectors.isUsageCollector(null)).to.be(false); + expect(collectors.isUsageCollector('')).to.be(false); + expect(collectors.isUsageCollector()).to.be(false); + }); + }); }); diff --git a/src/server/usage/classes/collector_set.js b/src/server/usage/classes/collector_set.js index 8bcb75e39959f..fc8091e327cf6 100644 --- a/src/server/usage/classes/collector_set.js +++ b/src/server/usage/classes/collector_set.js @@ -68,6 +68,11 @@ export class CollectorSet { return this._collectors.find(c => c.type === type); } + // isUsageCollector(x: UsageCollector | any): x is UsageCollector { + isUsageCollector(x) { + return x instanceof UsageCollector; + } + /* * Call a bunch of fetch methods and then do them in bulk * @param {CollectorSet} collectorSet - a set of collectors to fetch. Default to all registered collectors diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js b/x-pack/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js index 2c6c52ed05ca6..93853c0a60c51 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js @@ -17,6 +17,9 @@ class MockCollectorSet { this.mockServer = _mockServer; this.mockCollectors = mockCollectors; } + isUsageCollector(x) { + return !!x.isUsageCollector; + } getCollectorByType(type) { return this.mockCollectors.find(collector => collector.type === type) || this.mockCollectors[0]; } @@ -35,6 +38,9 @@ describe('BulkUploader', () => { server = { log: sinon.spy(), plugins: { + xpack_main: { + telemetryCollectionInterval: 3000, + }, elasticsearch: { getCluster: () => ({ createClient: () => ({ @@ -123,5 +129,67 @@ describe('BulkUploader', () => { done(); }, CHECK_DELAY); }); + + it('does not call UsageCollectors if last reported is within the usageInterval', done => { + const usageCollectorFetch = sinon.stub(); + const collectorFetch = sinon.stub().returns({ type: 'type_usage_collector_test', result: { testData: 12345 } }); + + const collectors = new MockCollectorSet(server, [ + { + fetch: usageCollectorFetch, + formatForBulkUpload: result => result, + isUsageCollector: true, + }, + { + fetch: collectorFetch, + formatForBulkUpload: result => result, + isUsageCollector: false, + } + ]); + + const uploader = new BulkUploader(server, { + interval: FETCH_INTERVAL + }); + uploader._lastFetchUsageTime = Date.now(); + + uploader.start(collectors); + setTimeout(() => { + uploader.stop(); + expect(collectorFetch.callCount).to.be.greaterThan(0); + expect(usageCollectorFetch.callCount).to.eql(0); + done(); + }, CHECK_DELAY); + }); + + it('calls UsageCollectors if last reported exceeds during a _usageInterval', done => { + const usageCollectorFetch = sinon.stub(); + const collectorFetch = sinon.stub().returns({ type: 'type_usage_collector_test', result: { testData: 12345 } }); + + const collectors = new MockCollectorSet(server, [ + { + fetch: usageCollectorFetch, + formatForBulkUpload: result => result, + isUsageCollector: true, + }, + { + fetch: collectorFetch, + formatForBulkUpload: result => result, + isUsageCollector: false, + } + ]); + + const uploader = new BulkUploader(server, { + interval: FETCH_INTERVAL + }); + uploader._lastFetchUsageTime = Date.now() - uploader._usageInterval; + + uploader.start(collectors); + setTimeout(() => { + uploader.stop(); + expect(collectorFetch.callCount).to.be.greaterThan(0); + expect(usageCollectorFetch.callCount).to.be.greaterThan(0); + done(); + }, CHECK_DELAY); + }); }); }); diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js index b66617c6e5906..f5843903bcb31 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js @@ -6,6 +6,7 @@ import { defaultsDeep, isEmpty, uniq, compact } from 'lodash'; import { callClusterFactory } from '../../../xpack_main'; + import { LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG, @@ -42,6 +43,9 @@ export class BulkUploader { this._timer = null; this._interval = interval; + this._lastFetchUsageTime = null; + this._usageInterval = server.plugins.xpack_main.telemetryCollectionInterval; + this._log = { debug: message => server.log(['debug', ...LOGGING_TAGS], message), info: message => server.log(['info', ...LOGGING_TAGS], message), @@ -63,18 +67,34 @@ export class BulkUploader { */ start(collectorSet) { this._log.info('Starting monitoring stats collection'); + const filterCollectorSet = _collectorSet => { + const filterUsage = this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now(); + this._lastFetchWithUsage = !filterUsage; + if (!filterUsage) { + this._lastFetchUsageTime = Date.now(); + } - // this is internal bulk upload, so filter out API-only collectors - const filterThem = _collectorSet => _collectorSet.getFilteredCollectorSet(c => c.ignoreForInternalUploader !== true); + return _collectorSet.getFilteredCollectorSet(c => { + // this is internal bulk upload, so filter out API-only collectors + if (c.ignoreForInternalUploader) { + return false; + } + // Only collect usage data at the same interval as telemetry would (default to once a day) + if (filterUsage && _collectorSet.isUsageCollector(c)) { + return false; + } + return true; + }); + }; if (this._timer) { clearInterval(this._timer); } else { - this._fetchAndUpload(filterThem(collectorSet)); // initial fetch + this._fetchAndUpload(filterCollectorSet(collectorSet)); // initial fetch } this._timer = setInterval(() => { - this._fetchAndUpload(filterThem(collectorSet)); + this._fetchAndUpload(filterCollectorSet(collectorSet)); }, this._interval); } @@ -175,7 +195,6 @@ export class BulkUploader { const { type: uploadType, payload: uploadData } = collectorSet.getCollectorByType(type).formatForBulkUpload(result); return defaultsDeep(accum, { [uploadType]: uploadData }); }, {}); - // convert the nested object into a flat array, with each payload prefixed // with an 'index' instruction, for bulk upload const flat = Object.keys(typesNested).reduce((accum, type) => { diff --git a/x-pack/plugins/xpack_main/server/lib/setup_xpack_main.js b/x-pack/plugins/xpack_main/server/lib/setup_xpack_main.js index 125e481c384a1..b030600db30f9 100644 --- a/x-pack/plugins/xpack_main/server/lib/setup_xpack_main.js +++ b/x-pack/plugins/xpack_main/server/lib/setup_xpack_main.js @@ -6,6 +6,7 @@ import { injectXPackInfoSignature } from './inject_xpack_info_signature'; import { XPackInfo } from './xpack_info'; +import { REPORT_INTERVAL_MS } from '../../common/constants'; /** * Setup the X-Pack Main plugin. This is fired every time that the Elasticsearch plugin becomes Green. @@ -21,6 +22,7 @@ export function setupXPackMain(server) { }); server.expose('info', info); + server.expose('telemetryCollectionInterval', REPORT_INTERVAL_MS); server.expose('createXPackInfo', (options) => new XPackInfo(server, options)); server.ext('onPreResponse', (request, h) => injectXPackInfoSignature(info, request, h)); diff --git a/x-pack/plugins/xpack_main/server/lib/telemetry/monitoring/get_high_level_stats.js b/x-pack/plugins/xpack_main/server/lib/telemetry/monitoring/get_high_level_stats.js index b845fb5115492..4ef70ea993a4b 100644 --- a/x-pack/plugins/xpack_main/server/lib/telemetry/monitoring/get_high_level_stats.js +++ b/x-pack/plugins/xpack_main/server/lib/telemetry/monitoring/get_high_level_stats.js @@ -155,20 +155,34 @@ export function getHighLevelStats(server, callCluster, clusterUuids, start, end, .then(response => handleHighLevelStatsResponse(response, product)); } -/** - * Fetch the high level stats to report for the {@code product}. - * - * @param {Object} server The server instance - * @param {function} callCluster The callWithRequest or callWithInternalUser handler - * @param {Array} indices The indices to use for the request - * @param {Array} clusterUuids Cluster UUIDs to limit the request against - * @param {Date} start Start time to limit the stats - * @param {Date} end End time to limit the stats - * @param {String} product The product to limit too ('kibana', 'logstash', 'beats') - * @return {Promise} Response for the instances to fetch detailed for the product. - */ -export function fetchHighLevelStats(server, callCluster, clusterUuids, start, end, product) { +export async function fetchHighLevelStats(server, callCluster, clusterUuids, start, end, product) { const config = server.config(); + const isKibanaIndex = product === KIBANA_SYSTEM_ID; + const filters = [ + { terms: { cluster_uuid: clusterUuids } }, + ]; + + // we should supply this from a parameter in the future so that this remains generic + if (isKibanaIndex) { + const kibanaFilter = { + bool: { + should: [ + { exists: { field: 'kibana_stats.usage.index' } }, + { + bool: { + should: [ + { range: { 'kibana_stats.kibana.version': { lt: '6.7.3' } } }, + { term: { 'kibana_stats.kibana.version': '7.0.0' } }, + ] + } + } + ], + } + }; + + filters.push(kibanaFilter); + } + const params = { index: config.get(`xpack.monitoring.${product}.index_pattern`), size: config.get('xpack.monitoring.max_bucket_size'), @@ -190,7 +204,7 @@ export function fetchHighLevelStats(server, callCluster, clusterUuids, start, en start, end, type: `${product}_stats`, - filters: [ { terms: { cluster_uuid: clusterUuids } } ] + filters, }), collapse: { // a more ideal field would be the concatenation of the uuid + transport address for duped UUIDs (copied installations) diff --git a/x-pack/test/functional/es_archives/monitoring/multicluster/mappings.json b/x-pack/test/functional/es_archives/monitoring/multicluster/mappings.json index 3945f2049b951..66a45eeecd9c8 100644 --- a/x-pack/test/functional/es_archives/monitoring/multicluster/mappings.json +++ b/x-pack/test/functional/es_archives/monitoring/multicluster/mappings.json @@ -36,6 +36,7 @@ "type": "index", "value": { "index": ".monitoring-kibana-6-2017.08.15", + "mappings":{"_doc":{"dynamic":false,"properties":{"cluster_uuid":{"type":"keyword"},"timestamp":{"type":"date","format":"date_time"},"interval_ms":{"type":"long"},"type":{"type":"keyword"},"source_node":{"properties":{"uuid":{"type":"keyword"},"host":{"type":"keyword"},"transport_address":{"type":"keyword"},"ip":{"type":"keyword"},"name":{"type":"keyword"},"timestamp":{"type":"date","format":"date_time"}}},"kibana_stats":{"properties":{"usage":{"properties":{"index":{"type":"keyword"}}},"kibana":{"properties":{"uuid":{"type":"keyword"},"name":{"type":"keyword"},"host":{"type":"keyword"},"transport_address":{"type":"keyword"},"version":{"type":"keyword"},"snapshot":{"type":"boolean"},"status":{"type":"keyword"},"statuses":{"properties":{"name":{"type":"keyword"},"state":{"type":"keyword"}}}}},"cloud":{"properties":{"name":{"type":"keyword"},"id":{"type":"keyword"},"vm_type":{"type":"keyword"},"region":{"type":"keyword"},"zone":{"type":"keyword"},"metadata":{"type":"object"}}},"os":{"properties":{"load":{"properties":{"1m":{"type":"half_float"},"5m":{"type":"half_float"},"15m":{"type":"half_float"}}},"memory":{"properties":{"total_in_bytes":{"type":"float"},"free_in_bytes":{"type":"float"},"used_in_bytes":{"type":"float"}}},"uptime_in_millis":{"type":"long"}}},"process":{"properties":{"memory":{"properties":{"heap":{"properties":{"total_in_bytes":{"type":"float"},"used_in_bytes":{"type":"float"},"size_limit":{"type":"float"}}},"resident_set_size_in_bytes":{"type":"float"}}},"event_loop_delay":{"type":"float"},"uptime_in_millis":{"type":"long"}}},"sockets":{"properties":{"http":{"properties":{"total":{"type":"long"}}},"https":{"properties":{"total":{"type":"long"}}}}},"timestamp":{"type":"date"},"requests":{"properties":{"disconnects":{"type":"long"},"total":{"type":"long"},"status_codes":{"type":"object"}}},"response_times":{"properties":{"average":{"type":"float"},"max":{"type":"float"}}},"concurrent_connections":{"type":"long"}}}}}}, "settings": { "index": { "codec": "best_compression",