Skip to content

Commit

Permalink
fix(workbench/layout): debounce storing workbench layout
Browse files Browse the repository at this point in the history
Writing to storage is an asynchronous operation performed sequentially. To reduce the number of write operations, only the most recent request is buffered if a previous one is still pending.
  • Loading branch information
danielwiehl authored and Marcarrian committed Aug 28, 2024
1 parent 4d0ba04 commit 076c241
Show file tree
Hide file tree
Showing 6 changed files with 373 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2018-2024 Swiss Federal Railways
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/

import {TestBed} from '@angular/core/testing';
import {firstValueFrom, Subject, timer} from 'rxjs';
import {LatestTaskExecutor} from './latest-task-executor';
import {createEnvironmentInjector, EnvironmentInjector, runInInjectionContext} from '@angular/core';

describe('LatestTaskExecutor', () => {

it('should execute tasks in serial order, debouncing pending tasks', async () => {
const executor = TestBed.runInInjectionContext(() => new LatestTaskExecutor());

const completeTask1$ = new Subject<void>();
const executionLog = new Array<string>();

// Submit task 1.
executor.submit(async () => {
executionLog.push('task 1');
await firstValueFrom(completeTask1$);
});

// Submit task 2.
executor.submit(async () => {
executionLog.push('task 2');
});

// Submit task 3.
executor.submit(async () => {
executionLog.push('task 3');
});

// Wait for task 1 to be executed.
await firstValueFrom(timer(100));
expect(executionLog).toEqual(['task 1']);

// Complete task 1.
completeTask1$.next();

// Wait for task 3 to be executed.
await firstValueFrom(timer(100));
expect(executionLog).toEqual(['task 1', 'task 3']);
});

it('should destroy executor when destroying injector', async () => {
const injector = createEnvironmentInjector([], TestBed.inject(EnvironmentInjector));
const executor = runInInjectionContext(injector, () => new LatestTaskExecutor());

const completeTask1$ = new Subject<void>();
const executionLog = new Array<string>();

// Submit task 1.
executor.submit(async () => {
executionLog.push('task 1');
await firstValueFrom(completeTask1$);
});

// Submit task 2.
executor.submit(async () => {
executionLog.push('task 2');
});

// Wait for task 1 to be executed.
await firstValueFrom(timer(100));
expect(executionLog).toEqual(['task 1']);

// Destroy injector.
injector.destroy();

// Complete task 1.
completeTask1$.next();
await firstValueFrom(timer(100));

// Expect task 2 not to be executed.
expect(executionLog).toEqual(['task 1']);
});
});
64 changes: 64 additions & 0 deletions projects/scion/workbench/src/lib/executor/latest-task-executor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2018-2024 Swiss Federal Railways
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/

import {DestroyRef, inject} from '@angular/core';

/**
* Executes submitted tasks in serial order.
*
* Unlike {@link SingleTaskExecutor}, this executor has a queue size of 1, with only the most recently scheduled task being queued.
*
* Tasks are executed in serial order. At any one time, there is only one task executing and a maximum of one task pending.
* When a task is submitted and there is a task already executing, the submitted task will be queued for later execution.
* Any task previously placed in the queue is discarded.
*
* This executor must be constructed within an injection context. Destroying the injection context will also destroy the executor.
*/
export class LatestTaskExecutor {

private _executing = false;
private _latestTask: (() => Promise<void>) | undefined;

constructor() {
// Clear pending tasks when the current injection context is destroyed.
inject(DestroyRef).onDestroy(() => this.onDestroy());
}

/**
* Submits a task for serial execution.
*/
public submit(task: () => Promise<void>): void {
this._latestTask = task;
this.executeLatestTask();
}

/**
* Executes the last task in the queue (if any).
* After completion, this process is repeated until the task queue is empty.
*/
private executeLatestTask(): void {
if (this._executing || !this._latestTask) {
return;
}

const task = this._latestTask;
this._latestTask = undefined;

this._executing = true;
task().finally(() => {
this._executing = false;
this.executeLatestTask();
});
}

private onDestroy(): void {
this._latestTask = undefined;
}
}
105 changes: 105 additions & 0 deletions projects/scion/workbench/src/lib/executor/single-task-executor.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) 2018-2024 Swiss Federal Railways
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/

import {TestBed} from '@angular/core/testing';
import {firstValueFrom, Subject, timer} from 'rxjs';
import {createEnvironmentInjector, EnvironmentInjector, runInInjectionContext} from '@angular/core';
import {SingleTaskExecutor} from './single-task-executor';

describe('SingleTaskExecutor', () => {

it('should execute tasks in serial order', async () => {
const executor = TestBed.runInInjectionContext(() => new SingleTaskExecutor());

const completeTask1$ = new Subject<void>();
const completeTask2$ = new Subject<void>();
const executionLog = new Array<string>();

// Submit task 1.
const task1 = executor.submit(async () => {
executionLog.push('task 1');
await firstValueFrom(completeTask1$);
return 'task 1 completed';
});

// Submit task 2.
const task2 = executor.submit(async () => {
executionLog.push('task 2');
await firstValueFrom(completeTask2$);
return 'task 2 completed';
});

// Submit task 3.
const task3 = executor.submit(async () => {
executionLog.push('task 3');
return 'task 3 completed';
});

// Wait for task 1 to be executed.
await firstValueFrom(timer(100));
expect(executionLog).toEqual(['task 1']);
await expectAsync(task1).toBePending();
await expectAsync(task2).toBePending();
await expectAsync(task3).toBePending();

// Complete task 1.
completeTask1$.next();
await firstValueFrom(timer(100));
expect(executionLog).toEqual(['task 1', 'task 2']);
await expectAsync(task1).toBeResolvedTo('task 1 completed');
await expectAsync(task2).toBePending();
await expectAsync(task3).toBePending();

// Complete task 2.
completeTask2$.next();
await firstValueFrom(timer(100));
expect(executionLog).toEqual(['task 1', 'task 2', 'task 3']);
await expectAsync(task1).toBeResolvedTo('task 1 completed');
await expectAsync(task2).toBeResolvedTo('task 2 completed');
await expectAsync(task3).toBeResolvedTo('task 3 completed');
});

it('should destroy executor when destroying injector', async () => {
const injector = createEnvironmentInjector([], TestBed.inject(EnvironmentInjector));
const executor = runInInjectionContext(injector, () => new SingleTaskExecutor());

const completeTask1$ = new Subject<void>();
const executionLog = new Array<string>();

// Submit task 1.
const task1 = executor.submit(async () => {
executionLog.push('task 1');
await firstValueFrom(completeTask1$);
});

// Submit task 2.
const task2 = executor.submit(async () => {
executionLog.push('task 2');
});

// Wait for task 1 to be executed.
await firstValueFrom(timer(100));
expect(executionLog).toEqual(['task 1']);
await expectAsync(task1).toBePending();
await expectAsync(task2).toBePending();

// Destroy injector.
injector.destroy();

// Complete task 1.
completeTask1$.next();
await firstValueFrom(timer(100));

// Expect task 2 not to be executed.
expect(executionLog).toEqual(['task 1']);
await expectAsync(task1).toBeResolved();
await expectAsync(task2).toBePending();
});
});
23 changes: 6 additions & 17 deletions projects/scion/workbench/src/lib/executor/single-task-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,28 @@
import {asapScheduler, AsyncSubject, lastValueFrom, Subject} from 'rxjs';
import {serializeExecution} from '../common/operators';
import {catchError, observeOn} from 'rxjs/operators';
import {DestroyRef, inject, InjectionToken} from '@angular/core';
import {ɵDestroyRef} from '../common/ɵdestroy-ref';
import {InjectionToken} from '@angular/core';
import {takeUntilDestroyed} from '@angular/core/rxjs-interop';

/**
* Serializes navigation requests to the Angular Router to prevent cancelling of parallel navigations or race conditions when modifying the currently active workbench layout.
*/
export const SINGLE_NAVIGATION_EXECUTOR = new InjectionToken<SingleTaskExecutor>('SINGLE_NAVIGATION_EXECUTOR', {
providedIn: 'root',
factory: () => {
const executor = new SingleTaskExecutor();
inject(DestroyRef).onDestroy(() => executor.destroy());
return executor;
},
factory: () => new SingleTaskExecutor(),
});

/**
* Allows the serial execution of tasks.
*
* At any given time, there is only one task executing. When submitting a task and if there is a task already executing,
* At any one time, there is only one task executing. When submitting a task and if there is a task already executing,
* the submitted task will be queued for later execution.
*
* This executor must be constructed within an injection context. Destroying the injection context will also destroy the executor.
*/
export class SingleTaskExecutor {

private _task$ = new Subject<Task>();
private _destroyRef = new ɵDestroyRef();

constructor() {
this._task$
Expand All @@ -48,7 +44,7 @@ export class SingleTaskExecutor {
observeOn(asapScheduler),
serializeExecution(task => task.execute()),
catchError((error, caught) => caught),
takeUntilDestroyed(this._destroyRef),
takeUntilDestroyed(),
)
.subscribe();
}
Expand All @@ -63,13 +59,6 @@ export class SingleTaskExecutor {
this._task$.next(ɵtask);
return ɵtask.await();
}

/**
* Destroys this executor.
*/
public destroy(): void {
this._destroyRef.destroy();
}
}

class Task<T = any> {
Expand Down
Loading

0 comments on commit 076c241

Please sign in to comment.