Skip to content

Commit

Permalink
Fixed StreamPipeReader.CopyToAsync (#57943)
Browse files Browse the repository at this point in the history
* Fixed StreamPipeReader.CopyToAsync
- Take the segment index into account when copying buffered data. This handles the case where ReadAsync has consumed a partial segment and then the same PipeReader instance is used to copy to a Stream and PipeWriter.
- Added tests

* Always slice
  • Loading branch information
davidfowl authored Aug 23, 2021
1 parent 25cd447 commit 331cfe3
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -383,18 +383,21 @@ public override async Task CopyToAsync(PipeWriter destination, CancellationToken
try
{
BufferSegment? segment = _readHead;
int segmentIndex = _readIndex;

try
{
while (segment != null)
{
FlushResult flushResult = await destination.WriteAsync(segment.Memory, tokenSource.Token).ConfigureAwait(false);
FlushResult flushResult = await destination.WriteAsync(segment.Memory.Slice(segmentIndex), tokenSource.Token).ConfigureAwait(false);

if (flushResult.IsCanceled)
{
ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
}

segment = segment.NextSegment;
segmentIndex = 0;

if (flushResult.IsCompleted)
{
Expand Down Expand Up @@ -451,13 +454,16 @@ public override async Task CopyToAsync(Stream destination, CancellationToken can
try
{
BufferSegment? segment = _readHead;
int segmentIndex = _readIndex;

try
{
while (segment != null)
{
await destination.WriteAsync(segment.Memory, tokenSource.Token).ConfigureAwait(false);
await destination.WriteAsync(segment.Memory.Slice(segmentIndex), tokenSource.Token).ConfigureAwait(false);

segment = segment.NextSegment;
segmentIndex = 0;
}
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,5 +286,41 @@ public async Task ThrowingFromStreamCallsAdvanceToWithStartOfLastReadResult(int
Assert.True(startPosition.Equals(wrappedPipeReader.LastConsumed));
Assert.True(startPosition.Equals(wrappedPipeReader.LastExamined));
}

[Fact]
public async Task CopyToAsyncStreamCopiesRemainderAfterReadingSome()
{
var buffer = Encoding.UTF8.GetBytes("Hello World");
await Pipe.Writer.WriteAsync(buffer);
Pipe.Writer.Complete();

var result = await PipeReader.ReadAsync();
Assert.Equal(result.Buffer.ToArray(), buffer);
// Consume Hello
PipeReader.AdvanceTo(result.Buffer.GetPosition(5));

var ms = new MemoryStream();
await PipeReader.CopyToAsync(ms);

Assert.Equal(buffer.AsMemory(5).ToArray(), ms.ToArray());
}

[Fact]
public async Task CopyToAsyncPipeWriterCopiesRemainderAfterReadingSome()
{
var buffer = Encoding.UTF8.GetBytes("Hello World");
await Pipe.Writer.WriteAsync(buffer);
Pipe.Writer.Complete();

var result = await PipeReader.ReadAsync();
Assert.Equal(result.Buffer.ToArray(), buffer);
// Consume Hello
PipeReader.AdvanceTo(result.Buffer.GetPosition(5));

var ms = new MemoryStream();
await PipeReader.CopyToAsync(PipeWriter.Create(ms));

Assert.Equal(buffer.AsMemory(5).ToArray(), ms.ToArray());
}
}
}

0 comments on commit 331cfe3

Please sign in to comment.