From 37200eb7cb0abac55801d32ed439f2925df3899e Mon Sep 17 00:00:00 2001 From: Paul Hawxby Date: Fri, 5 Aug 2022 23:57:16 +0100 Subject: [PATCH] feat: add workerIdleMemoryLimit to support worker recycling in the event of node >16.11.0 memory leaks (#13056) --- .circleci/config.yml | 6 +- .gitignore | 3 + CHANGELOG.md | 1 + docs/Configuration.md | 39 +++ e2e/__tests__/workerForceExit.test.ts | 2 +- jest.config.mjs | 6 +- .../src/__tests__/normalize.test.ts | 12 + .../src/__tests__/stringToBytes.test.ts | 127 ++++++++ packages/jest-config/src/normalize.ts | 5 + packages/jest-config/src/stringToBytes.ts | 90 ++++++ packages/jest-runner/src/index.ts | 6 + packages/jest-types/src/Config.ts | 4 + packages/jest-worker/README.md | 15 + .../src/__tests__/process-integration.test.js | 40 ++- .../jest-worker/src/base/BaseWorkerPool.ts | 1 + packages/jest-worker/src/index.ts | 1 + packages/jest-worker/src/types.ts | 53 +++- .../src/workers/ChildProcessWorker.ts | 237 +++++++++++++-- .../src/workers/NodeThreadsWorker.ts | 181 +++++++++++- .../__tests__/ChildProcessWorker.test.js | 191 +++++++++++- .../workers/__tests__/WorkerEdgeCases.test.js | 279 ++++++++++++++++++ .../__tests__/__fixtures__/EdgeCasesWorker.js | 42 +++ .../workers/__tests__/processChild.test.js | 15 + .../jest-worker/src/workers/processChild.ts | 20 ++ .../jest-worker/src/workers/threadChild.ts | 20 ++ 25 files changed, 1358 insertions(+), 38 deletions(-) create mode 100644 packages/jest-config/src/__tests__/stringToBytes.test.ts create mode 100644 packages/jest-config/src/stringToBytes.ts create mode 100644 packages/jest-worker/src/workers/__tests__/WorkerEdgeCases.test.js create mode 100644 packages/jest-worker/src/workers/__tests__/__fixtures__/EdgeCasesWorker.js diff --git a/.circleci/config.yml b/.circleci/config.yml index 34b8da8592e5..61be28b5a409 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -42,7 +42,11 @@ jobs: node-version: lts/* - node/install-packages: *install - run: - command: JEST_JASMINE=1 yarn test-ci-partial --shard=$(expr $CIRCLE_NODE_INDEX + 1)/$CIRCLE_NODE_TOTAL && JEST_JASMINE=1 yarn test-leak + name: Test + command: JEST_JASMINE=1 yarn test-ci-partial --shard=$(expr $CIRCLE_NODE_INDEX + 1)/$CIRCLE_NODE_TOTAL + - run: + name: Leak test + command: JEST_JASMINE=1 yarn test-leak - store_test_results: path: reports/junit diff --git a/.gitignore b/.gitignore index 7dfb5101b363..1a8975f59cf9 100644 --- a/.gitignore +++ b/.gitignore @@ -53,3 +53,6 @@ api-extractor.json **/.pnp.* crowdin-cli.jar + +# We don't want these temp files +packages/jest-worker/src/workers/__tests__/__temp__ diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f83b1fe2ab3..4a6776b013d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - `[jest-config]` [**BREAKING**] Make `snapshotFormat` default to `escapeString: false` and `printBasicPrototype: false` ([#13036](https://github.com/facebook/jest/pull/13036)) - `[jest-environment-jsdom]` [**BREAKING**] Upgrade to `jsdom@20` ([#13037](https://github.com/facebook/jest/pull/13037), [#13058](https://github.com/facebook/jest/pull/13058)) - `[pretty-format]` [**BREAKING**] Remove `ConvertAnsi` plugin in favour of `jest-serializer-ansi-escapes` ([#13040](https://github.com/facebook/jest/pull/13040)) +- `[jest-worker]` Adds `workerIdleMemoryLimit` option which is used as a check for worker memory leaks >= Node 16.11.0 and recycles child workers as required. ([#13056](https://github.com/facebook/jest/pull/13056)) ### Fixes diff --git a/docs/Configuration.md b/docs/Configuration.md index 75028672b31c..b6d56b9ec376 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -2254,6 +2254,45 @@ Default: `true` Whether to use [`watchman`](https://facebook.github.io/watchman/) for file crawling. +### `workerIdleMemoryLimit` \[number|string] + +Default: `undefined` + +Specifies the memory limit for workers before they are recycled and is primarily a work-around for [this issue](https://github.com/facebook/jest/issues/11956); + +After the worker has executed a test the memory usage of it is checked. If it exceeds the value specified the worker is killed and restarted. The limit can be specified in a number of different ways and whatever the result is `Math.floor` is used to turn it into an integer value: + +- `<= 1` - The value is assumed to be a percentage of system memory. So 0.5 sets the memory limit of the worker to half of the total system memory +- `\> 1` - Assumed to be a fixed byte value. Because of the previous rule if you wanted a value of 1 byte (I don't know why) you could use `1.1`. +- With units + - `50%` - As above, a percentage of total system memory + - `100KB`, `65MB`, etc - With units to denote a fixed memory limit. + - `K` / `KB` - Kilobytes (x1000) + - `KiB` - Kibibytes (x1024) + - `M` / `MB` - Megabytes + - `MiB` - Mebibytes + - `G` / `GB` - Gigabytes + - `GiB` - Gibibytes + +```js tab +/** @type {import('jest').Config} */ +const config = { + workerIdleMemoryLimit: 0.2, +}; + +module.exports = config; +``` + +```ts tab +import type {Config} from 'jest'; + +const config: Config = { + workerIdleMemoryLimit: 0.2, +}; + +export default config; +``` + ### `//` \[string] This option allows comments in `package.json`. Include the comment text as the value of this key: diff --git a/e2e/__tests__/workerForceExit.test.ts b/e2e/__tests__/workerForceExit.test.ts index 6ce9032faf99..045c1ce702fc 100644 --- a/e2e/__tests__/workerForceExit.test.ts +++ b/e2e/__tests__/workerForceExit.test.ts @@ -74,4 +74,4 @@ test('force exits a worker that fails to exit gracefully', async () => { expect(pidNumber).not.toBeNaN(); expect(await findProcess('pid', pidNumber)).toHaveLength(0); -}); +}, 15000); diff --git a/jest.config.mjs b/jest.config.mjs index 39ff9114a445..192ccd07dcfa 100644 --- a/jest.config.mjs +++ b/jest.config.mjs @@ -66,6 +66,7 @@ export default { '/packages/jest-snapshot/src/__tests__/plugins', '/packages/jest-snapshot/src/__tests__/fixtures/', '/packages/jest-validate/src/__tests__/fixtures/', + '/packages/jest-worker/src/workers/__tests__/__fixtures__/', '/e2e/__tests__/iterator-to-null-test.ts', '/e2e/__tests__/tsIntegration.test.ts', // this test needs types to be build, it runs in a separate CI job through `jest.config.ts.mjs` ], @@ -73,7 +74,10 @@ export default { transform: { '\\.[jt]sx?$': require.resolve('babel-jest'), }, - watchPathIgnorePatterns: ['coverage'], + watchPathIgnorePatterns: [ + 'coverage', + '/packages/jest-worker/src/workers/__tests__/__temp__', + ], watchPlugins: [ require.resolve('jest-watch-typeahead/filename'), require.resolve('jest-watch-typeahead/testname'), diff --git a/packages/jest-config/src/__tests__/normalize.test.ts b/packages/jest-config/src/__tests__/normalize.test.ts index 4ec6c24d5963..39f4c0da60a1 100644 --- a/packages/jest-config/src/__tests__/normalize.test.ts +++ b/packages/jest-config/src/__tests__/normalize.test.ts @@ -2112,3 +2112,15 @@ describe('logs a deprecation warning', () => { expect(console.warn).toMatchSnapshot(); }); }); + +it('parses workerIdleMemoryLimit', async () => { + const {options} = await normalize( + { + rootDir: '/root/', + workerIdleMemoryLimit: '45MiB', + }, + {} as Config.Argv, + ); + + expect(options.workerIdleMemoryLimit).toEqual(47185920); +}); diff --git a/packages/jest-config/src/__tests__/stringToBytes.test.ts b/packages/jest-config/src/__tests__/stringToBytes.test.ts new file mode 100644 index 000000000000..7c5902ec1acb --- /dev/null +++ b/packages/jest-config/src/__tests__/stringToBytes.test.ts @@ -0,0 +1,127 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import stringToBytes from '../stringToBytes'; + +describe('numeric input', () => { + test('> 1 represents bytes', () => { + expect(stringToBytes(50.8)).toEqual(50); + }); + + test('1.1 should be a 1', () => { + expect(stringToBytes(1.1, 54)).toEqual(1); + }); + + test('< 1 represents a %', () => { + expect(stringToBytes(0.3, 51)).toEqual(15); + }); + + test('should throw when no reference supplied', () => { + expect(() => stringToBytes(0.3)).toThrowError(); + }); + + test('should throw on a bad input', () => { + expect(() => stringToBytes(-0.3, 51)).toThrowError(); + }); +}); + +describe('string input', () => { + describe('numeric passthrough', () => { + test('> 1 represents bytes', () => { + expect(stringToBytes('50.8')).toEqual(50); + }); + + test('< 1 represents a %', () => { + expect(stringToBytes('0.3', 51)).toEqual(15); + }); + + test('should throw when no reference supplied', () => { + expect(() => stringToBytes('0.3')).toThrowError(); + }); + + test('should throw on a bad input', () => { + expect(() => stringToBytes('-0.3', 51)).toThrowError(); + }); + }); + + describe('parsing', () => { + test('0% should throw an error', () => { + expect(() => stringToBytes('0%', 51)).toThrowError(); + }); + + test('30%', () => { + expect(stringToBytes('30%', 51)).toEqual(15); + }); + + test('80%', () => { + expect(stringToBytes('80%', 51)).toEqual(40); + }); + + test('100%', () => { + expect(stringToBytes('100%', 51)).toEqual(51); + }); + + // The units caps is intentionally janky to test for forgiving string parsing. + describe('k', () => { + test('30k', () => { + expect(stringToBytes('30K')).toEqual(30000); + }); + + test('30KB', () => { + expect(stringToBytes('30kB')).toEqual(30000); + }); + + test('30KiB', () => { + expect(stringToBytes('30kIb')).toEqual(30720); + }); + }); + + describe('m', () => { + test('30M', () => { + expect(stringToBytes('30M')).toEqual(30000000); + }); + + test('30MB', () => { + expect(stringToBytes('30MB')).toEqual(30000000); + }); + + test('30MiB', () => { + expect(stringToBytes('30MiB')).toEqual(31457280); + }); + }); + + describe('g', () => { + test('30G', () => { + expect(stringToBytes('30G')).toEqual(30000000000); + }); + + test('30GB', () => { + expect(stringToBytes('30gB')).toEqual(30000000000); + }); + + test('30GiB', () => { + expect(stringToBytes('30GIB')).toEqual(32212254720); + }); + }); + + test('unknown unit', () => { + expect(() => stringToBytes('50XX')).toThrowError(); + }); + }); +}); + +test('nesting', () => { + expect(stringToBytes(stringToBytes(stringToBytes('30%', 51)))).toEqual(15); +}); + +test('null', () => { + expect(stringToBytes(null)).toEqual(null); +}); + +test('undefined', () => { + expect(stringToBytes(undefined)).toEqual(undefined); +}); diff --git a/packages/jest-config/src/normalize.ts b/packages/jest-config/src/normalize.ts index 52e67fc288a2..753955c89be2 100644 --- a/packages/jest-config/src/normalize.ts +++ b/packages/jest-config/src/normalize.ts @@ -6,6 +6,7 @@ */ import {createHash} from 'crypto'; +import {totalmem} from 'os'; import * as path from 'path'; import chalk = require('chalk'); import merge = require('deepmerge'); @@ -36,6 +37,7 @@ import {DEFAULT_JS_PATTERN} from './constants'; import getMaxWorkers from './getMaxWorkers'; import {parseShardPair} from './parseShardPair'; import setFromArgv from './setFromArgv'; +import stringToBytes from './stringToBytes'; import { BULLET, DOCUMENTATION_NOTE, @@ -969,6 +971,9 @@ export default async function normalize( case 'watchman': value = oldOptions[key]; break; + case 'workerIdleMemoryLimit': + value = stringToBytes(oldOptions[key], totalmem()); + break; case 'watchPlugins': value = (oldOptions[key] || []).map(watchPlugin => { if (typeof watchPlugin === 'string') { diff --git a/packages/jest-config/src/stringToBytes.ts b/packages/jest-config/src/stringToBytes.ts new file mode 100644 index 000000000000..f9fa6d72b37f --- /dev/null +++ b/packages/jest-config/src/stringToBytes.ts @@ -0,0 +1,90 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +function stringToBytes( + input: undefined, + percentageReference?: number, +): undefined; +function stringToBytes(input: null, percentageReference?: number): null; +function stringToBytes( + input: string | number, + percentageReference?: number, +): number; + +/** + * Converts a string representing an amount of memory to bytes. + * + * @param input The value to convert to bytes. + * @param percentageReference The reference value to use when a '%' value is supplied. + */ +function stringToBytes( + input: string | number | null | undefined, + percentageReference?: number, +): number | null | undefined { + if (input === null || input === undefined) { + return input; + } + + if (typeof input === 'string') { + if (isNaN(Number.parseFloat(input.slice(-1)))) { + // eslint-disable-next-line prefer-const + let [, numericString, trailingChars] = + input.match(/(.*?)([^0-9.-]+)$/i) || []; + + if (trailingChars && numericString) { + const numericValue = Number.parseFloat(numericString); + trailingChars = trailingChars.toLowerCase(); + + switch (trailingChars) { + case '%': + input = numericValue / 100; + break; + case 'kb': + case 'k': + return numericValue * 1000; + case 'kib': + return numericValue * 1024; + case 'mb': + case 'm': + return numericValue * 1000 * 1000; + case 'mib': + return numericValue * 1024 * 1024; + case 'gb': + case 'g': + return numericValue * 1000 * 1000 * 1000; + case 'gib': + return numericValue * 1024 * 1024 * 1024; + } + } + + // It ends in some kind of char so we need to do some parsing + } else { + input = Number.parseFloat(input); + } + } + + if (typeof input === 'number') { + if (input <= 1 && input > 0) { + if (percentageReference) { + return Math.floor(input * percentageReference); + } else { + throw new Error( + 'For a percentage based memory limit a percentageReference must be supplied', + ); + } + } else if (input > 1) { + return Math.floor(input); + } else { + throw new Error('Unexpected numerical input'); + } + } + + throw new Error('Unexpected input'); +} + +// https://github.com/import-js/eslint-plugin-import/issues/1590 +export default stringToBytes; diff --git a/packages/jest-runner/src/index.ts b/packages/jest-runner/src/index.ts index d43f899f796d..c646c432232c 100644 --- a/packages/jest-runner/src/index.ts +++ b/packages/jest-runner/src/index.ts @@ -107,6 +107,12 @@ export default class TestRunner extends EmittingTestRunner { const worker = new Worker(require.resolve('./testWorker'), { exposedMethods: ['worker'], forkOptions: {serialization: 'json', stdio: 'pipe'}, + // The workerIdleMemoryLimit should've been converted to a number during + // the normalization phase. + idleMemoryLimit: + typeof this._globalConfig.workerIdleMemoryLimit === 'number' + ? this._globalConfig.workerIdleMemoryLimit + : undefined, maxRetries: 3, numWorkers: this._globalConfig.maxWorkers, setupArgs: [{serializableResolvers: Array.from(resolvers.values())}], diff --git a/packages/jest-types/src/Config.ts b/packages/jest-types/src/Config.ts index e7854413bfd6..87a4eb29bdc2 100644 --- a/packages/jest-types/src/Config.ts +++ b/packages/jest-types/src/Config.ts @@ -327,6 +327,7 @@ export type InitialOptions = Partial<{ watchAll: boolean; watchman: boolean; watchPlugins: Array]>; + workerIdleMemoryLimit: number; }>; export type SnapshotUpdateState = 'all' | 'new' | 'none'; @@ -420,6 +421,7 @@ export type GlobalConfig = { path: string; config: Record; }> | null; + workerIdleMemoryLimit?: number; }; export type ProjectConfig = { @@ -478,6 +480,7 @@ export type ProjectConfig = { transformIgnorePatterns: Array; watchPathIgnorePatterns: Array; unmockedModulePathPatterns?: Array; + workerIdleMemoryLimit?: number; }; export type Argv = Arguments< @@ -571,5 +574,6 @@ export type Argv = Arguments< watchAll: boolean; watchman: boolean; watchPathIgnorePatterns: Array; + workerIdleMemoryLimit: number; }> >; diff --git a/packages/jest-worker/README.md b/packages/jest-worker/README.md index c8a29401c19d..1277ed824bf2 100644 --- a/packages/jest-worker/README.md +++ b/packages/jest-worker/README.md @@ -75,6 +75,15 @@ List of method names that can be called on the child processes from the parent p Allow customizing all options passed to `child_process.fork`. By default, some values are set (`cwd`, `env`, `execArgv` and `serialization`), but you can override them and customize the rest. For a list of valid values, check [the Node documentation](https://nodejs.org/api/child_process.html#child_process_child_process_fork_modulepath_args_options). +#### `idleMemoryLimit: number` (optional) + +Specifies the memory limit for workers before they are recycled and is primarily a work-around for [this issue](https://github.com/facebook/jest/issues/11956); + +After the worker has executed a task the memory usage of it is checked. If it exceeds the value specified the worker is killed and restarted. If no limit is set this process does not occur. The limit can be specified in 2 ways: + +- `<= 1` - The value is assumed to be a percentage of system memory. So 0.5 sets the memory limit of the worker to half of the total system memory +- `\> 1` - Assumed to be a fixed byte value. Because of the previous rule if you wanted a value of 1 byte (I don't know why) you could use `1.1`. + #### `maxRetries: number` (optional) Maximum amount of times that a dead child can be re-spawned, per call. Defaults to `3`, pass `Infinity` to allow endless retries. @@ -87,6 +96,12 @@ Amount of workers to spawn. Defaults to the number of CPUs minus 1. The `resourceLimits` option which will be passed to `worker_threads` workers. +#### `silent: Boolean` (optional) + +Set to false for `stdout` and `stderr` to be logged to console. + +By default this is true. + #### `setupArgs: Array` (optional) The arguments that will be passed to the `setup` method during initialization. diff --git a/packages/jest-worker/src/__tests__/process-integration.test.js b/packages/jest-worker/src/__tests__/process-integration.test.js index d5e3f0950504..2674ab21c27f 100644 --- a/packages/jest-worker/src/__tests__/process-integration.test.js +++ b/packages/jest-worker/src/__tests__/process-integration.test.js @@ -6,7 +6,12 @@ */ import EventEmitter from 'events'; -import {CHILD_MESSAGE_CALL, PARENT_MESSAGE_OK} from '../types'; +import { + CHILD_MESSAGE_CALL, + CHILD_MESSAGE_MEM_USAGE, + PARENT_MESSAGE_OK, + WorkerFarmOptions, +} from '../types'; let Farm; let mockForkedProcesses; @@ -15,6 +20,7 @@ function mockBuildForkedProcess() { const mockChild = new EventEmitter(); mockChild.send = jest.fn(); + mockChild.connected = true; return mockChild; } @@ -175,4 +181,36 @@ describe('Jest Worker Integration', () => { expect(await promise1).toBe('worker-1'); expect(await promise2).toBe('worker-2'); }); + + it('should check for memory limits', async () => { + /** @type WorkerFarmOptions */ + const options = { + computeWorkerKey: () => '1234567890abcdef', + exposedMethods: ['foo', 'bar'], + idleMemoryLimit: 0.4, + numWorkers: 2, + }; + + const farm = new Farm('/tmp/baz.js', options); + + // Send a call to the farm + const promise0 = farm.foo('param-0'); + + // Send different responses for each call (from the same child). + replySuccess(0, 'worker-0'); + + // Check that all the calls have been received by the same child). + // We're not using the assertCallsToChild helper because we need to check + // for other send types. + expect(mockForkedProcesses[0].send).toHaveBeenCalledTimes(3); + expect(mockForkedProcesses[0].send.mock.calls[1][0]).toEqual([ + CHILD_MESSAGE_CALL, + true, + 'foo', + ['param-0'], + ]); + expect(mockForkedProcesses[0].send.mock.calls[2][0]).toEqual([ + CHILD_MESSAGE_MEM_USAGE, + ]); + }); }); diff --git a/packages/jest-worker/src/base/BaseWorkerPool.ts b/packages/jest-worker/src/base/BaseWorkerPool.ts index da6e57d1b707..801e30eba957 100644 --- a/packages/jest-worker/src/base/BaseWorkerPool.ts +++ b/packages/jest-worker/src/base/BaseWorkerPool.ts @@ -40,6 +40,7 @@ export default class BaseWorkerPool { for (let i = 0; i < options.numWorkers; i++) { const workerOptions: WorkerOptions = { forkOptions, + idleMemoryLimit: this._options.idleMemoryLimit, maxRetries, resourceLimits, setupArgs, diff --git a/packages/jest-worker/src/index.ts b/packages/jest-worker/src/index.ts index c540c6a5fabe..e8a531f6458b 100644 --- a/packages/jest-worker/src/index.ts +++ b/packages/jest-worker/src/index.ts @@ -96,6 +96,7 @@ export class Worker { const workerPoolOptions: WorkerPoolOptions = { enableWorkerThreads: this._options.enableWorkerThreads ?? false, forkOptions: this._options.forkOptions ?? {}, + idleMemoryLimit: this._options.idleMemoryLimit, maxRetries: this._options.maxRetries ?? 3, numWorkers: this._options.numWorkers ?? Math.max(cpus().length - 1, 1), resourceLimits: this._options.resourceLimits ?? {}, diff --git a/packages/jest-worker/src/types.ts b/packages/jest-worker/src/types.ts index 7bf383a4d7cc..302771e10b0a 100644 --- a/packages/jest-worker/src/types.ts +++ b/packages/jest-worker/src/types.ts @@ -36,11 +36,13 @@ export type WorkerModule = { export const CHILD_MESSAGE_INITIALIZE = 0; export const CHILD_MESSAGE_CALL = 1; export const CHILD_MESSAGE_END = 2; +export const CHILD_MESSAGE_MEM_USAGE = 3; export const PARENT_MESSAGE_OK = 0; export const PARENT_MESSAGE_CLIENT_ERROR = 1; export const PARENT_MESSAGE_SETUP_ERROR = 2; export const PARENT_MESSAGE_CUSTOM = 3; +export const PARENT_MESSAGE_MEM_USAGE = 4; export type PARENT_MESSAGE_ERROR = | typeof PARENT_MESSAGE_CLIENT_ERROR @@ -76,6 +78,11 @@ export interface WorkerInterface { getWorkerId(): number; getStderr(): NodeJS.ReadableStream | null; getStdout(): NodeJS.ReadableStream | null; + /** + * Some system level identifier for the worker. IE, process id, thread id, etc. + */ + getWorkerSystemId(): number; + getMemoryUsage(): Promise; } export type PoolExitResult = { @@ -122,6 +129,7 @@ export type WorkerFarmOptions = { options?: WorkerPoolOptions, ) => WorkerPoolInterface; workerSchedulingPolicy?: WorkerSchedulingPolicy; + idleMemoryLimit?: number; }; export type WorkerPoolOptions = { @@ -131,6 +139,7 @@ export type WorkerPoolOptions = { maxRetries: number; numWorkers: number; enableWorkerThreads: boolean; + idleMemoryLimit?: number; }; export type WorkerOptions = { @@ -141,6 +150,26 @@ export type WorkerOptions = { workerId: number; workerData?: unknown; workerPath: string; + /** + * After a job has executed the memory usage it should return to. + * + * @remarks + * Note this is different from ResourceLimits in that it checks at idle, after + * a job is complete. So you could have a resource limit of 500MB but an idle + * limit of 50MB. The latter will only trigger if after a job has completed the + * memory usage hasn't returned back down under 50MB. + */ + idleMemoryLimit?: number; + /** + * This mainly exists so the path can be changed during testing. + * https://github.com/facebook/jest/issues/9543 + */ + childWorkerPath?: string; + /** + * This is useful for debugging individual tests allowing you to see + * the raw output of the worker. + */ + silent?: boolean; }; // Messages passed from the parent to the children. @@ -174,10 +203,15 @@ export type ChildMessageEnd = [ boolean, // processed ]; +export type ChildMessageMemUsage = [ + typeof CHILD_MESSAGE_MEM_USAGE, // type +]; + export type ChildMessage = | ChildMessageInitialize | ChildMessageCall - | ChildMessageEnd; + | ChildMessageEnd + | ChildMessageMemUsage; // Messages passed from the children to the parent. @@ -191,6 +225,11 @@ export type ParentMessageOk = [ unknown, // result ]; +export type ParentMessageMemUsage = [ + typeof PARENT_MESSAGE_MEM_USAGE, // type + number, // used memory in bytes +]; + export type ParentMessageError = [ PARENT_MESSAGE_ERROR, // type string, // constructor @@ -202,7 +241,8 @@ export type ParentMessageError = [ export type ParentMessage = | ParentMessageOk | ParentMessageError - | ParentMessageCustom; + | ParentMessageCustom + | ParentMessageMemUsage; // Queue types. @@ -216,3 +256,12 @@ export type QueueChildMessage = { onEnd: OnEnd; onCustomMessage: OnCustomMessage; }; + +export enum WorkerStates { + STARTING = 'starting', + OK = 'ok', + OUT_OF_MEMORY = 'oom', + RESTARTING = 'restarting', + SHUTTING_DOWN = 'shutting-down', + SHUT_DOWN = 'shut-down', +} diff --git a/packages/jest-worker/src/workers/ChildProcessWorker.ts b/packages/jest-worker/src/workers/ChildProcessWorker.ts index 9381310a30dc..4ef4d51b87cc 100644 --- a/packages/jest-worker/src/workers/ChildProcessWorker.ts +++ b/packages/jest-worker/src/workers/ChildProcessWorker.ts @@ -5,23 +5,27 @@ * LICENSE file in the root directory of this source tree. */ -import {ChildProcess, fork} from 'child_process'; +import {ChildProcess, ForkOptions, fork} from 'child_process'; +import {totalmem} from 'os'; import {PassThrough} from 'stream'; import mergeStream = require('merge-stream'); import {stdout as stdoutSupportsColor} from 'supports-color'; import { CHILD_MESSAGE_INITIALIZE, + CHILD_MESSAGE_MEM_USAGE, ChildMessage, OnCustomMessage, OnEnd, OnStart, PARENT_MESSAGE_CLIENT_ERROR, PARENT_MESSAGE_CUSTOM, + PARENT_MESSAGE_MEM_USAGE, PARENT_MESSAGE_OK, PARENT_MESSAGE_SETUP_ERROR, ParentMessage, WorkerInterface, WorkerOptions, + WorkerStates, } from '../types'; const SIGNAL_BASE_EXIT_CODE = 128; @@ -65,6 +69,16 @@ export default class ChildProcessWorker implements WorkerInterface { private _exitPromise: Promise; private _resolveExitPromise!: () => void; + private _memoryUsagePromise: Promise | undefined; + private _resolveMemoryUsage: ((arg0: number) => void) | undefined; + + private _childIdleMemoryUsage: number | null; + private _childIdleMemoryUsageLimit: number | null; + private _memoryUsageCheck = false; + private _state: WorkerStates; + + private _childWorkerPath: string; + constructor(options: WorkerOptions) { this._options = options; @@ -73,17 +87,37 @@ export default class ChildProcessWorker implements WorkerInterface { this._fakeStream = null; this._stdout = null; this._stderr = null; + this._childIdleMemoryUsage = null; + this._childIdleMemoryUsageLimit = options.idleMemoryLimit || null; this._exitPromise = new Promise(resolve => { this._resolveExitPromise = resolve; }); + this._childWorkerPath = + options.childWorkerPath || require.resolve('./processChild'); + + this._state = WorkerStates.STARTING; this.initialize(); } initialize(): void { + if ( + this._state === WorkerStates.OUT_OF_MEMORY || + this._state === WorkerStates.SHUTTING_DOWN || + this._state === WorkerStates.SHUT_DOWN + ) { + return; + } + + if (this._child && this._child.connected) { + this._child.kill('SIGKILL'); + } + + this._state = WorkerStates.STARTING; + const forceColor = stdoutSupportsColor ? {FORCE_COLOR: '1'} : {}; - const child = fork(require.resolve('./processChild'), [], { + const options: ForkOptions = { cwd: process.cwd(), env: { ...process.env, @@ -94,9 +128,11 @@ export default class ChildProcessWorker implements WorkerInterface { execArgv: process.execArgv.filter(v => !/^--(debug|inspect)/.test(v)), // default to advanced serialization in order to match worker threads serialization: 'advanced', - silent: true, + silent: this._options.silent ?? true, ...this._options.forkOptions, - }); + }; + + const child = fork(this._childWorkerPath, [], options); if (child.stdout) { if (!this._stdout) { @@ -118,6 +154,7 @@ export default class ChildProcessWorker implements WorkerInterface { this._stderr.add(child.stderr); } + this._detectOutOfMemoryCrash(child); child.on('message', this._onMessage.bind(this)); child.on('exit', this._onExit.bind(this)); @@ -129,6 +166,7 @@ export default class ChildProcessWorker implements WorkerInterface { ]); this._child = child; + this._state = WorkerStates.OK; this._retries++; @@ -147,10 +185,41 @@ export default class ChildProcessWorker implements WorkerInterface { error.stack!, {type: 'WorkerError'}, ]); + + // Clear the request so we don't keep executing it. + this._request = null; } } + private _detectOutOfMemoryCrash(child: ChildProcess): void { + let stderrStr = ''; + + const handler = (chunk: any) => { + if (this._state !== WorkerStates.OUT_OF_MEMORY) { + let str: string | undefined = undefined; + + if (chunk instanceof Buffer) { + str = chunk.toString('utf8'); + } else if (typeof chunk === 'string') { + str = chunk; + } + + if (str) { + stderrStr += str; + } + + if (stderrStr.includes('heap out of memory')) { + this._state = WorkerStates.OUT_OF_MEMORY; + } + } + }; + + child.stderr?.on('data', handler); + } + private _shutdown() { + this._state = WorkerStates.SHUTTING_DOWN; + // End the temporary streams so the merged streams end too if (this._fakeStream) { this._fakeStream.end(); @@ -198,20 +267,72 @@ export default class ChildProcessWorker implements WorkerInterface { this._onProcessEnd(error, null); break; + case PARENT_MESSAGE_CUSTOM: this._onCustomMessage(response[1]); break; + + case PARENT_MESSAGE_MEM_USAGE: + this._childIdleMemoryUsage = response[1]; + + if (this._resolveMemoryUsage) { + this._resolveMemoryUsage(response[1]); + + this._resolveMemoryUsage = undefined; + this._memoryUsagePromise = undefined; + } + + this._performRestartIfRequired(); + break; + default: throw new TypeError(`Unexpected response from worker: ${response[0]}`); } } - private _onExit(exitCode: number | null, signal: NodeJS.Signals | null) { - if ( - exitCode !== 0 && - exitCode !== null && - exitCode !== SIGTERM_EXIT_CODE && - exitCode !== SIGKILL_EXIT_CODE + private _performRestartIfRequired(): void { + if (this._memoryUsageCheck) { + this._memoryUsageCheck = false; + + let limit = this._childIdleMemoryUsageLimit; + + // TODO: At some point it would make sense to make use of + // stringToBytes found in jest-config, however as this + // package does not have any dependencies on an other jest + // packages that can wait until some other time. + if (limit && limit > 0 && limit <= 1) { + limit = Math.floor(totalmem() * limit); + } else if (limit) { + limit = Math.floor(limit); + } + + if ( + limit && + this._childIdleMemoryUsage && + this._childIdleMemoryUsage > limit + ) { + this._state = WorkerStates.RESTARTING; + + this.killChild(); + } + } + } + + private _onExit(exitCode: number | null) { + if (exitCode !== 0 && this._state === WorkerStates.OUT_OF_MEMORY) { + this._onProcessEnd( + new Error('Jest worker ran out of memory and crashed'), + null, + ); + + this._shutdown(); + } else if ( + (exitCode !== 0 && + exitCode !== null && + exitCode !== SIGTERM_EXIT_CODE && + exitCode !== SIGKILL_EXIT_CODE && + this._state !== WorkerStates.SHUTTING_DOWN) || + this._state === WorkerStates.RESTARTING ) { this.initialize(); @@ -219,17 +340,6 @@ export default class ChildProcessWorker implements WorkerInterface { this._child.send(this._request); } } else { - if (signal === 'SIGABRT') { - // When a child process worker crashes due to lack of memory this prevents - // jest from spinning and failing to exit. It could be argued it should restart - // the process, but if you're running out of memory then restarting processes - // is only going to make matters worse. - this._onProcessEnd( - new Error(`Process exited unexpectedly: ${signal}`), - null, - ); - } - this._shutdown(); } } @@ -241,10 +351,16 @@ export default class ChildProcessWorker implements WorkerInterface { onCustomMessage: OnCustomMessage, ): void { onProcessStart(this); + this._onProcessEnd = (...args) => { // Clean the request to avoid sending past requests to workers that fail // while waiting for a new request (timers, unhandled rejections...) this._request = null; + + if (this._childIdleMemoryUsageLimit && this._child.connected) { + this.checkMemoryUsage(); + } + return onProcessEnd(...args); }; @@ -260,12 +376,15 @@ export default class ChildProcessWorker implements WorkerInterface { return this._exitPromise; } - forceExit(): void { + killChild(): NodeJS.Timeout { this._child.kill('SIGTERM'); - const sigkillTimeout = setTimeout( - () => this._child.kill('SIGKILL'), - SIGKILL_DELAY, - ); + return setTimeout(() => this._child.kill('SIGKILL'), SIGKILL_DELAY); + } + + forceExit(): void { + this._state = WorkerStates.SHUTTING_DOWN; + + const sigkillTimeout = this.killChild(); this._exitPromise.then(() => clearTimeout(sigkillTimeout)); } @@ -273,6 +392,15 @@ export default class ChildProcessWorker implements WorkerInterface { return this._options.workerId; } + /** + * Gets the process id of the worker. + * + * @returns Process id. + */ + getWorkerSystemId(): number { + return this._child.pid; + } + getStdout(): NodeJS.ReadableStream | null { return this._stdout; } @@ -281,6 +409,63 @@ export default class ChildProcessWorker implements WorkerInterface { return this._stderr; } + /** + * Gets the last reported memory usage. + * + * @returns Memory usage in bytes. + */ + getMemoryUsage(): Promise { + if (!this._memoryUsagePromise) { + let rejectCallback!: (err: Error) => void; + + const promise = new Promise((resolve, reject) => { + this._resolveMemoryUsage = resolve; + rejectCallback = reject; + }); + this._memoryUsagePromise = promise; + + if (!this._child.connected && rejectCallback) { + rejectCallback(new Error('Child process is not running.')); + + this._memoryUsagePromise = undefined; + this._resolveMemoryUsage = undefined; + + return promise; + } + + this._child.send([CHILD_MESSAGE_MEM_USAGE], err => { + if (err && rejectCallback) { + this._memoryUsagePromise = undefined; + this._resolveMemoryUsage = undefined; + + rejectCallback(err); + } + }); + + return promise; + } + + return this._memoryUsagePromise; + } + + /** + * Gets updated memory usage and restarts if required + */ + checkMemoryUsage(): void { + if (this._childIdleMemoryUsageLimit) { + this._memoryUsageCheck = true; + this._child.send([CHILD_MESSAGE_MEM_USAGE], err => { + if (err) { + console.error('Unable to check memory usage', err); + } + }); + } else { + console.warn( + 'Memory usage of workers can only be checked if a limit is set', + ); + } + } + private _getFakeStream() { if (!this._fakeStream) { this._fakeStream = new PassThrough(); diff --git a/packages/jest-worker/src/workers/NodeThreadsWorker.ts b/packages/jest-worker/src/workers/NodeThreadsWorker.ts index 9cf59441f810..ad1bb822cd56 100644 --- a/packages/jest-worker/src/workers/NodeThreadsWorker.ts +++ b/packages/jest-worker/src/workers/NodeThreadsWorker.ts @@ -5,22 +5,26 @@ * LICENSE file in the root directory of this source tree. */ +import {totalmem} from 'os'; import {PassThrough} from 'stream'; import {Worker} from 'worker_threads'; import mergeStream = require('merge-stream'); import { CHILD_MESSAGE_INITIALIZE, + CHILD_MESSAGE_MEM_USAGE, ChildMessage, OnCustomMessage, OnEnd, OnStart, PARENT_MESSAGE_CLIENT_ERROR, PARENT_MESSAGE_CUSTOM, + PARENT_MESSAGE_MEM_USAGE, PARENT_MESSAGE_OK, PARENT_MESSAGE_SETUP_ERROR, ParentMessage, WorkerInterface, WorkerOptions, + WorkerStates, } from '../types'; export default class ExperimentalWorker implements WorkerInterface { @@ -38,7 +42,16 @@ export default class ExperimentalWorker implements WorkerInterface { private _exitPromise: Promise; private _resolveExitPromise!: () => void; - private _forceExited: boolean; + + private _memoryUsagePromise: Promise | undefined; + private _resolveMemoryUsage: ((arg0: number) => void) | undefined; + + private _childWorkerPath: string; + + private _childIdleMemoryUsage: number | null; + private _childIdleMemoryUsageLimit: number | null; + private _memoryUsageCheck = false; + private _state: WorkerStates; constructor(options: WorkerOptions) { this._options = options; @@ -49,16 +62,36 @@ export default class ExperimentalWorker implements WorkerInterface { this._stdout = null; this._stderr = null; + this._childWorkerPath = + options.childWorkerPath || require.resolve('./threadChild'); + + this._childIdleMemoryUsage = null; + this._childIdleMemoryUsageLimit = options.idleMemoryLimit || null; + this._exitPromise = new Promise(resolve => { this._resolveExitPromise = resolve; }); - this._forceExited = false; + this._state = WorkerStates.STARTING; this.initialize(); } initialize(): void { - this._worker = new Worker(require.resolve('./threadChild'), { + if ( + this._state === WorkerStates.OUT_OF_MEMORY || + this._state === WorkerStates.SHUTTING_DOWN || + this._state === WorkerStates.SHUT_DOWN + ) { + return; + } + + if (this._worker) { + this._worker.terminate(); + } + + this._state = WorkerStates.STARTING; + + this._worker = new Worker(this._childWorkerPath, { eval: false, resourceLimits: this._options.resourceLimits, stderr: true, @@ -87,8 +120,18 @@ export default class ExperimentalWorker implements WorkerInterface { this._stderr.add(this._worker.stderr); } + // This can be useful for debugging. + if (!(this._options.silent ?? true)) { + this._worker.stdout.setEncoding('utf8'); + // eslint-disable-next-line no-console + this._worker.stdout.on('data', console.log); + this._worker.stderr.setEncoding('utf8'); + this._worker.stderr.on('data', console.error); + } + this._worker.on('message', this._onMessage.bind(this)); this._worker.on('exit', this._onExit.bind(this)); + this._worker.on('error', this._onError.bind(this)); this._worker.postMessage([ CHILD_MESSAGE_INITIALIZE, @@ -126,6 +169,12 @@ export default class ExperimentalWorker implements WorkerInterface { this._resolveExitPromise(); } + private _onError(error: Error) { + if (error.message.includes('heap out of memory')) { + this._state = WorkerStates.OUT_OF_MEMORY; + } + } + private _onMessage(response: ParentMessage) { let error; @@ -155,6 +204,7 @@ export default class ExperimentalWorker implements WorkerInterface { this._onProcessEnd(error, null); break; + case PARENT_MESSAGE_SETUP_ERROR: error = new Error(`Error when calling setup: ${response[2]}`); @@ -164,16 +214,43 @@ export default class ExperimentalWorker implements WorkerInterface { this._onProcessEnd(error, null); break; + case PARENT_MESSAGE_CUSTOM: this._onCustomMessage(response[1]); break; + + case PARENT_MESSAGE_MEM_USAGE: + this._childIdleMemoryUsage = response[1]; + + if (this._resolveMemoryUsage) { + this._resolveMemoryUsage(response[1]); + + this._resolveMemoryUsage = undefined; + this._memoryUsagePromise = undefined; + } + + this._performRestartIfRequired(); + break; + default: throw new TypeError(`Unexpected response from worker: ${response[0]}`); } } private _onExit(exitCode: number) { - if (exitCode !== 0 && !this._forceExited) { + if (exitCode !== 0 && this._state === WorkerStates.OUT_OF_MEMORY) { + this._onProcessEnd( + new Error('Jest worker ran out of memory and crashed'), + null, + ); + + this._shutdown(); + } else if ( + (exitCode !== 0 && + this._state !== WorkerStates.SHUTTING_DOWN && + this._state !== WorkerStates.SHUT_DOWN) || + this._state === WorkerStates.RESTARTING + ) { this.initialize(); if (this._request) { @@ -189,7 +266,7 @@ export default class ExperimentalWorker implements WorkerInterface { } forceExit(): void { - this._forceExited = true; + this._state = WorkerStates.SHUTTING_DOWN; this._worker.terminate(); } @@ -205,6 +282,10 @@ export default class ExperimentalWorker implements WorkerInterface { // while waiting for a new request (timers, unhandled rejections...) this._request = null; + if (this._childIdleMemoryUsageLimit) { + this.checkMemoryUsage(); + } + const res = onProcessEnd?.(...args); // Clean up the reference so related closures can be garbage collected. @@ -233,6 +314,96 @@ export default class ExperimentalWorker implements WorkerInterface { return this._stderr; } + private _performRestartIfRequired(): void { + if (this._memoryUsageCheck) { + this._memoryUsageCheck = false; + + let limit = this._childIdleMemoryUsageLimit; + + // TODO: At some point it would make sense to make use of + // stringToBytes found in jest-config, however as this + // package does not have any dependencies on an other jest + // packages that can wait until some other time. + if (limit && limit > 0 && limit <= 1) { + limit = Math.floor(totalmem() * limit); + } else if (limit) { + limit = Math.floor(limit); + } + + if ( + limit && + this._childIdleMemoryUsage && + this._childIdleMemoryUsage > limit + ) { + this._state = WorkerStates.RESTARTING; + + this._worker.terminate(); + } + } + } + + /** + * Gets the last reported memory usage. + * + * @returns Memory usage in bytes. + */ + getMemoryUsage(): Promise { + if (!this._memoryUsagePromise) { + let rejectCallback!: (err: Error) => void; + + const promise = new Promise((resolve, reject) => { + this._resolveMemoryUsage = resolve; + rejectCallback = reject; + }); + this._memoryUsagePromise = promise; + + if (!this._worker.threadId) { + rejectCallback(new Error('Child process is not running.')); + + this._memoryUsagePromise = undefined; + this._resolveMemoryUsage = undefined; + + return promise; + } + + try { + this._worker.postMessage([CHILD_MESSAGE_MEM_USAGE]); + } catch (err: any) { + this._memoryUsagePromise = undefined; + this._resolveMemoryUsage = undefined; + + rejectCallback(err); + } + + return promise; + } + + return this._memoryUsagePromise; + } + + /** + * Gets updated memory usage and restarts if required + */ + checkMemoryUsage(): void { + if (this._childIdleMemoryUsageLimit) { + this._memoryUsageCheck = true; + this._worker.postMessage([CHILD_MESSAGE_MEM_USAGE]); + } else { + console.warn( + 'Memory usage of workers can only be checked if a limit is set', + ); + } + } + + /** + * Gets the thread id of the worker. + * + * @returns Thread id. + */ + getWorkerSystemId(): number { + return this._worker.threadId; + } + private _getFakeStream() { if (!this._fakeStream) { this._fakeStream = new PassThrough(); diff --git a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js index 6bcfcc0ff654..0948ea82be57 100644 --- a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js @@ -12,9 +12,12 @@ import supportsColor from 'supports-color'; import { CHILD_MESSAGE_CALL, CHILD_MESSAGE_INITIALIZE, + CHILD_MESSAGE_MEM_USAGE, PARENT_MESSAGE_CLIENT_ERROR, PARENT_MESSAGE_CUSTOM, + PARENT_MESSAGE_MEM_USAGE, PARENT_MESSAGE_OK, + WorkerOptions, } from '../../types'; jest.useFakeTimers(); @@ -23,14 +26,22 @@ let Worker; let forkInterface; let childProcess; let originalExecArgv; +let totalmem; + +beforeAll(() => { + const os = require('os'); + totalmem = jest.spyOn(os, 'totalmem'); +}); beforeEach(() => { jest.mock('child_process'); + originalExecArgv = process.execArgv; childProcess = require('child_process'); childProcess.fork.mockImplementation(() => { forkInterface = Object.assign(new EventEmitter(), { + connected: true, kill: jest.fn(), send: jest.fn(), stderr: new PassThrough(), @@ -40,6 +51,8 @@ beforeEach(() => { return forkInterface; }); + totalmem.mockReset(); + Worker = require('../ChildProcessWorker').default; }); @@ -398,13 +411,33 @@ it('when out of memory occurs the worker is killed and exits', async () => { expect(onProcessEnd).not.toHaveBeenCalled(); expect(onCustomMessage).not.toHaveBeenCalled(); + // Splitting the emit into 2 to check concat is happening. + forkInterface.stderr.emit( + 'data', + `<--- Last few GCs ---> + + [20048:0x7fa356200000] 349 ms: Mark-sweep (reduce) 49.2 (80.6) -> 49.0 (51.6) MB, 6.8 / 0.0 ms (+ 59.5 ms in 35 steps since start of marking, biggest step 2.3 ms, walltime since start of marking 68 ms) (average mu = 0.679, current mu = 0.679) finali[20048:0x7fa356200000] 418 ms: Mark-sweep 50.0 (51.6) -> 49.9 (55.6) MB, 67.8 / 0.0 ms (average mu = 0.512, current mu = 0.004) allocation failure scavenge might not succeed + + + <--- JS stacktrace ---> + + FATAL ERROR: Reached heap limit Allocation failed - JavaScript he`, + ); + + forkInterface.stderr.emit( + 'data', + `ap out of memory + 1: 0x10da153a5 node::Abort() (.cold.1) [/Users/paul/.nvm/versions/node/v16.10.0/bin/node] + 2: 0x10c6f09b9 node::Abort() [/Users/paul/.nvm/versions/node/v16.10.0/bin/node]`, + ); + forkInterface.emit('exit', null, 'SIGABRT'); // We don't want it to try and restart. expect(childProcess.fork).toHaveBeenCalledTimes(1); expect(onProcessEnd).toHaveBeenCalledTimes(1); expect(onProcessEnd).toHaveBeenCalledWith( - new Error('Process exited unexpectedly: SIGABRT'), + new Error('Jest worker ran out of memory and crashed'), null, ); @@ -449,3 +482,159 @@ it('does not send SIGKILL if SIGTERM exited the process', async () => { jest.runAllTimers(); expect(forkInterface.kill.mock.calls).toEqual([['SIGTERM']]); }); + +it('should check for memory limits and not restart if under percentage limit', async () => { + const memoryConfig = { + limit: 0.2, + processHeap: 2500, + totalMem: 16000, + }; + + /** @type WorkerOptions */ + const options = { + forkOptions: {}, + idleMemoryLimit: memoryConfig.limit, + maxRetries: 3, + workerPath: '/tmp/foo', + }; + const worker = new Worker(options); + + const onProcessStart = jest.fn(); + const onProcessEnd = jest.fn(); + const onCustomMessage = jest.fn(); + + worker.send( + [CHILD_MESSAGE_CALL, false, 'foo', []], + onProcessStart, + onProcessEnd, + onCustomMessage, + ); + + // Only onProcessStart has been called + expect(onProcessStart).toHaveBeenCalledTimes(1); + expect(onProcessEnd).not.toHaveBeenCalled(); + expect(onCustomMessage).not.toHaveBeenCalled(); + + // then first call replies... + forkInterface.emit('message', [PARENT_MESSAGE_OK]); + + expect(onProcessEnd).toHaveBeenCalledTimes(1); + + // This is the initalization call. + expect(forkInterface.send.mock.calls[0][0]).toEqual([ + CHILD_MESSAGE_INITIALIZE, + false, + '/tmp/foo', + undefined, + ]); + + // This is the child message + expect(forkInterface.send.mock.calls[1][0]).toEqual([ + CHILD_MESSAGE_CALL, + false, + 'foo', + [], + ]); + + // This is the subsequent call to get memory usage + expect(forkInterface.send.mock.calls[2][0]).toEqual([ + CHILD_MESSAGE_MEM_USAGE, + ]); + + totalmem.mockReturnValue(memoryConfig.totalMem); + + forkInterface.emit('message', [ + PARENT_MESSAGE_MEM_USAGE, + memoryConfig.processHeap, + ]); + + expect(totalmem).toHaveBeenCalledTimes(1); + expect(forkInterface.kill).not.toHaveBeenCalled(); +}); + +it('should check for memory limits and not restart if under absolute limit', async () => { + const memoryConfig = { + limit: 2600, + processHeap: 2500, + totalMem: 16000, + }; + + /** @type WorkerOptions */ + const options = { + forkOptions: {}, + idleMemoryLimit: memoryConfig.limit, + maxRetries: 3, + workerPath: '/tmp/foo', + }; + const worker = new Worker(options); + + worker.checkMemoryUsage(); + + totalmem.mockReturnValue(memoryConfig.totalMem); + + forkInterface.emit('message', [ + PARENT_MESSAGE_MEM_USAGE, + memoryConfig.processHeap, + ]); + + expect(totalmem).not.toHaveBeenCalled(); + expect(forkInterface.kill).not.toHaveBeenCalled(); +}); + +it('should check for memory limits and restart if above percentage limit', async () => { + const memoryConfig = { + limit: 0.01, + processHeap: 2500, + totalMem: 16000, + }; + + /** @type WorkerOptions */ + const options = { + forkOptions: {}, + idleMemoryLimit: memoryConfig.limit, + maxRetries: 3, + workerPath: '/tmp/foo', + }; + const worker = new Worker(options); + + worker.checkMemoryUsage(); + + totalmem.mockReturnValue(memoryConfig.totalMem); + + forkInterface.emit('message', [ + PARENT_MESSAGE_MEM_USAGE, + memoryConfig.processHeap, + ]); + + expect(totalmem).toHaveBeenCalledTimes(1); + expect(forkInterface.kill).toHaveBeenCalledTimes(1); +}); + +it('should check for memory limits and restart if above absolute limit', async () => { + const memoryConfig = { + limit: 2000, + processHeap: 2500, + totalMem: 16000, + }; + + /** @type WorkerOptions */ + const options = { + forkOptions: {}, + idleMemoryLimit: memoryConfig.limit, + maxRetries: 3, + workerPath: '/tmp/foo', + }; + const worker = new Worker(options); + + worker.checkMemoryUsage(); + + totalmem.mockReturnValue(memoryConfig.totalMem); + + forkInterface.emit('message', [ + PARENT_MESSAGE_MEM_USAGE, + memoryConfig.processHeap, + ]); + + expect(totalmem).not.toHaveBeenCalled(); + expect(forkInterface.kill).toHaveBeenCalledTimes(1); +}); diff --git a/packages/jest-worker/src/workers/__tests__/WorkerEdgeCases.test.js b/packages/jest-worker/src/workers/__tests__/WorkerEdgeCases.test.js new file mode 100644 index 000000000000..8005048f48f4 --- /dev/null +++ b/packages/jest-worker/src/workers/__tests__/WorkerEdgeCases.test.js @@ -0,0 +1,279 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import {access, mkdir, rm, writeFile} from 'fs/promises'; +import {dirname, join} from 'path'; +import {transformFileAsync} from '@babel/core'; +import { + CHILD_MESSAGE_CALL, + CHILD_MESSAGE_MEM_USAGE, + WorkerInterface, + WorkerOptions, +} from '../../types'; +import ChildProcessWorker from '../ChildProcessWorker'; +import ThreadsWorker from '../NodeThreadsWorker'; + +// These tests appear to be slow/flaky. Allowing it to retry quite a few times +// will cut down on this noise and they're fast tests anyway. +jest.retryTimes(30); + +const root = join('../../'); +const filesToBuild = ['workers/processChild', 'workers/threadChild', 'types']; +const writeDestination = join(__dirname, '__temp__'); +const processChildWorkerPath = join( + writeDestination, + 'workers/processChild.js', +); +const threadChildWorkerPath = join(writeDestination, 'workers/threadChild.js'); + +beforeAll(async () => { + await mkdir(writeDestination, {recursive: true}); + + for (const file of filesToBuild) { + const sourcePath = join(__dirname, root, `${file}.ts`); + const writePath = join(writeDestination, `${file}.js`); + + await mkdir(dirname(writePath), {recursive: true}); + + const result = await transformFileAsync(sourcePath); + + await writeFile(writePath, result.code, { + encoding: 'utf-8', + }); + } +}); + +afterAll(async () => { + await rm(writeDestination, {force: true, recursive: true}); +}); + +test.each(filesToBuild)('%s.js should exist', async file => { + const path = join(writeDestination, `${file}.js`); + + await expect(async () => await access(path)).not.toThrowError(); +}); + +describe.each([ + { + name: 'ProcessWorker', + workerClass: ChildProcessWorker, + workerPath: processChildWorkerPath, + }, + { + name: 'ThreadWorker', + workerClass: ThreadsWorker, + workerPath: threadChildWorkerPath, + }, +])('$name', ({workerClass, workerPath}) => { + /** @type WorkerInterface */ + let worker; + let int; + + afterEach(async () => { + if (worker) { + worker.forceExit(); + await worker.waitForExit(); + } + + clearInterval(int); + }); + + function waitForChange(fn) { + const inital = fn(); + + return new Promise((resolve, reject) => { + let count = 0; + + int = setInterval(() => { + const updated = fn(); + + if (inital !== updated) { + resolve(updated); + clearInterval(int); + } + + if (count > 100000) { + reject(new Error('Timeout waiting for change')); + } + + count++; + }, 1); + }); + } + + test('should get memory usage', async () => { + worker = new workerClass({ + childWorkerPath: workerPath, + maxRetries: 0, + workerPath: join(__dirname, '__fixtures__', 'EdgeCasesWorker'), + }); + + const memoryUsagePromise = worker.getMemoryUsage(); + expect(memoryUsagePromise).toBeInstanceOf(Promise); + + expect(await memoryUsagePromise).toBeGreaterThan(0); + }); + + test('should recycle on idle limit breach', async () => { + worker = new workerClass({ + childWorkerPath: workerPath, + // There is no way this is fitting into 1000 bytes, so it should restart + // after requesting a memory usage update + idleMemoryLimit: 1000, + maxRetries: 0, + workerPath: join(__dirname, '__fixtures__', 'EdgeCasesWorker'), + }); + + const startSystemId = worker.getWorkerSystemId(); + expect(startSystemId).toBeGreaterThanOrEqual(0); + + worker.checkMemoryUsage(); + + await waitForChange(() => worker.getWorkerSystemId()); + + const systemId = worker.getWorkerSystemId(); + expect(systemId).toBeGreaterThanOrEqual(0); + expect(systemId).not.toEqual(startSystemId); + }); + + test('should automatically recycle on idle limit breach', async () => { + worker = new workerClass({ + childWorkerPath: workerPath, + // There is no way this is fitting into 1000 bytes, so it should restart + // after requesting a memory usage update + idleMemoryLimit: 1000, + maxRetries: 0, + workerPath: join(__dirname, '__fixtures__', 'EdgeCasesWorker'), + }); + + const startPid = worker.getWorkerSystemId(); + expect(startPid).toBeGreaterThanOrEqual(0); + + const onStart = jest.fn(); + const onEnd = jest.fn(); + const onCustom = jest.fn(); + + worker.send( + [CHILD_MESSAGE_CALL, true, 'safeFunction', []], + onStart, + onEnd, + onCustom, + ); + + await waitForChange(() => worker.getWorkerSystemId()); + + const endPid = worker.getWorkerSystemId(); + expect(endPid).toBeGreaterThanOrEqual(0); + expect(endPid).not.toEqual(startPid); + }, 10000); + + test('should cleanly exit on crash', async () => { + const workerHeapLimit = 10; + + /** @type WorkerOptions */ + const options = { + childWorkerPath: workerPath, + maxRetries: 0, + silent: true, + workerPath: join(__dirname, '__fixtures__', 'EdgeCasesWorker'), + }; + + if (workerClass === ThreadsWorker) { + options.resourceLimits = { + codeRangeSizeMb: workerHeapLimit * 2, + maxOldGenerationSizeMb: workerHeapLimit, + maxYoungGenerationSizeMb: workerHeapLimit * 2, + stackSizeMb: workerHeapLimit * 2, + }; + } else if (workerClass === ChildProcessWorker) { + options.forkOptions = { + // Forcibly set the heap limit so we can crash the process easily. + execArgv: [`--max-old-space-size=${workerHeapLimit}`], + }; + } + + worker = new workerClass(options); + + const pid = worker.getWorkerSystemId(); + expect(pid).toBeGreaterThanOrEqual(0); + + const onStart = jest.fn(); + const onEnd = jest.fn(); + const onCustom = jest.fn(); + + worker.send( + [CHILD_MESSAGE_CALL, true, 'leakMemory', []], + onStart, + onEnd, + onCustom, + ); + + await worker.waitForExit(); + }, 15000); + + test('should handle regular fatal crashes', async () => { + worker = new workerClass({ + childWorkerPath: workerPath, + maxRetries: 4, + workerPath: join(__dirname, '__fixtures__', 'EdgeCasesWorker'), + }); + + const startPid = worker.getWorkerSystemId(); + expect(startPid).toBeGreaterThanOrEqual(0); + + const onStart = jest.fn(); + const onEnd = jest.fn(); + const onCustom = jest.fn(); + + worker.send( + [CHILD_MESSAGE_CALL, true, 'fatalExitCode', []], + onStart, + onEnd, + onCustom, + ); + + let pidChanges = 0; + + while (true) { + // Ideally this would use Promise.any but it's not supported in Node 14 + // so doing this instead. Essentially what we're doing is looping and + // capturing the pid every time it changes. When it stops changing the + // timeout will be hit and we should be left with a collection of all + // the pids used by the worker. + const newPid = await new Promise(resolve => { + const resolved = false; + + const to = setTimeout(() => { + if (!resolved) { + this.resolved = true; + resolve(undefined); + } + }, 500); + + waitForChange(() => worker.getWorkerSystemId()).then(() => { + clearTimeout(to); + + if (!resolved) { + resolve(worker.getWorkerSystemId()); + } + }); + }); + + if (typeof newPid === 'number') { + pidChanges++; + } else { + break; + } + } + + // Expect the pids to be retries + 1 because it is restarted + // one last time at the end ready for the next request. + expect(pidChanges).toEqual(5); + + worker.forceExit(); + }); +}); diff --git a/packages/jest-worker/src/workers/__tests__/__fixtures__/EdgeCasesWorker.js b/packages/jest-worker/src/workers/__tests__/__fixtures__/EdgeCasesWorker.js new file mode 100644 index 000000000000..482fd7c33508 --- /dev/null +++ b/packages/jest-worker/src/workers/__tests__/__fixtures__/EdgeCasesWorker.js @@ -0,0 +1,42 @@ +/** + * Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +const leakStore = []; + +/** + * This exists to force a memory leak in the worker tests. + */ +async function leakMemory() { + console.log( + `Intentionally leaking memory: ${( + process.memoryUsage().heapUsed / + 1024 / + 1024 + ).toFixed(2)}MB at start`, + ); + + let i = 0; + while (true) { + i++; + + leakStore.push(i); + } +} + +function fatalExitCode() { + process.exit(134); +} + +function safeFunction() { + // Doesn't do anything. +} + +module.exports = { + fatalExitCode, + leakMemory, + safeFunction, +}; diff --git a/packages/jest-worker/src/workers/__tests__/processChild.test.js b/packages/jest-worker/src/workers/__tests__/processChild.test.js index 55b29107bdf7..b1520765f1d5 100644 --- a/packages/jest-worker/src/workers/__tests__/processChild.test.js +++ b/packages/jest-worker/src/workers/__tests__/processChild.test.js @@ -18,7 +18,9 @@ import { CHILD_MESSAGE_CALL, CHILD_MESSAGE_END, CHILD_MESSAGE_INITIALIZE, + CHILD_MESSAGE_MEM_USAGE, PARENT_MESSAGE_CLIENT_ERROR, + PARENT_MESSAGE_MEM_USAGE, PARENT_MESSAGE_OK, } from '../../types'; @@ -141,6 +143,19 @@ it('lazily requires the file', () => { expect(initializeParm).toBe(undefined); }); +it('should return memory usage', () => { + process.send = jest.fn(); + + expect(mockCount).toBe(0); + + process.emit('message', [CHILD_MESSAGE_MEM_USAGE]); + + expect(process.send.mock.calls[0][0]).toEqual([ + PARENT_MESSAGE_MEM_USAGE, + expect.any(Number), + ]); +}); + it('calls initialize with the correct arguments', () => { expect(mockCount).toBe(0); diff --git a/packages/jest-worker/src/workers/processChild.ts b/packages/jest-worker/src/workers/processChild.ts index dda7938aa415..3302843cf2f4 100644 --- a/packages/jest-worker/src/workers/processChild.ts +++ b/packages/jest-worker/src/workers/processChild.ts @@ -9,12 +9,15 @@ import { CHILD_MESSAGE_CALL, CHILD_MESSAGE_END, CHILD_MESSAGE_INITIALIZE, + CHILD_MESSAGE_MEM_USAGE, ChildMessageCall, ChildMessageInitialize, PARENT_MESSAGE_CLIENT_ERROR, PARENT_MESSAGE_ERROR, + PARENT_MESSAGE_MEM_USAGE, PARENT_MESSAGE_OK, PARENT_MESSAGE_SETUP_ERROR, + ParentMessageMemUsage, } from '../types'; type UnknownFunction = (...args: Array) => unknown | Promise; @@ -53,6 +56,10 @@ const messageListener: NodeJS.MessageListener = (request: any) => { end(); break; + case CHILD_MESSAGE_MEM_USAGE: + reportMemoryUsage(); + break; + default: throw new TypeError( `Unexpected request from parent process: ${request[0]}`, @@ -77,6 +84,19 @@ function reportInitializeError(error: Error) { return reportError(error, PARENT_MESSAGE_SETUP_ERROR); } +function reportMemoryUsage() { + if (!process || !process.send) { + throw new Error('Child can only be used on a forked process'); + } + + const msg: ParentMessageMemUsage = [ + PARENT_MESSAGE_MEM_USAGE, + process.memoryUsage().heapUsed, + ]; + + process.send(msg); +} + function reportError(error: Error, type: PARENT_MESSAGE_ERROR) { if (!process || !process.send) { throw new Error('Child can only be used on a forked process'); diff --git a/packages/jest-worker/src/workers/threadChild.ts b/packages/jest-worker/src/workers/threadChild.ts index f0c41563cf15..8abfbde77382 100644 --- a/packages/jest-worker/src/workers/threadChild.ts +++ b/packages/jest-worker/src/workers/threadChild.ts @@ -10,12 +10,15 @@ import { CHILD_MESSAGE_CALL, CHILD_MESSAGE_END, CHILD_MESSAGE_INITIALIZE, + CHILD_MESSAGE_MEM_USAGE, ChildMessageCall, ChildMessageInitialize, PARENT_MESSAGE_CLIENT_ERROR, PARENT_MESSAGE_ERROR, + PARENT_MESSAGE_MEM_USAGE, PARENT_MESSAGE_OK, PARENT_MESSAGE_SETUP_ERROR, + ParentMessageMemUsage, } from '../types'; type UnknownFunction = (...args: Array) => unknown | Promise; @@ -55,6 +58,10 @@ const messageListener = (request: any) => { end(); break; + case CHILD_MESSAGE_MEM_USAGE: + reportMemoryUsage(); + break; + default: throw new TypeError( `Unexpected request from parent process: ${request[0]}`, @@ -63,6 +70,19 @@ const messageListener = (request: any) => { }; parentPort!.on('message', messageListener); +function reportMemoryUsage() { + if (isMainThread) { + throw new Error('Child can only be used on a forked process'); + } + + const msg: ParentMessageMemUsage = [ + PARENT_MESSAGE_MEM_USAGE, + process.memoryUsage().heapUsed, + ]; + + parentPort!.postMessage(msg); +} + function reportSuccess(result: unknown) { if (isMainThread) { throw new Error('Child can only be used on a forked process');