Skip to content

Commit

Permalink
feat(Server notifs): Add server sent notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
nokome committed Nov 1, 2019
1 parent a808d11 commit 6bbdc46
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 40 deletions.
5 changes: 2 additions & 3 deletions src/base/BaseExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
} from './Executor'

const log = getLogger('executa:base-executor')
const notifications = getLogger('executa:notifications')

const ajv = new Ajv()

Expand Down Expand Up @@ -424,7 +423,7 @@ export class BaseExecutor implements Executor {
}

/**
* @inheritdoc
* @override
*
* Implementation of `Executor.notify` which performs a
* notification. Currently by calling a method of `log`.
Expand All @@ -435,7 +434,7 @@ export class BaseExecutor implements Executor {
case 'info':
case 'warn':
case 'error':
notifications[subject](message)
log[subject](message)
}
}

Expand Down
47 changes: 37 additions & 10 deletions src/base/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import { JsonRpcError, JsonRpcErrorCode } from './JsonRpcError'
import { JsonRpcRequest } from './JsonRpcRequest'
import { JsonRpcResponse } from './JsonRpcResponse'
import { InternalError } from './InternalError'
import { getLogger } from '@stencila/logga'

const log = getLogger('executa:client')

const notifications = getLogger('executa:client:notifs')

/**
* A client to a remote, out of process, `Executor`.
Expand Down Expand Up @@ -132,24 +137,46 @@ export abstract class Client implements Executor {
* when a response is returned. Uses the `id` of the response to match it to the corresponding
* request and resolve it's promise.
*
* @param response The JSON-RPC response
* @param message A JSON-RPC response (to a request) or a notification.
*/
protected receive(response: string | JsonRpcResponse): void {
if (typeof response === 'string')
response = JSON.parse(response) as JsonRpcResponse
if (response.id < 0)
protected receive(message: string | JsonRpcResponse | JsonRpcRequest): void {
if (typeof message === 'string')
message = JSON.parse(message) as JsonRpcResponse | JsonRpcRequest
const { id } = message

if (id === undefined) {
// A notification request
const { method, params = [] } = message as JsonRpcRequest
const msg = Object.values(params)[0]
switch (method) {
case 'debug':
case 'info':
case 'warn':
case 'error':
notifications[method](msg)
return
default:
notifications.info(`${method}:${msg}`)
return
}
}

// Must be a response....
message = message as JsonRpcResponse
if (id < 0)
// A response with accidentally missing id
throw new JsonRpcError(
JsonRpcErrorCode.InternalError,
`Response is missing id: ${response}`
`Response is missing id: ${message}`
)
const resolve = this.requests[response.id]
const resolve = this.requests[id]
if (resolve === undefined)
throw new JsonRpcError(
JsonRpcErrorCode.InternalError,
`No request found for response with id: ${response.id}`
`No request found for response with id: ${id}`
)
resolve(response)
delete this.requests[response.id]
resolve(message)
delete this.requests[id]
}

/**
Expand Down
14 changes: 14 additions & 0 deletions src/base/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ export abstract class Server {
*/
public abstract get address(): Address

/**
* Send a notification to one or all clients.
*
* @param subject The notification subject
* @param message The notification message
* @param clients The clients to send the notification to
*
* @see Executor#notify
* @see Client#notify
*/
public notify(subject: string, message: string, clients?: string[]): void {
// Only servers that have persistent connections can implement this
}

/**
* Receive a request or notification
*
Expand Down
19 changes: 12 additions & 7 deletions src/direct/DirectClient.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
import { Client } from '../base/Client'
import { JsonRpcRequest } from '../base/JsonRpcRequest'
import { Server } from '../base/Server'
import { DirectAddress } from '../base/Transports'
import { DirectServer } from './DirectServer'

export class DirectClient extends Client {
private server: Server
private server: DirectServer

public constructor(address: Omit<DirectAddress, 'type'>) {
super()
this.server = address.server
this.server.client = this
}

protected send(request: JsonRpcRequest): void {
// @ts-ignore server.receive is private
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.server.receive(request).then(response => {
if (response !== undefined) this.receive(response)
})
this.server
// @ts-ignore server.receive is private
.receive(request)
.then(response => {
if (response !== undefined) this.receive(response)
})
.catch(error => {
throw error
})
}
}
6 changes: 6 additions & 0 deletions src/direct/DirectClientServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,10 @@ test('DirectClient and DirectServer', async () => {
// There should be no more requests waiting for a response
// @ts-ignore that client.requests is private
expect(Object.keys(client.requests).length).toEqual(0)

// Notify the server
client.notify('info', 'A message from client to server')

// Notify the client
server.notify('info', 'A message from server to client')
})
13 changes: 13 additions & 0 deletions src/direct/DirectServer.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
import { Server } from '../base/Server'
import { DirectAddress, Transport } from '../base/Transports'
import { Client } from '../base/Client'
import { InternalError } from '../base/InternalError'
import { JsonRpcRequest } from '../base/JsonRpcRequest'

export class DirectServer extends Server {
client?: Client

public get address(): DirectAddress {
return {
type: Transport.direct,
server: this
}
}

public notify(subject: string, message: string): void {
if (this.client === undefined)
throw new InternalError('No client connected!')
const notification = new JsonRpcRequest(subject, { message }, false)
// @ts-ignore client.receive is private
this.client.receive(notification)
}
}
5 changes: 5 additions & 0 deletions src/stream/StreamServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,9 @@ export abstract class StreamServer extends Server {
if (typeof data !== 'string') data = JSON.stringify(data)
this.encoder.write(data)
}

public notify(subject: string, message: string): void {
const notification = new JsonRpcRequest(subject, { message }, false)
this.send(notification)
}
}
6 changes: 6 additions & 0 deletions src/tcp/TcpClientServer.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
import { TcpClient } from './TcpClient'
import { TcpServer } from './TcpServer'
import { testClient } from '../test/testClient'
import { delay } from '../test/delay'

test('TcpClient and TcpServer', async () => {
const server = new TcpServer()
await server.start()

const client1 = new TcpClient(server.address)
const client2 = new TcpClient(server.address)

await testClient(client1)
await testClient(client2)

server.notify('info', 'A notification from TCP server to clients')
await delay(100)

await client1.stop()
await client2.stop()

Expand Down
30 changes: 12 additions & 18 deletions src/tcp/TcpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import { Executor, User } from '../base/Executor'
import { TcpAddress, TcpAddressInitializer } from '../base/Transports'
import { StreamServer } from '../stream/StreamServer'
import { Server } from '../base/Server'
import { JsonRpcResponse } from '../base/JsonRpcResponse'
import { JsonRpcRequest } from '../base/JsonRpcRequest'
import { Connection } from '../base/Connection'

const log = getLogger('executa:tcp:server')

Expand Down Expand Up @@ -42,21 +41,6 @@ export class TcpConnection extends StreamServer implements Connection {
return new TcpAddress()
}

public start(executor?: Executor): Promise<void> {
return super.start(executor, this.socket, this.socket)
}

/**
* Send a notification to the client.
*
* This method has the same signature as `Executor.notify`
* and a similar implementation to `Client.notify`.
*/
public notify(subject: string, message: string): void {
const notification = new JsonRpcRequest(subject, { message }, false)
this.send(notification)
}

public stop(): Promise<void> {
this.socket.destroy()
return Promise.resolve()
Expand Down Expand Up @@ -108,14 +92,24 @@ export class TcpServer extends Server {
socket.on('close', () => this.onDisconnected(connection))

// Handle messages from connection
connection.start(executor).catch(error => log.error(error))
connection
.start(executor, socket, socket)
.catch(error => log.error(error))
})

const { host, port } = this.address
return new Promise(resolve => server.listen(port, host, () => resolve()))
}
}

public notify(subject: string, message: string, clients?: string[]): void {
if (clients === undefined) clients = Object.keys(this.connections)
for (const client of clients) {
const connection = this.connections[client]
if (connection !== undefined) connection.notify(subject, message)
}
}

public async stop(): Promise<void> {
if (this.server !== undefined) {
const url = this.address.url()
Expand Down
2 changes: 2 additions & 0 deletions src/test/delay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const delay = async (milliseconds: number) =>
new Promise(resolve => setTimeout(resolve, milliseconds))
33 changes: 31 additions & 2 deletions src/ws/WebSocketClientServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { EchoExecutor } from '../test/EchoExecutor'
import { testClient } from '../test/testClient'
import { WebSocketClient } from './WebSocketClient'
import { WebSocketServer } from './WebSocketServer'
import { delay } from '../test/delay'

const JWT_SECRET = 'not-a-secret-at-all'

Expand All @@ -21,6 +22,13 @@ test('WebSocketClient and WebSocketServer', async () => {
}
})

let clientNotifs: LogData[] = []
addHandler((logData: LogData) => {
if (logData.tag === 'executa:client:notifs') {
clientNotifs = [...clientNotifs, logData]
}
})

const server = new WebSocketServer()
const executor = new EchoExecutor()
await server.start(executor)
Expand Down Expand Up @@ -59,7 +67,7 @@ test('WebSocketClient and WebSocketServer', async () => {
{
// Client with malformed JWT
const _ = new WebSocketClient({ ...server.address, jwt: 'jwhaaaat?' })
await new Promise(resolve => setTimeout(resolve, 100))
await delay(100)
expect(clientLog.message).toMatch(/Unexpected server response: 401/)
}

Expand All @@ -69,9 +77,30 @@ test('WebSocketClient and WebSocketServer', async () => {
...server.address,
jwt: JWT.sign({}, 'not-the-right-secret')
})
await new Promise(resolve => setTimeout(resolve, 100))
await delay(100)
expect(clientLog.message).toMatch(/Unexpected server response: 401/)
}

{
const client1 = new WebSocketClient(server.address)
const client2 = new WebSocketClient(server.address)
const client3 = new WebSocketClient(server.address)
await delay(10)

// Server notification to several clients
server.notify('debug', 'To all clients')
await delay(1)
expect(clientNotifs.length).toBe(4) // included global client
clientNotifs = []

// Server notification to some clients
// @ts-ignore that connections is protected
const clients = Object.keys(server.connections).slice(2)
server.notify('debug', 'To all clients', clients)
await delay(1)
expect(clientNotifs.length).toBe(2)
clientNotifs = []
}

await server.stop()
})

0 comments on commit 6bbdc46

Please sign in to comment.