Skip to content

Commit

Permalink
fix(socket): prevent connection from dying when the main thread is bl…
Browse files Browse the repository at this point in the history
…ocked (#19)

Moves the socket to a child process, so that the main thread can be blocked without our socket missing pings, therefore keeping our connection to the ATEM alive regardless of how hard the main thread is churning.
  • Loading branch information
Alex Van Camp authored Sep 11, 2018
1 parent 10e6eed commit 6814713
Show file tree
Hide file tree
Showing 8 changed files with 471 additions and 163 deletions.
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@
"node": ">=4.5"
},
"devDependencies": {
"@types/exit-hook": "^1.1.0",
"@types/jest": "^22.1.3",
"@types/node": "^8.0.4",
"@types/p-retry": "^2.0.0",
"codecov": "^2.2.0",
"cpx": "^1.5.0",
"gh-pages": "^1.0.0",
Expand Down Expand Up @@ -103,6 +105,8 @@
"video"
],
"dependencies": {
"exit-hook": "^2.0.0",
"p-retry": "^2.0.0",
"pngjs": "^3.3.2",
"tslib": "^1.9.0"
},
Expand Down
21 changes: 10 additions & 11 deletions src/atem.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { EventEmitter } from 'events'
import { AtemState } from './state'
import { AtemSocket } from './lib/atemSocket'
import { MacroAction } from './enums'
import { IPCMessageType, MacroAction } from './enums'
import AbstractCommand from './commands/AbstractCommand'
import * as Commands from './commands'
import { MediaPlayer } from './state/media'
Expand Down Expand Up @@ -56,14 +56,14 @@ export class Atem extends EventEmitter {
port: (options || {}).port
})
this.socket.on('receivedStateChange', (command: AbstractCommand) => this._mutateState(command))
this.socket.on('commandAcknowleged', (packetId: number) => this._resolveCommand(packetId))
this.socket.on(IPCMessageType.CommandAcknowledged, ({trackingId}: {trackingId: number}) => this._resolveCommand(trackingId))
this.socket.on('error', (e) => this.emit('error', e))
this.socket.on('connect', () => this.emit('connected'))
this.socket.on('disconnect', () => this.emit('disconnected'))
}

connect (address: string, port?: number) {
this.socket.connect(address, port)
return this.socket.connect(address, port)
}

disconnect (): Promise<void> {
Expand All @@ -74,13 +74,12 @@ export class Atem extends EventEmitter {

sendCommand (command: AbstractCommand): Promise<any> {
const nextPacketId = this.socket.nextPacketId
const promise = new Promise((resolve, reject) => {
this._sentQueue[nextPacketId] = command
return new Promise((resolve, reject) => {
command.resolve = resolve
command.reject = reject
this.socket._sendCommand(command, nextPacketId).catch(reject)
})
this._sentQueue[nextPacketId] = command
this.socket._sendCommand(command)
return promise
}

changeProgramInput (input: number, me = 0) {
Expand Down Expand Up @@ -340,10 +339,10 @@ export class Atem extends EventEmitter {
}
}

private _resolveCommand (packetId: number) {
if (this._sentQueue[packetId]) {
this._sentQueue[packetId].resolve(this._sentQueue[packetId])
delete this._sentQueue[packetId]
private _resolveCommand (trackingId: number) {
if (this._sentQueue[trackingId]) {
this._sentQueue[trackingId].resolve(this._sentQueue[trackingId])
delete this._sentQueue[trackingId]
}
}
}
9 changes: 9 additions & 0 deletions src/enums/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,12 @@ export enum SuperSourceArtOption {
Background,
Foreground
}

export enum IPCMessageType {
Log = 'log',
Connect = 'connect',
Disconnect = 'disconnect',
InboundCommand = 'inboundCommand',
OutboundCommand = 'outboundCommand',
CommandAcknowledged = 'commandAcknowledged'
}
Loading

0 comments on commit 6814713

Please sign in to comment.