-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
implements new version of WindowTimeout #2822
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
high level review
} | ||
|
||
@Override | ||
public Object scanUnsafe(Attr key) { | ||
if (key == Attr.RUN_ON) return timer; | ||
if (key == Attr.RUN_STYLE) return Attr.RunStyle.ASYNC; | ||
if (key == Attr.RUN_ON) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: please revert to single lines in that method
final Scheduler timer; | ||
|
||
FluxWindowTimeout(Flux<T> source, int maxSize, long timespan, TimeUnit unit, Scheduler timer) { | ||
final int maxSize; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: please add your name above in the @author
list
@@ -41,7 +41,7 @@ | |||
public void windowWithTimeoutAccumulateOnSize() { | |||
StepVerifier.withVirtualTime(() -> Flux.range(1, 6) | |||
.delayElements(Duration.ofMillis(300)) | |||
.windowTimeout(5, Duration.ofMillis(2000)) | |||
.windowTimeout(5, Duration.ofMillis(2000), true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the test coverage of FluxWindowTimeoutTest
wasn't great in the first place (65% of lines in 3.3.x, 62% in this PR), but I think we can improve it?
Either:
- turn these tests that use
fair == true
into parameterized ones (⚠️ this means forward merging will need to turn them into@ParameterizedTestWithName
in main) - duplicate the tests, eg. in a
@Nested
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wait, this is targeting main
🤔 so yeah, direct use of ParameterizedTestWithName
then
@@ -9788,6 +9788,11 @@ public final void subscribe(Subscriber<? super T> actual) { | |||
return windowTimeout(maxSize, maxTime , Schedulers.parallel()); | |||
} | |||
|
|||
public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime, | |||
boolean fairBackpressure) { | |||
return windowTimeout(maxSize, maxTime , Schedulers.parallel(), true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fairBackpressure
parameter isn't used here, hardcoded to true
@@ -88,13 +856,13 @@ public Object scanUnsafe(Attr key) { | |||
volatile boolean done; | |||
volatile boolean cancelled; | |||
|
|||
volatile long requested; | |||
volatile long requested; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: avoid applying reformatting to the whole file, only edited lines
@@ -165,7 +165,7 @@ public void dispose() { | |||
}; | |||
|
|||
StepVerifier.create(Flux.range(1, 3).hide() | |||
.windowTimeout(10, Duration.ofMillis(500), testScheduler)) | |||
.windowTimeout(10, Duration.ofMillis(500), testScheduler, false)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess there is a reason this particular test only works with old mode, in which case the parameterized comment above wouldn't apply.
@@ -230,7 +230,7 @@ public void dispose() { | |||
|
|||
@Test | |||
public void scanOperator() { | |||
FluxWindowTimeout<Integer> test = new FluxWindowTimeout<>(Flux.just(1), 123, 100, TimeUnit.MILLISECONDS, Schedulers.immediate()); | |||
FluxWindowTimeout<Integer> test = new FluxWindowTimeout<>(Flux.just(1), 123, 100, TimeUnit.MILLISECONDS, Schedulers.immediate(), true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scan tests of WindowTimeoutBackpressureSubscriber
and InnerWindow
would need to be added
} | ||
} | ||
|
||
static final long FINALIZED_STATE = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's at least put constants at the top of the class. static methods usually go there too (although here there are a lot of them, so I'm less sure)
To try and give a little detail on the implementation and tradeoffs:
The other big tradeoff is that this variant will actually prolong the life of the active window if it "closes" due to timeout (ie. there is still pending demand to the source) AND there is currently 0 request for a new window. So it doesn't really pause the upstream when there is 0 demand. There is a fundamental question here: what does it actually mean to say "there is 0 demand for a new window"? windows are by nature tied to the timing of pushes from the source. Possible alternative interpretations that I can think of:
|
Tagging @JonathanGiles @anuchandy |
Hi @simonbasle, I pulled the draft and tried using the new operator with the below code. To give you a summary - The code has a producer, producing one event in every 250 ms once the backpressure request is received. It means it produces 4 events per second. The code has a consumer using the new operator to retrieve events from this producer with a max wait time of 1 second with a max size of 10. I was expecting the consumer with the new operator to receive a window with four events every 1 second, but it is not. We can see backpressure request going to the producer, and it is indeed producing events, but for some reason it never comes out of the new operator @Test
public void windowTimeoutWithBackPressureFromCore() throws InterruptedException {
// -- The Event Producer
// The producer emitting requested events to downstream but with a delay of 250ms between each emission.
//
final int eventProduceDelayInMillis = 250;
Flux<String> eventProducer = Flux.create(sink -> {
sink.onRequest(request -> {
if (request != Long.MAX_VALUE) {
System.out.println("Backpressure Request(" + request + ")");
LongStream.range(0, request)
.mapToObj(String::valueOf)
.forEach(message -> {
try {
TimeUnit.MILLISECONDS.sleep(eventProduceDelayInMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Producing:" + message);
sink.next(message);
});
} else {
sink.error(new RuntimeException("No_Backpressure unsupported"));
}
});
});
// -- The Event Consumer
// The consumer using windowTimeout that batches maximum 10 events with a max wait time of 1 second.
// Given the Event producer produces at most 4 events per second (due to 250 ms delay between events),
// the consumer should receive 3-4 events.
//
final int eventConsumeDelayInMillis = 0;
final Scheduler scheduler = Schedulers.newBoundedElastic(10, 10000, "queued-tasks");
final AtomicBoolean hasError = new AtomicBoolean(false);
final Semaphore isCompleted = new Semaphore(1);
isCompleted.acquire();
Disposable subscription = eventProducer.windowTimeout(10, Duration.ofSeconds(1), true)
.concatMap(Flux::collectList)
.publishOn(scheduler)
.subscribe(eventBatch -> {
for (String event : eventBatch) {
System.out.println("Consuming: " + event);
try {
TimeUnit.MILLISECONDS.sleep(eventConsumeDelayInMillis);
} catch (InterruptedException e) {
System.err.println("Could not sleep for delay. Error: " + e);
}
}
System.out.println("Completed batch.");
}, error -> {
System.err.println("Error: " + error);
hasError.set(true);
isCompleted.release();
}, () -> {
System.out.println("Completed.");
isCompleted.release();
});
System.out.println("Running test...");
final Duration TIME_TO_PUBLISH_EVENTS = Duration.ofMinutes(1);
try {
assertFalse(isCompleted.tryAcquire(TIME_TO_PUBLISH_EVENTS.toMinutes(), TimeUnit.MINUTES),
"Should have been false because it would not error.");
assertFalse(hasError.get(), "Should not have received an error.");
} finally {
subscription.dispose();
}
System.out.println("Completed test.");
} Output
|
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
c69ca37
to
2ea6bca
Compare
@anuchandy updated implementation a bit. Also, added your tests which demonstrate that the new impl is working as expected (probably). Also, just FYI - operators like publishOn have a |
Thank you @OlegDokuka - we will get to testing this straight away. |
Hi @OlegDokuka, thanks of the updated impl and cleaning up the test. I’ll continue testing the impl. Just curious, the stress test you mentioned - is it a part of reactor-core public repo or is it kind of validations done internally? |
@anuchandy all existing stress tests could be found there -> https://github.com/reactor/reactor-core/tree/main/reactor-core/src/jcstress/java/reactor/core/publisher |
Thanks, @OlegDokuka. As part of validations, I'm trying to understand the various backpressure involved, IIUC, considering the following code, there are two categories of backpressure - Flux<Event> source = ..
Flux<Flux<Event>> sourceWindowed = source.windowTimeout(500, Duration.ofSeconds(3), ..);
If we chain an operator with
Is this understanding correct?
|
there is a backpressure equal to the max elements in the window. So
it depends on the downstream. For example using
yes, if downstream prefetches 50, then it will be stored in the
nope, only 500 can be requested at a time.
since only a single window can be opened at a time. There is not possible intersections between windows in that operator Hope, that explains everything, |
Thank you, @OlegDokuka, for the answers. Using the diff for the demand makes sense; we don't want to overread from the main source; combining that with the prolonged window (if no demand for new window) collecting any events after the timeout seems to ensure no event loss. I am continuing the test/profiling. Please tag me if you amend new commit(s), that are ready for validation. |
@anuchandy working on more fixes... will ping you |
Hi @OlegDokuka, Hope you're doing great; I just want to check; how is the implementation going? |
Any status update here @simonbasle and @OlegDokuka ? Thanks!! |
@JonathanGiles it is coming. Stay tuned. |
Thanks for the update, much appreciated. We are very eager to see this improvement! |
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
Signed-off-by: Oleh Dokuka <[email protected]>
thanks @OlegDokuka, just tag me when the next draft is ready, so that I can pull it to test via our use cases |
Hi @OlegDokuka, I hope you're doing good. Just checking, how is the work progressing? P lease let me know once you have the next draft to test. |
@anuchandy this PR is superseed by the #3054 which is about to be merged and going to be released next Tuesday. Stay tuned |
close since is superseded by #3054 |
closes #1099
Signed-off-by: Oleh Dokuka [email protected]