Skip to content

Commit

Permalink
Remove Pivot Operator
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Jul 17, 2014
1 parent fc9dbd5 commit a4eb42e
Show file tree
Hide file tree
Showing 4 changed files with 0 additions and 953 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3808,62 +3808,6 @@ trait Observable[+T]
asJavaObservable.subscribe(onNext, onError, onComplete)
}

/**
* Pivots a sequence of `(K1, Observable[(K2, Observable[U])])`s emitted by an `Observable` so as to swap the group
* and and the set on which their items are grouped.
* <p>
* <img width="640" height="580" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.png">
*
* For example an `Observable` such as `this = Observable[(String, Observable[(Boolean, Observable[Integer])])`:
* <ul>
* <li>o1.odd: 1, 3, 5, 7, 9 on Thread 1</li>
* <li>o1.even: 2, 4, 6, 8, 10 on Thread 1</li>
* <li>o2.odd: 11, 13, 15, 17, 19 on Thread 2</li>
* <li>o2.even: 12, 14, 16, 18, 20 on Thread 2</li>
* </ul>
* is pivoted to become `this = Observable[(Boolean, Observable[(String, Observable[Integer])])`:
*
* <ul>
* <li>odd.o1: 1, 3, 5, 7, 9 on Thread 1</li>
* <li>odd.o2: 11, 13, 15, 17, 19 on Thread 2</li>
* <li>even.o1: 2, 4, 6, 8, 10 on Thread 1</li>
* <li>even.o2: 12, 14, 16, 18, 20 on Thread 2</li>
* </ul>
* <p>
* <img width="640" height="1140" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.ex.png">
* <p>
* <em>Note:</em> A `(K, Observable[_])` will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* `(K, Observable[_])`s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like `take(0)` to them.
*
* @return an `Observable`containing a stream of nested `(K1, Observable[(K2, Observable[U])])`s with swapped
* inner-outer keys.
*/
def pivot[U, K1, K2](implicit evidence: Observable[T] <:< Observable[(K1, Observable[(K2, Observable[U])])]): Observable[(K2, Observable[(K1, Observable[U])])] = {
import rx.observables.{GroupedObservable => JGroupedObservable}
val f1 = new Func1[(K1, Observable[(K2, Observable[U])]), JGroupedObservable[K1, JGroupedObservable[K2, U]]]() {
override def call(t1: (K1, Observable[(K2, Observable[U])])): JGroupedObservable[K1, JGroupedObservable[K2, U]] = {
val jo = t1._2.asJavaObservable.asInstanceOf[rx.Observable[(K2, Observable[U])]].map[JGroupedObservable[K2, U]](new Func1[(K2, Observable[U]), JGroupedObservable[K2, U]]() {
override def call(t2: (K2, Observable[U])): JGroupedObservable[K2, U] = {
JGroupedObservable.from(t2._1, t2._2.asJavaObservable.asInstanceOf[rx.Observable[U]])
}
})
JGroupedObservable.from(t1._1, jo)
}
}
val o1: Observable[(K1, Observable[(K2, Observable[U])])] = this
val o2 = toScalaObservable[JGroupedObservable[K2, JGroupedObservable[K1, U]]](rx.Observable.pivot(o1.asJavaObservable.map(f1)))
o2.map {
(jgo1: JGroupedObservable[K2, JGroupedObservable[K1, U]]) => {
val jo = jgo1.map[(K1, Observable[U])](new Func1[JGroupedObservable[K1, U], (K1, Observable[U])]() {
override def call(jgo2: JGroupedObservable[K1, U]): (K1, Observable[U]) = (jgo2.getKey, toScalaObservable[U](jgo2))
})
(jgo1.getKey, toScalaObservable[(K1, Observable[U])](jo))
}
}
}

/**
* Returns an Observable that counts the total number of items emitted by the source Observable and emits this count as a 64-bit Long.
*
Expand Down
44 changes: 0 additions & 44 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2393,50 +2393,6 @@ public final static <T> Observable<Observable<T>> parallelMerge(Observable<Obser
return OperatorParallelMerge.parallelMerge(source, parallelObservables, scheduler);
}

/**
* Pivots a sequence of {@code GroupedObservable}s emitted by an {@code Observable} so as to swap the group
* and and the set on which their items are grouped.
* <p>
* <img width="640" height="580" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.png" alt="">
* <p>
* For example an {@code Observable} such as this =&gt;
*
* {@code Observable<GroupedObservable<String, GroupedObservable<Boolean, Integer>>>}:
* <ul>
* <li>o1.odd: 1, 3, 5, 7, 9 on Thread 1</li>
* <li>o1.even: 2, 4, 6, 8, 10 on Thread 1</li>
* <li>o2.odd: 11, 13, 15, 17, 19 on Thread 2</li>
* <li>o2.even: 12, 14, 16, 18, 20 on Thread 2</li>
* </ul>
* is pivoted to become this =&gt;
*
* {@code Observable<GroupedObservable<Boolean, GroupedObservable<String, Integer>>>}:
* <ul>
* <li>odd.o1: 1, 3, 5, 7, 9 on Thread 1</li>
* <li>odd.o2: 11, 13, 15, 17, 19 on Thread 2</li>
* <li>even.o1: 2, 4, 6, 8, 10 on Thread 1</li>
* <li>even.o2: 12, 14, 16, 18, 20 on Thread 2</li>
* </ul>
* <p>
* <img width="640" height="1140" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.ex.png" alt="">
* <p>
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
* <p>
* {@code pivot} does not operate by default on a particular {@link Scheduler}.
*
* @param groups
the {@link GroupedObservable} to pivot
* @return an {@code Observable} containing a stream of nested {@code GroupedObservable}s with swapped
* inner-outer keys.
* @since 0.17
*/
public static final <K1, K2, T> Observable<GroupedObservable<K2, GroupedObservable<K1, T>>> pivot(Observable<GroupedObservable<K1, GroupedObservable<K2, T>>> groups) {
return groups.lift(new OperatorPivot<K1, K2, T>());
}

/**
* Returns an Observable that emits a sequence of Integers within a specified range.
* <p>
Expand Down
Loading

0 comments on commit a4eb42e

Please sign in to comment.