Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix crash for web socket in some race conditions #22439

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 133 additions & 96 deletions Libraries/WebSocket/RCTSRWebSocket.m
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ @implementation RCTSRWebSocket
int _closeCode;

BOOL _isPumping;

BOOL _cleanupScheduled;

NSMutableSet<NSArray *> *_scheduledRunloops;

Expand Down Expand Up @@ -324,17 +326,11 @@ - (void)dealloc

[_inputStream close];
[_outputStream close];

_workQueue = NULL;


if (_receivedHTTPHeaders) {
CFRelease(_receivedHTTPHeaders);
_receivedHTTPHeaders = NULL;
}

if (_delegateDispatchQueue) {
_delegateDispatchQueue = NULL;
}
}

#ifndef NDEBUG
Expand Down Expand Up @@ -626,11 +622,11 @@ - (void)_failWithError:(NSError *)error;
}];

self.readyState = RCTSR_CLOSED;
self->_selfRetain = nil;


RCTSRLog(@"Failing with error %@", error.localizedDescription);

[self _disconnect];
[self _scheduleCleanup];
}
});
}
Expand Down Expand Up @@ -1036,12 +1032,7 @@ - (void)_pumpWriting;
!_sentClose) {
_sentClose = YES;

[_outputStream close];
[_inputStream close];

for (NSArray *runLoop in [_scheduledRunloops copy]) {
[self unscheduleFromRunLoop:runLoop[0] forMode:runLoop[1]];
}
[self _scheduleCleanup];

if (!_failed) {
[self _performDelegateBlock:^{
Expand All @@ -1050,8 +1041,6 @@ - (void)_pumpWriting;
}
}];
}

_selfRetain = nil;
}
}

Expand Down Expand Up @@ -1345,94 +1334,142 @@ - (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode;
}
}
}

assert(_workQueue != NULL);

// _workQueue cannot be NULL
if (!_workQueue) {
return;
}
__weak typeof(self) weakSelf = self;
dispatch_async(_workQueue, ^{
switch (eventCode) {
case NSStreamEventOpenCompleted: {
RCTSRLog(@"NSStreamEventOpenCompleted %@", aStream);
if (self.readyState >= RCTSR_CLOSING) {
return;
}
assert(self->_readBuffer);

if (self.readyState == RCTSR_CONNECTING && aStream == self->_inputStream) {
[self didConnect];
}
[self _pumpWriting];
[self _pumpScanner];
break;
}

case NSStreamEventErrorOccurred: {
RCTSRLog(@"NSStreamEventErrorOccurred %@ %@", aStream, [aStream.streamError copy]);
// TODO: specify error better!
[self _failWithError:aStream.streamError];
self->_readBufferOffset = 0;
self->_readBuffer.length = 0;
break;
typeof(self) strongSelf = weakSelf;
if (!strongSelf) {
return;
}
[strongSelf safeHandleEvent:eventCode stream:aStream];
});
}

- (void)safeHandleEvent:(NSStreamEvent)eventCode stream:(NSStream *)aStream
{
switch (eventCode) {
case NSStreamEventOpenCompleted: {
RCTSRLog(@"NSStreamEventOpenCompleted %@", aStream);
if (self.readyState >= RCTSR_CLOSING) {
return;
}

case NSStreamEventEndEncountered: {
[self _pumpScanner];
RCTSRLog(@"NSStreamEventEndEncountered %@", aStream);
if (aStream.streamError) {
[self _failWithError:aStream.streamError];
} else {
dispatch_async(self->_workQueue, ^{
if (self.readyState != RCTSR_CLOSED) {
self.readyState = RCTSR_CLOSED;
self->_selfRetain = nil;
}

if (!self->_sentClose && !self->_failed) {
self->_sentClose = YES;
// If we get closed in this state it's probably not clean because we should be sending this when we send messages
[self _performDelegateBlock:^{
if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) {
[self.delegate webSocket:self didCloseWithCode:RCTSRStatusCodeGoingAway reason:@"Stream end encountered" wasClean:NO];
}
}];
}
});
}

break;
assert(self->_readBuffer);

if (self.readyState == RCTSR_CONNECTING && aStream == self->_inputStream) {
[self didConnect];
}

case NSStreamEventHasBytesAvailable: {
RCTSRLog(@"NSStreamEventHasBytesAvailable %@", aStream);
const int bufferSize = 2048;
uint8_t buffer[bufferSize];

while (self->_inputStream.hasBytesAvailable) {
NSInteger bytes_read = [self->_inputStream read:buffer maxLength:bufferSize];

if (bytes_read > 0) {
[self->_readBuffer appendBytes:buffer length:bytes_read];
} else if (bytes_read < 0) {
[self _failWithError:self->_inputStream.streamError];
[self _pumpWriting];
[self _pumpScanner];
break;
}

case NSStreamEventErrorOccurred: {
RCTSRLog(@"NSStreamEventErrorOccurred %@ %@", aStream, [aStream.streamError copy]);
// TODO: specify error better!
[self _failWithError:aStream.streamError];
self->_readBufferOffset = 0;
self->_readBuffer.length = 0;
break;

}

case NSStreamEventEndEncountered: {
[self _pumpScanner];
RCTSRLog(@"NSStreamEventEndEncountered %@", aStream);
if (aStream.streamError) {
[self _failWithError:aStream.streamError];
} else {
dispatch_async(self->_workQueue, ^{
if (self.readyState != RCTSR_CLOSED) {
self.readyState = RCTSR_CLOSED;
[self _scheduleCleanup];
}

if (bytes_read != bufferSize) {
break;

if (!self->_sentClose && !self->_failed) {
self->_sentClose = YES;
// If we get closed in this state it's probably not clean because we should be sending this when we send messages
[self _performDelegateBlock:^{
if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) {
[self.delegate webSocket:self didCloseWithCode:RCTSRStatusCodeGoingAway reason:@"Stream end encountered" wasClean:NO];
}
}];
}
};
[self _pumpScanner];
break;
});
}

break;
}

case NSStreamEventHasBytesAvailable: {
RCTSRLog(@"NSStreamEventHasBytesAvailable %@", aStream);
const int bufferSize = 2048;
uint8_t buffer[bufferSize];

while (self->_inputStream.hasBytesAvailable) {
NSInteger bytes_read = [self->_inputStream read:buffer maxLength:bufferSize];

if (bytes_read > 0) {
[self->_readBuffer appendBytes:buffer length:bytes_read];
} else if (bytes_read < 0) {
[self _failWithError:self->_inputStream.streamError];
}

if (bytes_read != bufferSize) {
break;
}
};
[self _pumpScanner];
break;
}

case NSStreamEventHasSpaceAvailable: {
RCTSRLog(@"NSStreamEventHasSpaceAvailable %@", aStream);
[self _pumpWriting];
break;
}

default:
RCTSRLog(@"(default) %@", aStream);
break;
}
}

case NSStreamEventHasSpaceAvailable: {
RCTSRLog(@"NSStreamEventHasSpaceAvailable %@", aStream);
[self _pumpWriting];
break;
}
- (void)_scheduleCleanup
{
if (_cleanupScheduled) {
return;
}

_cleanupScheduled = YES;

// Cleanup NSStream's delegate in the same RunLoop used by the streams themselves:
// This way we'll prevent race conditions between handleEvent and SRWebsocket's dealloc
NSTimer *timer = [NSTimer timerWithTimeInterval:(0.0f) target:self selector:@selector(_cleanupSelfReference:) userInfo:nil repeats:NO];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why you can't use dispatch_async to schedule cleanup here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@janicduplessis Emm, we need to schedule cleanup on RCTSR_networkRunLoop, seems we have no dispatch_queue here. Maybe we can use performSelector, but we can keep use timer, because they are the same actually.

[[NSRunLoop RCTSR_networkRunLoop] addTimer:timer forMode:NSDefaultRunLoopMode];
}

default:
RCTSRLog(@"(default) %@", aStream);
break;
}
- (void)_cleanupSelfReference:(NSTimer *)timer
{
// Remove the streams, right now, from the networkRunLoop
[_inputStream close];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check the stream status here before trying to close it like we do here https://github.com/facebook/react-native/blob/master/Libraries/WebSocket/RCTSRWebSocket.m#L1034

Might be worth extracting to a shared method to safely close streams.

Copy link
Contributor Author

@zhongwuzw zhongwuzw Dec 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@janicduplessis 👍 We can extract it to a separate method.

[_outputStream close];

// Unschedule from RunLoop
for (NSArray *runLoop in [_scheduledRunloops copy]) {
[self unscheduleFromRunLoop:runLoop[0] forMode:runLoop[1]];
}

// Nuke NSStream's delegate
_inputStream.delegate = nil;
_outputStream.delegate = nil;

// Cleanup selfRetain in the same GCD queue as usual
dispatch_async(_workQueue, ^{
self->_selfRetain = nil;
});
}

Expand Down