Skip to content

Commit

Permalink
Added overloads to pass in ContinuationOptions to channels and asyn…
Browse files Browse the repository at this point in the history
…c synchronization primitives.

Renamed `SynchronousIfSameContext` to `AllowSynchronous`.
  • Loading branch information
timcassell committed Oct 28, 2024
1 parent 7ea7e88 commit 750c297
Show file tree
Hide file tree
Showing 23 changed files with 867 additions and 1,075 deletions.
65 changes: 61 additions & 4 deletions Package/Core/Channels/ChannelReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,27 @@ internal ChannelReader(Channel<T> channel)
/// </summary>
public int Count => _channel.ValidateAndGetRef().GetCount(_channel._id);

/// <summary>
/// Asynchronously waits for data to be available to be read.
/// </summary>
/// <returns>
/// A <see cref="Promise{T}"/> that will resolve with <see langword="true"/> when data is available to be read,
/// or <see langword="false"/> when the channel is closed.
/// </returns>
public Promise<bool> WaitToReadAsync()
=> WaitToReadAsync(CancelationToken.None);

/// <summary>
/// Asynchronously waits for data to be available to be read.
/// </summary>
/// <param name="continuationOptions">The options used to configure the continuation behavior of the returned <see cref="Promise{T}"/>.</param>
/// <returns>
/// A <see cref="Promise{T}"/> that will resolve with <see langword="true"/> when data is available to be read,
/// or <see langword="false"/> when the channel is closed.
/// </returns>
public Promise<bool> WaitToReadAsync(ContinuationOptions continuationOptions)
=> _channel.ValidateAndGetRef().WaitToReadAsync(_channel._id, default, continuationOptions.GetValidated());

/// <summary>
/// Asynchronously waits for data to be available to be read.
/// </summary>
Expand All @@ -43,8 +64,20 @@ internal ChannelReader(Channel<T> channel)
/// A <see cref="Promise{T}"/> that will resolve with <see langword="true"/> when data is available to be read,
/// or <see langword="false"/> when the channel is closed.
/// </returns>
public Promise<bool> WaitToReadAsync(CancelationToken cancelationToken = default)
=> _channel.ValidateAndGetRef().WaitToReadAsync(_channel._id, cancelationToken);
public Promise<bool> WaitToReadAsync(CancelationToken cancelationToken)
=> _channel.ValidateAndGetRef().WaitToReadAsync(_channel._id, cancelationToken, ContinuationOptions.CapturedContext);

/// <summary>
/// Asynchronously waits for data to be available to be read.
/// </summary>
/// <param name="cancelationToken">A <see cref="CancelationToken"/> used to cancel the wait operation.</param>
/// <param name="continuationOptions">The options used to configure the continuation behavior of the returned <see cref="Promise{T}"/>.</param>
/// <returns>
/// A <see cref="Promise{T}"/> that will resolve with <see langword="true"/> when data is available to be read,
/// or <see langword="false"/> when the channel is closed.
/// </returns>
public Promise<bool> WaitToReadAsync(CancelationToken cancelationToken, ContinuationOptions continuationOptions)
=> _channel.ValidateAndGetRef().WaitToReadAsync(_channel._id, cancelationToken, continuationOptions.GetValidated());

/// <summary>
/// Attempts to peek at an item from the channel in a non-blocking manner.
Expand All @@ -60,13 +93,37 @@ public ChannelPeekResult<T> TryPeek()
public ChannelReadResult<T> TryRead()
=> _channel.ValidateAndGetRef().TryRead(_channel._id);

/// <summary>
/// Asynchronously reads an item from the channel.
/// </summary>
/// <returns>A <see cref="Promise{T}"/> that yields the result of the read operation.</returns>
public Promise<ChannelReadResult<T>> ReadAsync()
=> ReadAsync(CancelationToken.None);

/// <summary>
/// Asynchronously reads an item from the channel.
/// </summary>
/// <param name="continuationOptions">The options used to configure the continuation behavior of the returned <see cref="Promise{T}"/>.</param>
/// <returns>A <see cref="Promise{T}"/> that yields the result of the read operation.</returns>
public Promise<ChannelReadResult<T>> ReadAsync(ContinuationOptions continuationOptions)
=> _channel.ValidateAndGetRef().ReadAsync(_channel._id, default, continuationOptions.GetValidated());

/// <summary>
/// Asynchronously reads an item from the channel.
/// </summary>
/// <param name="cancelationToken">A <see cref="CancelationToken"/> used to cancel the read operation.</param>
/// <returns>A <see cref="Promise{T}"/> that yields the result of the read operation.</returns>
public Promise<ChannelReadResult<T>> ReadAsync(CancelationToken cancelationToken)
=> _channel.ValidateAndGetRef().ReadAsync(_channel._id, cancelationToken, ContinuationOptions.CapturedContext);

/// <summary>
/// Asynchronously reads an item from the channel.
/// </summary>
/// <param name="cancelationToken">A <see cref="CancelationToken"/> used to cancel the read operation.</param>
/// <param name="continuationOptions">The options used to configure the continuation behavior of the returned <see cref="Promise{T}"/>.</param>
/// <returns>A <see cref="Promise{T}"/> that yields the result of the read operation.</returns>
public Promise<ChannelReadResult<T>> ReadAsync(CancelationToken cancelationToken = default)
=> _channel.ValidateAndGetRef().ReadAsync(_channel._id, cancelationToken);
public Promise<ChannelReadResult<T>> ReadAsync(CancelationToken cancelationToken, ContinuationOptions continuationOptions)
=> _channel.ValidateAndGetRef().ReadAsync(_channel._id, cancelationToken, continuationOptions.GetValidated());

/// <summary>
/// Creates an <see cref="AsyncEnumerable{T}"/> that enables reading all of the data from the channel.
Expand Down
68 changes: 64 additions & 4 deletions Package/Core/Channels/ChannelWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,49 @@ internal ChannelWriter(Channel<T> channel)
_channel = channel;
}

/// <summary>
/// Asynchronously waits for space to be available to write an item.
/// </summary>
/// <returns>
/// A <see cref="Promise{T}"/> that will resolve with <see langword="true"/> when space is available to write an item,
/// or <see langword="false"/> when the channel is closed.
/// </returns>
public Promise<bool> WaitToWriteAsync()
=> WaitToWriteAsync(CancelationToken.None);

/// <summary>
/// Asynchronously waits for space to be available to write an item.
/// </summary>
/// <param name="continuationOptions">The options used to configure the continuation behavior of the returned <see cref="Promise{T}"/>.</param>
/// <returns>
/// A <see cref="Promise{T}"/> that will resolve with <see langword="true"/> when space is available to write an item,
/// or <see langword="false"/> when the channel is closed.
/// </returns>
public Promise<bool> WaitToWriteAsync(ContinuationOptions continuationOptions)
=> _channel.ValidateAndGetRef().WaitToWriteAsync(_channel._id, default, continuationOptions.GetValidated());

/// <summary>
/// Asynchronously waits for space to be available to write an item.
/// </summary>
/// <param name="cancelationToken">A <see cref="CancelationToken"/> used to cancel the wait operation.</param>
/// <returns>
/// A <see cref="Promise{T}"/> that will resolve with <see langword="true"/> when space is available to write an item,
/// or <see langword="false"/> when the channel is closed.
/// </returns>
public Promise<bool> WaitToWriteAsync(CancelationToken cancelationToken)
=> _channel.ValidateAndGetRef().WaitToWriteAsync(_channel._id, cancelationToken, ContinuationOptions.CapturedContext);

/// <summary>
/// Asynchronously waits for space to be available to write an item.
/// </summary>
/// <param name="cancelationToken">A <see cref="CancelationToken"/> used to cancel the wait operation.</param>
/// <param name="continuationOptions">The options used to configure the continuation behavior of the returned <see cref="Promise{T}"/>.</param>
/// <returns>
/// A <see cref="Promise{T}"/> that will resolve with <see langword="true"/> when space is available to write an item,
/// or <see langword="false"/> when the channel is closed.
/// </returns>
public Promise<bool> WaitToWriteAsync(CancelationToken cancelationToken = default)
=> _channel.ValidateAndGetRef().WaitToWriteAsync(_channel._id, cancelationToken);
public Promise<bool> WaitToWriteAsync(CancelationToken cancelationToken, ContinuationOptions continuationOptions)
=> _channel.ValidateAndGetRef().WaitToWriteAsync(_channel._id, cancelationToken, continuationOptions.GetValidated());

/// <summary>
/// Attempts to write an item to the channel in a non-blocking manner.
Expand All @@ -46,14 +79,41 @@ public Promise<bool> WaitToWriteAsync(CancelationToken cancelationToken = defaul
public ChannelWriteResult<T> TryWrite(T item)
=> _channel.ValidateAndGetRef().TryWrite(item, _channel._id);

/// <summary>
/// Asynchronously writes an item to the channel.
/// </summary>
/// <param name="item">The value to write to the channel.</param>
/// <returns>A <see cref="Promise{T}"/> that yields the result of the write operation.</returns>
public Promise<ChannelWriteResult<T>> WriteAsync(T item)
=> _channel.ValidateAndGetRef().WriteAsync(item, _channel._id, default, ContinuationOptions.CapturedContext);

/// <summary>
/// Asynchronously writes an item to the channel.
/// </summary>
/// <param name="item">The value to write to the channel.</param>
/// <param name="continuationOptions">The options used to configure the continuation behavior of the returned <see cref="Promise{T}"/>.</param>
/// <returns>A <see cref="Promise{T}"/> that yields the result of the write operation.</returns>
public Promise<ChannelWriteResult<T>> WriteAsync(T item, ContinuationOptions continuationOptions)
=> _channel.ValidateAndGetRef().WriteAsync(item, _channel._id, default, continuationOptions.GetValidated());

/// <summary>
/// Asynchronously writes an item to the channel.
/// </summary>
/// <param name="item">The value to write to the channel.</param>
/// <param name="cancelationToken">A <see cref="CancelationToken"/> used to cancel the write operation.</param>
/// <returns>A <see cref="Promise{T}"/> that yields the result of the write operation.</returns>
public Promise<ChannelWriteResult<T>> WriteAsync(T item, CancelationToken cancelationToken)
=> _channel.ValidateAndGetRef().WriteAsync(item, _channel._id, cancelationToken, ContinuationOptions.CapturedContext);

/// <summary>
/// Asynchronously writes an item to the channel.
/// </summary>
/// <param name="item">The value to write to the channel.</param>
/// <param name="cancelationToken">A <see cref="CancelationToken"/> used to cancel the write operation.</param>
/// <param name="continuationOptions">The options used to configure the continuation behavior of the returned <see cref="Promise{T}"/>.</param>
/// <returns>A <see cref="Promise{T}"/> that yields the result of the write operation.</returns>
public Promise<ChannelWriteResult<T>> WriteAsync(T item, CancelationToken cancelationToken = default)
=> _channel.ValidateAndGetRef().WriteAsync(item, _channel._id, cancelationToken);
public Promise<ChannelWriteResult<T>> WriteAsync(T item, CancelationToken cancelationToken, ContinuationOptions continuationOptions)
=> _channel.ValidateAndGetRef().WriteAsync(item, _channel._id, cancelationToken, continuationOptions.GetValidated());

/// <summary>
/// Attempts to close the channel in a rejected state.
Expand Down
Loading

0 comments on commit 750c297

Please sign in to comment.