diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index 7e8918b6ec..b9a17a4596 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -2,7 +2,7 @@ import Observable from './Observable'; import Scheduler from './Scheduler'; import ConnectableObservable from './observables/ConnectableObservable'; import Subject from './Subject' -import GroupSubject from './subjects/GroupSubject'; +import {GroupedObservable} from './operators/groupBy-support'; export interface CoreOperators { buffer?: (closingNotifier: Observable) => Observable; @@ -30,7 +30,7 @@ export interface CoreOperators { first?: (predicate?: (value: T, index: number, source: Observable) => boolean, resultSelector?: (value:T, index: number) => R, thisArg?: any, defaultValue?: any) => Observable; flatMap?: (project: ((x: T, ix: number) => Observable), projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable; flatMapTo?: (observable: Observable, projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable; - groupBy?: (keySelector: (value:T) => string, durationSelector?: (group:GroupSubject) => Observable, elementSelector?: (value:T) => R) => Observable; + groupBy?: (keySelector: (value:T) => string, elementSelector?: (value:T) => R, durationSelector?: (group: GroupedObservable) => Observable) => Observable>; ignoreElements?: () => Observable; last?: (predicate?: (value: T, index:number) => boolean, resultSelector?: (value: T, index: number) => R, thisArg?: any, defaultValue?: any) => Observable; every?: (predicate: (value: T, index:number) => boolean, thisArg?: any) => Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 5dc59a6785..5e30269dae 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -150,7 +150,7 @@ observableProto.finally = _finally; import first from './operators/first'; observableProto.first = first; -import groupBy from './operators/groupBy'; +import {groupBy} from './operators/groupBy'; observableProto.groupBy = groupBy; import ignoreElements from './operators/ignoreElements'; diff --git a/src/Rx.ts b/src/Rx.ts index 75e4f0d74a..0de7c31461 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -127,7 +127,7 @@ observableProto.finally = _finally; import first from './operators/first'; observableProto.first = first; -import groupBy from './operators/groupBy'; +import {groupBy} from './operators/groupBy'; observableProto.groupBy = groupBy; import ignoreElements from './operators/ignoreElements'; diff --git a/src/operators/groupBy-support.ts b/src/operators/groupBy-support.ts new file mode 100644 index 0000000000..9027480260 --- /dev/null +++ b/src/operators/groupBy-support.ts @@ -0,0 +1,64 @@ +import Subscription from '../Subscription'; +import Subject from '../Subject'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; + +export class RefCountSubscription extends Subscription { + primary: Subscription; + attemptedToUnsubscribePrimary: boolean = false; + count: number = 0; + + constructor() { + super(); + } + + setPrimary(subscription: Subscription) { + this.primary = subscription; + } + + unsubscribe() { + if (!this.isUnsubscribed && !this.attemptedToUnsubscribePrimary) { + this.attemptedToUnsubscribePrimary = true; + if (this.count === 0) { + super.unsubscribe(); + this.primary.unsubscribe(); + } + } + } +} + +export class GroupedObservable extends Observable { + constructor(public key: string, + private groupSubject: Subject, + private refCountSubscription: RefCountSubscription) { + super(); + } + + _subscribe(subscriber: Subscriber) { + const subscription = new Subscription(); + if (!this.refCountSubscription.isUnsubscribed) { + subscription.add(new InnerRefCountSubscription(this.refCountSubscription)); + } + subscription.add(this.groupSubject.subscribe(subscriber)); + return subscription; + } +} + +export class InnerRefCountSubscription extends Subscription { + constructor(private parent: RefCountSubscription) { + super(); + parent.count++; + } + + unsubscribe() { + if (!this.parent.isUnsubscribed && !this.isUnsubscribed) { + super.unsubscribe(); + this.parent.count--; + if (this.parent.count === 0 && this.parent.attemptedToUnsubscribePrimary) { + this.parent.unsubscribe(); + this.parent.primary.unsubscribe(); + } + } + } +} + diff --git a/src/operators/groupBy.ts b/src/operators/groupBy.ts index 4ec863a065..bdc287dd45 100644 --- a/src/operators/groupBy.ts +++ b/src/operators/groupBy.ts @@ -1,32 +1,38 @@ import Operator from '../Operator'; import Observer from '../Observer'; +import Subscription from '../Subscription'; import Subscriber from '../Subscriber'; import Observable from '../Observable'; import Subject from '../Subject'; import Map from '../util/Map'; import FastMap from '../util/FastMap'; -import GroupSubject from '../subjects/GroupSubject'; +import {RefCountSubscription, GroupedObservable, InnerRefCountSubscription} from './groupBy-support'; import tryCatch from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; import bindCallback from '../util/bindCallback'; -export default function groupBy(keySelector: (value: T) => string, - elementSelector?: (value: T) => R, - durationSelector?: (grouped: GroupSubject) => Observable): Observable> { - return this.lift(new GroupByOperator(keySelector, durationSelector, elementSelector)); +export function groupBy(keySelector: (value: T) => string, + elementSelector?: (value: T) => R, + durationSelector?: (grouped: GroupedObservable) => Observable): GroupByObservable { + return new GroupByObservable(this, keySelector, elementSelector, durationSelector); } -class GroupByOperator implements Operator { - constructor(private keySelector: (value: T) => string, - private durationSelector?: (grouped: GroupSubject) => Observable, - private elementSelector?: (value: T) => R) { +export class GroupByObservable extends Observable> { + constructor(public source: Observable, + private keySelector: (value: T) => string, + private elementSelector?: (value: T) => R, + private durationSelector?: (grouped: GroupedObservable) => Observable) { + super(); } - call(subscriber: Subscriber): Subscriber { - return new GroupBySubscriber( - subscriber, this.keySelector, this.durationSelector, this.elementSelector + _subscribe(subscriber) { + const refCountSubscription = new RefCountSubscription(); + const groupBySubscriber = new GroupBySubscriber( + subscriber, refCountSubscription, this.keySelector, this.elementSelector, this.durationSelector ); + refCountSubscription.setPrimary(this.source.subscribe(groupBySubscriber)); + return refCountSubscription; } } @@ -34,10 +40,13 @@ class GroupBySubscriber extends Subscriber { private groups = null; constructor(destination: Subscriber, + private refCountSubscription: RefCountSubscription, private keySelector: (value: T) => string, - private durationSelector?: (grouped: GroupSubject) => Observable, - private elementSelector?: (value: T) => R) { - super(destination); + private elementSelector?: (value: T) => R, + private durationSelector?: (grouped: GroupedObservable) => Observable) { + super(); + this.destination = destination; + this.add(destination); } _next(x: T) { @@ -53,27 +62,28 @@ class GroupBySubscriber extends Subscriber { groups = this.groups = typeof key === 'string' ? new FastMap() : new Map(); } - let group: GroupSubject = groups.get(key); + let group: Subject = groups.get(key); if (!group) { - groups.set(key, group = new GroupSubject(key)); + groups.set(key, group = new Subject()); + let groupedObservable = new GroupedObservable(key, group, this.refCountSubscription); if (durationSelector) { - let duration = tryCatch(durationSelector)(group); + let duration = tryCatch(durationSelector)(groupedObservable); if (duration === errorObject) { this.error(duration.e); } else { - this.add(duration._subscribe(new GroupDurationSubscriber(group, this))); + this.add(duration._subscribe(new GroupDurationSubscriber(key, group, this))); } } - this.destination.next(group); + this.destination.next(groupedObservable); } if (elementSelector) { let value = tryCatch(elementSelector)(x); if (value === errorObject) { - group.error(value.e); + this.error(value.e); } else { group.next(value); } @@ -111,26 +121,24 @@ class GroupBySubscriber extends Subscriber { } class GroupDurationSubscriber extends Subscriber { - constructor(private group: GroupSubject, + constructor(private key: string, + private group: Subject, private parent: GroupBySubscriber) { super(null); } _next(value: T) { - const group = this.group; - group.complete(); - this.parent.removeGroup(group.key); + this.group.complete(); + this.parent.removeGroup(this.key); } _error(err: any) { - const group = this.group; - group.error(err); - this.parent.removeGroup(group.key); + this.group.error(err); + this.parent.removeGroup(this.key); } _complete() { - const group = this.group; - group.complete(); - this.parent.removeGroup(group.key); + this.group.complete(); + this.parent.removeGroup(this.key); } } \ No newline at end of file diff --git a/src/subjects/GroupSubject.ts b/src/subjects/GroupSubject.ts deleted file mode 100644 index 8d42cf23de..0000000000 --- a/src/subjects/GroupSubject.ts +++ /dev/null @@ -1,7 +0,0 @@ -import Subject from '../Subject'; - -export default class GroupSubject extends Subject { - constructor(public key: string) { - super(); - } -} \ No newline at end of file