Skip to content

Commit

Permalink
- Fix Stability of the Sockets
Browse files Browse the repository at this point in the history
shogo4405#606

- Do not interrupt recording on reconnect
  • Loading branch information
mkrn committed Jun 16, 2020
1 parent c89df37 commit b45f8e3
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 11 deletions.
59 changes: 49 additions & 10 deletions Sources/Net/NetSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ open class NetSocket: NSObject {
public var windowSizeC: Int = NetSocket.defaultWindowSizeC
/// The statistics of total incoming bytes.
open var totalBytesIn: Atomic<Int64> = .init(0)
open var qualityOfService: DispatchQoS = .default
open var qualityOfService: DispatchQoS = .userInitiated
open var securityLevel: StreamSocketSecurityLevel = .none
/// The statistics of total outgoing bytes.
open private(set) var totalBytesOut: Atomic<Int64> = .init(0)
Expand All @@ -30,7 +30,7 @@ open class NetSocket: NSObject {
private lazy var outputQueue = DispatchQueue(label: "com.haishinkit.HaishinKit.NetSocket.output", qos: qualityOfService)

public func connect(withName: String, port: Int) {
inputQueue.async {
DispatchQueue.global(qos: .userInteractive).async {
Stream.getStreamsToHost(
withName: withName,
port: port,
Expand All @@ -45,12 +45,41 @@ open class NetSocket: NSObject {
public func doOutput(data: Data, locked: UnsafeMutablePointer<UInt32>? = nil) -> Int {
queueBytesOut.mutate { $0 += Int64(data.count) }
outputQueue.async {
data.withUnsafeBytes { (buffer: UnsafeRawBufferPointer) -> Void in
self.doOutputProcess(buffer.baseAddress?.assumingMemoryBound(to: UInt8.self), maxLength: data.count)
guard let outputStream = self.outputStream else { // trying this , outputStream.streamStatus == .open
print("Connection not created yet ! =====> Return")
return
}
if locked != nil {
OSAtomicAnd32Barrier(0, locked!)

do {
let bytesWritten: Int? = try data.withUnsafeBytes({
let unsafeUInt8BufferPtr = $0.bindMemory(to: UInt8.self)

if let unsafeUInt8Ptr = unsafeUInt8BufferPtr.baseAddress, let outputStream = self.outputStream {

let count = try outputStream.write(unsafeUInt8Ptr, maxLength: unsafeUInt8BufferPtr.count);

if count == 0 {
print("Stream at capacity")
} else if count == -1 {
print("Operation failed: \(outputStream.streamError)")
} else {
self.totalBytesOut.mutate { $0 += Int64(count) }
self.queueBytesOut.mutate { $0 -= Int64(count) }

}

if count > 0 {
return count
}
}
throw NSError(domain:"", code:123, userInfo:nil)
})
}
catch {print("err in doOutput") }
}
// Handle lock here
if locked != nil {
OSAtomicAnd32Barrier(0, locked!)
}
return data.count
}
Expand Down Expand Up @@ -85,8 +114,12 @@ open class NetSocket: NSObject {
}

final func doOutputProcess(_ data: Data) {
data.withUnsafeBytes { (buffer: UnsafeRawBufferPointer) -> Void in
doOutputProcess(buffer.baseAddress?.assumingMemoryBound(to: UInt8.self), maxLength: data.count)
do {
try data.withUnsafeBytes { (buffer: UnsafeRawBufferPointer) -> Void in
self.doOutputProcess(buffer.baseAddress?.assumingMemoryBound(to: UInt8.self), maxLength: data.count)
}
} catch {
print("err in doOutputProcess(data)")
}
}

Expand All @@ -97,10 +130,16 @@ open class NetSocket: NSObject {
var total: Int = 0
repeat {
guard let outputStream = outputStream else {
print("Connection not created yet ! =====> Return")
return
}
let length = outputStream.write(buffer.advanced(by: total), maxLength: maxLength - total)
if 0 < length {
if length == 0 {
print("Stream at capacity")
}
else if length == -1 {
print("Operation failed: \(outputStream.streamError)")
} else {
total += length
totalBytesOut.mutate { $0 += Int64(length) }
queueBytesOut.mutate { $0 -= Int64(length) }
Expand All @@ -109,7 +148,7 @@ open class NetSocket: NSObject {
}

func close(isDisconnected: Bool) {
outputQueue.async {
DispatchQueue.global(qos: .userInteractive).async {
guard self.runloop != nil else {
return
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/RTMP/RTMPStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ open class RTMPStream: NetStream {
mixer.audioIO.encoder.stopRunning()
mixer.videoIO.encoder.stopRunning()
sampler?.stopRunning()
mixer.recorder.stopRunning()

default:
break
}
Expand Down

0 comments on commit b45f8e3

Please sign in to comment.