Skip to content

Commit

Permalink
Fixed an issue where event streams might fail with ClassCastException…
Browse files Browse the repository at this point in the history
… or NoSuchElementException. (#2684)

Fixed an issue where event streams might fail with ClassCastException or NoSuchElementException.

This issue was caused by a race condition in EventStreamAsyncResponseTransformer. A simpler fix was done as part of #2678, but the entire class was overly-complex and difficult to understand. This change simplifies the transformer implementation to use SdkIterable's built-in methods.
  • Loading branch information
millems authored Aug 31, 2021
1 parent bb20fd4 commit 135b373
Show file tree
Hide file tree
Showing 14 changed files with 774 additions and 402 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSSDKforJavav2-974558b.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "AWS SDK for Java v2",
"contributor": "",
"type": "bugfix",
"description": "Fixed an issue where event streams might fail with ClassCastException or NoSuchElementExceptions"
}
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-d2a3922.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "AWS SDK for Java v2",
"contributor": "",
"type": "feature",
"description": "Added new convenience methods to SdkPublisher: doAfterOnError, doAfterOnComplete, and doAfterCancel."
}
4 changes: 0 additions & 4 deletions core/aws-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@
<artifactId>utils</artifactId>
<version>${awsjavasdk.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.eventstream</groupId>
<artifactId>eventstream</artifactId>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void onComplete() {
.executor(Executors.newSingleThreadExecutor())
.future(new CompletableFuture<>())
.build();
transformer.prepare();
transformer.onStream(SdkPublisher.adapt(bytePublisher));
latch.await();
assertThat(numEvents)
Expand Down Expand Up @@ -327,9 +328,10 @@ private void verifyExceptionThrown(Map<String, HeaderValue> headers) {

Flowable<ByteBuffer> bytePublisher = Flowable.just(exceptionMessage.toByteBuffer());

SubscribingResponseHandler handler = new SubscribingResponseHandler();
AsyncResponseTransformer<SdkResponse, Void> transformer =
EventStreamAsyncResponseTransformer.builder()
.eventStreamResponseHandler(new SubscribingResponseHandler())
.eventStreamResponseHandler(handler)
.exceptionResponseHandler((response, executionAttributes) -> exception)
.executor(Executors.newSingleThreadExecutor())
.future(new CompletableFuture<>())
Expand All @@ -343,13 +345,16 @@ private void verifyExceptionThrown(Map<String, HeaderValue> headers) {
cf.join();
} catch (CompletionException e) {
if (e.getCause() instanceof SdkServiceException) {
throw ((SdkServiceException) e.getCause());
throw e.getCause();
}
}
}).isSameAs(exception);

assertThat(handler.exceptionOccurredCalled).isTrue();
}

private static class SubscribingResponseHandler implements EventStreamResponseHandler<Object, Object> {
private volatile boolean exceptionOccurredCalled = false;

@Override
public void responseReceived(Object response) {
Expand All @@ -363,6 +368,7 @@ public void onEventStream(SdkPublisher<Object> publisher) {

@Override
public void exceptionOccurred(Throwable throwable) {
exceptionOccurredCalled = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.utils.async.BufferingSubscriber;
import software.amazon.awssdk.utils.async.EventListeningSubscriber;
import software.amazon.awssdk.utils.async.FilteringSubscriber;
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
import software.amazon.awssdk.utils.async.LimitingSubscriber;
Expand Down Expand Up @@ -116,6 +118,36 @@ default SdkPublisher<T> limit(int limit) {
return subscriber -> subscribe(new LimitingSubscriber<>(subscriber, limit));
}

/**
* Add a callback that will be invoked after this publisher invokes {@link Subscriber#onComplete()}.
*
* @param afterOnComplete The logic that should be run immediately after onComplete.
* @return New publisher that invokes the requested callback.
*/
default SdkPublisher<T> doAfterOnComplete(Runnable afterOnComplete) {
return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, afterOnComplete, null, null));
}

/**
* Add a callback that will be invoked after this publisher invokes {@link Subscriber#onError(Throwable)}.
*
* @param afterOnError The logic that should be run immediately after onError.
* @return New publisher that invokes the requested callback.
*/
default SdkPublisher<T> doAfterOnError(Consumer<Throwable> afterOnError) {
return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, null, afterOnError, null));
}

/**
* Add a callback that will be invoked after this publisher invokes {@link Subscription#cancel()}.
*
* @param afterOnCancel The logic that should be run immediately after cancellation of the subscription.
* @return New publisher that invokes the requested callback.
*/
default SdkPublisher<T> doAfterOnCancel(Runnable afterOnCancel) {
return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, null, null, afterOnCancel));
}

/**
* Subscribes to the publisher with the given {@link Consumer}. This consumer will be called for each event
* published. There is no backpressure using this method if the Consumer dispatches processing asynchronously. If more
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -201,9 +203,9 @@ private List<SubscribeToShardEventStream> subscribeToShard() throws Throwable {
SubscribeToShardResponseHandler.builder()
.subscriber(events::add)
.build())
.join();
.get(10, TimeUnit.SECONDS);
return events;
} catch (CompletionException e) {
} catch (ExecutionException e) {
throw e.getCause();
}
}
Expand Down Expand Up @@ -234,9 +236,6 @@ public void request(long l) {

@Override
public void cancel() {
RuntimeException e = new RuntimeException();
subscriber.onError(e);
value.onError(e);
}
}));
return cf;
Expand Down
5 changes: 5 additions & 0 deletions utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@

package software.amazon.awssdk.utils.async;

import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkProtectedApi;

@SdkProtectedApi
public abstract class DelegatingSubscriber<T, U> implements Subscriber<T> {

protected final Subscriber<? super U> subscriber;
private final AtomicBoolean complete = new AtomicBoolean(false);

protected DelegatingSubscriber(Subscriber<? super U> subscriber) {
this.subscriber = subscriber;
Expand All @@ -35,12 +36,15 @@ public void onSubscribe(Subscription subscription) {

@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
if (complete.compareAndSet(false, true)) {
subscriber.onError(throwable);
}
}

@Override
public void onComplete() {
subscriber.onComplete();
if (complete.compareAndSet(false, true)) {
subscriber.onComplete();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.utils.async;

import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.utils.Logger;

/**
* A {@link Subscriber} that can invoke callbacks during various parts of the subscriber and subscription lifecycle.
*/
@SdkProtectedApi
public final class EventListeningSubscriber<T> extends DelegatingSubscriber<T, T> {
private static final Logger log = Logger.loggerFor(EventListeningSubscriber.class);

private final Runnable afterCompleteListener;
private final Consumer<Throwable> afterErrorListener;
private final Runnable afterCancelListener;

public EventListeningSubscriber(Subscriber<T> subscriber,
Runnable afterCompleteListener,
Consumer<Throwable> afterErrorListener,
Runnable afterCancelListener) {
super(subscriber);
this.afterCompleteListener = afterCompleteListener;
this.afterErrorListener = afterErrorListener;
this.afterCancelListener = afterCancelListener;
}

@Override
public void onNext(T t) {
super.subscriber.onNext(t);
}

@Override
public void onSubscribe(Subscription subscription) {
super.onSubscribe(new CancelListeningSubscriber(subscription));
}

@Override
public void onError(Throwable throwable) {
super.onError(throwable);
if (afterErrorListener != null) {
callListener(() -> afterErrorListener.accept(throwable),
"Post-onError callback failed. This exception will be dropped.");
}
}

@Override
public void onComplete() {
super.onComplete();
callListener(afterCompleteListener, "Post-onComplete callback failed. This exception will be dropped.");
}

private class CancelListeningSubscriber extends DelegatingSubscription {
protected CancelListeningSubscriber(Subscription s) {
super(s);
}

@Override
public void cancel() {
super.cancel();
callListener(afterCompleteListener, "Post-cancel callback failed. This exception will be dropped.");
}
}

private void callListener(Runnable listener, String listenerFailureMessage) {
if (listener != null) {
try {
listener.run();
} catch (RuntimeException e) {
log.error(() -> listenerFailureMessage, e);
}
}
}
}
Loading

0 comments on commit 135b373

Please sign in to comment.