Skip to content

1.0.5

Compare
Choose a tag to compare
@benjchristensen benjchristensen released this 04 Feb 07:03
· 1098 commits to 1.x since this release

This release includes many bug fixes along with a few new operators and enhancements.

Experimental Operators

This release adds a few experimental operators.

Note that these APIs may still change or be removed altogether since they are marked as @Experimental.

takeUntil(predicate)

This operator allows conditionally unsubscribing an Observable but inclusively emitting the final onNext. This differs from takeWhile which excludes the final onNext.

// takeUntil(predicate) example
Observable.just(1, 2, 3, 4, 5, 6, 7)
        .doOnEach(System.out::println)
        .takeUntil(i -> i == 3)
        .forEach(System.out::println);

// takeWhile(predicate) example
Observable.just(1, 2, 3, 4, 5, 6, 7)
        .doOnEach(System.out::println)
        .takeWhile(i -> i <= 3)
        .forEach(System.out::println);

This outputs:

// takeUntil(predicate)
[rx.Notification@30e84925 OnNext 1]
1
[rx.Notification@30e84926 OnNext 2]
2
[rx.Notification@30e84927 OnNext 3]
3

// takeWhile(predicate)
[rx.Notification@30e84925 OnNext 1]
1
[rx.Notification@30e84926 OnNext 2]
2
[rx.Notification@30e84927 OnNext 3]
3
[rx.Notification@30e84928 OnNext 4]

Note how takeWhile produces 4 values and takeUntil produces 3.

switchIfEmpty

The new switchIfEmpty operator is a companion to defaultIfEmpty that switches to a different Observable if the primary Observable is empty.

Observable.empty()
        .switchIfEmpty(Observable.just(1, 2, 3))
        .forEach(System.out::println);

Enhancements

merge(maxConcurrent) with backpressure

This release adds backpressure to merge(maxConcurrent) so that horizontal buffer bloat can also be controll with the maxConcurrent parameter.

This allows parallel execution such as the following to work with backpressure:

public class MergeMaxConcurrent {

    public static void main(String[] args) {
        // define 1,000,000 async tasks
        Observable<Observable<Integer>> asyncWork = range(1, 1000000)
                      .doOnNext(i -> System.out.println("Value: " + i))
                .doOnRequest(r -> System.out.println("request1 " + r))
                .map(item -> {
                    return just(item)
                            // simulate slow IO or computation
                            .doOnNext(MergeMaxConcurrent::sleep)
                            .subscribeOn(Schedulers.io());
                })
                .doOnRequest(r -> System.out.println("request2 " + r));

        // allow 10 outstanding tasks at a time
        merge(asyncWork, 10).toBlocking().forEach(System.out::println);
    }

    public static void sleep(int value) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

In prior versions all 1,000,000 tasks are immediately emitted and queued for execution. As of this release it correctly allows 10 at a time to be emitted.

Changes

  • Pull 2493 Experimental Operator TakeUntil with predicate
  • Pull 2585 Experimental Operator: switchIfEmpty
  • Pull 2470 Experimental Subject state information methods & bounded ReplaySubject termination
  • Pull 2540 Merge with max concurrency now supports backpressure.
  • Pull 2332 Operator retry test fix attempt
  • Pull 2244 OperatorTakeLast add check for isUnsubscribed to fast path
  • Pull 2469 Remove the execute permission from source files
  • Pull 2455 Fix for #2191 - OperatorMulticast fails to unsubscribe from source
  • Pull 2474 MergeTest.testConcurrency timeout to let other tests run
  • Pull 2335 A set of stateless operators that don't need to be instantiated
  • Pull 2447 Fail early if a null subscription is added to a CompositeSubscription.
  • Pull 2475 SynchronousQueue.clone fix
  • Pull 2477 Backpressure tests fix0121
  • Pull 2476 Fixed off-by-one error and value-drop in the window operator.
  • Pull 2478 RefCountAsync: adjusted time values as 1 ms is unreliable
  • Pull 2238 Fix the bug that cache doesn't unsubscribe the source Observable when the source is terminated
  • Pull 1840 Unsubscribe when thread is interrupted
  • Pull 2471 Fixes NPEs reported in #1702 by synchronizing queue.
  • Pull 2482 Merge: fixed hangs & missed scalar emissions
  • Pull 2547 Warnings cleanup
  • Pull 2465 ScheduledExecutorService: call purge periodically on JDK 6 to avoid cancelled task-retention
  • Pull 2591 Changed the naming of the NewThreadWorker's system parameters
  • Pull 2543 OperatorMerge handle request overflow
  • Pull 2548 Subscriber.request should throw exception if negative request made
  • Pull 2550 Subscriber.onStart requests should be additive (and check for overflow)
  • Pull 2553 RxRingBuffer with synchronization
  • Pull 2565 Obstruction detection in tests.
  • Pull 2563 Retry backpressure test: split error conditions into separate test lines.
  • Pull 2572 Give more time to certain concurrency tests.
  • Pull 2559 OnSubscribeFromIterable - add request overflow check
  • Pull 2574 SizeEviction test needs to return false
  • Pull 2561 Updating queue code from JCTools
  • Pull 2566 CombineLatest: fixed concurrent requestUpTo yielding -1 requests
  • Pull 2552 Publish: fixed incorrect subscriber requested accounting
  • Pull 2583 Added perf tests for various container-like subscriptions
  • Pull 1955 OnBackpressureXXX: support for common drain manager & fix for former concurrency bugs
  • Pull 2590 Zip: fixed unbounded downstream requesting above Long.MAX_VALUE
  • Pull 2589 Repeat/retry: fixed unbounded downstream requesting above Long.MAX_VALUE
  • Pull 2567 RefCount: disconnect all if upstream terminates
  • Pull 2593 Zip: emit onCompleted without waiting for request + avoid re-reading fields

Artifacts: Maven Central