Skip to content

Commit

Permalink
Fix Reactive Streams 3.3 violation in PublisherBasedStreamMessage (#…
Browse files Browse the repository at this point in the history
…4298)

Motivation:

Working on #4058, I found `PublisherBasedStreamMessage` failed with
`required_spec303_mustNotAllowUnboundedRecursion`.
```java
Gradle suite > Gradle test > com.linecorp.armeria.common.stream.DefaultByteStreamMessageTckTest > required_spec303_mustNotAllowUnboundedRecursion FAILED
    java.lang.AssertionError: Got 2 onNext calls within thread: Thread[armeria-common-worker-epoll-2-3,5,main], yet expected recursive bound was 1
        at org.testng.Assert.fail(Assert.java:110)
        at org.reactivestreams.tck.TestEnvironment.flop(TestEnvironment.java:229)
        at org.reactivestreams.tck.PublisherVerification$19$2.onNext(PublisherVerification.java:781)
        at com.linecorp.armeria.common.stream.PublisherBasedStreamMessage$AbortableSubscriber.onNext0(PublisherBasedStreamMessage.java:337)
        at com.linecorp.armeria.common.stream.PublisherBasedStreamMessage$AbortableSubscriber.onNext(PublisherBasedStreamMessage.java:323)
        at reactor.core.publisher.StrictSubscriber.onNext(StrictSubscriber.java:89)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:156)
        at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:111)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
        at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138)
        at com.linecorp.armeria.common.stream.PublisherBasedStreamMessage$AbortableSubscriber.request(PublisherBasedStreamMessage.java:228)
        at org.reactivestreams.tck.PublisherVerification$19$2.onNext(PublisherVerification.java:795)
        at com.linecorp.armeria.common.stream.PublisherBasedStreamMessage$AbortableSubscriber.onNext0(PublisherBasedStreamMessage.java:337)
```
https://github.com/line/armeria/runs/6831427505?check_suite_focus=true#step:7:2706

`Flux.range()` was used as an upstream for testing. `Flux.range()` itself passed 
the Reactive Streams TCK but when it was wrapped by `PublishBasedStreamMessage`, 
it failed.

After digging into the problem I found that a thread switching caused the problem.
The `subscrption.request()` started with `main` thread and it was switched the
`executor` when `onNext()` is called.
https://github.com/line/armeria/blob/8e61fc1f66f357d608a77b3f7602e12ee12907f1/core/src/main/java/com/linecorp/armeria/common/stream/PublisherBasedStreamMessage.java#L322-L326
The second `subscription.request()` was called with the `executor`.
This rescheduling occurred with unbounded recursive calls because
`Flux.range()` can correctly handle the recursions when each `request()` is
called in the same thread.

Modifications:

- Changed to call `subscription.request(n)` in an event loop always.
  - This will also fix inconstitency of execution orders between
  `increaseDemand()` and `subscription.request()`

Result:

- Abide by Reactive Streams 3.3
- Fix possible deep recursive calls in `PublisherBasedStreamMessage`
  • Loading branch information
ikhoon authored Jun 23, 2022
1 parent 4f85eb9 commit 5edaef0
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public final CompletableFuture<Void> whenComplete() {
static final class AbortableSubscriber implements Subscriber<Object>, Subscription {
private final PublisherBasedStreamMessage<?> parent;
private final EventExecutor executor;
private boolean withPooledObjects;
private final boolean withPooledObjects;
private final boolean notifyCancellation;
private Subscriber<Object> subscriber;
@Nullable
Expand All @@ -222,10 +222,13 @@ public void request(long n) {

if (executor.inEventLoop()) {
increaseDemand(n);
subscription.request(n);
} else {
executor.execute(() -> increaseDemand(n));
executor.execute(() -> {
increaseDemand(n);
subscription.request(n);
});
}
subscription.request(n);
}

private void increaseDemand(long n) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2022 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.stream;

import org.reactivestreams.Publisher;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.Test;

import reactor.core.publisher.Flux;

@Test
public class PublisherBasedStreamMessageTck extends PublisherVerification<Integer> {

public PublisherBasedStreamMessageTck() {
super(new TestEnvironment(200));
}

@Override
public Publisher<Integer> createPublisher(long elements) {
return StreamMessage.of(Flux.range(0, (int) elements));
}

@Override
public Publisher<Integer> createFailedPublisher() {
return null;
}

@Override
public long maxElementsFromPublisher() {
return Integer.MAX_VALUE;
}
}

0 comments on commit 5edaef0

Please sign in to comment.