Skip to content

Commit

Permalink
introduce worker thread pools for SSR page generation (#983)
Browse files Browse the repository at this point in the history
  • Loading branch information
thescientist13 committed Nov 3, 2022
1 parent 0974a67 commit 4e650cd
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 42 deletions.
6 changes: 4 additions & 2 deletions packages/cli/src/lib/ssr-route-worker.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// https://github.com/nodejs/modules/issues/307#issuecomment-858729422
import { pathToFileURL } from 'url';
import { workerData, parentPort } from 'worker_threads';
import { parentPort } from 'worker_threads';
import { renderToString, renderFromHTML } from 'wc-compiler';

async function executeRouteModule({ modulePath, compilation, route, label, id, prerender, htmlContents, scripts }) {
Expand Down Expand Up @@ -43,4 +43,6 @@ async function executeRouteModule({ modulePath, compilation, route, label, id, p
parentPort.postMessage(data);
}

executeRouteModule(workerData);
parentPort.on('message', async (task) => {
await executeRouteModule(task);
});
79 changes: 79 additions & 0 deletions packages/cli/src/lib/threadpool.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// https://amagiacademy.com/blog/posts/2021-04-09/node-worker-threads-pool
import { AsyncResource } from 'async_hooks';
import { EventEmitter } from 'events';
import { Worker } from 'worker_threads';

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}

done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy();
}
}

class WorkerPool extends EventEmitter {
constructor(numThreads, workerFile) {
super();
this.numThreads = numThreads;
this.workerFile = workerFile;
this.workers = [];
this.freeWorkers = [];

for (let i = 0; i < numThreads; i += 1) {
this.addNewWorker();
}
}

addNewWorker() {
const worker = new Worker(this.workerFile);

worker.on('message', (result) => {
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;

this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});

worker.on('error', (err) => {
if (worker[kTaskInfo]) {
worker[kTaskInfo].done(err, null);
} else {
this.emit('error', err);
}
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});

this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}

runTask(task, callback) {
if (this.freeWorkers.length === 0) {
this.once(kWorkerFreedEvent, () => this.runTask(task, callback));
return;
}

const worker = this.freeWorkers.pop();

worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}

close() {
for (const worker of this.workers) {
worker.terminate();
}
}
}

export { WorkerPool };
15 changes: 8 additions & 7 deletions packages/cli/src/lifecycles/graph.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,8 @@ const generateGraph = async (compilation) => {
filePath = route;

await new Promise((resolve, reject) => {
const worker = new Worker(routeWorkerUrl, {
workerData: {
modulePath: fullPath,
compilation: JSON.stringify(compilation),
route
}
});
const worker = new Worker(routeWorkerUrl);

worker.on('message', (result) => {
if (result.frontmatter) {
const resources = (result.frontmatter.imports || []).map((resource) => {
Expand All @@ -151,6 +146,12 @@ const generateGraph = async (compilation) => {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});

worker.postMessage({
modulePath: fullPath,
compilation: JSON.stringify(compilation),
route
});
});

if (ssrFrontmatter) {
Expand Down
43 changes: 19 additions & 24 deletions packages/cli/src/lifecycles/prerender.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import fs from 'fs';
import htmlparser from 'node-html-parser';
import { modelResource } from '../lib/resource-utils.js';
import os from 'os';
import path from 'path';
import { Worker } from 'worker_threads';
import { WorkerPool } from '../lib/threadpool.js';

function isLocalLink(url = '') {
return url.indexOf('http') !== 0 && url.indexOf('//') !== 0;
Expand Down Expand Up @@ -101,7 +102,9 @@ async function preRenderCompilationWorker(compilation, workerPrerender) {

console.info('pages to generate', `\n ${pages.map(page => page.route).join('\n ')}`);

await Promise.all(pages.map(async (page) => {
const pool = new WorkerPool(os.cpus().length, workerPrerender.workerUrl);

for (const page of pages) {
const { outputPath, route } = page;
const outputPathDir = path.join(scratchDir, route);
const htmlResource = compilation.config.plugins.filter((plugin) => {
Expand All @@ -121,35 +124,27 @@ async function preRenderCompilationWorker(compilation, workerPrerender) {
.filter(resource => resource.type === 'script')
.map(resource => resource.sourcePathURL.href);

await new Promise((resolve, reject) => {
const worker = new Worker(workerPrerender.workerUrl, {
workerData: {
modulePath: null,
compilation: JSON.stringify(compilation),
route,
prerender: true,
htmlContents: html,
scripts: JSON.stringify(scripts)
}
});
worker.on('message', (result) => {
if (result.html) {
html = result.html;
}
resolve();
});
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
html = await new Promise((resolve, reject) => {
pool.runTask({
modulePath: null,
compilation: JSON.stringify(compilation),
route,
prerender: true,
htmlContents: html,
scripts: JSON.stringify(scripts)
}, (err, result) => {
if (err) {
return reject(err);
}

return resolve(result.html);
});
});

await fs.promises.writeFile(path.join(scratchDir, outputPath), html);

console.info('generated page...', route);
}));
}
}

async function preRenderCompilationCustom(compilation, customPrerender) {
Expand Down
15 changes: 8 additions & 7 deletions packages/cli/src/plugins/resource/plugin-standard-html.js
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,8 @@ class StandardHtmlResource extends ResourceInterface {
const routeWorkerUrl = this.compilation.config.plugins.filter(plugin => plugin.type === 'renderer')[0].provider().workerUrl;

await new Promise((resolve, reject) => {
const worker = new Worker(routeWorkerUrl, {
workerData: {
modulePath: routeModuleLocation,
compilation: JSON.stringify(this.compilation),
route: fullPath
}
});
const worker = new Worker(routeWorkerUrl);

worker.on('message', (result) => {
if (result.template) {
ssrTemplate = result.template;
Expand Down Expand Up @@ -332,6 +327,12 @@ class StandardHtmlResource extends ResourceInterface {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});

worker.postMessage({
modulePath: routeModuleLocation,
compilation: JSON.stringify(this.compilation),
route: fullPath
});
});
}

Expand Down
6 changes: 4 additions & 2 deletions packages/plugin-renderer-lit/src/ssr-route-worker-lit.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { html } from 'lit';
import { unsafeHTML } from 'lit-html/directives/unsafe-html.js';
import { pathToFileURL } from 'url';
import { Readable } from 'stream';
import { workerData, parentPort } from 'worker_threads';
import { parentPort } from 'worker_threads';

async function streamToString (stream) {
const chunks = [];
Expand Down Expand Up @@ -71,4 +71,6 @@ async function executeRouteModule({ modulePath, compilation, route, label, id, p
parentPort.postMessage(data);
}

executeRouteModule(workerData);
parentPort.on('message', async (task) => {
await executeRouteModule(task);
});

0 comments on commit 4e650cd

Please sign in to comment.