diff --git a/src/abortcontroller.ts b/src/abortcontroller.ts new file mode 100644 index 00000000..be2f2e1c --- /dev/null +++ b/src/abortcontroller.ts @@ -0,0 +1,15 @@ +export function createLinkedAbortController(...signals: AbortSignal[]) { + const controller = new AbortController(); + + Array.from(signals).forEach((signal) => { + signal.addEventListener( + 'abort', + () => { + controller.abort(); + }, + { once: true } + ); + }); + + return controller; +} diff --git a/src/asyncobservable/asyncobservablex.ts b/src/asyncobservable/asyncobservablex.ts new file mode 100644 index 00000000..1fe8962d --- /dev/null +++ b/src/asyncobservable/asyncobservablex.ts @@ -0,0 +1,159 @@ +import { + AsyncObservable, + AsyncObserver, + AsyncSubscription, + OperatorAsyncObservableFunction, + PartialAsyncObserver, + SYMBOL_ASYNC_DISPOSABLE, + UnaryFunction, +} from '../interfaces'; +import { AsyncObserverX } from './asyncobserverx'; + +class AutoDetachObserver extends AsyncObserverX { + private _observer: AsyncObserver; + private _subscription?: AsyncSubscription; + private _unsubscribing: boolean = false; + private _task?: Promise; + + constructor(observer: AsyncObserver) { + super(); + this._observer = observer; + } + + async assign(subscription: AsyncSubscription) { + let shouldUnsubscribe = false; + if (this._unsubscribing) { + shouldUnsubscribe = true; + } else { + this._subscription = subscription; + } + + if (shouldUnsubscribe) { + await this._subscription![SYMBOL_ASYNC_DISPOSABLE](); + } + } + + async _next(value: T) { + if (this._unsubscribing) { + return; + } + this._task = this._observer.next(value); + try { + await this._task; + } finally { + this._task = undefined; + } + } + + async _error(err: any) { + if (this._unsubscribing) { + return; + } + this._task = this._observer.error(err); + try { + await this._task; + } finally { + await this._finish(); + } + } + + async _complete() { + if (this._unsubscribing) { + return; + } + this._task = this._observer.complete(); + try { + await this._task; + } finally { + await this._finish(); + } + } + + async [SYMBOL_ASYNC_DISPOSABLE]() { + let subscription; + + if (!this._unsubscribing) { + this._unsubscribing = true; + subscription = this._subscription; + } + + if (subscription) { + await subscription[SYMBOL_ASYNC_DISPOSABLE](); + } + } + + private async _finish() { + let subscription; + if (!this._unsubscribing) { + this._unsubscribing = true; + subscription = this._subscription; + } + + this._task = undefined; + + if (subscription) { + await subscription[SYMBOL_ASYNC_DISPOSABLE](); + } + } +} + +export class SafeObserver extends AsyncObserverX { + private _observer: PartialAsyncObserver; + + constructor(observer: PartialAsyncObserver) { + super(); + this._observer = observer; + } + + async _next(value: T) { + if (this._observer.next) { + await this._observer.next(value); + } + } + + async _error(err: any) { + if (this._observer.error) { + await this._observer.error(err); + } else { + throw err; + } + } + + async _complete() { + if (this._observer.complete) { + await this._observer.complete(); + } + } +} + +export abstract class AsyncObservableX implements AsyncObservable { + async subscribeAsync( + observer: PartialAsyncObserver, + signal?: AbortSignal + ): Promise { + const safeObserver = new SafeObserver(observer); + const autoDetachObserver = new AutoDetachObserver(safeObserver); + const subscription = await this._subscribeAsync(autoDetachObserver, signal); + await autoDetachObserver.assign(subscription); + return autoDetachObserver; + } + + abstract _subscribeAsync( + observer: AsyncObserver, + signal?: AbortSignal + ): Promise; + + /** @nocollapse */ + pipe(...operations: UnaryFunction, R>[]): R; + pipe(...operations: OperatorAsyncObservableFunction[]): AsyncObservableX; + pipe(...args: any[]) { + let i = -1; + const n = args.length; + let acc: any = this; + while (++i < n) { + // TODO: Cast using `as` + acc = args[i](acc); + } + return acc; + } +} diff --git a/src/asyncobservable/asyncobserverx.ts b/src/asyncobservable/asyncobserverx.ts new file mode 100644 index 00000000..ad7a327a --- /dev/null +++ b/src/asyncobservable/asyncobserverx.ts @@ -0,0 +1,55 @@ +import { AsyncObserver } from '../interfaces'; + +enum ObserverState { + Idle, + Busy, + Done, +} + +export abstract class AsyncObserverX implements AsyncObserver { + private _state: ObserverState = ObserverState.Idle; + + next(value: T) { + this._tryEnter(); + try { + return this._next(value); + } finally { + this._state = ObserverState.Idle; + } + } + + abstract _next(value: T): Promise; + + error(err: any) { + this._tryEnter(); + try { + return this._error(err); + } finally { + this._state = ObserverState.Done; + } + } + + abstract _error(err: any): Promise; + + complete() { + this._tryEnter(); + try { + return this._complete(); + } finally { + this._state = ObserverState.Done; + } + } + + abstract _complete(): Promise; + + private _tryEnter() { + const old = this._state; + if (old === ObserverState.Idle) { + this._state = ObserverState.Busy; + } else if (old === ObserverState.Busy) { + throw new Error('Observer is already busy'); + } else if (old === ObserverState.Done) { + throw new Error('Observer has already terminated'); + } + } +} diff --git a/src/asyncobservable/concurrency/_delay.ts b/src/asyncobservable/concurrency/_delay.ts new file mode 100644 index 00000000..d4d7ab3b --- /dev/null +++ b/src/asyncobservable/concurrency/_delay.ts @@ -0,0 +1,26 @@ +import { AbortError } from '../../aborterror'; + +export function delay(dueTime: number, signal: AbortSignal): Promise { + return new Promise((resolve, reject) => { + if (signal.aborted) { + reject(new AbortError()); + } + + const id = setTimeout(() => { + if (signal.aborted) { + reject(new AbortError()); + } else { + resolve(); + } + }, dueTime); + + signal.addEventListener( + 'abort', + () => { + clearTimeout(id); + reject(new AbortError()); + }, + { once: true } + ); + }); +} diff --git a/src/asyncobservable/concurrency/asyncschedulerx.ts b/src/asyncobservable/concurrency/asyncschedulerx.ts new file mode 100644 index 00000000..50db7667 --- /dev/null +++ b/src/asyncobservable/concurrency/asyncschedulerx.ts @@ -0,0 +1,70 @@ +import { throwIfAborted } from 'ix/aborterror'; +import { AsyncScheduler, AsyncSubscription, SYMBOL_ASYNC_DISPOSABLE } from '../../interfaces'; + +function normalizeTime(time: number) { + return time < 0 ? 0 : time; +} + +export abstract class AsyncSchedulerX implements AsyncScheduler { + get now() { + return Date.now(); + } + + scheduleNowAsync(action: (signal: AbortSignal) => Promise, signal?: AbortSignal) { + return this._scheduleAsync(action, signal); + } + + scheduleFutureAsync( + action: (signal: AbortSignal) => Promise, + dueTime: number, + signal?: AbortSignal + ) { + const newTime = normalizeTime(dueTime); + + return this._scheduleAsync(async (innerSignal) => { + await this._delay(newTime, innerSignal); + await action(innerSignal); + }, signal); + } + + async _scheduleAsync( + action: (signal: AbortSignal) => Promise, + signal?: AbortSignal + ): Promise { + throwIfAborted(signal); + + const cas = new CancellationAsyncSubscription(); + cas.link(signal); + await this._scheduleCoreAsync(action, cas.signal); + return cas; + } + + abstract _scheduleCoreAsync( + action: (signal: AbortSignal) => Promise, + signal: AbortSignal + ): Promise; + + abstract _delay(dueTime: number, signal: AbortSignal): Promise; +} + +export class CancellationAsyncSubscription implements AsyncSubscription { + private _controller: AbortController; + + constructor() { + this._controller = new AbortController(); + } + + get signal() { + return this._controller.signal; + } + + link(signal?: AbortSignal) { + if (signal) { + signal.addEventListener('abort', () => this._controller.abort(), { once: true }); + } + } + + async [SYMBOL_ASYNC_DISPOSABLE]() { + this._controller.abort(); + } +} diff --git a/src/asyncobservable/concurrency/immediateasyncscheduler.ts b/src/asyncobservable/concurrency/immediateasyncscheduler.ts new file mode 100644 index 00000000..4dd88891 --- /dev/null +++ b/src/asyncobservable/concurrency/immediateasyncscheduler.ts @@ -0,0 +1,20 @@ +import { AsyncSchedulerX } from './asyncschedulerx'; +import { delay } from './_delay'; + +export class ImmediateAsyncScheduler extends AsyncSchedulerX { + private static _instance = new ImmediateAsyncScheduler(); + static get instance() { + return ImmediateAsyncScheduler._instance; + } + + async _scheduleCoreAsync( + action: (signal: AbortSignal) => Promise, + signal: AbortSignal + ): Promise { + await action(signal); + } + + async _delay(dueTime: number, signal: AbortSignal): Promise { + await delay(dueTime, signal); + } +} diff --git a/src/asyncobservable/concurrency/microtaskscheduler.ts b/src/asyncobservable/concurrency/microtaskscheduler.ts new file mode 100644 index 00000000..da461a4f --- /dev/null +++ b/src/asyncobservable/concurrency/microtaskscheduler.ts @@ -0,0 +1,22 @@ +import { AsyncSchedulerX } from './asyncschedulerx'; +import { delay } from './_delay'; + +export class MicroTaskAsyncScheduler extends AsyncSchedulerX { + private static _instance = new MicroTaskAsyncScheduler(); + static get instance() { + return MicroTaskAsyncScheduler._instance; + } + + async _scheduleCoreAsync( + action: (signal: AbortSignal) => Promise, + signal: AbortSignal + ): Promise { + return Promise.resolve().then(() => { + return action(signal); + }); + } + + async _delay(dueTime: number, signal: AbortSignal): Promise { + await delay(dueTime, signal); + } +} diff --git a/src/asyncobservable/create.ts b/src/asyncobservable/create.ts new file mode 100644 index 00000000..0d4e3e4c --- /dev/null +++ b/src/asyncobservable/create.ts @@ -0,0 +1,26 @@ +import { PartialAsyncObserver, AsyncSubscription } from '../interfaces'; +import { AsyncObservableX } from './asyncobservablex'; + +class AnonymousAsyncObservable extends AsyncObservableX { + private _fn: ( + observer: PartialAsyncObserver, + signal?: AbortSignal + ) => Promise; + + constructor( + fn: (observer: PartialAsyncObserver, signal?: AbortSignal) => Promise + ) { + super(); + this._fn = fn; + } + + _subscribeAsync(observer: PartialAsyncObserver, signal?: AbortSignal) { + return this._fn(observer, signal); + } +} + +export function create( + fn: (observer: PartialAsyncObserver, signal?: AbortSignal) => Promise +) { + return new AnonymousAsyncObservable(fn); +} diff --git a/src/asyncobservable/from.ts b/src/asyncobservable/from.ts new file mode 100644 index 00000000..898fc139 --- /dev/null +++ b/src/asyncobservable/from.ts @@ -0,0 +1,72 @@ +import { AbortError } from '../aborterror'; +import { AsyncScheduler, AsyncObserver, AsyncSubscription } from '../interfaces'; +import { AsyncObservableX } from './asyncobservablex'; + +export class FromAsyncIterableAsyncObservable extends AsyncObservableX { + private _value: AsyncIterable; + private _scheduler: AsyncScheduler; + + constructor(value: AsyncIterable, scheduler: AsyncScheduler) { + super(); + this._value = value; + this._scheduler = scheduler; + } + + _subscribeAsync( + observer: AsyncObserver, + signal?: AbortSignal + ): Promise { + return this._scheduler.scheduleNowAsync(async (innerSignal) => { + if (innerSignal.aborted) { + await observer.error(new AbortError()); + return; + } + + // TODO: put a try/catch around each next() call + for await (const item of this._value) { + await observer.next(item); // TODO: Rendevous? + } + + if (innerSignal.aborted) { + await observer.error(new AbortError()); + return; + } + + await observer.complete(); + }, signal); + } +} + +export class FromIterableAsyncObservable extends AsyncObservableX { + private _value: Iterable; + private _scheduler: AsyncScheduler; + + constructor(value: Iterable, scheduler: AsyncScheduler) { + super(); + this._value = value; + this._scheduler = scheduler; + } + + _subscribeAsync( + observer: AsyncObserver, + signal?: AbortSignal + ): Promise { + return this._scheduler.scheduleNowAsync(async (innerSignal) => { + if (innerSignal.aborted) { + await observer.error(new AbortError()); + return; + } + + for await (const item of this._value) { + await observer.next(item); // TODO: Rendevous? + } + + if (innerSignal.aborted) { + await observer.error(new AbortError()); + return; + } + + await observer.complete(); + }, signal); + } +} diff --git a/src/asyncobservable/of.ts b/src/asyncobservable/of.ts new file mode 100644 index 00000000..77650f79 --- /dev/null +++ b/src/asyncobservable/of.ts @@ -0,0 +1,42 @@ +import { AbortError } from '../aborterror'; +import { AsyncScheduler, AsyncObserver, AsyncSubscription } from '../interfaces'; +import { AsyncObservableX } from './asyncobservablex'; +import { ImmediateAsyncScheduler } from './concurrency/immediateasyncscheduler'; + +export class OfAsyncObservable extends AsyncObservableX { + private _value: Iterable; + private _scheduler: AsyncScheduler; + + constructor(value: Iterable, scheduler: AsyncScheduler) { + super(); + this._value = value; + this._scheduler = scheduler; + } + + _subscribeAsync( + observer: AsyncObserver, + signal?: AbortSignal + ): Promise { + return this._scheduler.scheduleNowAsync(async (innerSignal) => { + if (innerSignal.aborted) { + await observer.error(new AbortError()); + return; + } + + for (const item of this._value) { + await observer.next(item); // TODO: Rendevous? + } + + if (innerSignal.aborted) { + await observer.error(new AbortError()); + return; + } + + await observer.complete(); + }, signal); + } +} + +export function of(...args: TSource[]) { + return new OfAsyncObservable(args, ImmediateAsyncScheduler.instance); +} diff --git a/src/asyncobservable/operators/filter.ts b/src/asyncobservable/operators/filter.ts new file mode 100644 index 00000000..71401c1e --- /dev/null +++ b/src/asyncobservable/operators/filter.ts @@ -0,0 +1,88 @@ +import { + AsyncObservable, + AsyncObserver, + AsyncSubscription, + MonoTypeOperatorAsyncObservableFunction, +} from '../../interfaces'; +import { AsyncObservableX } from '../asyncobservablex'; +import { AsyncObserverX } from '../asyncobserverx'; +import { subscribeSafe } from '../subscribesafe'; + +class FilterObserver extends AsyncObserverX { + private _observer: AsyncObserver; + private _predicate: (value: T, index: number, signal?: AbortSignal) => Promise | boolean; + private _thisArg?: any; + private _signal?: AbortSignal; + private _index: number = 0; + + constructor( + observer: AsyncObserver, + predicate: (value: T, index: number, signal?: AbortSignal) => Promise | boolean, + thisArg?: any, + signal?: AbortSignal + ) { + super(); + this._observer = observer; + this._predicate = predicate; + this._thisArg = thisArg; + this._signal = signal; + } + + async _next(value: T) { + let shouldYield; + try { + shouldYield = await this._predicate.call(this._thisArg, value, this._index++, this._signal); + } catch (e) { + await this._observer.error(e); + return; + } + if (shouldYield) { + await this._observer.next(value); + } + } + + async _error(err: any) { + await this._observer.error(err); + } + + async _complete() { + await this._observer.complete(); + } +} + +export class FilterObservable extends AsyncObservableX { + private _source: AsyncObservable; + private _predicate: (value: T, index: number, signal?: AbortSignal) => Promise | boolean; + private _thisArg?: any; + + constructor( + source: AsyncObservable, + predicate: (value: T, index: number, signal?: AbortSignal) => Promise | boolean, + thisArg?: any + ) { + super(); + this._source = source; + this._predicate = predicate; + this._thisArg = thisArg; + } + + async _subscribeAsync( + observer: AsyncObserver, + signal?: AbortSignal + ): Promise { + return await subscribeSafe( + this._source, + new FilterObserver(observer, this._predicate, this._thisArg, signal), + signal + ); + } +} + +export function filter( + predicate: (value: TSource, index: number, signal?: AbortSignal) => Promise | boolean, + thisArg?: any +): MonoTypeOperatorAsyncObservableFunction { + return function filterOperatorFunction(source: AsyncObservable) { + return new FilterObservable(source, predicate, thisArg); + }; +} diff --git a/src/asyncobservable/operators/map.ts b/src/asyncobservable/operators/map.ts new file mode 100644 index 00000000..ebdb6ca0 --- /dev/null +++ b/src/asyncobservable/operators/map.ts @@ -0,0 +1,100 @@ +import { + AsyncObservable, + AsyncObserver, + AsyncSubscription, + OperatorAsyncObservableFunction, +} from '../../interfaces'; +import { AsyncObservableX } from '../asyncobservablex'; +import { AsyncObserverX } from '../asyncobserverx'; +import { subscribeSafe } from '../subscribesafe'; + +class MapObserver extends AsyncObserverX { + private _observer: AsyncObserver; + private _selector: (value: T, index: number, signal?: AbortSignal) => Promise | R; + private _thisArg?: any; + private _signal?: AbortSignal; + private _index: number = 0; + + constructor( + observer: AsyncObserver, + selector: (value: T, index: number, signal?: AbortSignal) => Promise | R, + thisArg?: any, + signal?: AbortSignal + ) { + super(); + this._observer = observer; + this._selector = selector; + this._thisArg = thisArg; + this._signal = signal; + } + + async _next(value: T) { + let res; + try { + res = await this._selector.call(this._thisArg, value, this._index++, this._signal); + } catch (e) { + await this._observer.error(e); + return; + } + + await this._observer.next(res); + } + + async _error(err: any) { + await this._observer.error(err); + } + + async _complete() { + await this._observer.complete(); + } +} + +class MapObservable extends AsyncObservableX { + private _source: AsyncObservable; + private _selector: (value: T, index: number, signal?: AbortSignal) => Promise | R; + private _thisArg?: any; + + constructor( + source: AsyncObservable, + selector: (value: T, index: number, signal?: AbortSignal) => Promise | R, + thisArg?: any + ) { + super(); + this._source = source; + this._selector = selector; + this._thisArg = thisArg; + } + + async _subscribeAsync( + observer: AsyncObserver, + signal?: AbortSignal + ): Promise { + return await subscribeSafe( + this._source, + new MapObserver(observer, this._selector, this._thisArg, signal), + signal + ); + } +} + +/** + * Projects each element of an async-observable sequence into a new form. + * + * @export + * @template TSource The type of the elements in the source sequence. + * @template TResult The type of the elements in the result sequence, obtained by running the selector + * function for each element in the source sequence. + * @param {((value: TSource, index: number, signal?: AbortSignal) => Promise | TResult)} selector A transform function + * to apply to each source element. + * @param {*} [thisArg] Optional this for binding to the selector. + * @returns {OperatorAsyncObservableFunction} An async-observable sequence whose elements are the result of invoking the transform + * function on each element of source. + */ +export function map( + selector: (value: TSource, index: number) => Promise | TResult, + thisArg?: any +): OperatorAsyncObservableFunction { + return function mapOperatorFunction(source: AsyncObservable) { + return new MapObservable(source, selector, thisArg); + }; +} diff --git a/src/asyncobservable/operators/scan.ts b/src/asyncobservable/operators/scan.ts new file mode 100644 index 00000000..60b7d9b0 --- /dev/null +++ b/src/asyncobservable/operators/scan.ts @@ -0,0 +1,117 @@ +import { + AsyncObservable, + AsyncObserver, + AsyncSubscription, + OperatorAsyncObservableFunction, +} from '../../interfaces'; +import { AsyncObservableX } from '../asyncobservablex'; +import { AsyncObserverX } from '../asyncobserverx'; +import { subscribeSafe } from '../subscribesafe'; + +/** + * The options for performing a scan operation, including the callback and the optional seed. + * + * @export + * @interface ScanOptions + * @template T The type of the elements in the source sequence. + * @template R The type of the result for the reducer callback. + */ +export interface ScanOptions { + /** + * The optional seed used for the scan operation. + * + * @type {R} The type of the result + * @memberof ScanOptions + */ + seed?: R; + /** + * The callback used for the scan operation, which passes the accumulator, current value, the + * current index, and an Abort Signal. This returns a result or a Promise containing a result. + * + * @memberof ScanOptions + */ + callback: (accumulator: R, current: T, index: number, signal?: AbortSignal) => R | Promise; +} + +class ScanObserver extends AsyncObserverX { + private _observer: AsyncObserver; + private _fn: (acc: R, x: T, index: number, signal?: AbortSignal) => R | Promise; + private _seed?: T | R; + private _hasSeed: boolean; + private _signal?: AbortSignal; + private _index: number; + private _hasValue: boolean; + private _acc: T | R | undefined; + + constructor(observer: AsyncObserver, options: ScanOptions, signal?: AbortSignal) { + super(); + this._observer = observer; + this._fn = options['callback']; + this._hasSeed = options.hasOwnProperty('seed'); + this._seed = options['seed']; + this._signal = signal; + this._index = 0; + this._hasValue = false; + this._acc = this._seed; + } + + async _next(value: T): Promise { + if (this._hasValue || (this._hasValue = this._hasSeed)) { + try { + this._acc = await this._fn( this._acc, value, this._index++, this._signal); + } catch (e) { + await this._observer.error(e); + return; + } + } else { + this._acc = value; + this._hasValue = true; + this._index++; + } + + await this._observer.next( this._acc); + } + + async _error(err: any) { + await this._observer.error(err); + } + + async _complete() { + await this._observer.complete(); + } +} + +export class ScanAsyncObservable extends AsyncObservableX { + private _source: AsyncObservable; + private _options: ScanOptions; + + constructor(source: AsyncObservable, options: ScanOptions) { + super(); + this._source = source; + this._options = options; + } + + _subscribeAsync(observer: AsyncObserver, signal?: AbortSignal): Promise { + return subscribeSafe( + this._source, + new ScanObserver(observer, this._options, signal), + signal + ); + } +} + +/** + * Applies an accumulator function over an async-observable sequence and returns each intermediate result. + * The specified seed value, if given, is used as the initial accumulator value. + * + * @export + * @template T The type of the elements in the source sequence. + * @template R The type of the result of the aggregation. + * @param {ScanOptions} options The options including the accumulator function and seed. + * @returns {OperatorAsyncObservableFunction} An async-enumerable sequence containing the accumulated values. + */ +export function scan(options: ScanOptions): OperatorAsyncObservableFunction { + return function scanOperatorFunction(source: AsyncObservable): AsyncObservableX { + return new ScanAsyncObservable(source, options); + }; +} diff --git a/src/asyncobservable/subscribesafe.ts b/src/asyncobservable/subscribesafe.ts new file mode 100644 index 00000000..248735b0 --- /dev/null +++ b/src/asyncobservable/subscribesafe.ts @@ -0,0 +1,15 @@ +import { AsyncObservable, AsyncObserver, AsyncSubscription } from '../interfaces'; +import { AsyncSubscriptionX } from './subscriptions/asyncsubscriptionx'; + +export async function subscribeSafe( + observable: AsyncObservable, + observer: AsyncObserver, + signal?: AbortSignal +): Promise { + try { + return await observable.subscribeAsync(observer, signal); + } catch (e) { + await observer.error(e); + return AsyncSubscriptionX.empty(); + } +} diff --git a/src/asyncobservable/subscriptions/asyncsubscriptionx.ts b/src/asyncobservable/subscriptions/asyncsubscriptionx.ts new file mode 100644 index 00000000..8fae60cd --- /dev/null +++ b/src/asyncobservable/subscriptions/asyncsubscriptionx.ts @@ -0,0 +1,38 @@ +import { AsyncSubscription, SYMBOL_ASYNC_DISPOSABLE } from '../../interfaces'; + +const NOOP_PROMISE = Promise.resolve(undefined); + +class EmptySubscription implements AsyncSubscription { + async [SYMBOL_ASYNC_DISPOSABLE](): Promise { + await NOOP_PROMISE; + } +} + +const EMPTY_SUBSCRIPTION = new EmptySubscription(); + +class AnonymousSubscription implements AsyncSubscription { + private _fn?: () => Promise; + + constructor(fn: () => Promise) { + this._fn = fn; + } + + async [SYMBOL_ASYNC_DISPOSABLE](): Promise { + if (this._fn) { + await this._fn!(); + this._fn = undefined; + } else { + await NOOP_PROMISE; + } + } +} + +export class AsyncSubscriptionX { + static create(unsubscribe: () => Promise): AsyncSubscription { + return new AnonymousSubscription(unsubscribe); + } + + static empty(): AsyncSubscription { + return EMPTY_SUBSCRIPTION; + } +} diff --git a/src/interfaces.ts b/src/interfaces.ts index 94a45e98..01662e26 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -1,5 +1,6 @@ import { IterableX } from './iterable'; import { AsyncIterableX } from './asynciterable'; +import { AsyncObservableX } from './asyncobservable/asyncobservablex'; export type UnaryFunction = (source: T) => R; @@ -7,9 +8,75 @@ export type OperatorFunction = UnaryFunction, IterableX>; export type OperatorAsyncFunction = UnaryFunction, AsyncIterableX>; +export type OperatorAsyncObservableFunction = UnaryFunction< +AsyncObservable, +AsyncObservableX +>; + export type MonoTypeOperatorFunction = OperatorFunction; export type MonoTypeOperatorAsyncFunction = OperatorAsyncFunction; +export type MonoTypeOperatorAsyncObservableFunction = OperatorAsyncObservableFunction; + /** @ignore */ export type BufferLike = string | Buffer | Uint8Array; + +export const SYMBOL_ASYNC_DISPOSABLE = Symbol.for('AsyncDisposable'); + +export interface AsyncSubscription { + [SYMBOL_ASYNC_DISPOSABLE]: () => Promise; +} + +export interface NextAsyncObserver { + next: (value: T) => Promise; + error?: (err: any) => Promise; + complete?: () => Promise; +} + +export interface ErrorAsyncObserver { + next?: (value: T) => Promise; + error: (err: any) => Promise; + complete?: () => Promise; +} + +export interface CompletionAsyncObserver { + next?: (value: T) => Promise; + error?: (err: any) => Promise; + complete: () => Promise; +} + +export type PartialAsyncObserver = + | NextAsyncObserver + | ErrorAsyncObserver + | CompletionAsyncObserver; + +export interface AsyncObserver { + next: (value: T) => Promise; + error: (err: any) => Promise; + complete: () => Promise; +} + +export interface AsyncObservable { + subscribeAsync: ( + observer: PartialAsyncObserver, + signal?: AbortSignal + ) => Promise; +} + +export interface AsyncSubject + extends AsyncObserver, + AsyncObservable {} + +export interface AsyncScheduler { + now: number; + scheduleNowAsync( + action: (signal: AbortSignal) => Promise, + signal?: AbortSignal + ): Promise; + scheduleFutureAsync( + action: (signal: AbortSignal) => Promise, + dueTime: number, + signal?: AbortSignal + ): Promise; +}