Flux#bufferWhen memory leak suspicion using Flux#buffer(timespan, timeshift) when timespan != timeshift #969

Closed
rreynaud opened this issue Nov 29, 2017 · 0 comments

Expected behavior

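Flux.buffer(Duration.ofMillis(3000), Duration.ofMillis(2000)) should behave like Flux.buffer(Duration.ofMillis(3000), Duration.ofMillis(3000)) with respect to memory: buffered elements should become eligible for garbage collection before the Flux completes.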

Actual behavior

When timespan differs from timeshift, heap dumps show that buffered elements are never garbage-collected before the Flux completes, whereas when timespan equals timeshift they are.

Instances are retained by reactor.core.publisher.FluxBufferWhen$BufferStartEndMainSubscriber
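
To make the difference between the two calls concrete, here is a minimal sketch in plain Java (no Reactor dependency) of which buffers are open at a given instant. It assumes that buffer k opens at k × timeshift and stays open for timespan; the class and method names are hypothetical and this is not Reactor's implementation. The issue title suggests that the timespan != timeshift variant is backed by bufferWhen, which matches the retaining class reported above.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferOverlapSketch {

    // Returns the indices of the buffers open at time tMillis, assuming
    // buffer k covers the half-open window [k * timeshift, k * timeshift + timespan).
    static List<Long> openBuffersAt(long tMillis, long timespan, long timeshift) {
        List<Long> open = new ArrayList<>();
        long last = tMillis / timeshift; // last buffer opened at or before tMillis
        for (long k = 0; k <= last; k++) {
            long start = k * timeshift;
            if (tMillis < start + timespan) {
                open.add(k);
            }
        }
        return open;
    }

    public static void main(String[] args) {
        // timespan 3000, timeshift 2000: two buffers overlap between 2000 and 3000 ms
        System.out.println(openBuffersAt(2500, 3000, 2000)); // [0, 1]
        System.out.println(openBuffersAt(3500, 3000, 2000)); // [1]
        // timespan == timeshift: a single buffer is open at any instant
        System.out.println(openBuffersAt(2500, 3000, 3000)); // [0]
    }
}
```

With overlapping buffers, an element emitted at 2500 ms belongs to two lists at once, so it stays reachable until every buffer holding it has been released.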

Steps to reproduce

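import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import reactor.core.publisher.Flux;
import reactor.core.publisher.UnicastProcessor;

// Payload type whose instances show up in the heap dump.
static class Wrapper {
    final int i;

    Wrapper(int i) {this.i = i;}

    @Override
    public String toString() {
        return "Wrapper{" +
                "i=" + i +
                '}';
    }
}

public void test() throws InterruptedException {
    // Never counted down: await() below only keeps the test alive while heap dumps are taken.
    final CountDownLatch latch = new CountDownLatch(1);
    final UnicastProcessor<Wrapper> processor = UnicastProcessor.create();
    // timespan (3000 ms) != timeshift (2000 ms), so consecutive buffers overlap.
    processor.buffer(Duration.ofMillis(3000), Duration.ofMillis(2000))
             .doOnNext(t -> System.out.println(String.format("tuple %s", t)))
             .subscribe();

    // Push one new Wrapper into the processor every 10 ms.
    Flux.range(1, Integer.MAX_VALUE)
        .delayElements(Duration.ofMillis(10))
        .doOnNext(i -> processor.onNext(new Wrapper(i)))
        .doOnComplete(processor::onComplete)
        .subscribe();

    latch.await(10, TimeUnit.MINUTES);
}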

Reactor Core version

3.1.2.RELEASE

JVM version (e.g. java -version)

1.8.151

OS version (e.g. uname -a)

smaldini added the type/bug (A general bug) label Dec 1, 2017
smaldini added this to the 3.1.3.RELEASE milestone Dec 1, 2017
simonbasle added a commit that referenced this issue Dec 22, 2017
This commit ensures that buffers in the timeout and bufferWhen variants
are not retained longer than necessary (ie until source completion or
cancellation), but rather when the buffer closes.
OlegDokuka pushed a commit to OlegDokuka/reactor-core that referenced this issue Jan 8, 2018
This commit ensures that buffers in the timeout and bufferWhen variants
are not retained longer than necessary (ie until source completion or
cancellation), but rather when the buffer closes.
simonbasle added a commit that referenced this issue Jan 16, 2018
This commit improves FluxBufferWhen implementation once again, by:

 - keeping the tracking of buffer in the main subscriber (no reference
 to buffers in close subscribers, to prevent retaining)
 - tracking request in main
 - correctly cancelling the source when the open subscriber completes
 (which means that at this point no new buffer will open) IFF last close
 subscriber has also completed
 - correctly cancel upstream and clear buffers when open/close
 subscribers error.
 - overall simplifies the inner subscribers
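
The first point of the list above (tracking buffers only in the main subscriber so that closing a buffer releases it) can be illustrated with a plain-Java sketch. This is an assumption-laden illustration, not the actual FluxBufferWhen code: the class name, the numeric buffer ids, and the open/onNext/close methods are all hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BufferRegistrySketch<T> {

    // Single place where open buffers are referenced, keyed by a hypothetical id.
    private final Map<Long, List<T>> openBuffers = new LinkedHashMap<>();
    private long nextId;

    // Opens a new buffer and returns its id.
    public synchronized long open() {
        long id = nextId++;
        openBuffers.put(id, new ArrayList<>());
        return id;
    }

    // Adds the value to every buffer that is currently open.
    public synchronized void onNext(T value) {
        for (List<T> buffer : openBuffers.values()) {
            buffer.add(value);
        }
    }

    // Closes a buffer: removing the entry drops the registry's reference,
    // so the list is collectable once the caller is done with it.
    public synchronized List<T> close(long id) {
        return openBuffers.remove(id);
    }

    public synchronized int openCount() {
        return openBuffers.size();
    }

    public static void main(String[] args) {
        BufferRegistrySketch<Integer> registry = new BufferRegistrySketch<>();
        long first = registry.open();
        registry.onNext(1);
        long second = registry.open();
        registry.onNext(2);
        System.out.println(registry.close(first));  // [1, 2]
        registry.onNext(3);
        System.out.println(registry.close(second)); // [2, 3]
        System.out.println(registry.openCount());   // 0
    }
}
```

Because the close side only knows an id and never holds the list itself, nothing keeps a closed buffer reachable until source completion, which is the retention pattern the issue reports.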
simonbasle added a commit that referenced this issue Jan 17, 2018