Skip to content

Commit

Permalink
feat: async termination of connection (using ack from server) (#1022)
Browse files Browse the repository at this point in the history
* feat: cleanup invalid files

* feat: cleanup

* feat: cleanup

* feat: cleanup

* feat: cleanup test

* feat: catching errors

* feat: unit tests

* fix: unit tests

* feat: test and cleanup

* docs: comments
  • Loading branch information
abretonc7s authored Sep 19, 2024
1 parent 1e41819 commit 7c1894a
Show file tree
Hide file tree
Showing 31 changed files with 381 additions and 293 deletions.
75 changes: 55 additions & 20 deletions packages/sdk-communication-layer/src/KeyExchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,17 @@ export class KeyExchange extends EventEmitter2 {
this.setOtherPublicKey(message.pubkey);
}

this.communicationLayer.sendMessage({
type: KeyExchangeMessageType.KEY_HANDSHAKE_SYNACK,
pubkey: this.myPublicKey,
});
this.communicationLayer
.sendMessage({
type: KeyExchangeMessageType.KEY_HANDSHAKE_SYNACK,
pubkey: this.myPublicKey,
})
.catch((error) => {
logger.KeyExchange(
`[KeyExchange: onKeyExchangeMessage()] Error sending KEY_HANDSHAKE_SYNACK`,
error,
);
});

this.setStep(KeyExchangeMessageType.KEY_HANDSHAKE_ACK);
} else if (message.type === KeyExchangeMessageType.KEY_HANDSHAKE_SYNACK) {
Expand All @@ -140,9 +147,16 @@ export class KeyExchange extends EventEmitter2 {
this.setOtherPublicKey(message.pubkey);
}

this.communicationLayer.sendMessage({
type: KeyExchangeMessageType.KEY_HANDSHAKE_ACK,
});
this.communicationLayer
.sendMessage({
type: KeyExchangeMessageType.KEY_HANDSHAKE_ACK,
})
.catch((error) => {
logger.KeyExchange(
`[KeyExchange: onKeyExchangeMessage()] Error sending KEY_HANDSHAKE_ACK`,
error,
);
});
this.keysExchanged = true;
// Reset step value for next exchange.
this.setStep(KeyExchangeMessageType.KEY_HANDSHAKE_ACK);
Expand Down Expand Up @@ -212,19 +226,33 @@ export class KeyExchange extends EventEmitter2 {
if (!this.keysExchanged || force === true) {
if (v2Protocol) {
// Ask to start exchange only if not already in progress
this.communicationLayer.sendMessage({
type: KeyExchangeMessageType.KEY_HANDSHAKE_SYNACK,
pubkey: this.myPublicKey,
v: PROTOCOL_VERSION,
});
this.communicationLayer
.sendMessage({
type: KeyExchangeMessageType.KEY_HANDSHAKE_SYNACK,
pubkey: this.myPublicKey,
v: PROTOCOL_VERSION,
})
.catch((error) => {
logger.KeyExchange(
`[KeyExchange: start()] Error sending KEY_HANDSHAKE_SYNACK`,
error,
);
});
// Ignore completion --- already consider keys exchanged completed in case mobile to mobile and the client was disconnected
// We need to be able to send the walletInfo onto the relayer.
// TODO: this.keysExchanged = true;
} else {
// Ask to start exchange only if not already in progress
this.communicationLayer.sendMessage({
type: KeyExchangeMessageType.KEY_HANDSHAKE_START,
});
this.communicationLayer
.sendMessage({
type: KeyExchangeMessageType.KEY_HANDSHAKE_START,
})
.catch((error) => {
logger.KeyExchange(
`[KeyExchange: start()] Error sending KEY_HANDSHAKE_START`,
error,
);
});
this.clean();
}
} else {
Expand Down Expand Up @@ -263,11 +291,18 @@ export class KeyExchange extends EventEmitter2 {
// except a SYN_ACK for next step
this.setStep(KeyExchangeMessageType.KEY_HANDSHAKE_SYNACK);
// From v0.2.0, we Always send the public key because exchange can be restarted at any time.
this.communicationLayer.sendMessage({
type: KeyExchangeMessageType.KEY_HANDSHAKE_SYN,
pubkey: this.myPublicKey,
v: PROTOCOL_VERSION,
});
this.communicationLayer
.sendMessage({
type: KeyExchangeMessageType.KEY_HANDSHAKE_SYN,
pubkey: this.myPublicKey,
v: PROTOCOL_VERSION,
})
.catch((error) => {
logger.KeyExchange(
`[KeyExchange: start()] Error sending KEY_HANDSHAKE_SYN`,
error,
);
});
}

setStep(step: KeyExchangeMessageType): void {
Expand Down
16 changes: 4 additions & 12 deletions packages/sdk-communication-layer/src/RemoteCommunication.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import packageJson from '../package.json';
import { RemoteCommunication } from './RemoteCommunication';
import { SocketService } from './SocketService';
import { clean } from './services/RemoteCommunication/ChannelManager';
import { resume } from './services/RemoteCommunication/ConnectionManager';
import { sendMessage } from './services/RemoteCommunication/MessageHandlers';
import { testStorage } from './services/RemoteCommunication/StorageManager';
import { CommunicationLayerPreference } from './types/CommunicationLayerPreference';
Expand Down Expand Up @@ -125,13 +124,13 @@ describe('RemoteCommunication', () => {
});

describe('ping', () => {
it('should call communicationLayer ping method if defined and debug state is true', () => {
it('should call communicationLayer ping method if defined and debug state is true', async () => {
const pingMock = jest.fn();
remoteCommunicationInstance.state.communicationLayer = {
ping: pingMock,
} as unknown as SocketService;
remoteCommunicationInstance.state.debug = true;
remoteCommunicationInstance.ping();
await remoteCommunicationInstance.ping();
expect(pingMock).toHaveBeenCalled();
});
});
Expand Down Expand Up @@ -263,15 +262,15 @@ describe('RemoteCommunication', () => {
});

describe('pause', () => {
it('should call pause on the communicationLayer and set connection status to PAUSED', () => {
it('should call pause on the communicationLayer and set connection status to PAUSED', async () => {
const pauseMock = jest.fn();
const getKeyInfoMock = jest.fn().mockReturnValue({ key: 'key' });
const loggerSpy = jest.spyOn(logger, 'RemoteCommunication');
remoteCommunicationInstance.state.communicationLayer = {
getKeyInfo: getKeyInfoMock,
pause: pauseMock,
} as unknown as SocketService;
remoteCommunicationInstance.pause();
await remoteCommunicationInstance.pause();
expect(pauseMock).toHaveBeenCalled();
expect(remoteCommunicationInstance.state._connectionStatus).toBe(
ConnectionStatus.PAUSED,
Expand All @@ -291,13 +290,6 @@ describe('RemoteCommunication', () => {
});
});

describe('resume', () => {
it('should call resume with the current instance', async () => {
await remoteCommunicationInstance.resume();
expect(resume).toHaveBeenCalledWith(remoteCommunicationInstance);
});
});

describe('getChannelId', () => {
it('should return the channelId state', () => {
remoteCommunicationInstance.state.channelId = mockChannelId;
Expand Down
14 changes: 7 additions & 7 deletions packages/sdk-communication-layer/src/RemoteCommunication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ export class RemoteCommunication extends EventEmitter2 {
});
}

sendMessage(message: CommunicationLayerMessage): Promise<void> {
sendMessage(message: CommunicationLayerMessage): Promise<boolean> {
return sendMessage(this, message);
}

Expand Down Expand Up @@ -314,12 +314,12 @@ export class RemoteCommunication extends EventEmitter2 {
return this.state.communicationLayer;
}

ping() {
async ping() {
logger.RemoteCommunication(
`[RemoteCommunication: ping()] channel=${this.state.channelId}`,
);

this.state.communicationLayer?.ping();
await this.state.communicationLayer?.ping();
}

testLogger() {
Expand Down Expand Up @@ -384,12 +384,12 @@ export class RemoteCommunication extends EventEmitter2 {
}
}

pause() {
async pause() {
logger.RemoteCommunication(
`[RemoteCommunication: pause()] channel=${this.state.channelId}`,
);

this.state.communicationLayer?.pause();
await this.state.communicationLayer?.pause();
this.setConnectionStatus(ConnectionStatus.PAUSED);
}

Expand All @@ -401,7 +401,7 @@ export class RemoteCommunication extends EventEmitter2 {
return this.state.relayPersistence ?? false;
}

resume() {
async resume() {
return resume(this);
}

Expand Down Expand Up @@ -439,7 +439,7 @@ export class RemoteCommunication extends EventEmitter2 {
});
}

disconnect(options?: DisconnectOptions) {
async disconnect(options?: DisconnectOptions): Promise<boolean> {
return disconnect({
options,
instance: this,
Expand Down
16 changes: 8 additions & 8 deletions packages/sdk-communication-layer/src/SocketService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,19 @@ describe('SocketService', () => {
expect(keyInfo).toStrictEqual(mockKeyInfo);
});

it('should send a message', () => {
it('should send a message', async () => {
const mockMessage = {
type: MessageType.PING,
data: {},
} as CommunicationLayerMessage;

socketService.sendMessage(mockMessage);
await socketService.sendMessage(mockMessage);

expect(handleSendMessage).toHaveBeenCalledWith(socketService, mockMessage);
});

it('should send a ping to verify connection', () => {
socketService.ping();
it('should send a ping to verify connection', async () => {
await socketService.ping();
expect(ping).toHaveBeenCalledWith(socketService);
});

Expand All @@ -108,13 +108,13 @@ describe('SocketService', () => {
expect(isConnected).toBe(true);
});

it('should pause the connection', () => {
socketService.pause();
it('should pause the connection', async () => {
await socketService.pause();
expect(pause).toHaveBeenCalledWith(socketService);
});

it('should resume a paused connection', () => {
socketService.resume();
it('should resume a paused connection', async () => {
await socketService.resume();
expect(resume).toHaveBeenCalledWith(socketService);
});

Expand Down
6 changes: 3 additions & 3 deletions packages/sdk-communication-layer/src/SocketService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,23 +196,23 @@ export class SocketService extends EventEmitter2 {
return this.state.keyExchange as KeyExchange;
}

sendMessage(message: CommunicationLayerMessage): void {
async sendMessage(message: CommunicationLayerMessage): Promise<boolean> {
return handleSendMessage(this, message);
}

ping() {
return ping(this);
}

pause(): void {
pause(): Promise<void> {
return pause(this);
}

isConnected() {
return this.state.socket?.connected as boolean;
}

resume(): void {
resume(): Promise<void> {
return resume(this);
}

Expand Down
Loading

0 comments on commit 7c1894a

Please sign in to comment.