diff --git a/src/base/BaseExecutor.ts b/src/base/BaseExecutor.ts index e773b665..d6edd55a 100644 --- a/src/base/BaseExecutor.ts +++ b/src/base/BaseExecutor.ts @@ -15,7 +15,6 @@ import { } from './Executor' const log = getLogger('executa:base-executor') -const notifications = getLogger('executa:notifications') const ajv = new Ajv() @@ -424,7 +423,7 @@ export class BaseExecutor implements Executor { } /** - * @inheritdoc + * @override * * Implementation of `Executor.notify` which performs a * notification. Currently by calling a method of `log`. @@ -435,7 +434,7 @@ export class BaseExecutor implements Executor { case 'info': case 'warn': case 'error': - notifications[subject](message) + log[subject](message) } } diff --git a/src/base/Client.ts b/src/base/Client.ts index b5fbd93d..b8b41660 100644 --- a/src/base/Client.ts +++ b/src/base/Client.ts @@ -4,6 +4,11 @@ import { JsonRpcError, JsonRpcErrorCode } from './JsonRpcError' import { JsonRpcRequest } from './JsonRpcRequest' import { JsonRpcResponse } from './JsonRpcResponse' import { InternalError } from './InternalError' +import { getLogger } from '@stencila/logga' + +const log = getLogger('executa:client') + +const notifications = getLogger('executa:client:notifs') /** * A client to a remote, out of process, `Executor`. @@ -132,24 +137,46 @@ export abstract class Client implements Executor { * when a response is returned. Uses the `id` of the response to match it to the corresponding * request and resolve it's promise. * - * @param response The JSON-RPC response + * @param message A JSON-RPC response (to a request) or a notification. */ - protected receive(response: string | JsonRpcResponse): void { - if (typeof response === 'string') - response = JSON.parse(response) as JsonRpcResponse - if (response.id < 0) + protected receive(message: string | JsonRpcResponse | JsonRpcRequest): void { + if (typeof message === 'string') + message = JSON.parse(message) as JsonRpcResponse | JsonRpcRequest + const { id } = message + + if (id === undefined) { + // A notification request + const { method, params = [] } = message as JsonRpcRequest + const msg = Object.values(params)[0] + switch (method) { + case 'debug': + case 'info': + case 'warn': + case 'error': + notifications[method](msg) + return + default: + notifications.info(`${method}:${msg}`) + return + } + } + + // Must be a response.... + message = message as JsonRpcResponse + if (id < 0) + // A response with accidentally missing id throw new JsonRpcError( JsonRpcErrorCode.InternalError, - `Response is missing id: ${response}` + `Response is missing id: ${message}` ) - const resolve = this.requests[response.id] + const resolve = this.requests[id] if (resolve === undefined) throw new JsonRpcError( JsonRpcErrorCode.InternalError, - `No request found for response with id: ${response.id}` + `No request found for response with id: ${id}` ) - resolve(response) - delete this.requests[response.id] + resolve(message) + delete this.requests[id] } /** diff --git a/src/base/Server.ts b/src/base/Server.ts index 8432da64..ea3cc890 100644 --- a/src/base/Server.ts +++ b/src/base/Server.ts @@ -24,6 +24,20 @@ export abstract class Server { */ public abstract get address(): Address + /** + * Send a notification to one or all clients. + * + * @param subject The notification subject + * @param message The notification message + * @param clients The clients to send the notification to + * + * @see Executor#notify + * @see Client#notify + */ + public notify(subject: string, message: string, clients?: string[]): void { + // Only servers that have persistent connections can implement this + } + /** * Receive a request or notification * diff --git a/src/direct/DirectClient.ts b/src/direct/DirectClient.ts index 5a5b2dce..cd57b3e8 100644 --- a/src/direct/DirectClient.ts +++ b/src/direct/DirectClient.ts @@ -1,21 +1,26 @@ import { Client } from '../base/Client' import { JsonRpcRequest } from '../base/JsonRpcRequest' -import { Server } from '../base/Server' import { DirectAddress } from '../base/Transports' +import { DirectServer } from './DirectServer' export class DirectClient extends Client { - private server: Server + private server: DirectServer public constructor(address: Omit) { super() this.server = address.server + this.server.client = this } protected send(request: JsonRpcRequest): void { - // @ts-ignore server.receive is private - // eslint-disable-next-line @typescript-eslint/no-floating-promises - this.server.receive(request).then(response => { - if (response !== undefined) this.receive(response) - }) + this.server + // @ts-ignore server.receive is private + .receive(request) + .then(response => { + if (response !== undefined) this.receive(response) + }) + .catch(error => { + throw error + }) } } diff --git a/src/direct/DirectClientServer.test.ts b/src/direct/DirectClientServer.test.ts index 8457fe6a..52dd9bc8 100644 --- a/src/direct/DirectClientServer.test.ts +++ b/src/direct/DirectClientServer.test.ts @@ -31,4 +31,10 @@ test('DirectClient and DirectServer', async () => { // There should be no more requests waiting for a response // @ts-ignore that client.requests is private expect(Object.keys(client.requests).length).toEqual(0) + + // Notify the server + client.notify('info', 'A message from client to server') + + // Notify the client + server.notify('info', 'A message from server to client') }) diff --git a/src/direct/DirectServer.ts b/src/direct/DirectServer.ts index 0affc138..3f38af27 100644 --- a/src/direct/DirectServer.ts +++ b/src/direct/DirectServer.ts @@ -1,11 +1,24 @@ import { Server } from '../base/Server' import { DirectAddress, Transport } from '../base/Transports' +import { Client } from '../base/Client' +import { InternalError } from '../base/InternalError' +import { JsonRpcRequest } from '../base/JsonRpcRequest' export class DirectServer extends Server { + client?: Client + public get address(): DirectAddress { return { type: Transport.direct, server: this } } + + public notify(subject: string, message: string): void { + if (this.client === undefined) + throw new InternalError('No client connected!') + const notification = new JsonRpcRequest(subject, { message }, false) + // @ts-ignore client.receive is private + this.client.receive(notification) + } } diff --git a/src/stream/StreamServer.ts b/src/stream/StreamServer.ts index 0e92bedb..74c3b854 100644 --- a/src/stream/StreamServer.ts +++ b/src/stream/StreamServer.ts @@ -34,4 +34,9 @@ export abstract class StreamServer extends Server { if (typeof data !== 'string') data = JSON.stringify(data) this.encoder.write(data) } + + public notify(subject: string, message: string): void { + const notification = new JsonRpcRequest(subject, { message }, false) + this.send(notification) + } } diff --git a/src/tcp/TcpClientServer.test.ts b/src/tcp/TcpClientServer.test.ts index 1cc8fb2b..2183ea3e 100644 --- a/src/tcp/TcpClientServer.test.ts +++ b/src/tcp/TcpClientServer.test.ts @@ -1,6 +1,7 @@ import { TcpClient } from './TcpClient' import { TcpServer } from './TcpServer' import { testClient } from '../test/testClient' +import { delay } from '../test/delay' test('TcpClient and TcpServer', async () => { const server = new TcpServer() @@ -8,8 +9,13 @@ test('TcpClient and TcpServer', async () => { const client1 = new TcpClient(server.address) const client2 = new TcpClient(server.address) + await testClient(client1) await testClient(client2) + + server.notify('info', 'A notification from TCP server to clients') + await delay(100) + await client1.stop() await client2.stop() diff --git a/src/tcp/TcpServer.ts b/src/tcp/TcpServer.ts index aac01e21..5cd0d9cb 100644 --- a/src/tcp/TcpServer.ts +++ b/src/tcp/TcpServer.ts @@ -5,8 +5,7 @@ import { Executor, User } from '../base/Executor' import { TcpAddress, TcpAddressInitializer } from '../base/Transports' import { StreamServer } from '../stream/StreamServer' import { Server } from '../base/Server' -import { JsonRpcResponse } from '../base/JsonRpcResponse' -import { JsonRpcRequest } from '../base/JsonRpcRequest' +import { Connection } from '../base/Connection' const log = getLogger('executa:tcp:server') @@ -42,21 +41,6 @@ export class TcpConnection extends StreamServer implements Connection { return new TcpAddress() } - public start(executor?: Executor): Promise { - return super.start(executor, this.socket, this.socket) - } - - /** - * Send a notification to the client. - * - * This method has the same signature as `Executor.notify` - * and a similar implementation to `Client.notify`. - */ - public notify(subject: string, message: string): void { - const notification = new JsonRpcRequest(subject, { message }, false) - this.send(notification) - } - public stop(): Promise { this.socket.destroy() return Promise.resolve() @@ -108,7 +92,9 @@ export class TcpServer extends Server { socket.on('close', () => this.onDisconnected(connection)) // Handle messages from connection - connection.start(executor).catch(error => log.error(error)) + connection + .start(executor, socket, socket) + .catch(error => log.error(error)) }) const { host, port } = this.address @@ -116,6 +102,14 @@ export class TcpServer extends Server { } } + public notify(subject: string, message: string, clients?: string[]): void { + if (clients === undefined) clients = Object.keys(this.connections) + for (const client of clients) { + const connection = this.connections[client] + if (connection !== undefined) connection.notify(subject, message) + } + } + public async stop(): Promise { if (this.server !== undefined) { const url = this.address.url() diff --git a/src/test/delay.ts b/src/test/delay.ts new file mode 100644 index 00000000..63899473 --- /dev/null +++ b/src/test/delay.ts @@ -0,0 +1,2 @@ +export const delay = async (milliseconds: number) => + new Promise(resolve => setTimeout(resolve, milliseconds)) diff --git a/src/ws/WebSocketClientServer.test.ts b/src/ws/WebSocketClientServer.test.ts index 02f682bb..37bbc606 100644 --- a/src/ws/WebSocketClientServer.test.ts +++ b/src/ws/WebSocketClientServer.test.ts @@ -6,6 +6,7 @@ import { EchoExecutor } from '../test/EchoExecutor' import { testClient } from '../test/testClient' import { WebSocketClient } from './WebSocketClient' import { WebSocketServer } from './WebSocketServer' +import { delay } from '../test/delay' const JWT_SECRET = 'not-a-secret-at-all' @@ -21,6 +22,13 @@ test('WebSocketClient and WebSocketServer', async () => { } }) + let clientNotifs: LogData[] = [] + addHandler((logData: LogData) => { + if (logData.tag === 'executa:client:notifs') { + clientNotifs = [...clientNotifs, logData] + } + }) + const server = new WebSocketServer() const executor = new EchoExecutor() await server.start(executor) @@ -59,7 +67,7 @@ test('WebSocketClient and WebSocketServer', async () => { { // Client with malformed JWT const _ = new WebSocketClient({ ...server.address, jwt: 'jwhaaaat?' }) - await new Promise(resolve => setTimeout(resolve, 100)) + await delay(100) expect(clientLog.message).toMatch(/Unexpected server response: 401/) } @@ -69,9 +77,30 @@ test('WebSocketClient and WebSocketServer', async () => { ...server.address, jwt: JWT.sign({}, 'not-the-right-secret') }) - await new Promise(resolve => setTimeout(resolve, 100)) + await delay(100) expect(clientLog.message).toMatch(/Unexpected server response: 401/) } + { + const client1 = new WebSocketClient(server.address) + const client2 = new WebSocketClient(server.address) + const client3 = new WebSocketClient(server.address) + await delay(10) + + // Server notification to several clients + server.notify('debug', 'To all clients') + await delay(1) + expect(clientNotifs.length).toBe(4) // included global client + clientNotifs = [] + + // Server notification to some clients + // @ts-ignore that connections is protected + const clients = Object.keys(server.connections).slice(2) + server.notify('debug', 'To all clients', clients) + await delay(1) + expect(clientNotifs.length).toBe(2) + clientNotifs = [] + } + await server.stop() })