Skip to content

Commit

Permalink
Heavy memory optimisations
Browse files Browse the repository at this point in the history
Fixes #654
Fixes #655
  • Loading branch information
borrrden committed Jun 4, 2016
1 parent 52363b6 commit 6f4c226
Show file tree
Hide file tree
Showing 13 changed files with 1,657 additions and 70 deletions.
2 changes: 2 additions & 0 deletions src/Couchbase.Lite.Shared/Couchbase.Lite.Shared.projitems
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@
<Compile Include="$(MSBuildThisFileDirectory)Util\ExceptionEnumerator.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Util\TraceLogger.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Util\TimeSeries.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Util\RecyclableMemoryStreamManager.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Util\RecyclableMemoryStream.cs" />
</ItemGroup>
<ItemGroup>
<Folder Include="$(MSBuildThisFileDirectory)View\" />
Expand Down
5 changes: 4 additions & 1 deletion src/Couchbase.Lite.Shared/Documents/AttachmentInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

using Couchbase.Lite;
using Couchbase.Lite.Util;
using Microsoft.IO;

namespace Couchbase.Lite.Internal
{
Expand Down Expand Up @@ -140,7 +141,9 @@ public Stream ContentStream {
return Database.Attachments.BlobStreamForKey(_blobKey);
}

return new MemoryStream(Content.ToArray());
var data = Content.ToArray();
return RecyclableMemoryStreamManager.SharedInstance.GetStream("AttachmentInternal",
data, 0, data.Length);
}
}

Expand Down
27 changes: 19 additions & 8 deletions src/Couchbase.Lite.Shared/Replication/ChunkedChanges.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
using ICSharpCode.SharpZipLib.GZip;
using ICSharpCode.SharpZipLib.Zip.Compression;
using System.Threading;
using System.Text;
using Microsoft.IO;

namespace Couchbase.Lite.Internal
{
Expand All @@ -50,7 +52,7 @@ internal sealed class ChunkedChanges : IDisposable
// filled since we need to be informed of the newest data immediately and we don't
// know when the next piece is going to come. However, I don't want to read byte
// after byte one at a time because it feels wrong.
private const int BufferSize = 64;
private const int BufferSize = 1024;

#endregion

Expand All @@ -60,6 +62,7 @@ internal sealed class ChunkedChanges : IDisposable
private readonly Inflater _inflater; // Due to internal buffering, GZipStream cannot be used
private bool _disposed;
private bool _readHeader;
private readonly ManualResetEventSlim _pauseWait;

public event TypedEventHandler<ChunkedChanges, IDictionary<string, object>> ChunkFound;

Expand All @@ -69,32 +72,36 @@ internal sealed class ChunkedChanges : IDisposable

#region Constructors

public ChunkedChanges(bool compressed, CancellationToken token)
public ChunkedChanges(bool compressed, CancellationToken token, ManualResetEventSlim pauseWait)
{
_innerStream = new ChunkStream();
if (compressed) {
_inflater = new Inflater(true);
}

token.Register(Dispose);
_pauseWait = pauseWait;
Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
}

#endregion

#region Public Methods

public void AddData(IEnumerable<byte> data)
public void AddData(Stream data)
{
if (_disposed) {
Log.To.ChangeTracker.E(Tag, "AddData called on disposed object, throwing...");
throw new ObjectDisposedException("ChunkedGZipChanges");
}

var realized = data.ToArray();
using (var stream = new MemoryStream(realized)) {
ReadHeader(stream);
stream.CopyTo(_innerStream);
ReadHeader(data);
var buffer = RecyclableMemoryStreamManager.SharedInstance.GetBlock();
try {
var bytesRead = data.Read(buffer, 0, buffer.Length);
_innerStream.Write(buffer, 0, bytesRead);
} finally {
RecyclableMemoryStreamManager.SharedInstance.ReturnBlocks(new byte[][] { buffer }, "ChunkedChanges");
}
}

Expand All @@ -121,6 +128,7 @@ private void Process()
if (--nestedCount == 0 && parseBuffer.Count > 0) {
// We have a complete JSON object or array ready for processing
var changes = Manager.GetObjectMapper().ReadValue<IList<object>>(parseBuffer);
Log.To.Sync.I(Tag, "Parse found {0} changes", changes.Count);
foreach (var change in changes) {
if (ChunkFound != null) {
ChunkFound(this, change.AsDictionary<string, object>());
Expand All @@ -129,12 +137,15 @@ private void Process()

parseBuffer.Clear();
}
_pauseWait.Wait();
} else if (IsStart(parseBuffer.Last())) {
// Begin an embedded array
nestedCount++;
}
}

Log.To.Sync.I(Tag, "Parsed {0} (nested count: {1})", new LogString(unzipBuffer.Take(decodedBytes)),
nestedCount);
decodedBytes = Decode(null, 0, unzipBuffer, out exception);
}

Expand Down Expand Up @@ -189,7 +200,7 @@ private static bool IsStart(byte nextChar) {
}

// From SharpZipLib GZipInputStream, slightly modified
private void ReadHeader(MemoryStream stream)
private void ReadHeader(Stream stream)
{
if (_readHeader || _inflater == null) {
return;
Expand Down
9 changes: 6 additions & 3 deletions src/Couchbase.Lite.Shared/Replication/MultiStreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

using Couchbase.Lite.Util;
using System.Threading.Tasks;
using Microsoft.IO;

#if NET_3_5
using Rackspace.Threading;
Expand Down Expand Up @@ -229,7 +230,7 @@ public void Close()
public IEnumerable<byte> AllOutput()
{
_nextInputIndex = 0;
using (var ms = new MemoryStream()) {
using (var ms = RecyclableMemoryStreamManager.SharedInstance.GetStream()) {
if (!WriteAsync(ms).Wait(TimeSpan.FromSeconds(30))) {
Log.To.Database.W(TAG, "{0} unable to get output!", this);
return null;
Expand Down Expand Up @@ -287,8 +288,10 @@ private void StartWriting(ManualResetEventSlim doneSignal)
private Stream StreamForInput(object input)
{
var data = input as IEnumerable<byte>;
if (data != null) {
return new MemoryStream(data.ToArray());
var realized = data?.ToArray();
if (realized != null) {
return new RecyclableMemoryStreamManager.SharedInstance.GetStream("MultiStreamWriter",
realized, 0, realized.Length);
}

var fileUri = input as Uri;
Expand Down
36 changes: 30 additions & 6 deletions src/Couchbase.Lite.Shared/Replication/WebSocketChangeTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,22 @@
using WebSocketSharp;
using Couchbase.Lite.Auth;
using System.Collections.Generic;
using Microsoft.IO;

namespace Couchbase.Lite.Internal
{
internal enum ChangeTrackerMessageType : byte
{
Unknown,
Plaintext,
GZip,
EOF
}

// Concrete class for receiving changes over web sockets
internal class WebSocketChangeTracker : ChangeTracker
{

#region Constants

private static readonly string Tag = typeof(WebSocketChangeTracker).Name;
Expand Down Expand Up @@ -150,11 +158,27 @@ private void OnReceive(object sender, MessageEventArgs args)
return;
}

var responseStream = new MemoryStream();
responseStream.WriteByte(args.IsText ? (byte)1 : (byte)2);
responseStream.Write(args.RawData, 0, args.RawData.Length);
responseStream.Seek(0, SeekOrigin.Begin);
_responseLogic.ProcessResponseStream(responseStream, _cts.Token);
var code = ChangeTrackerMessageType.Unknown;
if(args.IsText) {
if(args.RawData.Length == 2 && args.RawData[0] == '\r' && args.RawData[1] == '\n') {
code = ChangeTrackerMessageType.EOF;
} else {
code = ChangeTrackerMessageType.Plaintext;
}
} else {
code = ChangeTrackerMessageType.GZip;
}

Log.To.Sync.I(Tag, "Preparing to process {0}", args.Data);
var responseStream = RecyclableMemoryStreamManager.SharedInstance.GetStream("WebSocketChangeTracker", args.RawData.Length + 1);
try {
responseStream.WriteByte((byte)code);
responseStream.Write(args.RawData, 0, args.RawData.Length);
responseStream.Seek(0, SeekOrigin.Begin);
_responseLogic.ProcessResponseStream(responseStream, _cts.Token);
} finally {
responseStream.Dispose();
}
} catch(Exception e) {
Log.To.ChangeTracker.E(Tag, String.Format("{0} is not parseable", GetLogString(args)), e);
}
Expand Down
52 changes: 21 additions & 31 deletions src/Couchbase.Lite.Shared/Replication/WebSocketLogic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
using System.IO;
using System.Threading;
using System.Collections.Generic;
using Couchbase.Lite.Util;

namespace Couchbase.Lite.Internal
{
internal sealed class WebSocketLogic : IChangeTrackerResponseLogic
{
private static readonly string Tag = typeof(WebSocketLogic).Name;
private ManualResetEventSlim _pauseWait = new ManualResetEventSlim(true);
private bool _caughtUp;
private ChunkedChanges _changeProcessor;
Expand All @@ -36,37 +38,14 @@ internal sealed class WebSocketLogic : IChangeTrackerResponseLogic
public Action<IDictionary<string, object>> OnChangeFound { get; set; }
public Action<Exception> OnFinished { get; set; }

private ChangeTrackerResponseCode ProcessGzippedStream(Stream stream, CancellationToken token)
private ChangeTrackerResponseCode ProcessResponseStream(Stream stream, CancellationToken token, bool compressed)
{
if (_changeProcessor == null) {
_changeProcessor = new ChunkedChanges(true, token);
_changeProcessor = new ChunkedChanges(compressed, token, _pauseWait);
SetupChangeProcessorCallback();
}

_changeProcessor.AddData(stream.ReadAllBytes());
return ChangeTrackerResponseCode.Normal;
}

private ChangeTrackerResponseCode ProcessRegularStream(Stream stream, CancellationToken token)
{
if (_changeProcessor == null) {
_changeProcessor = new ChunkedChanges(false, token);
SetupChangeProcessorCallback();
}

if(stream.Length == 3) { // +1 for the first type ID
if(!_caughtUp) {
_caughtUp = true;
if (OnCaughtUp != null) {
OnCaughtUp();
}

return ChangeTrackerResponseCode.Normal;
}
}

_changeProcessor.AddData(stream.ReadAllBytes());

_changeProcessor.AddData(stream);
return ChangeTrackerResponseCode.Normal;
}

Expand All @@ -87,12 +66,23 @@ private void SetupChangeProcessorCallback()

public ChangeTrackerResponseCode ProcessResponseStream(Stream stream, CancellationToken token)
{
var type = stream.ReadByte();
if(type == 1) {
return ProcessRegularStream(stream, token);
} else {
return ProcessGzippedStream(stream, token);
_pauseWait.Wait();
var type = (ChangeTrackerMessageType)stream.ReadByte();
if (type == ChangeTrackerMessageType.Plaintext || type == ChangeTrackerMessageType.GZip) {
return ProcessResponseStream(stream, token, type == ChangeTrackerMessageType.GZip);
} else if (type == ChangeTrackerMessageType.EOF) {
if(!_caughtUp) {
_caughtUp = true;
if (OnCaughtUp != null) {
OnCaughtUp();
}
}

return ChangeTrackerResponseCode.Normal;
}

Log.To.Sync.E(Tag, "Unknown response code {0}, returning failed status", type);
return ChangeTrackerResponseCode.Failed;
}

public void Pause()
Expand Down
6 changes: 5 additions & 1 deletion src/Couchbase.Lite.Shared/Revisions/UnsavedRevision.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
using Couchbase.Lite.Util;
using Couchbase.Lite.Revisions;
using Couchbase.Lite.Internal;
using Microsoft.IO;

namespace Couchbase.Lite
{
Expand Down Expand Up @@ -272,7 +273,10 @@ public SavedRevision Save()
/// <param name="content">The <see cref="Couchbase.Lite.Attachment"/> content.</param>
public void SetAttachment(string name, string contentType, IEnumerable<byte> content)
{
var attachment = new Attachment(new MemoryStream(content.ToArray()), contentType);
var data = content.ToArray();
var stream = RecyclableMemoryStreamManager.SharedInstance.GetStream("UnsavedRevision",
data, 0, data.Length);
var attachment = new Attachment(stream, contentType);
AddAttachment(attachment, name);

}
Expand Down
3 changes: 2 additions & 1 deletion src/Couchbase.Lite.Shared/Store/SymmetricKey.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
using System.Text;
using System.Collections.Generic;
using Couchbase.Lite.Util;
using Microsoft.IO;

#if NET_3_5
using Rackspace.Threading;
Expand Down Expand Up @@ -220,7 +221,7 @@ public byte[] EncryptData(byte[] data)

byte[] encrypted = null;
_cryptor.GenerateIV();
using(var ms = new MemoryStream())
using(var ms = RecyclableMemoryStreamManager.SharedInstance.GetStream())
using(var cs = new CryptoStream(ms, _cryptor.CreateEncryptor(), CryptoStreamMode.Write)) {
ms.Write(_cryptor.IV, 0, IV_SIZE);
cs.Write(data, 0, data.Length);
Expand Down
Loading

0 comments on commit 6f4c226

Please sign in to comment.