Skip to content

Commit

Permalink
Revert "Fixed an issue where event streams might fail with ClassCastE…
Browse files Browse the repository at this point in the history
…xception or NoSuchElementException. (#2684)"

This reverts commit 135b373.
  • Loading branch information
millems committed Sep 1, 2021
1 parent 2150aa8 commit fcb4ef3
Show file tree
Hide file tree
Showing 12 changed files with 402 additions and 762 deletions.
4 changes: 4 additions & 0 deletions core/aws-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@
<artifactId>utils</artifactId>
<version>${awsjavasdk.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.eventstream</groupId>
<artifactId>eventstream</artifactId>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public void onComplete() {
.executor(Executors.newSingleThreadExecutor())
.future(new CompletableFuture<>())
.build();
transformer.prepare();
transformer.onStream(SdkPublisher.adapt(bytePublisher));
latch.await();
assertThat(numEvents)
Expand Down Expand Up @@ -328,10 +327,9 @@ private void verifyExceptionThrown(Map<String, HeaderValue> headers) {

Flowable<ByteBuffer> bytePublisher = Flowable.just(exceptionMessage.toByteBuffer());

SubscribingResponseHandler handler = new SubscribingResponseHandler();
AsyncResponseTransformer<SdkResponse, Void> transformer =
EventStreamAsyncResponseTransformer.builder()
.eventStreamResponseHandler(handler)
.eventStreamResponseHandler(new SubscribingResponseHandler())
.exceptionResponseHandler((response, executionAttributes) -> exception)
.executor(Executors.newSingleThreadExecutor())
.future(new CompletableFuture<>())
Expand All @@ -345,16 +343,13 @@ private void verifyExceptionThrown(Map<String, HeaderValue> headers) {
cf.join();
} catch (CompletionException e) {
if (e.getCause() instanceof SdkServiceException) {
throw e.getCause();
throw ((SdkServiceException) e.getCause());
}
}
}).isSameAs(exception);

assertThat(handler.exceptionOccurredCalled).isTrue();
}

private static class SubscribingResponseHandler implements EventStreamResponseHandler<Object, Object> {
private volatile boolean exceptionOccurredCalled = false;

@Override
public void responseReceived(Object response) {
Expand All @@ -368,7 +363,6 @@ public void onEventStream(SdkPublisher<Object> publisher) {

@Override
public void exceptionOccurred(Throwable throwable) {
exceptionOccurredCalled = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.utils.async.BufferingSubscriber;
import software.amazon.awssdk.utils.async.EventListeningSubscriber;
import software.amazon.awssdk.utils.async.FilteringSubscriber;
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
import software.amazon.awssdk.utils.async.LimitingSubscriber;
Expand Down Expand Up @@ -118,36 +116,6 @@ default SdkPublisher<T> limit(int limit) {
return subscriber -> subscribe(new LimitingSubscriber<>(subscriber, limit));
}

/**
* Add a callback that will be invoked after this publisher invokes {@link Subscriber#onComplete()}.
*
* @param afterOnComplete The logic that should be run immediately after onComplete.
* @return New publisher that invokes the requested callback.
*/
default SdkPublisher<T> doAfterOnComplete(Runnable afterOnComplete) {
return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, afterOnComplete, null, null));
}

/**
* Add a callback that will be invoked after this publisher invokes {@link Subscriber#onError(Throwable)}.
*
* @param afterOnError The logic that should be run immediately after onError.
* @return New publisher that invokes the requested callback.
*/
default SdkPublisher<T> doAfterOnError(Consumer<Throwable> afterOnError) {
return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, null, afterOnError, null));
}

/**
* Add a callback that will be invoked after this publisher invokes {@link Subscription#cancel()}.
*
* @param afterOnCancel The logic that should be run immediately after cancellation of the subscription.
* @return New publisher that invokes the requested callback.
*/
default SdkPublisher<T> doAfterOnCancel(Runnable afterOnCancel) {
return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, null, null, afterOnCancel));
}

/**
* Subscribes to the publisher with the given {@link Consumer}. This consumer will be called for each event
* published. There is no backpressure using this method if the Consumer dispatches processing asynchronously. If more
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -203,9 +201,9 @@ private List<SubscribeToShardEventStream> subscribeToShard() throws Throwable {
SubscribeToShardResponseHandler.builder()
.subscriber(events::add)
.build())
.get(10, TimeUnit.SECONDS);
.join();
return events;
} catch (ExecutionException e) {
} catch (CompletionException e) {
throw e.getCause();
}
}
Expand Down Expand Up @@ -236,6 +234,9 @@ public void request(long l) {

@Override
public void cancel() {
RuntimeException e = new RuntimeException();
subscriber.onError(e);
value.onError(e);
}
}));
return cf;
Expand Down
5 changes: 0 additions & 5 deletions utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,6 @@
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@

package software.amazon.awssdk.utils.async;

import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkProtectedApi;

@SdkProtectedApi
public abstract class DelegatingSubscriber<T, U> implements Subscriber<T> {

protected final Subscriber<? super U> subscriber;
private final AtomicBoolean complete = new AtomicBoolean(false);

protected DelegatingSubscriber(Subscriber<? super U> subscriber) {
this.subscriber = subscriber;
Expand All @@ -36,15 +35,12 @@ public void onSubscribe(Subscription subscription) {

@Override
public void onError(Throwable throwable) {
if (complete.compareAndSet(false, true)) {
subscriber.onError(throwable);
}
subscriber.onError(throwable);
}

@Override
public void onComplete() {
if (complete.compareAndSet(false, true)) {
subscriber.onComplete();
}
subscriber.onComplete();
}

}

This file was deleted.

Loading

0 comments on commit fcb4ef3

Please sign in to comment.