From 652dc3daabb22e39a355fbf9dbb9830d2c90975e Mon Sep 17 00:00:00 2001
From: AriPerkkio
Date: Wed, 16 Aug 2023 19:49:55 +0300
Subject: [PATCH] feat: support multiple parallel `child_process`

---
 .github/workflows/ci.yml                |   2 +-
 package.json                            |   2 +-
 packages/vitest/package.json            |   2 +-
 packages/vitest/src/node/pools/child.ts | 252 ++++++++++++++++--------
 packages/vitest/src/runtime/child.ts    |  33 ++--
 packages/vitest/src/types/child.ts      |   2 +-
 pnpm-lock.yaml                          |   8 +-
 test/bail/test/bail.test.ts             |   2 +-
 test/setup/tests/setup-files.test.ts    |   2 +-
 9 files changed, 197 insertions(+), 108 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 57e8b3852476c..f868585ebd9d4 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -82,7 +82,7 @@ jobs:
         run: pnpm run test:ci

       - name: Test Single Thread
-        run: pnpm run test:ci:single-thread
+        run: pnpm run test:ci:no-threads

       - name: Test Vm Threads
         run: pnpm run test:ci:vm-threads
diff --git a/package.json b/package.json
index 44aa2546625ef..905b49541530f 100644
--- a/package.json
+++ b/package.json
@@ -24,7 +24,7 @@
     "test:all": "CI=true pnpm -r --stream run test --allowOnly",
     "test:ci": "CI=true pnpm -r --stream --filter !test-fails --filter !test-browser --filter !test-esm --filter !test-browser run test --allowOnly",
     "test:ci:vm-threads": "CI=true pnpm -r --stream --filter !test-fails --filter !test-single-thread --filter !test-browser --filter !test-esm --filter !test-browser run test --allowOnly --experimental-vm-threads",
-    "test:ci:single-thread": "CI=true pnpm -r --stream --filter !test-fails --filter !test-coverage --filter !test-watch --filter !test-bail --filter !test-esm --filter !test-browser run test --allowOnly --no-threads",
+    "test:ci:no-threads": "CI=true pnpm -r --stream --filter !test-fails --filter !test-coverage --filter !test-watch --filter !test-bail --filter !test-esm --filter !test-browser run test --allowOnly --no-threads",
     "typecheck": "tsc --noEmit",
     "typecheck:why": "tsc --noEmit --explainFiles > explainTypes.txt",
     "ui:build": "vite build packages/ui",
diff --git a/packages/vitest/package.json b/packages/vitest/package.json
index 747cf9e2ab5c1..5c2f8831fe379 100644
--- a/packages/vitest/package.json
+++ b/packages/vitest/package.json
@@ -158,7 +158,7 @@
     "std-env": "^3.3.3",
     "strip-literal": "^1.0.1",
     "tinybench": "^2.5.0",
-    "tinypool": "^0.7.0",
+    "tinypool": "^0.8.0",
     "vite": "^3.0.0 || ^4.0.0",
     "vite-node": "workspace:*",
     "why-is-node-running": "^2.2.2"
diff --git a/packages/vitest/src/node/pools/child.ts b/packages/vitest/src/node/pools/child.ts
index a22e91fb3c124..01e49f2b72827 100644
--- a/packages/vitest/src/node/pools/child.ts
+++ b/packages/vitest/src/node/pools/child.ts
@@ -1,21 +1,32 @@
 import v8 from 'node:v8'
-import type { ChildProcess } from 'node:child_process'
-import { fork } from 'node:child_process'
 import { fileURLToPath, pathToFileURL } from 'node:url'
+import { cpus } from 'node:os'
+import EventEmitter from 'node:events'
+import { Tinypool } from 'tinypool'
+import type { TinypoolChannel, Options as TinypoolOptions } from 'tinypool'
 import { createBirpc } from 'birpc'
 import { resolve } from 'pathe'
 import type { ContextTestEnvironment, ResolvedConfig, RunnerRPC, RuntimeRPC, Vitest } from '../../types'
 import type { ChildContext } from '../../types/child'
-import type { PoolProcessOptions, ProcessPool, WorkspaceSpec } from '../pool'
+import type { PoolProcessOptions, ProcessPool, RunWithFiles } from '../pool'
 import { distDir } from '../../paths'
-import { groupBy } from '../../utils/base'
-import { envsOrder, groupFilesByEnv } from '../../utils/test-helpers'
 import type { WorkspaceProject } from '../workspace'
+import { envsOrder, groupFilesByEnv } from '../../utils/test-helpers'
+import { groupBy } from '../../utils'
 import { createMethodsRPC } from './rpc'

 const childPath = fileURLToPath(pathToFileURL(resolve(distDir, './child.js')).href)

-function setupChildProcessChannel(project: WorkspaceProject, fork: ChildProcess): void {
+function createChildProcessChannel(project: WorkspaceProject) {
+  const emitter = new EventEmitter()
+  const cleanup = () => emitter.removeAllListeners()
+
+  const events = { message: 'message', response: 'response' }
+  const channel: TinypoolChannel = {
+    onMessage: callback => emitter.on(events.message, callback),
+    postMessage: message => emitter.emit(events.response, message),
+  }
+
   const rpc = createBirpc<RunnerRPC, RuntimeRPC>(
     createMethodsRPC(project),
     {
@@ -23,15 +34,17 @@ function setupChildProcessChannel(project: WorkspaceProject, fork: ChildProcess)
       serialize: v8.serialize,
       deserialize: v => v8.deserialize(Buffer.from(v)),
       post(v) {
-        fork.send(v)
+        emitter.emit(events.message, v)
       },
       on(fn) {
-        fork.on('message', fn)
+        emitter.on(events.response, fn)
       },
     },
   )

   project.ctx.onCancel(reason => rpc.onCancel(reason))
+
+  return { channel, cleanup }
 }

 function stringifyRegex(input: RegExp | string): string {
@@ -40,101 +53,180 @@ function stringifyRegex(input: RegExp | string): string {
   return `$$vitest:${input.toString()}`
 }

-function getTestConfig(ctx: WorkspaceProject): ResolvedConfig {
-  const config = ctx.getSerializableConfig()
-  // v8 serialize does not support regex
-  return {
-    ...config,
-    testNamePattern: config.testNamePattern
-      ? stringifyRegex(config.testNamePattern)
-      : undefined,
+export function createChildProcessPool(ctx: Vitest, { execArgv, env }: PoolProcessOptions): ProcessPool {
+  const threadsCount = ctx.config.watch
+    ? Math.max(Math.floor(cpus().length / 2), 1)
+    : Math.max(cpus().length - 1, 1)
+
+  const maxThreads = ctx.config.maxThreads ?? threadsCount
+  const minThreads = ctx.config.minThreads ?? threadsCount
+
+  const options: TinypoolOptions = {
+    runtime: 'child_process',
+    filename: childPath,
+
+    maxThreads,
+    minThreads,
+
+    env,
+    execArgv,
+
+    terminateTimeout: ctx.config.teardownTimeout,
   }
-}

-export function createChildProcessPool(ctx: Vitest, { execArgv, env }: PoolProcessOptions): ProcessPool {
-  const children = new Set<ChildProcess>()
+  if (ctx.config.isolate) {
+    options.isolateWorkers = true
+    options.concurrentTasksPerWorker = 1
+  }

-  const Sequencer = ctx.config.sequence.sequencer
-  const sequencer = new Sequencer(ctx)
+  if (ctx.config.singleThread) {
+    options.concurrentTasksPerWorker = 1
+    options.maxThreads = 1
+    options.minThreads = 1
+  }

-  function runFiles(project: WorkspaceProject, files: string[], environment: ContextTestEnvironment, invalidates: string[] = []) {
-    const config = getTestConfig(project)
-    ctx.state.clearFiles(project, files)
+  const pool = new Tinypool(options)
+
+  const runWithFiles = (name: string): RunWithFiles => {
+    let id = 0
+
+    async function runFiles(project: WorkspaceProject, config: ResolvedConfig, files: string[], environment: ContextTestEnvironment, invalidates: string[] = []) {
+      ctx.state.clearFiles(project, files)
+      const { channel, cleanup } = createChildProcessChannel(project)
+      const workerId = ++id
+      const data: ChildContext = {
+        config,
+        files,
+        invalidates,
+        environment,
+        workerId,
+      }
+      try {
+        await pool.run(data, { name, channel })
+      }
+      catch (error) {
+        // Worker got stuck and won't terminate - this may cause process to hang
+        if (error instanceof Error && /Failed to terminate worker/.test(error.message))
+          ctx.state.addProcessTimeoutCause(`Failed to terminate worker while running ${files.join(', ')}.`)

-    const data: ChildContext = {
-      command: 'start',
-      config,
-      files,
-      invalidates,
-      environment,
-    }
+        // Intentionally cancelled
+        else if (ctx.isCancelling && error instanceof Error && /The task has been cancelled/.test(error.message))
+          ctx.state.cancelFiles(files, ctx.config.root)

-    const child = fork(childPath, [], {
-      execArgv,
-      env,
-      // TODO: investigate
-      // serialization: 'advanced',
-    })
-    children.add(child)
-    setupChildProcessChannel(project, child)
-
-    return new Promise<void>((resolve, reject) => {
-      child.send(data, (err) => {
-        if (err)
-          reject(err)
-      })
-      child.on('close', (code) => {
-        if (!code)
-          resolve()
         else
-          reject(new Error(`Child process exited unexpectedly with code ${code}`))
+          throw error
+      }
+      finally {
+        cleanup()
+      }
+    }

-        children.delete(child)
-      })
-    })
-  }
+    const Sequencer = ctx.config.sequence.sequencer
+    const sequencer = new Sequencer(ctx)
+
+    return async (specs, invalidates) => {
+      // Cancel pending tasks from pool when possible
+      ctx.onCancel(() => pool.cancelPendingTasks())

-  async function runTests(specs: WorkspaceSpec[], invalidates: string[] = []): Promise<void> {
-    const { shard } = ctx.config
+      const configs = new Map<WorkspaceProject, ResolvedConfig>()
+      const getConfig = (project: WorkspaceProject): ResolvedConfig => {
+        if (configs.has(project))
+          return configs.get(project)!

-    if (shard)
-      specs = await sequencer.shard(specs)
+        const _config = project.getSerializableConfig()
+
+        const config = {
+          ..._config,
+          // v8 serialize does not support regex
+          testNamePattern: _config.testNamePattern
+            ? stringifyRegex(_config.testNamePattern)
+            : undefined,
+        } as ResolvedConfig
+
+        configs.set(project, config)
+        return config
+      }
+
+      const workspaceMap = new Map<string, WorkspaceProject[]>()
+      for (const [project, file] of specs) {
+        const workspaceFiles = workspaceMap.get(file) ?? []
+        workspaceFiles.push(project)
+        workspaceMap.set(file, workspaceFiles)
+      }

-    specs = await sequencer.sort(specs)
+      // it's possible that project defines a file that is also defined by another project
+      const { shard } = ctx.config
+
+      if (shard)
+        specs = await sequencer.shard(specs)
+
+      specs = await sequencer.sort(specs)
+
+      // TODO: What to do about singleThread flag?
+      const singleThreads = specs.filter(([project]) => project.config.singleThread)
+      const multipleThreads = specs.filter(([project]) => !project.config.singleThread)
+
+      if (multipleThreads.length) {
+        const filesByEnv = await groupFilesByEnv(multipleThreads)
+        const files = Object.values(filesByEnv).flat()
+        const results: PromiseSettledResult<void>[] = []
+
+        if (ctx.config.isolate) {
+          results.push(...await Promise.allSettled(files.map(({ file, environment, project }) =>
+            runFiles(project, getConfig(project), [file], environment, invalidates))))
+        }
+        else {
+          // When isolation is disabled, we still need to isolate environments and workspace projects from each other.
+          // Tasks are still running parallel but environments are isolated between tasks.
+          const grouped = groupBy(files, ({ project, environment }) => project.getName() + environment.name + JSON.stringify(environment.options))
+
+          for (const group of Object.values(grouped)) {
+            // Push all files to pool's queue
+            results.push(...await Promise.allSettled(group.map(({ file, environment, project }) =>
+              runFiles(project, getConfig(project), [file], environment, invalidates))))
+
+            // Once all tasks are running or finished, recycle worker for isolation.
+            // On-going workers will run in the previous environment.
+            await new Promise<void>(resolve => pool.queueSize === 0 ? resolve() : pool.once('drain', resolve))
+            await pool.recycleWorkers()
+          }
+        }
+
+        const errors = results.filter((r): r is PromiseRejectedResult => r.status === 'rejected').map(r => r.reason)
+        if (errors.length > 0)
+          throw new AggregateError(errors, 'Errors occurred while running tests. For more information, see serialized error.')
+      }

-    const filesByEnv = await groupFilesByEnv(specs)
-    const envs = envsOrder.concat(
-      Object.keys(filesByEnv).filter(env => !envsOrder.includes(env)),
-    )
+      if (singleThreads.length) {
+        const filesByEnv = await groupFilesByEnv(singleThreads)
+        const envs = envsOrder.concat(
+          Object.keys(filesByEnv).filter(env => !envsOrder.includes(env)),
+        )

-    // always run environments isolated between each other
-    for (const env of envs) {
-      const files = filesByEnv[env]
+        for (const env of envs) {
+          const files = filesByEnv[env]

-      if (!files?.length)
-        continue
+          if (!files?.length)
+            continue

-      const filesByOptions = groupBy(files, ({ project, environment }) => project.getName() + JSON.stringify(environment.options) + environment.transformMode)
+          const filesByOptions = groupBy(files, ({ project, environment }) => project.getName() + JSON.stringify(environment.options))

-      for (const option in filesByOptions) {
-        const files = filesByOptions[option]
+          for (const files of Object.values(filesByOptions)) {
+            // Always run environments isolated between each other
+            await pool.recycleWorkers()

-        if (files?.length) {
-          const filenames = files.map(f => f.file)
-          await runFiles(files[0].project, filenames, files[0].environment, invalidates)
+            const filenames = files.map(f => f.file)
+            await runFiles(files[0].project, getConfig(files[0].project), filenames, files[0].environment, invalidates)
+          }
         }
       }
     }
   }

   return {
-    runTests,
-    async close() {
-      children.forEach((child) => {
-        if (!child.killed)
-          child.kill()
-      })
-      children.clear()
+    runTests: runWithFiles('run'),
+    close: async () => {
+      await pool.destroy()
     },
   }
 }
diff --git a/packages/vitest/src/runtime/child.ts b/packages/vitest/src/runtime/child.ts
index 5bdf6cd984ad7..a751240833957 100644
--- a/packages/vitest/src/runtime/child.ts
+++ b/packages/vitest/src/runtime/child.ts
@@ -2,6 +2,8 @@ import { performance } from 'node:perf_hooks'
 import v8 from 'node:v8'
 import { createBirpc } from 'birpc'
 import { parseRegexp } from '@vitest/utils'
+import { workerId as poolId } from 'tinypool'
+import type { TinypoolWorkerMessage } from 'tinypool'
 import type { CancelReason } from '@vitest/runner'
 import type { ResolvedConfig, WorkerGlobalState } from '../types'
 import type { RunnerRPC, RuntimeRPC } from '../types/rpc'
@@ -12,10 +14,10 @@ import { createSafeRpc, rpcDone } from './rpc'
 import { setupInspect } from './inspector'

 async function init(ctx: ChildContext) {
-  const { config } = ctx
+  const { config, workerId } = ctx

-  process.env.VITEST_WORKER_ID = '1'
-  process.env.VITEST_POOL_ID = '1'
+  process.env.VITEST_WORKER_ID = String(workerId)
+  process.env.VITEST_POOL_ID = String(poolId)

   let setCancel = (_reason: CancelReason) => {}
   const onCancel = new Promise((resolve) => {
     setCancel = resolve
   })
@@ -33,7 +35,15 @@ async function init(ctx: ChildContext) {
       post(v) {
         process.send?.(v)
       },
-      on(fn) { process.on('message', fn) },
+      on(fn) {
+        process.on('message', (message: any, ...extras: any) => {
+          // Do not react on Tinypool's internal messaging
+          if ((message as TinypoolWorkerMessage)?.__tinypool_worker_message__)
+            return
+
+          return fn(message, ...extras)
+        })
+      },
     },
   )
@@ -83,6 +93,7 @@ function unwrapConfig(config: ResolvedConfig) {
 }

 export async function run(ctx: ChildContext) {
+  ctx.config = unwrapConfig(ctx.config)
   const inspectorCleanup = setupInspect(ctx.config)

   try {
@@ -97,17 +108,3 @@
     inspectorCleanup()
   }
 }
-
-const procesExit = process.exit
-
-process.on('message', async (message: any) => {
-  if (typeof message === 'object' && message.command === 'start') {
-    try {
-      message.config = unwrapConfig(message.config)
-      await run(message)
-    }
-    finally {
-      procesExit()
-    }
-  }
-})
diff --git a/packages/vitest/src/types/child.ts b/packages/vitest/src/types/child.ts
index 023e1dc77a96c..722912fce5daa 100644
--- a/packages/vitest/src/types/child.ts
+++ b/packages/vitest/src/types/child.ts
@@ -1,5 +1,5 @@
 import type { ContextRPC } from './rpc'

 export interface ChildContext extends ContextRPC {
-  command: 'start'
+  workerId: number
 }
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 1d221ddaa436e..d377547207206 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -1351,8 +1351,8 @@ importers:
         specifier: ^2.5.0
         version: 2.5.0
       tinypool:
-        specifier: ^0.7.0
-        version: 0.7.0
+        specifier: ^0.8.0
+        version: 0.8.0
      vite:
        specifier: ^4.3.9
        version: 4.3.9(@types/node@18.7.13)
@@ -24003,8 +24003,8 @@ packages:
     resolution: {integrity: sha512-kRwSG8Zx4tjF9ZiyH4bhaebu+EDz1BOx9hOigYHlUW4xxI/wKIUQUqo018UlU4ar6ATPBsaMrdbKZ+tmPdohFA==}
     dev: false

-  /tinypool@0.7.0:
-    resolution: {integrity: sha512-zSYNUlYSMhJ6Zdou4cJwo/p7w5nmAH17GRfU/ui3ctvjXFErXXkruT4MWW6poDeXgCaIBlGLrfU6TbTXxyGMww==}
+  /tinypool@0.8.0:
+    resolution: {integrity: sha512-BkMhpw8M8y9+XBOEP57Wzbw/8IoJYtL1OvFjX+88IvwzAqVhwEV2TID0lZ1l4b5dPhjzSFQrhWdD2CLWt+oXRw==}
     engines: {node: '>=14.0.0'}
     dev: false
diff --git a/test/bail/test/bail.test.ts b/test/bail/test/bail.test.ts
index ddd8197d9728d..5dce88cc6c7fc 100644
--- a/test/bail/test/bail.test.ts
+++ b/test/bail/test/bail.test.ts
@@ -15,7 +15,7 @@ for (const isolate of [true, false]) {

   for (const config of configs) {
     test(`should bail with "${JSON.stringify(config)}"`, async () => {
-      process.env.THREADS = config?.threads ? 'true' : 'false'
+      process.env.THREADS = config?.singleThread ? 'false' : 'true'

       const { exitCode, stdout } = await runVitest({
         root: './fixtures',
diff --git a/test/setup/tests/setup-files.test.ts b/test/setup/tests/setup-files.test.ts
index 4146e2b9b44ca..d843ea1e65e8c 100644
--- a/test/setup/tests/setup-files.test.ts
+++ b/test/setup/tests/setup-files.test.ts
@@ -19,7 +19,7 @@ describe('setup files with forceRerunTrigger', () => {
   })

   // Note that this test will fail locally if you have uncommitted changes
-  it('should run no tests if setup file is not changed', async () => {
+  it.skipIf(!process.env.GITHUB_ACTION)('should run no tests if setup file is not changed', async () => {
     const { stdout } = await run()
     expect(stdout).toContain('No test files found, exiting with code 0')
   }, 60_000)