diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b46bacd725b..66bce957b9ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,8 @@ ### Fixes +* `[jest-worker]` Stick calls to workers before processing them + ([#6073](https://github.com/facebook/jest/pull/6073)) * `[babel-plugin-jest-hoist]` Allow using `console` global variable ([#6074](https://github.com/facebook/jest/pull/6074)) * `[jest-jasmine2]` Always remove node core message from assert stack traces diff --git a/packages/jest-worker/src/__tests__/index-integration.test.js b/packages/jest-worker/src/__tests__/index-integration.test.js new file mode 100644 index 000000000000..6d3b122c1908 --- /dev/null +++ b/packages/jest-worker/src/__tests__/index-integration.test.js @@ -0,0 +1,151 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +'use strict'; + +import EventEmitter from 'events'; + +import {CHILD_MESSAGE_CALL, PARENT_MESSAGE_OK} from '../types'; + +let Farm; +let mockForkedProcesses; + +function mockBuildForkedProcess() { + const mockChild = new EventEmitter(); + + mockChild.send = jest.fn(); + + return mockChild; +} + +function replySuccess(i, result) { + mockForkedProcesses[i].emit('message', [PARENT_MESSAGE_OK, result]); +} + +function assertCallsToChild(childNum, ...calls) { + expect(mockForkedProcesses[childNum].send).toHaveBeenCalledTimes( + calls.length + 1, + ); + + calls.forEach(([methodName, ...args], numCall) => { + expect( + mockForkedProcesses[childNum].send.mock.calls[numCall + 1][0], + ).toEqual([CHILD_MESSAGE_CALL, true, methodName, args]); + }); +} + +beforeEach(() => { + mockForkedProcesses = []; + + jest.mock('child_process', () => ({ + fork() { + const forkedProcess = mockBuildForkedProcess(); + + mockForkedProcesses.push(forkedProcess); + + return forkedProcess; + }, + })); + + Farm = require('../index').default; +}); + +afterEach(() => { + jest.resetModules(); +}); + +it('calls a single method from the worker', async () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + const promise = farm.foo(); + + replySuccess(0, 42); + + expect(await promise).toBe(42); +}); + +it('distributes sequential calls across child processes', async () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + // The first call will go to the first child process. + const promise0 = farm.foo('param-0'); + + assertCallsToChild(0, ['foo', 'param-0']); + replySuccess(0, 'worker-0'); + expect(await promise0).toBe('worker-0'); + + // The second call will go to the second child process. + const promise1 = farm.foo(1); + + assertCallsToChild(1, ['foo', 1]); + replySuccess(1, 'worker-1'); + expect(await promise1).toBe('worker-1'); +}); + +it('distributes concurrent calls across child processes', async () => { + const farm = new Farm('/tmp/baz.js', { + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + // Do 3 calls to the farm in parallel. + const promise0 = farm.foo('param-0'); + const promise1 = farm.foo('param-1'); + const promise2 = farm.foo('param-2'); + + // Check that the method calls are sent to each separate child process. + assertCallsToChild(0, ['foo', 'param-0']); + assertCallsToChild(1, ['foo', 'param-1']); + assertCallsToChild(2, ['foo', 'param-2']); + + // Send different responses from each child. + replySuccess(0, 'worker-0'); + replySuccess(1, 'worker-1'); + replySuccess(2, 'worker-2'); + + // Check + expect(await promise0).toBe('worker-0'); + expect(await promise1).toBe('worker-1'); + expect(await promise2).toBe('worker-2'); +}); + +it('sticks parallel calls to children', async () => { + const farm = new Farm('/tmp/baz.js', { + computeWorkerKey: () => '1234567890abcdef', + exposedMethods: ['foo', 'bar'], + numWorkers: 4, + }); + + // Do 3 calls to the farm in parallel. + const promise0 = farm.foo('param-0'); + const promise1 = farm.foo('param-1'); + const promise2 = farm.foo('param-2'); + + // Send different responses for each call (from the same child). + replySuccess(0, 'worker-0'); + replySuccess(0, 'worker-1'); + replySuccess(0, 'worker-2'); + + // Check that all the calls have been received by the same child). + assertCallsToChild( + 0, + ['foo', 'param-0'], + ['foo', 'param-1'], + ['foo', 'param-2'], + ); + + // Check that responses are correct. + expect(await promise0).toBe('worker-0'); + expect(await promise1).toBe('worker-1'); + expect(await promise2).toBe('worker-2'); +}); diff --git a/packages/jest-worker/src/__tests__/index.test.js b/packages/jest-worker/src/__tests__/index.test.js index 927c14254270..9be8f67aaf1c 100644 --- a/packages/jest-worker/src/__tests__/index.test.js +++ b/packages/jest-worker/src/__tests__/index.test.js @@ -11,12 +11,17 @@ let Farm; let Worker; let mockWorkers; +function workerReplyStart(i) { + mockWorkers[i].send.mock.calls[0][1](mockWorkers[i]); +} + +function workerReplyEnd(i, error, result) { + mockWorkers[i].send.mock.calls[0][2](error, result); +} + function workerReply(i, error, result) { - return mockWorkers[i].send.mock.calls[0][1].call( - mockWorkers[i], - error, - result, - ); + workerReplyStart(i); + workerReplyEnd(i, error, result); } beforeEach(() => { @@ -322,9 +327,8 @@ it('checks that once a sticked task finishes, next time is sent to that worker', }); // Worker 1 successfully replies with "17" as a result. - const promise = farm.foo('car', 'plane'); + farm.foo('car', 'plane'); workerReply(1, null, 17); - await promise; // Note that the stickiness is not created by the method name or the arguments // it is solely controlled by the provided "computeWorkerKey" method, which in @@ -341,6 +345,30 @@ it('checks that once a sticked task finishes, next time is sent to that worker', expect(mockWorkers[2].send).toHaveBeenCalledTimes(1); // Only "foo". }); +it('checks that even before a sticked task finishes, next time is sent to that worker', async () => { + const farm = new Farm('/tmp/baz.js', { + computeWorkerKey: () => '1234567890abcdef', + exposedMethods: ['foo', 'bar'], + numWorkers: 3, + }); + + // Call "foo". Not that the worker is sending a start response synchronously. + farm.foo('car', 'plane'); + workerReplyStart(1); + + // Call "bar". Not that the worker is sending a start response synchronously. + farm.bar(); + workerReplyStart(1); + + // The first time, a call with a "1234567890abcdef" hash had never been done + // earlier ("foo" call), so it got queued to all workers. Later, since the one + // that resolved the call was the one in position 1, all subsequent calls are + // only redirected to that worker. + expect(mockWorkers[0].send).toHaveBeenCalledTimes(1); // Only "foo". + expect(mockWorkers[1].send).toHaveBeenCalledTimes(2); // "foo" + "bar". + expect(mockWorkers[2].send).toHaveBeenCalledTimes(1); // Only "foo". +}); + it('checks that once a non-sticked task finishes, next time is sent to all workers', async () => { // Note there is no "computeWorkerKey". const farm = new Farm('/tmp/baz.js', { diff --git a/packages/jest-worker/src/__tests__/worker.test.js b/packages/jest-worker/src/__tests__/worker.test.js index aacba5def4bf..dd4ad947d86e 100644 --- a/packages/jest-worker/src/__tests__/worker.test.js +++ b/packages/jest-worker/src/__tests__/worker.test.js @@ -9,7 +9,7 @@ /* eslint-disable no-new */ -import {EventEmitter} from 'events'; +import EventEmitter from 'events'; import { CHILD_MESSAGE_CALL, @@ -104,9 +104,10 @@ it('stops initializing the worker after the amount of retries is exceeded', () = }); const request = [CHILD_MESSAGE_CALL, false, 'foo', []]; - const callback = jest.fn(); + const onProcessStart = jest.fn(); + const onProcessEnd = jest.fn(); - worker.send(request, callback); + worker.send(request, onProcessStart, onProcessEnd); // We fail four times (initial + three retries). forkInterface.emit('exit'); @@ -115,9 +116,10 @@ it('stops initializing the worker after the amount of retries is exceeded', () = forkInterface.emit('exit'); expect(childProcess.fork).toHaveBeenCalledTimes(5); - expect(callback.mock.calls[0][0]).toBeInstanceOf(Error); - expect(callback.mock.calls[0][0].type).toBe('WorkerError'); - expect(callback.mock.calls[0][1]).toBe(null); + expect(onProcessStart).toBeCalledWith(worker); + expect(onProcessEnd.mock.calls[0][0]).toBeInstanceOf(Error); + expect(onProcessEnd.mock.calls[0][0].type).toBe('WorkerError'); + expect(onProcessEnd.mock.calls[0][1]).toBe(null); }); it('provides stdout and stderr fields from the child process', () => { @@ -141,8 +143,8 @@ it('swtiches the processed flag of a task as soon as it is processed', () => { const request1 = [CHILD_MESSAGE_CALL, false, 'foo', []]; const request2 = [CHILD_MESSAGE_CALL, false, 'bar', []]; - worker.send(request1, () => {}); - worker.send(request2, () => {}); + worker.send(request1, () => {}, () => {}); + worker.send(request2, () => {}, () => {}); // The queue is empty when it got send, so the task is processed. expect(request1[1]).toBe(true); @@ -160,7 +162,7 @@ it('sends the task to the child process', () => { const request = [CHILD_MESSAGE_CALL, false, 'foo', []]; - worker.send(request, () => {}); + worker.send(request, () => {}, () => {}); // Skipping call "0" because it corresponds to the "initialize" one. expect(forkInterface.send.mock.calls[1][0]).toEqual(request); @@ -173,14 +175,16 @@ it('relates replies to requests, in order', () => { workerPath: '/tmp/foo', }); - const callback1 = jest.fn(); + const onProcessStart1 = jest.fn(); + const onProcessEnd1 = jest.fn(); const request1 = [CHILD_MESSAGE_CALL, false, 'foo', []]; - const callback2 = jest.fn(); + const onProcessStart2 = jest.fn(); + const onProcessEnd2 = jest.fn(); const request2 = [CHILD_MESSAGE_CALL, false, 'bar', []]; - worker.send(request1, callback1); - worker.send(request2, callback2); + worker.send(request1, onProcessStart1, onProcessEnd1); + worker.send(request2, onProcessStart2, onProcessEnd2); // 2nd call waits on the queue... expect(request2[1]).toBe(false); @@ -188,9 +192,9 @@ it('relates replies to requests, in order', () => { // then first call replies... forkInterface.emit('message', [PARENT_MESSAGE_OK, 44]); - expect(callback1.mock.calls[0][0]).toBeFalsy(); - expect(callback1.mock.calls[0][1]).toBe(44); - expect(callback1.mock.instances[0]).toBe(worker); + expect(onProcessStart1.mock.calls[0][0]).toBe(worker); + expect(onProcessEnd1.mock.calls[0][0]).toBeFalsy(); + expect(onProcessEnd1.mock.calls[0][1]).toBe(44); // which causes the second call to be processed... expect(request2[1]).toBe(true); @@ -204,8 +208,68 @@ it('relates replies to requests, in order', () => { {}, ]); - expect(callback2.mock.calls[0][0].message).toBe('foo'); - expect(callback2.mock.instances[0]).toBe(worker); + expect(onProcessStart2.mock.calls[0][0]).toBe(worker); + expect(onProcessEnd2.mock.calls[0][0].message).toBe('foo'); +}); + +it('calls the onProcessStart method synchronously if the queue is empty', () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + const onProcessStart = jest.fn(); + const onProcessEnd = jest.fn(); + + worker.send( + [CHILD_MESSAGE_CALL, false, 'foo', []], + onProcessStart, + onProcessEnd, + ); + + // Only onProcessStart has been called + expect(onProcessStart).toHaveBeenCalledTimes(1); + expect(onProcessEnd).not.toHaveBeenCalled(); + + // then first call replies... + forkInterface.emit('message', [PARENT_MESSAGE_OK]); + + expect(onProcessEnd).toHaveBeenCalledTimes(1); +}); + +it('calls the onProcessStart method only when the request is starting to be processed', () => { + const worker = new Worker({ + forkOptions: {}, + maxRetries: 3, + workerPath: '/tmp/foo', + }); + + const onProcessStart1 = jest.fn(); + const onProcessEnd1 = jest.fn(); + + const onProcessStart2 = jest.fn(); + const onProcessEnd2 = jest.fn(); + + worker.send( + [CHILD_MESSAGE_CALL, false, 'foo', []], + onProcessStart1, + onProcessEnd1, + ); + worker.send( + [CHILD_MESSAGE_CALL, false, 'bar', []], + onProcessStart2, + onProcessEnd2, + ); + + // Not called yet since the second request is on the queue. + expect(onProcessStart2).not.toHaveBeenCalled(); + + // then first call replies... + forkInterface.emit('message', [PARENT_MESSAGE_OK]); + + // Now it's been called. + expect(onProcessStart2).toHaveBeenCalledTimes(1); }); it('creates error instances for known errors', () => { @@ -220,7 +284,7 @@ it('creates error instances for known errors', () => { const callback3 = jest.fn(); // Testing a generic ECMAScript error. - worker.send([CHILD_MESSAGE_CALL, false, 'method', []], callback1); + worker.send([CHILD_MESSAGE_CALL, false, 'method', []], () => {}, callback1); forkInterface.emit('message', [ PARENT_MESSAGE_ERROR, @@ -236,7 +300,7 @@ it('creates error instances for known errors', () => { expect(callback1.mock.calls[0][0].stack).toBe('TypeError: bar'); // Testing a custom error. - worker.send([CHILD_MESSAGE_CALL, false, 'method', []], callback2); + worker.send([CHILD_MESSAGE_CALL, false, 'method', []], () => {}, callback2); forkInterface.emit('message', [ PARENT_MESSAGE_ERROR, @@ -253,7 +317,7 @@ it('creates error instances for known errors', () => { expect(callback2.mock.calls[0][0].qux).toBe('extra property'); // Testing a non-object throw. - worker.send([CHILD_MESSAGE_CALL, false, 'method', []], callback3); + worker.send([CHILD_MESSAGE_CALL, false, 'method', []], () => {}, callback3); forkInterface.emit('message', [ PARENT_MESSAGE_ERROR, @@ -273,7 +337,7 @@ it('throws when the child process returns a strange message', () => { workerPath: '/tmp/foo', }); - worker.send([CHILD_MESSAGE_CALL, false, 'method', []], () => {}); + worker.send([CHILD_MESSAGE_CALL, false, 'method', []], () => {}, () => {}); // Type 27 does not exist. expect(() => { diff --git a/packages/jest-worker/src/index.js b/packages/jest-worker/src/index.js index 2193a4773009..f661e65aa667 100644 --- a/packages/jest-worker/src/index.js +++ b/packages/jest-worker/src/index.js @@ -147,7 +147,7 @@ export default class { // We do not cache the request object here. If so, it would only be only // processed by one of the workers, and we want them all to close. for (let i = 0; i < workers.length; i++) { - workers[i].send([CHILD_MESSAGE_END, false], emptyMethod); + workers[i].send([CHILD_MESSAGE_END, false], emptyMethod, emptyMethod); } this._ending = true; @@ -176,27 +176,33 @@ export default class { // Do not use a fat arrow since we need the "this" value, which points to // the worker that executed the call. - function callback(error, result) { + const onProcessStart = worker => { if (hash != null) { - cacheKeys[hash] = this; + cacheKeys[hash] = worker; } + }; + const onProcessEnd = (error, result) => { if (error) { reject(error); } else { resolve(result); } - } + }; // If a worker is pre-selected, use it... if (worker) { - worker.send(request, callback); + worker.send(request, onProcessStart, onProcessEnd); return; } // ... otherwise use all workers, so the first one available will pick it. for (let i = 0; i < length; i++) { - workers[(i + this._offset) % length].send(request, callback); + workers[(i + this._offset) % length].send( + request, + onProcessStart, + onProcessEnd, + ); } this._offset++; diff --git a/packages/jest-worker/src/types.js b/packages/jest-worker/src/types.js index cd61fa2d3dc2..f2ca164cff54 100644 --- a/packages/jest-worker/src/types.js +++ b/packages/jest-worker/src/types.js @@ -24,6 +24,8 @@ export const PARENT_MESSAGE_ERROR: 1 = 1; // Option objects. +import type Worker from './worker'; + export type ForkOptions = { cwd?: string, env?: Object, @@ -94,10 +96,12 @@ export type ParentMessage = ParentMessageOk | ParentMessageError; // Queue types. -export type QueueCallback = (?Error, ?any) => void; +export type OnProcessStart = Worker => void; +export type OnProcessEnd = (?Error, ?any) => void; export type QueueChildMessage = {| request: ChildMessage, - callback: QueueCallback, + onProcessStart: OnProcessStart, + onProcessEnd: OnProcessEnd, next: ?QueueChildMessage, |}; diff --git a/packages/jest-worker/src/worker.js b/packages/jest-worker/src/worker.js index 80562376828b..5eee64af241e 100644 --- a/packages/jest-worker/src/worker.js +++ b/packages/jest-worker/src/worker.js @@ -22,8 +22,9 @@ import type {Readable} from 'stream'; import type { ChildMessage, - QueueCallback, QueueChildMessage, + OnProcessEnd, + OnProcessStart, WorkerOptions, } from './types'; @@ -68,8 +69,12 @@ export default class { return this._child.stderr; } - send(request: ChildMessage, callback: QueueCallback) { - const item = {callback, next: null, request}; + send( + request: ChildMessage, + onProcessStart: OnProcessStart, + onProcessEnd: OnProcessEnd, + ) { + const item = {next: null, onProcessEnd, onProcessStart, request}; if (this._last) { this._last.next = item; @@ -145,6 +150,9 @@ export default class { // have to process it as well. item.request[1] = true; + // Tell the parent that this item is starting to be processed. + item.onProcessStart(this); + this._retries = 0; this._busy = true; @@ -162,14 +170,14 @@ export default class { throw new TypeError('Unexpected response with an empty queue'); } - const callback = item.callback; + const onProcessEnd = item.onProcessEnd; this._busy = false; this._process(); switch (response[0]) { case PARENT_MESSAGE_OK: - callback.call(this, null, response[1]); + onProcessEnd(null, response[1]); break; case PARENT_MESSAGE_ERROR: @@ -191,7 +199,7 @@ export default class { } } - callback.call(this, error, null); + onProcessEnd(error, null); break; default: