Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitoring telemetry collection interval #34609

Merged
merged 21 commits into from
Apr 27, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/legacy/server/usage/classes/__tests__/collector_set.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import sinon from 'sinon';
import expect from '@kbn/expect';
import { Collector } from '../collector';
import { CollectorSet } from '../collector_set';
import { UsageCollector } from '../usage_collector';

describe('CollectorSet', () => {
describe('registers a collector set and runs lifecycle events', () => {
Expand Down Expand Up @@ -140,6 +141,26 @@ describe('CollectorSet', () => {
});
});
});

describe('isUsageCollector', () => {
const server = { };
const collectorOptions = { type: 'MY_TEST_COLLECTOR', fetch: () => {} };

it('returns true only for UsageCollector instances', () => {
const collectors = new CollectorSet(server);

const usageCollector = new UsageCollector(server, collectorOptions);
const collector = new Collector(server, collectorOptions);
const randomClass = new (class Random {});
expect(collectors.isUsageCollector(usageCollector)).to.be(true);
expect(collectors.isUsageCollector(collector)).to.be(false);
expect(collectors.isUsageCollector(randomClass)).to.be(false);
expect(collectors.isUsageCollector({})).to.be(false);
expect(collectors.isUsageCollector(null)).to.be(false);
expect(collectors.isUsageCollector('')).to.be(false);
expect(collectors.isUsageCollector()).to.be(false);
});
});
});


5 changes: 5 additions & 0 deletions src/legacy/server/usage/classes/collector_set.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ export class CollectorSet {
return this._collectors.find(c => c.type === type);
}

// isUsageCollector(x: UsageCollector | any): x is UsageCollector {
isUsageCollector(x) {
return x instanceof UsageCollector;
}

/*
* Call a bunch of fetch methods and then do them in bulk
* @param {CollectorSet} collectorSet - a set of collectors to fetch. Default to all registered collectors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class MockCollectorSet {
this.mockServer = _mockServer;
this.mockCollectors = mockCollectors;
}
isUsageCollector(x) {
return !!x.isUsageCollector;
}
getCollectorByType(type) {
return this.mockCollectors.find(collector => collector.type === type) || this.mockCollectors[0];
}
Expand All @@ -41,6 +44,9 @@ describe('BulkUploader', () => {
server = {
log: sinon.spy(),
plugins: {
xpack_main: {
telemetryCollectionInterval: 3000,
},
elasticsearch: {
createCluster: () => cluster,
getCluster: () => cluster,
Expand Down Expand Up @@ -121,5 +127,67 @@ describe('BulkUploader', () => {
done();
}, CHECK_DELAY);
});

it('does not call UsageCollectors if last reported is within the usageInterval', done => {
const usageCollectorFetch = sinon.stub();
const collectorFetch = sinon.stub().returns({ type: 'type_usage_collector_test', result: { testData: 12345 } });

const collectors = new MockCollectorSet(server, [
{
fetch: usageCollectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: true,
},
{
fetch: collectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: false,
}
]);

const uploader = new BulkUploader(server, {
interval: FETCH_INTERVAL
});
uploader._lastFetchUsageTime = Date.now();

uploader.start(collectors);
setTimeout(() => {
uploader.stop();
expect(collectorFetch.callCount).to.be.greaterThan(0);
expect(usageCollectorFetch.callCount).to.eql(0);
done();
}, CHECK_DELAY);
});

it('calls UsageCollectors if last reported exceeds during a _usageInterval', done => {
const usageCollectorFetch = sinon.stub();
const collectorFetch = sinon.stub().returns({ type: 'type_usage_collector_test', result: { testData: 12345 } });

const collectors = new MockCollectorSet(server, [
{
fetch: usageCollectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: true,
},
{
fetch: collectorFetch,
formatForBulkUpload: result => result,
isUsageCollector: false,
}
]);

const uploader = new BulkUploader(server, {
interval: FETCH_INTERVAL
});
uploader._lastFetchUsageTime = Date.now() - uploader._usageInterval;

uploader.start(collectors);
setTimeout(() => {
uploader.stop();
expect(collectorFetch.callCount).to.be.greaterThan(0);
expect(usageCollectorFetch.callCount).to.be.greaterThan(0);
done();
}, CHECK_DELAY);
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import { defaultsDeep, isEmpty, uniq, compact } from 'lodash';
import { callClusterFactory } from '../../../xpack_main';

import {
LOGGING_TAG,
KIBANA_MONITORING_LOGGING_TAG,
Expand Down Expand Up @@ -42,6 +43,9 @@ export class BulkUploader {

this._timer = null;
this._interval = interval;
this._lastFetchUsageTime = null;
this._usageInterval = server.plugins.xpack_main.telemetryCollectionInterval;

this._log = {
debug: message => server.log(['debug', ...LOGGING_TAGS], message),
info: message => server.log(['info', ...LOGGING_TAGS], message),
Expand All @@ -63,18 +67,33 @@ export class BulkUploader {
*/
start(collectorSet) {
this._log.info('Starting monitoring stats collection');
const filterCollectorSet = _collectorSet => {
const filterUsage = this._lastFetchUsageTime && this._lastFetchUsageTime + this._usageInterval > Date.now();
if (!filterUsage) {
this._lastFetchUsageTime = Date.now();
}

// this is internal bulk upload, so filter out API-only collectors
const filterThem = _collectorSet => _collectorSet.getFilteredCollectorSet(c => c.ignoreForInternalUploader !== true);
return _collectorSet.getFilteredCollectorSet(c => {
// this is internal bulk upload, so filter out API-only collectors
if (c.ignoreForInternalUploader) {
return false;
}
// Only collect usage data at the same interval as telemetry would (default to once a day)
if (filterUsage && _collectorSet.isUsageCollector(c)) {
return false;
}
return true;
});
};

if (this._timer) {
clearInterval(this._timer);
} else {
this._fetchAndUpload(filterThem(collectorSet)); // initial fetch
this._fetchAndUpload(filterCollectorSet(collectorSet)); // initial fetch
}

this._timer = setInterval(() => {
this._fetchAndUpload(filterThem(collectorSet));
this._fetchAndUpload(filterCollectorSet(collectorSet));
}, this._interval);
}

Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/xpack_main/server/lib/setup_xpack_main.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import { injectXPackInfoSignature } from './inject_xpack_info_signature';
import { XPackInfo } from './xpack_info';
import { REPORT_INTERVAL_MS } from '../../common/constants';

/**
* Setup the X-Pack Main plugin. This is fired every time that the Elasticsearch plugin becomes Green.
Expand All @@ -21,6 +22,7 @@ export function setupXPackMain(server) {
});

server.expose('info', info);
server.expose('telemetryCollectionInterval', REPORT_INTERVAL_MS);
server.expose('createXPackInfo', (options) => new XPackInfo(server, options));
server.ext('onPreResponse', (request, h) => injectXPackInfoSignature(info, request, h));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,10 @@ export function fetchHighLevelStats(server, callCluster, clusterUuids, start, en
start,
end,
type: `${product}_stats`,
filters: [ { terms: { cluster_uuid: clusterUuids } } ]
filters: [
{ terms: { cluster_uuid: clusterUuids } },
{ exists: { field: 'kibana_stats.usage.index' } }
]
}),
collapse: {
// a more ideal field would be the concatenation of the uuid + transport address for duped UUIDs (copied installations)
Expand Down