Merge branch 'main' into gai/docs-grammar
michaelstaib authored Feb 19, 2024
2 parents 292159b + cab7730 · commit 27fcc4a
Showing 29 changed files with 380 additions and 248 deletions.
2 changes: 1 addition & 1 deletion src/GreenDonut/src/Core/AutoBatchScheduler.cs
@@ -17,7 +17,7 @@ public class AutoBatchScheduler : IBatchScheduler
public void Schedule(Func<ValueTask> dispatch)
=> BeginDispatch(dispatch);

private void BeginDispatch(Func<ValueTask> dispatch)
private static void BeginDispatch(Func<ValueTask> dispatch)
=> Task.Factory.StartNew(
async () => await dispatch().ConfigureAwait(false),
default,
9 changes: 7 additions & 2 deletions src/GreenDonut/src/Core/BatchDataLoader.cs
@@ -32,9 +32,14 @@ public abstract class BatchDataLoader<TKey, TValue>
/// </exception>
protected BatchDataLoader(
IBatchScheduler batchScheduler,
DataLoaderOptions? options = null)
DataLoaderOptions options)
: base(batchScheduler, options)
{ }
{
if (options is null)
{
throw new ArgumentNullException(nameof(options));
}
}

/// <inheritdoc />
protected sealed override async ValueTask FetchAsync(
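The constructor change above removes the optional DataLoaderOptions parameter, so derived loaders now have to pass options explicitly. Below is a minimal sketch of a derived batch loader against the new signature; User, IUserRepository, and GetUsersByIdAsync are illustrative names, and the LoadBatchAsync override follows the existing GreenDonut base-class API rather than anything introduced by this commit.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GreenDonut;

// Illustrative types: User, IUserRepository and GetUsersByIdAsync are not part of this change.
public sealed record User(int Id, string Name);

public interface IUserRepository
{
    Task<IReadOnlyDictionary<int, User>> GetUsersByIdAsync(
        IReadOnlyList<int> ids,
        CancellationToken cancellationToken);
}

public sealed class UserBatchDataLoader : BatchDataLoader<int, User>
{
    private readonly IUserRepository _repository;

    // DataLoaderOptions is now a required argument; passing null throws ArgumentNullException.
    public UserBatchDataLoader(
        IUserRepository repository,
        IBatchScheduler batchScheduler,
        DataLoaderOptions options)
        : base(batchScheduler, options)
    {
        _repository = repository;
    }

    // Signature assumed from the existing GreenDonut BatchDataLoader API.
    protected override async Task<IReadOnlyDictionary<int, User>> LoadBatchAsync(
        IReadOnlyList<int> keys,
        CancellationToken cancellationToken)
        => await _repository.GetUsersByIdAsync(keys, cancellationToken);
}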
24 changes: 15 additions & 9 deletions src/GreenDonut/src/Core/CacheDataLoader.cs
@@ -3,21 +3,27 @@
using System.Threading;
using System.Threading.Tasks;

#nullable enable

namespace GreenDonut;

public abstract class CacheDataLoader<TKey, TValue>
: DataLoaderBase<TKey, TValue>
where TKey : notnull
{
protected CacheDataLoader(DataLoaderOptions? options = null)
: base(
AutoBatchScheduler.Default,
options is null
? new DataLoaderOptions { MaxBatchSize = 1, }
: CreateLocalOptions(options))
{ }
protected CacheDataLoader(DataLoaderOptions options)
: base(AutoBatchScheduler.Default, CreateLocalOptions(options))
{
if (options is null)
{
throw new ArgumentNullException(nameof(options));
}

if (options.Cache is null)
{
throw new ArgumentException(
"A cache must be provided when using the CacheDataLoader.",
nameof(options));
}
}

protected sealed override async ValueTask FetchAsync(
IReadOnlyList<TKey> keys,
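With this change a CacheDataLoader no longer builds a private single-item cache when none is configured; the options must carry one. The sketch below shows the new wiring under two assumptions: that TaskCacheOwner (seen in the removed DataLoaderBase code) still exposes an ITaskCache through its Cache property, and that the base class still uses LoadSingleAsync as its fetch hook. ConfigValueDataLoader is a hypothetical loader.

using System.Threading;
using System.Threading.Tasks;
using GreenDonut;

// Hypothetical loader; the name and the loaded values are for illustration only.
public sealed class ConfigValueDataLoader : CacheDataLoader<string, string>
{
    public ConfigValueDataLoader(DataLoaderOptions options)
        : base(options)
    {
    }

    // Signature assumed from the existing GreenDonut CacheDataLoader API.
    protected override Task<string> LoadSingleAsync(
        string key,
        CancellationToken cancellationToken)
        => Task.FromResult($"value-for-{key}");
}

public static class ConfigValueDataLoaderSetup
{
    public static ConfigValueDataLoader Create()
    {
        // The options must now carry a cache; without one the constructor throws.
        var cacheOwner = new TaskCacheOwner();
        var options = new DataLoaderOptions
        {
            Cache = cacheOwner.Cache,
            MaxBatchSize = 1,
        };
        return new ConfigValueDataLoader(options);
    }
}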
116 changes: 37 additions & 79 deletions src/GreenDonut/src/Core/DataLoaderBase.cs
@@ -26,17 +26,14 @@ namespace GreenDonut;
/// <typeparam name="TValue">A value type.</typeparam>
public abstract partial class DataLoaderBase<TKey, TValue>
: IDataLoader<TKey, TValue>
, IDisposable
where TKey : notnull
{
private readonly object _sync = new();
private readonly CancellationTokenSource _disposeTokenSource = new();
private readonly IBatchScheduler _batchScheduler;
private readonly int _maxBatchSize;
private readonly TaskCacheOwner? _cacheOwner;
private readonly IDataLoaderDiagnosticEvents _diagnosticEvents;
private readonly CancellationToken _ct;
private Batch<TKey>? _currentBatch;
private bool _disposed;

/// <summary>
/// Initializes a new instance of the <see cref="DataLoaderBase{TKey, TValue}"/> class.
@@ -55,19 +52,8 @@ protected DataLoaderBase(IBatchScheduler batchScheduler, DataLoaderOptions? opti
{
options ??= new DataLoaderOptions();
_diagnosticEvents = options.DiagnosticEvents ?? Default;

if (options.Caching && options.Cache is null)
{
_cacheOwner = new TaskCacheOwner();
Cache = _cacheOwner.Cache;
}
else
{
Cache = options.Caching
? options.Cache
: null;
}

Cache = options.Cache;
_ct = options.CancellationToken;
_batchScheduler = batchScheduler;
_maxBatchSize = options.MaxBatchSize;
CacheKeyType = GetCacheKeyType(GetType());
@@ -96,25 +82,25 @@ public Task<TValue> LoadAsync(TKey key, CancellationToken cancellationToken = de

lock (_sync)
{
if (Cache is not null)
if (Cache is null)
{
var cachedTask = Cache.GetOrAddTask(cacheKey, CreatePromise);
return CreatePromise().Task;
}

if (cached)
{
_diagnosticEvents.ResolvedTaskFromCache(this, cacheKey, cachedTask);
}
var cachedTask = Cache.GetOrAddTask(cacheKey, _ => CreatePromise());

return cachedTask;
if (cached)
{
_diagnosticEvents.ResolvedTaskFromCache(this, cacheKey, cachedTask);
}

return CreatePromise();
return cachedTask;
}

Task<TValue> CreatePromise()
Promise<TValue> CreatePromise()
{
cached = false;
return GetOrCreatePromiseUnsafe(key).Task;
return GetOrCreatePromiseUnsafe(key);
}
}

Expand All @@ -131,7 +117,6 @@ public Task<IReadOnlyList<TValue>> LoadAsync(
var index = 0;
var tasks = new Task<TValue>[keys.Count];
bool cached;
TKey currentKey;

lock (_sync)
{
@@ -154,9 +139,8 @@ void InitializeWithCache()
cancellationToken.ThrowIfCancellationRequested();

cached = true;
currentKey = key;
TaskCacheKey cacheKey = new(CacheKeyType, key);
var cachedTask = Cache.GetOrAddTask(cacheKey, CreatePromise);
var cachedTask = Cache.GetOrAddTask(cacheKey, k => CreatePromise((TKey)k.Key));

if (cached)
{
@@ -172,19 +156,17 @@ void Initialize()
foreach (var key in keys)
{
cancellationToken.ThrowIfCancellationRequested();

currentKey = key;
tasks[index++] = CreatePromise();
tasks[index++] = CreatePromise(key).Task;
}
}

async Task<IReadOnlyList<TValue>> WhenAll()
=> await Task.WhenAll(tasks).ConfigureAwait(false);

Task<TValue> CreatePromise()
Promise<TValue> CreatePromise(TKey key)
{
cached = false;
return GetOrCreatePromiseUnsafe(currentKey).Task;
return GetOrCreatePromiseUnsafe(key);
}
}

@@ -219,7 +201,7 @@ public void Set(TKey key, Task<TValue> value)
if (Cache is not null)
{
TaskCacheKey cacheKey = new(CacheKeyType, key);
Cache.TryAdd(cacheKey, value);
Cache.TryAdd(cacheKey, new Promise<TValue>(value));
}
}

@@ -282,7 +264,7 @@ private ValueTask DispatchBatchAsync(
async ValueTask StartDispatchingAsync()
{
var errors = false;

using (_diagnosticEvents.ExecuteBatch(this, batch.Keys))
{
var buffer = new Result<TValue>[batch.Keys.Count];
@@ -309,7 +291,8 @@ async ValueTask StartDispatchingAsync()
}
}

private TaskCompletionSource<TValue> GetOrCreatePromiseUnsafe(TKey key)
// ReSharper disable InconsistentlySynchronizedField
private Promise<TValue> GetOrCreatePromiseUnsafe(TKey key)
{
if (_currentBatch is not null && _currentBatch.Size < _maxBatchSize)
{
@@ -321,10 +304,11 @@ private TaskCompletionSource<TValue> GetOrCreatePromiseUnsafe(TKey key)

// set the batch before enqueueing to avoid concurrency issues.
_currentBatch = newBatch;
_batchScheduler.Schedule(() => DispatchBatchAsync(newBatch, _disposeTokenSource.Token));
_batchScheduler.Schedule(() => DispatchBatchAsync(newBatch, _ct));

return newPromise;
}
// ReSharper restore InconsistentlySynchronizedField

private void SetSingleResult(
TaskCompletionSource<TValue> promise,
@@ -363,13 +347,15 @@ protected void TryAddToCache<TItem, TK, TV>(
Func<TItem, TV> value)
where TK : notnull
{
if (Cache is not null)
if (Cache is null)
{
foreach (var item in items)
{
TaskCacheKey cacheKey = new(cacheKeyType, key(item));
Cache.TryAdd(cacheKey, () => Task.FromResult(value(item)));
}
return;
}

foreach (var item in items)
{
TaskCacheKey cacheKey = new(cacheKeyType, key(item));
Cache.TryAdd(cacheKey, () => new Promise<TV>(value(item)));
}
}

@@ -389,11 +375,13 @@ protected void TryAddToCache<TK, TV>(
TV value)
where TK : notnull
{
if (Cache is not null)
if (Cache is null)
{
TaskCacheKey cacheKey = new(cacheKeyType, key);
Cache.TryAdd(cacheKey, () => Task.FromResult(value));
return;
}

TaskCacheKey cacheKey = new(cacheKeyType, key);
Cache.TryAdd(cacheKey, () => new Promise<TV>(value));
}

/// <summary>
@@ -418,34 +406,4 @@ protected static string GetCacheKeyType<TDataLoader>()
/// </returns>
protected static string GetCacheKeyType(Type type)
=> type.FullName ?? type.Name;

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing,
/// or resetting unmanaged resources.
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing,
/// or resetting unmanaged resources.
/// </summary>
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
Clear();
_disposeTokenSource.Cancel();
_disposeTokenSource.Dispose();
_cacheOwner?.Dispose();
}

_disposed = true;
}
}
}
}
13 changes: 7 additions & 6 deletions src/GreenDonut/src/Core/DataLoaderOptions.cs
@@ -1,9 +1,11 @@
using System.Threading;

namespace GreenDonut;

/// <summary>
/// An options object to configure the behavior for <c>DataLoader</c>.
/// </summary>
public class DataLoaderOptions
public sealed class DataLoaderOptions
{
/// <summary>
/// Gets or sets the maximum batch size per request. If set to
@@ -20,12 +22,11 @@ public class DataLoaderOptions
/// The default value is set to <c>null</c>.
/// </summary>
public ITaskCache? Cache { get; set; }

/// <summary>
/// Gets or sets a value indicating whether caching is enabled. The
/// default value is <c>true</c>.
/// Gets the cancellation token that shall be used for dispatch tasks.
/// </summary>
public bool Caching { get; set; } = true;
public CancellationToken CancellationToken { get; set; }

/// <summary>
/// Gets the <see cref="IDataLoaderDiagnosticEvents"/> to intercept DataLoader events.
@@ -43,7 +44,7 @@ public DataLoaderOptions Copy()
{
MaxBatchSize = MaxBatchSize,
Cache = Cache,
Caching = Caching,
DiagnosticEvents = DiagnosticEvents,
CancellationToken = CancellationToken,
};
}
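Together with the DataLoaderBase changes, the options object is now the single place where caching and cancellation are configured: the removed Caching flag is replaced by simply setting or omitting Cache, and CancellationToken feeds batch dispatch in place of the loader's internal dispose token. The following caller-side sketch is hedged; the cacheOwner and shutdown parameters are assumptions for illustration, not part of this diff.

using System.Threading;
using GreenDonut;

// Caller-side sketch: with the Caching flag gone and DataLoaderBase no longer disposable,
// the code that builds the options owns both the cache and the cancellation lifetime.
public static class DataLoaderSetup
{
    public static DataLoaderOptions CreateOptions(
        TaskCacheOwner cacheOwner,
        CancellationTokenSource shutdown)
        => new DataLoaderOptions
        {
            MaxBatchSize = 128,
            // Caching is now opt-in: set Cache to enable it, leave it null to disable it.
            Cache = cacheOwner.Cache,
            // Flows into batch dispatch, replacing the loader's internal dispose token.
            CancellationToken = shutdown.Token,
        };
}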
17 changes: 15 additions & 2 deletions src/GreenDonut/src/Core/FetchOnceDataLoader.cs
@@ -1,3 +1,4 @@
using System;
using System.Threading;
using System.Threading.Tasks;

@@ -9,9 +10,21 @@ namespace GreenDonut;
/// <typeparam name="TValue">A value type.</typeparam>
public abstract class FetchOnceDataLoader<TValue> : CacheDataLoader<string, TValue>
{
protected FetchOnceDataLoader(DataLoaderOptions? options = null)
protected FetchOnceDataLoader(DataLoaderOptions options)
: base(options)
{ }
{
if (options is null)
{
throw new ArgumentNullException(nameof(options));
}

if (options.Cache is null)
{
throw new ArgumentException(
"A cache must be provided when using the FetchOnceDataLoader.",
nameof(options));
}
}

/// <summary>
/// Loads a single value. This call may return a cached value
2 changes: 1 addition & 1 deletion src/GreenDonut/src/Core/IBatchScheduler.cs
@@ -4,7 +4,7 @@
namespace GreenDonut;

/// <summary>
/// The batch scheduler is used by the dataloader to defer the data fetching
/// The batch scheduler is used by the DataLoader to defer the data fetching
/// work to a batch dispatcher that will execute the batches.
/// </summary>
public interface IBatchScheduler
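For reference, IBatchScheduler only requires the Schedule(Func<ValueTask> dispatch) method shown in the AutoBatchScheduler diff above. The following illustrative implementation (not part of this commit) queues dispatch work and runs it only when asked, a pattern that can be handy in tests.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using GreenDonut;

// Illustrative scheduler: Schedule only queues the dispatch work,
// and DispatchAsync runs whatever has been queued so far.
public sealed class ManualBatchScheduler : IBatchScheduler
{
    private readonly ConcurrentQueue<Func<ValueTask>> _queue = new();

    public void Schedule(Func<ValueTask> dispatch)
        => _queue.Enqueue(dispatch);

    public async Task DispatchAsync()
    {
        while (_queue.TryDequeue(out var dispatch))
        {
            await dispatch().ConfigureAwait(false);
        }
    }
}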
19 changes: 19 additions & 0 deletions src/GreenDonut/src/Core/IPromise.cs
@@ -0,0 +1,19 @@
using System.Threading.Tasks;

namespace GreenDonut;

/// <summary>
/// Represents a promise that can be canceled.
/// </summary>
public interface IPromise
{
/// <summary>
/// Gets the task that represents the async work for this promise.
/// </summary>
Task Task { get; }

/// <summary>
/// Tries to cancel the async work for this promise.
/// </summary>
void TryCancel();
}
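The Promise<TValue> type that the DataLoaderBase diff constructs from both values and tasks is not among the files shown here. The sketch below is only a plausible shape inferred from how it is used above (a Task property, construction from a value or a task, and TryCancel); it is not the actual implementation.

using System.Threading.Tasks;
using GreenDonut;

// Plausible sketch only; the real Promise<T> is not shown in this diff.
public sealed class ExamplePromise<T> : IPromise
{
    private readonly TaskCompletionSource<T>? _completionSource;

    // Wrap an already running task (cancellation is not possible in this case).
    public ExamplePromise(Task<T> task)
        => Task = task;

    // Wrap an already materialized value.
    public ExamplePromise(T value)
        => Task = System.Threading.Tasks.Task.FromResult(value);

    private ExamplePromise(TaskCompletionSource<T> completionSource)
    {
        _completionSource = completionSource;
        Task = completionSource.Task;
    }

    public Task<T> Task { get; }

    Task IPromise.Task => Task;

    // A pending promise that owns its completion source and can therefore be canceled.
    public static ExamplePromise<T> CreatePending()
        => new(new TaskCompletionSource<T>(
            TaskCreationOptions.RunContinuationsAsynchronously));

    public void TryCancel()
        => _completionSource?.TrySetCanceled();
}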