Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(AsyncObservable): Create AsyncObservable #308

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
15 changes: 15 additions & 0 deletions src/abortcontroller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export function createLinkedAbortController(...signals: AbortSignal[]) {
const controller = new AbortController();

Array.from(signals).forEach((signal) => {
signal.addEventListener(
'abort',
() => {
controller.abort();
},
{ once: true }
);
});

return controller;
}
159 changes: 159 additions & 0 deletions src/asyncobservable/asyncobservablex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import {
AsyncObservable,
AsyncObserver,
AsyncSubscription,
OperatorAsyncObservableFunction,
PartialAsyncObserver,
SYMBOL_ASYNC_DISPOSABLE,
UnaryFunction,
} from '../interfaces';
import { AsyncObserverX } from './asyncobserverx';

class AutoDetachObserver<T> extends AsyncObserverX<T> {
private _observer: AsyncObserver<T>;
private _subscription?: AsyncSubscription;
private _unsubscribing: boolean = false;
private _task?: Promise<void>;

constructor(observer: AsyncObserver<T>) {
super();
this._observer = observer;
}

async assign(subscription: AsyncSubscription) {
let shouldUnsubscribe = false;
if (this._unsubscribing) {
shouldUnsubscribe = true;
} else {
this._subscription = subscription;
}

if (shouldUnsubscribe) {
await this._subscription![SYMBOL_ASYNC_DISPOSABLE]();
}
}

async _next(value: T) {
if (this._unsubscribing) {
return;
}
this._task = this._observer.next(value);
try {
await this._task;
} finally {
this._task = undefined;
}
}

async _error(err: any) {
if (this._unsubscribing) {
return;
}
this._task = this._observer.error(err);
try {
await this._task;
} finally {
await this._finish();
}
}

async _complete() {
if (this._unsubscribing) {
return;
}
this._task = this._observer.complete();
try {
await this._task;
} finally {
await this._finish();
}
}

async [SYMBOL_ASYNC_DISPOSABLE]() {
let subscription;

if (!this._unsubscribing) {
this._unsubscribing = true;
subscription = this._subscription;
}

if (subscription) {
await subscription[SYMBOL_ASYNC_DISPOSABLE]();
}
}

private async _finish() {
let subscription;
if (!this._unsubscribing) {
this._unsubscribing = true;
subscription = this._subscription;
}

this._task = undefined;

if (subscription) {
await subscription[SYMBOL_ASYNC_DISPOSABLE]();
}
}
}

export class SafeObserver<T> extends AsyncObserverX<T> {
private _observer: PartialAsyncObserver<T>;

constructor(observer: PartialAsyncObserver<T>) {
super();
this._observer = observer;
}

async _next(value: T) {
if (this._observer.next) {
await this._observer.next(value);
}
}

async _error(err: any) {
if (this._observer.error) {
await this._observer.error(err);
} else {
throw err;
}
}

async _complete() {
if (this._observer.complete) {
await this._observer.complete();
}
}
}

export abstract class AsyncObservableX<T> implements AsyncObservable<T> {
async subscribeAsync(
observer: PartialAsyncObserver<T>,
signal?: AbortSignal
): Promise<AsyncSubscription> {
const safeObserver = new SafeObserver<T>(observer);
const autoDetachObserver = new AutoDetachObserver<T>(safeObserver);
const subscription = await this._subscribeAsync(autoDetachObserver, signal);
await autoDetachObserver.assign(subscription);
return autoDetachObserver;
}

abstract _subscribeAsync(
observer: AsyncObserver<T>,
signal?: AbortSignal
): Promise<AsyncSubscription>;

/** @nocollapse */
pipe<R>(...operations: UnaryFunction<AsyncObservable<T>, R>[]): R;
pipe<R>(...operations: OperatorAsyncObservableFunction<T, R>[]): AsyncObservableX<R>;
pipe(...args: any[]) {
let i = -1;
const n = args.length;
let acc: any = this;
while (++i < n) {
// TODO: Cast using `as`
acc = args[i](acc);
}
return acc;
}
}
55 changes: 55 additions & 0 deletions src/asyncobservable/asyncobserverx.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { AsyncObserver } from '../interfaces';

enum ObserverState {
Idle,
Busy,
Done,
}

export abstract class AsyncObserverX<T> implements AsyncObserver<T> {
private _state: ObserverState = ObserverState.Idle;

next(value: T) {
this._tryEnter();
try {
return this._next(value);
} finally {
this._state = ObserverState.Idle;
}
}

abstract _next(value: T): Promise<void>;

error(err: any) {
this._tryEnter();
try {
return this._error(err);
} finally {
this._state = ObserverState.Done;
}
}

abstract _error(err: any): Promise<void>;

complete() {
this._tryEnter();
try {
return this._complete();
} finally {
this._state = ObserverState.Done;
}
}

abstract _complete(): Promise<void>;

private _tryEnter() {
const old = this._state;
if (old === ObserverState.Idle) {
this._state = ObserverState.Busy;
} else if (old === ObserverState.Busy) {
throw new Error('Observer is already busy');
} else if (old === ObserverState.Done) {
throw new Error('Observer has already terminated');
}
}
}
26 changes: 26 additions & 0 deletions src/asyncobservable/concurrency/_delay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { AbortError } from '../../aborterror';

export function delay(dueTime: number, signal: AbortSignal): Promise<void> {
return new Promise((resolve, reject) => {
if (signal.aborted) {
reject(new AbortError());
}

const id = setTimeout(() => {
if (signal.aborted) {
reject(new AbortError());
} else {
resolve();
}
}, dueTime);

signal.addEventListener(
'abort',
() => {
clearTimeout(id);
reject(new AbortError());
},
{ once: true }
);
});
}
70 changes: 70 additions & 0 deletions src/asyncobservable/concurrency/asyncschedulerx.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { throwIfAborted } from 'ix/aborterror';
import { AsyncScheduler, AsyncSubscription, SYMBOL_ASYNC_DISPOSABLE } from '../../interfaces';

function normalizeTime(time: number) {
return time < 0 ? 0 : time;
}

export abstract class AsyncSchedulerX implements AsyncScheduler {
get now() {
return Date.now();
}

scheduleNowAsync(action: (signal: AbortSignal) => Promise<void>, signal?: AbortSignal) {
return this._scheduleAsync(action, signal);
}

scheduleFutureAsync(
action: (signal: AbortSignal) => Promise<void>,
dueTime: number,
signal?: AbortSignal
) {
const newTime = normalizeTime(dueTime);

return this._scheduleAsync(async (innerSignal) => {
await this._delay(newTime, innerSignal);
await action(innerSignal);
}, signal);
}

async _scheduleAsync(
action: (signal: AbortSignal) => Promise<void>,
signal?: AbortSignal
): Promise<AsyncSubscription> {
throwIfAborted(signal);

const cas = new CancellationAsyncSubscription();
cas.link(signal);
await this._scheduleCoreAsync(action, cas.signal);
return cas;
}

abstract _scheduleCoreAsync(
action: (signal: AbortSignal) => Promise<void>,
signal: AbortSignal
): Promise<void>;

abstract _delay(dueTime: number, signal: AbortSignal): Promise<void>;
}

export class CancellationAsyncSubscription implements AsyncSubscription {
private _controller: AbortController;

constructor() {
this._controller = new AbortController();
}

get signal() {
return this._controller.signal;
}

link(signal?: AbortSignal) {
if (signal) {
signal.addEventListener('abort', () => this._controller.abort(), { once: true });
}
}

async [SYMBOL_ASYNC_DISPOSABLE]() {
this._controller.abort();
}
}
20 changes: 20 additions & 0 deletions src/asyncobservable/concurrency/immediateasyncscheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { AsyncSchedulerX } from './asyncschedulerx';
import { delay } from './_delay';

export class ImmediateAsyncScheduler extends AsyncSchedulerX {
private static _instance = new ImmediateAsyncScheduler();
static get instance() {
return ImmediateAsyncScheduler._instance;
}

async _scheduleCoreAsync(
action: (signal: AbortSignal) => Promise<void>,
signal: AbortSignal
): Promise<void> {
await action(signal);
}

async _delay(dueTime: number, signal: AbortSignal): Promise<void> {
await delay(dueTime, signal);
}
}
22 changes: 22 additions & 0 deletions src/asyncobservable/concurrency/microtaskscheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { AsyncSchedulerX } from './asyncschedulerx';
import { delay } from './_delay';

export class MicroTaskAsyncScheduler extends AsyncSchedulerX {
private static _instance = new MicroTaskAsyncScheduler();
static get instance() {
return MicroTaskAsyncScheduler._instance;
}

async _scheduleCoreAsync(
action: (signal: AbortSignal) => Promise<void>,
signal: AbortSignal
): Promise<void> {
return Promise.resolve().then(() => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return Promise.resolve().then(() => {
return await Promise.resolve().then(() => {

return action(signal);
});
}

async _delay(dueTime: number, signal: AbortSignal): Promise<void> {
await delay(dueTime, signal);
}
}
Loading