Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrading SyncOnSubscribe from experimental to beta #3780

Merged
merged 1 commit into from
Mar 31, 2016

Conversation

stealthcode
Copy link

No description provided.

@stevegury
Copy link
Member

👍

@stealthcode
Copy link
Author

@akarnokd is this alright with you?

@akarnokd
Copy link
Member

I'm not seeing why the change before 1.2?

@stealthcode
Copy link
Author

I think at Netflix we are comfortable using these going forward. Is there any reason to wait? This only communicates our comfort level with this api slightly more strongly.

@akarnokd
Copy link
Member

I see the value of SyncOnSubscribe but I'd like some user opinion about AsyncOnSubscribe outside Netflix.

@stealthcode
Copy link
Author

Okay, fair. I would also like to see some usage and feedback on AsyncOnSubscribe as well. However I would prefer both of these apis to be in beta so people can reliably attempt to adopt them. I am comfortable (eventually) upgrading SyncOnSubscribe to part of the standard api but have Async in Beta. Is this fair?

@akarnokd
Copy link
Member

Fine, but since this is a promotion, I'd like a majority vote just like before 1.1.

I personally would 👍 for SyncOnSubscribe promotion but I'm still not convinced about AsyncOnSubscribe's usability because the unpredictable request count if chained with any reasonable prefetching/replenishing operator (observeOn, flatMap, etc.).

@stealthcode
Copy link
Author

Okay. I still would like it if we cleaned up the prefetching behavior so the request 1 patterns which we know are detrimental to performance are resolved.

@stealthcode
Copy link
Author

@abersnaze @zsxwing @benjchristensen any concerns with this api upgrade from experimental to beta?

@akarnokd
Copy link
Member

Could you open a separate issue with the problem description and example code so we can discuss it and not clutter this PR?

@stealthcode
Copy link
Author

Do you mean the prefetching/batching behavior of observeOn and flatMap?

To confirm, you are in agreement that Experimental to Beta is acceptable for both?

@akarnokd
Copy link
Member

I still would like it if we cleaned up the prefetching behavior so the request 1 patterns which we know are detrimental to performance are resolved.

@stealthcode
Copy link
Author

Cool, Yes will do!

@stealthcode
Copy link
Author

So to confirm you do not up vote this PR as is?

@akarnokd
Copy link
Member

So to confirm you do not up vote this PR as is?

👍 for SyncOnSubscribe, 👎 for AsyncOnSubscribe. You'd better split the PR.

@kurzweil
Copy link

I've used AsyncOnSubcribe in a couple of places with success. One of which made integrating Elasticsearch's pagination api play nicely with Rx while allowing backpressure semantics to just work. Maybe one can use the following real code as an example of AsyncOnSubscribe usage:

import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.observables.AsyncOnSubscribe;

public class ScrollObservable {
    private static class ElasticsearchRequestState {
        private final Client client;
        private final ActionRequestBuilder<?, SearchResponse, ?, ?> request;
        private final int pageNumber;
        private final long hitCount;
        private final TimeValue timeout;
        private ElasticsearchRequestState nextState;

        private ElasticsearchRequestState(Client client, SearchRequestBuilder request, TimeValue timeout) {
            this.client = client;
            this.request = request.setScroll(timeout);
            this.timeout = timeout;
            this.pageNumber = 0;
            this.hitCount = 0;
        }

        private ElasticsearchRequestState(Client client, SearchScrollRequestBuilder request, TimeValue timeout, int pageNumber, long hitCount) {
            this.client = client;
            this.request = request.setScroll(timeout);
            this.timeout = timeout;
            this.pageNumber = pageNumber;
            this.hitCount = hitCount;
        }

        public Observable<SearchResponse> getResponse() {
            return Observable.defer(() -> Observable.just(request.execute().actionGet()))
                    .retryWhen(errors -> errors.flatMap(error -> {
                        if (error instanceof InterruptedException || error.getCause() instanceof InterruptedException) {
                            return Observable.just(null);
                        } else {
                            return Observable.error(error);
                        }
                    }))
                    .doOnNext(r -> {
                        long currentCount = hitCount + r.getHits().getHits().length;
                        if (r.getHits().getHits().length > 0 && currentCount < r.getHits().getTotalHits()) {
                            nextState = new ElasticsearchRequestState(client, client.prepareSearchScroll(r.getScrollId()), timeout, pageNumber + 1, currentCount);
                        }
                    });
        }

        public boolean hasNext() {
            return nextState != null;
        }

        public ElasticsearchRequestState next() {
            return nextState;
        }
    }

    public static Observable<SearchResponse> from(final Client client, final SearchRequestBuilder request, final TimeValue timeout) {
        OnSubscribe<SearchResponse> os = AsyncOnSubscribe.createStateful(() ->
                new ElasticsearchRequestState(client, request, timeout),
                (state, requested, observer) -> {
                    observer.onNext(state.getResponse());
                    if (state.hasNext()) {
                        return state.next();
                    } else {
                        observer.onCompleted();
                        return null;
                    }
                });
        return Observable.create(os);
    }

    public static Observable<SearchResponse> from(Client client, SearchRequestBuilder request, long duration, TimeUnit timeUnit) {
        return from(client, request, new TimeValue(duration, timeUnit));
    }

    public static Observable<SearchResponse> from(Client client, SearchRequestBuilder request) {
        return from(client, request, new TimeValue(1, TimeUnit.MINUTES));
    }
}

@stealthcode
Copy link
Author

@kurzweil interesting. It seems like you don't actually use the requested amount in producing an observable to return. I'd recommend changing state.getResponse() to take the long requested and limit the upper bounds of the data events.

@simonbasle
Copy link
Contributor

@stealthcode It was some time ago, but last time I checked I couldn't find a detailed documentation on how to use these onSubscribe, other than the javadoc, after a quick search. Can you suggest a pointer to such resources if they exist? If they don't exist, could promoting these to BETA include some detailed piece of documentation, like a wiki page giving examples on how and when to use the AsyncOnSubscribe and SyncOnSubscribe?

@stealthcode
Copy link
Author

@simonbasle Right now the documentation is pretty sparse on these topics. I would certainly welcome more documentation but don't currently have the time myself. @DavidMGross would you have some cycles to work on detailed documentation for the SyncOnSubscribe and AsyncOnSubscribe?

Here is one relevant SO post.

@simonbasle
Copy link
Contributor

thanks @stealthcode

@JakeWharton
Copy link
Contributor

To add some color here, there's currently no stable APIs for creating observables that do backpressure-aware work deferring. Even just for wrapping a synchronous method that returns a scalar value.

This make RxJava use in libraries very challenging since fromCallable and the helpers herein are unavailable.

@akarnokd
Copy link
Member

What do you mean by

that do backpressure-aware work deferring

?

@JakeWharton
Copy link
Contributor

A common pattern for deferring work is defer(() -> just(someMethod())), but if the subscriber has requested 0 the defer func is still called immediately on subscription, not on the first non-zero request. fromCallable doesn't suffer from this problem and the heavyweight hitters in this PR don't either, but all of those are non-stable APIs. As a workaround you can do things like empty().startWith(defer(() -> just(someMethod()))) or concat(just(defer(() -> just(someMethod())))) but that's a bit nuts.

@davidmoten
Copy link
Collaborator

@JakeWharton would be nice to support this strict deferral of work formally in the API and defer seems like a good place to put this support (so that subscription doesn't happen till first non-zero request). I'd be happy to see this happen in the existing defer operator but it could happen in an overload or elsewhere. Would you like to raise an issue to discuss this one?

@JakeWharton
Copy link
Contributor

Sure!

On Thu, Mar 31, 2016 at 3:35 PM Dave Moten [email protected] wrote:

@JakeWharton https://github.com/JakeWharton would be nice to support
this strict deferral of work formally in the API and defer seems like a
good place to put this support (so that subscription doesn't happen till
first non-zero request). I'd be happy to see this happen in the existing
defer operator but it could happen in an overload or elsewhere. Would you
like to raise an issue to discuss this one?


You are receiving this because you were mentioned.

Reply to this email directly or view it on GitHub
#3780 (comment)

@stealthcode
Copy link
Author

I have taken out AsyncOnSubscribe @akarnokd

@stealthcode stealthcode changed the title Upgrading SyncOnSubscribe and AsyncOnSubscribe from experimental to beta Upgrading SyncOnSubscribe from experimental to beta Mar 31, 2016
@zsxwing
Copy link
Member

zsxwing commented Mar 31, 2016

👍

@stealthcode stealthcode merged commit e804172 into ReactiveX:1.x Mar 31, 2016
@akarnokd
Copy link
Member

👍

@DavidMGross
Copy link
Collaborator

I'm currently in a holding pattern waiting to see where Netflix wants me to
apply my hours for them this season (they've been picking up the tab for my
documentation work thusfar), but I've filed a documentation issue so that I
or someone else can pick up the trail at some point: ReactiveX/
reactivex.github.io#221

On Mon, Mar 28, 2016 at 4:28 PM, Aaron Tull [email protected]
wrote:

@simonbasle https://github.com/simonbasle Right now the documentation
is pretty sparse on these topics. I would certainly welcome more
documentation but don't currently have the time myself. @DavidMGross
https://github.com/DavidMGross would you have some cycles to work on
detailed documentation for the SyncOnSubscribe and AsyncOnSubscribe?

Here is one SO post
http://stackoverflow.com/questions/32723315/is-unsubscribe-thread-safe-in-rxjava


You are receiving this because you were mentioned.
Reply to this email directly or view it on GitHub
#3780 (comment)

David M. Gross
PLP Consulting

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants