From 7d4dd3fd80d2e97ec4b9dd7c70ad0df496c24b68 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Thu, 11 Oct 2018 18:21:54 +0100 Subject: [PATCH] feat: send pings to detect timeouts --- src/commands/connect.ts | 21 ------------- src/commands/index.ts | 2 +- src/commands/internal.ts | 52 ++++++++++++++++++++++++++++++++ src/hyperdeck.ts | 65 ++++++++++++++++++++++++++++++---------- 4 files changed, 103 insertions(+), 37 deletions(-) create mode 100644 src/commands/internal.ts diff --git a/src/commands/connect.ts b/src/commands/connect.ts index ad89eac..a45d08a 100644 --- a/src/commands/connect.ts +++ b/src/commands/connect.ts @@ -1,25 +1,4 @@ -import { AsynchronousCode } from '../codes' -import { ResponseMessage } from '../message' -import { AbstractCommandBase } from './abstractCommand' - export interface ConnectionInfoResponse { ProtocolVersion: number Model: string } - -// Purpose of this is to emit the connect event with the connectionInfo -export class DummyConnectCommand extends AbstractCommandBase { - expectedResponseCode = AsynchronousCode.ConnectionInfo - - deserialize (msg: ResponseMessage) { - const res: ConnectionInfoResponse = { - ProtocolVersion: parseFloat(msg.Params['protocol version']), - Model: msg.Params['model'] - } - return res - } - serialize () { - // Nothing to send - return null - } -} diff --git a/src/commands/index.ts b/src/commands/index.ts index 907b39b..e93c650 100644 --- a/src/commands/index.ts +++ b/src/commands/index.ts @@ -1,6 +1,6 @@ export { AbstractCommand, ErrorResponse } from './abstractCommand' -export { ConnectionInfoResponse } from './connect' +export * from './connect' export * from './deviceInfo' export * from './notify' export * from './record' diff --git a/src/commands/internal.ts b/src/commands/internal.ts new file mode 100644 index 0000000..402239f --- /dev/null +++ b/src/commands/internal.ts @@ -0,0 +1,52 @@ +import { AsynchronousCode } from '../codes' +import { ResponseMessage, NamedMessage } from '../message' +import { AbstractCommandBase, AbstractCommandBaseNoResponse } from './abstractCommand' +import { ConnectionInfoResponse } from './connect' + +// Purpose of this is to emit the connect event with the connectionInfo +export class DummyConnectCommand extends AbstractCommandBase { + expectedResponseCode = AsynchronousCode.ConnectionInfo + + deserialize (msg: ResponseMessage) { + const res: ConnectionInfoResponse = { + ProtocolVersion: parseFloat(msg.Params['protocol version']), + Model: msg.Params['model'] + } + return res + } + serialize () { + // Nothing to send + return null + } +} + +export class WatchdogPeriodCommand extends AbstractCommandBaseNoResponse { + readonly Period: number + + constructor(period: number){ + super() + this.Period = period + } + + serialize () { + const res: NamedMessage = { + Name: 'watchdog', + Params: { + period: this.Period + '' + } + } + + return res + } +} + +export class PingCommand extends AbstractCommandBaseNoResponse { + serialize () { + const res: NamedMessage = { + Name: 'ping', + Params: {} + } + + return res + } +} \ No newline at end of file diff --git a/src/hyperdeck.ts b/src/hyperdeck.ts index 4a16133..61570d9 100644 --- a/src/hyperdeck.ts +++ b/src/hyperdeck.ts @@ -4,12 +4,13 @@ import { Socket } from 'net' import { ResponseCodeType, GetResponseCodeType, AsynchronousCode } from './codes' import { AbstractCommand, TransportInfoChange, SlotInfoChange } from './commands' import { ResponseMessage } from './message' -import { DummyConnectCommand } from './commands/connect' +import { DummyConnectCommand, WatchdogPeriodCommand, PingCommand } from './commands/internal' import { parseResponse, buildMessageStr } from './parser' export interface HyperdeckOptions { // address?: string, // port?: number, + pingPeriod?: number // set to 0 to disable debug?: boolean, externalLog?: (arg0?: any, arg1?: any, arg2?: any, arg3?: any) => void } @@ -24,6 +25,9 @@ export class Hyperdeck extends EventEmitter { private _log: (...args: any[]) => void private _commandQueue: AbstractCommand[] = [] private _receivedLinesQueue: string[] = [] + private _pingPeriod: number = 5000 + private _pingInterval: NodeJS.Timer | null = null + private _lastCommandTime: number = 0 constructor (options?: HyperdeckOptions) { super() @@ -32,21 +36,21 @@ export class Hyperdeck extends EventEmitter { this._log = options.externalLog || function (...args: any[]): void { console.log(...args) } + if (options.pingPeriod !== undefined) this._pingPeriod = options.pingPeriod } - this.socket = new Socket({ - // host: (options || {}).address, - // port: (options || {}).port - }) + this.socket = new Socket() this.socket.setEncoding('utf8') - - // this.socket.on('receivedStateChange', (command: AbstractCommand) => this._mutateState(command)) this.socket.on('error', (e) => this.emit('error', e)) - // this.socket.on('connect', () => this.emit('connected')) - this.socket.on('end', () => this.emit('disconnected')) // TODO - should this be on close? - this.socket.on('data', (d) => this._handleData((d as any) as string)) // TODO - fix this casting mess + this.socket.on('end', () => { + if (this._pingInterval) { + clearInterval(this._pingInterval) + this._pingInterval = null + } - this._log('init') + this.emit('disconnected') + }) // TODO - should this be on close? + this.socket.on('data', (d) => this._handleData((d as any) as string)) // TODO - fix this casting mess } connect (address: string, port?: number) { @@ -57,7 +61,18 @@ export class Hyperdeck extends EventEmitter { if (c.ProtocolVersion !== 1.6) { throw new Error('unknown protocol version: ' + c.ProtocolVersion) } + + if (this._pingPeriod > 0) { + const cmd = new WatchdogPeriodCommand(1 + Math.round(this._pingPeriod / 1000)) // TODO - set this slightly higher? + this.sendCommand(cmd) + cmd.then(() => { + if (this.DEBUG) this._log('ping: setting up') + this._pingInterval = setInterval(() => this._performPing(), this._pingPeriod) + }) + } + this.emit('connected', c) + }, e => { // TODO - clean up connection etc this._log('connection failed', e) @@ -77,21 +92,40 @@ export class Hyperdeck extends EventEmitter { } sendCommand (command: AbstractCommand) { + // TODO - abort if not connected + this._commandQueue.push(command) - + if (this.DEBUG) this._log('queued:', this._commandQueue.length) + if (this._commandQueue.length === 1) { this._sendQueuedCommand() } } + private _performPing () { + const timeout = this._pingPeriod + 1500 + if (Date.now() - this._lastCommandTime > timeout) { + this._log('ping: timed out') + // TODO - timed out + } else if (this._commandQueue.length > 0) { + // There are commands queued, which will reset the ping timers once executed + if (this.DEBUG) this._log('ping: queue has commands') + } else { + if (this.DEBUG) this._log('ping: queueing') + this.sendCommand(new PingCommand()) + } + } + private _sendQueuedCommand () { + if (this.DEBUG) this._log('try send:', this._commandQueue.length) + if (this._commandQueue.length === 0) return const cmd = this._commandQueue[0] const sent = this._sendCommand(cmd) // Command failed to send, so try next if (!sent) { - this._commandQueue.pop() + this._commandQueue.shift() this._sendQueuedCommand() } } @@ -105,6 +139,7 @@ export class Hyperdeck extends EventEmitter { try { this.socket.write(cmdString) + this._lastCommandTime = Date.now() command.markSent() return true @@ -124,7 +159,7 @@ export class Hyperdeck extends EventEmitter { while (this._receivedLinesQueue.length > 0) { // skip any blank lines if (this._receivedLinesQueue[0] === '') { - this._receivedLinesQueue.pop() + this._receivedLinesQueue.shift() continue } @@ -167,7 +202,7 @@ export class Hyperdeck extends EventEmitter { if (this._commandQueue.length > 0 && (!codeIsAsync || this._commandQueue[0].expectedResponseCode === resMsg.Code)) { // this belongs to the command, so handle it const cmd = this._commandQueue[0] - this._commandQueue.pop() + this._commandQueue.shift() cmd.handle(resMsg) this._sendQueuedCommand()