toSingle causes excessive stack traces #4594

Closed
DavidDTA opened this issue Sep 23, 2016 · 10 comments

@DavidDTA
Contributor

I have the following code:

        final ConnectableObservable<Object> connectable =
                subject.toSingle().flatMap(x -> whatever).toObservable().replay();

which produces the following stack trace:

          at rx.internal.operators.OperatorReplay$ReplaySubscriber.onCompleted(OperatorReplay.java:470)                                                                            
          at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:110)
          at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
          at rx.Single$1$1.onSuccess(Single.java:80)
          at rx.Single$18.onNext(Single.java:1911)                                                                                                                                 
          at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)                                                                                                           
          at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102)                                                                                      
          at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)                                                                                   
          at rx.Single$1$1.onSuccess(Single.java:80)                                                                                                                               
          at rx.internal.operators.SingleOnSubscribeMap$MapSubscriber.onSuccess(SingleOnSubscribeMap.java:74)                                                                      
          at rx.Single$18.onNext(Single.java:1911)                                                                                                                                 
          at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)                                                                                                           
          at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102)                                                                                      
          at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)                                                                                   
          at rx.Single$1$1.onSuccess(Single.java:80)                                                                                                                               
          at rx.internal.operators.SingleOnSubscribeMap$MapSubscriber.onSuccess(SingleOnSubscribeMap.java:74)                                                                      
          at rx.Single$18.onNext(Single.java:1911)
          at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
          at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102)
          at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
          at rx.Single$1$1.onSuccess(Single.java:80)
          at rx.Single$18.onNext(Single.java:1911)
          at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
          at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102)
          at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
          at rx.Single$1$1.onSuccess(Single.java:80)
          at rx.internal.operators.SingleOnSubscribeMap$MapSubscriber.onSuccess(SingleOnSubscribeMap.java:74)                                                                      
          at rx.Single$18.onNext(Single.java:1911)
          at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
          at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102)                                                                                      
          at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)                                                                                   
          at rx.Single$1$1.onSuccess(Single.java:80)
          at rx.internal.util.ScalarSynchronousSingle$1.call(ScalarSynchronousSingle.java:40)
          at rx.internal.util.ScalarSynchronousSingle$1.call(ScalarSynchronousSingle.java:36)                                                                                      
          at rx.Single$1.call(Single.java:90)
          at rx.Single$1.call(Single.java:70)
          at rx.Single.subscribe(Single.java:1839)
          at rx.Single.subscribe(Single.java:1916)
          at rx.internal.operators.SingleOnSubscribeMap.call(SingleOnSubscribeMap.java:45)
          at rx.internal.operators.SingleOnSubscribeMap.call(SingleOnSubscribeMap.java:30)                                                                                         
          at rx.Single$1.call(Single.java:90)
          at rx.Single$1.call(Single.java:70)
          at rx.Single.subscribe(Single.java:1839)
          at rx.Single.subscribe(Single.java:1916)
          at rx.Single$5$1.onSuccess(Single.java:691)                                                                                                                              
          at rx.Single$5$1.onSuccess(Single.java:687)
          at rx.Single$18.onNext(Single.java:1911)                                                                                                                                 
          at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)                                                                                                           
          at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102)                                                                                      
          at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)                                                                                   
          at rx.Single$1$1.onSuccess(Single.java:80)                                                                                                                               
          at rx.internal.operators.SingleOnSubscribeMap$MapSubscriber.onSuccess(SingleOnSubscribeMap.java:74)                                                                      
          at rx.Single$18.onNext(Single.java:1911)
          at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)                                                                                                           
          at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102)                                                                                      
          at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)                                                                                   
          at rx.Single$1$1.onSuccess(Single.java:80)                                                                                                                               
          at rx.internal.operators.SingleOnSubscribeMap$MapSubscriber.onSuccess(SingleOnSubscribeMap.java:74)                                                                      
          at rx.Single$18.onNext(Single.java:1911)
          at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)                                                                                                           
          at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102)                                                                                      
          at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)                                                                                   
          at rx.Single$1$1.onSuccess(Single.java:80)                                                                                                                               
          at rx.internal.util.ScalarSynchronousSingle$1.call(ScalarSynchronousSingle.java:40)                                                                                      
          at rx.internal.util.ScalarSynchronousSingle$1.call(ScalarSynchronousSingle.java:36)                                                                                      
          at rx.Single$1.call(Single.java:90)
          at rx.Single$1.call(Single.java:70)
          at rx.Single.subscribe(Single.java:1839)
          at rx.Single.subscribe(Single.java:1916)
          at rx.internal.operators.SingleOnSubscribeMap.call(SingleOnSubscribeMap.java:45)                                                                                         
          at rx.internal.operators.SingleOnSubscribeMap.call(SingleOnSubscribeMap.java:30)
          at rx.Single$1.call(Single.java:90)
          at rx.Single$1.call(Single.java:70)
          at rx.Single.subscribe(Single.java:1839)
          at rx.Single.subscribe(Single.java:1916)
          at rx.internal.operators.SingleOnSubscribeMap.call(SingleOnSubscribeMap.java:45)                                                                                         
          at rx.internal.operators.SingleOnSubscribeMap.call(SingleOnSubscribeMap.java:30)
          at rx.Single$1.call(Single.java:90)
          at rx.Single$1.call(Single.java:70)
          at rx.Single.subscribe(Single.java:1839)
          at rx.Single.subscribe(Single.java:1916)
          at rx.Single$5.call(Single.java:701)
          at rx.Single$5.call(Single.java:683)
          at rx.Single$1.call(Single.java:90)
          at rx.Single$1.call(Single.java:70)
          at rx.Single.subscribe(Single.java:1839)
          at rx.Single.subscribe(Single.java:1916)
          at rx.internal.operators.SingleOnSubscribeMap.call(SingleOnSubscribeMap.java:45)
          at rx.internal.operators.SingleOnSubscribeMap.call(SingleOnSubscribeMap.java:30)
          at rx.Single$1.call(Single.java:90)
          at rx.Single$1.call(Single.java:70)
          at rx.Single.subscribe(Single.java:1839)
          at rx.Single.subscribe(Single.java:1916)
          at rx.internal.operators.SingleOnSubscribeMap.call(SingleOnSubscribeMap.java:45)
          at rx.internal.operators.SingleOnSubscribeMap.call(SingleOnSubscribeMap.java:30)
          at rx.Single$1.call(Single.java:90)
          at rx.Single$1.call(Single.java:70)
          at rx.Single.subscribe(Single.java:1839)
          at rx.Single.subscribe(Single.java:1916)
          at rx.Single$5$1.onSuccess(Single.java:691)
          at rx.Single$5$1.onSuccess(Single.java:687)
          at rx.Single$18.onNext(Single.java:1911)
          at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
          at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102)
          at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
          at rx.Single$1$1.onSuccess(Single.java:80)
          at rx.internal.operators.SingleOnSubscribeMap$MapSubscriber.onSuccess(SingleOnSubscribeMap.java:74)
          at rx.Single$18.onNext(Single.java:1911)
          at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:134)
          at rx.internal.producers.SingleDelayedProducer.emit(SingleDelayedProducer.java:102)
          at rx.internal.producers.SingleDelayedProducer.setValue(SingleDelayedProducer.java:85)
          at rx.Single$1$1.onSuccess(Single.java:80)
          at rx.internal.operators.OnSubscribeSingle$1.onCompleted(OnSubscribeSingle.java:55)
          at rx.subjects.PublishSubject$PublishSubjectProducer.onCompleted(PublishSubject.java:323)
          at rx.subjects.PublishSubject$PublishSubjectState.onCompleted(PublishSubject.java:247)
          at rx.subjects.PublishSubject.onCompleted(PublishSubject.java:83)

When I change the code to:

        final ConnectableObservable<Object> connectable =
                subject.concatMap(x -> whatever).replay();

The stack trace I get is:

          at rx.internal.operators.OperatorReplay$ReplaySubscriber.onCompleted(OperatorReplay.java:470)
          at rx.observers.SerializedObserver.onCompleted(SerializedObserver.java:177)
          at rx.observers.SerializedSubscriber.onCompleted(SerializedSubscriber.java:64)
          at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.drain(OnSubscribeConcatMap.java:246)
          at rx.internal.operators.OnSubscribeConcatMap$ConcatMapSubscriber.onCompleted(OnSubscribeConcatMap.java:169)
          at rx.subjects.PublishSubject$PublishSubjectProducer.onCompleted(PublishSubject.java:323)
          at rx.subjects.PublishSubject$PublishSubjectState.onCompleted(PublishSubject.java:247)
          at rx.subjects.PublishSubject.onCompleted(PublishSubject.java:83)

This is happening on RxJava 1.1.9.

@akarnokd
Member

Could you post a unit test that shows the problem? The stack trace refers to a bunch of mappings, which I guess are in whatever.

@DavidDTA
Contributor Author

DavidDTA commented Sep 26, 2016

Here is a unit test that demonstrates the situation:

package com.example;

import org.junit.Assert;

import org.junit.Test;

import rx.Observable;
import rx.Single;
import rx.functions.Action1;
import rx.subjects.PublishSubject;

public class ExampleUnitTest {
    final int ITERATIONS = 10;

    private Action1<Integer> assertStack(final int baseStack, final int additionalFrames, final int iterationFrames) {
        return o -> Assert.assertEquals(baseStack + additionalFrames + ITERATIONS * iterationFrames, Thread.currentThread().getStackTrace().length);
    }

    @Test
    public void testFlatMap() {

        int baseStack = Thread.currentThread().getStackTrace().length;

        PublishSubject<Integer> subject = PublishSubject.create();

        Single<Integer> single1 = subject.toSingle().flatMap(integer -> {
            Single<Integer> modified = Single.just(integer);
            for (int i = 0; i < ITERATIONS; i++) {
                modified = modified.map(n -> n + 1);
            }
            return modified;
        });
        Observable<Integer> observable = subject.concatMap(integer -> {
            Observable<Integer> modified = Observable.just(integer);
            for (int i = 0; i < ITERATIONS; i++) {
                modified = modified.map(n -> n + 1);
            }
            return modified;
        });
        single1.subscribe(assertStack(baseStack, 34, 12));
        observable.subscribe(assertStack(baseStack, 18, 5));
        observable.toSingle().subscribe(assertStack(baseStack, 14, 0));

        subject.onNext(0);
        subject.onCompleted();
    }
}

The first subscription is my initial code, and the third is the comparison I was making. The second subscription makes clear that I saw such an improvement only because the final toSingle() hides the large stack: it delays the emission until onCompleted. However, the test also shows that Observable.map() adds only 5 frames per call, while Single.map() adds 12.
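
The measurement technique behind assertStack can be tried in isolation. The following is a minimal sketch in plain Java, without RxJava; StackDepthDemo and depthAfter are hypothetical names used only for illustration. It assumes that Thread.currentThread().getStackTrace().length grows by one for each plain nested method call, which is the property the test relies on when it counts frames per operator.

```java
// Hypothetical demo class; not part of RxJava or of the test above.
class StackDepthDemo {

    // Recurses `extraCalls` times, then reports how many frames are on the stack.
    static int depthAfter(int extraCalls) {
        if (extraCalls == 0) {
            return Thread.currentThread().getStackTrace().length;
        }
        return depthAfter(extraCalls - 1);
    }

    public static void main(String[] args) {
        int base = depthAfter(0);
        int nested = depthAfter(10);
        // Each plain recursive call contributes one frame to the difference.
        System.out.println("frames added by 10 nested calls: " + (nested - base));
    }
}
```

Operators that add several frames per call, as Single.map() does here, show up the same way: as a larger difference from the baseline.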

@akarnokd
Member

We may be able to reduce the stack depth a bit, but the problem is this:

for (int i = 0; i < ITERATIONS; i++) {
    modified = modified.map(n -> n + 1);
}

Your composition gets unconventionally long this way, and you might want to find another way of chaining that many maps (for example, collect them into a List<Func1> and loop through it).
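
One way to read the List<Func1> suggestion is to fold the mapping functions into a single function and pass that to one map() call. The sketch below is an assumption about what such a helper could look like, not code from RxJava: MapperChain and combine are hypothetical names, and java.util.function.Function stands in for rx.functions.Func1 so that the example runs without the library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper; Function stands in for rx.functions.Func1.
class MapperChain {

    // Folds the list into one function that applies each step in a loop,
    // so the call depth does not grow with the number of steps.
    static <T> Function<T, T> combine(List<Function<T, T>> steps) {
        return value -> {
            T result = value;
            for (Function<T, T> step : steps) {
                result = step.apply(result);
            }
            return result;
        };
    }

    public static void main(String[] args) {
        List<Function<Integer, Integer>> steps = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            steps.add(n -> n + 1);
        }
        // With RxJava, the combined function would be passed to a single map() call.
        System.out.println(combine(steps).apply(0)); // prints 10
    }
}
```

The loop replaces ten nested operator subscriptions with one, at the cost of losing per-step operator boundaries.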

concatMap trampolines to prevent excessive stack depth on onCompleted and with synchronous sources. You could try adding .subscribeOn(Schedulers.trampoline()) occasionally to limit the depth in your sequence.
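
For readers unfamiliar with the term, trampolining means queueing follow-up work and running it from one drain loop instead of calling it recursively. The following is a simplified, single-threaded sketch of that idea; MiniTrampoline and its methods are hypothetical and are not how concatMap or Schedulers.trampoline() are implemented.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, single-threaded illustration of trampolining; not RxJava's implementation.
class MiniTrampoline {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private boolean draining;

    // Runs the task now if nothing is running; otherwise leaves it for the active drain loop.
    void schedule(Runnable task) {
        queue.add(task);
        if (draining) {
            return; // the outer schedule() call will pick this task up
        }
        draining = true;
        try {
            Runnable next;
            while ((next = queue.poll()) != null) {
                next.run();
            }
        } finally {
            draining = false;
        }
    }

    static int maxDepth;

    // Each step records the current stack depth and schedules the next step.
    static void step(MiniTrampoline trampoline, int remaining) {
        maxDepth = Math.max(maxDepth, Thread.currentThread().getStackTrace().length);
        if (remaining > 0) {
            trampoline.schedule(() -> step(trampoline, remaining - 1));
        }
    }

    // Returns the deepest stack observed while running a chain of `steps` follow-up steps.
    static int deepestStackFor(int steps) {
        maxDepth = 0;
        MiniTrampoline trampoline = new MiniTrampoline();
        trampoline.schedule(() -> step(trampoline, steps));
        return maxDepth;
    }

    public static void main(String[] args) {
        // Each step returns to the drain loop before the next one starts,
        // so a chain of 100 steps should be no deeper than a chain of 1.
        System.out.println(deepestStackFor(1) == deepestStackFor(100));
    }
}
```

The sketch shares one queue across all steps, which is what keeps the depth flat.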

@DavidDTA
Contributor Author

I edited my previous comment since the stack sizes were actually for Single.map() and Observable.map(). Single.flatMap() actually adds 36(!) frames and Observable.concatMap() adds 10.

@DavidDTA
Contributor Author

I looked into the trampoline scheduler recently, and it looks like adding it into the chain with subscribeOn() would not work: each operator would create its own worker, which would not share work with the other trampolines in the chain.

@akarnokd
Member

I see. The whole Single needs rearchitecting, including the operator implementations, to reduce the nesting. There is a risk of binary-incompatible changes, and I'm not even sure it can be done incrementally.

@DavidDTA
Contributor Author

For the sake of completeness, here's a simpler (and perhaps fairer) test demonstrating the difference between Single.flatMap() and Single.toObservable().concatMap().toSingle():

package com.example;

import org.junit.Assert;

import org.junit.Test;

import rx.Observable;
import rx.Single;
import rx.functions.Action1;
import rx.subjects.PublishSubject;

public class ExampleUnitTest {
    final int ITERATIONS = 20;

    private Action1<Integer> assertStack(final int baseStack, final int additionalFrames, final int iterationFrames) {
        return o -> Assert.assertEquals(baseStack + additionalFrames + ITERATIONS * iterationFrames, Thread.currentThread().getStackTrace().length);
    }

    @Test
    public void testFlatMap() {

        int baseStack = Thread.currentThread().getStackTrace().length;

        PublishSubject<Integer> subject = PublishSubject.create();

        Single<Integer> single1 = subject.toSingle();
        for (int i = 0; i < ITERATIONS; i++) {
            single1 = single1.flatMap(integer -> Single.just(integer + 1));
        }
        Single<Integer> single2 = subject.toSingle();
        for (int i = 0; i < ITERATIONS; i++) {
            single2 = single2.toObservable().concatMap(integer -> Observable.just(integer + 1)).toSingle();
        }
        single1.subscribe(assertStack(baseStack, 10, 24));
        single2.subscribe(assertStack(baseStack, 10, 8));

        subject.onNext(0);
        subject.onCompleted();
    }
}

It seems that using Single.toObservable().concatMap().toSingle() saves 16 stack frames per operator over Single.flatMap() (8 frames instead of 24). As a stopgap, I'm planning to make this replacement in my code. This can be accomplished using Single.compose() and the following transformer:

    private Single.Transformer<Integer, Integer> transformer2(final Func1<Integer, Single<Integer>> mapper) {
        return integerSingle -> integerSingle.toObservable().concatMap(it -> mapper.call(it).toObservable()).toSingle();
    }
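
To put the per-operator numbers in perspective, the arithmetic from assertStack can be worked through for the 20 iterations used in the test. This is only a sketch of the calculation; FrameBudget and framesAboveBase are hypothetical names, and the constants (10, 24, 8) are the ones asserted in the test above for RxJava 1.1.9.

```java
// Hypothetical helper mirroring the arithmetic in assertStack; constants come from the test above.
class FrameBudget {

    // Frames above the baseline: a fixed overhead plus a per-operator cost.
    static int framesAboveBase(int additionalFrames, int iterations, int framesPerIteration) {
        return additionalFrames + iterations * framesPerIteration;
    }

    public static void main(String[] args) {
        int iterations = 20;
        int viaFlatMap = framesAboveBase(10, iterations, 24);  // Single.flatMap()
        int viaConcatMap = framesAboveBase(10, iterations, 8); // toObservable().concatMap().toSingle()
        System.out.println(viaFlatMap);                // 490
        System.out.println(viaConcatMap);              // 170
        System.out.println(viaFlatMap - viaConcatMap); // 320, i.e. 16 per operator
    }
}
```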

@akarnokd
Member

Posted #4648. May not get all reduction but it's a base for further compaction.

@DavidDTA
Contributor Author

Thanks for looking into this.

On Fri, Sep 30, 2016 at 2:40 PM, David Karnok wrote:

Posted #4648. May not get all reduction but it's a base for further compaction.



@akarnokd
Member

Closing via #4648
