Skip to content

Commit

Permalink
虽然回调线程在主队列执行,不存在多线程问题。但是在短线重连的情况下,存在多线程执行连接操作,导致异常出现。加入rhsocket队列,将连接…
Browse files Browse the repository at this point in the history
…、断开、数据发送、数据接收 放入串行队列操作。
  • Loading branch information
zhuruhong committed Aug 5, 2016
1 parent 774b3f1 commit f53bd09
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 29 deletions.
2 changes: 2 additions & 0 deletions RHSocketKit/Core/Channel/RHSocketChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

@protocol RHSocketChannelDelegate <NSObject>

@required

- (void)channelOpened:(RHSocketChannel *)channel host:(NSString *)host port:(int)port;
- (void)channelClosed:(RHSocketChannel *)channel error:(NSError *)error;
- (void)channel:(RHSocketChannel *)channel received:(id<RHDownstreamPacket>)packet;
Expand Down
25 changes: 14 additions & 11 deletions RHSocketKit/Core/Channel/RHSocketChannel.m
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,16 @@ - (instancetype)initWithHost:(NSString *)host port:(int)port

- (void)openConnection
{
@synchronized(self) {
[self closeConnection];
[self connectWithHost:self.host port:self.port];
}//@synchronized
if ([self isConnected]) {
return;
}
[self closeConnection];
[self connectWithHost:self.host port:self.port];
}

- (void)closeConnection
{
@synchronized(self) {
[self disconnect];
}//synchronized
[self disconnect];
}

- (void)asyncSendPacket:(id<RHUpstreamPacket>)packet
Expand Down Expand Up @@ -91,12 +90,16 @@ - (void)writeInt64:(int64_t)param

- (void)didDisconnect:(id<RHSocketConnectionDelegate>)con withError:(NSError *)err
{
[self.delegate channelClosed:self error:err];
dispatch_async(dispatch_get_main_queue(), ^{
[self.delegate channelClosed:self error:err];
});
}

- (void)didConnect:(id<RHSocketConnectionDelegate>)con toHost:(NSString *)host port:(uint16_t)port
{
[self.delegate channelOpened:self host:host port:port];
dispatch_async(dispatch_get_main_queue(), ^{
[self.delegate channelOpened:self host:host port:port];
});
}

- (void)didRead:(id<RHSocketConnectionDelegate>)con withData:(NSData *)data tag:(long)tag
Expand Down Expand Up @@ -132,9 +135,9 @@ - (void)didRead:(id<RHSocketConnectionDelegate>)con withData:(NSData *)data tag:

- (void)didReceived:(id<RHSocketConnectionDelegate>)con withPacket:(id<RHDownstreamPacket>)packet
{
if ([self.delegate respondsToSelector:@selector(channel:received:)]) {
dispatch_async(dispatch_get_main_queue(), ^{
[self.delegate channel:self received:packet];
}
});
}

#pragma mark - RHSocketEncoderOutputProtocol
Expand Down
6 changes: 6 additions & 0 deletions RHSocketKit/Core/Channel/RHSocketConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#import "RHSocketConnectionDelegate.h"
#import "RHSocketConfig.h"

extern NSString * const RHSocketQueueSpecific;

/**
* socket网络连接对象,只负责socket网络的连接通信,内部使用GCDAsyncSocket。
* 1-只公开GCDAsyncSocket的主要方法,增加使用的便捷性。
Expand All @@ -25,4 +27,8 @@

- (instancetype)initWithHost:(NSString *)host port:(int)port;

#pragma mark - queue

- (void)dispatchOnSocketQueue:(dispatch_block_t)block async:(BOOL)async;

@end
89 changes: 71 additions & 18 deletions RHSocketKit/Core/Channel/RHSocketConnection.m
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@
#import "RHSocketConnection.h"
#import "GCDAsyncSocket.h"

NSString * const RHSocketQueueSpecific = @"com.zrh.rhsocket.RHSocketQueueSpecific";

@interface RHSocketConnection () <GCDAsyncSocketDelegate>

@property (nonatomic, strong, readonly) GCDAsyncSocket *asyncSocket;
@property (nonatomic, strong) dispatch_queue_t socketQueue;
@property (nonatomic, assign) void *IsOnSocketQueueOrTargetQueueKey;

@property (nonatomic, strong) GCDAsyncSocket *asyncSocket;

@end

Expand All @@ -20,52 +25,96 @@ @implementation RHSocketConnection
- (instancetype)initWithHost:(NSString *)host port:(int)port
{
if (self = [super init]) {
//queue
_socketQueue = dispatch_queue_create([RHSocketQueueSpecific UTF8String], DISPATCH_QUEUE_SERIAL);

_IsOnSocketQueueOrTargetQueueKey = &_IsOnSocketQueueOrTargetQueueKey;
void *nonNullUnusedPointer = (__bridge void *)self;
dispatch_queue_set_specific(_socketQueue, _IsOnSocketQueueOrTargetQueueKey, nonNullUnusedPointer, NULL);

//
_host = host;
_port = port;
}
return self;
}

#pragma mark - RHSocketConnectionDelegate
#pragma mark - queue

- (BOOL)isOnSocketQueue
{
return dispatch_get_specific(_IsOnSocketQueueOrTargetQueueKey) != NULL;
}

- (void)dispatchOnSocketQueue:(dispatch_block_t)block async:(BOOL)async
{
if ([self isOnSocketQueue]) {
@autoreleasepool {
block();
}
return;
}

if (async) {
dispatch_async([self socketQueue], ^{
@autoreleasepool {
block();
}
});
return;
}

dispatch_sync([self socketQueue], ^{
@autoreleasepool {
block();
}
});
}

#pragma mark - RHSocketConnection protocol

- (void)connectWithHost:(NSString *)hostName port:(int)port
{
@synchronized (self) {
[self dispatchOnSocketQueue:^{
[self disconnect];

if (_useSecureConnection && (nil == _tlsSettings)) {
if (self.useSecureConnection && (nil == self.tlsSettings)) {
// Configure SSL/TLS settings
NSMutableDictionary *settings = [NSMutableDictionary dictionaryWithCapacity:3];
settings[(NSString *)kCFStreamSSLPeerName] = hostName;
_tlsSettings= settings;
self.tlsSettings= settings;
}

_asyncSocket = [[GCDAsyncSocket alloc] initWithDelegate:self delegateQueue:dispatch_get_main_queue()];
[_asyncSocket setIPv4PreferredOverIPv6:NO];
self.asyncSocket = [[GCDAsyncSocket alloc] initWithDelegate:self delegateQueue:_socketQueue];
[self.asyncSocket setIPv4PreferredOverIPv6:NO];

NSError *err = nil;
[_asyncSocket connectToHost:hostName onPort:port error:&err];
[self.asyncSocket connectToHost:hostName onPort:port error:&err];
if (err) {
[self didDisconnect:self withError:err];
}
}//@synchronized
} async:YES];
}

- (void)disconnect
{
@synchronized (self) {
if (nil == _asyncSocket) {
[self dispatchOnSocketQueue:^{
if (nil == self.asyncSocket) {
return;
}
[_asyncSocket disconnect];
_asyncSocket.delegate = nil;
_asyncSocket = nil;
}//@synchronized
[self.asyncSocket disconnect];
self.asyncSocket.delegate = nil;
self.asyncSocket = nil;
} async:YES];
}

- (BOOL)isConnected
{
return [_asyncSocket isConnected];
__block BOOL result = NO;
[self dispatchOnSocketQueue:^{
result = [self.asyncSocket isConnected];
} async:NO];
return result;
}

- (void)didDisconnect:(id<RHSocketConnectionDelegate>)con withError:(NSError *)err
Expand All @@ -87,12 +136,16 @@ - (void)didRead:(id<RHSocketConnectionDelegate>)con withData:(NSData *)data tag:

- (void)readDataWithTimeout:(NSTimeInterval)timeout tag:(long)tag
{
[self.asyncSocket readDataWithTimeout:timeout tag:tag];
[self dispatchOnSocketQueue:^{
[self.asyncSocket readDataWithTimeout:timeout tag:tag];
} async:YES];
}

- (void)writeData:(NSData *)data timeout:(NSTimeInterval)timeout tag:(long)tag
{
[self.asyncSocket writeData:data withTimeout:timeout tag:tag];
[self dispatchOnSocketQueue:^{
[self.asyncSocket writeData:data withTimeout:timeout tag:tag];
} async:YES];
}

#pragma mark - GCDAsyncSocketDelegate
Expand Down

0 comments on commit f53bd09

Please sign in to comment.