Skip to content

Commit

Permalink
Introduce lock to prevent parallel task execution (#9858)
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Mäder <[email protected]>
  • Loading branch information
tsmaeder authored Aug 24, 2021
1 parent 8cae85b commit ff9e050
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 27 deletions.
24 changes: 22 additions & 2 deletions packages/core/src/common/promise-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export function timeout(ms: number, token = CancellationToken.None): Promise<voi
return deferred.promise;
}

export async function retry<T>(task: () => Promise<T>, delay: number, retries: number): Promise<T> {
export async function retry<T>(task: () => Promise<T>, retryDelay: number, retries: number): Promise<T> {
let lastError: Error | undefined;

for (let i = 0; i < retries; i++) {
Expand All @@ -53,9 +53,29 @@ export async function retry<T>(task: () => Promise<T>, delay: number, retries: n
} catch (error) {
lastError = error;

await timeout(delay);
await timeout(retryDelay);
}
}

throw lastError;
}

/**
* A function to allow a promise resolution to be delayed by a number of milliseconds. Usage is as follows:
*
* `const stringValue = await myPromise.then(delay(600)).then(value => value.toString());`
*
* @param ms the number of millisecond to delay
* @returns a function that returns a promise that returns the given value, but delayed
*/
export function delay<T>(ms: number): (value: T) => Promise<T> {
return value => new Promise((resolve, reject) => { setTimeout(() => resolve(value), ms); });
}

/**
* Constructs a promise that will resolve after a given delay.
* @param ms the number of milliseconds to wait
*/
export async function wait(ms: number): Promise<void> {
await delay(ms)(undefined);
}
3 changes: 2 additions & 1 deletion packages/task/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"@theia/workspace": "1.16.0",
"ajv": "^6.5.3",
"jsonc-parser": "^2.2.0",
"p-debounce": "^2.1.0"
"p-debounce": "^2.1.0",
"async-mutex": "^0.3.1"
},
"publishConfig": {
"access": "public"
Expand Down
67 changes: 43 additions & 24 deletions packages/task/src/browser/task-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import { TaskNode } from './task-node';
import { MonacoWorkspace } from '@theia/monaco/lib/browser/monaco-workspace';
import { TaskTerminalWidgetManager } from './task-terminal-widget-manager';
import { ShellTerminalServerProxy } from '@theia/terminal/lib/common/shell-terminal-protocol';
import { Mutex } from 'async-mutex';

export interface QuickPickProblemMatcherItem {
problemMatchers: NamedProblemMatcher[] | undefined;
Expand Down Expand Up @@ -100,6 +101,8 @@ export class TaskService implements TaskConfigurationClient {
isBackgroundTaskEnded: Deferred<boolean | undefined>
}>();

protected taskStartingLock: Mutex = new Mutex();

@inject(FrontendApplication)
protected readonly app: FrontendApplication;

Expand Down Expand Up @@ -727,34 +730,50 @@ export class TaskService implements TaskConfigurationClient {
}

async runTask(task: TaskConfiguration, option?: RunTaskOption): Promise<TaskInfo | undefined> {
const runningTasksInfo: TaskInfo[] = await this.getRunningTasks();
console.debug('entering runTask');
const releaseLock = await this.taskStartingLock.acquire();
console.debug('got lock');

// check if the task is active
const matchedRunningTaskInfo = runningTasksInfo.find(taskInfo => {
const taskConfig = taskInfo.config;
return this.taskDefinitionRegistry.compareTasks(taskConfig, task);
});
if (matchedRunningTaskInfo) { // the task is active
const taskName = this.taskNameResolver.resolve(task);
const terminalId = matchedRunningTaskInfo.terminalId;
if (terminalId) {
const terminal = this.terminalService.getByTerminalId(terminalId);
if (terminal) {
if (TaskOutputPresentation.shouldSetFocusToTerminal(task)) { // assign focus to the terminal if presentation.focus is true
this.terminalService.open(terminal, { mode: 'activate' });
} else if (TaskOutputPresentation.shouldAlwaysRevealTerminal(task)) { // show the terminal but not assign focus
this.terminalService.open(terminal, { mode: 'reveal' });
try {
const runningTasksInfo: TaskInfo[] = await this.getRunningTasks();
// check if the task is active
const matchedRunningTaskInfo = runningTasksInfo.find(taskInfo => {
const taskConfig = taskInfo.config;
return this.taskDefinitionRegistry.compareTasks(taskConfig, task);
});
console.debug(`running task ${JSON.stringify(task)}, already running = ${!!matchedRunningTaskInfo}`);

if (matchedRunningTaskInfo) { // the task is active
releaseLock();
console.debug('released lock');
const taskName = this.taskNameResolver.resolve(task);
const terminalId = matchedRunningTaskInfo.terminalId;
if (terminalId) {
const terminal = this.terminalService.getByTerminalId(terminalId);
if (terminal) {
if (TaskOutputPresentation.shouldSetFocusToTerminal(task)) { // assign focus to the terminal if presentation.focus is true
this.terminalService.open(terminal, { mode: 'activate' });
} else if (TaskOutputPresentation.shouldAlwaysRevealTerminal(task)) { // show the terminal but not assign focus
this.terminalService.open(terminal, { mode: 'reveal' });
}
}
}
const selectedAction = await this.messageService.info(`The task '${taskName}' is already active`, 'Terminate Task', 'Restart Task');
if (selectedAction === 'Terminate Task') {
await this.terminateTask(matchedRunningTaskInfo);
} else if (selectedAction === 'Restart Task') {
return this.restartTask(matchedRunningTaskInfo, option);
}
} else { // run task as the task is not active
console.debug('task about to start');
const taskInfo = await this.doRunTask(task, option);
releaseLock();
console.debug('release lock 2');
return taskInfo;
}
const selectedAction = await this.messageService.info(`The task '${taskName}' is already active`, 'Terminate Task', 'Restart Task');
if (selectedAction === 'Terminate Task') {
await this.terminateTask(matchedRunningTaskInfo);
} else if (selectedAction === 'Restart Task') {
return this.restartTask(matchedRunningTaskInfo, option);
}
} else { // run task as the task is not active
return this.doRunTask(task, option);
} catch (e) {
releaseLock();
throw e;
}
}

Expand Down
12 changes: 12 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2269,6 +2269,13 @@ async-limiter@~1.0.0:
resolved "https://registry.yarnpkg.com/async-limiter/-/async-limiter-1.0.1.tgz#dd379e94f0db8310b08291f9d64c3209766617fd"
integrity sha512-csOlWGAcRFJaI6m+F2WKdnMKr4HhdhFVBk0H/QbJFMCr+uO2kwohwXQPxw/9OCxp05r5ghVBFSyioixx3gfkNQ==

async-mutex@^0.3.1:
version "0.3.1"
resolved "https://registry.yarnpkg.com/async-mutex/-/async-mutex-0.3.1.tgz#7033af665f1c7cebed8b878267a43ba9e77c5f67"
integrity sha512-vRfQwcqBnJTLzVQo72Sf7KIUbcSUP5hNchx6udI1U6LuPQpfePgdjJzlCe76yFZ8pxlLjn9lwcl/Ya0TSOv0Tw==
dependencies:
tslib "^2.1.0"

async@^1.5.0:
version "1.5.2"
resolved "https://registry.yarnpkg.com/async/-/async-1.5.2.tgz#ec6a61ae56480c0c3cb241c95618e20892f9672a"
Expand Down Expand Up @@ -10224,6 +10231,11 @@ tslib@^1.10.0, tslib@^1.8.0, tslib@^1.8.1, tslib@^1.9.0:
resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00"
integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==

tslib@^2.1.0:
version "2.3.1"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.3.1.tgz#e8a335add5ceae51aa261d32a490158ef042ef01"
integrity sha512-77EbyPPpMz+FRFRuAFlWMtmgUWGe9UOG2Z25NqCwiIjRhOf5iKGuzSe5P2w1laq+FkRy4p+PCuVkJSGkzTEKVw==

tslint@^5.12.0:
version "5.20.1"
resolved "https://registry.yarnpkg.com/tslint/-/tslint-5.20.1.tgz#e401e8aeda0152bc44dd07e614034f3f80c67b7d"
Expand Down

0 comments on commit ff9e050

Please sign in to comment.