Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
adamhathcock committed Dec 12, 2024
1 parent 6371996 commit 285d7b6
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 37 deletions.
15 changes: 8 additions & 7 deletions src/Speckle.Sdk.Dependencies/Pools.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public bool Return(Dictionary<string, object?> obj)
new(new StringBuilderPooledObjectPolicy() { MaximumRetainedCapacity = 100 * 1024 * 1024 });

private sealed class ObjectDictionaryPolicy<TKey, TValue> : IPooledObjectPolicy<Dictionary<TKey, TValue>>
where TKey : notnull
where TKey : notnull
{
public Dictionary<TKey, TValue> Create() => new(50);

Expand All @@ -31,8 +31,8 @@ public bool Return(Dictionary<TKey, TValue> obj)
obj.Clear();
return true;
}
}
}

private sealed class ObjectListPolicy<T> : IPooledObjectPolicy<List<T>>
{
public List<T> Create() => new(50);
Expand All @@ -42,9 +42,10 @@ public bool Return(List<T> obj)
obj.Clear();
return true;
}
}
}

public static Pool<List<T>> CreateListPool<T>() => new(new ObjectListPolicy<T>());
public static Pool<Dictionary<TKey, TValue>> CreateDictionaryPool<TKey, TValue>() where TKey : notnull => new(new ObjectDictionaryPolicy<TKey, TValue>());


public static Pool<Dictionary<TKey, TValue>> CreateDictionaryPool<TKey, TValue>()
where TKey : notnull => new(new ObjectDictionaryPolicy<TKey, TValue>());
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public static BatchingChannelReader<T, List<T>> BatchBySize<T>(
int batchSize,
bool singleReader = false,
bool allowSynchronousContinuations = false
)
)
where T : IHasSize =>
new SizeBatchingChannelReader<T>(
source ?? throw new ArgumentNullException(nameof(source)),
Expand Down
22 changes: 14 additions & 8 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@

namespace Speckle.Sdk.Dependencies.Serialization;


public abstract class ChannelSaver<T>
where T : IHasSize
where T : IHasSize
{
private const int SEND_CAPACITY = 50;
private const int HTTP_SEND_CHUNK_SIZE = 25_000_000; //bytes
Expand All @@ -15,7 +14,7 @@ public abstract class ChannelSaver<T>
private const int HTTP_CAPACITY = 50;
private const int MAX_CACHE_WRITE_PARALLELISM = 1;
private const int MAX_CACHE_BATCH = 200;

private bool _enabled;

private readonly Channel<T> _checkCacheChannel = Channel.CreateBounded<T>(
Expand All @@ -30,7 +29,11 @@ public abstract class ChannelSaver<T>
_ => throw new NotImplementedException("Dropping items not supported.")
);

public Task<long> Start(bool enableServerSending = true, bool enableCacheSaving = true, CancellationToken cancellationToken = default)
public Task<long> Start(
bool enableServerSending = true,
bool enableCacheSaving = true,
CancellationToken cancellationToken = default
)
{
ValueTask<long> t = new(Task.FromResult(0L));
if (enableServerSending)
Expand All @@ -48,10 +51,13 @@ public Task<long> Start(bool enableServerSending = true, bool enableCacheSaving
);
if (enableCacheSaving)
{
t =new (tChannelReader.Join()
.Batch(MAX_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken));
t = new(
tChannelReader
.Join()
.Batch(MAX_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_CACHE_WRITE_PARALLELISM, SaveToCache, cancellationToken)
);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ public interface IHasSize
{
int Size { get; }
}

public class SizeBatchingChannelReader<T>(
ChannelReader<T> source,
int batchSize,
bool singleReader,
bool syncCont = false
) : BatchingChannelReader<T, List<T>>(source, batchSize, singleReader, syncCont)
where T : IHasSize
where T : IHasSize
{
private readonly int _batchSize = batchSize;

Expand Down
2 changes: 1 addition & 1 deletion src/Speckle.Sdk/Serialisation/V2/Receive/ObjectLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public override async Task<List<BaseItem>> Download(List<string?> ids)
var (id, json) in serverObjectManager.DownloadObjects(ids.Select(x => x.NotNull()).ToList(), progress, default)
)
{
toCache.Add(new(new (id), new (json), true, null));
toCache.Add(new(new(id), new(json), true, null));
}

if (toCache.Count != ids.Count)
Expand Down
21 changes: 13 additions & 8 deletions src/Speckle.Sdk/Serialisation/V2/Send/ObjectSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ namespace Speckle.Sdk.Serialisation.V2.Send;

public readonly record struct NodeInfo(Json Json, Closures? C)
{
public Closures GetClosures() => C ?? ClosureParser.GetClosures( Json.Value ).ToDictionary(x => new Id(x.Item1), x => x.Item2 );
public Closures GetClosures() =>
C ?? ClosureParser.GetClosures(Json.Value).ToDictionary(x => new Id(x.Item1), x => x.Item2);
}

public partial interface IObjectSerializer : IDisposable;
Expand All @@ -40,10 +41,10 @@ public sealed class ObjectSerializer : IObjectSerializer

private readonly List<(Id, Json, Closures)> _chunks;
private readonly Pool<List<(Id, Json, Closures)>> _chunksPool;

private readonly List<List<DataChunk>> _chunks2 = new();
private readonly Pool<List<DataChunk>> _chunks2Pool;

private readonly List<List<object?>> _chunks3 = new();
private readonly Pool<List<object?>> _chunks3Pool;

Expand All @@ -55,7 +56,10 @@ public sealed class ObjectSerializer : IObjectSerializer
public ObjectSerializer(
IBasePropertyGatherer propertyGatherer,
IReadOnlyDictionary<Id, NodeInfo> childCache,
Pool<List<(Id, Json, Closures)>> chunksPool, Pool<List<DataChunk>> chunks2Pool, Pool<List<object?>> chunks3Pool, bool trackDetachedChildren = false,
Pool<List<(Id, Json, Closures)>> chunksPool,
Pool<List<DataChunk>> chunks2Pool,
Pool<List<object?>> chunks3Pool,
bool trackDetachedChildren = false,
CancellationToken cancellationToken = default
)
{
Expand Down Expand Up @@ -278,7 +282,7 @@ private void SerializeProperty(object? obj, JsonWriter writer, PropertyAttribute
Id id;
Json json;
//avoid multiple serialization to get closures
if (baseObj.id != null && _childCache.TryGetValue(new (baseObj.id), out var info))
if (baseObj.id != null && _childCache.TryGetValue(new(baseObj.id), out var info))
{
id = new Id(baseObj.id);
childClosures = info.GetClosures();
Expand Down Expand Up @@ -306,7 +310,7 @@ private void SerializeProperty(object? obj, JsonWriter writer, PropertyAttribute
applicationId = baseObj.applicationId,
closure = childClosures.ToDictionary(x => x.Key.Value, x => x.Value),
};
}
}
_chunks.Add(new(id, json, []));
return new(id, json2);
}
Expand Down Expand Up @@ -379,14 +383,15 @@ private Id SerializeBaseObject(Base baseObj, JsonWriter writer, Closures closure
_chunks3.Add(chunk);
return chunk;
}

private void SerializeOrChunkProperty(object? baseValue, JsonWriter jsonWriter, PropertyAttributeInfo detachInfo)
{
if (baseValue is IEnumerable chunkableCollection && detachInfo.IsChunkable)
{
List<DataChunk> chunks = _chunks2Pool.Get();
_chunks2.Add(chunks);
DataChunk crtChunk = new() { data =GetChunk() };

DataChunk crtChunk = new() { data = GetChunk() };

foreach (object element in chunkableCollection)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class ObjectSerializerFactory(IBasePropertyGatherer propertyGatherer) : I
private readonly Pool<List<(Id, Json, Closures)>> _chunkPool = Pools.CreateListPool<(Id, Json, Closures)>();
private readonly Pool<List<DataChunk>> _chunk2Pool = Pools.CreateListPool<DataChunk>();
private readonly Pool<List<object?>> _chunk3Pool = Pools.CreateListPool<object?>();

public IObjectSerializer Create(IReadOnlyDictionary<Id, NodeInfo> baseCache, CancellationToken cancellationToken) =>
new ObjectSerializer(propertyGatherer, baseCache, _chunkPool, _chunk2Pool, _chunk3Pool, true, cancellationToken);
}
14 changes: 8 additions & 6 deletions src/Speckle.Sdk/Serialisation/V2/Send/SerializeProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public readonly record struct SerializeProcessResults(
IReadOnlyDictionary<Id, ObjectReference> ConvertedReferences
);


public readonly record struct BaseItem(Id Id, Json Json, bool NeedsStorage, Closures? Closures) : IHasSize
{
public int Size { get; } = Encoding.UTF8.GetByteCount(Json.Value);
Expand Down Expand Up @@ -58,7 +57,6 @@ public class SerializeProcess(
private readonly Pool<List<(Id, Json, Closures)>> _pool = Pools.CreateListPool<(Id, Json, Closures)>();
private readonly Pool<Dictionary<Id, NodeInfo>> _childClosurePool = Pools.CreateDictionaryPool<Id, NodeInfo>();


private long _objectCount;
private long _objectsFound;

Expand Down Expand Up @@ -130,7 +128,7 @@ private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj, bool isEnd, Canc
}

var items = Serialise(obj, childClosures, cancellationToken);

var currentClosures = new Dictionary<Id, NodeInfo>();
Interlocked.Increment(ref _objectCount);
progress?.Report(new(ProgressEvent.FromCacheOrSerialized, _objectCount, _objectsFound));
Expand All @@ -157,14 +155,18 @@ private async Task<Dictionary<Id, NodeInfo>> Traverse(Base obj, bool isEnd, Canc
}

//leave this sync
private IEnumerable<BaseItem> Serialise(Base obj, IReadOnlyDictionary<Id, NodeInfo> childInfo, CancellationToken cancellationToken)
private IEnumerable<BaseItem> Serialise(
Base obj,
IReadOnlyDictionary<Id, NodeInfo> childInfo,
CancellationToken cancellationToken
)
{
if (!_options.SkipCacheRead && obj.id != null)
{
var cachedJson = sqLiteJsonCacheManager.GetObject(obj.id);
if (cachedJson != null)
{
yield return new BaseItem(new (obj.id.NotNull()), new (cachedJson), false, null);
yield return new BaseItem(new(obj.id.NotNull()), new(cachedJson), false, null);
yield break;
}
}
Expand Down Expand Up @@ -200,7 +202,7 @@ private BaseItem CheckCache(Id id, Json json, Dictionary<Id, int> closures)
var cachedJson = sqLiteJsonCacheManager.GetObject(id.Value);
if (cachedJson != null)
{
return new BaseItem(id, new (cachedJson), false, null);
return new BaseItem(id, new(cachedJson), false, null);
}
}
return new BaseItem(id, json, true, closures);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ CancellationToken cancellationToken
CancellationToken cancellationToken
) => throw new NotImplementedException();

public Task<Dictionary<string, bool>> HasObjects(IReadOnlyCollection<string> objectIds, CancellationToken cancellationToken)
public Task<Dictionary<string, bool>> HasObjects(
IReadOnlyCollection<string> objectIds,
CancellationToken cancellationToken
)
{
return Task.FromResult(objectIds.Distinct().ToDictionary(x => x, savedObjects.ContainsKey));
}
Expand Down
20 changes: 16 additions & 4 deletions tests/Speckle.Sdk.Serialization.Tests/ExternalIdTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ public void Setup()
public void ExternalIdTest_Detached(string lineId, string valueId)
{
var p = new Polyline() { units = "cm", value = [1, 2] };
var serializer = new ObjectSerializer(new BasePropertyGatherer(), new Dictionary<Id, NodeInfo>(), true);
using var serializer = new ObjectSerializerFactory(new BasePropertyGatherer()).Create(
new Dictionary<Id, NodeInfo>(),
default
);
var list = serializer.Serialize(p).ToDictionary(x => x.Item1, x => x.Item2);
list.ContainsKey(new Id(lineId)).ShouldBeTrue();
var json = list[new Id(lineId)];
Expand All @@ -53,7 +56,10 @@ public void ExternalIdTest_Detached_Nested(string lineId, string valueId)
knots = [],
weights = [],
};
var serializer = new ObjectSerializer(new BasePropertyGatherer(), new Dictionary<Id, NodeInfo>(), true);
using var serializer = new ObjectSerializerFactory(new BasePropertyGatherer()).Create(
new Dictionary<Id, NodeInfo>(),
default
);
var list = serializer.Serialize(curve).ToDictionary(x => x.Item1, x => x.Item2);
list.ContainsKey(new Id(lineId)).ShouldBeTrue();
var json = list[new Id(lineId)];
Expand Down Expand Up @@ -81,7 +87,10 @@ public void ExternalIdTest_Detached_Nested_More(string lineId, string valueId)
weights = [],
};
var polycurve = new Polycurve() { segments = [curve], units = "cm" };
var serializer = new ObjectSerializer(new BasePropertyGatherer(), new Dictionary<Id, NodeInfo>(), true);
using var serializer = new ObjectSerializerFactory(new BasePropertyGatherer()).Create(
new Dictionary<Id, NodeInfo>(),
default
);
var list = serializer.Serialize(polycurve).ToDictionary(x => x.Item1, x => x.Item2);
list.ContainsKey(new Id(lineId)).ShouldBeTrue();
var json = list[new Id(lineId)];
Expand Down Expand Up @@ -111,7 +120,10 @@ public void ExternalIdTest_Detached_Nested_More_Too(string lineId, string valueI
var polycurve = new Polycurve() { segments = [curve], units = "cm" };
var @base = new Base();
@base.SetDetachedProp("profile", polycurve);
var serializer = new ObjectSerializer(new BasePropertyGatherer(), new Dictionary<Id, NodeInfo>(), true);
using var serializer = new ObjectSerializerFactory(new BasePropertyGatherer()).Create(
new Dictionary<Id, NodeInfo>(),
default
);
var list = serializer.Serialize(@base).ToDictionary(x => x.Item1, x => x.Item2);
list.ContainsKey(new Id(lineId)).ShouldBeTrue();
var json = list[new Id(lineId)];
Expand Down

0 comments on commit 285d7b6

Please sign in to comment.