Skip to content

Commit

Permalink
Merge branch 'master' into idiomaticscala
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelgruetter committed Sep 19, 2013
2 parents b236f89 + 9ec8b0e commit a078522
Show file tree
Hide file tree
Showing 22 changed files with 741 additions and 135 deletions.
10 changes: 10 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# RxJava Releases #

### Version 0.13.4 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.13.4%22)) ###

* [Pull 393](https://github.com/Netflix/RxJava/pull/393) Parallel Operator & ObserveOn/ScheduledObserver Fixes
* [Pull 394](https://github.com/Netflix/RxJava/pull/394) Change Interval and Sample default Scheduler
* [Pull 391](https://github.com/Netflix/RxJava/pull/391) Fix OSGI support for rxjava-scala

### Version 0.13.3

* Upload to Sonatype failed so version skipped

### Version 0.13.2 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.13.2%22)) ###

* [Pull 389](https://github.com/Netflix/RxJava/pull/389) Scala Adaptor Improvements
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.13.3-SNAPSHOT
version=0.13.5-SNAPSHOT
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package rx.lang.groovy

import org.junit.Test

import rx.Observable
import rx.Scheduler
import rx.concurrency.Schedulers
import rx.util.functions.Func1

class TestParallel {

@Test
public void testParallelOperator() {
Observable.range(0, 100)
.parallel({
it.map({ return it; })
})
.toBlockingObservable()
.forEach({ println("T: " + it + " Thread: " + Thread.currentThread()); });
}
}
2 changes: 1 addition & 1 deletion language-adaptors/rxjava-scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jar {
name = 'rxjava-scala'
instruction 'Bundle-Vendor', 'Netflix'
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,!org.scalatest.*,*'
instruction 'Fragment-Host', 'com.netflix.rxjava.core'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* Observable, and that synchronously notifies its {@link Observer}s
*/
def synchronize: Observable[T] = {
Observable[T](JObservable.synchronize(asJava))
Observable[T](asJava.synchronize)
}

/**
Expand Down
245 changes: 205 additions & 40 deletions rxjava-core/src/main/java/rx/Observable.java

Large diffs are not rendered by default.

13 changes: 12 additions & 1 deletion rxjava-core/src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,23 @@ public Subscription call(Scheduler scheduler, Void state) {
}

/**
* Returns the scheduler's notion of current absolute time in milliseconds.
* @return the scheduler's notion of current absolute time in milliseconds.
*/
public long now() {
return System.currentTimeMillis();
}

/**
* Parallelism available to a Scheduler.
* <p>
* This defaults to {@code Runtime.getRuntime().availableProcessors()} but can be overridden for use cases such as scheduling work on a computer cluster.
*
* @return the scheduler's available degree of parallelism.
*/
public int degreeOfParallelism() {
return Runtime.getRuntime().availableProcessors();
}

public static class UnitTest {
@SuppressWarnings("unchecked") // mocking is unchecked, unfortunately
@Test
Expand Down
78 changes: 64 additions & 14 deletions rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
*/
package rx.concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import rx.Scheduler;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func2;
Expand All @@ -29,27 +32,74 @@
* Schedules work on a new thread.
*/
public class NewThreadScheduler extends Scheduler {
private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();

private final static NewThreadScheduler INSTANCE = new NewThreadScheduler();
private final static AtomicLong count = new AtomicLong();

public static NewThreadScheduler getInstance() {
return INSTANCE;
}

@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
final Scheduler _scheduler = this;
private NewThreadScheduler() {

Thread t = new Thread(new Runnable() {
@Override
public void run() {
subscription.wrap(action.call(_scheduler, state));
}
}, "RxNewThreadScheduler");
}

t.start();
private static class EventLoopScheduler extends Scheduler {
private final ExecutorService executor;

return subscription;
private EventLoopScheduler() {
executor = Executors.newFixedThreadPool(1, new ThreadFactory() {

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet());
}
});
}

@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
final Scheduler _scheduler = this;
return Subscriptions.from(executor.submit(new Runnable() {

@Override
public void run() {
action.call(_scheduler, state);
}
}));
}

@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, final long delayTime, final TimeUnit unit) {
// we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep
// we will instead schedule the event then launch the thread after the delay has passed
final Scheduler _scheduler = this;
final CompositeSubscription subscription = new CompositeSubscription();
ScheduledFuture<?> f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() {

@Override
public void run() {
if (!subscription.isUnsubscribed()) {
// when the delay has passed we now do the work on the actual scheduler
Subscription s = _scheduler.schedule(state, action);
// add the subscription to the CompositeSubscription so it is unsubscribed
subscription.add(s);
}
}
}, delayTime, unit);

// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
subscription.add(Subscriptions.create(f));

return subscription;
}

}

@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
EventLoopScheduler s = new EventLoopScheduler();
return s.schedule(state, action);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public final class OperationInterval {
* Creates an event each time interval.
*/
public static OnSubscribeFunc<Long> interval(long interval, TimeUnit unit) {
return interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor()));
return interval(interval, unit, Schedulers.threadPoolForComputation());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;
import org.mockito.InOrder;
Expand All @@ -33,6 +34,8 @@
import rx.Subscription;
import rx.concurrency.ImmediateScheduler;
import rx.concurrency.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.util.functions.Func2;

/**
* Asynchronously notify Observers on the specified Scheduler.
Expand Down Expand Up @@ -60,7 +63,9 @@ public Subscription onSubscribe(final Observer<? super T> observer) {
// do nothing if we request ImmediateScheduler so we don't invoke overhead
return source.subscribe(observer);
} else {
return source.subscribe(new ScheduledObserver<T>(observer, scheduler));
CompositeSubscription s = new CompositeSubscription();
s.add(source.subscribe(new ScheduledObserver<T>(s, observer, scheduler)));
return s;
}
}
}
Expand Down
99 changes: 99 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationParallel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.junit.Assert.*;

import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Observable;
import rx.Scheduler;
import rx.concurrency.Schedulers;
import rx.observables.GroupedObservable;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

/**
* Identifies unit of work that can be executed in parallel on a given Scheduler.
*/
public final class OperationParallel<T> {

public static <T, R> Observable<R> parallel(Observable<T> source, Func1<Observable<T>, Observable<R>> f) {
return parallel(source, f, Schedulers.threadPoolForComputation());
}

public static <T, R> Observable<R> parallel(final Observable<T> source, final Func1<Observable<T>, Observable<R>> f, final Scheduler s) {
return Observable.defer(new Func0<Observable<R>>() {

@Override
public Observable<R> call() {
final AtomicInteger i = new AtomicInteger(0);
return source.groupBy(new Func1<T, Integer>() {

@Override
public Integer call(T t) {
return i.incrementAndGet() % s.degreeOfParallelism();
}

}).flatMap(new Func1<GroupedObservable<Integer, T>, Observable<R>>() {

@Override
public Observable<R> call(GroupedObservable<Integer, T> group) {
return f.call(group.observeOn(s));
}
}).synchronize();
}
});
}

public static class UnitTest {

@Test
public void testParallel() {
int NUM = 1000;
final AtomicInteger count = new AtomicInteger();
Observable.range(1, NUM).parallel(
new Func1<Observable<Integer>, Observable<Integer[]>>() {

@Override
public Observable<Integer[]> call(Observable<Integer> o) {
return o.map(new Func1<Integer, Integer[]>() {

@Override
public Integer[] call(Integer t) {
return new Integer[] { t, t * 99 };
}

});
}
}).toBlockingObservable().forEach(new Action1<Integer[]>() {

@Override
public void call(Integer[] v) {
count.incrementAndGet();
System.out.println("V: " + v[0] + " R: " + v[1] + " Thread: " + Thread.currentThread());
}

});

// just making sure we finish and get the number we expect
assertEquals(NUM, count.get());
}
}
}
25 changes: 14 additions & 11 deletions rxjava-core/src/main/java/rx/operators/OperationRetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func2;

public class OperationRetry {

Expand Down Expand Up @@ -58,17 +60,19 @@ public Retry(Observable<T> source, int retryCount) {

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
subscription.add(Schedulers.currentThread().schedule(attemptSubscription(observer)));
MultipleAssignmentSubscription rescursiveSubscription = new MultipleAssignmentSubscription();
subscription.add(Schedulers.currentThread().schedule(rescursiveSubscription, attemptSubscription(observer)));
subscription.add(rescursiveSubscription);
return subscription;
}

private Action0 attemptSubscription(final Observer<? super T> observer) {
return new Action0() {
private Func2<Scheduler, MultipleAssignmentSubscription, Subscription> attemptSubscription(final Observer<? super T> observer) {
return new Func2<Scheduler, MultipleAssignmentSubscription, Subscription>() {

@Override
public void call() {
public Subscription call(final Scheduler scheduler, final MultipleAssignmentSubscription rescursiveSubscription) {
attempts.incrementAndGet();
source.subscribe(new Observer<T>() {
return source.subscribe(new Observer<T>() {

@Override
public void onCompleted() {
Expand All @@ -79,10 +83,8 @@ public void onCompleted() {
public void onError(Throwable e) {
if ((retryCount == INFINITE_RETRY || attempts.get() <= retryCount) && !subscription.isUnsubscribed()) {
// retry again
// remove the last subscription since we have completed (so as we retry we don't build up a huge list)
subscription.removeLast();
// add the new subscription and schedule a retry
subscription.add(Schedulers.currentThread().schedule(attemptSubscription(observer)));
// add the new subscription and schedule a retry recursively
rescursiveSubscription.setSubscription(scheduler.schedule(rescursiveSubscription, attemptSubscription(observer)));
} else {
// give up and pass the failure
observer.onError(e);
Expand All @@ -96,6 +98,7 @@ public void onNext(T v) {
});

}

};
}

Expand Down Expand Up @@ -157,7 +160,7 @@ public void testRetrySuccess() {
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testInfiniteRetry() {
int NUM_FAILURES = 20;
Expand Down
Loading

0 comments on commit a078522

Please sign in to comment.