Skip to content

Commit

Permalink
Compress from ReadOnlySequence<byte> to an IBufferWriter<byte> (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
brantburnett authored Dec 14, 2024
1 parent 5a01bad commit b1fb9ef
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 7 deletions.
85 changes: 84 additions & 1 deletion Snappier.Tests/SnappyTests.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using Xunit;

Expand Down Expand Up @@ -42,6 +42,89 @@ public void CompressAndDecompressFile(string filename)
Assert.Equal(input, output);
}

[Fact]
public void CompressAndDecompressFile_LimitedOutputBuffer()
{
    // Covers the branch where the output buffer is too small to hold the maximum compressed length
    // but is larger than the actual compressed length.

    using var resource =
        typeof(SnappyTests).Assembly.GetManifestResourceStream("Snappier.Tests.TestData.alice29.txt");
    Assert.NotNull(resource);

    // Only the leading portion of the file is used. Stream.Read is allowed to return fewer bytes than
    // requested, so everything below is measured against bytesRead rather than the buffer capacity.
    var input = new byte[65536];
    var bytesRead = resource.Read(input, 0, input.Length);
    var inputSpan = input.AsSpan(0, bytesRead);

    // Deliberately undersize the destination relative to the reported maximum compressed length.
    var compressed = new byte[Snappy.GetMaxCompressedLength(bytesRead) - 5];
    var compressedLength = Snappy.Compress(inputSpan, compressed);

    var compressedSpan = compressed.AsSpan(0, compressedLength);

    var output = new byte[Snappy.GetUncompressedLength(compressedSpan)];
    var outputLength = Snappy.Decompress(compressedSpan, output);

    // The round trip must reproduce exactly the bytes that were handed to the compressor.
    Assert.Equal(bytesRead, outputLength);
    Assert.True(inputSpan.SequenceEqual(output.AsSpan(0, outputLength)));
}

#if NET6_0_OR_GREATER

[Theory]
[InlineData("alice29.txt")]
[InlineData("asyoulik.txt")]
[InlineData("fireworks.jpeg")]
[InlineData("geo.protodata")]
[InlineData("html")]
[InlineData("html_x_4")]
[InlineData("kppkn.gtb")]
[InlineData("lcet10.txt")]
[InlineData("paper-100k.pdf")]
[InlineData("plrabn12.txt")]
[InlineData("urls.10K")]
public void CompressAndDecompressFile_ViaBufferWriter(string filename)
{
    // Round-trips an embedded test file through the ReadOnlySequence/IBufferWriter overloads
    // using a single contiguous input segment.

    using var resource =
        typeof(SnappyTests).Assembly.GetManifestResourceStream($"Snappier.Tests.TestData.{filename}");
    Assert.NotNull(resource);

    // Stream.Read is allowed to return fewer bytes than requested, so the compressed payload and the
    // assertions below are both based on bytesRead rather than on the buffer capacity.
    var input = new byte[resource.Length];
    var bytesRead = resource.Read(input, 0, input.Length);

    var compressed = new ArrayBufferWriter<byte>();
    Snappy.Compress(new ReadOnlySequence<byte>(input).Slice(0, bytesRead), compressed);

    var output = new ArrayBufferWriter<byte>();
    Snappy.Decompress(new ReadOnlySequence<byte>(compressed.WrittenMemory), output);

    // The decompressed data must match exactly the bytes that were compressed.
    Assert.Equal(bytesRead, output.WrittenCount);
    Assert.True(input.AsSpan(0, bytesRead).SequenceEqual(output.WrittenSpan));
}

[Theory]
[InlineData(16384)]
[InlineData(32768)]
[InlineData(65536)]
public void CompressAndDecompressFile_ViaBufferWriter_SplitInput(int maxSegmentSize)
{
    // Round-trips an embedded test file through the ReadOnlySequence/IBufferWriter overloads with both
    // the uncompressed and the compressed data split into segments of at most maxSegmentSize bytes.

    using var resource =
        typeof(SnappyTests).Assembly.GetManifestResourceStream("Snappier.Tests.TestData.alice29.txt");
    Assert.NotNull(resource);

    // Stream.Read is allowed to return fewer bytes than requested, so the compressed payload and the
    // assertions below are both based on bytesRead rather than on the buffer capacity.
    var input = new byte[resource.Length];
    var bytesRead = resource.Read(input, 0, input.Length);

    var compressed = new ArrayBufferWriter<byte>();
    Snappy.Compress(SequenceHelpers.CreateSequence(input.AsMemory(0, bytesRead), maxSegmentSize), compressed);

    var output = new ArrayBufferWriter<byte>();
    Snappy.Decompress(SequenceHelpers.CreateSequence(compressed.WrittenMemory, maxSegmentSize), output);

    // The decompressed data must match exactly the bytes that were compressed.
    Assert.Equal(bytesRead, output.WrittenCount);
    Assert.True(input.AsSpan(0, bytesRead).SequenceEqual(output.WrittenSpan));
}

#endif

public static TheoryData<string> CompressAndDecompressStringCases() =>
[
"",
Expand Down
4 changes: 2 additions & 2 deletions Snappier/Internal/HashTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ internal class HashTable : IDisposable

private ushort[]? _buffer;

public void EnsureCapacity(int inputSize)
public void EnsureCapacity(long inputSize)
{
int maxFragmentSize = Math.Min(inputSize, (int) Constants.BlockSize);
int maxFragmentSize = (int) Math.Min(inputSize, Constants.BlockSize);
int tableSize = CalculateTableSize(maxFragmentSize);

if (_buffer is null || tableSize > _buffer.Length)
Expand Down
82 changes: 82 additions & 0 deletions Snappier/Internal/SnappyCompressor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,19 @@ public int Compress(ReadOnlySpan<byte> input, Span<byte> output)

if (output.Length >= maxOutput)
{
// The output span is large enough to hold the maximum possible compressed output,
// compress directly to that span.

int written = CompressFragment(fragment, output, hashTable);

output = output.Slice(written);
bytesWritten += written;
}
else
{
// The output span is too small to hold the maximum possible compressed output,
// compress to a temporary buffer and copy the compressed data to the output span.

byte[] scratch = ArrayPool<byte>.Shared.Rent(maxOutput);
try
{
Expand All @@ -63,6 +69,69 @@ public int Compress(ReadOnlySpan<byte> input, Span<byte> output)
return bytesWritten;
}

/// <summary>
/// Compresses a possibly multi-segment input, writing a varint length prefix followed by the
/// compressed fragments to <paramref name="bufferWriter"/>.
/// </summary>
/// <param name="input">Data to compress. Inputs longer than <see cref="uint.MaxValue"/> bytes are rejected.</param>
/// <param name="bufferWriter">Writer that receives the compressed output.</param>
public void Compress(ReadOnlySequence<byte> input, IBufferWriter<byte> bufferWriter)
{
    ThrowHelper.ThrowIfNull(bufferWriter);

    long totalLength = input.Length;
    if (totalLength > uint.MaxValue)
    {
        ThrowHelper.ThrowArgumentException($"{nameof(input)} is larger than the maximum size of {uint.MaxValue} bytes.", nameof(input));
    }

    // A null working memory means this compressor has already been disposed.
    if (_workingMemory is null)
    {
        ThrowHelper.ThrowObjectDisposedException(nameof(SnappyCompressor));
    }

    _workingMemory.EnsureCapacity(totalLength);

    // The stream starts with the total uncompressed length, varint-encoded.
    int headerLength = VarIntEncoding.Write(bufferWriter.GetSpan(VarIntEncoding.MaxLength), (uint)totalLength);
    bufferWriter.Advance(headerLength);

    ReadOnlySequence<byte> remaining = input;
    while (remaining.Length > 0)
    {
        // Carve off at most one block from the front of what is left.
        SequencePosition blockEnd = remaining.GetPosition(Math.Min(remaining.Length, Constants.BlockSize));
        ReadOnlySequence<byte> block = remaining.Slice(0, blockEnd);

        bool needsCopy = !block.IsSingleSegment && block.First.Length < (Constants.BlockSize / 2);
        if (needsCopy)
        {
            // The block spans several segments and its first segment is shorter than half a block:
            // gather the whole block into one contiguous rented buffer and compress that instead.
            int blockLength = (int)block.Length;
            byte[] rented = ArrayPool<byte>.Shared.Rent(blockLength);
            try
            {
                block.CopyTo(rented);

                CompressFragment(rented.AsSpan(0, blockLength), bufferWriter);
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(rented);
            }

            // The entire block has been consumed.
            remaining = remaining.Slice(blockEnd);
            continue;
        }

        // The block is contiguous, or its first segment is at least half a block long.
        // Compress only that first (and possibly only) segment.
#if NET6_0_OR_GREATER
        ReadOnlySpan<byte> leading = block.FirstSpan;
#else
        ReadOnlySpan<byte> leading = block.First.Span;
#endif

        CompressFragment(leading, bufferWriter);

        // Only the segment that was actually compressed has been consumed.
        remaining = remaining.Slice(leading.Length);
    }
}

public void Dispose()
{
_workingMemory?.Dispose();
Expand All @@ -71,6 +140,19 @@ public void Dispose()

#region CompressFragment

/// <summary>
/// Compresses one fragment into a span requested from <paramref name="bufferWriter"/> and advances
/// the writer by the number of bytes produced.
/// </summary>
/// <param name="fragment">The contiguous fragment to compress.</param>
/// <param name="bufferWriter">Writer that receives the compressed fragment.</param>
private void CompressFragment(ReadOnlySpan<byte> fragment, IBufferWriter<byte> bufferWriter)
{
    Debug.Assert(_workingMemory is not null);

    Span<ushort> table = _workingMemory.GetHashTable(fragment.Length);

    // Request room for the maximum compressed length of this fragment before compressing into it.
    Span<byte> destination = bufferWriter.GetSpan(Helpers.MaxCompressedLength(fragment.Length));

    int produced = CompressFragment(fragment, destination, table);
    bufferWriter.Advance(produced);
}

private static int CompressFragment(ReadOnlySpan<byte> input, Span<byte> output, Span<ushort> tableSpan)
{
unchecked
Expand Down
27 changes: 23 additions & 4 deletions Snappier/Snappy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ public static class Snappy
/// <param name="inputLength">Length of the input data, in bytes.</param>
/// <returns>The maximum potential size of the compressed output.</returns>
/// <remarks>
/// This is useful for allocating a sufficient output buffer before calling <see cref="Compress"/>.
/// This is useful for allocating a sufficient output buffer before calling <see cref="Compress(ReadOnlySpan{byte}, Span{byte})"/>.
/// </remarks>
public static int GetMaxCompressedLength(int inputLength) =>
Helpers.MaxCompressedLength(inputLength);
// When used to allocate a precise buffer for compression, we need to also pad for the length encoding.
// Failure to do so will cause the compression process to think the buffer may not be large enough after the
// length is encoded and use a temporary buffer for compression which must then be copied.
Helpers.MaxCompressedLength(inputLength) + VarIntEncoding.MaxLength;

/// <summary>
/// Compress a block of Snappy data.
Expand All @@ -42,9 +45,25 @@ public static int Compress(ReadOnlySpan<byte> input, Span<byte> output)
/// Compress a block of Snappy data.
/// </summary>
/// <param name="input">Data to compress.</param>
/// <returns>An <see cref="IMemoryOwner{T}"/> with the decompressed data. The caller is responsible for disposing this object.</returns>
/// <param name="output">Buffer writer to receive the compressed data.</param>
/// <remarks>
/// Failing to dispose of the returned <see cref="IMemoryOwner{T}"/> may result in memory leaks.
/// For the best performance, the input sequence should consist of segments whose sizes are multiples of 64KB,
/// or of a single contiguous <see cref="ReadOnlyMemory{T}"/> wrapped in a sequence.
/// </remarks>
public static void Compress(ReadOnlySequence<byte> input, IBufferWriter<byte> output)
{
    // A compressor is created for this call only and disposed as soon as the call completes.
    using (var compressor = new SnappyCompressor())
    {
        compressor.Compress(input, output);
    }
}

/// <summary>
/// Compress a block of Snappy data.
/// </summary>
/// <param name="input">Data to compress.</param>
/// <returns>An <see cref="IMemoryOwner{T}"/> with the compressed data. The caller is responsible for disposing this object.</returns>
/// <remarks>
/// Failing to dispose of the returned <see cref="IMemoryOwner{T}"/> may result in performance loss.
/// </remarks>
public static IMemoryOwner<byte> CompressToMemory(ReadOnlySpan<byte> input)
{
Expand Down

0 comments on commit b1fb9ef

Please sign in to comment.