Skip to content

Commit

Permalink
feat: send pings to detect timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
Julusian committed Oct 11, 2018
1 parent af7c0ce commit 7d4dd3f
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 37 deletions.
21 changes: 0 additions & 21 deletions src/commands/connect.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,4 @@
import { AsynchronousCode } from '../codes'
import { ResponseMessage } from '../message'
import { AbstractCommandBase } from './abstractCommand'

export interface ConnectionInfoResponse {
ProtocolVersion: number
Model: string
}

// Purpose of this is to emit the connect event with the connectionInfo
export class DummyConnectCommand extends AbstractCommandBase<ConnectionInfoResponse> {
expectedResponseCode = AsynchronousCode.ConnectionInfo

deserialize (msg: ResponseMessage) {
const res: ConnectionInfoResponse = {
ProtocolVersion: parseFloat(msg.Params['protocol version']),
Model: msg.Params['model']
}
return res
}
serialize () {
// Nothing to send
return null
}
}
2 changes: 1 addition & 1 deletion src/commands/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export { AbstractCommand, ErrorResponse } from './abstractCommand'

export { ConnectionInfoResponse } from './connect'
export * from './connect'
export * from './deviceInfo'
export * from './notify'
export * from './record'
Expand Down
52 changes: 52 additions & 0 deletions src/commands/internal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { AsynchronousCode } from '../codes'
import { ResponseMessage, NamedMessage } from '../message'
import { AbstractCommandBase, AbstractCommandBaseNoResponse } from './abstractCommand'
import { ConnectionInfoResponse } from './connect'

// Purpose of this is to emit the connect event with the connectionInfo
export class DummyConnectCommand extends AbstractCommandBase<ConnectionInfoResponse> {
expectedResponseCode = AsynchronousCode.ConnectionInfo

deserialize (msg: ResponseMessage) {
const res: ConnectionInfoResponse = {
ProtocolVersion: parseFloat(msg.Params['protocol version']),
Model: msg.Params['model']
}
return res
}
serialize () {
// Nothing to send
return null
}
}

export class WatchdogPeriodCommand extends AbstractCommandBaseNoResponse {
readonly Period: number

constructor(period: number){
super()
this.Period = period
}

serialize () {
const res: NamedMessage = {
Name: 'watchdog',
Params: {
period: this.Period + ''
}
}

return res
}
}

export class PingCommand extends AbstractCommandBaseNoResponse {
serialize () {
const res: NamedMessage = {
Name: 'ping',
Params: {}
}

return res
}
}
65 changes: 50 additions & 15 deletions src/hyperdeck.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import { Socket } from 'net'
import { ResponseCodeType, GetResponseCodeType, AsynchronousCode } from './codes'
import { AbstractCommand, TransportInfoChange, SlotInfoChange } from './commands'
import { ResponseMessage } from './message'
import { DummyConnectCommand } from './commands/connect'
import { DummyConnectCommand, WatchdogPeriodCommand, PingCommand } from './commands/internal'
import { parseResponse, buildMessageStr } from './parser'

export interface HyperdeckOptions {
// address?: string,
// port?: number,
pingPeriod?: number // set to 0 to disable
debug?: boolean,
externalLog?: (arg0?: any, arg1?: any, arg2?: any, arg3?: any) => void
}
Expand All @@ -24,6 +25,9 @@ export class Hyperdeck extends EventEmitter {
private _log: (...args: any[]) => void
private _commandQueue: AbstractCommand[] = []
private _receivedLinesQueue: string[] = []
private _pingPeriod: number = 5000
private _pingInterval: NodeJS.Timer | null = null
private _lastCommandTime: number = 0

constructor (options?: HyperdeckOptions) {
super()
Expand All @@ -32,21 +36,21 @@ export class Hyperdeck extends EventEmitter {
this._log = options.externalLog || function (...args: any[]): void {
console.log(...args)
}
if (options.pingPeriod !== undefined) this._pingPeriod = options.pingPeriod
}

this.socket = new Socket({
// host: (options || {}).address,
// port: (options || {}).port
})
this.socket = new Socket()
this.socket.setEncoding('utf8')

// this.socket.on('receivedStateChange', (command: AbstractCommand) => this._mutateState(command))
this.socket.on('error', (e) => this.emit('error', e))
// this.socket.on('connect', () => this.emit('connected'))
this.socket.on('end', () => this.emit('disconnected')) // TODO - should this be on close?
this.socket.on('data', (d) => this._handleData((d as any) as string)) // TODO - fix this casting mess
this.socket.on('end', () => {
if (this._pingInterval) {
clearInterval(this._pingInterval)
this._pingInterval = null
}

this._log('init')
this.emit('disconnected')
}) // TODO - should this be on close?
this.socket.on('data', (d) => this._handleData((d as any) as string)) // TODO - fix this casting mess
}

connect (address: string, port?: number) {
Expand All @@ -57,7 +61,18 @@ export class Hyperdeck extends EventEmitter {
if (c.ProtocolVersion !== 1.6) {
throw new Error('unknown protocol version: ' + c.ProtocolVersion)
}

if (this._pingPeriod > 0) {
const cmd = new WatchdogPeriodCommand(1 + Math.round(this._pingPeriod / 1000)) // TODO - set this slightly higher?
this.sendCommand(cmd)
cmd.then(() => {
if (this.DEBUG) this._log('ping: setting up')
this._pingInterval = setInterval(() => this._performPing(), this._pingPeriod)
})
}

this.emit('connected', c)

}, e => {
// TODO - clean up connection etc
this._log('connection failed', e)
Expand All @@ -77,21 +92,40 @@ export class Hyperdeck extends EventEmitter {
}

sendCommand (command: AbstractCommand) {
// TODO - abort if not connected

this._commandQueue.push(command)

if (this.DEBUG) this._log('queued:', this._commandQueue.length)

if (this._commandQueue.length === 1) {
this._sendQueuedCommand()
}
}

private _performPing () {
const timeout = this._pingPeriod + 1500
if (Date.now() - this._lastCommandTime > timeout) {
this._log('ping: timed out')
// TODO - timed out
} else if (this._commandQueue.length > 0) {
// There are commands queued, which will reset the ping timers once executed
if (this.DEBUG) this._log('ping: queue has commands')
} else {
if (this.DEBUG) this._log('ping: queueing')
this.sendCommand(new PingCommand())
}
}

private _sendQueuedCommand () {
if (this.DEBUG) this._log('try send:', this._commandQueue.length)

if (this._commandQueue.length === 0) return
const cmd = this._commandQueue[0]
const sent = this._sendCommand(cmd)

// Command failed to send, so try next
if (!sent) {
this._commandQueue.pop()
this._commandQueue.shift()
this._sendQueuedCommand()
}
}
Expand All @@ -105,6 +139,7 @@ export class Hyperdeck extends EventEmitter {

try {
this.socket.write(cmdString)
this._lastCommandTime = Date.now()

command.markSent()
return true
Expand All @@ -124,7 +159,7 @@ export class Hyperdeck extends EventEmitter {
while (this._receivedLinesQueue.length > 0) {
// skip any blank lines
if (this._receivedLinesQueue[0] === '') {
this._receivedLinesQueue.pop()
this._receivedLinesQueue.shift()
continue
}

Expand Down Expand Up @@ -167,7 +202,7 @@ export class Hyperdeck extends EventEmitter {
if (this._commandQueue.length > 0 && (!codeIsAsync || this._commandQueue[0].expectedResponseCode === resMsg.Code)) {
// this belongs to the command, so handle it
const cmd = this._commandQueue[0]
this._commandQueue.pop()
this._commandQueue.shift()

cmd.handle(resMsg)
this._sendQueuedCommand()
Expand Down

0 comments on commit 7d4dd3f

Please sign in to comment.