feat(rosetta): improve translation throughput (#3083)
Previously, Rosetta would divide all the examples to translate into `N` equally
sized arrays, and spawn `N` workers to translate them all.

Experimentation shows that translation time is distributed very unevenly across
samples: many workers would finish their `1/Nth` of the samples early and then
sit idle for roughly half the run, hurting throughput.

Switch to a model with `N` long-lived workers that are continuously fed small
batches of work until all of it is done. This keeps every worker busy until the
work is complete, improving throughput considerably.

On my machine, this improves a run of Rosetta on the CDK repository
with 8 workers from ~30m to ~15m.
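The scheduling change can be illustrated with a small sketch (simplified and hypothetical — it simulates workers with plain async functions instead of real worker threads, and all names are illustrative): each worker repeatedly pulls the next small unit of work from a shared queue, so no worker idles while work remains.

```typescript
// Sketch of queue-fed scheduling: N "workers" each pull the next task from a
// shared cursor until the queue is drained. Because JavaScript is
// single-threaded between awaits, the check-and-increment on `next` is safe.
async function runPool<T>(tasks: Array<() => Promise<T>>, workers: number): Promise<T[]> {
  const results: T[] = [];
  let next = 0;

  async function worker(): Promise<void> {
    while (next < tasks.length) {
      const i = next++; // claim the next unit of work
      results[i] = await tasks[i]();
    }
  }

  // Spawn N workers; each stays busy until no tasks are left.
  await Promise.all(Array.from({ length: workers }, () => worker()));
  return results;
}
```

With uneven task durations, total wall-clock time approaches the sum of all durations divided by the worker count, instead of being dominated by the slowest pre-assigned `1/Nth` slice.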



---

By submitting this pull request, I confirm that my contribution is made under the terms of the [Apache 2.0 license].

[Apache 2.0 license]: https://www.apache.org/licenses/LICENSE-2.0
rix0rrr authored Oct 27, 2021
1 parent d87c4a8 commit 919d895
Showing 5 changed files with 82 additions and 94 deletions.
9 changes: 8 additions & 1 deletion packages/jsii-rosetta/README.md
@@ -189,6 +189,13 @@ terminate the option list by passing `--`).
Since TypeScript compilation takes a lot of time, much time can be gained by using the CPUs in your system effectively.
`jsii-rosetta extract` will run the compilations in parallel if support for NodeJS Worker Threads is detected.

`jsii-rosetta` will use a number of workers equal to half the number of CPU cores,
up to a maximum of 16 workers. This default maximum can be overridden by setting the `JSII_ROSETTA_MAX_WORKER_COUNT`
environment variable.
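For example, to cap the pool at 4 workers (POSIX shell; the actual `jsii-rosetta extract` invocation is shown as a comment because its arguments depend on your project):

```shell
# Cap the Rosetta worker pool at 4 threads for commands run from this shell
export JSII_ROSETTA_MAX_WORKER_COUNT=4

# then run the extraction as usual, e.g.:
#   jsii-rosetta extract
```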

If you get out-of-memory errors when running many workers, run a command like the following to raise the number of memory map areas allowed for your processes:

```sh
$ /sbin/sysctl -w vm.max_map_count=2251954
```
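Before raising the limit, you can inspect the current value (Linux only; `/proc/sys/vm/max_map_count` is the standard Linux location for this setting):

```shell
# Print the current per-process limit on memory map areas (Linux);
# fall back to a message on platforms without /proc
cat /proc/sys/vm/max_map_count 2>/dev/null || echo "not available on this platform"
```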

109 changes: 49 additions & 60 deletions packages/jsii-rosetta/lib/commands/extract.ts
@@ -1,15 +1,15 @@
import * as os from 'os';
import * as path from 'path';
import * as ts from 'typescript';
import * as v8 from 'v8';
import * as workerpool from 'workerpool';

import { loadAssemblies, allTypeScriptSnippets } from '../jsii/assemblies';
import * as logging from '../logging';
import { TypeScriptSnippet } from '../snippet';
import { snippetKey } from '../tablets/key';
import { LanguageTablet, TranslatedSnippet } from '../tablets/tablets';
import { RosettaDiagnostic, Translator, rosettaDiagFromTypescript } from '../translate';
import { divideEvenly } from '../util';
import type { TranslateBatchRequest, TranslateBatchResponse } from './extract_worker';

export interface ExtractResult {
diagnostics: RosettaDiagnostic[];
@@ -81,24 +81,13 @@ function* filterSnippets(ts: IterableIterator<TypeScriptSnippet>, includeIds: st
/**
* Translate all snippets
*
* Uses a worker-based parallel translation if available, falling back to a single-threaded workflow if not.
* We are now always using workers, as we are targeting Node 12+.
*/
async function translateAll(
snippets: IterableIterator<TypeScriptSnippet>,
includeCompilerDiagnostics: boolean,
): Promise<TranslateAllResult> {
try {
const worker = await import('worker_threads');

return await workerBasedTranslateAll(worker, snippets, includeCompilerDiagnostics);
} catch (e) {
if (e.code !== 'MODULE_NOT_FOUND') {
throw e;
}
logging.warn('Worker threads not available (use NodeJS >= 10.5 and --experimental-worker). Working sequentially.');

return singleThreadedTranslateAll(snippets, includeCompilerDiagnostics);
}
return workerBasedTranslateAll(snippets, includeCompilerDiagnostics);
}

/**
@@ -140,65 +129,65 @@ export function singleThreadedTranslateAll(
/**
* Divide the work evenly over all processors by running 'extract_worker' in Worker Threads, then combine results
*
* The workers are fed small queues of work each. We used to divide the entire queue into N
* but since the work is divided unevenly that led to some workers stopping early, idling while
* waiting for more work.
*
* Never include 'extract_worker' directly, only do TypeScript type references (so that in
* the script we may assume that 'worker_threads' successfully imports).
*/
async function workerBasedTranslateAll(
worker: typeof import('worker_threads'),
snippets: IterableIterator<TypeScriptSnippet>,
includeCompilerDiagnostics: boolean,
): Promise<TranslateAllResult> {
// Use about half the advertised cores because hyperthreading doesn't seem to help that
// much (on my machine, using more than half the cores actually makes it slower).
// Use about half the advertised cores because hyperthreading doesn't seem to
// help that much, or we become I/O-bound at some point. On my machine, using
// more than half the cores actually makes it slower.
// Cap to a reasonable top-level limit to prevent thrash on machines with many, many cores.
const maxWorkers = parseInt(process.env.JSII_ROSETTA_MAX_WORKER_COUNT ?? '16');
const N = Math.min(maxWorkers, Math.max(1, Math.ceil(os.cpus().length / 2)));
const snippetArr = Array.from(snippets);
const groups = divideEvenly(N, snippetArr);
logging.info(`Translating ${snippetArr.length} snippets using ${groups.length} workers`);
logging.info(`Translating ${snippetArr.length} snippets using ${N} workers`);

// Run workers
const responses = await Promise.all(
groups.map((snippets) => ({ snippets, includeCompilerDiagnostics })).map(runWorker),
);
const pool = workerpool.pool(path.join(__dirname, 'extract_worker.js'), {
maxWorkers: N,
});

// Combine results
const x = responses.reduce(
(acc, current) => {
// Modifying 'acc' in place to not incur useless copying
acc.translatedSnippetSchemas.push(...current.translatedSnippetSchemas);
acc.diagnostics.push(...current.diagnostics);
return acc;
},
{ translatedSnippetSchemas: [], diagnostics: [] },
);
// Hydrate TranslatedSnippets from data back to objects
return {
diagnostics: x.diagnostics,
translatedSnippets: x.translatedSnippetSchemas.map((s) => TranslatedSnippet.fromSchema(s)),
};
try {
const requests = batchSnippets(snippetArr, includeCompilerDiagnostics);

/**
* Turn running the worker into a nice Promise.
*/
async function runWorker(
request: import('./extract_worker').TranslateRequest,
): Promise<import('./extract_worker').TranslateResponse> {
return new Promise((resolve, reject) => {
const wrk = new worker.Worker(path.join(__dirname, 'extract_worker.js'), {
resourceLimits: {
// Note: V8 heap statistics are expressed in bytes, so we divide by 1MiB (1,048,576 bytes)
maxOldGenerationSizeMb: Math.ceil(v8.getHeapStatistics().heap_size_limit / 1_048_576),
},
workerData: request,
});
wrk.on('message', resolve);
wrk.on('error', reject);
wrk.on('exit', (code) => {
if (code !== 0) {
reject(new Error(`Worker exited with code ${code}`));
}
});
const responses: TranslateBatchResponse[] = await Promise.all(
requests.map((request) => pool.exec('translateBatch', [request])),
);

const diagnostics = new Array<RosettaDiagnostic>();
const translatedSnippets = new Array<TranslatedSnippet>();

// Combine results
for (const response of responses) {
diagnostics.push(...response.diagnostics);
translatedSnippets.push(...response.translatedSchemas.map(TranslatedSnippet.fromSchema));
}
return { diagnostics, translatedSnippets };
} finally {
// Not waiting on purpose
void pool.terminate();
}
}

function batchSnippets(
snippets: TypeScriptSnippet[],
includeCompilerDiagnostics: boolean,
batchSize = 10,
): TranslateBatchRequest[] {
const ret = [];

for (let i = 0; i < snippets.length; i += batchSize) {
ret.push({
snippets: snippets.slice(i, i + batchSize),
includeCompilerDiagnostics,
});
}

return ret;
}
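The chunking in `batchSnippets` can be exercised standalone; the following is a generic reimplementation for illustration (not the module's exported API):

```typescript
// Generic sketch of the batching above: split an array into fixed-size
// batches; the final batch may be smaller than batchSize.
function batch<T>(items: T[], batchSize = 10): T[][] {
  const ret: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    ret.push(items.slice(i, i + batchSize));
  }
  return ret;
}
```

For example, 25 snippets with the default batch size of 10 produce three batches of sizes 10, 10 and 5 — small enough that a pool worker finishing early can immediately pick up another batch.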
35 changes: 10 additions & 25 deletions packages/jsii-rosetta/lib/commands/extract_worker.ts
@@ -1,46 +1,31 @@
/**
* Pool worker for extract.ts
*/
import * as worker from 'worker_threads';
import * as workerpool from 'workerpool';

import * as logging from '../logging';
import { TypeScriptSnippet } from '../snippet';
import { TranslatedSnippetSchema } from '../tablets/schema';
import { RosettaDiagnostic } from '../translate';
import { singleThreadedTranslateAll } from './extract';

export interface TranslateRequest {
includeCompilerDiagnostics: boolean;
snippets: TypeScriptSnippet[];
export interface TranslateBatchRequest {
readonly snippets: TypeScriptSnippet[];
readonly includeCompilerDiagnostics: boolean;
}

export interface TranslateResponse {
diagnostics: RosettaDiagnostic[];
export interface TranslateBatchResponse {
// Cannot be 'TranslatedSnippet' because needs to be serializable
translatedSnippetSchemas: TranslatedSnippetSchema[];
readonly translatedSchemas: TranslatedSnippetSchema[];
readonly diagnostics: RosettaDiagnostic[];
}

function translateSnippet(request: TranslateRequest): TranslateResponse {
function translateBatch(request: TranslateBatchRequest): TranslateBatchResponse {
const result = singleThreadedTranslateAll(request.snippets[Symbol.iterator](), request.includeCompilerDiagnostics);

return {
translatedSchemas: result.translatedSnippets.map((s) => s.toSchema()),
diagnostics: result.diagnostics,
translatedSnippetSchemas: result.translatedSnippets.map((s) => s.toSchema()),
};
}

if (worker.isMainThread) {
// Throw an error to prevent accidental require() of this module. In principle not a big
// deal, but we want to be compatible with run modes where 'worker_threads' is not available
// and by doing this people on platforms where 'worker_threads' is available don't accidentally
// add a require().
throw new Error('This script should be run as a worker, not included directly.');
}

const request: TranslateRequest = worker.workerData;
const startTime = Date.now();
const response = translateSnippet(request);
const delta = (Date.now() - startTime) / 1000;
// eslint-disable-next-line prettier/prettier
logging.info(`Finished translation of ${request.snippets.length} in ${delta.toFixed(0)}s (${response.translatedSnippetSchemas.length} responses)`);
worker.parentPort!.postMessage(response);
workerpool.worker({ translateBatch });
2 changes: 2 additions & 0 deletions packages/jsii-rosetta/package.json
@@ -21,6 +21,7 @@
"@types/jest": "^27.0.2",
"@types/mock-fs": "^4.13.1",
"@types/node": "^12.20.28",
"@types/workerpool": "^6.1.0",
"eslint": "^7.32.0",
"jest": "^27.2.4",
"jsii": "^0.0.0",
@@ -37,6 +38,7 @@
"typescript": "~3.9.10",
"sort-json": "^2.0.0",
"@xmldom/xmldom": "^0.7.5",
"workerpool": "^6.1.5",
"yargs": "^16.2.0"
},
"license": "Apache-2.0",
21 changes: 13 additions & 8 deletions yarn.lock

Some generated files are not rendered by default.
