Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Double the size of the previous segment #66930

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ internal sealed class BufferSegment : ReadOnlySequenceSegment<byte>
private BufferSegment? _next;
private int _end;

/// <summary>
/// The amount of avaiable memory in the segment.
/// </summary>
public int Capacity => AvailableMemory.Length;

/// <summary>
/// The End represents the offset into AvailableMemory where the range of "active" bytes ends. At the point when the block is leased
/// the End is guaranteed to be equal to Start. The value of Start may be assigned anywhere between 0 and
Expand Down
20 changes: 12 additions & 8 deletions src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private void AllocateWriteHeadSynchronized(int sizeHint)
if (_writingHead == null)
{
// We need to allocate memory to write since nobody has written before
BufferSegment newSegment = AllocateSegment(sizeHint);
BufferSegment newSegment = AllocateSegment(MinimumSegmentSize, sizeHint);

// Set all the pointers
_writingHead = _readHead = _readTail = newSegment;
Expand All @@ -197,7 +197,9 @@ private void AllocateWriteHeadSynchronized(int sizeHint)
_writingHeadBytesBuffered = 0;
}

BufferSegment newSegment = AllocateSegment(sizeHint);
// Double the minimum segment size on subsequent segements
int newSegmentSize = Math.Min(PipeOptions.MaximumSegmentSize, _writingHead.Capacity * 2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What pool are you testing this with?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ArrayPool. I need to change the implementation of the PinnedBlockMemoryPool to make this work the way it's intended.

BufferSegment newSegment = AllocateSegment(newSegmentSize, sizeHint);

_writingHead.SetNext(newSegment);
_writingHead = newSegment;
Expand All @@ -206,7 +208,7 @@ private void AllocateWriteHeadSynchronized(int sizeHint)
}
}

private BufferSegment AllocateSegment(int sizeHint)
private BufferSegment AllocateSegment(int minimumSegmentSize, int sizeHint)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: We should rename the sizeHint to minSize or something in internal/private APIs because it now violates the contract to provide spans/memory smaller than that so it's not really a "hint". With the new variables, this becoming somewhat confusing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we should but I dislike unrelated changes. I'll make that change as well.

{
Debug.Assert(sizeHint >= 0);
BufferSegment newSegment = CreateSegmentUnsynchronized();
Expand All @@ -223,12 +225,12 @@ private BufferSegment AllocateSegment(int sizeHint)
if (sizeHint <= maxSize)
{
// Use the specified pool as it fits. Specified pool is not null as maxSize == -1 if _pool is null.
newSegment.SetOwnedMemory(pool!.Rent(GetSegmentSize(sizeHint, maxSize)));
newSegment.SetOwnedMemory(pool!.Rent(GetSegmentSize(minimumSegmentSize, sizeHint, maxSize)));
}
else
{
// Use the array pool
int sizeToRequest = GetSegmentSize(sizeHint);
int sizeToRequest = GetSegmentSize(minimumSegmentSize, sizeHint);
newSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(sizeToRequest));
}

Expand All @@ -237,10 +239,11 @@ private BufferSegment AllocateSegment(int sizeHint)
return newSegment;
}

private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)
private static int GetSegmentSize(int minimumSegmentSize, int sizeHint, int maxBufferSize = int.MaxValue)
{
// First we need to handle case where hint is smaller than minimum segment size
sizeHint = Math.Max(MinimumSegmentSize, sizeHint);
sizeHint = Math.Max(minimumSegmentSize, sizeHint);

// After that adjust it to fit into pools max buffer size
int adjustedToMaximumSize = Math.Min(maxBufferSize, sizeHint);
return adjustedToMaximumSize;
Expand Down Expand Up @@ -1090,7 +1093,8 @@ private void WriteMultiSegment(ReadOnlySpan<byte> source)

// This is optimized to use pooled memory. That's why we pass 0 instead of
// source.Length
BufferSegment newSegment = AllocateSegment(0);
int newSegmentSize = Math.Min(PipeOptions.MaximumSegmentSize, _writingHead.Capacity * 2);
BufferSegment newSegment = AllocateSegment(newSegmentSize, 0);

_writingHead.SetNext(newSegment);
_writingHead = newSegment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ public class PipeOptions
{
private const int DefaultMinimumSegmentSize = 4096;

// Arbitrary 1MB max segment size
internal const int MaximumSegmentSize = 1024 * 1024;
Copy link
Member Author

@davidfowl davidfowl Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be higher for large copy scenarios? Not sure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should think about how this interacts with logic like that for SocketConnection.MinAllocBufferSize. I could see us potentially unnecessarily doing syscalls using relatively small 2KB buffer for reads after this change as we reach the end of the block. This is a DoS mitigation given smaller segment sizes. But with larger segment sizes, we could leave more bytes unfilled before allocating syscalls without wasting too much memory proportionally.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, we'll need to tweak how we think about this (and if it matters). You'll end up throwing away 2K (which maybe is too aggressive anyways?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think throwing away up to 2K at the end of the segment is that bad especially if the segments get larger. I think we could throw away more. We want to reduce syscalls, so reading into small tail space would be counterproductive given large amounts of data. When we're copying data from one buffer to another in user mode, it makes far more sense to use all the available space.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting anything or just saying this is interesting?


/// <summary>Gets the default instance of <see cref="System.IO.Pipelines.PipeOptions" />.</summary>
/// <value>A <see cref="System.IO.Pipelines.PipeOptions" /> object initialized with default parameters.</value>
public static PipeOptions Default { get; } = new PipeOptions();
Expand Down Expand Up @@ -38,10 +41,10 @@ public PipeOptions(
// to let users specify the maximum buffer size, so we pick a reasonable number based on defaults. They can influence
// how much gets buffered by increasing the minimum segment size.

// With a defaukt segment size of 4K this maps to 16K
// With a default minimum segment size of 4 this maps to 60K
InitialSegmentPoolSize = 4;

// With a defaukt segment size of 4K this maps to 1MB. If the pipe has large segments this will be bigger than 1MB...
// With a default minimum segment size of 4K this maps to ~250MB.
MaxSegmentPoolSize = 256;

// By default, we'll throttle the writer at 64K of buffered data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private void AllocateMemory(int sizeHint)
if (_head == null)
{
// We need to allocate memory to write since nobody has written before
BufferSegment newSegment = AllocateSegment(sizeHint);
BufferSegment newSegment = AllocateSegment(_minimumBufferSize, sizeHint);

// Set all the pointers
_head = _tail = newSegment;
Expand All @@ -135,15 +135,16 @@ private void AllocateMemory(int sizeHint)
_tailBytesBuffered = 0;
}

BufferSegment newSegment = AllocateSegment(sizeHint);
int newSegmentSize = Math.Min(PipeOptions.MaximumSegmentSize, _tail.Capacity * 2);
Copy link
Member Author

@davidfowl davidfowl Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is simple to understand but another possible technique could be to base the growth on number of segments rather than the last segment (so taking the speed of the consumer into account to shrink the next segment). I'm not sure if it's a big concern though because:

  • If the last segment is ever fully consumed, it'll restart to the minimum segment size (default 4K).
  • If the last segment isn't fully consumed because the reader can't keep up, the pause threshold can be adjusted and working with bigger blocks of memory is better for the reader.

BufferSegment newSegment = AllocateSegment(newSegmentSize, sizeHint);

_tail.SetNext(newSegment);
_tail = newSegment;
}
}
}

private BufferSegment AllocateSegment(int sizeHint)
private BufferSegment AllocateSegment(int minimumBufferSize, int sizeHint)
{
Debug.Assert(sizeHint >= 0);
BufferSegment newSegment = CreateSegmentUnsynchronized();
Expand All @@ -152,12 +153,12 @@ private BufferSegment AllocateSegment(int sizeHint)
if (sizeHint <= maxSize)
{
// Use the specified pool as it fits. Specified pool is not null as maxSize == -1 if _pool is null.
newSegment.SetOwnedMemory(_pool!.Rent(GetSegmentSize(sizeHint, maxSize)));
newSegment.SetOwnedMemory(_pool!.Rent(GetSegmentSize(minimumBufferSize, sizeHint, maxSize)));
}
else
{
// Use the array pool
int sizeToRequest = GetSegmentSize(sizeHint);
int sizeToRequest = GetSegmentSize(minimumBufferSize, sizeHint);
newSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(sizeToRequest));
}

Expand All @@ -166,12 +167,12 @@ private BufferSegment AllocateSegment(int sizeHint)
return newSegment;
}

private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)
private static int GetSegmentSize(int minimumBufferSize, int sizeHint, int maxBufferSize = int.MaxValue)
{
// First we need to handle case where hint is smaller than minimum segment size
sizeHint = Math.Max(_minimumBufferSize, sizeHint);
sizeHint = Math.Max(minimumBufferSize, sizeHint);
// After that adjust it to fit into pools max buffer size
var adjustedToMaximumSize = Math.Min(maxBufferSize, sizeHint);
int adjustedToMaximumSize = Math.Min(maxBufferSize, sizeHint);
return adjustedToMaximumSize;
}

Expand Down
4 changes: 2 additions & 2 deletions src/libraries/System.IO.Pipelines/tests/PipePoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public async Task MultipleCompleteReaderWriterCauseDisposeOnlyOnce()
}

[Fact]
public async Task RentsMinimumSegmentSize()
public async Task DoublesMinimumSegmentSize()
{
var pool = new DisposeTrackingBufferPool();
var writeSize = 512;
Expand All @@ -118,7 +118,7 @@ public async Task RentsMinimumSegmentSize()
pipe.Reader.Complete();
pipe.Writer.Complete();

Assert.Equal(2020, ensuredSize);
Assert.Equal(4040, ensuredSize);
Assert.Equal(2020, allocatedSize);
}

Expand Down