Skip to content

Commit

Permalink
Respect SubscriptionOption.WITH_POOLED_OBJECTS in `PublisherBasedSt…
Browse files Browse the repository at this point in the history
…reamMessage` (#3617)

Motivation:
The implementations of `StreamMessage` in Armeria are in charge of releasing the pooled objects
when the subscriber subscribes without the `WITH_POOLED_OBJECTS` option.
However, `PublisherBasedStreamMessage` currentyly ignores `WITH_POOLED_OBJECTS` option.
We should respect that so that the subscriber does not have to deal with when it does not specify
`WITH_POOLED_OBJECTS` option.

Modifications:
- Respect `SubscriptionOption.WITH_POOLED_OBJECTS` in `PublisherBasedStreamMessage`.
  - The Subscriber who does not specify `WITH_POOLED_OBJECTS` when subscribing does not have to release
    the pooled objects by itself anymore.

Result:
- Close #3608
  - You can use Spring WebFlux integration with compression enabled.
- You do not have to release the pooled objects when subscribing `PublisherBasedStreamMessage`
  without `WITH_POOLED_OBJECTS` option.
  • Loading branch information
minwoox authored Jun 14, 2021
1 parent e59bd34 commit 5bd7af0
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.FormatMethod;
Expand All @@ -43,6 +44,7 @@
import com.linecorp.armeria.common.FixedHttpRequest.TwoElementFixedHttpRequest;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.common.stream.HttpDecoder;
import com.linecorp.armeria.common.stream.PublisherBasedStreamMessage;
import com.linecorp.armeria.common.stream.StreamMessage;
import com.linecorp.armeria.common.stream.SubscriptionOption;
import com.linecorp.armeria.common.util.EventLoopCheckingFuture;
Expand Down Expand Up @@ -263,6 +265,10 @@ static HttpRequest of(RequestHeaders headers, HttpData... contents) {

/**
* Creates a new instance from an existing {@link RequestHeaders} and {@link Publisher}.
*
* <p>Note that the {@link HttpObject}s in the {@link Publisher} are not released when
* {@link Subscription#cancel()} or {@link #abort()} is called. You should add a hook in order to
* release the elements. See {@link PublisherBasedStreamMessage} for more information.
*/
static HttpRequest of(RequestHeaders headers, Publisher<? extends HttpObject> publisher) {
requireNonNull(headers, "headers");
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/com/linecorp/armeria/common/HttpResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import com.google.errorprone.annotations.CheckReturnValue;
import com.google.errorprone.annotations.FormatMethod;
Expand All @@ -43,6 +44,7 @@
import com.linecorp.armeria.common.FixedHttpResponse.TwoElementFixedHttpResponse;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.common.stream.HttpDecoder;
import com.linecorp.armeria.common.stream.PublisherBasedStreamMessage;
import com.linecorp.armeria.common.stream.StreamMessage;
import com.linecorp.armeria.common.stream.SubscriptionOption;
import com.linecorp.armeria.common.util.EventLoopCheckingFuture;
Expand Down Expand Up @@ -387,6 +389,10 @@ static HttpResponse of(HttpObject... objs) {

/**
* Creates a new HTTP response whose stream is produced from an existing {@link Publisher}.
*
* <p>Note that the {@link HttpObject}s in the {@link Publisher} are not released when
* {@link Subscription#cancel()} or {@link #abort()} is called. You should add a hook in order to
* release the elements. See {@link PublisherBasedStreamMessage} for more information.
*/
static HttpResponse of(Publisher<? extends HttpObject> publisher) {
requireNonNull(publisher, "publisher");
Expand All @@ -400,6 +406,10 @@ static HttpResponse of(Publisher<? extends HttpObject> publisher) {
/**
* Creates a new HTTP response with the specified headers whose stream is produced from an existing
* {@link Publisher}.
*
* <p>Note that the {@link HttpObject}s in the {@link Publisher} are not released when
* {@link Subscription#cancel()} or {@link #abort()} is called. You should add a hook in order to
* release the elements. See {@link PublisherBasedStreamMessage} for more information.
*/
static HttpResponse of(ResponseHeaders headers, Publisher<? extends HttpObject> publisher) {
requireNonNull(headers, "headers");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.linecorp.armeria.common.stream;

import static com.linecorp.armeria.common.stream.StreamMessageUtil.containsNotifyCancellation;
import static com.linecorp.armeria.common.stream.StreamMessageUtil.containsWithPooledObjects;
import static com.linecorp.armeria.common.stream.SubscriberUtil.abortedOrLate;
import static com.linecorp.armeria.common.util.Exceptions.throwIfFatal;
import static java.util.Objects.requireNonNull;
Expand All @@ -39,13 +40,21 @@
import com.linecorp.armeria.common.util.CompositeException;
import com.linecorp.armeria.common.util.EventLoopCheckingFuture;
import com.linecorp.armeria.internal.common.stream.NoopSubscription;
import com.linecorp.armeria.unsafe.PooledObjects;

import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ImmediateEventExecutor;

/**
* Adapts a {@link Publisher} into a {@link StreamMessage}.
*
* <p>Note that the elements in the {@link Publisher} are not released when {@link Subscription#cancel()} or
* {@link #abort()} is called. So you should add a hook in order to release the elements. You can use
* <a href="https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#doOnDiscard-java.lang.Class-java.util.function.Consumer-">doOnDiscard</a>
* if you are using Reactor, or you can use
* <a href="http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html#doOnDispose-io.reactivex.rxjava3.functions.Action-">doOnDispose</a>
* if you are using RxJava.
*
* @param <T> the type of element signaled
*/
@UnstableApi
Expand Down Expand Up @@ -97,33 +106,34 @@ public final long demand() {

@Override
public final void subscribe(Subscriber<? super T> subscriber, EventExecutor executor) {
subscribe0(subscriber, executor, false);
subscribe0(subscriber, executor, false, false);
}

@Override
public final void subscribe(Subscriber<? super T> subscriber, EventExecutor executor,
SubscriptionOption... options) {
requireNonNull(options, "options");

final boolean withPooledObjects = containsWithPooledObjects(options);
final boolean notifyCancellation = containsNotifyCancellation(options);
subscribe0(subscriber, executor, notifyCancellation);
subscribe0(subscriber, executor, withPooledObjects, notifyCancellation);
}

private void subscribe0(Subscriber<? super T> subscriber, EventExecutor executor,
boolean notifyCancellation) {
boolean withPooledObjects, boolean notifyCancellation) {
requireNonNull(subscriber, "subscriber");
requireNonNull(executor, "executor");

if (!subscribe1(subscriber, executor, notifyCancellation)) {
if (!subscribe1(subscriber, executor, withPooledObjects, notifyCancellation)) {
final AbortableSubscriber oldSubscriber = this.subscriber;
assert oldSubscriber != null;
failLateSubscriber(executor, subscriber, oldSubscriber.subscriber);
}
}

private boolean subscribe1(Subscriber<? super T> subscriber, EventExecutor executor,
boolean notifyCancellation) {
final AbortableSubscriber s = new AbortableSubscriber(this, subscriber, executor, notifyCancellation);
boolean withPooledObjects, boolean notifyCancellation) {
final AbortableSubscriber s =
new AbortableSubscriber(this, subscriber, executor, withPooledObjects, notifyCancellation);
if (!subscriberUpdater.compareAndSet(this, null, s)) {
return false;
}
Expand Down Expand Up @@ -173,7 +183,7 @@ private void abort0(Throwable cause) {

final AbortableSubscriber abortable = new AbortableSubscriber(this, AbortingSubscriber.get(cause),
ImmediateEventExecutor.INSTANCE,
false);
false, false);
if (!subscriberUpdater.compareAndSet(this, null, abortable)) {
this.subscriber.abort(cause);
return;
Expand All @@ -192,6 +202,7 @@ public final CompletableFuture<Void> whenComplete() {
static final class AbortableSubscriber implements Subscriber<Object>, Subscription {
private final PublisherBasedStreamMessage<?> parent;
private final EventExecutor executor;
private boolean withPooledObjects;
private final boolean notifyCancellation;
private Subscriber<Object> subscriber;
@Nullable
Expand All @@ -201,10 +212,11 @@ static final class AbortableSubscriber implements Subscriber<Object>, Subscripti

@SuppressWarnings("unchecked")
AbortableSubscriber(PublisherBasedStreamMessage<?> parent, Subscriber<?> subscriber,
EventExecutor executor, boolean notifyCancellation) {
EventExecutor executor, boolean withPooledObjects, boolean notifyCancellation) {
this.parent = parent;
this.subscriber = (Subscriber<Object>) subscriber;
this.executor = executor;
this.withPooledObjects = withPooledObjects;
this.notifyCancellation = notifyCancellation;
}

Expand Down Expand Up @@ -324,6 +336,9 @@ private void onNext0(Object obj) {
parent.demand--;
}
try {
if (!withPooledObjects) {
obj = PooledObjects.copyAndClose(obj);
}
subscriber.onNext(obj);
} catch (Throwable t) {
abort(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public Long createElement(int element) {

@Override
public Subscriber<Object> createSubscriber() {
return new AbortableSubscriber(publisher, NoopSubscriber.get(), ImmediateEventExecutor.INSTANCE, false);
return new AbortableSubscriber(publisher, NoopSubscriber.get(), ImmediateEventExecutor.INSTANCE,
false, false);
}

@Test(enabled = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ public void onComplete() {

@Test
void notifyCancellation() {
final HttpData data = HttpData.wrap(newPooledBuffer()).withEndOfStream();
final ByteBuf buf = newPooledBuffer();
final HttpData data = HttpData.wrap(buf).withEndOfStream();
final DefaultStreamMessage<HttpData> stream = new DefaultStreamMessage<>();
stream.write(data);
stream.close();
Expand All @@ -119,7 +120,7 @@ protected HttpData filter(HttpData obj) {
return obj;
}
};
SubscriptionOptionTest.notifyCancellation(filtered);
SubscriptionOptionTest.notifyCancellation(buf, filtered);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linecorp.armeria.common.stream;

import static com.linecorp.armeria.common.stream.StreamMessageTest.newPooledBuffer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
Expand All @@ -40,6 +41,8 @@
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.stream.PublisherBasedStreamMessage.AbortableSubscriber;

import io.netty.buffer.ByteBuf;

class PublisherBasedStreamMessageTest {

/**
Expand Down Expand Up @@ -97,9 +100,11 @@ void testAbortWithoutSubscriber(@Nullable Throwable cause) {

@Test
void notifyCancellation() {
final ByteBuf buf = newPooledBuffer();
final DefaultStreamMessage<HttpData> delegate = new DefaultStreamMessage<>();
delegate.write(HttpData.wrap(buf));
final PublisherBasedStreamMessage<HttpData> p = new PublisherBasedStreamMessage<>(delegate);
SubscriptionOptionTest.notifyCancellation(p);
SubscriptionOptionTest.notifyCancellation(buf, p);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.linecorp.armeria.common.HttpData;

import io.netty.buffer.ByteBuf;
import reactor.core.publisher.Mono;

class SubscriptionOptionTest {

Expand Down Expand Up @@ -100,11 +101,11 @@ public void onComplete() {

@ParameterizedTest
@ArgumentsSource(PooledHttpDataStreamProvider.class)
void notifyCancellation(HttpData unused1, ByteBuf unused2, StreamMessage<HttpData> stream) {
notifyCancellation(stream);
void notifyCancellation(HttpData data, ByteBuf buf, StreamMessage<HttpData> stream) {
notifyCancellation(buf, stream);
}

static void notifyCancellation(StreamMessage<HttpData> stream) {
static void notifyCancellation(ByteBuf buf, StreamMessage<HttpData> stream) {
final AtomicBoolean completed = new AtomicBoolean();
stream.subscribe(new Subscriber<HttpData>() {
@Override
Expand All @@ -131,6 +132,7 @@ public void onComplete() {

await().untilAsserted(() -> assertThat(completed).isTrue());
await().untilAsserted(() -> assertThat(stream.whenComplete()).isCompletedExceptionally());
assertThat(buf.refCnt()).isZero();
}

static SubscriptionOption[] subscriptionOptions(boolean subscribedWithPooledObjects) {
Expand All @@ -145,7 +147,7 @@ private static class PooledHttpDataStreamProvider implements ArgumentsProvider {

@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
return Stream.of(defaultStream(), fixedStream(), deferredStream());
return Stream.of(defaultStream(), fixedStream(), deferredStream(), publisherBasedStream());
}

private static Arguments defaultStream() {
Expand Down Expand Up @@ -174,5 +176,14 @@ private static Arguments deferredStream() {
d.close();
return of(data, buf, deferredStream);
}

private static Arguments publisherBasedStream() {
final ByteBuf buf = newPooledBuffer();
final HttpData data = HttpData.wrap(buf).withEndOfStream();
final PublisherBasedStreamMessage<HttpData> publisherBasedStream =
new PublisherBasedStreamMessage<>(Mono.just(data)
.doOnDiscard(HttpData.class, HttpData::close));
return of(data, buf, publisherBasedStream);
}
}
}
Loading

0 comments on commit 5bd7af0

Please sign in to comment.