Skip to content

Commit

Permalink
feat: use a nanotimer in socket child for ack'ing
Browse files Browse the repository at this point in the history
  • Loading branch information
Balte de Wit committed Nov 1, 2018
1 parent 09bd942 commit 84482ca
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions src/lib/atemSocketChild.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ import { EventEmitter } from 'events'
import { format } from 'util'
import { Util } from './atemUtil'
import { ConnectionState, IPCMessageType, PacketFlag } from '../enums'
import * as NanoTimer from 'nanotimer'

export class AtemSocketChild extends EventEmitter {
private _connectionState = ConnectionState.Closed
private _debug = false
private _reconnectTimer: NodeJS.Timer | undefined
private _retransmitTimer: NodeJS.Timer | undefined
// private _retransmitTimer: NodeJS.Timer | undefined
private _retransmitTimer = new NanoTimer()

private _localPacketId = 1
private _maxPacketID = (1 << 15) - 1 // Atem expects 15 not 16 bits before wrapping
Expand All @@ -19,12 +21,15 @@ export class AtemSocketChild extends EventEmitter {
private _socket: Socket
private _reconnectInterval = 5000

private _inFlightTimeout = 100
// private _inFlightTimeout = 100
private _inFlightTimeout = 30
private _maxRetries = 5
private _lastReceivedAt: number = Date.now()
private _lastReceivedPacketId = 0
private _inFlight: Array<{packetId: number, trackingId: number, lastSent: number, packet: Buffer, resent: number}> = []
// private _ackTimer: NodeJS.Timer | null
private _ackTimer = new NanoTimer()
private _hasTimeout = false
private _receivedWithoutAck = 0

constructor (options: { address?: string, port?: number } = {}) {
super()
Expand All @@ -51,7 +56,8 @@ export class AtemSocketChild extends EventEmitter {
}, this._reconnectInterval)
}
if (!this._retransmitTimer) {
this._retransmitTimer = setInterval(() => this._checkForRetransmit(), 50)
// this._retransmitTimer = setInterval(() => this._checkForRetransmit(), 50)
this._retransmitTimer.setInterval(() => this._checkForRetransmit(), '', 10)
}

if (address) {
Expand Down Expand Up @@ -149,7 +155,7 @@ export class AtemSocketChild extends EventEmitter {
if (flags & PacketFlag.AckRequest) {
if (this._connectionState === ConnectionState.Established) {
if (remotePacketId === (this._lastReceivedPacketId + 1) % this._maxPacketID) {
this._sendAck(remotePacketId)
this._attemptAck(remotePacketId)
this._lastReceivedPacketId = remotePacketId
} else {
return
Expand Down Expand Up @@ -184,21 +190,23 @@ export class AtemSocketChild extends EventEmitter {
this._socket.send(packet, 0, packet.length, this._port, this._address)
}

// private _attemptAck (packetId: number) {
// if (this._lastReceivedPacketId && packetId !== (this._lastReceivedPacketId + 1) % this._maxPacketID) return false
// this._lastReceivedPacketId = packetId
// this._receivedWithoutAck++
// if (this._receivedWithoutAck === 16) {
// this._receivedWithoutAck = 0
// this._ackTimer = null
// this._sendAck(this._lastReceivedPacketId)
// } else if (!this._ackTimer) this._ackTimer = setTimeout(() => {
// this._receivedWithoutAck = 0
// this._ackTimer = null
// this._sendAck(this._lastReceivedPacketId)
// }, 0)
// return true
// }
private _attemptAck (packetId: number) {
this._lastReceivedPacketId = packetId
this._receivedWithoutAck++
if (this._receivedWithoutAck === 16) {
this._receivedWithoutAck = 0
this._hasTimeout = false
this._ackTimer.clearTimeout()
this._sendAck(this._lastReceivedPacketId)
} else if (!this._hasTimeout) {
this._hasTimeout = true
this._ackTimer.setTimeout(() => {
this._receivedWithoutAck = 0
this._hasTimeout = false
this._sendAck(this._lastReceivedPacketId)
}, '', '5ms')
}
}

private _sendAck (packetId: number) {
const buffer = new Buffer(12)
Expand Down

0 comments on commit 84482ca

Please sign in to comment.