
onErrorContinue not invoked when RejectedExecutionException is thrown #1488

Closed
mbuchwald opened this issue Jan 15, 2019 · 6 comments
Labels: status/declined (We feel we shouldn't currently apply this change/suggestion)

Comments

@mbuchwald

We are dealing with an issue regarding the use of onErrorContinue. Basically, we need to handle any exception thrown and continue the execution. We added onErrorContinue in different flows but there seems to be a problem handling a RejectedExecutionException thrown by the scheduler to which we are doing the publishOn. In this case, doOnError or onErrorResume are invoked but onErrorContinue is not. We are using reactor 3.2.0 but it was also reproduced in 3.2.5.

I have the following test case that reproduces this scenario:

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static reactor.core.scheduler.Schedulers.fromExecutorService;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.junit.Test;

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class ReactorOnErrorContinueTestCase {

  @Test
  public void someTest() throws Exception {
    TestScheduler cpuLightBusy = new TestScheduler(2);
    FluxSinkRecorder<Integer> recorder = new FluxSinkRecorder<>();
    AtomicInteger sum = new AtomicInteger(0);
    CountDownLatch latch = new CountDownLatch(3);
    Flux<Integer> flux = Flux.create(recorder);
    flux
        .publishOn(fromExecutorService(cpuLightBusy))
        .doOnNext(value -> {
          sum.addAndGet(value);
          latch.countDown();
        })
        .doOnError(t -> System.err.println(t))
        .onErrorContinue((t, o) -> {
          System.err.println("Exception is: " + t);
          Integer value = (Integer) o;
          sum.addAndGet(value * 2);
          latch.countDown();
        })
        .subscribe();
    recorder.getFluxSink().next(10);
    recorder.getFluxSink().next(12);
    recorder.getFluxSink().next(13);
    latch.await(5, TimeUnit.SECONDS);
    assertThat(sum.get(), is(45));
  }

  static class TestScheduler extends ScheduledThreadPoolExecutor {

    private final ExecutorService executor;
    private AtomicInteger countdown = new AtomicInteger(0);

    public TestScheduler(int threads) {
      super(1);
      executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(threads));
    }

    public void countAndThrow() {
      // Avoid throwing exception for the publishOn thread jump
      if (countdown.getAndIncrement() == 1) {
        throw new RejectedExecutionException();
      }
    }

    @Override
    public Future<?> submit(Runnable task) {
      countAndThrow();
      return executor.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
      countAndThrow();
      return executor.submit(task);
    }
  }

  public static class FluxSinkRecorder<T> implements Consumer<FluxSink<T>> {

    private FluxSink<T> fluxSink;

    @Override
    public void accept(FluxSink<T> fluxSink) {
      this.fluxSink = fluxSink;
    }

    public FluxSink<T> getFluxSink() {
      return fluxSink;
    }
  }
}
@simonbasle
Member

onErrorContinue and publishOn + RejectedExecutionException could be a bit more complicated than you anticipate: it is not only data signals that are submitted to the ExecutorService, but termination signals as well (onComplete, onError). The operator can also conflate bursts of signals into a single submission of a loop-draining task (especially when dealing with Fuseable sources).

This makes the decision of "continuing" difficult (how much should we request? isn't there a risk of infinitely looping if the executor is actually shut down? etc...)

Bounded executors are probably not the right tool if you can't deal with REEs using retry patterns. Perhaps a mix of ThreadPoolExecutor's RejectedExecutionHandler (e.g. the caller-runs policy) and backpressure?

And in the case where the RejectedExecutionException happens because the ExecutorService has been shut down, skipping to the next signal won't help much: it will also be rejected.
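The caller-runs suggestion can be sketched with plain JDK classes (a minimal illustration, names not from this thread): a saturated bounded pool normally throws RejectedExecutionException, but installing CallerRunsPolicy makes overflow tasks run on the submitting thread instead, which also acts as a crude backpressure signal.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallerRunsDemo {

    // Submits every value to a deliberately tiny pool. With CallerRunsPolicy,
    // overflow tasks run on the submitting thread instead of raising
    // RejectedExecutionException, so no value is lost.
    static int processAll(int[] values) throws InterruptedException {
        AtomicInteger sum = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1),                // easy to saturate
                new ThreadPoolExecutor.CallerRunsPolicy());  // run in caller, don't throw
        for (int v : values) {
            pool.execute(() -> sum.addAndGet(v));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return sum.get();
    }
}
```

The trade-off is that the submitting thread blocks while it runs the overflow task, so upstream production naturally slows down instead of failing.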

@mbuchwald
Author

mbuchwald commented Jan 16, 2019

@simonbasle we added a retryWhen, which handles this correctly, but as soon as the scheduler stops throwing the REE (and the task is submitted to the scheduler) the whole flow simply stops.
For example, adding the retryWhen I mentioned to the example above:

Flux<Integer> flux = Flux.create(recorder);
    flux
        .publishOn(fromExecutorService(cpuLightBusy))
        .doOnNext(value -> {
          sum.addAndGet(value);
          latch.countDown();
        })
        .doOnError(t -> System.err.println(t))
        .onErrorContinue((t, o) -> {
          System.err.println("Exception is: " + t);
          Integer value = (Integer) o;
          sum.addAndGet(value * 2);
          latch.countDown();
        })
        .retryWhen(onlyIf(ctx -> true)
            .backoff(ctx -> new BackoffDelay(ofMillis(2)))
            .withBackoffScheduler(fromExecutorService(cpuLightBusy)))
        .subscribe();

In our (real) chain what happens is that, after correctly retrying and submitting the task to the scheduler, the FluxPublishOn checkTerminated method simply returns true, even though it should continue the execution.

@dfeist
Contributor

dfeist commented Jan 16, 2019

@mbuchwald publishOn does not support onErrorContinue, that's why you are seeing this behaviour.

That said, I don't think this is necessarily an issue that prevents EmitterProcessor/PublishOn being used instead of WorkQueueProcessor to decouple work stages. Feel free to reach out to me.

@mbuchwald
Author

Even if publishOn does not support it, there must be some way to keep the entire FluxSink from terminating.

@dfeist
Contributor

dfeist commented Jan 21, 2019

@mbuchwald No, that's the correct behaviour, because errors are terminal. That's why we introduced onErrorContinue: to provide an alternative to the default 'terminal' error behaviour. See #572 and #629.
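The "errors are terminal" rule is part of the Reactive Streams contract itself, not just Reactor. As a JDK-only sketch (using java.util.concurrent's SubmissionPublisher rather than Reactor), once a publisher has terminated with an error it refuses any further emission:

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SubmissionPublisher;

public class TerminalErrorDemo {

    // Returns true if the publisher still accepts values after an error.
    // Per the terminal-error contract it must not: submit(..) after
    // closeExceptionally(..) throws IllegalStateException.
    static boolean acceptsAfterError() {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.submit(1);
        publisher.closeExceptionally(new RejectedExecutionException("pool saturated"));
        try {
            publisher.submit(2); // the stream is already terminated
            return true;
        } catch (IllegalStateException expected) {
            return false;
        }
    }
}
```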

@simonbasle Is there any way to log a warning in this case, where onErrorContinue is used, but has no effect because the upstream operator does not support it?

@simonbasle
Member

@dfeist not really, this is not a centralized process. onErrorContinue puts a flag in the Context; the operators that support error recovery look for it. Other operators have no knowledge of its existence.
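A rough analogy of that mechanism (hypothetical names, not Reactor's actual internals): the resume handler is stashed under a well-known key in a per-subscription context, and only operators that know to look that key up can act on it; every other operator treats the error as terminal.

```java
import java.util.Map;
import java.util.function.BiConsumer;

public class ContextFlagDemo {

    // Stand-in for the key a resume strategy would be stored under.
    static final String RESUME_KEY = "onErrorContinue";

    // An operator that knows about the flag consults the context and resumes;
    // an operator that doesn't simply propagates the error as terminal.
    static String onError(Map<String, BiConsumer<Throwable, Object>> context,
                          boolean operatorKnowsTheFlag,
                          Throwable error, Object droppedValue) {
        BiConsumer<Throwable, Object> resume = context.get(RESUME_KEY);
        if (operatorKnowsTheFlag && resume != null) {
            resume.accept(error, droppedValue); // e.g. log and drop the value
            return "continued";
        }
        return "terminated";
    }
}
```

This is why no central warning can be emitted: an unaware operator (like publishOn here) never consults the flag, so nothing observes the mismatch.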

@simonbasle simonbasle added the status/declined We feel we shouldn't currently apply this change/suggestion label Sep 19, 2019