Skip to content

Commit

Permalink
refactor(Subject): introduce interface to Subject
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj committed Jan 16, 2017
1 parent dd925a8 commit 960a641
Showing 1 changed file with 20 additions and 17 deletions.
37 changes: 20 additions & 17 deletions src/Subject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,43 @@ export class SubjectSubscriber<T> extends Subscriber<T> {
}
}

export interface ISubject<T> extends ISubscription, Observable<T> {
readonly observers: ReadonlyArray<Observer<T>>;
readonly closed: boolean;
readonly isStopped: boolean;
asObservable(): Observable<T>;
}

/**
* @class Subject<T>
*/
export class Subject<T> extends Observable<T> implements ISubscription {
export class Subject<T> extends Observable<T> implements ISubject<T> {

[$$rxSubscriber]() {
return new SubjectSubscriber(this);
}

observers: Observer<T>[] = [];

closed = false;

isStopped = false;

hasError = false;

thrownError: any = null;
protected hasError = false;
protected thrownError: any = null;

constructor() {
super();
}

static create: Function = <T>(destination: Observer<T>, source: Observable<T>): AnonymousSubject<T> => {
static create: Function = <T>(destination: Observer<T>, source: Observable<T>): ISubject<T> => {
return new AnonymousSubject<T>(destination, source);
}

lift<R>(operator: Operator<T, R>): Observable<T> {
public lift<R>(operator: Operator<T, R>): Observable<T> {
const subject = new AnonymousSubject(this, this);
subject.operator = <any>operator;
return <any>subject;
}

next(value?: T) {
public next(value?: T): void {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
Expand All @@ -63,7 +66,7 @@ export class Subject<T> extends Observable<T> implements ISubscription {
}
}

error(err: any) {
public error(err: any): void {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
Expand All @@ -79,7 +82,7 @@ export class Subject<T> extends Observable<T> implements ISubscription {
this.observers.length = 0;
}

complete() {
public complete(): void {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
Expand All @@ -93,7 +96,7 @@ export class Subject<T> extends Observable<T> implements ISubscription {
this.observers.length = 0;
}

unsubscribe() {
public unsubscribe(): void {
this.isStopped = true;
this.closed = true;
this.observers = null;
Expand All @@ -114,7 +117,7 @@ export class Subject<T> extends Observable<T> implements ISubscription {
}
}

asObservable(): Observable<T> {
public asObservable(): Observable<T> {
const observable = new Observable<T>();
(<any>observable).source = this;
return observable;
Expand All @@ -130,21 +133,21 @@ export class AnonymousSubject<T> extends Subject<T> {
this.source = source;
}

next(value: T) {
public next(value: T): void {
const { destination } = this;
if (destination && destination.next) {
destination.next(value);
}
}

error(err: any) {
public error(err: any): void {
const { destination } = this;
if (destination && destination.error) {
this.destination.error(err);
}
}

complete() {
public complete(): void {
const { destination } = this;
if (destination && destination.complete) {
this.destination.complete();
Expand Down

0 comments on commit 960a641

Please sign in to comment.