Skip to content

Commit

Permalink
feat: Implement WebSocket pings
Browse files Browse the repository at this point in the history
First half of #325
  • Loading branch information
gnarea committed Jul 26, 2021
1 parent 796064c commit 367f2e9
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 5 deletions.
14 changes: 10 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
"devDependencies": {
"@relaycorp/relaynet-poweb": "^1.6.3",
"@relaycorp/shared-config": "^1.5.3",
"@relaycorp/ws-mock": "^2.0.3",
"@relaycorp/ws-mock": "^2.1.0",
"@semantic-release/exec": "^5.0.0",
"@types/jest": "^26.0.24",
"@types/mongoose": "^5.10.5",
"@types/pkijs": "^0.0.10",
"@types/split2": "^3.2.1",
"@types/verror": "^1.10.5",
"@types/ws": "^7.4.6",
"date-fns": "^2.23.0",
"fastify-plugin": "^3.0.0",
"jest": "^26.6.3",
"jest-extended": "^0.11.5",
Expand Down
10 changes: 10 additions & 0 deletions src/_test_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,13 @@ export async function generateCCA(
);
return Buffer.from(await cca.serialize(privateGatewayPrivateKey));
}

export function useFakeTimers(): void {
beforeEach(() => {
jest.useFakeTimers('modern');
});

afterEach(() => {
jest.useRealTimers();
});
}
44 changes: 44 additions & 0 deletions src/services/poweb/parcelCollection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
import { CloseFrame, createMockWebSocketStream, MockClient } from '@relaycorp/ws-mock';
import AbortController from 'abort-controller';
import bufferToArray from 'buffer-to-arraybuffer';
import { addSeconds } from 'date-fns';
import { EventEmitter } from 'events';
import uuid from 'uuid-random';
import WS, { Server as WSServer } from 'ws';
Expand All @@ -25,6 +26,7 @@ import {
mockSpy,
partialPinoLog,
partialPinoLogger,
useFakeTimers,
UUID4_REGEX,
} from '../../_test_utils';
import {
Expand Down Expand Up @@ -626,6 +628,48 @@ test('Abrupt TCP connection closure should be handled gracefully', async () => {
);
});

describe('Pings', () => {
useFakeTimers();

test('Server should send ping every 10 seconds', async () => {
const client = new MockPoWebClient(mockWSServer, StreamingMode.KEEP_ALIVE);
await client.connect();
const connectionDate = new Date();

jest.advanceTimersByTime(10_100);
const [ping1] = client.incomingPings;
expect(ping1.date).toBeAfter(addSeconds(connectionDate, 9));
expect(ping1.date).toBeBefore(addSeconds(connectionDate, 11));

jest.advanceTimersByTime(10_000);
const [, ping2] = client.incomingPings;
expect(ping2.date).toBeAfter(addSeconds(connectionDate, 19));
expect(ping2.date).toBeBefore(addSeconds(connectionDate, 21));
});

test('Ping should be logged', async () => {
const client = new MockPoWebClient(mockWSServer, StreamingMode.KEEP_ALIVE);
await client.connect();

jest.advanceTimersByTime(10_100);
expect(mockLogging.logs).toContainEqual(
partialPinoLog('debug', 'Sending ping to client', {
reqId: UUID4_REGEX,
}),
);
await client.close();
});

test('Pings should stop when connection is closed', async () => {
const client = new MockPoWebClient(mockWSServer, StreamingMode.KEEP_ALIVE);
await client.connect();
await client.close();

jest.advanceTimersByTime(10_100);
expect(client.incomingPings).toHaveLength(0);
});
});

function mockParcelStreamMessage(
parcelSerialized: Buffer,
ackCallback: () => void = () => undefined,
Expand Down
11 changes: 11 additions & 0 deletions src/services/poweb/parcelCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import { WebSocketCode } from './websockets';
// The largest payload the client could send is the handshake response, which should be < 1.9 kib
const MAX_PAYLOAD = 2 * 1024;

const WEBSOCKET_PING_INTERVAL_MS = 10_000;

interface PendingACK {
readonly ack: () => Promise<void>;
readonly parcelObjectKey: string;
Expand Down Expand Up @@ -92,6 +94,15 @@ function makeConnectionHandler(
);
return;
}

const pingIntervalId = setInterval(() => {
requestAwareLogger.debug('Sending ping to client');
wsConnection.ping();
}, WEBSOCKET_PING_INTERVAL_MS);
wsConnection.once('close', () => {
clearInterval(pingIntervalId);
});

const abortController = makeAbortController(wsConnection, requestAwareLogger);

const peerGatewayAddress = await doHandshake(
Expand Down

0 comments on commit 367f2e9

Please sign in to comment.