Skip to content

Commit

Permalink
WIP: concurrency based on worker threads; see #2839 [ci skip]
Browse files Browse the repository at this point in the history
  • Loading branch information
boneskull committed Mar 24, 2020
1 parent 0e77e41 commit 5c9e938
Show file tree
Hide file tree
Showing 10 changed files with 574 additions and 96 deletions.
25 changes: 14 additions & 11 deletions .eslintrc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@ rules:
- safe
overrides:
- files:
- scripts/**/*.js
- package-scripts.js
- karma.conf.js
- .wallaby.js
- .eleventy.js
- bin/*
- lib/cli/**/*.js
- test/node-unit/**/*.js
- test/integration/options/watch.spec.js
- test/integration/helpers.js
- lib/growl.js
- 'scripts/**/*.js'
- 'package-scripts.js'
- 'karma.conf.js'
- '.wallaby.js'
- '.eleventy.js'
- 'bin/*'
- 'lib/cli/**/*.js'
- 'test/node-unit/**/*.js'
- 'test/integration/options/watch.spec.js'
- 'test/integration/helpers.js'
- 'lib/growl.js'
- 'lib/buffered-runner.js'
- 'lib/worker.js'
- 'lib/reporters/buffered.js'
parserOptions:
ecmaVersion: 2018
env:
Expand Down
75 changes: 75 additions & 0 deletions lib/buffered-runner.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
'use strict';

const Runner = require('./runner');
const {EVENT_RUN_BEGIN, EVENT_RUN_END} = Runner.constants;
const {spawn, Pool, Worker} = require('threads');
const debug = require('debug')('mocha:buffered-runner');

/**
* This `Runner` delegates tests runs to worker threads. Does not execute any
* {@link Runnable}s by itself!
*/
class BufferedRunner extends Runner {
/**
* Runs Mocha tests by creating a thread pool, then delegating work to the
* worker threads. Each worker receives one file, and as workers become
* available, they take a file from the queue and run it.
* The worker thread execution is treated like an RPC--it returns a `Promise`
* containing serialized information about the run. The information is processed
* as it's received, and emitted to a {@link Reporter}, which is likely listening
* for these events.
*
* @todo handle tests in a specific order, e.g., via `--file`?
* @todo handle delayed runs?
* @todo graceful failure
* @todo audit `BufferedEvent` objects; e.g. do tests need a `parent` prop?
* @todo should we just instantiate a `Test` object from the `BufferedEvent`?
* @param {Function} callback - Called with an exit code corresponding to
* number of test failures.
* @param {Object} options
* @param {string[]} options.files - List of test files
* @param {Options} option.opts - Command-line options
* @returns {Promise<void>}
*/
async run(callback, {files, opts}) {
const pool = Pool(() => spawn(new Worker('./worker.js')), opts.jobs);

let exitCode = 0;

this.emit(EVENT_RUN_BEGIN);

files.forEach(file => {
debug('enqueueing test file %s', file);
pool.queue(async run => {
const [failures, events] = await run(file, opts);
debug(
'completed run of file %s; %d failures / %d events',
file,
failures,
events.length
);
exitCode += failures; // can this be non-numeric?
events.forEach(({name, data}) => {
Object.keys(data).forEach(key => {
if (key.startsWith('__')) {
data[key.slice(2)] = () => data[key];
}
});
// maybe we should just expect `err` separately from the worker.
if (data.err) {
this.emit(name, data, data.err);
} else {
this.emit(name, data);
}
});
});
});

await pool.settled(); // nonzero exit code if rejection?
await pool.terminate();
this.emit(EVENT_RUN_END);
callback(exitCode);
}
}

module.exports = BufferedRunner;
22 changes: 22 additions & 0 deletions lib/cli/run-helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,25 @@ const singleRun = async (mocha, {exit}, fileCollectParams) => {
return mocha.run(exit ? exitMocha : exitMochaLater);
};

/**
* Collect files and run tests (using `BufferedRunner`)
* @param {Mocha} mocha - Mocha instance
* @param {Options} opts - Command line options
* @param {Object} fileCollectParams - Parameters that control test
* file collection. See `lib/cli/collect-files.js`.
* @returns {Promise<BufferedRunner>}
* @private
*/
const parallelRun = async (mocha, opts, fileCollectParams) => {
const files = collectFiles(fileCollectParams);
const {jobs} = opts;
debug(
`executing ${files.length} test file(s) across ${jobs} concurrent jobs`
);

return mocha.run(opts.exit ? exitMocha : exitMochaLater, {files, opts});
};

/**
* Actually run tests
* @param {Mocha} mocha - Mocha instance
Expand All @@ -122,6 +141,7 @@ exports.runMocha = async (mocha, options) => {
exit = false,
ignore = [],
file = [],
parallel = false,
recursive = false,
sort = false,
spec = [],
Expand All @@ -140,6 +160,8 @@ exports.runMocha = async (mocha, options) => {

if (watch) {
watchRun(mocha, {watchFiles, watchIgnore}, fileCollectParams);
} else if (parallel) {
await parallelRun(mocha, options, fileCollectParams);
} else {
await singleRun(mocha, {exit}, fileCollectParams);
}
Expand Down
5 changes: 4 additions & 1 deletion lib/cli/run-option-metadata.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ exports.types = {
'list-interfaces',
'list-reporters',
'no-colors',
'parallel',
'recursive',
'sort',
'watch'
],
number: ['retries'],
number: ['retries', 'jobs'],
string: [
'config',
'fgrep',
Expand Down Expand Up @@ -75,7 +76,9 @@ exports.aliases = {
growl: ['G'],
ignore: ['exclude'],
invert: ['i'],
jobs: ['j'],
'no-colors': ['C'],
parallel: ['p'],
reporter: ['R'],
'reporter-option': ['reporter-options', 'O'],
require: ['r'],
Expand Down
13 changes: 13 additions & 0 deletions lib/cli/run.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const {ONE_AND_DONES, ONE_AND_DONE_ARGS} = require('./one-and-dones');
const debug = require('debug')('mocha:cli:run');
const defaults = require('../mocharc');
const {types, aliases} = require('./run-option-metadata');
const coreCount = require('os').cpus().length;

/**
* Logical option groups
Expand Down Expand Up @@ -150,6 +151,14 @@ exports.builder = yargs =>
description: 'Inverts --grep and --fgrep matches',
group: GROUPS.FILTERS
},
jobs: {
description: 'Number of concurrent jobs',
implies: 'parallel',
defaultDescription: `CPU core count (${coreCount})`,
requiresArg: true,
group: GROUPS.RULES,
coerce: value => (typeof value === 'undefined' ? coreCount : value)
},
'list-interfaces': {
conflicts: Array.from(ONE_AND_DONE_ARGS),
description: 'List built-in user interfaces & exit'
Expand All @@ -169,6 +178,10 @@ exports.builder = yargs =>
normalize: true,
requiresArg: true
},
parallel: {
description: 'Run tests in parallel',
group: GROUPS.RULES
},
recursive: {
description: 'Look for tests in subdirectories',
group: GROUPS.FILES
Expand Down
11 changes: 8 additions & 3 deletions lib/mocha.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ exports.Test = require('./test');
* @param {number} [options.slow] - Slow threshold value.
* @param {number|string} [options.timeout] - Timeout threshold value.
* @param {string} [options.ui] - Interface name.
* @param {boolean} [options.parallel] - Run jobs in parallel
*/
function Mocha(options) {
options = utils.assign({}, mocharc, options || {});
Expand Down Expand Up @@ -136,6 +137,10 @@ function Mocha(options) {
this[opt]();
}
}, this);

this._runner = options.parallel
? require('./buffered-runner')
: exports.Runner;
}

/**
Expand Down Expand Up @@ -824,14 +829,14 @@ Object.defineProperty(Mocha.prototype, 'version', {
* // exit with non-zero status if there were test failures
* mocha.run(failures => process.exitCode = failures ? 1 : 0);
*/
Mocha.prototype.run = function(fn) {
Mocha.prototype.run = function(fn, runOptions) {
if (this.files.length && !this.loadAsync) {
this.loadFiles();
}
var suite = this.suite;
var options = this.options;
options.files = this.files;
var runner = new exports.Runner(suite, options.delay);
var runner = new this._runner(suite, options.delay);
createStatsCollector(runner);
var reporter = new this._reporter(runner, options);
runner.checkLeaks = options.checkLeaks === true;
Expand Down Expand Up @@ -864,5 +869,5 @@ Mocha.prototype.run = function(fn) {
}
}

return runner.run(done);
return runner.run(done, runOptions);
};
114 changes: 114 additions & 0 deletions lib/reporters/buffered.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
'use strict';
/**
* @module Buffered
*/
/**
* Module dependencies.
*/

const {
EVENT_SUITE_BEGIN,
EVENT_SUITE_END,
EVENT_TEST_FAIL,
EVENT_TEST_PASS,
EVENT_TEST_PENDING
} = require('../runner').constants;

/**
* Creates a {@link BufferedEvent} from a {@link Suite}.
* @param {string} evt - Event name
* @param {Suite} suite - Suite object
* @returns {BufferedEvent}
*/
const serializeSuite = (evt, suite) => ({
name: evt,
data: {root: suite.root, title: suite.title}
});

/**
* Creates a {@link BufferedEvent} from a {@link Test}.
* @param {string} evt - Event name
* @param {Test} test - Test object
* @param {any} err - Error, if applicable
*/
const serializeTest = (evt, test, [err]) => {
const obj = {
title: test.title,
duration: test.duration,
err: test.err,
__fullTitle: test.fullTitle(),
__slow: test.slow(),
__titlePath: test.titlePath()
};
if (err) {
obj.err =
test.err && err instanceof Error
? {
multiple: [...(test.err.multiple || []), err]
}
: err;
}
return {
name: evt,
data: obj
};
};

/**
* The `Buffered` reporter is for use by parallel runs. Instead of outputting
* to `STDOUT`, etc., it retains a list of events it receives and hands these
* off to the callback passed into {@link Mocha#run}. That callback will then
* return the data to the main process.
*/
class Buffered {
/**
* Listens for {@link Runner} events and retains them in an `events` instance prop.
* @param {Runner} runner
*/
constructor(runner) {
/**
* Retained list of events emitted from the {@link Runner} instance.
* @type {BufferedEvent[]}
*/
const events = (this.events = []);

runner
.on(EVENT_SUITE_BEGIN, suite => {
events.push(serializeSuite(EVENT_SUITE_BEGIN, suite));
})
.on(EVENT_SUITE_END, suite => {
events.push(serializeSuite(EVENT_SUITE_END, suite));
})
.on(EVENT_TEST_PENDING, test => {
events.push(serializeTest(EVENT_TEST_PENDING, test));
})
.on(EVENT_TEST_FAIL, (test, err) => {
events.push(serializeTest(EVENT_TEST_FAIL, test, err));
})
.on(EVENT_TEST_PASS, test => {
events.push(serializeTest(EVENT_TEST_PASS, test));
});
}

/**
* Calls the {@link Mocha#run} callback (`callback`) with the test failure
* count and the array of {@link BufferedEvent} objects. Resets the array.
* @param {number} failures - Number of failed tests
* @param {Function} callback - The callback passed to {@link Mocha#run}.
*/
done(failures, callback) {
callback(failures, [...this.events]);
this.events = [];
}
}

/**
* Serializable event data from a `Runner`. Keys of the `data` property
* beginning with `__` will be converted into a function which returns the value
* upon deserialization.
* @typedef {Object} BufferedEvent
* @property {string} name - Event name
* @property {object} data - Event parameters
*/

module.exports = Buffered;
Loading

0 comments on commit 5c9e938

Please sign in to comment.