Skip to content

Commit

Permalink
fix(@aws-amplify/datastore) Adding socket disconnection detection (#5086
Browse files Browse the repository at this point in the history
)
  • Loading branch information
elorzafe authored Mar 13, 2020
1 parent dae4824 commit 28ff4c9
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const DB_NAME = '@AmplifyDatastore';
const COLLECTION = 'Collection';
const DATA = 'Data';

//TODO: Consider refactoring to a batch save operation.
// TODO: Consider refactoring to a batch save operation.
class AsyncStorageDatabase {
async save<T extends PersistentModel>(item: T, storeName: string) {
const itemKey = this.getKeyForItem(storeName, item.id);
Expand Down
56 changes: 56 additions & 0 deletions packages/datastore/src/sync/datastoreConnectivity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import * as Observable from 'zen-observable';
import { ConsoleLogger as Logger, Reachability } from '@aws-amplify/core';

const logger = new Logger('DataStore');

const RECONNECTING_IN = 5000; // 5s this may be configurable in the future

type ConnectionStatus = {
// Might add other params in the future
online: boolean;
};

export default class DataStoreConnectivity {
private connectionStatus: ConnectionStatus;
private observer: ZenObservable.SubscriptionObserver<ConnectionStatus>;
constructor() {
this.connectionStatus = {
online: false,
};
}

status(): Observable<ConnectionStatus> {
if (this.observer) {
throw new Error('Subscriber already exists');
}
return new Observable(observer => {
this.observer = observer;
// Will be used to forward socket connection changes, enhancing Reachability

const subs = new Reachability()
.networkMonitor()
.subscribe(({ online }) => {
this.connectionStatus.online = online;

const observerResult = { ...this.connectionStatus }; // copyOf status

observer.next(observerResult);
});

return () => {
subs.unsubscribe();
};
});
}

socketDisconnected() {
if (this.observer && typeof this.observer.next === 'function') {
this.observer.next({ online: false }); // Notify network issue from the socket

setTimeout(() => {
const observerResult = { ...this.connectionStatus }; // copyOf status
this.observer.next(observerResult);
}, RECONNECTING_IN); // giving time for socket cleanup and network status stabilization
}
}
}
41 changes: 34 additions & 7 deletions packages/datastore/src/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
ModelInit,
MutableModel,
NamespaceResolver,
PersistentModel,
PersistentModelConstructor,
SchemaModel,
SchemaNamespace,
Expand All @@ -26,6 +25,8 @@ import {
predicateToGraphQLCondition,
TransformerMutationType,
} from './utils';
import DataStoreConnectivity from './datastoreConnectivity';
import { CONTROL_MSG as PUBSUB_CONTROL_MSG } from '@aws-amplify/pubsub';

const logger = new Logger('DataStore');

Expand Down Expand Up @@ -134,17 +135,26 @@ export class SyncEngine {
return;
}

new Reachability().networkMonitor().subscribe(async ({ online }) => {
this.online = online;
if (online) {
const datastoreConnectivity = new DataStoreConnectivity();

datastoreConnectivity.status().subscribe(async ({ online }) => {
if (online && !this.online) {
// From offline to online
//#region GraphQL Subscriptions
const [
ctlSubsObservable,
dataSubsObservable,
] = this.subscriptionsProcessor.start();

const errorHandler = this.disconnectionHandler(
datastoreConnectivity
);
try {
subscriptions.push(
await this.waitForSubscriptionsReady(ctlSubsObservable)
await this.waitForSubscriptionsReady(
ctlSubsObservable,
errorHandler
)
);
} catch (err) {
observer.error(err);
Expand Down Expand Up @@ -225,10 +235,11 @@ export class SyncEngine {
}
)
);
} else {
} else if (!online) {
subscriptions.forEach(sub => sub.unsubscribe());
subscriptions = [];
}
this.online = online;
});

this.storage
Expand Down Expand Up @@ -397,8 +408,23 @@ export class SyncEngine {
});
}

private disconnectionHandler(
datastoreConnectivity: DataStoreConnectivity
): (msg: string) => void {
return (msg: string) => {
// This implementation is tight to AWSAppSyncRealTimeProvider 'Connection closed', 'Timeout disconnect' msg
if (
PUBSUB_CONTROL_MSG.CONNECTION_CLOSED === msg ||
PUBSUB_CONTROL_MSG.TIMEOUT_DISCONNECT === msg
) {
datastoreConnectivity.socketDisconnected();
}
};
}

private async waitForSubscriptionsReady(
ctlSubsObservable: Observable<CONTROL_MSG>
ctlSubsObservable: Observable<CONTROL_MSG>,
errorHandler: (msg: string) => void
): Promise<ZenObservable.Subscription> {
return new Promise((resolve, reject) => {
const subscription = ctlSubsObservable.subscribe({
Expand All @@ -409,6 +435,7 @@ export class SyncEngine {
},
error: err => {
reject(`subscription failed ${err}`);
errorHandler(err);
},
});
});
Expand Down
31 changes: 21 additions & 10 deletions packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
import Cache from '@aws-amplify/cache';
import Auth from '@aws-amplify/auth';
import { AbstractPubSubProvider } from './PubSubProvider';
import { CONTROL_MSG } from '@aws-amplify/pubsub';

const logger = new Logger('AWSAppSyncRealTimeProvider');

Expand Down Expand Up @@ -148,6 +149,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
private keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
private subscriptionObserverMap: Map<string, ObserverQuery> = new Map();
private promiseArray: Array<{ res: Function; rej: Function }> = [];

getProviderName() {
return 'AWSAppSyncRealTimeProvider';
}
Expand Down Expand Up @@ -191,17 +193,24 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
try {
// Waiting that subscription has been connected before trying to unsubscribe
await this._waitForSubscriptionToBeConnected(subscriptionId);
const { subscriptionState } = this.subscriptionObserverMap.get(
subscriptionId
);

const { subscriptionState } =
this.subscriptionObserverMap.get(subscriptionId) || {};

if (!subscriptionState) {
// subscription already unsubscribed
return;
}

if (subscriptionState === SUBSCRIPTION_STATUS.CONNECTED) {
this._sendUnsubscriptionMessage(subscriptionId);
} else {
throw new Error('Subscription fail, start removing subscription');
throw new Error('Subscription never connected');
}
} catch (err) {
logger.debug(`Error while unsubscribing ${err}`);
} finally {
this._removeSubscriptionObserver(subscriptionId);
return;
}
};
}
Expand Down Expand Up @@ -361,8 +370,6 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
};
const stringToAWSRealTime = JSON.stringify(unsubscribeMessage);
this.awsRealTimeSocket.send(stringToAWSRealTime);

this._removeSubscriptionObserver(subscriptionId);
}
} catch (err) {
// If GQL_STOP is not sent because of disconnection issue, then there is nothing the client can do
Expand Down Expand Up @@ -394,6 +401,9 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
logger.debug('closing WebSocket...');
clearTimeout(this.keepAliveTimeoutId);
const tempSocket = this.awsRealTimeSocket;
// Cleaning callbacks to avoid race condition, socket still exists
tempSocket.onclose = undefined;
tempSocket.onerror = undefined;
tempSocket.close(1000);
this.awsRealTimeSocket = null;
this.socketStatus = SOCKET_STATUS.CLOSED;
Expand Down Expand Up @@ -456,7 +466,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) {
clearTimeout(this.keepAliveTimeoutId);
this.keepAliveTimeoutId = setTimeout(
this._errorDisconnect.bind(this, 'Timeout disconnect'),
this._errorDisconnect.bind(this, CONTROL_MSG.TIMEOUT_DISCONNECT),
this.keepAliveTimeout
);
return;
Expand Down Expand Up @@ -493,6 +503,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
}

private _errorDisconnect(msg: string) {
logger.debug(`Disconnect error: ${msg}`);
this.subscriptionObserverMap.forEach(({ observer }) => {
if (!observer.closed) {
observer.error({
Expand Down Expand Up @@ -636,7 +647,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
return new Promise((res, rej) => {
let ackOk = false;
this.awsRealTimeSocket.onerror = error => {
logger.debug(`WebSocket closed ${JSON.stringify(error)}`);
logger.debug(`WebSocket error ${JSON.stringify(error)}`);
};
this.awsRealTimeSocket.onclose = event => {
logger.debug(`WebSocket closed ${event.reason}`);
Expand All @@ -662,7 +673,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider {
);
this.awsRealTimeSocket.onerror = err => {
logger.debug(err);
this._errorDisconnect('Connection closed');
this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED);
};
res('Cool, connected to AWS AppSyncRealTime');
return;
Expand Down
8 changes: 7 additions & 1 deletion packages/pubsub/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import Amplify, { ConsoleLogger as Logger } from '@aws-amplify/core';

const logger = new Logger('PubSub');

enum CONTROL_MSG {
CONNECTION_CLOSED = 'Connection closed',
TIMEOUT_DISCONNECT = 'Timeout disconnect',
}

let _instance: PubSubClass = null;

if (!_instance) {
Expand All @@ -29,4 +34,5 @@ Amplify.register(PubSub);
export default PubSub;

export * from './Providers/AWSIotProvider';
export { PubSubClass };

export { PubSubClass, CONTROL_MSG };

0 comments on commit 28ff4c9

Please sign in to comment.