Skip to content

Commit

Permalink
feat: retransmit on demand and handle impossible retransmits by reset…
Browse files Browse the repository at this point in the history
…ting the connection
  • Loading branch information
Julusian committed Dec 7, 2019
1 parent 22ff629 commit 537d907
Showing 1 changed file with 40 additions and 36 deletions.
76 changes: 40 additions & 36 deletions src/lib/atemSocketChild.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import { ConnectionState, IPCMessageType, PacketFlag } from '../enums'
import * as NanoTimer from 'nanotimer'
import { DEFAULT_PORT } from '../atem'

const IN_FLIGHT_TIMEOUT = 30 // ms
const IN_FLIGHT_TIMEOUT = 60 // ms
const CONNECTION_TIMEOUT = 5000 // ms
const CONNECTION_RETRY_INTERVAL = 1000 // ms
const MAX_PACKET_RETRIES = 5
const MAX_PACKET_RETRIES = 10
const MAX_PACKET_ID = (1 << 15) // Atem expects 15 not 16 bits before wrapping
const MAX_PACKET_PER_ACK = 16

Expand Down Expand Up @@ -44,14 +44,12 @@ export class AtemSocketChild extends EventEmitter {
public on!: ((event: IPCMessageType.Disconnect, listener: () => void) => this) &
((event: IPCMessageType.Log, listener: (payload: string) => void) => this) &
((event: IPCMessageType.InboundCommand, listener: (payload: Buffer, packetId: number) => void) => this) &
((event: IPCMessageType.CommandAcknowledged, listener: (packetId: number, trackingId: number) => void) => this) &
((event: IPCMessageType.CommandReject, listener: (packetId: number, trackingId: number) => void) => this)
((event: IPCMessageType.CommandAcknowledged, listener: (packetId: number, trackingId: number) => void) => this)

public emit!: ((event: IPCMessageType.Disconnect) => boolean) &
((event: IPCMessageType.Log, payload: string) => boolean) &
((event: IPCMessageType.InboundCommand, payload: Buffer, packetId: number) => boolean) &
((event: IPCMessageType.CommandAcknowledged, packetId: number, trackingId: number) => boolean) &
((event: IPCMessageType.CommandReject, packetId: number, trackingId: number) => boolean)
((event: IPCMessageType.CommandAcknowledged, packetId: number, trackingId: number) => boolean)

constructor (options: { address?: string, port?: number } = {}) {
super()
Expand All @@ -76,10 +74,10 @@ export class AtemSocketChild extends EventEmitter {
this._retransmitTimer = setInterval(() => this._checkForRetransmit(), 10)
}

if (address) {
if (address !== undefined) {
this._address = address
}
if (port) {
if (port !== undefined) {
this._port = port
}

Expand Down Expand Up @@ -206,8 +204,7 @@ export class AtemSocketChild extends EventEmitter {
const fromPacketId = packet.readUInt16BE(6)
this.log(`Retransmit request: ${fromPacketId}`)

// TODO - enable and test this
// this._checkForRetransmit(fromPacketId)
this._retransmitFrom(fromPacketId)
}

// Got a packet that needs an ack
Expand Down Expand Up @@ -239,13 +236,16 @@ export class AtemSocketChild extends EventEmitter {
return true
}
})
// this.log(`${Date.now()} Got ack ${ackPacketId} Remaining=${this._inFlight.length}`)
}
}
}

private _sendPacket (packet: Buffer) {
if (this._debug) this.log(`SEND ${packet}`)
if (Math.random() > 0.25) {
this._socket.send(packet, 0, packet.length, this._port, this._address)
}
}

private _sendOrQueueAck () {
Expand Down Expand Up @@ -276,38 +276,42 @@ export class AtemSocketChild extends EventEmitter {
this._sendPacket(buffer)
}

private _checkForRetransmit (retransmitFromPacketId?: number) {
this._inFlight = this._inFlight.filter(sentPacket => {
if (retransmitFromPacketId && sentPacket.packetId > retransmitFromPacketId) {
sentPacket.lastSent = Date.now()
sentPacket.resent++
this._sendPacket(sentPacket.payload)
} else if (sentPacket && sentPacket.lastSent + IN_FLIGHT_TIMEOUT < Date.now()) {
if (sentPacket.resent <= MAX_PACKET_RETRIES && this._isPacketCoveredByAck(this._nextSendPacketId, sentPacket.packetId)) {
private _retransmitFrom (fromId: number) {
// this.log(`Resending from ${fromId} to ${this._inFlight.length > 0 ? this._inFlight[this._inFlight.length - 1].packetId : '-'}`)

const fromIndex = this._inFlight.findIndex(pkt => pkt.packetId === fromId)
if (fromIndex === -1) {
// fromId is not inflight, so we cannot resend. only fix is to abort
this.restartConnection()
} else {
this.log(`Resending from ${fromId} to ${this._inFlight[this._inFlight.length - 1].packetId}`)
// Resend from the requested
for (let i = fromIndex; i < this._inFlight.length; i++) {
const sentPacket = this._inFlight[i]
if (sentPacket.packetId === fromId || !this._isPacketCoveredByAck(fromId, sentPacket.packetId)) {
sentPacket.lastSent = Date.now()
sentPacket.resent++

this.log(`Resending ${sentPacket.packetId}`)
if (this._debug) {
this.log(`RESEND: ${sentPacket}`)
}
// this.log(`${Date.now()} Resending ${sentPacket.packetId} Last=${this._nextSendPacketId - 1}`)
this._sendPacket(sentPacket.payload)
retransmitFromPacketId = sentPacket.packetId
} else {
this.emit(IPCMessageType.CommandReject, sentPacket.packetId, sentPacket.trackingId)

this.log(`Timed out: ${sentPacket.packetId}`)
if (this._debug) {
this.log(`TIMED OUT: ${sentPacket}`)
}
// @todo: we should probably break up the connection here.

// Discard
return false
}
}
}
}

return true
})
private _checkForRetransmit () {
for (const sentPacket of this._inFlight) {
if (sentPacket.lastSent + IN_FLIGHT_TIMEOUT < Date.now()) {
if (sentPacket.resent <= MAX_PACKET_RETRIES && this._isPacketCoveredByAck(this._nextSendPacketId, sentPacket.packetId)) {
this.log(`Retransmit from timeout: ${sentPacket.packetId}`)
// Retransmit the packet and anything after it
this._retransmitFrom(sentPacket.packetId)
} else {
// A command has timed out, so we need to reset to avoid getting stuck
this.restartConnection()
}
return
}
}
}
}

0 comments on commit 537d907

Please sign in to comment.