Skip to content

Commit

Permalink
feat: Remove promise from command, and create it when command is queu…
Browse files Browse the repository at this point in the history
…ed instead
  • Loading branch information
Julusian committed Oct 15, 2018
1 parent b1f395d commit 6577549
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 73 deletions.
12 changes: 3 additions & 9 deletions src/__tests__/hyperdeck.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,7 @@ describe('Hyperdeck', () => {
expect(onSocketCreate).toHaveBeenCalledTimes(1)

let onResolve = jest.fn()
const stopCommand = new StopCommand()
stopCommand.then(onResolve)
hp.sendCommand(stopCommand)
hp.sendCommand(new StopCommand()).then(onResolve)

await waitALittleBit()
expect(onSocketWrite).toHaveBeenCalledTimes(1)
Expand Down Expand Up @@ -285,9 +283,7 @@ describe('Hyperdeck', () => {
expect(onSocketCreate).toHaveBeenCalledTimes(1)

let onResolve = jest.fn()
const stopCommand = new StopCommand()
stopCommand.catch(onResolve)
hp.sendCommand(stopCommand)
hp.sendCommand(new StopCommand()).catch(onResolve)

await waitALittleBit()
expect(onSocketWrite).toHaveBeenCalledTimes(1)
Expand Down Expand Up @@ -378,9 +374,7 @@ describe('Hyperdeck', () => {
hp.on('notify.slot', onAsyncReceive)

let onResolve = jest.fn()
const stopCommand = new StopCommand()
stopCommand.then(onResolve)
hp.sendCommand(stopCommand)
hp.sendCommand(new StopCommand()).then(onResolve)

await waitALittleBit()
expect(onAsyncReceive).toHaveBeenCalledTimes(1)
Expand Down
42 changes: 3 additions & 39 deletions src/commands/abstractCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,14 @@ import { ResponseMessage, NamedMessage } from '../message'
export interface ErrorResponse extends ResponseMessage {
}

export interface ICommand extends Promise<any> {
expectedResponseCode: ResponseCode

handle (msg: ResponseMessage)

serialize (): NamedMessage | null
}

export abstract class AbstractCommand<T> implements Promise<T>, ICommand {
export abstract class AbstractCommand {
abstract expectedResponseCode: ResponseCode

[Symbol.toStringTag]

protected resolve: (res: T) => void
protected reject: (res: ErrorResponse) => void
private _promise: Promise<T>

constructor () {
this._promise = new Promise<T>((resolve, reject) => {
this.resolve = resolve
this.reject = reject
})
}

abstract deserialize (msg: ResponseMessage): T
abstract deserialize (msg: ResponseMessage): any
abstract serialize (): NamedMessage | null

async then<TResult1 = T, TResult2 = never> (onfulfilled?: ((value: T) => TResult1 | PromiseLike<TResult1>) | undefined | null, onrejected?: ((reason: any) => TResult2 | PromiseLike<TResult2>) | undefined | null): Promise<TResult1 | TResult2> {
return this._promise.then(onfulfilled, onrejected)
}
async catch<TResult = never> (onrejected?: ((reason: any) => TResult | PromiseLike<TResult>) | undefined | null): Promise<T | TResult> {
return this._promise.catch(onrejected)
}

handle (msg: ResponseMessage) {
if (msg.code === this.expectedResponseCode) {
this.resolve(this.deserialize(msg))
} else {
this.reject(msg)
}
}
}

export abstract class AbstractCommandNoResponse extends AbstractCommand<void> {
export abstract class AbstractCommandNoResponse extends AbstractCommand {
expectedResponseCode = SynchronousCode.OK

deserialize (msg: ResponseMessage): void {
Expand Down
2 changes: 1 addition & 1 deletion src/commands/deviceInfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export interface DeviceInfoCommandResponse {
uniqueId: string
}

export class DeviceInfoCommand extends AbstractCommand<DeviceInfoCommandResponse> {
export class DeviceInfoCommand extends AbstractCommand {
expectedResponseCode = SynchronousCode.DeviceInfo

deserialize (msg: ResponseMessage) {
Expand Down
2 changes: 1 addition & 1 deletion src/commands/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export { ICommand, ErrorResponse } from './abstractCommand'
export { AbstractCommand, ErrorResponse } from './abstractCommand'

export * from './connect'
export * from './deviceInfo'
Expand Down
2 changes: 1 addition & 1 deletion src/commands/internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { AbstractCommand, AbstractCommandNoResponse } from './abstractCommand'
import { ConnectionInfoResponse } from './connect'

// Purpose of this is to emit the connect event with the connectionInfo
export class DummyConnectCommand extends AbstractCommand<ConnectionInfoResponse> {
export class DummyConnectCommand extends AbstractCommand {
expectedResponseCode = AsynchronousCode.ConnectionInfo

deserialize (msg: ResponseMessage) {
Expand Down
2 changes: 1 addition & 1 deletion src/commands/notify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface NotifyCommandResponse {
droppedFrames: boolean
}

export class NotifyGetCommand extends AbstractCommand<NotifyCommandResponse> {
export class NotifyGetCommand extends AbstractCommand {
expectedResponseCode = SynchronousCode.Notify

deserialize (msg: ResponseMessage) {
Expand Down
2 changes: 1 addition & 1 deletion src/commands/slotInfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface SlotInfoCommandResponse {
videoFormat: VideoFormat
}

export class SlotInfoCommand extends AbstractCommand<SlotInfoCommandResponse> {
export class SlotInfoCommand extends AbstractCommand {
expectedResponseCode = SynchronousCode.SlotInfo

deserialize (msg: ResponseMessage) {
Expand Down
2 changes: 1 addition & 1 deletion src/commands/transportInfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export interface TransportInfoCommandResponse {
loop: boolean
}

export class TransportInfoCommand extends AbstractCommand<TransportInfoCommandResponse> {
export class TransportInfoCommand extends AbstractCommand {
expectedResponseCode = SynchronousCode.TransportInfo

deserialize (msg: ResponseMessage) {
Expand Down
63 changes: 44 additions & 19 deletions src/hyperdeck.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { EventEmitter } from 'events'
import { Socket } from 'net'

import { ResponseCodeType, GetResponseCodeType, AsynchronousCode } from './codes'
import { ICommand } from './commands'
import { AbstractCommand, ErrorResponse } from './commands'
import * as AsyncHandlers from './asyncHandlers'
import { ResponseMessage } from './message'
import { DummyConnectCommand, WatchdogPeriodCommand, PingCommand } from './commands/internal'
Expand All @@ -14,6 +14,22 @@ export interface HyperdeckOptions {
externalLog?: (arg0?: any, arg1?: any, arg2?: any, arg3?: any) => void
}

class QueuedCommand {
public readonly promise: Promise<any>;
public readonly command: AbstractCommand;

public resolve: (res: any) => void
public reject: (res: ErrorResponse) => void

constructor(command: AbstractCommand) {
this.command = command;
this.promise = new Promise((resolve, reject) => {
this.resolve = resolve
this.reject = reject
})
}
}

export class Hyperdeck extends EventEmitter {
DEFAULT_PORT = 9993
RECONNECT_INTERVAL = 5000
Expand All @@ -24,7 +40,7 @@ export class Hyperdeck extends EventEmitter {
private _connected: boolean = false
private _retryConnectTimeout: NodeJS.Timer
private _log: (...args: any[]) => void
private _commandQueue: ICommand[] = []
private _commandQueue: QueuedCommand[] = []
private _pingPeriod: number = 5000
private _pingInterval: NodeJS.Timer | null = null
private _lastCommandTime: number = 0
Expand Down Expand Up @@ -82,8 +98,8 @@ export class Hyperdeck extends EventEmitter {
if (this._connection_active) return
this._connection_active = true

const connCommand = new DummyConnectCommand()
connCommand.then(c => {
this._commandQueue = []
this._queueCommand(new DummyConnectCommand()).then(c => {
// TODO - we can filter supported versions here. for now we shall not as it is likely that there will not be any issues
// if (c.protocolVersion !== 1.6) {
// throw new Error('unknown protocol version: ' + c.protocolVersion)
Expand All @@ -93,9 +109,10 @@ export class Hyperdeck extends EventEmitter {
const cmd = new WatchdogPeriodCommand(1 + Math.round(this._pingPeriod / 1000))

// force the command to send
this._commandQueue = [cmd]
this._commandQueue = []
const prom = this._queueCommand(cmd)
this._sendQueuedCommand()
return cmd.then(() => {
return prom.then(() => {
this._logDebug('ping: setting up')
this._pingInterval = setInterval(() => this._performPing(), this._pingPeriod)
}).then(() => c)
Expand All @@ -113,7 +130,6 @@ export class Hyperdeck extends EventEmitter {

this._triggerRetryConnection()
})
this._commandQueue = [connCommand]

this._host = address
this._port = port || this.DEFAULT_PORT
Expand All @@ -136,19 +152,17 @@ export class Hyperdeck extends EventEmitter {
})
}

sendCommand (...commands: ICommand[]) {
if (!this._connected) return false

commands.forEach(command => {
this._commandQueue.push(command)
this._logDebug('queued:', this._commandQueue.length)
})
sendCommand (command: AbstractCommand): Promise<any> {
if (!this._connected) return Promise.reject()

const res = this._queueCommand(command)
this._logDebug('queued:', this._commandQueue.length)

if (this._commandQueue.length === 1) {
this._sendQueuedCommand()
}

return true
return res
}

get connected () {
Expand Down Expand Up @@ -198,8 +212,14 @@ export class Hyperdeck extends EventEmitter {
}
}

private _sendCommand (command: ICommand): boolean {
const msg = command.serialize()
private _queueCommand (command: AbstractCommand): Promise<any> {
const cmdWrapper = new QueuedCommand(command);
this._commandQueue.push(cmdWrapper)
return cmdWrapper.promise
}

private _sendCommand (command: QueuedCommand): boolean {
const msg = command.command.serialize()
if (msg === null) return false

const cmdString = buildMessageStr(msg)
Expand Down Expand Up @@ -239,11 +259,16 @@ export class Hyperdeck extends EventEmitter {
// leave it to fall through in case the queued command is waiting for an async response
}

if (this._commandQueue.length > 0 && (!codeIsAsync || this._commandQueue[0].expectedResponseCode === resMsg.code)) {
if (this._commandQueue.length > 0 && (!codeIsAsync || this._commandQueue[0].command.expectedResponseCode === resMsg.code)) {
const cmd = this._commandQueue[0]
this._commandQueue.shift()

cmd.handle(resMsg)
if (cmd.command.expectedResponseCode === resMsg.code) {
cmd.resolve(cmd.command.deserialize(resMsg))
} else {
cmd.reject(resMsg)
}

this._sendQueuedCommand()
}
})
Expand Down

0 comments on commit 6577549

Please sign in to comment.