Skip to content

Commit

Permalink
Add http fallback for unavailable websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
msujew committed Aug 24, 2021
1 parent ff9e050 commit 5e5b015
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 12 deletions.
3 changes: 2 additions & 1 deletion packages/core/src/browser/frontend-application-bindings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
********************************************************************************/

import { interfaces } from 'inversify';
import { bindContributionProvider, DefaultResourceProvider, MessageClient, MessageService, ResourceProvider, ResourceResolver } from '../common';
import { bindContributionProvider, DefaultResourceProvider, MessageClient, MessageService, MessageServiceFactory, ResourceProvider, ResourceResolver } from '../common';
import {
bindPreferenceSchemaProvider, PreferenceProvider,
PreferenceProviderProvider, PreferenceSchemaProvider, PreferenceScope,
Expand All @@ -24,6 +24,7 @@ import {

export function bindMessageService(bind: interfaces.Bind): interfaces.BindingWhenOnSyntax<MessageService> {
bind(MessageClient).toSelf().inSingletonScope();
bind(MessageServiceFactory).toFactory(({ container }) => () => container.get(MessageService));
return bind(MessageService).toSelf().inSingletonScope();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
********************************************************************************/

import { ContainerModule } from 'inversify';
import { WebSocketConnectionProvider } from './ws-connection-provider';
import { DEFAULT_HTTP_FALLBACK_OPTIONS, HttpFallbackOptions, WebSocketConnectionProvider } from './ws-connection-provider';

export const messagingFrontendModule = new ContainerModule(bind => {
bind(HttpFallbackOptions).toConstantValue(DEFAULT_HTTP_FALLBACK_OPTIONS);
bind(WebSocketConnectionProvider).toSelf().inSingletonScope();
});
116 changes: 110 additions & 6 deletions packages/core/src/browser/messaging/ws-connection-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/

import { injectable, interfaces, decorate, unmanaged } from 'inversify';
import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event } from '../../common';
import { injectable, interfaces, decorate, unmanaged, inject } from 'inversify';
import { JsonRpcProxyFactory, JsonRpcProxy, Emitter, Event, MessageService, MessageServiceFactory } from '../../common';
import { WebSocketChannel } from '../../common/messaging/web-socket-channel';
import { Endpoint } from '../endpoint';
import ReconnectingWebSocket from 'reconnecting-websocket';
import { AbstractConnectionProvider } from '../../common/messaging/abstract-connection-provider';
import { v4 as uuid } from 'uuid';

decorate(injectable(), JsonRpcProxyFactory);
decorate(unmanaged(), JsonRpcProxyFactory, 0);
Expand All @@ -31,6 +32,29 @@ export interface WebSocketOptions {
reconnecting?: boolean;
}

export const HttpFallbackOptions = Symbol('HttpFallbackOptions');

export interface HttpFallbackOptions {
/** Determines whether Theia is allowed to use the http fallback. True by default. */
allowed: boolean;
/** Number of failed websocket connection attempts before the fallback is triggered. 2 by default. */
maxAttempts: number;
/** The maximum duration (in ms) after which the http request should timeout. 5000 by default. */
pollingTimeout: number;
/** The timeout duration (in ms) after a request was answered with an error code. 5000 by default. */
errorTimeout: number;
/** The minimum timeout duration (in ms) between two http requests. 0 by default. */
requestTimeout: number;
}

export const DEFAULT_HTTP_FALLBACK_OPTIONS: HttpFallbackOptions = {
allowed: true,
maxAttempts: 2,
errorTimeout: 5000,
pollingTimeout: 5000,
requestTimeout: 0
};

@injectable()
export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebSocketOptions> {

Expand All @@ -40,17 +64,27 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
protected readonly onSocketDidCloseEmitter: Emitter<void> = new Emitter();
readonly onSocketDidClose: Event<void> = this.onSocketDidCloseEmitter.event;

protected readonly onHttpFallbackDidActivateEmitter: Emitter<void> = new Emitter();
readonly onHttpFallbackDidActivate: Event<void> = this.onHttpFallbackDidActivateEmitter.event;

static createProxy<T extends object>(container: interfaces.Container, path: string, arg?: object): JsonRpcProxy<T> {
return container.get(WebSocketConnectionProvider).createProxy<T>(path, arg);
}

@inject(MessageServiceFactory) protected readonly messageService: () => MessageService;
@inject(HttpFallbackOptions) protected readonly httpFallbackOptions: HttpFallbackOptions;

protected readonly socket: ReconnectingWebSocket;
protected useHttpFallback = false;
protected websocketErrorCounter = 0;
protected httpFallbackId = uuid();
protected httpFallbackDisconnected = true;

constructor() {
super();
const url = this.createWebSocketUrl(WebSocketChannel.wsPath);
const socket = this.createWebSocket(url);
socket.onerror = console.error;
socket.onerror = event => this.handleSocketError(event);
socket.onopen = () => {
this.fireSocketDidOpen();
};
Expand All @@ -61,28 +95,93 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
this.fireSocketDidClose();
};
socket.onmessage = ({ data }) => {
this.websocketErrorCounter = 0;
this.handleIncomingRawMessage(data);
};
this.socket = socket;
window.addEventListener('offline', () => this.tryReconnect());
window.addEventListener('online', () => this.tryReconnect());
}

handleSocketError(event: unknown): void {
this.websocketErrorCounter += 1;
if (this.httpFallbackOptions.allowed && this.websocketErrorCounter >= this.httpFallbackOptions.maxAttempts) {
this.useHttpFallback = true;
this.socket.close();
const httpUrl = this.createHttpWebSocketUrl(WebSocketChannel.wsPath);
this.onHttpFallbackDidActivateEmitter.fire(undefined);
this.doLongPolling(httpUrl);
this.messageService().warn(
'Could not establish a websocket connection. The application will be using the HTTP fallback mode. This may affect performance and the behavior of some features.'
);
}
console.error(event);
}

async doLongPolling(url: string): Promise<void> {
let timeoutDuration = this.httpFallbackOptions.requestTimeout;
const controller = new AbortController();
const pollingId = window.setTimeout(() => controller.abort(), this.httpFallbackOptions.pollingTimeout);
try {
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
signal: controller.signal,
keepalive: true,
body: JSON.stringify({ id: this.httpFallbackId, polling: true })
});
if (response.status === 200) {
window.clearTimeout(pollingId);
if (this.httpFallbackDisconnected) {
this.fireSocketDidOpen();
}
const json: string[] = await response.json();
if (Array.isArray(json)) {
for (const item of json) {
this.handleIncomingRawMessage(item);
}
} else {
throw new Error('Received invalid long polling response.');
}
} else {
timeoutDuration = this.httpFallbackOptions.errorTimeout;
this.httpFallbackDisconnected = true;
this.fireSocketDidClose();
throw new Error('Response has error code: ' + response.status);
}
} catch (e) {
console.error('Error occured during long polling', e);
}
setTimeout(() => this.doLongPolling(url), timeoutDuration);
}

openChannel(path: string, handler: (channel: WebSocketChannel) => void, options?: WebSocketOptions): void {
if (this.socket.readyState === WebSocket.OPEN) {
if (this.useHttpFallback || this.socket.readyState === WebSocket.OPEN) {
super.openChannel(path, handler, options);
} else {
const openChannel = () => {
this.socket.removeEventListener('open', openChannel);
this.openChannel(path, handler, options);
};
this.socket.addEventListener('open', openChannel);
this.onHttpFallbackDidActivate(openChannel);
}
}

protected createChannel(id: number): WebSocketChannel {
const httpUrl = this.createHttpWebSocketUrl(WebSocketChannel.wsPath);
return new WebSocketChannel(id, content => {
if (this.socket.readyState < WebSocket.CLOSING) {
if (this.useHttpFallback) {
fetch(httpUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ id: this.httpFallbackId, content })
});
} else if (this.socket.readyState < WebSocket.CLOSING) {
this.socket.send(content);
}
});
Expand All @@ -96,6 +195,11 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
return endpoint.getWebSocketUrl().toString();
}

protected createHttpWebSocketUrl(path: string): string {
const endpoint = new Endpoint({ path });
return endpoint.getRestUrl().toString();
}

/**
* Creates a web socket for the given url
*/
Expand All @@ -119,7 +223,7 @@ export class WebSocketConnectionProvider extends AbstractConnectionProvider<WebS
}

protected tryReconnect(): void {
if (this.socket.readyState !== WebSocket.CONNECTING) {
if (!this.useHttpFallback && this.socket.readyState !== WebSocket.CONNECTING) {
this.socket.reconnect();
}
}
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/common/message-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import {
} from './message-service-protocol';
import { CancellationTokenSource } from './cancellation';

export const MessageServiceFactory = Symbol('MessageServiceFactory');

/**
* Service to log and categorize messages, show progress information and offer actions.
*
Expand Down
15 changes: 13 additions & 2 deletions packages/core/src/common/promise-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,23 @@ import { CancellationToken, cancelled } from './cancellation';
* An object that exposes a promise and functions to resolve and reject it.
*/
export class Deferred<T> {
state: 'resolved' | 'rejected' | 'unresolved' = 'unresolved';
resolve: (value?: T) => void;
reject: (err?: any) => void; // eslint-disable-line @typescript-eslint/no-explicit-any

promise = new Promise<T>((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
this.resolve = result => {
resolve(result as T);
if (this.state === 'unresolved') {
this.state = 'resolved';
}
};
this.reject = err => {
reject(err);
if (this.state === 'unresolved') {
this.state = 'rejected';
}
};
});
}

Expand Down
103 changes: 103 additions & 0 deletions packages/core/src/node/messaging/http-websocket-adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/********************************************************************************
* Copyright (C) 2021 TypeFox and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/

import * as ws from 'ws';
import { inject, injectable } from 'inversify';
import { CancellationTokenSource } from '../../common';
import { Deferred, timeout } from '../../common/promise-util';

export const HttpWebsocketAdapterFactory = Symbol('HttpWebsocketAdapterFactory');
export const HttpWebsocketAdapterTimeout = Symbol('HttpWebsocketAdapterTimeout');

export const DEFAULT_HTTP_WEBSOCKET_ADAPTER_TIMEOUT = 4000;

@injectable()
export class HttpWebsocketAdapter {

@inject(HttpWebsocketAdapterTimeout)
protected readonly adapterTimeout: number;

readyState: number = ws.OPEN;
alive: boolean = true;

protected pendingTimeout?: CancellationTokenSource;
protected pendingMessages: unknown[] = [];
protected deferredMessageHandler: Deferred<unknown[]> = new Deferred();

getPendingMessages(): Promise<unknown[]> {
this.alive = true;
this.deferredMessageHandler = new Deferred();
if (!this.pendingMessages.length) {
this.pendingTimeout = new CancellationTokenSource();
timeout(this.adapterTimeout, this.pendingTimeout.token)
.then(() => this.deferredMessageHandler.resolve([]))
.catch(() => { });
} else {
this.deferredMessageHandler.resolve(this.pendingMessages);
this.pendingMessages = [];
}
return this.deferredMessageHandler.promise;
}

protected _onerror: (error: Error) => void;
protected _onclose: (code?: number, reason?: string) => void;
protected _onmessage: (data: string) => void;

onerror(error: Error): void {
if (this._onerror) {
this._onerror(error);
}
}

onclose(code?: number, reason?: string): void {
this.readyState = ws.CLOSING;
if (this._onclose) {
this._onclose(code, reason);
}
this.readyState = ws.CLOSED;
}

onmessage(data: string): void {
if (this._onmessage) {
this._onmessage(data);
}
}

send(data: unknown): void {
this.pendingMessages.push(data);
if (this.deferredMessageHandler.state === 'unresolved') {
this.pendingTimeout?.cancel();
this.deferredMessageHandler.resolve(this.pendingMessages);
this.pendingMessages = [];
}
}

// Events
on(event: 'close', listener: (this: WebSocket, code?: number, reason?: string) => void): this;
on(event: 'error', listener: (this: WebSocket, err: Error) => void): this;
on(event: 'message', listener: (this: WebSocket, data: ws.Data) => void): this;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
on(event: string | symbol, listener: (this: WebSocket, ...args: any[]) => void): this {
if (event === 'error') {
this.onerror = listener;
} else if (event === 'message') {
this.onmessage = listener;
} else if (event === 'close') {
this.onclose = listener;
}
return this;
}
}
4 changes: 4 additions & 0 deletions packages/core/src/node/messaging/messaging-backend-module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { MessagingContribution, MessagingContainer } from './messaging-contribut
import { ConnectionContainerModule } from './connection-container-module';
import { MessagingService } from './messaging-service';
import { MessagingListener, MessagingListenerContribution } from './messaging-listeners';
import { DEFAULT_HTTP_WEBSOCKET_ADAPTER_TIMEOUT, HttpWebsocketAdapter, HttpWebsocketAdapterFactory, HttpWebsocketAdapterTimeout } from './http-websocket-adapter';

export const messagingBackendModule = new ContainerModule(bind => {
bindContributionProvider(bind, ConnectionContainerModule);
Expand All @@ -34,4 +35,7 @@ export const messagingBackendModule = new ContainerModule(bind => {
bind(BackendApplicationContribution).toService(MessagingContribution);
bind(MessagingListener).toSelf().inSingletonScope();
bindContributionProvider(bind, MessagingListenerContribution);
bind(HttpWebsocketAdapter).toSelf();
bind(HttpWebsocketAdapterFactory).toFactory(({ container }) => () => container.get(HttpWebsocketAdapter));
bind(HttpWebsocketAdapterTimeout).toConstantValue(DEFAULT_HTTP_WEBSOCKET_ADAPTER_TIMEOUT);
});
Loading

0 comments on commit 5e5b015

Please sign in to comment.