Skip to content

Commit

Permalink
Merge pull request ReactiveX#631 from benjchristensen/NewThreadSchedu…
Browse files Browse the repository at this point in the history
…ler-Daemon

Make NewThreadScheduler create Daemon threads
  • Loading branch information
benjchristensen committed Dec 17, 2013
2 parents 9c9076e + 3239222 commit d599c6c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ private EventLoopScheduler() {

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet());
Thread t = new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet());
t.setDaemon(true);
return t;
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,8 @@ public void testParallelMerge() {

@Test
public void testNumberOfThreads() {
final ConcurrentHashMap<String, String> threads = new ConcurrentHashMap<String, String>();
Observable.merge(getStreams())
.toBlockingObservable().forEach(new Action1<String>() {

@Override
public void call(String o) {
System.out.println("o: " + o + " Thread: " + Thread.currentThread());
threads.put(Thread.currentThread().getName(), Thread.currentThread().getName());
}
});

// without injecting anything, the getStream() method uses Interval which runs on a default scheduler
assertEquals(Runtime.getRuntime().availableProcessors(), threads.keySet().size());

// clear
threads.clear();

// now we parallelMerge into 3 streams and observeOn for each
final ConcurrentHashMap<Long, Long> threads = new ConcurrentHashMap<Long, Long>();
// parallelMerge into 3 streams and observeOn for each
// we expect 3 threads in the output
OperationParallelMerge.parallelMerge(getStreams(), 3)
.flatMap(new Func1<Observable<String>, Observable<String>>() {
Expand All @@ -90,8 +74,8 @@ public Observable<String> call(Observable<String> o) {

@Override
public void call(String o) {
System.out.println("o: " + o + " Thread: " + Thread.currentThread());
threads.put(Thread.currentThread().getName(), Thread.currentThread().getName());
System.out.println("o: " + o + " Thread: " + Thread.currentThread().getId());
threads.put(Thread.currentThread().getId(), Thread.currentThread().getId());
}
});

Expand All @@ -100,7 +84,7 @@ public void call(String o) {

@Test
public void testNumberOfThreadsOnScheduledMerge() {
final ConcurrentHashMap<String, String> threads = new ConcurrentHashMap<String, String>();
final ConcurrentHashMap<Long, Long> threads = new ConcurrentHashMap<Long, Long>();

// now we parallelMerge into 3 streams and observeOn for each
// we expect 3 threads in the output
Expand All @@ -109,8 +93,8 @@ public void testNumberOfThreadsOnScheduledMerge() {

@Override
public void call(String o) {
System.out.println("o: " + o + " Thread: " + Thread.currentThread());
threads.put(Thread.currentThread().getName(), Thread.currentThread().getName());
System.out.println("o: " + o + " Thread: " + Thread.currentThread().getId());
threads.put(Thread.currentThread().getId(), Thread.currentThread().getId());
}
});

Expand Down

0 comments on commit d599c6c

Please sign in to comment.