Skip to content

Commit

Permalink
Remove Pivot Operator
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Jul 17, 2014
1 parent 63e5773 commit bebca3d
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 72 deletions.
16 changes: 0 additions & 16 deletions src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1424,20 +1424,4 @@ class RxScalaDemo extends JUnitSuite {
o.take(3).toBlocking.foreach(println)
}

@Test def pivotExample() {
val o1 = (1 to 20).toObservable.groupBy(i => if (i <= 10) "x" else "y").map {
case (t: String, o: Observable[Int]) => (t, o.groupBy(i => i % 2 == 0))
}
println("o1:")
(for ((k1, o) <- o1;
(k2, vs) <- o;
v <- vs
) yield (k1, k2, v)).subscribe(println(_))
val o2 = o1.pivot
println("o2:")
(for ((k1, o) <- o2;
(k2, vs) <- o;
v <- vs
) yield (k1, k2, v)).subscribe(println(_))
}
}
56 changes: 0 additions & 56 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3808,62 +3808,6 @@ trait Observable[+T]
asJavaObservable.subscribe(onNext, onError, onComplete)
}

/**
* Pivots a sequence of `(K1, Observable[(K2, Observable[U])])`s emitted by an `Observable` so as to swap the group
* and and the set on which their items are grouped.
* <p>
* <img width="640" height="580" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.png">
*
* For example an `Observable` such as `this = Observable[(String, Observable[(Boolean, Observable[Integer])])`:
* <ul>
* <li>o1.odd: 1, 3, 5, 7, 9 on Thread 1</li>
* <li>o1.even: 2, 4, 6, 8, 10 on Thread 1</li>
* <li>o2.odd: 11, 13, 15, 17, 19 on Thread 2</li>
* <li>o2.even: 12, 14, 16, 18, 20 on Thread 2</li>
* </ul>
* is pivoted to become `this = Observable[(Boolean, Observable[(String, Observable[Integer])])`:
*
* <ul>
* <li>odd.o1: 1, 3, 5, 7, 9 on Thread 1</li>
* <li>odd.o2: 11, 13, 15, 17, 19 on Thread 2</li>
* <li>even.o1: 2, 4, 6, 8, 10 on Thread 1</li>
* <li>even.o2: 12, 14, 16, 18, 20 on Thread 2</li>
* </ul>
* <p>
* <img width="640" height="1140" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.ex.png">
* <p>
* <em>Note:</em> A `(K, Observable[_])` will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* `(K, Observable[_])`s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like `take(0)` to them.
*
* @return an `Observable`containing a stream of nested `(K1, Observable[(K2, Observable[U])])`s with swapped
* inner-outer keys.
*/
def pivot[U, K1, K2](implicit evidence: Observable[T] <:< Observable[(K1, Observable[(K2, Observable[U])])]): Observable[(K2, Observable[(K1, Observable[U])])] = {
import rx.observables.{GroupedObservable => JGroupedObservable}
val f1 = new Func1[(K1, Observable[(K2, Observable[U])]), JGroupedObservable[K1, JGroupedObservable[K2, U]]]() {
override def call(t1: (K1, Observable[(K2, Observable[U])])): JGroupedObservable[K1, JGroupedObservable[K2, U]] = {
val jo = t1._2.asJavaObservable.asInstanceOf[rx.Observable[(K2, Observable[U])]].map[JGroupedObservable[K2, U]](new Func1[(K2, Observable[U]), JGroupedObservable[K2, U]]() {
override def call(t2: (K2, Observable[U])): JGroupedObservable[K2, U] = {
JGroupedObservable.from(t2._1, t2._2.asJavaObservable.asInstanceOf[rx.Observable[U]])
}
})
JGroupedObservable.from(t1._1, jo)
}
}
val o1: Observable[(K1, Observable[(K2, Observable[U])])] = this
val o2 = toScalaObservable[JGroupedObservable[K2, JGroupedObservable[K1, U]]](rx.Observable.pivot(o1.asJavaObservable.map(f1)))
o2.map {
(jgo1: JGroupedObservable[K2, JGroupedObservable[K1, U]]) => {
val jo = jgo1.map[(K1, Observable[U])](new Func1[JGroupedObservable[K1, U], (K1, Observable[U])]() {
override def call(jgo2: JGroupedObservable[K1, U]): (K1, Observable[U]) = (jgo2.getKey, toScalaObservable[U](jgo2))
})
(jgo1.getKey, toScalaObservable[(K1, Observable[U])](jo))
}
}
}

/**
* Returns an Observable that counts the total number of items emitted by the source Observable and emits this count as a 64-bit Long.
*
Expand Down

0 comments on commit bebca3d

Please sign in to comment.