1.0.0-RC5
Pre-release
- Pull 1729 CombineLatest: Request Up When Dropping Values
- Pull 1728 ObserveOn Error Propagation
- Pull 1727 Proposed groupBy/groupByUntil Changes
- Pull 1726 Fix Merge: backpressure + scalarValueQueue don't play nicely
- Pull 1720 Change repeatWhen and retryWhen signatures.
- Pull 1719 Fix Bug in the onBackpressure operators
groupBy/groupByUntil
The groupByUntil operator was removed by collapsing its behavior into groupBy. Previously, when a child GroupedObservable of groupBy was unsubscribed, groupBy would internally retain the state and ignore all future onNext calls for that key.
This matched the behavior in Rx.Net but was found to be non-obvious, and almost everyone using groupBy on long-lived streams actually wanted the behavior of groupByUntil, where an unsubscribed GroupedObservable would clean up its resources and, if an onNext for that key arrived again, a new GroupedObservable would be emitted.
Adding backpressure (reactive pull) to groupByUntil was found to not work easily with its signatures, so before 1.0 Final it was decided to collapse groupBy and groupByUntil. Further details on this can be found in Pull Request 1727.
Here is an example of how groupBy now behaves when a child GroupedObservable is unsubscribed (using take here):
// odd/even into lists of 10
Observable.range(1, 100)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.take(10).toList();
        }).forEach(System.out::println);
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
[21, 23, 25, 27, 29, 31, 33, 35, 37, 39]
[22, 24, 26, 28, 30, 32, 34, 36, 38, 40]
[41, 43, 45, 47, 49, 51, 53, 55, 57, 59]
[42, 44, 46, 48, 50, 52, 54, 56, 58, 60]
[61, 63, 65, 67, 69, 71, 73, 75, 77, 79]
[62, 64, 66, 68, 70, 72, 74, 76, 78, 80]
[81, 83, 85, 87, 89, 91, 93, 95, 97, 99]
[82, 84, 86, 88, 90, 92, 94, 96, 98, 100]
Previously this would have only emitted 2 groups and ignored all subsequent values:
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
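The difference between the two outputs above can be modeled in plain Java, without RxJava. The following is only a sketch of the semantics and not how groupBy is implemented: the class name, the batches method and the recreateGroups flag are hypothetical names introduced for illustration. Reaching batchSize stands in for take(10) unsubscribing the child group, and the model does not flush a partially filled group when the input ends.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model of the old and new groupBy semantics (hypothetical names, NOT RxJava code).
class GroupBySemanticsSketch {

    // Splits 1..max into odd/even batches of batchSize.
    // recreateGroups = true  models the new behavior: a finished key gets a fresh group.
    // recreateGroups = false models the old behavior: a finished key is ignored from then on.
    public static List<List<Integer>> batches(int max, int batchSize, boolean recreateGroups) {
        Map<Boolean, List<Integer>> open = new HashMap<>();
        Set<Boolean> closed = new HashSet<>();
        List<List<Integer>> out = new ArrayList<>();
        for (int n = 1; n <= max; n++) {
            boolean key = n % 2 == 0;
            if (!recreateGroups && closed.contains(key)) {
                continue; // old behavior: values for an unsubscribed key are dropped
            }
            List<Integer> group = open.computeIfAbsent(key, k -> new ArrayList<>());
            group.add(n);
            if (group.size() == batchSize) {
                // stands in for take(batchSize) completing and unsubscribing the group
                out.add(group);
                open.remove(key); // state for the key is cleaned up
                closed.add(key);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        batches(100, 10, true).forEach(System.out::println);  // prints 10 lists
        batches(100, 10, false).forEach(System.out::println); // prints 2 lists
    }
}
```

With recreateGroups set to true the model yields the ten lists shown for the new behavior; with false it yields only the first two.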
On a finite stream, behavior similar to the previous groupBy implementation, which would filter out later values, can be achieved like this:
// odd/even into lists of 10
Observable.range(1, 100)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.filter(i -> i <= 20).toList();
        }).forEach(System.out::println);
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
That, however, does let the stream run to completion (which may not be wanted).
To unsubscribe, here are some choices that get the same output but efficiently unsubscribe upstream so the source only emits 40 values:
Observable.timer(0, 1, TimeUnit.MILLISECONDS)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.take(10).toList();
        }).take(2).toBlocking().forEach(System.out::println);
or
Observable.timer(0, 1, TimeUnit.MILLISECONDS)
        .take(20)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.toList();
        }).toBlocking().forEach(System.out::println);
These show that groupBy now composes like any other operator, without the nuanced and hidden behavior of ignoring values after a child GroupedObservable is unsubscribed.
Uses of groupByUntil can now all be done by just using operators like take, takeWhile and takeUntil on the GroupedObservable directly, such as this:
Observable.from(Arrays.asList("a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c"))
        .groupBy(n -> n)
        .flatMap(g -> {
            return g.take(3).reduce((s, s2) -> s + s2);
        }).forEach(System.out::println);
aaa
bbb
ccc
aaa
bbb
ccc
retryWhen/repeatWhen
The retryWhen and repeatWhen method signatures both emitted an Observable<Notification> type, whose Notification could be queried and represented either onError in the retryWhen case or onCompleted in the repeatWhen case. This was found to be confusing and unnecessary. The signatures were changed to emit Observable<Throwable> for retryWhen and Observable<Void> for repeatWhen, to better signal the type of notification they are emitting without the need to then query the Notification.
The following contrived example shows how the Observable<Throwable> is used to get the error that occurred when deciding whether to retry:
AtomicInteger count = new AtomicInteger();
Observable.create((Subscriber<? super String> s) -> {
    if (count.getAndIncrement() == 0) {
        s.onError(new RuntimeException("always fails"));
    } else {
        s.onError(new IllegalArgumentException("user error"));
    }
}).retryWhen(attempts -> {
    return attempts.flatMap(throwable -> {
        if (throwable instanceof IllegalArgumentException) {
            System.out.println("don't retry on IllegalArgumentException... allow failure");
            return Observable.error(throwable);
        } else {
            System.out.println(throwable + " => retry after 1 second");
            return Observable.timer(1, TimeUnit.SECONDS);
        }
    });
})
.toBlocking().forEach(System.out::println);
Artifacts: Maven Central