Skip to content

Commit

Permalink
Merge pull request #573 from akarnokd/OpeningClosingRemoved
Browse files Browse the repository at this point in the history
Removed Opening and Closing historical artifacts.
  • Loading branch information
benjchristensen committed Dec 8, 2013
2 parents a1f9988 + b7ac052 commit ae866be
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 269 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ trait Observable[+T]
import scala.collection.Seq
import scala.concurrent.duration.{Duration, TimeUnit}
import rx.util.functions._
import rx.lang.scala.util._
import rx.lang.scala.observables.BlockingObservable
import ImplicitFunctionConversions._
import JavaConversions._
Expand Down Expand Up @@ -302,45 +301,44 @@ trait Observable[+T]
* Creates an Observable which produces buffers of collected values.
*
* This Observable produces connected non-overlapping buffers. The current buffer is
* emitted and replaced with a new buffer when the Observable produced by the specified function produces a [[rx.lang.scala.util.Closing]] object. The function will then
* emitted and replaced with a new buffer when the Observable produced by the specified function produces an object. The function will then
* be used to create a new Observable to listen for the end of the next buffer.
*
* @param closings
* The function which is used to produce an [[rx.lang.scala.Observable]] for every buffer created.
* When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated buffer
* When this [[rx.lang.scala.Observable]] produces an object, the associated buffer
* is emitted and replaced with a new one.
* @return
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers, which are emitted
* when the current [[rx.lang.scala.Observable]] created with the function argument produces a [[rx.lang.scala.util.Closing]] object.
* when the current [[rx.lang.scala.Observable]] created with the function argument produces an object.
*/
def buffer(closings: () => Observable[Closing]) : Observable[Seq[T]] = {
def buffer[Closing](closings: () => Observable[_ <: Closing]) : Observable[Seq[T]] = {
val f: Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(f)
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Closing](f)
Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
}

/**
* Creates an Observable which produces buffers of collected values.
*
* This Observable produces buffers. Buffers are created when the specified `openings`
* Observable produces a [[rx.lang.scala.util.Opening]] object. Additionally the function argument
* Observable produces an object. Additionally the function argument
* is used to create an Observable which produces [[rx.lang.scala.util.Closing]] objects. When this
* Observable produces such an object, the associated buffer is emitted.
*
* @param openings
* The [[rx.lang.scala.Observable]] which, when it produces a [[rx.lang.scala.util.Opening]] object, will cause
* The [[rx.lang.scala.Observable]] which, when it produces an object, will cause
* another buffer to be created.
* @param closings
* The function which is used to produce an [[rx.lang.scala.Observable]] for every buffer created.
* When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated buffer
* When this [[rx.lang.scala.Observable]] produces an object, the associated buffer
* is emitted.
* @return
* An [[rx.lang.scala.Observable]] which produces buffers which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
*/
def buffer(openings: Observable[Opening], closings: Opening => Observable[Closing]): Observable[Seq[T]] = {
def buffer[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]): Observable[Seq[T]] = {
val opening: rx.Observable[_ <: Opening] = openings.asJavaObservable
val closing: Func1[Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => closings(o).asJavaObservable
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(opening, closing)
val closing: Func1[_ >: Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => closings(o).asJavaObservable
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Opening, Closing](opening, closing)
Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
}

Expand Down Expand Up @@ -512,22 +510,22 @@ trait Observable[+T]
/**
* Creates an Observable which produces windows of collected values. This Observable produces connected
* non-overlapping windows. The current window is emitted and replaced with a new window when the
* Observable produced by the specified function produces a [[rx.lang.scala.util.Closing]] object.
* Observable produced by the specified function produces an object.
* The function will then be used to create a new Observable to listen for the end of the next
* window.
*
* @param closings
* The function which is used to produce an [[rx.lang.scala.Observable]] for every window created.
* When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated window
* When this [[rx.lang.scala.Observable]] produces an object, the associated window
* is emitted and replaced with a new one.
* @return
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows, which are emitted
* when the current [[rx.lang.scala.Observable]] created with the function argument produces a [[rx.lang.scala.util.Closing]] object.
* when the current [[rx.lang.scala.Observable]] created with the function argument produces an object.
*/
def window(closings: () => Observable[Closing]): Observable[Observable[T]] = {
def window[Closing](closings: () => Observable[Closing]): Observable[Observable[T]] = {
val func : Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window(func)
val o2 = toScalaObservable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => {
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Closing](func)
val o2 = Observable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => {
val x2 = x.asInstanceOf[rx.Observable[_ <: T]]
toScalaObservable[T](x2)
})
Expand All @@ -536,23 +534,23 @@ trait Observable[+T]

/**
* Creates an Observable which produces windows of collected values. This Observable produces windows.
* Chunks are created when the specified `openings` Observable produces a [[rx.lang.scala.util.Opening]] object.
* Chunks are created when the specified `openings` Observable produces an object.
* Additionally the `closings` argument is used to create an Observable which produces [[rx.lang.scala.util.Closing]] objects.
* When this Observable produces such an object, the associated window is emitted.
*
* @param openings
* The [[rx.lang.scala.Observable]] which when it produces a [[rx.lang.scala.util.Opening]] object, will cause
* The [[rx.lang.scala.Observable]] which when it produces an object, will cause
* another window to be created.
* @param closings
* The function which is used to produce an [[rx.lang.scala.Observable]] for every window created.
* When this [[rx.lang.scala.Observable]] produces a [[rx.lang.scala.util.Closing]] object, the associated window
* When this [[rx.lang.scala.Observable]] produces an object, the associated window
* is emitted.
* @return
* An [[rx.lang.scala.Observable]] which produces windows which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
*/
def window(openings: Observable[Opening], closings: Opening => Observable[Closing]) = {
def window[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]) = {
Observable.jObsOfJObsToScObsOfScObs(
asJavaObservable.window(openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable))
asJavaObservable.window[Opening, Closing](openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable))
: Observable[Observable[T]] // SI-7818
}

Expand Down

This file was deleted.

56 changes: 27 additions & 29 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,7 @@
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;
import rx.util.Closing;
import rx.util.OnErrorNotImplementedException;
import rx.util.Opening;
import rx.util.Range;
import rx.util.TimeInterval;
import rx.util.Timestamped;
Expand Down Expand Up @@ -2812,31 +2810,31 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> combineLates
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, o8, o9, combineFunction));
}

/**
/**
* Creates an Observable that produces buffers of collected items.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/buffer1.png">
* <p>
* This Observable produces connected, non-overlapping buffers. The current
* buffer is emitted and replaced with a new buffer when the Observable
* produced by the specified <code>bufferClosingSelector</code> produces a
* {@link rx.util.Closing} object. The <code>bufferClosingSelector</code>
* produced by the specified <code>bufferClosingSelector</code> produces an
* object. The <code>bufferClosingSelector</code>
* will then be used to create a new Observable to listen for the end of
* the next buffer.
*
* @param bufferClosingSelector the {@link Func0} which is used to produce
* an {@link Observable} for every buffer
* created. When this {@link Observable}
* produces a {@link rx.util.Closing} object,
* produces an object,
* the associated buffer is emitted and
* replaced with a new one.
* @return an {@link Observable} which produces connected, non-overlapping
* buffers, which are emitted when the current {@link Observable}
* created with the {@link Func0} argument produces a
* {@link rx.util.Closing} object
* created with the {@link Func0} argument produces an
* object
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#buffer">RxJava Wiki: buffer()</a>
*/
public Observable<List<T>> buffer(Func0<? extends Observable<? extends Closing>> bufferClosingSelector) {
public <TClosing> Observable<List<T>> buffer(Func0<? extends Observable<? extends TClosing>> bufferClosingSelector) {
return create(OperationBuffer.buffer(this, bufferClosingSelector));
}

Expand All @@ -2846,26 +2844,26 @@ public Observable<List<T>> buffer(Func0<? extends Observable<? extends Closing>>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/buffer2.png">
* <p>
* This Observable produces buffers. Buffers are created when the specified
* <code>bufferOpenings</code> Observable produces a {@link rx.util.Opening}
* <code>bufferOpenings</code> Observable produces an
* object. Additionally the <code>bufferClosingSelector</code> argument is
* used to create an Observable which produces {@link rx.util.Closing}
* used to create an Observable which produces
* objects. When this Observable produces such an object, the associated
* buffer is emitted.
*
* @param bufferOpenings the {@link Observable} that, when it produces a
* {@link rx.util.Opening} object, will cause another
* @param bufferOpenings the {@link Observable} that, when it produces an
* object, will cause another
* buffer to be created
* @param bufferClosingSelector the {@link Func1} that is used to produce
* an {@link Observable} for every buffer
* created. When this {@link Observable}
* produces a {@link rx.util.Closing} object,
* produces an object,
* the associated buffer is emitted.
* @return an {@link Observable} that produces buffers that are created and
* emitted when the specified {@link Observable}s publish certain
* objects
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#buffer">RxJava Wiki: buffer()</a>
*/
public Observable<List<T>> buffer(Observable<? extends Opening> bufferOpenings, Func1<Opening, ? extends Observable<? extends Closing>> bufferClosingSelector) {
public <TOpening, TClosing> Observable<List<T>> buffer(Observable<? extends TOpening> bufferOpenings, Func1<? super TOpening, ? extends Observable<? extends TClosing>> bufferClosingSelector) {
return create(OperationBuffer.buffer(this, bufferOpenings, bufferClosingSelector));
}

Expand Down Expand Up @@ -3062,54 +3060,54 @@ public Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit unit,
* Creates an Observable that produces windows of collected items. This
* Observable produces connected, non-overlapping windows. The current
* window is emitted and replaced with a new window when the Observable
* produced by the specified <code>closingSelector</code> produces a
* {@link rx.util.Closing} object. The <code>closingSelector</code> will
* produced by the specified <code>closingSelector</code> produces an
* object. The <code>closingSelector</code> will
* then be used to create a new Observable to listen for the end of the next
* window.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window1.png">
*
* @param closingSelector the {@link Func0} used to produce an
* {@link Observable} for every window created. When this
* {@link Observable} emits a {@link rx.util.Closing} object, the
* {@link Observable} emits an object, the
* associated window is emitted and replaced with a new one.
* @return an {@link Observable} that produces connected, non-overlapping
* windows, which are emitted when the current {@link Observable}
* created with the <code>closingSelector</code> argument emits a
* {@link rx.util.Closing} object.
* created with the <code>closingSelector</code> argument emits an
* object.
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#window">RxJava Wiki: window()</a>
*/
public Observable<Observable<T>> window(Func0<? extends Observable<? extends Closing>> closingSelector) {
public <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector) {
return create(OperationWindow.window(this, closingSelector));
}

/**
* Creates an Observable that produces windows of collected items. This
* Observable produces windows. Chunks are created when the
* <code>windowOpenings</code> Observable produces a {@link rx.util.Opening}
* <code>windowOpenings</code> Observable produces an
* object. Additionally the <code>closingSelector</code> argument creates an
* Observable that produces {@link rx.util.Closing} objects. When this
* Observable that produces objects. When this
* Observable produces such an object, the associated window is emitted.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window2.png">
*
* @param windowOpenings the {@link Observable} that, when it produces a
* {@link rx.util.Opening} object, causes another
* @param windowOpenings the {@link Observable} that, when it produces an
* object, causes another
* window to be created
* @param closingSelector the {@link Func1} that produces an
* {@link Observable} for every window created. When
* this {@link Observable} produces a
* {@link rx.util.Closing} object, the associated
* this {@link Observable} produces an
* object, the associated
* window is emitted.
* @return an {@link Observable} that produces windows that are created and
* emitted when the specified {@link Observable}s publish certain
* objects
* @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#window">RxJava Wiki: window()</a>
*/
public Observable<Observable<T>> window(Observable<? extends Opening> windowOpenings, Func1<Opening, ? extends Observable<? extends Closing>> closingSelector) {
public <TOpening, TClosing> Observable<Observable<T>> window(Observable<? extends TOpening> windowOpenings, Func1<? super TOpening, ? extends Observable<? extends TClosing>> closingSelector) {
return create(OperationWindow.window(this, windowOpenings, closingSelector));
}

/**
* Creates an Observable that produces windows of collected items. This
* Observable produces connected, non-overlapping windows, each containing
Expand Down
Loading

0 comments on commit ae866be

Please sign in to comment.