Skip to content

Commit

Permalink
feat: connection retry logic. test for ping timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Julusian committed Oct 15, 2018
1 parent 01b4634 commit b1f395d
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 12 deletions.
90 changes: 83 additions & 7 deletions src/__tests__/hyperdeck.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,22 @@ function waitALittleBit () {
}

describe('Hyperdeck', () => {
let now: number = 10000
beforeAll(() => {
Date.now = jest.fn(() => {
return getCurrentTime()
})
})
function getCurrentTime () {
return now
}
function advanceTime (advanceTime: number) {
now += advanceTime
jest.advanceTimersByTime(advanceTime)
}
beforeEach(() => {
now = 10000
})

test('Check simple connection', async () => {
let onSocketCreate = jest.fn()
Expand Down Expand Up @@ -50,21 +66,26 @@ describe('Hyperdeck', () => {
expect(onSocketWrite).toHaveBeenCalledTimes(0)
})

test('Check protocol version', async () => {
test('Check connection retry', async () => {
jest.useFakeTimers()

let onSocketCreate = jest.fn()
let onConnection = jest.fn()
let onDisconnection = jest.fn()
let onSocketWrite = jest.fn()

let thisSocket
// @ts-ignore MockSocket
MockSocket.mockOnNextSocket((socket: any) => {
onSocketCreate(onSocketCreate)

socket.onConnect = () => {
socket.mockData('500 connection info:\r\nprotocol version: 9.9\r\nmodel: test\r\n\r\n')
socket.mockData('500 connection info:\r\nprotocol version: 1.6\r\nmodel: test\r\n\r\n')
onConnection()
}

socket.onClose = onDisconnection
socket.onWrite = onSocketWrite
thisSocket = socket
})

const hp = new Hyperdeck({ pingPeriod: 0 })
Expand All @@ -77,16 +98,63 @@ describe('Hyperdeck', () => {
hp.connect('127.0.0.1')
await waitALittleBit()

expect(hp.connected).toBeFalsy()
expect(onClientConnection).toHaveBeenCalledTimes(0)
expect(onClientError).toHaveBeenCalledTimes(1)
expect(hp.connected).toBeTruthy()
expect(onClientConnection).toHaveBeenCalledTimes(1)
expect(onClientError).toHaveBeenCalledTimes(0)
expect(onConnection).toHaveBeenCalledTimes(1)
expect(onDisconnection).toHaveBeenCalledTimes(0)

thisSocket.end()
await waitALittleBit()

expect(hp.connected).toBeFalsy()
expect(onDisconnection).toHaveBeenCalledTimes(1)

advanceTime(6000)

expect(onSocketCreate).toHaveBeenCalledTimes(1)
expect(onSocketWrite).toHaveBeenCalledTimes(0)
})

// test('Check protocol version', async () => {
// let onSocketCreate = jest.fn()
// let onConnection = jest.fn()
// let onSocketWrite = jest.fn()

// // @ts-ignore MockSocket
// MockSocket.mockOnNextSocket((socket: any) => {
// onSocketCreate(onSocketCreate)

// socket.onConnect = () => {
// socket.mockData('500 connection info:\r\nprotocol version: 9.9\r\nmodel: test\r\n\r\n')
// onConnection()
// }

// socket.onWrite = onSocketWrite
// })

// const hp = new Hyperdeck({ pingPeriod: 0 })

// let onClientConnection = jest.fn()
// let onClientError = jest.fn()
// hp.on('connected', onClientConnection)
// hp.on('error', onClientError)

// hp.connect('127.0.0.1')
// await waitALittleBit()

// expect(hp.connected).toBeFalsy()
// expect(onClientConnection).toHaveBeenCalledTimes(0)
// expect(onClientError).toHaveBeenCalledTimes(1)
// expect(onConnection).toHaveBeenCalledTimes(1)

// expect(onSocketCreate).toHaveBeenCalledTimes(1)
// expect(onSocketWrite).toHaveBeenCalledTimes(0)
// })

test('Check ping setup', async () => {
jest.useFakeTimers()

let onSocketCreate = jest.fn()
let onConnection = jest.fn()
let onSocketWrite = jest.fn()
Expand All @@ -103,6 +171,7 @@ describe('Hyperdeck', () => {
socket.onWrite = onSocketWrite

socket.mockExpectedWrite('watchdog:\r\nperiod: 13\r\n\r\n', '200 ok\r\n')
socket.mockExpectedWrite('ping\r\n', '200 ok\r\n')
})

const hp = new Hyperdeck({ pingPeriod: 12000 })
Expand All @@ -122,8 +191,15 @@ describe('Hyperdeck', () => {

expect(onSocketCreate).toHaveBeenCalledTimes(1)
expect(onSocketWrite).toHaveBeenCalledTimes(1)

// Advance so that a ping should send
advanceTime(12200)
expect(onSocketWrite).toHaveBeenCalledTimes(2)

// Advance so socket times out
advanceTime(15000)

// TODO - test ping causes a timeout, and sends etc
expect(hp.connected).toBeFalsy()

hp.disconnect()
})
Expand Down
41 changes: 36 additions & 5 deletions src/hyperdeck.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@ export class Hyperdeck extends EventEmitter {
event: EventEmitter
private socket: Socket
private _connected: boolean = false
private _retryConnectTimeout: NodeJS.Timer
private _log: (...args: any[]) => void
private _commandQueue: ICommand[] = []
private _pingPeriod: number = 5000
private _pingInterval: NodeJS.Timer | null = null
private _lastCommandTime: number = 0
private _asyncHandlers: {[key: number]: AsyncHandlers.IHandler} = {}
private _parser: MultilineParser

private _connection_active: boolean = false // True when connected/connecting/reconnecting
private _host: string
private _port: number

constructor (options?: HyperdeckOptions) {
super()
Expand Down Expand Up @@ -57,7 +62,8 @@ export class Hyperdeck extends EventEmitter {
}

this.emit('disconnected')
// TODO - retry connection

this._triggerRetryConnection()
})
this.socket.on('data', (d) => this._handleData(d.toString()))

Expand All @@ -73,12 +79,15 @@ export class Hyperdeck extends EventEmitter {

connect (address: string, port?: number) {
if (this._connected) return
if (this._connection_active) return
this._connection_active = true

const connCommand = new DummyConnectCommand()
connCommand.then(c => {
if (c.protocolVersion !== 1.6) {
throw new Error('unknown protocol version: ' + c.protocolVersion)
}
// TODO - we can filter supported versions here. for now we shall not as it is likely that there will not be any issues
// if (c.protocolVersion !== 1.6) {
// throw new Error('unknown protocol version: ' + c.protocolVersion)
// }

if (this._pingPeriod > 0) {
const cmd = new WatchdogPeriodCommand(1 + Math.round(this._pingPeriod / 1000))
Expand All @@ -101,13 +110,20 @@ export class Hyperdeck extends EventEmitter {
this.socket.end()
this.emit('error', 'connection failed', e)
this._log('connection failed', e)

this._triggerRetryConnection()
})
this._commandQueue = [connCommand]

this.socket.connect(port || this.DEFAULT_PORT, address)
this._host = address
this._port = port || this.DEFAULT_PORT
this.socket.connect(this._port, this._host)
}

disconnect (): Promise<void> {
this._connection_active = false
clearTimeout(this._retryConnectTimeout)

if (!this._connected) return Promise.resolve()

return new Promise((resolve, reject) => {
Expand Down Expand Up @@ -139,6 +155,21 @@ export class Hyperdeck extends EventEmitter {
return this._connected
}

private _triggerRetryConnection () {
if (!this._retryConnectTimeout) {
this._retryConnectTimeout = setTimeout(() => {
this._retryConnection()
}, this.RECONNECT_INTERVAL)
}
}
private _retryConnection () {
clearTimeout(this._retryConnectTimeout)

if (!this.connected && this._connection_active) {
this.socket.connect(this._port, this._host)
}
}

private async _performPing () {
const timeout = this._pingPeriod + 1500
if (Date.now() - this._lastCommandTime > timeout) {
Expand Down

0 comments on commit b1f395d

Please sign in to comment.