diff --git a/language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/VideoExample.groovy b/language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/VideoExample.groovy index 9d0034139d..9f0d97dc23 100644 --- a/language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/VideoExample.groovy +++ b/language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/VideoExample.groovy @@ -18,6 +18,7 @@ package rx.lang.groovy.examples; import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subscriptions.BooleanSubscription; import rx.util.functions.Func1; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -66,8 +67,9 @@ static void main(String[] args) { * [id:1000, title:video-1000-title, length:5428, bookmark:0, * rating:[actual:4, average:3, predicted:0]] */ -def Observable getVideoGridForDisplay(userId) { - getListOfLists(userId).mapMany({ VideoList list -> +Observable getVideoGridForDisplay(userId) { + // take the first 5 lists + getListOfLists(userId).take(5).mapMany({ VideoList list -> // for each VideoList we want to fetch the videos list.getVideos() .take(10) // we only want the first 10 of each list @@ -103,29 +105,23 @@ def Observable getVideoGridForDisplay(userId) { * * Observable is the "push" equivalent to List */ -def Observable getListOfLists(userId) { +Observable getListOfLists(userId) { return Observable.create({ observer -> - AtomicBoolean isRunning = new AtomicBoolean(true); + BooleanSubscription subscription = new BooleanSubscription(); // this will happen on a separate thread as it requires a network call - executor.execute(new Runnable() { - def void run() { + executor.execute({ // simulate network latency Thread.sleep(180); for(i in 0..15) { - if(!isRunning.get()) { - // we have received an unsubscribe + if(subscription.isUnsubscribed()) { break; } //println("****** emitting list: " + i) observer.onNext(new VideoList(i)) } observer.onCompleted(); - } }) - return Observable.createSubscription({ - // see https://github.com/Netflix/RxJava/issues/173 for a possibly simpler way of doing this - isRunning.set(false); - }); + return subscription; }) } @@ -139,15 +135,15 @@ class VideoList { this.listPosition = position } - def String getListName() { + String getListName() { return "ListName-" + listPosition } - def Integer getListPosition() { + Integer getListPosition() { return listPosition } - def Observable