Skip to content

Commit

Permalink
Wait until outStream is finished (closes DevExpress#2502)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyBelym committed Nov 8, 2018
1 parent ce7fa92 commit 0cd1cdd
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 19 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
"indent-string": "^1.2.2",
"is-ci": "^1.0.10",
"is-glob": "^2.0.1",
"is-stream": "^1.1.0",
"lodash": "^4.17.10",
"log-update-async-hook": "^2.0.2",
"make-dir": "^1.3.0",
Expand Down
21 changes: 21 additions & 0 deletions src/reporter/index.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import Promise from 'pinkie';
import { find, sortBy } from 'lodash';
import { writable as isWritableStream } from 'is-stream';
import ReporterPluginHost from './plugin-host';

export default class Reporter {
constructor (plugin, task, outStream) {
this.plugin = new ReporterPluginHost(plugin, outStream);

this.disposed = false;
this.passed = 0;
this.skipped = task.tests.filter(test => test.skip).length;
this.testCount = task.tests.length - this.skipped;
this.reportQueue = Reporter._createReportQueue(task);
this.stopOnFirstFail = task.opts.stopOnFirstFail;
this.outStream = outStream;

this._assignTaskEventHandlers(task);
}
Expand Down Expand Up @@ -130,4 +134,21 @@ export default class Reporter {
this.plugin.reportTaskDone(endTime, this.passed, task.warningLog.messages);
});
}

async dispose () {
if (this.disposed)
return;

this.disposed = true;

if (!isWritableStream(this.outStream))
return;

this.outStream.end();

await new Promise(resolve => {
this.outStream.once('finish', resolve);
this.outStream.once('error', resolve);
});
}
}
44 changes: 31 additions & 13 deletions src/runner/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { resolve as resolvePath } from 'path';
import debug from 'debug';
import Promise from 'pinkie';
import promisifyEvent from 'promisify-event';
import mapReverse from 'map-reverse';
Expand All @@ -14,10 +15,13 @@ import renderForbiddenCharsList from '../errors/render-forbidden-chars-list';
import checkFilePath from '../utils/check-file-path';
import { addRunningTest, removeRunningTest, startHandlingTestErrors, stopHandlingTestErrors } from '../utils/handle-errors';


const DEFAULT_SELECTOR_TIMEOUT = 10000;
const DEFAULT_ASSERTION_TIMEOUT = 3000;
const DEFAULT_PAGE_LOAD_TIMEOUT = 3000;

const DEBUG_LOGGER = debug('testcafe:runner');

export default class Runner extends EventEmitter {
constructor (proxy, browserConnectionGateway, options = {}) {
super();
Expand All @@ -41,18 +45,32 @@ export default class Runner extends EventEmitter {
};
}

static async _disposeTaskAndRelatedAssets (task, browserSet, testedApp) {

static _disposeBrowserSet (browserSet) {
return browserSet.dispose().catch(e => DEBUG_LOGGER(e));
}

static _disposeReporters (reporters) {
return Promise.all(reporters.map(reporter => reporter.dispose().catch(e => DEBUG_LOGGER(e))));
}

static _disposeTestedApp (testedApp) {
return testedApp ? testedApp.kill().catch(e => DEBUG_LOGGER(e)) : Promise.resolve();
}

static async _disposeTaskAndRelatedAssets (task, browserSet, reporters, testedApp) {
task.abort();
task.removeAllListeners();

await Runner._disposeBrowserSetAndTestedApp(browserSet, testedApp);
await Runner._disposeAssets(browserSet, reporters, testedApp);
}

static async _disposeBrowserSetAndTestedApp (browserSet, testedApp) {
await browserSet.dispose();

if (testedApp)
await testedApp.kill();
static _disposeAssets (browserSet, reporters, testedApp) {
return Promise.all([
Runner._disposeBrowserSet(browserSet),
Runner._disposeReporters(reporters),
Runner._disposeTestedApp(testedApp)
]);
}

_createCancelablePromise (taskPromise) {
Expand Down Expand Up @@ -81,7 +99,7 @@ export default class Runner extends EventEmitter {
return failedTestCount;
}

async _getTaskResult (task, browserSet, reporter, testedApp) {
async _getTaskResult (task, browserSet, reporters, testedApp) {
task.on('browser-job-done', job => browserSet.releaseConnection(job.browserConnection));

const promises = [
Expand All @@ -96,21 +114,21 @@ export default class Runner extends EventEmitter {
await Promise.race(promises);
}
catch (err) {
await Runner._disposeTaskAndRelatedAssets(task, browserSet, testedApp);
await Runner._disposeTaskAndRelatedAssets(task, browserSet, reporters, testedApp);

throw err;
}

await Runner._disposeBrowserSetAndTestedApp(browserSet, testedApp);
await Runner._disposeAssets(browserSet, reporters, testedApp);

return this._getFailedTestCount(task, reporter);
return this._getFailedTestCount(task, reporters[0]);
}

_runTask (reporterPlugins, browserSet, tests, testedApp) {
let completed = false;
const task = new Task(tests, browserSet.browserConnectionGroups, this.proxy, this.opts);
const reporters = reporterPlugins.map(reporter => new Reporter(reporter.plugin, task, reporter.outStream));
const completionPromise = this._getTaskResult(task, browserSet, reporters[0], testedApp);
const completionPromise = this._getTaskResult(task, browserSet, reporters, testedApp);

task.once('start', startHandlingTestErrors);

Expand All @@ -131,7 +149,7 @@ export default class Runner extends EventEmitter {

const cancelTask = async () => {
if (!completed)
await Runner._disposeTaskAndRelatedAssets(task, browserSet, testedApp);
await Runner._disposeTaskAndRelatedAssets(task, browserSet, reporters, testedApp);
};

return { completionPromise, cancelTask };
Expand Down
40 changes: 39 additions & 1 deletion test/functional/fixtures/reporter/test.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const expect = require('chai').expect;
const { createTestStream } = require('../../utils/stream');
const { createTestStream, createAsyncTestStream } = require('../../utils/stream');

describe('Reporter', () => {
it('Should support several different reporters for a test run', function () {
Expand Down Expand Up @@ -28,4 +28,42 @@ describe('Reporter', () => {
expect(stream2.data).to.contains('Simple test');
});
});

it('Should wait until reporter stream is finished (GH-2502)', function () {
const stream = createAsyncTestStream();

const runOpts = {
only: ['chrome'],
reporters: [
{
reporter: 'json',
outStream: stream
}
]
};

return runTests('testcafe-fixtures/index-test.js', 'Simple test', runOpts)
.then(() => {
expect(stream.finalCalled).to.be.ok;
});
});

it('Should wait until reporter stream failed to finish (GH-2502)', function () {
const stream = createAsyncTestStream({ shouldFail: true });

const runOpts = {
only: ['chrome'],
reporters: [
{
reporter: 'json',
outStream: stream
}
]
};

return runTests('testcafe-fixtures/index-test.js', 'Simple test', runOpts)
.then(() => {
expect(stream.finalCalled).to.be.ok;
});
});
});
31 changes: 26 additions & 5 deletions test/functional/utils/stream.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,36 @@
const { Writable: WritableStream } = require('stream');


const ASYNC_REPORTER_FINALIZING_TIMEOUT = 2000;

module.exports.createTestStream = () => {
const stream = {
data: '',
write: function (val) {
return {
data: '',

write (val) {
this.data += val;
},
end: function (val) {
end (val) {
this.data += val;
}
};
};

return stream;

module.exports.createAsyncTestStream = ({ shouldFail } = {}) => {
return new WritableStream({
write (chunk, enc, cb) {
cb();
},

final (cb) {
setTimeout(() => {
this.finalCalled = true;

cb(shouldFail ? new Error('Stream failed') : null);
}, ASYNC_REPORTER_FINALIZING_TIMEOUT);
}
});
};

module.exports.createNullStream = () => {
Expand Down

0 comments on commit 0cd1cdd

Please sign in to comment.