Skip to content

Commit

Permalink
split code
Browse files Browse the repository at this point in the history
  • Loading branch information
gildas-lormeau committed Jan 27, 2024
1 parent 6e9e795 commit f731601
Showing 1 changed file with 37 additions and 29 deletions.
66 changes: 37 additions & 29 deletions lib/core/codec-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,51 +56,59 @@ let indexWorker = 0;
async function runWorker(stream, workerOptions) {
const { options, config } = workerOptions;
const { transferStreams, useWebWorkers, useCompressionStream, codecType, compressed, signed, encrypted } = options;
const { workerScripts, maxWorkers, terminateWorkerTimeout } = config;
const { workerScripts, maxWorkers } = config;
workerOptions.transferStreams = transferStreams || transferStreams === UNDEFINED_VALUE;
const streamCopy = !compressed && !signed && !encrypted && !workerOptions.transferStreams;
workerOptions.useWebWorkers = !streamCopy && (useWebWorkers || (useWebWorkers === UNDEFINED_VALUE && config.useWebWorkers));
workerOptions.scripts = workerOptions.useWebWorkers && workerScripts ? workerScripts[codecType] : [];
options.useCompressionStream = useCompressionStream || (useCompressionStream === UNDEFINED_VALUE && config.useCompressionStream);
let worker;
const workerData = pool.find(workerData => !workerData.busy);
if (workerData) {
clearTerminateTimeout(workerData);
worker = new CodecWorker(workerData, stream, workerOptions, onTaskFinished);
} else if (pool.length < maxWorkers) {
const workerData = { indexWorker };
indexWorker++;
pool.push(workerData);
worker = new CodecWorker(workerData, stream, workerOptions, onTaskFinished);
} else {
worker = await new Promise(resolve => pendingRequests.push({ resolve, stream, workerOptions }));
}
return worker.run();
return (await getWorker()).run();

function onTaskFinished(workerData) {
if (pendingRequests.length) {
const [{ resolve, stream, workerOptions }] = pendingRequests.splice(0, 1);
resolve(new CodecWorker(workerData, stream, workerOptions, onTaskFinished));
} else if (workerData.worker) {
clearTerminateTimeout(workerData);
if (Number.isFinite(terminateWorkerTimeout) && terminateWorkerTimeout >= 0) {
if (workerData.terminated) {
workerData.terminated = false;
} else {
workerData.terminateTimeout = setTimeout(async () => {
pool = pool.filter(data => data != workerData);
try {
await workerData.terminate();
} catch (_error) {
// ignored
}
}, terminateWorkerTimeout);
}
}
terminateWorker(workerData, workerOptions);
} else {
pool = pool.filter(data => data != workerData);
}
}

async function getWorker() {
const workerData = pool.find(workerData => !workerData.busy);
if (workerData) {
clearTerminateTimeout(workerData);
return new CodecWorker(workerData, stream, workerOptions, onTaskFinished);
} else if (pool.length < maxWorkers) {
const workerData = { indexWorker };
indexWorker++;
pool.push(workerData);
return new CodecWorker(workerData, stream, workerOptions, onTaskFinished);
} else {
return new Promise(resolve => pendingRequests.push({ resolve, stream, workerOptions }));
}
}
}

function terminateWorker(workerData, workerOptions) {
const { config } = workerOptions;
const { terminateWorkerTimeout } = config;
if (Number.isFinite(terminateWorkerTimeout) && terminateWorkerTimeout >= 0) {
if (workerData.terminated) {
workerData.terminated = false;
} else {
workerData.terminateTimeout = setTimeout(async () => {
pool = pool.filter(data => data != workerData);
try {
await workerData.terminate();
} catch (_error) {
// ignored
}
}, terminateWorkerTimeout);
}
}
}

function clearTerminateTimeout(workerData) {
Expand Down

0 comments on commit f731601

Please sign in to comment.