Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

windowTimeout - Flux disposed prematurely #1898

Closed
kbeineke opened this issue Sep 19, 2019 · 3 comments
Closed

windowTimeout - Flux disposed prematurely #1898

kbeineke opened this issue Sep 19, 2019 · 3 comments
Labels
status/need-investigation This needs more in-depth investigation
Milestone

Comments

@kbeineke
Copy link

When using windowTimeout(maxElements, duration) in combination with concat/concatWith with the following preconditions:

  • the number of concatenated elements is greater than maxElements,
  • duration is small and the following operation rather slow.

Then, the following exception is thrown (and not all concatenated elements are processed):

11:09:26.874 [parallel-1] DEBUG reactor.core.publisher.Operators - onNextDropped: UnicastProcessor
11:09:26.879 [parallel-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
	at reactor.core.Exceptions.failWithRejected(Exceptions.java:249)
	at reactor.core.publisher.Operators.onRejectedExecution(Operators.java:807)
	at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber.newPeriod(FluxWindowTimeout.java:188)
	at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber.drainLoop(FluxWindowTimeout.java:385)
	at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber$ConsumerIndexHolder.run(FluxWindowTimeout.java:440)
	at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
	at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.RejectedExecutionException: Scheduler unavailable
	at reactor.core.Exceptions.<clinit>(Exceptions.java:502)
	at reactor.core.scheduler.Schedulers.workerSchedulePeriodically(Schedulers.java:959)
	at reactor.core.scheduler.ExecutorServiceWorker.schedulePeriodically(ExecutorServiceWorker.java:56)
	at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber.newPeriod(FluxWindowTimeout.java:184)
	... 10 common frames omitted

Steps to Reproduce

This seems to be a time-dependent problem. Thus, it does not occur every time. On my machine, it occurred in 9 out of 10 case in this simplified example.

@Test
public void repoCase() {

    final var disposable = Flux
        .just(0)
        .concatWith(Flux.fromStream(IntStream
            .range(1, 1000)
            .boxed()))
        .windowTimeout(100, Duration.ofMillis(1))
        .flatMap(integerFlux -> integerFlux.compose(this::slowOperation))
        .subscribe();

    while (!disposable.isDisposed()) {
        LockSupport.parkNanos(100 * 1000 * 1000);
    }
}

private Flux<Double> slowOperation(final Flux<Integer> flux) {
    return flux.map(integer -> {
        double sum = 0;
        for (int counter = 1; counter < 10000; counter++) {
            sum += Math.pow(-1, counter + 1) / ((2 * counter) - 1);
        }
        return sum;
    });
}

Possible Solution

For now, I use bufferTimeout instead. This is not ideal as buffering is not necessary but it works reliably.

Your Environment

  • Reactor version(s) used: 3.2.11.RELEASE
  • Other relevant libraries versions (eg. netty, ...):
  • JVM version (javar -version): OpenJDK 11.0.1
  • OS and version (eg uname -a): macOS 10.14.6
@simonbasle simonbasle added the status/need-investigation This needs more in-depth investigation label Nov 27, 2019
@simonbasle simonbasle added this to the Backlog milestone Nov 27, 2019
@srnagar
Copy link

srnagar commented Apr 6, 2020

I am also noticing the same stack trace intermittently when using windowTimeout().

  • JVM version (javar -version): OpenJDK 11.0.1
  • Reactor version: 3.3.0.RELEASE
  • OS: Windows 10

simonbasle pushed a commit that referenced this issue Jun 10, 2022
This commit adds a variant of windowTimeout that tries
to honor backpressure of a slow downstream Subscriber
better than the current implementation (which just errors
in the face of backpressure and timeouts).

The operator tries to request the minimum possible amount
from upstream to honor the maxSize, but if a timeout occurs
before all these elements could arrive - and if no more window
is requested from downstream, the elements  get temporarily
stored into a pending window that is opened on timeout, until
1 more window is requested.

See also #1898 and #2920.
Fixes #1099.
@OlegDokuka
Copy link
Contributor

@kbeineke can you please check if this issue reproduces after the #3054. Thanks

@OlegDokuka
Copy link
Contributor

closing since can not reproduce it with windowTimeout with fair back pressure

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
status/need-investigation This needs more in-depth investigation
Projects
None yet
Development

No branches or pull requests

4 participants