Skip to content

Commit

Permalink
Fixes #776 and #796 (#798)
Browse files Browse the repository at this point in the history
* Initial Work.  Some progress, but the symptoms might be unavoidable

* More refactoring to remove cases where multiple HttpClients are being used
Fix .NET 3.5 bug where replication connections are not being released and causing replication to quickly freeze
Made NetworkReachabilityManager more lightweight (just check for TCP connectivity to given host and port)

* Fixes #796
Closes #776
  • Loading branch information
borrrden authored Jan 7, 2017
1 parent d550c7d commit fa8b121
Show file tree
Hide file tree
Showing 19 changed files with 164 additions and 133 deletions.
2 changes: 2 additions & 0 deletions src/Couchbase.Lite.Shared/Manager/IJsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ public interface IJsonSerializer : IDisposable
/// <param name="json">The stream containing JSON data</param>
void StartIncrementalParse(Stream json);

void StopIncrementalParse();

/// <summary>
/// Reads the next token from a JSON stream. Note that an incremental parse
/// must be started first.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Net.Http;
using System.Net.NetworkInformation;
using Couchbase.Lite.Internal;

namespace Couchbase.Lite
{
Expand All @@ -25,7 +27,7 @@ internal interface INetworkReachabilityManager
/// <returns><c>true</c> if this instance can reach the specified endpoint; otherwise, <c>false</c>.</returns>
/// <param name="remoteUri">The endpoint to test</param>
/// <param name="timeout">The amount of time to wait for a response</param>
bool CanReach(string remoteUri, TimeSpan timeout);
bool CanReach(RemoteSession session, string remoteUri, TimeSpan timeout);
}

#region Enum
Expand Down
31 changes: 5 additions & 26 deletions src/Couchbase.Lite.Shared/Manager/NetworkReachabilityManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Net.Http;
using System.Text;
using Couchbase.Lite.Support;
using Couchbase.Lite.Internal;

#if __ANDROID__
using Android.App;
Expand Down Expand Up @@ -46,37 +47,15 @@ internal sealed class NetworkReachabilityManager : INetworkReachabilityManager

public Exception LastError { get; private set; }

public bool CanReach(string remoteUri, TimeSpan timeout)
public bool CanReach(RemoteSession session, string remoteUri, TimeSpan timeout)
{
CouchbaseLiteHttpClientFactory.SetupSslCallback();
HttpWebRequest request;

var uri = new Uri (remoteUri);
var credentials = uri.UserInfo;
if (!String.IsNullOrEmpty(credentials)) {
remoteUri = string.Format ("{0}://{1}{2}", uri.Scheme, uri.Authority, uri.PathAndQuery);
request = WebRequest.CreateHttp (remoteUri);
request.Headers.Add ("Authorization", "Basic " + Convert.ToBase64String (Encoding.UTF8.GetBytes (credentials)));
request.PreAuthenticate = true;
}
else {
request = WebRequest.CreateHttp (remoteUri);
}

request.AllowWriteStreamBuffering = true;
request.Timeout = (int)timeout.TotalMilliseconds;
request.Method = "HEAD";

var uri = new Uri(remoteUri);
try {
using(var response = (HttpWebResponse)request.GetResponse()) {
return true; //We only care that the server responded
using(var c = new TcpClient(uri.Host, uri.Port)) {
return true;
}
} catch(Exception e) {
var we = e as WebException;
if(we != null && we.Status == WebExceptionStatus.ProtocolError) {
return true; //Getting an HTTP error technically means we can connect
}

Log.To.Sync.I(TAG, "Didn't get successful connection to {0}", remoteUri);
Log.To.Sync.V(TAG, " Cause: ", e);
LastError = e;
Expand Down
13 changes: 13 additions & 0 deletions src/Couchbase.Lite.Shared/Manager/NewtonsoftJsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Couchbase.Lite.Util;
using System.Net.Sockets;

namespace Couchbase.Lite
{
Expand Down Expand Up @@ -132,6 +133,12 @@ public void StartIncrementalParse(Stream json)
_textReader = new JsonTextReader(new StreamReader(json));
}

public void StopIncrementalParse()
{
_textReader?.Close();
Misc.SafeDispose(ref _textReader);
}

public bool Read()
{
try {
Expand All @@ -142,6 +149,12 @@ public bool Read()
"Error reading from streaming parser");
}

var se = e.InnerException as SocketException;
if(se?.SocketErrorCode == SocketError.Interrupted) {
Log.To.NoDomain.I(Tag, "Streaming read cancelled, returning false for Read()...");
return false;
}

throw Misc.CreateExceptionAndLog(Log.To.NoDomain, e, TAG, "Error reading from streaming parser");
}
}
Expand Down
15 changes: 8 additions & 7 deletions src/Couchbase.Lite.Shared/Replication.cs
Original file line number Diff line number Diff line change
Expand Up @@ -604,14 +604,14 @@ internal IHttpClientFactory ClientFactory {
/// <summary>
/// Gets or sets the headers that should be used when making HTTP requests
/// </summary>
[Obsolete("Use Headers")]
[Obsolete("Use Headers, this method will throw an exception")]
protected internal IDictionary<string, object> RequestHeaders
{
get {
return _remoteSession.RequestHeaders;
throw new NotSupportedException();
}
set {
_remoteSession.RequestHeaders = value ?? new Dictionary<string, object>();
set {
throw new NotSupportedException();
}
}

Expand Down Expand Up @@ -1090,7 +1090,7 @@ protected virtual void StartInternal()
var reachabilityManager = LocalDatabase.Manager.NetworkReachabilityManager;
reachabilityManager.StatusChanged += NetworkStatusChanged;

if (!LocalDatabase.Manager.NetworkReachabilityManager.CanReach(RemoteUrl.AbsoluteUri, ReplicationOptions.RequestTimeout)) {
if (!LocalDatabase.Manager.NetworkReachabilityManager.CanReach(_remoteSession, RemoteUrl.AbsoluteUri, ReplicationOptions.RequestTimeout)) {
Log.To.Sync.I(Tag, "Remote endpoint is not reachable, going offline...");
LastError = LocalDatabase.Manager.NetworkReachabilityManager.LastError;
FireTrigger(ReplicationTrigger.GoOffline);
Expand Down Expand Up @@ -1271,6 +1271,7 @@ internal virtual void Stopping()
if(!LocalDatabase.IsOpen || remoteSession.Disposed) {
// This logic has already been handled by DatabaseClosing(), or
// this replication never started in the first place (client still null)
remoteSession?.Dispose();
return;
}

Expand Down Expand Up @@ -1721,7 +1722,7 @@ private void RefreshRemoteCheckpointDoc()
lastSequenceChanged = true;
SaveLastSequence(null);
}
});
}, true);
}

private void SetupRevisionBodyTransformationFunction()
Expand Down Expand Up @@ -1777,7 +1778,7 @@ private void InitializeStateMachine()

_stateMachine.Configure(ReplicationState.Running).Permit(ReplicationTrigger.GoOffline, ReplicationState.Offline);
_stateMachine.Configure(ReplicationState.Offline).PermitIf(ReplicationTrigger.GoOnline, ReplicationState.Running,
() => LocalDatabase.Manager.NetworkReachabilityManager.CanReach(RemoteUrl.AbsoluteUri, ReplicationOptions.RequestTimeout));
() => LocalDatabase.Manager.NetworkReachabilityManager.CanReach(_remoteSession, RemoteUrl.AbsoluteUri, ReplicationOptions.RequestTimeout));

_stateMachine.Configure(ReplicationState.Stopping).Permit(ReplicationTrigger.StopImmediate, ReplicationState.Stopped);
_stateMachine.Configure(ReplicationState.Stopped).Permit(ReplicationTrigger.Start, ReplicationState.Running);
Expand Down
25 changes: 8 additions & 17 deletions src/Couchbase.Lite.Shared/Replication/BulkDownloader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace Couchbase.Lite.Replicator
internal sealed class BulkDownloaderOptions : ConstructorOptions
{
[RequiredProperty]
public IHttpClientFactory ClientFactory { get; set; }
public RemoteSession Session { get; set; }

[RequiredProperty]
public Uri DatabaseUri { get; set; }
Expand All @@ -79,7 +79,7 @@ internal sealed class BulkDownloaderOptions : ConstructorOptions
public CookieStore CookieStore { get; set; }
}

internal class BulkDownloader : IMultipartReaderDelegate, IDisposable
internal class BulkDownloader : IMultipartReaderDelegate
{
internal static readonly string Tag = typeof(BulkDownloader).Name;

Expand All @@ -89,7 +89,7 @@ internal class BulkDownloader : IMultipartReaderDelegate, IDisposable
private CancellationTokenSource _tokenSource;
private MultipartDocumentReader _docReader;
private Database _db;
private readonly CouchbaseLiteHttpClient _httpClient;
private readonly RemoteSession _session;
private readonly object _body;

private int _docCount;
Expand All @@ -116,7 +116,7 @@ public BulkDownloader(BulkDownloaderOptions options)
options.Validate();
_bulkGetUri = new Uri(AppendRelativeURLString(options.DatabaseUri, "/_bulk_get?revs=true&attachments=true"));
_db = options.Database;
_httpClient = options.ClientFactory.GetHttpClient(options.CookieStore, options.RetryStrategy);

_requestHeaders = options.RequestHeaders;
_tokenSource = options.TokenSource ?? new CancellationTokenSource();
_body = CreatePostBody(options.Revisions, _db);
Expand All @@ -138,13 +138,9 @@ public void Start()

SetBody(requestMessage);

ExecuteRequest(_httpClient, requestMessage).ContinueWith(t =>
ExecuteRequest(requestMessage).ContinueWith(t =>
{
Log.To.Sync.V(Tag, "RemoteRequest run() finished, url: {0}", _bulkGetUri);
if(_httpClient != null) {
_httpClient.Dispose();
}

requestMessage.Dispose();
});
}
Expand Down Expand Up @@ -180,7 +176,7 @@ private HttpRequestMessage CreateConcreteRequest()
return newRequest;
}

private Task ExecuteRequest(CouchbaseLiteHttpClient httpClient, HttpRequestMessage request)
private Task ExecuteRequest(HttpRequestMessage request)
{
object fullBody = null;
Exception error = null;
Expand All @@ -197,8 +193,8 @@ private Task ExecuteRequest(CouchbaseLiteHttpClient httpClient, HttpRequestMessa

Log.To.Sync.V(Tag, "Sending request: {0}", request);
var requestTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_tokenSource.Token);
httpClient.Authenticator = Authenticator;
return httpClient.SendAsync(request, requestTokenSource.Token).ContinueWith(t =>

return _session.SendAsyncRequest(request, HttpCompletionOption.ResponseContentRead, requestTokenSource.Token).ContinueWith(t =>
{
requestTokenSource.Dispose();
try {
Expand Down Expand Up @@ -404,10 +400,5 @@ public override string ToString()
{
return String.Format("BulkDownloader ({0})", new SecureLogUri(_bulkGetUri));
}

public void Dispose()
{
_httpClient.Dispose();
}
}
}
19 changes: 14 additions & 5 deletions src/Couchbase.Lite.Shared/Replication/CouchbaseLiteHttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,19 @@ public Task<HttpResponseMessage> SendAsync(HttpRequestMessage message, HttpCompl


(Authenticator as ICustomHeadersAuthorizer)?.AuthorizeRequest(message);
return httpClient.SendAsync(message, option, token);
#if !NET_3_5
return httpClient.SendAsync(message, option, token)
#if NET_3_5
.ContinueWith(t =>
{
Interlocked.Decrement(ref _connectionCount);
return t.Result;
})
#endif
;
#if !NET_3_5
})?.Unwrap()?.ContinueWith(t =>
{
message.Dispose();
_sendSemaphore?.Release();
if(t.IsFaulted) {
var e = t.Exception;
Expand All @@ -116,16 +125,16 @@ public Task<HttpResponseMessage> SendAsync(HttpRequestMessage message, HttpCompl

return t.Result;
});
#endif
#endif
}

public void Dispose()
{
_httpClient.Dispose();
_authHandler.Dispose();
#if !NET_3_5
#if !NET_3_5
Misc.SafeDispose(ref _sendSemaphore);
#endif
#endif
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ internal class CouchbaseLiteHttpClientFactory : IHttpClientFactory
public CouchbaseLiteHttpClientFactory()
{
SocketTimeout = ReplicationOptions.DefaultSocketTimeout;
Headers = new ConcurrentDictionary<string, string>();
}

internal static void SetupSslCallback()
Expand Down Expand Up @@ -196,13 +195,6 @@ public CouchbaseLiteHttpClient GetHttpClient(CookieStore cookieStore, IRetryStra
client.DefaultRequestHeaders.TryAddWithoutValidation("User-Agent", String.Format("CouchbaseLite/{0} ({1})", Replication.SyncProtocolVersion, Manager.VersionString));
client.DefaultRequestHeaders.Connection.Add("keep-alive");

foreach(var header in Headers)
{
var success = client.DefaultRequestHeaders.TryAddWithoutValidation(header.Key, header.Value);
if (!success)
Log.To.Sync.W(Tag, String.Format("Unabled to add header to request: {0}: {1}", header.Key, header.Value));
}

var transientHandler = authHandler as TransientErrorRetryHandler;
var defaultAuthHandler = default(DefaultAuthHandler);
if (transientHandler != null) {
Expand All @@ -213,8 +205,6 @@ public CouchbaseLiteHttpClient GetHttpClient(CookieStore cookieStore, IRetryStra

return new CouchbaseLiteHttpClient(client, defaultAuthHandler);
}

public IDictionary<string, string> Headers { get; set; }

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ namespace Couchbase.Lite.Internal
{
internal interface IChangeTrackerClient
{
CouchbaseLiteHttpClient GetHttpClient();
CookieContainer GetCookieStore();
void ChangeTrackerReceivedChange(IDictionary<string, object> change);
void ChangeTrackerStopped(ChangeTracker tracker, ErrorResolution resolution);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ private bool ReceivedPollResponse(IJsonSerializer jsonReader, CancellationToken
{
bool started = false;
var start = DateTime.Now;
token.Register(() =>
{
jsonReader.StopIncrementalParse();
});

try {
while (jsonReader.Read() && !token.IsCancellationRequested) {
_pauseWait.Wait();
Expand Down
19 changes: 8 additions & 11 deletions src/Couchbase.Lite.Shared/Replication/Puller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ private void PullBulkRevisions(IList<RevisionInternal> bulkRevs)
Log.To.SyncPerf.I(TAG, "{0} bulk-getting {1} remote revisions...", ReplicatorID, nRevs);
var remainingRevs = new List<RevisionInternal>(bulkRevs);
BulkDownloader dl = new BulkDownloader(new BulkDownloaderOptions {
ClientFactory = ClientFactory,
Session = _remoteSession,
DatabaseUri = RemoteUrl,
Revisions = bulkRevs,
Database = LocalDatabase,
Expand Down Expand Up @@ -393,7 +393,6 @@ private void PullBulkRevisions(IList<RevisionInternal> bulkRevs)

SafeAddToCompletedChangesCount(remainingRevs.Count);
LastSequence = _pendingSequences.GetCheckpointedValue();
Misc.SafeDispose(ref dl);

PullRemoteRevisions();
};
Expand Down Expand Up @@ -740,10 +739,13 @@ private void InsertDownloads(IList<RevisionInternal> downloads)

public override IEnumerable<string> DocIds { get; set; }

public override IDictionary<string, string> Headers
{
get { return ClientFactory.Headers; }
set { ClientFactory.Headers = value; }
public override IDictionary<string, string> Headers {
get {
return _remoteSession.RequestHeaders;
}
set {
_remoteSession.RequestHeaders = value;
}
}

protected override void Retry()
Expand Down Expand Up @@ -1016,11 +1018,6 @@ public void ChangeTrackerStopped(ChangeTracker tracker, ErrorResolution resoluti
WorkExecutor.StartNew(() => ProcessChangeTrackerStopped(tracker, resolution));
}

public CouchbaseLiteHttpClient GetHttpClient()
{
return ClientFactory.GetHttpClient(CookieContainer, ReplicationOptions.RetryStrategy);
}

public CookieContainer GetCookieStore()
{
return CookieContainer;
Expand Down
Loading

0 comments on commit fa8b121

Please sign in to comment.