Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitoring telemetry collection interval #34609

Merged
merged 21 commits into from
Apr 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/legacy/server/usage/classes/__tests__/collector_set.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import sinon from 'sinon';
import expect from '@kbn/expect';
import { Collector } from '../collector';
import { CollectorSet } from '../collector_set';
import { UsageCollector } from '../usage_collector';

describe('CollectorSet', () => {
describe('registers a collector set and runs lifecycle events', () => {
Expand Down Expand Up @@ -140,6 +141,26 @@ describe('CollectorSet', () => {
});
});
});

describe('isUsageCollector', () => {
const server = { };
const collectorOptions = { type: 'MY_TEST_COLLECTOR', fetch: () => {} };

it('returns true only for UsageCollector instances', () => {
const collectors = new CollectorSet(server);

const usageCollector = new UsageCollector(server, collectorOptions);
const collector = new Collector(server, collectorOptions);
const randomClass = new (class Random {});
expect(collectors.isUsageCollector(usageCollector)).to.be(true);
expect(collectors.isUsageCollector(collector)).to.be(false);
expect(collectors.isUsageCollector(randomClass)).to.be(false);
expect(collectors.isUsageCollector({})).to.be(false);
expect(collectors.isUsageCollector(null)).to.be(false);
expect(collectors.isUsageCollector('')).to.be(false);
expect(collectors.isUsageCollector()).to.be(false);
});
});
});


5 changes: 5 additions & 0 deletions src/legacy/server/usage/classes/collector_set.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ export class CollectorSet {
return this._collectors.find(c => c.type === type);
}

// isUsageCollector(x: UsageCollector | any): x is UsageCollector {
isUsageCollector(x) {
return x instanceof UsageCollector;
}

/*
* Call a bunch of fetch methods and then do them in bulk
* @param {CollectorSet} collectorSet - a set of collectors to fetch. Default to all registered collectors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class MockCollectorSet {
this.mockServer = _mockServer;
this.mockCollectors = mockCollectors;
}
isUsageCollector(x) {
return !!x.isUsageCollector;
}
getCollectorByType(type) {
return this.mockCollectors.find(collector => collector.type === type) || this.mockCollectors[0];
}
Expand All @@ -41,6 +44,9 @@ describe('BulkUploader', () => {
server = {
log: sinon.spy(),
plugins: {
xpack_main: {
telemetryCollectionInterval: 3000,
},
elasticsearch: {
createCluster: () => cluster,
getCluster: () => cluster,
Expand Down Expand Up @@ -121,5 +127,67 @@ describe('BulkUploader', () => {
done();
}, CHECK_DELAY);
});

it('does not call UsageCollectors if last reported is within the usageInterval', done => {
const usageCollectorFetch = sinon.stub();
const collectorFetch = sinon.stub().returns({ type: 'type_usage_collector_test', result: { testData: 12345 } });

const collectors = new MockCollectorSet(server, [
{
fetch: usageCollectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: true,
},
{
fetch: collectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: false,
}
]);

const uploader = new BulkUploader(server, {
interval: FETCH_INTERVAL
});
uploader._lastFetchUsageTime = Date.now();

uploader.start(collectors);
setTimeout(() => {
uploader.stop();
expect(collectorFetch.callCount).to.be.greaterThan(0);
expect(usageCollectorFetch.callCount).to.eql(0);
done();
}, CHECK_DELAY);
});

it('calls UsageCollectors if last reported exceeds during a _usageInterval', done => {
const usageCollectorFetch = sinon.stub();
const collectorFetch = sinon.stub().returns({ type: 'type_usage_collector_test', result: { testData: 12345 } });

const collectors = new MockCollectorSet(server, [
{
fetch: usageCollectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: true,
},
{
fetch: collectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: false,
}
]);

const uploader = new BulkUploader(server, {
interval: FETCH_INTERVAL
});
uploader._lastFetchUsageTime = Date.now() - uploader._usageInterval;

uploader.start(collectors);
setTimeout(() => {
uploader.stop();
expect(collectorFetch.callCount).to.be.greaterThan(0);
expect(usageCollectorFetch.callCount).to.be.greaterThan(0);
done();
}, CHECK_DELAY);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import { defaultsDeep, uniq, compact } from 'lodash';
import { callClusterFactory } from '../../../xpack_main';

import {
LOGGING_TAG,
KIBANA_MONITORING_LOGGING_TAG,
Expand Down Expand Up @@ -42,6 +43,9 @@ export class BulkUploader {

this._timer = null;
this._interval = interval;
this._lastFetchUsageTime = null;
this._usageInterval = server.plugins.xpack_main.telemetryCollectionInterval;

this._log = {
debug: message => server.log(['debug', ...LOGGING_TAGS], message),
info: message => server.log(['info', ...LOGGING_TAGS], message),
Expand All @@ -63,18 +67,34 @@ export class BulkUploader {
*/
start(collectorSet) {
this._log.info('Starting monitoring stats collection');
const filterCollectorSet = _collectorSet => {
const filterUsage = this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now();
this._lastFetchWithUsage = !filterUsage;
if (!filterUsage) {
this._lastFetchUsageTime = Date.now();
}

// this is internal bulk upload, so filter out API-only collectors
const filterThem = _collectorSet => _collectorSet.getFilteredCollectorSet(c => c.ignoreForInternalUploader !== true);
return _collectorSet.getFilteredCollectorSet(c => {
// this is internal bulk upload, so filter out API-only collectors
if (c.ignoreForInternalUploader) {
return false;
}
// Only collect usage data at the same interval as telemetry would (default to once a day)
if (filterUsage && _collectorSet.isUsageCollector(c)) {
return false;
}
return true;
});
};

if (this._timer) {
clearInterval(this._timer);
} else {
this._fetchAndUpload(filterThem(collectorSet)); // initial fetch
this._fetchAndUpload(filterCollectorSet(collectorSet)); // initial fetch
}

this._timer = setInterval(() => {
this._fetchAndUpload(filterThem(collectorSet));
this._fetchAndUpload(filterCollectorSet(collectorSet));
}, this._interval);
}

Expand Down Expand Up @@ -172,7 +192,6 @@ export class BulkUploader {
const { type: uploadType, payload: uploadData } = collectorSet.getCollectorByType(type).formatForBulkUpload(result);
return defaultsDeep(accum, { [uploadType]: uploadData });
}, {});

// convert the nested object into a flat array, with each payload prefixed
// with an 'index' instruction, for bulk upload
const flat = Object.keys(typesNested).reduce((accum, type) => {
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/xpack_main/server/lib/setup_xpack_main.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import { injectXPackInfoSignature } from './inject_xpack_info_signature';
import { XPackInfo } from './xpack_info';
import { REPORT_INTERVAL_MS } from '../../common/constants';
import { FeatureRegistry } from './feature_registry';

/**
Expand All @@ -22,6 +23,7 @@ export function setupXPackMain(server) {
});

server.expose('info', info);
server.expose('telemetryCollectionInterval', REPORT_INTERVAL_MS);
server.expose('createXPackInfo', (options) => new XPackInfo(server, options));
server.ext('onPreResponse', (request, h) => injectXPackInfoSignature(info, request, h));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,20 +175,34 @@ export function getHighLevelStats(server, callCluster, clusterUuids, start, end,
.then(response => handleHighLevelStatsResponse(response, product));
}

/**
* Fetch the high level stats to report for the {@code product}.
*
* @param {Object} server The server instance
* @param {function} callCluster The callWithRequest or callWithInternalUser handler
* @param {Array} indices The indices to use for the request
* @param {Array} clusterUuids Cluster UUIDs to limit the request against
* @param {Date} start Start time to limit the stats
* @param {Date} end End time to limit the stats
* @param {String} product The product to limit too ('kibana', 'logstash', 'beats')
* @return {Promise} Response for the instances to fetch detailed for the product.
*/
export function fetchHighLevelStats(server, callCluster, clusterUuids, start, end, product) {
export async function fetchHighLevelStats(server, callCluster, clusterUuids, start, end, product) {
const config = server.config();
const isKibanaIndex = product === KIBANA_SYSTEM_ID;
const filters = [
{ terms: { cluster_uuid: clusterUuids } },
];

// we should supply this from a parameter in the future so that this remains generic
if (isKibanaIndex) {
const kibanaFilter = {
bool: {
should: [
{ exists: { field: 'kibana_stats.usage.index' } },
{
bool: {
should: [
{ range: { 'kibana_stats.kibana.version': { lt: '6.7.3' } } },
{ term: { 'kibana_stats.kibana.version': '7.0.0' } },
]
}
}
],
}
};

filters.push(kibanaFilter);
}

const params = {
index: getIndexPatternForStackProduct(product),
size: config.get('xpack.monitoring.max_bucket_size'),
Expand All @@ -210,7 +224,7 @@ export function fetchHighLevelStats(server, callCluster, clusterUuids, start, en
start,
end,
type: `${product}_stats`,
filters: [ { terms: { cluster_uuid: clusterUuids } } ]
filters,
}),
collapse: {
// a more ideal field would be the concatenation of the uuid + transport address for duped UUIDs (copied installations)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"aliases": {
},
"index": ".monitoring-kibana-6-2017.08.15",
"mappings":{"_doc":{"dynamic":false,"properties":{"cluster_uuid":{"type":"keyword"},"timestamp":{"type":"date","format":"date_time"},"interval_ms":{"type":"long"},"type":{"type":"keyword"},"source_node":{"properties":{"uuid":{"type":"keyword"},"host":{"type":"keyword"},"transport_address":{"type":"keyword"},"ip":{"type":"keyword"},"name":{"type":"keyword"},"timestamp":{"type":"date","format":"date_time"}}},"kibana_stats":{"properties":{"kibana":{"properties":{"uuid":{"type":"keyword"},"name":{"type":"keyword"},"host":{"type":"keyword"},"transport_address":{"type":"keyword"},"version":{"type":"keyword"},"snapshot":{"type":"boolean"},"status":{"type":"keyword"},"statuses":{"properties":{"name":{"type":"keyword"},"state":{"type":"keyword"}}}}},"cloud":{"properties":{"name":{"type":"keyword"},"id":{"type":"keyword"},"vm_type":{"type":"keyword"},"region":{"type":"keyword"},"zone":{"type":"keyword"},"metadata":{"type":"object"}}},"os":{"properties":{"load":{"properties":{"1m":{"type":"half_float"},"5m":{"type":"half_float"},"15m":{"type":"half_float"}}},"memory":{"properties":{"total_in_bytes":{"type":"float"},"free_in_bytes":{"type":"float"},"used_in_bytes":{"type":"float"}}},"uptime_in_millis":{"type":"long"}}},"process":{"properties":{"memory":{"properties":{"heap":{"properties":{"total_in_bytes":{"type":"float"},"used_in_bytes":{"type":"float"},"size_limit":{"type":"float"}}},"resident_set_size_in_bytes":{"type":"float"}}},"event_loop_delay":{"type":"float"},"uptime_in_millis":{"type":"long"}}},"sockets":{"properties":{"http":{"properties":{"total":{"type":"long"}}},"https":{"properties":{"total":{"type":"long"}}}}},"timestamp":{"type":"date"},"requests":{"properties":{"disconnects":{"type":"long"},"total":{"type":"long"},"status_codes":{"type":"object"}}},"response_times":{"properties":{"average":{"type":"float"},"max":{"type":"float"}}},"concurrent_connections":{"type":"long"}}}}}},
"mappings":{"_doc":{"dynamic":false,"properties":{"cluster_uuid":{"type":"keyword"},"timestamp":{"type":"date","format":"date_time"},"interval_ms":{"type":"long"},"type":{"type":"keyword"},"source_node":{"properties":{"uuid":{"type":"keyword"},"host":{"type":"keyword"},"transport_address":{"type":"keyword"},"ip":{"type":"keyword"},"name":{"type":"keyword"},"timestamp":{"type":"date","format":"date_time"}}},"kibana_stats":{"properties":{"usage":{"properties":{"index":{"type":"keyword"}}},"kibana":{"properties":{"uuid":{"type":"keyword"},"name":{"type":"keyword"},"host":{"type":"keyword"},"transport_address":{"type":"keyword"},"version":{"type":"keyword"},"snapshot":{"type":"boolean"},"status":{"type":"keyword"},"statuses":{"properties":{"name":{"type":"keyword"},"state":{"type":"keyword"}}}}},"cloud":{"properties":{"name":{"type":"keyword"},"id":{"type":"keyword"},"vm_type":{"type":"keyword"},"region":{"type":"keyword"},"zone":{"type":"keyword"},"metadata":{"type":"object"}}},"os":{"properties":{"load":{"properties":{"1m":{"type":"half_float"},"5m":{"type":"half_float"},"15m":{"type":"half_float"}}},"memory":{"properties":{"total_in_bytes":{"type":"float"},"free_in_bytes":{"type":"float"},"used_in_bytes":{"type":"float"}}},"uptime_in_millis":{"type":"long"}}},"process":{"properties":{"memory":{"properties":{"heap":{"properties":{"total_in_bytes":{"type":"float"},"used_in_bytes":{"type":"float"},"size_limit":{"type":"float"}}},"resident_set_size_in_bytes":{"type":"float"}}},"event_loop_delay":{"type":"float"},"uptime_in_millis":{"type":"long"}}},"sockets":{"properties":{"http":{"properties":{"total":{"type":"long"}}},"https":{"properties":{"total":{"type":"long"}}}}},"timestamp":{"type":"date"},"requests":{"properties":{"disconnects":{"type":"long"},"total":{"type":"long"},"status_codes":{"type":"object"}}},"response_times":{"properties":{"average":{"type":"float"},"max":{"type":"float"}}},"concurrent_connections":{"type":"long"}}}}}},
"settings": {
"index": {
"auto_expand_replicas": "0-1",
Expand Down