Skip to content

Commit

Permalink
Monitoring telemetry collection interval (#34609)
Browse files Browse the repository at this point in the history
* Only collect usage data once a day from kibana monitoring

* isUsageCollector

* bulk uploader tests

* linting

* condition filter

* checkout yarn.lock from master

* update mappings for functional tests

* debugging for refactoring

* support legacy mappings

* self code review

* isUsageCollector

* fix mock

* live coding session with pickypg

* self code review

* Update x-pack/plugins/xpack_main/server/lib/telemetry/monitoring/get_high_level_stats.js

Co-Authored-By: Bamieh <[email protected]>

* update mappings
  • Loading branch information
Bamieh committed Apr 27, 2019
1 parent e3f1423 commit 772c6b8
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 19 deletions.
21 changes: 21 additions & 0 deletions src/server/usage/classes/__tests__/collector_set.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import sinon from 'sinon';
import expect from 'expect.js';
import { Collector } from '../collector';
import { CollectorSet } from '../collector_set';
import { UsageCollector } from '../usage_collector';

describe('CollectorSet', () => {
describe('registers a collector set and runs lifecycle events', () => {
Expand Down Expand Up @@ -140,6 +141,26 @@ describe('CollectorSet', () => {
});
});
});

describe('isUsageCollector', () => {
const server = { };
const collectorOptions = { type: 'MY_TEST_COLLECTOR', fetch: () => {} };

it('returns true only for UsageCollector instances', () => {
const collectors = new CollectorSet(server);

const usageCollector = new UsageCollector(server, collectorOptions);
const collector = new Collector(server, collectorOptions);
const randomClass = new (class Random {});
expect(collectors.isUsageCollector(usageCollector)).to.be(true);
expect(collectors.isUsageCollector(collector)).to.be(false);
expect(collectors.isUsageCollector(randomClass)).to.be(false);
expect(collectors.isUsageCollector({})).to.be(false);
expect(collectors.isUsageCollector(null)).to.be(false);
expect(collectors.isUsageCollector('')).to.be(false);
expect(collectors.isUsageCollector()).to.be(false);
});
});
});


5 changes: 5 additions & 0 deletions src/server/usage/classes/collector_set.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ export class CollectorSet {
return this._collectors.find(c => c.type === type);
}

// isUsageCollector(x: UsageCollector | any): x is UsageCollector {
isUsageCollector(x) {
return x instanceof UsageCollector;
}

/*
* Call a bunch of fetch methods and then do them in bulk
* @param {CollectorSet} collectorSet - a set of collectors to fetch. Default to all registered collectors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class MockCollectorSet {
this.mockServer = _mockServer;
this.mockCollectors = mockCollectors;
}
isUsageCollector(x) {
return !!x.isUsageCollector;
}
getCollectorByType(type) {
return this.mockCollectors.find(collector => collector.type === type) || this.mockCollectors[0];
}
Expand All @@ -35,6 +38,9 @@ describe('BulkUploader', () => {
server = {
log: sinon.spy(),
plugins: {
xpack_main: {
telemetryCollectionInterval: 3000,
},
elasticsearch: {
getCluster: () => ({
createClient: () => ({
Expand Down Expand Up @@ -123,5 +129,67 @@ describe('BulkUploader', () => {
done();
}, CHECK_DELAY);
});

it('does not call UsageCollectors if last reported is within the usageInterval', done => {
const usageCollectorFetch = sinon.stub();
const collectorFetch = sinon.stub().returns({ type: 'type_usage_collector_test', result: { testData: 12345 } });

const collectors = new MockCollectorSet(server, [
{
fetch: usageCollectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: true,
},
{
fetch: collectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: false,
}
]);

const uploader = new BulkUploader(server, {
interval: FETCH_INTERVAL
});
uploader._lastFetchUsageTime = Date.now();

uploader.start(collectors);
setTimeout(() => {
uploader.stop();
expect(collectorFetch.callCount).to.be.greaterThan(0);
expect(usageCollectorFetch.callCount).to.eql(0);
done();
}, CHECK_DELAY);
});

it('calls UsageCollectors if last reported exceeds during a _usageInterval', done => {
const usageCollectorFetch = sinon.stub();
const collectorFetch = sinon.stub().returns({ type: 'type_usage_collector_test', result: { testData: 12345 } });

const collectors = new MockCollectorSet(server, [
{
fetch: usageCollectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: true,
},
{
fetch: collectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: false,
}
]);

const uploader = new BulkUploader(server, {
interval: FETCH_INTERVAL
});
uploader._lastFetchUsageTime = Date.now() - uploader._usageInterval;

uploader.start(collectors);
setTimeout(() => {
uploader.stop();
expect(collectorFetch.callCount).to.be.greaterThan(0);
expect(usageCollectorFetch.callCount).to.be.greaterThan(0);
done();
}, CHECK_DELAY);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import { defaultsDeep, isEmpty, uniq, compact } from 'lodash';
import { callClusterFactory } from '../../../xpack_main';

import {
LOGGING_TAG,
KIBANA_MONITORING_LOGGING_TAG,
Expand Down Expand Up @@ -42,6 +43,9 @@ export class BulkUploader {

this._timer = null;
this._interval = interval;
this._lastFetchUsageTime = null;
this._usageInterval = server.plugins.xpack_main.telemetryCollectionInterval;

this._log = {
debug: message => server.log(['debug', ...LOGGING_TAGS], message),
info: message => server.log(['info', ...LOGGING_TAGS], message),
Expand All @@ -63,18 +67,34 @@ export class BulkUploader {
*/
start(collectorSet) {
this._log.info('Starting monitoring stats collection');
const filterCollectorSet = _collectorSet => {
const filterUsage = this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now();
this._lastFetchWithUsage = !filterUsage;
if (!filterUsage) {
this._lastFetchUsageTime = Date.now();
}

// this is internal bulk upload, so filter out API-only collectors
const filterThem = _collectorSet => _collectorSet.getFilteredCollectorSet(c => c.ignoreForInternalUploader !== true);
return _collectorSet.getFilteredCollectorSet(c => {
// this is internal bulk upload, so filter out API-only collectors
if (c.ignoreForInternalUploader) {
return false;
}
// Only collect usage data at the same interval as telemetry would (default to once a day)
if (filterUsage && _collectorSet.isUsageCollector(c)) {
return false;
}
return true;
});
};

if (this._timer) {
clearInterval(this._timer);
} else {
this._fetchAndUpload(filterThem(collectorSet)); // initial fetch
this._fetchAndUpload(filterCollectorSet(collectorSet)); // initial fetch
}

this._timer = setInterval(() => {
this._fetchAndUpload(filterThem(collectorSet));
this._fetchAndUpload(filterCollectorSet(collectorSet));
}, this._interval);
}

Expand Down Expand Up @@ -175,7 +195,6 @@ export class BulkUploader {
const { type: uploadType, payload: uploadData } = collectorSet.getCollectorByType(type).formatForBulkUpload(result);
return defaultsDeep(accum, { [uploadType]: uploadData });
}, {});

// convert the nested object into a flat array, with each payload prefixed
// with an 'index' instruction, for bulk upload
const flat = Object.keys(typesNested).reduce((accum, type) => {
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/xpack_main/server/lib/setup_xpack_main.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import { injectXPackInfoSignature } from './inject_xpack_info_signature';
import { XPackInfo } from './xpack_info';
import { REPORT_INTERVAL_MS } from '../../common/constants';

/**
* Setup the X-Pack Main plugin. This is fired every time that the Elasticsearch plugin becomes Green.
Expand All @@ -21,6 +22,7 @@ export function setupXPackMain(server) {
});

server.expose('info', info);
server.expose('telemetryCollectionInterval', REPORT_INTERVAL_MS);
server.expose('createXPackInfo', (options) => new XPackInfo(server, options));
server.ext('onPreResponse', (request, h) => injectXPackInfoSignature(info, request, h));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,20 +155,34 @@ export function getHighLevelStats(server, callCluster, clusterUuids, start, end,
.then(response => handleHighLevelStatsResponse(response, product));
}

/**
* Fetch the high level stats to report for the {@code product}.
*
* @param {Object} server The server instance
* @param {function} callCluster The callWithRequest or callWithInternalUser handler
* @param {Array} indices The indices to use for the request
* @param {Array} clusterUuids Cluster UUIDs to limit the request against
* @param {Date} start Start time to limit the stats
* @param {Date} end End time to limit the stats
* @param {String} product The product to limit too ('kibana', 'logstash', 'beats')
* @return {Promise} Response for the instances to fetch detailed for the product.
*/
export function fetchHighLevelStats(server, callCluster, clusterUuids, start, end, product) {
export async function fetchHighLevelStats(server, callCluster, clusterUuids, start, end, product) {
const config = server.config();
const isKibanaIndex = product === KIBANA_SYSTEM_ID;
const filters = [
{ terms: { cluster_uuid: clusterUuids } },
];

// we should supply this from a parameter in the future so that this remains generic
if (isKibanaIndex) {
const kibanaFilter = {
bool: {
should: [
{ exists: { field: 'kibana_stats.usage.index' } },
{
bool: {
should: [
{ range: { 'kibana_stats.kibana.version': { lt: '6.7.3' } } },
{ term: { 'kibana_stats.kibana.version': '7.0.0' } },
]
}
}
],
}
};

filters.push(kibanaFilter);
}

const params = {
index: config.get(`xpack.monitoring.${product}.index_pattern`),
size: config.get('xpack.monitoring.max_bucket_size'),
Expand All @@ -190,7 +204,7 @@ export function fetchHighLevelStats(server, callCluster, clusterUuids, start, en
start,
end,
type: `${product}_stats`,
filters: [ { terms: { cluster_uuid: clusterUuids } } ]
filters,
}),
collapse: {
// a more ideal field would be the concatenation of the uuid + transport address for duped UUIDs (copied installations)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"type": "index",
"value": {
"index": ".monitoring-kibana-6-2017.08.15",
"mappings":{"_doc":{"dynamic":false,"properties":{"cluster_uuid":{"type":"keyword"},"timestamp":{"type":"date","format":"date_time"},"interval_ms":{"type":"long"},"type":{"type":"keyword"},"source_node":{"properties":{"uuid":{"type":"keyword"},"host":{"type":"keyword"},"transport_address":{"type":"keyword"},"ip":{"type":"keyword"},"name":{"type":"keyword"},"timestamp":{"type":"date","format":"date_time"}}},"kibana_stats":{"properties":{"usage":{"properties":{"index":{"type":"keyword"}}},"kibana":{"properties":{"uuid":{"type":"keyword"},"name":{"type":"keyword"},"host":{"type":"keyword"},"transport_address":{"type":"keyword"},"version":{"type":"keyword"},"snapshot":{"type":"boolean"},"status":{"type":"keyword"},"statuses":{"properties":{"name":{"type":"keyword"},"state":{"type":"keyword"}}}}},"cloud":{"properties":{"name":{"type":"keyword"},"id":{"type":"keyword"},"vm_type":{"type":"keyword"},"region":{"type":"keyword"},"zone":{"type":"keyword"},"metadata":{"type":"object"}}},"os":{"properties":{"load":{"properties":{"1m":{"type":"half_float"},"5m":{"type":"half_float"},"15m":{"type":"half_float"}}},"memory":{"properties":{"total_in_bytes":{"type":"float"},"free_in_bytes":{"type":"float"},"used_in_bytes":{"type":"float"}}},"uptime_in_millis":{"type":"long"}}},"process":{"properties":{"memory":{"properties":{"heap":{"properties":{"total_in_bytes":{"type":"float"},"used_in_bytes":{"type":"float"},"size_limit":{"type":"float"}}},"resident_set_size_in_bytes":{"type":"float"}}},"event_loop_delay":{"type":"float"},"uptime_in_millis":{"type":"long"}}},"sockets":{"properties":{"http":{"properties":{"total":{"type":"long"}}},"https":{"properties":{"total":{"type":"long"}}}}},"timestamp":{"type":"date"},"requests":{"properties":{"disconnects":{"type":"long"},"total":{"type":"long"},"status_codes":{"type":"object"}}},"response_times":{"properties":{"average":{"type":"float"},"max":{"type":"float"}}},"concurrent_connections":{"type":"long"}}}}}},
"settings": {
"index": {
"codec": "best_compression",
Expand Down

0 comments on commit 772c6b8

Please sign in to comment.