Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

buffer() using TimeAndSizeBasedChunks incorrectly forces thread into interrupted state #428

Closed
elandau opened this issue Oct 10, 2013 · 5 comments
Labels

Comments

@elandau
Copy link

elandau commented Oct 10, 2013

In TimeAndSizeBasedChunks.emitChunk, if emitChunk is called from the scheduled action in createChunk (i.e from the timeout thread), calling subscription.unsubscribe() ends up calling cancel() on the future which sets the thread's interrupted state to true. This state has an adverse effect on any blocking call performed in that thread.

@benjchristensen
Copy link
Member

Can you provide a unit test to demonstrate the issue? Here are existing unit tests for buffer: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/OperationBuffer.java#L372

Also, what Future are you referring to? Are you using custom schedulers?

@elandau
Copy link
Author

elandau commented Oct 10, 2013

I'm unable to run the unit tests in eclipse for some reason. Here's a unit test from my project. While we're on the subject of buffer() it would be nice if the observer was not called with an empty list.

    @Test
    public void testInterrupted() throws Exception {
        Observable.create(new OnSubscribeFunc<Integer>() {
            @Override
            public Subscription onSubscribe(final Observer<? super Integer> t1) {
                final Future<?> t = Executors.newSingleThreadExecutor().submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep(10000);
                        } catch (InterruptedException e) {
                        }
                    }
                });

                return new Subscription() {
                    @Override
                    public void unsubscribe() {
                        t.cancel(true);
                    }
                };
            }
        })
        .buffer(1000, TimeUnit.MILLISECONDS, 10) 
        .subscribe(new Action1<List<Integer>>() {
            @Override
            public void call(List<Integer> t1) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // This should be called
                    e.printStackTrace();
                }
            }
        });

        Thread.sleep(10000);

    }

@elandau
Copy link
Author

elandau commented Oct 10, 2013

Regarding the Future, no, i'm not using a custom scheduler. Subscriptions.create(final Future<?> f) calls cancel on the future when it is unsubscribed.

@benjchristensen
Copy link
Member

Yes, that's what a Subscription of a Future is supposed to do.

@benjchristensen
Copy link
Member

Thanks for submitting the unit test, I'll try it out.

While we're on the subject of buffer() it would be nice if the observer was not called with an empty list.

You're using time, so it will emit every 1000ms whatever is buffered, even if it's empty.

zsxwing added a commit to zsxwing/RxJava that referenced this issue Oct 12, 2013
benjchristensen added a commit that referenced this issue Oct 16, 2013
rickbw pushed a commit to rickbw/RxJava that referenced this issue Jan 9, 2014
rickbw pushed a commit to rickbw/RxJava that referenced this issue Jan 9, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants