Skip to content

Commit

Permalink
Merge branch 'main' into granular-read
Browse files Browse the repository at this point in the history
  • Loading branch information
gpanshu authored Aug 23, 2023
2 parents 881769c + 07c0de3 commit 3e67d82
Show file tree
Hide file tree
Showing 10 changed files with 489 additions and 47 deletions.
2 changes: 2 additions & 0 deletions aws-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ dependencies {
testImplementation(libs.test.jsonassert)
testImplementation(libs.test.junit)
testImplementation(libs.test.mockito.core)
testImplementation(libs.test.mockk)
testImplementation(libs.test.kotest.assertions)
testImplementation(libs.test.mockwebserver)
testImplementation(libs.rxjava)
testImplementation(libs.test.robolectric)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ private <R> GraphQLOperation<R> buildSubscriptionOperation(
if (AuthModeStrategyType.MULTIAUTH.equals(authModeStrategyType)) {
// If it gets here, we know that the request is an AppSyncGraphQLRequest because
// getAuthModeStrategyType checks for that, so we can safely cast the graphQLRequest.
return MutiAuthSubscriptionOperation.<R>builder()
return MultiAuthSubscriptionOperation.<R>builder()
.subscriptionEndpoint(clientDetails.getSubscriptionEndpoint())
.graphQlRequest((AppSyncGraphQLRequest<R>) graphQLRequest)
.responseFactory(gqlResponseFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package com.amplifyframework.api.aws;

import androidx.annotation.NonNull;
import androidx.annotation.VisibleForTesting;

import com.amplifyframework.AmplifyException;
import com.amplifyframework.api.ApiException;
import com.amplifyframework.api.ApiException.ApiAuthException;
import com.amplifyframework.api.aws.auth.AuthRuleRequestDecorator;
Expand All @@ -38,7 +38,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

final class MutiAuthSubscriptionOperation<T> extends GraphQLOperation<T> {
final class MultiAuthSubscriptionOperation<T> extends GraphQLOperation<T> {
private static final Logger LOG = Amplify.Logging.logger(CategoryType.API, "amplify:aws-api");

private final SubscriptionEndpoint subscriptionEndpoint;
Expand All @@ -49,12 +49,11 @@ final class MutiAuthSubscriptionOperation<T> extends GraphQLOperation<T> {
private final Action onSubscriptionComplete;
private final AtomicBoolean canceled;
private final AuthRuleRequestDecorator requestDecorator;

private AuthorizationTypeIterator authTypes;
private String subscriptionId;
private Future<?> subscriptionFuture;

private MutiAuthSubscriptionOperation(Builder<T> builder) {
private MultiAuthSubscriptionOperation(Builder<T> builder) {
super(builder.graphQlRequest, builder.responseFactory);
this.subscriptionEndpoint = builder.subscriptionEndpoint;
this.onSubscriptionStart = builder.onSubscriptionStart;
Expand Down Expand Up @@ -115,12 +114,12 @@ private void dispatchRequest() {
request,
authorizationType,
subscriptionId -> {
MutiAuthSubscriptionOperation.this.subscriptionId = subscriptionId;
MultiAuthSubscriptionOperation.this.subscriptionId = subscriptionId;
onSubscriptionStart.accept(subscriptionId);
},
response -> {
if (response.hasErrors() && hasAuthRelatedErrors(response) && authTypes.hasNext()) {
// If there are auth-related errors, dispatch an ApiAuthException
// If there are auth-related errors queue up a retry with the next authType
executorService.submit(this::dispatchRequest);
} else {
// Otherwise, we just want to dispatch it as a next item and
Expand All @@ -139,8 +138,10 @@ private void dispatchRequest() {
onSubscriptionComplete
);
} else {
emitErrorAndCancelSubscription(new ApiException("Unable to establish subscription connection.",
AmplifyException.TODO_RECOVERY_SUGGESTION));
emitErrorAndCancelSubscription(new ApiAuthException(
"Unable to establish subscription connection with any of the compatible auth types.",
"Check your application logs for detail."
));
}

}
Expand Down Expand Up @@ -179,6 +180,21 @@ private void emitErrorAndCancelSubscription(ApiException apiException) {
onSubscriptionError.accept(apiException);
}

@VisibleForTesting
boolean isCanceled() {
return canceled.get();
}

@VisibleForTesting
void setCanceled(boolean canceled) {
this.canceled.set(canceled);
}

@VisibleForTesting
Future<?> getSubscriptionFuture() {
return subscriptionFuture;
}

static final class Builder<T> {
private SubscriptionEndpoint subscriptionEndpoint;
private AppSyncGraphQLRequest<T> graphQlRequest;
Expand Down Expand Up @@ -244,8 +260,8 @@ public Builder<T> requestDecorator(AuthRuleRequestDecorator requestDecorator) {
}

@NonNull
public MutiAuthSubscriptionOperation<T> build() {
return new MutiAuthSubscriptionOperation<>(this);
public MultiAuthSubscriptionOperation<T> build() {
return new MultiAuthSubscriptionOperation<>(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ final class SubscriptionEndpoint {
private final TimeoutWatchdog timeoutWatchdog;
private final Set<String> pendingSubscriptionIds;
private final OkHttpClient okHttpClient;
private final Object webSocketLock = new Object();
private WebSocket webSocket;
private AmplifyWebSocketListener webSocketListener;

Expand Down Expand Up @@ -126,26 +127,35 @@ synchronized <T> void requestSubscription(
Objects.requireNonNull(onSubscriptionError);
Objects.requireNonNull(onSubscriptionComplete);

// The first call to subscribe OR a disconnected websocket listener will
// force a new connection to be created.
if (webSocketListener == null || webSocketListener.isDisconnectedState()) {
webSocketListener = new AmplifyWebSocketListener();
try {
webSocket = okHttpClient.newWebSocket(new Request.Builder()
.url(buildConnectionRequestUrl(authType))
.addHeader("Sec-WebSocket-Protocol", "graphql-ws")
.header("User-Agent", UserAgent.string())
.build(), webSocketListener);
} catch (ApiException apiException) {
onSubscriptionError.accept(apiException);
return;
final String subscriptionId = UUID.randomUUID().toString();
final AmplifyWebSocketListener socketListener;
final WebSocket socket;

synchronized (webSocketLock) {
// The first call to subscribe OR a disconnected websocket listener will
// force a new connection to be created.
if (webSocketListener == null || webSocketListener.isDisconnectedState()) {
webSocketListener = new AmplifyWebSocketListener();
try {
webSocket = okHttpClient.newWebSocket(new Request.Builder()
.url(buildConnectionRequestUrl(authType))
.addHeader("Sec-WebSocket-Protocol", "graphql-ws")
.header("User-Agent", UserAgent.string())
.build(), webSocketListener);
} catch (ApiException apiException) {
onSubscriptionError.accept(apiException);
return;
}

}

pendingSubscriptionIds.add(subscriptionId);
socketListener = webSocketListener;
socket = webSocket;
}
final String subscriptionId = UUID.randomUUID().toString();
pendingSubscriptionIds.add(subscriptionId);

// Every request waits here for the connection to be ready.
Connection connection = webSocketListener.waitForConnectionReady();
Connection connection = socketListener.waitForConnectionReady();
if (connection.hasFailure()) {
// If the latch didn't count all the way down
if (pendingSubscriptionIds.remove(subscriptionId)) {
Expand All @@ -166,7 +176,7 @@ synchronized <T> void requestSubscription(
.put("authorization", authorizer.createHeadersForSubscription(request, authType))))
.toString();

webSocket.send(jsonMessage);
socket.send(jsonMessage);
} catch (JSONException | ApiException exception) {
// If the subscriptionId was still pending, then we can call the onSubscriptionError
if (pendingSubscriptionIds.remove(subscriptionId)) {
Expand Down Expand Up @@ -273,8 +283,8 @@ synchronized void releaseSubscription(String subscriptionId) throws ApiException

// Only do this if the subscription was NOT pending.
// Otherwise it would probably fail since it was never established in the first place.

if (!wasSubscriptionPending && !webSocketListener.isDisconnectedState()) {
final AmplifyWebSocketListener socketListener = webSocketListener;
if (!wasSubscriptionPending && socketListener != null && !socketListener.isDisconnectedState()) {
try {
String jsonMessage = new JSONObject()
.put("type", "stop")
Expand All @@ -292,13 +302,15 @@ synchronized void releaseSubscription(String subscriptionId) throws ApiException
subscription.awaitSubscriptionCompleted();
}

subscriptions.remove(subscriptionId);

// If we have zero subscriptions, close the WebSocket
if (subscriptions.size() == 0) {
LOG.info("No more active subscriptions. Closing web socket.");
timeoutWatchdog.stop();
webSocket.close(NORMAL_CLOSURE_STATUS, "No active subscriptions");
synchronized (webSocketLock) {
subscriptions.remove(subscriptionId);
if (subscriptions.isEmpty() && pendingSubscriptionIds.isEmpty()) {
LOG.info("No more active subscriptions. Closing web socket.");
timeoutWatchdog.stop();
webSocket.close(NORMAL_CLOSURE_STATUS, "No active subscriptions");
webSocketListener = null;
}
}
}

Expand Down
Loading

0 comments on commit 3e67d82

Please sign in to comment.