Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow derived IO writer classes to specify base path #1377

Merged
merged 2 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Bonsai.System/Bonsai.System.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<Description>Bonsai System Library containing reactive infrastructure to interface with the underlying operating system.</Description>
<PackageTags>Bonsai Rx Reactive Extensions IO Serial Port Resources</PackageTags>
<TargetFrameworks>net462;netstandard2.0</TargetFrameworks>
<VersionPrefix>2.7.1</VersionPrefix>
<VersionPrefix>2.8.0</VersionPrefix>
</PropertyGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="System.IO.Ports" Version="6.0.0" />
Expand Down
44 changes: 35 additions & 9 deletions Bonsai.System/IO/FileSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
namespace Bonsai.IO
{
/// <summary>
/// Provides a non-generic base class for sinks that write the elements from the input sequence
/// into a file.
/// Provides a non-generic base class for sinks that write all elements from the
/// source sequence to a file.
/// </summary>
[Combinator]
[DefaultProperty(nameof(FileName))]
Expand Down Expand Up @@ -77,9 +77,10 @@ public abstract class FileSink<TSource, TWriter> : FileSink where TWriter : clas
protected abstract void Write(TWriter writer, TSource input);

/// <summary>
/// Writes all elements of an observable sequence into the specified file.
/// Writes all elements of an observable sequence to a file.
/// </summary>
/// <param name="source">The source sequence for which to write elements.</param>
/// <typeparam name="TElement">The type of the elements in the source sequence.</typeparam>
/// <param name="source">The sequence of elements to write to the file.</param>
/// <param name="selector">
/// The transform function used to convert each element of the sequence into the type
/// of inputs accepted by the file writer.
Expand All @@ -88,16 +89,42 @@ public abstract class FileSink<TSource, TWriter> : FileSink where TWriter : clas
/// An observable sequence that is identical to the <paramref name="source"/> sequence
/// but where there is an additional side effect of writing the elements to a file.
/// </returns>
/// <exception cref="ArgumentNullException"></exception>
protected IObservable<TElement> Process<TElement>(IObservable<TElement> source, Func<TElement, TSource> selector)
{
return Process(source, selector, FileName);
}

/// <summary>
/// Writes all elements of an observable sequence to the specified file.
/// </summary>
/// <typeparam name="TElement">The type of the elements in the source sequence.</typeparam>
/// <param name="source">The sequence of elements to write to the file.</param>
/// <param name="selector">
/// The transform function used to convert each element of the sequence into the type
/// of inputs accepted by the file writer.
/// </param>
/// <param name="fileName">
/// The name of the file on which to write the elements.
/// </param>
/// <returns>
/// An observable sequence that is identical to the <paramref name="source"/> sequence
/// but where there is an additional side effect of writing the elements to a file.
/// </returns>
/// <exception cref="ArgumentNullException"></exception>
protected IObservable<TElement> Process<TElement>(
IObservable<TElement> source,
Func<TElement, TSource> selector,
string fileName)
{
if (source == null)
{
throw new ArgumentNullException("source");
throw new ArgumentNullException(nameof(source));
}

if (selector == null)
{
throw new ArgumentNullException("selector");
throw new ArgumentNullException(nameof(selector));
}

return Observable.Create<TElement>(observer =>
Expand All @@ -113,7 +140,6 @@ protected IObservable<TElement> Process<TElement>(IObservable<TElement> source,
var runningWriter = disposable.Writer;
if (runningWriter == null)
{
var fileName = FileName;
if (string.IsNullOrEmpty(fileName))
{
throw new InvalidOperationException("A valid file path must be specified.");
Expand Down Expand Up @@ -145,9 +171,9 @@ protected IObservable<TElement> Process<TElement>(IObservable<TElement> source,
}

/// <summary>
/// Writes all elements of an observable sequence into the specified file.
/// Writes all elements of an observable sequence to the specified file.
/// </summary>
/// <param name="source">The source sequence for which to write elements.</param>
/// <param name="source">The sequence of elements to write to the file.</param>
/// <returns>
/// An observable sequence that is identical to the <paramref name="source"/> sequence
/// but where there is an additional side effect of writing the elements to a file.
Expand Down
39 changes: 33 additions & 6 deletions Bonsai.System/IO/StreamSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@
namespace Bonsai.IO
{
/// <summary>
/// Provides a non-generic base class for sinks that write the elements from the input sequence
/// into a named stream (e.g. a named pipe).
/// Provides a non-generic base class for sinks that write all elements from the
/// source sequence to a named stream.
/// </summary>
[Combinator]
[DefaultProperty(nameof(Path))]
[WorkflowElementCategory(ElementCategory.Sink)]
public abstract class StreamSink
{
/// <summary>
/// Gets or sets the identifier of the named stream on which to write the elements.
/// Gets or sets the identifier of the stream on which to write the elements.
/// </summary>
/// <remarks>
/// If the identifier uses the named pipe prefix <c>\\.\pipe\</c>, a corresponding
Expand Down Expand Up @@ -94,9 +94,9 @@ static Stream CreateStream(string path, bool overwrite, CancellationToken cancel
}

/// <summary>
/// Writes all elements of an observable sequence to the specified stream
/// using the specified selector function.
/// Writes all elements of an observable sequence to a stream.
/// </summary>
/// <typeparam name="TElement">The type of the elements in the source sequence.</typeparam>
/// <param name="source">The sequence of elements to write.</param>
/// <param name="selector">
/// The transform function used to convert each element of the sequence into the type
Expand All @@ -106,7 +106,35 @@ static Stream CreateStream(string path, bool overwrite, CancellationToken cancel
/// An observable sequence that is identical to the source sequence but where
/// there is an additional side effect of writing the elements to a stream.
/// </returns>
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="InvalidOperationException">A valid path must be specified.</exception>
protected IObservable<TElement> Process<TElement>(IObservable<TElement> source, Func<TElement, TSource> selector)
{
return Process(source, selector, Path);
}

/// <summary>
/// Writes all elements of an observable sequence into the specified stream.
/// </summary>
/// <typeparam name="TElement">The type of the elements in the source sequence.</typeparam>
/// <param name="source">The sequence of elements to write.</param>
/// <param name="selector">
/// The transform function used to convert each element of the sequence into the type
/// of inputs accepted by the stream writer.
/// </param>
/// <param name="path">
/// The identifier of the stream on which to write the elements.
/// </param>
/// <returns>
/// An observable sequence that is identical to the source sequence but where
/// there is an additional side effect of writing the elements to the named stream.
/// </returns>
/// <exception cref="ArgumentNullException"></exception>
/// <exception cref="InvalidOperationException">A valid path must be specified.</exception>
protected IObservable<TElement> Process<TElement>(
IObservable<TElement> source,
Func<TElement, TSource> selector,
string path)
{
if (source == null)
{
Expand All @@ -120,7 +148,6 @@ protected IObservable<TElement> Process<TElement>(IObservable<TElement> source,

return Observable.Create<TElement>(observer =>
{
var path = Path;
if (string.IsNullOrEmpty(path))
{
throw new InvalidOperationException("A valid path must be specified.");
Expand Down