Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ability to limit the size of the send queue buffer. #561

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions SocketRocket/SRWebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ extern NSString *const SRHTTPResponseErrorKey;
*/
@property (nonatomic, assign, readonly) BOOL allowsUntrustedSSLCertificates;

/**
Allow limiting the internal tx queue size to a specific bytes count limit
Once the tx queue size is reached the following calls to send: will block until data is sent
over the websocket connection.
Setting this to 0 does not limit the tx queue size (default)
*/
@property (nonatomic, assign) NSUInteger maxTxQueueSize;

///--------------------------------------
#pragma mark - Constructors
///--------------------------------------
Expand Down
49 changes: 49 additions & 0 deletions SocketRocket/SRWebSocket.m
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ @implementation SRWebSocket {

// proxy support
SRProxyConnect *_proxyConnect;

NSCondition *__txQueueSizeCond;
}

@synthesize readyState = _readyState;
Expand Down Expand Up @@ -179,6 +181,8 @@ - (instancetype)initWithURLRequest:(NSURLRequest *)request protocols:(NSArray<NS
_consumerPool = [[SRIOConsumerPool alloc] init];

_scheduledRunloops = [[NSMutableSet alloc] init];
_maxTxQueueSize = 0;
__txQueueSizeCond = [[NSCondition alloc] init];

return self;
}
Expand Down Expand Up @@ -250,6 +254,11 @@ - (void)assertOnWorkQueue;
assert(dispatch_get_specific((__bridge void *)self) == (__bridge void *)_workQueue);
}

- (void)assertNotOnWorkQueue;
{
assert(dispatch_get_specific((__bridge void *)self) != (__bridge void *)_workQueue);
}

///--------------------------------------
#pragma mark - Dealloc
///--------------------------------------
Expand Down Expand Up @@ -606,6 +615,39 @@ - (void)send:(nullable id)message
}
}

- (void)setMaxTxQueueSize:(NSUInteger)maxTxQueueSize
{
if (maxTxQueueSize > 0)
assert(maxTxQueueSize > SRDefaultBufferSize());
_maxTxQueueSize = maxTxQueueSize;
[__txQueueSizeCond lock];
[__txQueueSizeCond signal];
[__txQueueSizeCond unlock];
}

- (void)checkTxQueue
{
if (!_maxTxQueueSize) {
/* no limit on the tx queue size */
return;
}

[self assertNotOnWorkQueue];

/* check internal queue size */
[__txQueueSizeCond lock];
while (true) {
/* check queue size */
NSUInteger txQueueSize = dispatch_data_get_size(_outputBuffer);
if (txQueueSize < _maxTxQueueSize)
break;

/* need to block until data is effectively sent over ws connection */
[__txQueueSizeCond wait];
}
[__txQueueSizeCond unlock];
}

- (BOOL)sendString:(NSString *)string error:(NSError **)error
{
if (self.readyState != SR_OPEN) {
Expand All @@ -617,6 +659,8 @@ - (BOOL)sendString:(NSString *)string error:(NSError **)error
return NO;
}

[self checkTxQueue];

string = [string copy];
dispatch_async(_workQueue, ^{
[self _sendFrameWithOpcode:SROpCodeTextFrame data:[string dataUsingEncoding:NSUTF8StringEncoding]];
Expand All @@ -641,6 +685,8 @@ - (BOOL)sendDataNoCopy:(nullable NSData *)data error:(NSError **)error
return NO;
}

[self checkTxQueue];

dispatch_async(_workQueue, ^{
if (data) {
[self _sendFrameWithOpcode:SROpCodeBinaryFrame data:data];
Expand Down Expand Up @@ -1074,6 +1120,9 @@ - (void)_pumpWriting;
if (_outputBufferOffset > SRDefaultBufferSize() && _outputBufferOffset > dataLength / 2) {
_outputBuffer = dispatch_data_create_subrange(_outputBuffer, _outputBufferOffset, dataLength - _outputBufferOffset);
_outputBufferOffset = 0;
[__txQueueSizeCond lock];
[__txQueueSizeCond signal];
[__txQueueSizeCond unlock];
}
}

Expand Down