Closures are kept for children instead of global (#189)
* disable channels when skipping things

* pass child closures to current; current's closures go out to the parent

* fix build

* adjust options

* use a dictionary pool and pool correctly

* add pools for data chunks

* format
adamhathcock authored Dec 13, 2024
1 parent 722df50 commit defcee1
Showing 15 changed files with 306 additions and 146 deletions.
28 changes: 27 additions & 1 deletion src/Speckle.Sdk.Dependencies/Pools.cs
@@ -21,5 +21,31 @@ public bool Return(Dictionary<string, object?> obj)
public static Pool<StringBuilder> StringBuilders { get; } =
new(new StringBuilderPooledObjectPolicy() { MaximumRetainedCapacity = 100 * 1024 * 1024 });

public static Pool<List<T>> CreateListPool<T>() => new(new DefaultPooledObjectPolicy<List<T>>());
private sealed class ObjectDictionaryPolicy<TKey, TValue> : IPooledObjectPolicy<Dictionary<TKey, TValue>>
where TKey : notnull
{
public Dictionary<TKey, TValue> Create() => new(50);

public bool Return(Dictionary<TKey, TValue> obj)
{
obj.Clear();
return true;
}
}

private sealed class ObjectListPolicy<T> : IPooledObjectPolicy<List<T>>
{
public List<T> Create() => new(50);

public bool Return(List<T> obj)
{
obj.Clear();
return true;
}
}

public static Pool<List<T>> CreateListPool<T>() => new(new ObjectListPolicy<T>());

public static Pool<Dictionary<TKey, TValue>> CreateDictionaryPool<TKey, TValue>()
where TKey : notnull => new(new ObjectDictionaryPolicy<TKey, TValue>());
}
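
The new policies pre-size each pooled collection (capacity 50) and clear it on return, so per-object closure dictionaries can be reused instead of reallocated for every child. A minimal usage sketch, assuming `Pools` lives in `Speckle.Sdk.Dependencies` and that `Pool<T>` exposes `Get`/`Return` in the style of `Microsoft.Extensions.ObjectPool` — neither detail is confirmed by this diff:

```csharp
using System.Collections.Generic;
using Speckle.Sdk.Dependencies; // assumed namespace for Pools / Pool<T>

public static class ClosureScratch
{
  // One pool per closed generic type; cheap to keep around as a static.
  private static readonly Pool<Dictionary<string, int>> s_closurePool =
    Pools.CreateDictionaryPool<string, int>();

  public static void Collect()
  {
    var closures = s_closurePool.Get(); // rent a pre-sized dictionary (capacity 50)
    try
    {
      closures["child-object-id"] = 1;  // gather child closures for the current object
    }
    finally
    {
      s_closurePool.Return(closures);   // the policy clears it before the next rental
    }
  }
}
```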
10 changes: 5 additions & 5 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelExtensions.cs
@@ -1,18 +1,18 @@
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Dependencies.Serialization;

namespace Speckle.Sdk.Serialisation.V2.Send;

public static class ChannelExtensions
{
public static BatchingChannelReader<BaseItem, List<BaseItem>> BatchBySize(
this ChannelReader<BaseItem> source,
public static BatchingChannelReader<T, List<T>> BatchBySize<T>(
this ChannelReader<T> source,
int batchSize,
bool singleReader = false,
bool allowSynchronousContinuations = false
) =>
new SizeBatchingChannelReader(
)
where T : IHasSize =>
new SizeBatchingChannelReader<T>(
source ?? throw new ArgumentNullException(nameof(source)),
batchSize,
singleReader,
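`BatchBySize` is now generic over `T : IHasSize` (the interface is introduced with the `SizeBatchingChannelReader` change further down), so any item that can report its own byte size can be batched this way. A hedged usage sketch — the `Payload` type, the sample JSON, and the 25 MB threshold are illustrative, not taken from the commit:

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Speckle.Sdk.Serialisation.V2.Send; // BatchBySize and IHasSize per this commit

// Illustrative item type; only the Size contract comes from the diff.
public readonly record struct Payload(string Json) : IHasSize
{
  public int Size { get; } = Encoding.UTF8.GetByteCount(Json);
}

public static class BatchBySizeExample
{
  public static async Task RunAsync(CancellationToken ct)
  {
    var channel = Channel.CreateUnbounded<Payload>();
    channel.Writer.TryWrite(new Payload("{\"id\":\"abc\"}"));
    channel.Writer.Complete();

    // A batch closes once the summed Size of its items reaches the threshold (in bytes).
    await foreach (var batch in channel.Reader.BatchBySize(25_000_000).ReadAllAsync(ct))
    {
      Console.WriteLine($"batch of {batch.Count} items");
    }
  }
}
```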
6 changes: 3 additions & 3 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs
@@ -2,7 +2,7 @@

namespace Speckle.Sdk.Dependencies.Serialization;

public abstract class ChannelLoader
public abstract class ChannelLoader<T>
{
private const int HTTP_GET_CHUNK_SIZE = 500;
private const int MAX_PARALLELISM_HTTP = 4;
@@ -27,7 +27,7 @@ await allChildrenIds

public abstract string? CheckCache(string id);

public abstract Task<List<BaseItem>> Download(List<string?> ids);
public abstract Task<List<T>> Download(List<string?> ids);

public abstract void SaveToCache(List<BaseItem> x);
public abstract void SaveToCache(List<T> x);
}
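
A minimal concrete loader, sketched under the assumption that `CheckCache`, `Download`, and `SaveToCache` are the only members a subclass must supply and that an `IHasSize` item satisfies any generic constraint; the real implementation is `ObjectLoader`, updated later in this commit:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Serialisation.V2.Send;

// Illustrative item; mirrors the shape of the removed BaseItem record.
public readonly record struct CachedItem(string Id, string Json) : IHasSize
{
  public int Size { get; } = Encoding.UTF8.GetByteCount(Json);
}

public sealed class InMemoryLoader : ChannelLoader<CachedItem>
{
  private readonly Dictionary<string, string> _cache = new();

  // Return the cached JSON for an id, or null so the id flows on to Download.
  public override string? CheckCache(string id) =>
    _cache.TryGetValue(id, out var json) ? json : null;

  // Pretend each missing id resolves to a trivial JSON payload in one round trip.
  public override Task<List<CachedItem>> Download(List<string?> ids) =>
    Task.FromResult(ids.Select(id => new CachedItem(id!, $"{{\"id\":\"{id}\"}}")).ToList());

  // Store the downloaded payloads so later CheckCache calls hit.
  public override void SaveToCache(List<CachedItem> batch)
  {
    foreach (var item in batch)
    {
      _cache[item.Id] = item.Json;
    }
  }
}
```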
93 changes: 52 additions & 41 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs
@@ -1,27 +1,11 @@
using System.Text;
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Serialisation.V2.Send;

namespace Speckle.Sdk.Dependencies.Serialization;

public readonly record struct BaseItem(string Id, string Json, bool NeedsStorage)
{
public int Size { get; } = Encoding.UTF8.GetByteCount(Json);

public bool Equals(BaseItem? other)
{
if (other is null)
{
return false;
}
return string.Equals(Id, other.Value.Id, StringComparison.OrdinalIgnoreCase);
}

public override int GetHashCode() => Id.GetHashCode();
}

public abstract class ChannelSaver
public abstract class ChannelSaver<T>
where T : IHasSize
{
private const int SEND_CAPACITY = 50;
private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes
@@ -31,7 +15,9 @@ public abstract class ChannelSaver
private const int MAX_CACHE_WRITE_PARALLELISM = 1;
private const int MAX_CACHE_BATCH = 200;

private readonly Channel<BaseItem> _checkCacheChannel = Channel.CreateBounded<BaseItem>(
private bool _enabled;

private readonly Channel<T> _checkCacheChannel = Channel.CreateBounded<T>(
new BoundedChannelOptions(SEND_CAPACITY)
{
AllowSynchronousContinuations = true,
@@ -43,35 +29,60 @@ public abstract class ChannelSaver
_ => throw new NotImplementedException("Dropping items not supported.")
);

public Task Start(CancellationToken cancellationToken = default)
public Task<long> Start(
bool enableServerSending = true,
bool enableCacheSaving = true,
CancellationToken cancellationToken = default
)
{
var t = _checkCacheChannel
.Reader.BatchBySize(HTTP_SEND_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
MAX_PARALLELISM_HTTP,
async x => await SendToServer(x, cancellationToken).ConfigureAwait(false),
HTTP_CAPACITY,
false,
cancellationToken
)
.Join()
.Batch(MAX_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken);
return t;
ValueTask<long> t = new(Task.FromResult(0L));
if (enableServerSending)
{
_enabled = true;
var tChannelReader = _checkCacheChannel
.Reader.BatchBySize(HTTP_SEND_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
MAX_PARALLELISM_HTTP,
async x => await SendToServer(x, cancellationToken).ConfigureAwait(false),
HTTP_CAPACITY,
false,
cancellationToken
);
if (enableCacheSaving)
{
t = new(
tChannelReader
.Join()
.Batch(MAX_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
);
}
else
{
t = tChannelReader.ReadUntilCancelledAsync(cancellationToken, (list, l) => new ValueTask());
}
}

return t.AsTask();
}

public async Task Save(BaseItem item, CancellationToken cancellationToken = default) =>
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
public async ValueTask Save(T item, CancellationToken cancellationToken = default)
{
if (_enabled)
{
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
}
}

public abstract Task<List<BaseItem>> SendToServer(List<BaseItem> batch, CancellationToken cancellationToken);
public abstract Task<List<T>> SendToServer(List<T> batch, CancellationToken cancellationToken);

public Task Done()
public ValueTask Done()
{
_checkCacheChannel.Writer.Complete();
return Task.CompletedTask;
return new(Task.CompletedTask);
}

public abstract void SaveToCache(List<BaseItem> item);
public abstract void SaveToCache(List<T> item);
}
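
`Start` now returns the count from the cache-write stage and takes flags that let a caller disable server sending and/or cache saving; while the channel is disabled, `Save` silently drops items, which is the "disable channels when skipping things" behaviour from the commit message. A usage sketch, assuming `SendToServer` and `SaveToCache` are the only abstract members and using an illustrative `Item` type:

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Serialisation.V2.Send;

// Illustrative item; only the IHasSize contract comes from this commit.
public readonly record struct Item(string Id, string Json) : IHasSize
{
  public int Size { get; } = Encoding.UTF8.GetByteCount(Json);
}

public sealed class LoggingSaver : ChannelSaver<Item>
{
  // Pretend the server accepted the whole batch unchanged.
  public override Task<List<Item>> SendToServer(List<Item> batch, CancellationToken cancellationToken) =>
    Task.FromResult(batch);

  public override void SaveToCache(List<Item> batch) =>
    Console.WriteLine($"cached {batch.Count} items");
}

public static class SaverUsage
{
  public static async Task RunAsync(CancellationToken ct)
  {
    var saver = new LoggingSaver();

    // With enableServerSending: false the channel stays disabled and Save becomes a no-op.
    var pipeline = saver.Start(enableServerSending: true, enableCacheSaving: true, cancellationToken: ct);

    await saver.Save(new Item("abc", "{}"), ct);
    await saver.Done(); // complete the writer so the pipeline can drain
    await pipeline;     // finishes once every batch has been sent and cached
  }
}
```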
@@ -1,28 +1,33 @@
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Dependencies.Serialization;

namespace Speckle.Sdk.Serialisation.V2.Send;

public class SizeBatchingChannelReader(
ChannelReader<BaseItem> source,
public interface IHasSize
{
int Size { get; }
}

public class SizeBatchingChannelReader<T>(
ChannelReader<T> source,
int batchSize,
bool singleReader,
bool syncCont = false
) : BatchingChannelReader<BaseItem, List<BaseItem>>(source, batchSize, singleReader, syncCont)
) : BatchingChannelReader<T, List<T>>(source, batchSize, singleReader, syncCont)
where T : IHasSize
{
private readonly int _batchSize = batchSize;

protected override List<BaseItem> CreateBatch(int capacity) => new();
protected override List<T> CreateBatch(int capacity) => new();

protected override void TrimBatch(List<BaseItem> batch) => batch.TrimExcess();
protected override void TrimBatch(List<T> batch) => batch.TrimExcess();

protected override void AddBatchItem(List<BaseItem> batch, BaseItem item) => batch.Add(item);
protected override void AddBatchItem(List<T> batch, T item) => batch.Add(item);

protected override int GetBatchSize(List<BaseItem> batch)
protected override int GetBatchSize(List<T> batch)
{
int size = 0;
foreach (BaseItem item in batch)
foreach (T item in batch)
{
size += item.Size;
}
@@ -1,5 +1,5 @@
using System.Text;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.SQLite;
using Speckle.Sdk.Transports;

@@ -23,7 +23,7 @@ public class DummySqLiteJsonCacheManager : ISqLiteJsonCacheManager
public class DummySendServerObjectManager : IServerObjectManager
{
public IAsyncEnumerable<(string, string)> DownloadObjects(
IReadOnlyList<string> objectIds,
IReadOnlyCollection<string> objectIds,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
) => throw new NotImplementedException();
@@ -35,7 +35,7 @@ CancellationToken cancellationToken
) => throw new NotImplementedException();

public Task<Dictionary<string, bool>> HasObjects(
IReadOnlyList<string> objectIds,
IReadOnlyCollection<string> objectIds,
CancellationToken cancellationToken
) => throw new NotImplementedException();

@@ -49,7 +49,7 @@ CancellationToken cancellationToken
long totalBytes = 0;
foreach (var item in objects)
{
totalBytes += Encoding.Default.GetByteCount(item.Json);
totalBytes += Encoding.Default.GetByteCount(item.Json.Value);
}

progress?.Report(new(ProgressEvent.UploadBytes, totalBytes, totalBytes));
9 changes: 5 additions & 4 deletions src/Speckle.Sdk/Serialisation/V2/Receive/ObjectLoader.cs
@@ -3,6 +3,7 @@
using Speckle.Sdk.Dependencies;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Serialisation.Utilities;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.SQLite;
using Speckle.Sdk.Transports;

@@ -13,7 +14,7 @@ public sealed class ObjectLoader(
ISqLiteJsonCacheManager sqLiteJsonCacheManager,
IServerObjectManager serverObjectManager,
IProgress<ProgressArgs>? progress
) : ChannelLoader, IObjectLoader
) : ChannelLoader<BaseItem>, IObjectLoader
{
private int? _allChildrenCount;
private long _checkCache;
@@ -80,13 +81,13 @@ public override async Task<List<BaseItem>> Download(List<string?> ids)
var (id, json) in serverObjectManager.DownloadObjects(ids.Select(x => x.NotNull()).ToList(), progress, default)
)
{
toCache.Add(new(id, json, true));
toCache.Add(new(new(id), new(json), true, null));
}

if (toCache.Count != ids.Count)
{
throw new SpeckleException(
$"Objects in batch missing: {string.Join(",", ids.Except(toCache.Select(y => y.Id)).Take(10))}"
$"Objects in batch missing: {string.Join(",", ids.Except(toCache.Select(y => y.Id.Value)).Take(10))}"
);
}
return toCache;
@@ -97,7 +98,7 @@ public override void SaveToCache(List<BaseItem> batch)
{
if (!_options.SkipCache)
{
sqLiteJsonCacheManager.SaveObjects(batch.Select(x => (x.Id, x.Json)));
sqLiteJsonCacheManager.SaveObjects(batch.Select(x => (x.Id.Value, x.Json.Value)));
Interlocked.Exchange(ref _cached, _cached + batch.Count);
progress?.Report(new(ProgressEvent.CachedToLocal, _cached, _allChildrenCount));
}
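The `new(new(id), new(json), true, null)` construction and the `.Id.Value` / `.Json.Value` reads above suggest `BaseItem` now wraps its id and JSON in small value types and carries an optional closure table as its fourth component. The shapes below are an assumption for illustration only, not definitions taken from this commit:

```csharp
using System.Collections.Generic;
using System.Text;
using Speckle.Sdk.Serialisation.V2.Send;

// Assumed shapes, inferred from how ObjectLoader constructs and reads BaseItem above.
public readonly record struct Id(string Value);

public readonly record struct Json(string Value);

public readonly record struct BaseItem(Id Id, Json Json, bool NeedsStorage, IReadOnlyDictionary<Id, int>? Closures)
  : IHasSize
{
  public int Size { get; } = Encoding.UTF8.GetByteCount(Json.Value);
}
```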