Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closures are kept for children instead of global #189

Merged
merged 7 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion src/Speckle.Sdk.Dependencies/Pools.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,31 @@ public bool Return(Dictionary<string, object?> obj)
public static Pool<StringBuilder> StringBuilders { get; } =
new(new StringBuilderPooledObjectPolicy() { MaximumRetainedCapacity = 100 * 1024 * 1024 });

public static Pool<List<T>> CreateListPool<T>() => new(new DefaultPooledObjectPolicy<List<T>>());
private sealed class ObjectDictionaryPolicy<TKey, TValue> : IPooledObjectPolicy<Dictionary<TKey, TValue>>
where TKey : notnull
{
public Dictionary<TKey, TValue> Create() => new(50);

public bool Return(Dictionary<TKey, TValue> obj)
{
obj.Clear();
return true;
}
}

private sealed class ObjectListPolicy<T> : IPooledObjectPolicy<List<T>>
{
public List<T> Create() => new(50);

public bool Return(List<T> obj)
{
obj.Clear();
return true;
}
}

public static Pool<List<T>> CreateListPool<T>() => new(new ObjectListPolicy<T>());

public static Pool<Dictionary<TKey, TValue>> CreateDictionaryPool<TKey, TValue>()
where TKey : notnull => new(new ObjectDictionaryPolicy<TKey, TValue>());
}
10 changes: 5 additions & 5 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Dependencies.Serialization;

namespace Speckle.Sdk.Serialisation.V2.Send;

public static class ChannelExtensions
{
public static BatchingChannelReader<BaseItem, List<BaseItem>> BatchBySize(
this ChannelReader<BaseItem> source,
public static BatchingChannelReader<T, List<T>> BatchBySize<T>(
this ChannelReader<T> source,
int batchSize,
bool singleReader = false,
bool allowSynchronousContinuations = false
) =>
new SizeBatchingChannelReader(
)
where T : IHasSize =>
new SizeBatchingChannelReader<T>(
source ?? throw new ArgumentNullException(nameof(source)),
batchSize,
singleReader,
Expand Down
6 changes: 3 additions & 3 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Speckle.Sdk.Dependencies.Serialization;

public abstract class ChannelLoader
public abstract class ChannelLoader<T>
{
private const int HTTP_GET_CHUNK_SIZE = 500;
private const int MAX_PARALLELISM_HTTP = 4;
Expand All @@ -27,7 +27,7 @@ await allChildrenIds

public abstract string? CheckCache(string id);

public abstract Task<List<BaseItem>> Download(List<string?> ids);
public abstract Task<List<T>> Download(List<string?> ids);

public abstract void SaveToCache(List<BaseItem> x);
public abstract void SaveToCache(List<T> x);
}
93 changes: 52 additions & 41 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs
Original file line number Diff line number Diff line change
@@ -1,27 +1,11 @@
using System.Text;
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Serialisation.V2.Send;

namespace Speckle.Sdk.Dependencies.Serialization;

public readonly record struct BaseItem(string Id, string Json, bool NeedsStorage)
{
public int Size { get; } = Encoding.UTF8.GetByteCount(Json);

public bool Equals(BaseItem? other)
{
if (other is null)
{
return false;
}
return string.Equals(Id, other.Value.Id, StringComparison.OrdinalIgnoreCase);
}

public override int GetHashCode() => Id.GetHashCode();
}

public abstract class ChannelSaver
public abstract class ChannelSaver<T>
where T : IHasSize
{
private const int SEND_CAPACITY = 50;
private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes
Expand All @@ -31,7 +15,9 @@ public abstract class ChannelSaver
private const int MAX_CACHE_WRITE_PARALLELISM = 1;
private const int MAX_CACHE_BATCH = 200;

private readonly Channel<BaseItem> _checkCacheChannel = Channel.CreateBounded<BaseItem>(
private bool _enabled;

private readonly Channel<T> _checkCacheChannel = Channel.CreateBounded<T>(
new BoundedChannelOptions(SEND_CAPACITY)
{
AllowSynchronousContinuations = true,
Expand All @@ -43,35 +29,60 @@ public abstract class ChannelSaver
_ => throw new NotImplementedException("Dropping items not supported.")
);

public Task Start(CancellationToken cancellationToken = default)
public Task<long> Start(
bool enableServerSending = true,
bool enableCacheSaving = true,
CancellationToken cancellationToken = default
)
{
var t = _checkCacheChannel
.Reader.BatchBySize(HTTP_SEND_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
MAX_PARALLELISM_HTTP,
async x => await SendToServer(x, cancellationToken).ConfigureAwait(false),
HTTP_CAPACITY,
false,
cancellationToken
)
.Join()
.Batch(MAX_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken);
return t;
ValueTask<long> t = new(Task.FromResult(0L));
if (enableServerSending)
{
_enabled = true;
var tChannelReader = _checkCacheChannel
.Reader.BatchBySize(HTTP_SEND_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
MAX_PARALLELISM_HTTP,
async x => await SendToServer(x, cancellationToken).ConfigureAwait(false),
HTTP_CAPACITY,
false,
cancellationToken
);
if (enableCacheSaving)
{
t = new(
tChannelReader
.Join()
.Batch(MAX_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
);
}
else
{
t = tChannelReader.ReadUntilCancelledAsync(cancellationToken, (list, l) => new ValueTask());
}
}

return t.AsTask();
}

public async Task Save(BaseItem item, CancellationToken cancellationToken = default) =>
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
public async ValueTask Save(T item, CancellationToken cancellationToken = default)
{
if (_enabled)
{
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
}
}

public abstract Task<List<BaseItem>> SendToServer(List<BaseItem> batch, CancellationToken cancellationToken);
public abstract Task<List<T>> SendToServer(List<T> batch, CancellationToken cancellationToken);

public Task Done()
public ValueTask Done()
{
_checkCacheChannel.Writer.Complete();
return Task.CompletedTask;
return new(Task.CompletedTask);
}

public abstract void SaveToCache(List<BaseItem> item);
public abstract void SaveToCache(List<T> item);
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,33 @@
using System.Threading.Channels;
using Open.ChannelExtensions;
using Speckle.Sdk.Dependencies.Serialization;

namespace Speckle.Sdk.Serialisation.V2.Send;

public class SizeBatchingChannelReader(
ChannelReader<BaseItem> source,
public interface IHasSize
{
int Size { get; }
}

public class SizeBatchingChannelReader<T>(
ChannelReader<T> source,
int batchSize,
bool singleReader,
bool syncCont = false
) : BatchingChannelReader<BaseItem, List<BaseItem>>(source, batchSize, singleReader, syncCont)
) : BatchingChannelReader<T, List<T>>(source, batchSize, singleReader, syncCont)
where T : IHasSize
{
private readonly int _batchSize = batchSize;

protected override List<BaseItem> CreateBatch(int capacity) => new();
protected override List<T> CreateBatch(int capacity) => new();

protected override void TrimBatch(List<BaseItem> batch) => batch.TrimExcess();
protected override void TrimBatch(List<T> batch) => batch.TrimExcess();

protected override void AddBatchItem(List<BaseItem> batch, BaseItem item) => batch.Add(item);
protected override void AddBatchItem(List<T> batch, T item) => batch.Add(item);

protected override int GetBatchSize(List<BaseItem> batch)
protected override int GetBatchSize(List<T> batch)
{
int size = 0;
foreach (BaseItem item in batch)
foreach (T item in batch)
{
size += item.Size;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System.Text;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.SQLite;
using Speckle.Sdk.Transports;

Expand All @@ -23,7 +23,7 @@ public class DummySqLiteJsonCacheManager : ISqLiteJsonCacheManager
public class DummySendServerObjectManager : IServerObjectManager
{
public IAsyncEnumerable<(string, string)> DownloadObjects(
IReadOnlyList<string> objectIds,
IReadOnlyCollection<string> objectIds,
IProgress<ProgressArgs>? progress,
CancellationToken cancellationToken
) => throw new NotImplementedException();
Expand All @@ -35,7 +35,7 @@ CancellationToken cancellationToken
) => throw new NotImplementedException();

public Task<Dictionary<string, bool>> HasObjects(
IReadOnlyList<string> objectIds,
IReadOnlyCollection<string> objectIds,
CancellationToken cancellationToken
) => throw new NotImplementedException();

Expand All @@ -49,7 +49,7 @@ CancellationToken cancellationToken
long totalBytes = 0;
foreach (var item in objects)
{
totalBytes += Encoding.Default.GetByteCount(item.Json);
totalBytes += Encoding.Default.GetByteCount(item.Json.Value);
}

progress?.Report(new(ProgressEvent.UploadBytes, totalBytes, totalBytes));
Expand Down
9 changes: 5 additions & 4 deletions src/Speckle.Sdk/Serialisation/V2/Receive/ObjectLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Speckle.Sdk.Dependencies;
using Speckle.Sdk.Dependencies.Serialization;
using Speckle.Sdk.Serialisation.Utilities;
using Speckle.Sdk.Serialisation.V2.Send;
using Speckle.Sdk.SQLite;
using Speckle.Sdk.Transports;

Expand All @@ -13,7 +14,7 @@ public sealed class ObjectLoader(
ISqLiteJsonCacheManager sqLiteJsonCacheManager,
IServerObjectManager serverObjectManager,
IProgress<ProgressArgs>? progress
) : ChannelLoader, IObjectLoader
) : ChannelLoader<BaseItem>, IObjectLoader
{
private int? _allChildrenCount;
private long _checkCache;
Expand Down Expand Up @@ -80,13 +81,13 @@ public override async Task<List<BaseItem>> Download(List<string?> ids)
var (id, json) in serverObjectManager.DownloadObjects(ids.Select(x => x.NotNull()).ToList(), progress, default)
)
{
toCache.Add(new(id, json, true));
toCache.Add(new(new(id), new(json), true, null));
}

if (toCache.Count != ids.Count)
{
throw new SpeckleException(
$"Objects in batch missing: {string.Join(",", ids.Except(toCache.Select(y => y.Id)).Take(10))}"
$"Objects in batch missing: {string.Join(",", ids.Except(toCache.Select(y => y.Id.Value)).Take(10))}"
);
}
return toCache;
Expand All @@ -97,7 +98,7 @@ public override void SaveToCache(List<BaseItem> batch)
{
if (!_options.SkipCache)
{
sqLiteJsonCacheManager.SaveObjects(batch.Select(x => (x.Id, x.Json)));
sqLiteJsonCacheManager.SaveObjects(batch.Select(x => (x.Id.Value, x.Json.Value)));
Interlocked.Exchange(ref _cached, _cached + batch.Count);
progress?.Report(new(ProgressEvent.CachedToLocal, _cached, _allChildrenCount));
}
Expand Down
Loading
Loading