WindowTimeout backpressure #1099

Closed
bsideup opened this issue Feb 26, 2018 · 31 comments
Labels
status/need-investigation This needs more in-depth investigation

Comments

@bsideup
Contributor

bsideup commented Feb 26, 2018

Expected behavior

windowTimeout respects the backpressure

Actual behavior

Not verified whether this is expected or not; the stream fails with:
The receiver is overrun by more signals than expected (bounded queue...)

Steps to reproduce

import java.time.Duration;
import java.util.UUID;
import java.util.logging.Level;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

Flux
        .generate(sink -> {
            System.out.println("generate");
            sink.next(UUID.randomUUID().toString());
        })
        .log("source", Level.INFO, SignalType.REQUEST)
        .limitRate(5)
        .windowTimeout(10, Duration.ofSeconds(2))
        .concatMap(batch -> batch.delayElements(Duration.ofMillis(200)))
        .blockLast();

Reactor Core version

3.1.4.RELEASE

JVM version (e.g. java -version)

Any

@simonbasle simonbasle added the status/need-investigation This needs more in-depth investigation label Feb 27, 2018
@simonbasle simonbasle added this to the 3.2.0.RELEASE milestone Feb 27, 2018
@alex-lx

alex-lx commented Mar 21, 2018

I met the same issue when implementing a Kafka consumer that bulk-saves the records to another database.

@bsideup
Contributor Author

bsideup commented Mar 21, 2018

@alex-lx I had exactly the same source - Kafka :)

FYI here is a workaround for our use-case:
https://github.com/bsideup/liiklus/blob/2184d62d400a6ab72775395ba3f1a1eca9f64910/examples/java/src/main/java/com/example/Consumer.java#L67-L72

Basically, just do:

.window(size)
.concatMap(
    batch -> batch
        .delayUntil(process)
        .sample(windowDuration)
        .delayUntil(closeSubWindow),
    1
)

This way the window will be closed either after receiving size items or after windowDuration elapses thanks to .sample(), and it will respect the backpressure; a fleshed-out sketch follows below.
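A runnable sketch of this workaround (the stubs are illustrative, not from the linked code): process and closeSubWindow are placeholder methods standing in for the real per-item processing and the sub-window commit/ack.

import java.time.Duration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class WindowWorkaroundSketch {

    public static void main(String[] args) {
        int size = 10;
        Duration windowDuration = Duration.ofSeconds(2);

        Flux.range(1, 100)
                .window(size)                        // size-bounded windows, backpressure-aware
                .concatMap(batch -> batch
                                .delayUntil(WindowWorkaroundSketch::process)         // per-item processing
                                .sample(windowDuration)                              // at most one emission per window duration
                                .delayUntil(WindowWorkaroundSketch::closeSubWindow), // close/ack the sub-window
                        1)                           // prefetch 1: only one window in flight
                .blockLast();
    }

    // Placeholder for the real per-item processing (e.g. a DB write).
    private static Mono<Void> process(Integer item) {
        return Mono.delay(Duration.ofMillis(50)).then();
    }

    // Placeholder for closing/acknowledging the sub-window (e.g. committing offsets).
    private static Mono<Void> closeSubWindow(Integer lastItem) {
        return Mono.fromRunnable(() -> System.out.println("sub-window closed at " + lastItem));
    }
}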

@dforsl

dforsl commented May 21, 2019

Hi, is there any plan on looking into this for windowTimeout and/or bufferTimeout?

There are a few questions around this on SO and in the Gitter channel, but I haven't found any answers beyond something along the lines of "not supported right now". It would be great to know whether it's on the horizon, or whether this usage simply isn't intended to be covered by Reactor.

For instance, it would be great when reading from Kafka and persisting the records to a database in batches. As it is now, that doesn't seem possible out of the box without the risk of an overflow error if writing to the database is temporarily slowed down for whatever reason.

The above workaround looks great when you're processing records one by one, but I haven't been able to come up with anything for batching. Combining limitRate and onBackpressureBuffer (see the sketch below) isn't very attractive either, as it effectively cancels the backpressure mechanism that is typically present when writing.
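For reference, this is roughly what that limitRate + onBackpressureBuffer combination looks like (a sketch, not from this thread; source and insertBatch are hypothetical stand-ins for the Kafka-backed Flux and a Mono<Void>-returning bulk insert):

source
        .limitRate(100)                               // cap what is requested from upstream at once
        .bufferTimeout(1000, Duration.ofMillis(500))
        .onBackpressureBuffer(64)                     // absorb batches a slow writer can't take yet
        .concatMap(batch -> insertBatch(batch))       // hypothetical bulk insert
        .subscribe();

The bounded buffer merely absorbs what bufferTimeout keeps emitting, so this trades backpressure for buffering rather than restoring it.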

I've mainly looked into bufferTimeout, where I suppose the issue is that the operator cannot absorb the results of the demand it has itself created. Would it be possible to make it buffer whatever it has requested from upstream when downstream isn't yet ready? If not always, then by specifying an overflow strategy so that I can configure it to buffer? I fully accept that any items beyond those requested would cause an overflow error, but I don't think an overflow makes sense for records that the operator itself requested, just because they arrived after the time boundary.

Essentially, what I want to be able to do, without risking an overflow error on a temporary slow-down when writing to the database, is simply something like:

flux
  .bufferTimeout(1000, Duration.ofMillis(500))
  .concatMap { batch ->
    val result: Mono<Void> = insertBatch(batch) // hypothetical: insert the batch
    result.then(Mono.defer { acknowledgeOffsets(batch) }) // hypothetical: acknowledge offsets
  }

Any input on this, @simonbasle @bsideup? If you have ideas on how this can be solved with other operators, I'd love to hear them!

I hope it wasn't bad of me to put this in here; I can open a new ticket if preferred, I just happened to find this old one. I really enjoy Reactor, which is why I'm keen to see whether this is doable.
/d

@simonbasle simonbasle added status/has-workaround This has a known workaround described status/declined We feel we shouldn't currently apply this change/suggestion and removed status/need-investigation This needs more in-depth investigation labels Nov 27, 2019
@simonbasle simonbasle removed this from the Backlog milestone Nov 27, 2019
@bsideup
Contributor Author

bsideup commented Nov 27, 2019

The workaround is for that particular use case, not for the reported issue. The issue remains and hurts really hard.

@bsideup bsideup reopened this Nov 27, 2019
@bsideup bsideup added status/need-investigation This needs more in-depth investigation and removed status/declined We feel we shouldn't currently apply this change/suggestion status/has-workaround This has a known workaround described labels Nov 27, 2019
@Ksnz

Ksnz commented Dec 13, 2019

Any ideas for workarounds?

@asyedin

asyedin commented Apr 30, 2020

Any chance this is going to be prioritized?

@mprowaznik

This issue is very frustrating. And not fixed since 2018 🤦


@vsethi13

Got hit by this one today! Frustrating :/

@bsideup
Contributor Author

bsideup commented Jun 8, 2020

To everyone affected by this issue:
Please share your use case! We want to introduce an improvement that should solve the problem, but we need to understand the use cases better so that it covers as many of them as possible.

Thanks in advance!

/cc @mprowaznik @vsethi13 @dforsl @alex-lx

@sneakythr0ws


.bufferTimeout(..).concatMap() +1

@bsideup
Contributor Author

bsideup commented Jun 19, 2020

@sneakythr0ws well, this is code. But maybe your use case can be done differently :) Hence the question about the use case.

@sneakythr0ws


I use bufferTimeout with concatMap after it for batch inserts. In my case I can replace it with buffer(); see the sketch below.
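A minimal sketch of that buffer()-based variant (insertBatch is a hypothetical Mono<Void>-returning bulk insert): size-only batching respects backpressure, at the cost of losing the time bound.

flux
    .buffer(500)                             // emit a batch only once 500 items have arrived
    .concatMap(batch -> insertBatch(batch)); // hypothetical bulk insert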

@dforsl

dforsl commented Jun 24, 2020

@bsideup I'll try to describe a processing flow that is common for us:

  1. Read events from Kafka
  2. Save a batch of events to a database, that'll eventually form a "complete" record
  3. Commit offsets
  4. "Complete" records are sent downstream for further processing

We have been using reactor-kafka (although it has proven somewhat unstable with respect to error handling). Generally, we split the flow into separate rails at step 1 by grouping on partition. For each rail, we batch the incoming events, up to some limit, before doing a batch insert into our database. This enables us to handle both real-time peak hours and reprocessing a topic, when needed, in a timely manner.

For latency reasons (as our load varies throughout the day) we also want to limit the batching with respect to time. But each time we've researched this, we haven't been able to find a solution with reactor where we can do this and respect backpressure. As we can make fairly good guesses about whether an event will make a record complete, we could maybe utilise that "somehow". But that grows complicated pretty quickly compared to just "batch with timeout".

I'm sure there are better ways to structure the processing and so on. But because we need to use a database as an intermediate step to store partial records (as we'd otherwise have to start from way too far back on startup, due to how long it can take before a record is complete), we seem to be somewhat stuck in trying to solve this with reactor.

In an attempt to generalise, the key characteristics to me are (see the sketch after this list):

  • batch-op towards an external service
  • latency requirements
  • respect backpressure
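To make that concrete, here is a hedged sketch of such a pipeline using reactor-kafka's KafkaReceiver (insertBatch is a hypothetical Mono<Void>-returning bulk insert). It stays backpressure-safe by batching on size only, which is exactly where the missing time bound hurts:

import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;

Flux<Void> consume(KafkaReceiver<String, String> receiver,
                   Function<List<ReceiverRecord<String, String>>, Mono<Void>> insertBatch) {
    return receiver.receive()
            // 1. one rail per partition
            .groupBy(r -> r.receiverOffset().topicPartition())
            .flatMap(partition -> partition
                    // 2. size-only batching: backpressure-safe, but no latency bound
                    .window(100)
                    .concatMap(w -> w.collectList()
                            .filter(batch -> !batch.isEmpty())
                            // persist the batch, then acknowledge the offsets
                            .flatMap(batch -> insertBatch.apply(batch)
                                    .then(Mono.<Void>fromRunnable(() -> batch.forEach(
                                            r -> r.receiverOffset().acknowledge()))))));
}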

chibenwa added a commit to chibenwa/james-project that referenced this issue Jun 30, 2020
As described in reactor/reactor-core#1099,
windowTimeout does not play well with backpressure. When the number
of windows exceeds the internal buffer size (32), the throttling crashes.

We replaced "windowTimeout" with "window" and provide additional
tests covering the case where the upstream does not manage to keep up
with the throttling (to try to cover regressions)
@chibenwa

As requested, I'm sharing my use case: applying throttling to limit the resource consumption of background operations. In the Apache JAMES mail server, some "background tasks" can be given a "messagesPerSecond" parameter.

Our initial "throttling" implementation was:

Function<Flux<T>, Flux<U>> throttling = flux -> flux.windowTimeout(elements, duration)
    .zipWith(Flux.interval(DELAY, duration))
    .flatMap(Tuple2::getT1, elements, elements)
    .flatMap(operation, elements);

As demonstrated in linagora/james-project#3525, it looks like we did not need windowTimeout and that window was enough (and did respect back-pressure). I must admit I don't fully understand why slow sources are correctly handled, though.

@bsideup
Contributor Author

bsideup commented Jul 1, 2020

FYI, we just merged #2202, which allows "one-by-one" window processing (it will be included in Reactor 3.4.0.M2, but you can try the SNAPSHOTs today).

It does not solve windowTimeout itself, but at least we can now window + ACK every N seconds without loading unnecessary records into memory; see this example:
https://twitter.com/bsideup/status/1278268312808493062?s=20

@dforsl

dforsl commented Jul 1, 2020

@bsideup that sounds absolutely fantastic!

asfgit pushed a commit to apache/james-project that referenced this issue Jul 3, 2020
As described in reactor/reactor-core#1099,
windowTimeout does not play well with backpressure. When the number
of windows exceeds the internal buffer size (32), the throttling crashes.

We replaced "windowTimeout" with "window" and provide additional
tests covering the case where the upstream does not manage to keep up
with the throttling (to try to cover regressions)
@lbilger

lbilger commented Jul 7, 2020

My use case: receive messages from Kafka, collect them in batches (using bufferTimeout in my case) and process each batch, applying a retry if an error occurs. So processing a batch takes as long as it needs to complete successfully, and an overflow exception eventually occurs if a lot of messages come in while errors are being retried.

I think I have found a workaround, but I'm not sure whether I'm overlooking something, so I'm sharing this to get some feedback as much as to help others. It's written in Kotlin, but I think Java developers should be able to read it. The main idea is to use a Semaphore to limit the count of "in-flight" items (items forwarded to the bufferTimeout operator but not yet processed by the rest of the stream):

fun <T> Flux<T>.bufferTimeoutWithBackpressure(
    maxSize: Int,
    maxTime: Duration,
    maxInFlightElements: Int = (maxSize * 4).coerceAtLeast(Queues.SMALL_BUFFER_SIZE),
    remainingSteps: Flux<List<T>>.() -> Flux<List<T>>
): Flux<List<T>> =
    this
        .concatMap {
            Mono.subscriberContext()
                .map { context -> context.get("semaphore") as Semaphore }
                .doOnNext { semaphore -> if (!semaphore.tryAcquire()) throw RuntimeException() }
                .onErrorStop()
                .retryBackoff(Long.MAX_VALUE, Duration.ofMillis(1), Duration.ofMillis(100))
                .thenReturn(it)
        }
        .bufferTimeout(maxSize, maxTime)
        .onBackpressureBuffer(maxInFlightElements)
        .run(remainingSteps)
        .concatMap { list ->
            Mono.subscriberContext()
                .map { it.get("semaphore") as Semaphore }
                .doOnNext { semaphore -> semaphore.release(list.size) }
                .thenReturn(list)
        }
        .subscriberContext(Context.of("semaphore", Semaphore(maxInFlightElements)))

This is the test I used to try it:

class BufferTimeoutWithBackpressureTest {
    companion object : KLogging()

    @Test
    fun `bufferTimeoutWithBackpressure works as intended`() {
        Flux.fromIterable(1..1000)
            .delayElements(Duration.ofMillis(1))
            .bufferTimeoutWithBackpressure(5, Duration.ofMillis(2)) {
                concatMap {
                    Mono.delay(Duration.ofMillis(20))
                        .thenReturn(it)
                }
            }
            .concatMapIterable { it }
            .reduce(0) { lastSeen, new ->
                assertThat(new).isEqualTo(lastSeen + 1)
                if (new % 100 == 0) {
                    logger.info { "Got $new" }
                }
                new
            }
            .block()
    }

    @Test(expected = IllegalStateException::class)
    fun `bufferTimeout does not work`() {
        Flux.fromIterable(1..1000)
            .delayElements(Duration.ofMillis(1))
            .bufferTimeout(5, Duration.ofMillis(2))
            .concatMap {
                Mono.delay(Duration.ofMillis(20))
                    .thenReturn(it)
            }
            .blockLast()
    }
}

@bsideup bsideup removed their assignment Dec 1, 2020
@yakovs83

yakovs83 commented Feb 25, 2021

I had a case very similar to @dforsl's, where the flow from Kafka varies throughout the day, so I can't just use buffer() because some "leftover" elements may get stuck there waiting for more items to fill the buffer. bufferTimeout() does not respect backpressure, which is understandable, and blows up with a slow consumer; coupled with .onBackpressureBuffer() it does better, but we take a memory hit with a slow consumer.

The thing I came up with is something along the lines of

myFlux.timed()
    .windowUntilChanged(/* periodic function based on elapsed time */)
    .concatMap(w -> w.map(Timed::get).bufferTimeout(size, timeout))

The downside is that I have to attach timing data and then remove it. It seems to work, at least better than the bufferTimeout().onBackpressureBuffer() combo; an expanded sketch follows below. Before that I had come up with a convoluted recursive solution that would cache() the window, then try to collectList() it, time out and repeat recursively, splitting the original window in two. Recursion would terminate when the window size was 1. That looked good on paper, but didn't play nice memory-wise for some reason.
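Spelled out, that idea looks roughly like this (a sketch; it uses timestamp() instead of timed() so it also compiles on older reactor-core versions, and windowMillis, size and timeout are hypothetical parameters; Tuple2 is reactor.util.function.Tuple2):

long windowMillis = 2_000;                                 // hypothetical bucket length
Flux<List<String>> batches = myFlux
        .timestamp()                                       // attach epoch millis to each element
        .windowUntilChanged(t -> t.getT1() / windowMillis) // cut when the time bucket changes
        .concatMap(w -> w.map(Tuple2::getT2)               // strip the timing data again
                .bufferTimeout(size, timeout));            // hypothetical batch limits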

What I was looking for is a bufferTimeout-like operator that would use the timeout part to bracket the elements: something that would collect N elements or T seconds' worth, but would not forcefully emit on a timer, rather waiting for a request. The time window would start after the accumulated buffer is emitted (plus prefetch and such).

Edit: after some more thinking, I came up with a simpler windowing idea: wrap stream values in an Option (I like the Vavr one instead of the standard Optional), merge with a stream of Option.none() values generated by mapping over Flux.interval(), then do .windowUntil(Option::isEmpty) windowing, filter out the empty Options, unwrap from Option, and slice the resulting windows into buffers of the size I need.

@DarekDan

@yakovs83 by any chance, would you be willing to share a code sample for the Vavr Option?

@anuchandy

anuchandy commented Aug 2, 2021

@bsideup the way we're using windowTimeout is:

class PartitionClient {
    private final Flux<PartitionEvent> receiverFlux;
    private final Scheduler scheduler;

    public PartitionClient(Flux<PartitionEvent> receiverFlux, String partitionId) {
        this.receiverFlux = receiverFlux;
        this.scheduler = Schedulers.newBoundedElastic(schedulerSize, 100, "partition-" + partitionId);
    }

    public void processBatch(int maxBatchSize,
        Duration maxWaitTime,
        Consumer<List<PartitionEvent>> batchHandler) {
            receiverFlux.windowTimeout(maxBatchSize, maxWaitTime)
                .concatMap(Flux::collectList)
                .publishOn(scheduler, false, PREFETCH)
                .subscribe(
                       batch -> batchHandler.accept(batch),
                       error -> {},
                       () -> {});
    }
}

I don't completely understand the original windowTimeout issue's internals. I wonder if you could help by providing sample code addressing the above use case with a different operator combination.

I see you had shared below code:

.window(size)
.concatMap(
    batch -> batch
        .delayUntil(process)
        .sample(windowDuration)
        .delayUntil(closeSubWindow),
    1
)

Is there a way to tweak this code for my use case?

@simonbasle
Contributor

We want to start experimenting with an alternative implementation that keeps the window open despite the timeout if there is no current demand from downstream. This would be a configurable feature, so the current behavior can be retained where it is a better fit.

@yakovs83

yakovs83 commented Aug 10, 2021


@DarekDan
Sorry for the late response. It went something like this:

Flux<T> f = ... // this is the flux you want to split into batches
Flux<Option<T>> cutter = Flux.interval(yourTimeInterval, Schedulers.newSingle("cutter")).onBackpressureLatest().map(x -> Option.none());
Flux<List<T>> batches = f.map(Option::some).mergeWith(cutter).windowUntil(Option::isEmpty, true).concatMap(w -> w.filter(Option::isDefined).map(Option::get).buffer(yourBufferSize), 0);

I recall that the separate single-threaded scheduler for the cutter flux is important; otherwise merge will not work as expected (I think that without the merged fluxes being on separate schedulers, merge will try to drain one of them first). The 0 in the last concatMap is the prefetch parameter; I recall playing with it and ending up with 0 prefetch, but this is one thing that can be tweaked based on what you want.

@anuchandy

anuchandy commented Aug 27, 2021

I've tried the pattern @yakovs83 shared, with some slight modifications (the position of the cutter flux and where onBackpressureLatest is applied) in order to get a working prototype. Here is the code plus a simple test:

public Flux<List<String>> windowTimeout(Flux<String> eventSource, int maxSize, Duration maxWaitTime) {
    final String cutMagicString = "$CUT$";
    final Flux<String> windowTimer = Flux.interval(maxWaitTime, Schedulers.newSingle("window-timer"))
            .map(x -> cutMagicString).onBackpressureLatest();

    return windowTimer.mergeWith(eventSource)
            .windowUntil(e -> e.equalsIgnoreCase(cutMagicString), true)
            .concatMap(w -> w.filter(e -> !e.equalsIgnoreCase(cutMagicString)).buffer(maxSize), 0);
}

@Test
public void eventWindowTimeout() throws InterruptedException {
    final int maxSize = 10;
    final Duration maxWaitTime = Duration.ofSeconds(2);

    windowTimeout(getEventSource(), maxSize, maxWaitTime)
            .subscribe(partitionEventBatch -> {
                System.out.println("[" + OffsetDateTime.now() + "] Received Batch Of Size:" + partitionEventBatch.size());
            }, error -> {
                System.err.println("Error: " + error);
            }, () -> {
                System.out.println("Completed.");
            });

    // Wait for 20 sec
    Thread.sleep(20 * 1000);
}

public Flux<String> getEventSource() {
    final int eventDelayInMillis = 250;
    
    return Flux.create(sink -> {
        sink.onRequest(request -> {
            if (request != Long.MAX_VALUE) {
                System.out.println("Backpressure Request(" + request + ")");
                LongStream.range(0, request)
                        .mapToObj(String::valueOf)
                        .forEach(message -> {
                            try {
                                Thread.sleep(eventDelayInMillis);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            sink.next(message);
                        });
            } else {
                sink.error(new RuntimeException("Unbounded request (no backpressure) not supported"));
            }
        });
    });
}

But as I try different parameter combinations for max-wait-time and max-items, it seems to consume too much CPU. I'm not sure what is happening underneath; it seems to work, but I'm a bit worried about using it in production.

@anuchandy

anuchandy commented Aug 27, 2021

I think this is another way of doing the same. I haven't looked into any performance implications, just sharing here:

/**
 * Split the given {@code source} {@link Flux} sequence into multiple {@link List} buffers containing
 * {@code maxBatchSize} elements (or fewer for the final buffer) and starting from the first item.
 *
 * @param source The source stream to split into buffers.
 * @param maxBatchSize the maximum number of items to emit in a buffer before closing it
 * @param maxWaitTime the maximum {@link Duration} since the buffer was opened before closing it
 * @param timer a time-capable {@link Scheduler} instance to run on
 *
 * @return a {@link Flux} of {@link List} buffers based on element count and duration
 */
public Flux<List<String>> windowWithTimeout(Flux<String> source,
                                         int maxBatchSize,
                                         Duration maxWaitTime,
                                         Scheduler timer) {

    return Flux.defer(() -> {
        final int[] currentBatchSize = new int[1];
        final String cutMagicString = "$CUT$";

        final Scheduler cuttingTimer = timer == null ?
                Schedulers.newSingle("cutting-timer") :
                timer;

        final Flux<String> sourceCutter = Flux.interval(maxWaitTime, cuttingTimer)
                .map(x -> cutMagicString)
                .onBackpressureLatest();

        return sourceCutter.mergeWith(source)
                .bufferUntil(e -> {
                    if (e.equalsIgnoreCase(cutMagicString)) {
                        currentBatchSize[0] = 0;
                        return true;
                    } else {
                        currentBatchSize[0]++;
                        if (currentBatchSize[0] >= maxBatchSize) {
                            currentBatchSize[0] = 0;
                            return true;
                        } else {
                            return false;
                        }
                    }
                })
                // bufferUntil keeps the delimiter, so strip the cut marker
                // (needs java.util.stream.Collectors) and drop buffers that
                // contained nothing but the marker
                .map(buffer -> buffer.stream()
                        .filter(e -> !e.equalsIgnoreCase(cutMagicString))
                        .collect(Collectors.toList()))
                .filter(buffer -> !buffer.isEmpty());
    });
}
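For what it's worth, a hypothetical usage of the helper above (parameter values are arbitrary):

windowWithTimeout(Flux.range(1, 200).map(String::valueOf),
        16, Duration.ofMillis(500), null) // null timer: falls back to a fresh single scheduler
        .subscribe(batch -> System.out.println("batch size: " + batch.size()));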

@JonathanGiles

@OlegDokuka Do you have any further thoughts based on what @anuchandy has posted above? Thanks!

@JonathanGiles

@simonbasle @OlegDokuka I would love to hear your thoughts on any path forward for this issue. Thanks!

@OlegDokuka
Contributor

OlegDokuka commented Sep 28, 2021

Hey, @JonathanGiles.

Bear with us; I'm working on a new implementation of windowTimeout and will drop a PR soon. Stay tuned.

In general, it is going to be a completely new operator written from scratch. The implementation will not send a new window until there is demand from the downstream. In combination with the prefetch strategy, we should be able to reflect that missed demand to the upstream, so the upstream should eventually stop sending messages. This is a compromise solution, since the new window is technically still collecting elements; the timer for the next timeout is ignored and may be deferred until the next request from the downstream.

I will also work out an alternative strategy: deferring the window close while there is no demand for the next window. In that case the timeout constraint will also be broken, and the window will remain open until the next request from the downstream.

@JonathanGiles

@OlegDokuka Thanks for the update!

@OlegDokuka
Contributor

@JonathanGiles @anuchandy I have created a draft PR with the first implementation of the new windowTimeout, which works identically to what was proposed by @anuchandy here -> #1099 (comment).

Feel free to give it a try and let me know whether it works as expected.

@simonbasle
Contributor

@JonathanGiles @anuchandy we're not sure of the design and tradeoffs in #2822 and would like to discuss them and have you folks try it out before it gets merged. Would you be able to do that (e.g. by using https://jitpack.io or building locally yourselves)?

Let's follow up on that discussion in the PR itself.
