Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compress from ReadOnlySequence<byte> to an IBufferWriter<byte> #107

Merged
merged 1 commit into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 84 additions & 1 deletion Snappier.Tests/SnappyTests.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using Xunit;

Expand Down Expand Up @@ -42,6 +42,89 @@ public void CompressAndDecompressFile(string filename)
Assert.Equal(input, output);
}

[Fact]
public void CompressAndDecompressFile_LimitedOutputBuffer()
{
// Covers the branch where the output buffer is too small to hold the maximum compressed length
// but is larger than the actual compressed length

using var resource =
typeof(SnappyTests).Assembly.GetManifestResourceStream($"Snappier.Tests.TestData.alice29.txt");
Assert.NotNull(resource);

var input = new byte[65536];
var bytesRead = resource.Read(input, 0, input.Length);

var compressed = new byte[Snappy.GetMaxCompressedLength(bytesRead) - 5];
var compressedLength = Snappy.Compress(input.AsSpan(0, bytesRead), compressed);

var compressedSpan = compressed.AsSpan(0, compressedLength);

var output = new byte[Snappy.GetUncompressedLength(compressedSpan)];
var outputLength = Snappy.Decompress(compressedSpan, output);

Assert.Equal(input.Length, outputLength);
Assert.Equal(input, output);
}

#if NET6_0_OR_GREATER

[Theory]
[InlineData("alice29.txt")]
[InlineData("asyoulik.txt")]
[InlineData("fireworks.jpeg")]
[InlineData("geo.protodata")]
[InlineData("html")]
[InlineData("html_x_4")]
[InlineData("kppkn.gtb")]
[InlineData("lcet10.txt")]
[InlineData("paper-100k.pdf")]
[InlineData("plrabn12.txt")]
[InlineData("urls.10K")]
public void CompressAndDecompressFile_ViaBufferWriter(string filename)
{
using var resource =
typeof(SnappyTests).Assembly.GetManifestResourceStream($"Snappier.Tests.TestData.{filename}");
Assert.NotNull(resource);

var input = new byte[resource.Length];
var bytesRead = resource.Read(input, 0, input.Length);

var compressed = new ArrayBufferWriter<byte>();
Snappy.Compress(new ReadOnlySequence<byte>(input).Slice(0, bytesRead), compressed);

var output = new ArrayBufferWriter<byte>(); // new byte[Snappy.GetUncompressedLength(compressedSpan)];
Snappy.Decompress(new ReadOnlySequence<byte>(compressed.WrittenMemory), output);

Assert.Equal(input.Length, output.WrittenCount);
Assert.True(input.AsSpan().SequenceEqual(output.WrittenSpan));
}

[Theory]
[InlineData(16384)]
[InlineData(32768)]
[InlineData(65536)]
public void CompressAndDecompressFile_ViaBufferWriter_SplitInput(int maxSegmentSize)
{
using var resource =
typeof(SnappyTests).Assembly.GetManifestResourceStream($"Snappier.Tests.TestData.alice29.txt");
Assert.NotNull(resource);

var input = new byte[resource.Length];
var bytesRead = resource.Read(input, 0, input.Length);

var compressed = new ArrayBufferWriter<byte>();
Snappy.Compress(SequenceHelpers.CreateSequence(input.AsMemory(0, bytesRead), maxSegmentSize), compressed);

var output = new ArrayBufferWriter<byte>(); // new byte[Snappy.GetUncompressedLength(compressedSpan)];
Snappy.Decompress(SequenceHelpers.CreateSequence(compressed.WrittenMemory, maxSegmentSize), output);

Assert.Equal(input.Length, output.WrittenCount);
Assert.True(input.AsSpan().SequenceEqual(output.WrittenSpan));
}

#endif

public static TheoryData<string> CompressAndDecompressStringCases() =>
[
"",
Expand Down
4 changes: 2 additions & 2 deletions Snappier/Internal/HashTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ internal class HashTable : IDisposable

private ushort[]? _buffer;

public void EnsureCapacity(int inputSize)
public void EnsureCapacity(long inputSize)
{
int maxFragmentSize = Math.Min(inputSize, (int) Constants.BlockSize);
int maxFragmentSize = (int) Math.Min(inputSize, Constants.BlockSize);
int tableSize = CalculateTableSize(maxFragmentSize);

if (_buffer is null || tableSize > _buffer.Length)
Expand Down
82 changes: 82 additions & 0 deletions Snappier/Internal/SnappyCompressor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,19 @@ public int Compress(ReadOnlySpan<byte> input, Span<byte> output)

if (output.Length >= maxOutput)
{
// The output span is large enough to hold the maximum possible compressed output,
// compress directly to that span.

int written = CompressFragment(fragment, output, hashTable);

output = output.Slice(written);
bytesWritten += written;
}
else
{
// The output span is too small to hold the maximum possible compressed output,
// compress to a temporary buffer and copy the compressed data to the output span.

byte[] scratch = ArrayPool<byte>.Shared.Rent(maxOutput);
try
{
Expand All @@ -63,6 +69,69 @@ public int Compress(ReadOnlySpan<byte> input, Span<byte> output)
return bytesWritten;
}

public void Compress(ReadOnlySequence<byte> input, IBufferWriter<byte> bufferWriter)
{
ThrowHelper.ThrowIfNull(bufferWriter);
if (input.Length > uint.MaxValue)
{
ThrowHelper.ThrowArgumentException($"{nameof(input)} is larger than the maximum size of {uint.MaxValue} bytes.", nameof(input));
}
if (_workingMemory is null)
{
ThrowHelper.ThrowObjectDisposedException(nameof(SnappyCompressor));
}

_workingMemory.EnsureCapacity(input.Length);

Span<byte> sizeBuffer = bufferWriter.GetSpan(VarIntEncoding.MaxLength);
int bytesWritten = VarIntEncoding.Write(sizeBuffer, (uint)input.Length);
bufferWriter.Advance(bytesWritten);

while (input.Length > 0)
{
SequencePosition position = input.GetPosition(Math.Min(input.Length, Constants.BlockSize));
ReadOnlySequence<byte> fragment = input.Slice(0, position);

if (fragment.IsSingleSegment || fragment.First.Length >= (Constants.BlockSize / 2))
{
// Either this fragment is contiguous, or the first segment in the fragment is at least 32KB.
// In either case, compress the first (and possibly only) segment.

#if NET6_0_OR_GREATER
ReadOnlySpan<byte> fragmentSpan = fragment.FirstSpan;
#else
ReadOnlySpan<byte> fragmentSpan = fragment.First.Span;
#endif

CompressFragment(fragmentSpan, bufferWriter);

// Advance the length of the processed segment of the fragment
input = input.Slice(fragmentSpan.Length);
}
else
{
// This fragment is split and the first segment is <32KB, copy the entire fragment to a single
// buffer before compressing.

int fragmentLength = (int)fragment.Length;
byte[] scratch = ArrayPool<byte>.Shared.Rent(fragmentLength);
try
{
fragment.CopyTo(scratch);

CompressFragment(scratch.AsSpan(0, fragmentLength), bufferWriter);

// Advance the length of the entire fragment
input = input.Slice(position);
}
finally
{
ArrayPool<byte>.Shared.Return(scratch);
}
}
}
}

public void Dispose()
{
_workingMemory?.Dispose();
Expand All @@ -71,6 +140,19 @@ public void Dispose()

#region CompressFragment

private void CompressFragment(ReadOnlySpan<byte> fragment, IBufferWriter<byte> bufferWriter)
{
Debug.Assert(_workingMemory is not null);

Span<ushort> hashTable = _workingMemory.GetHashTable(fragment.Length);

int maxOutput = Helpers.MaxCompressedLength(fragment.Length);

Span<byte> fragmentBuffer = bufferWriter.GetSpan(maxOutput);
int bytesWritten = CompressFragment(fragment, fragmentBuffer, hashTable);
bufferWriter.Advance(bytesWritten);
}

private static int CompressFragment(ReadOnlySpan<byte> input, Span<byte> output, Span<ushort> tableSpan)
{
unchecked
Expand Down
27 changes: 23 additions & 4 deletions Snappier/Snappy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ public static class Snappy
/// <param name="inputLength">Length of the input data, in bytes.</param>
/// <returns>The maximum potential size of the compressed output.</returns>
/// <remarks>
/// This is useful for allocating a sufficient output buffer before calling <see cref="Compress"/>.
/// This is useful for allocating a sufficient output buffer before calling <see cref="Compress(ReadOnlySpan{byte}, Span{byte})"/>.
/// </remarks>
public static int GetMaxCompressedLength(int inputLength) =>
Helpers.MaxCompressedLength(inputLength);
// When used to allocate a precise buffer for compression, we need to also pad for the length encoding.
// Failure to do so will cause the compression process to think the buffer may not be large enough after the
// length is encoded and use a temporary buffer for compression which must then be copied.
Helpers.MaxCompressedLength(inputLength) + VarIntEncoding.MaxLength;

/// <summary>
/// Compress a block of Snappy data.
Expand All @@ -42,9 +45,25 @@ public static int Compress(ReadOnlySpan<byte> input, Span<byte> output)
/// Compress a block of Snappy data.
/// </summary>
/// <param name="input">Data to compress.</param>
/// <returns>An <see cref="IMemoryOwner{T}"/> with the decompressed data. The caller is responsible for disposing this object.</returns>
/// <param name="output">Buffer writer to receive the compressed data.</param>
/// <remarks>
/// Failing to dispose of the returned <see cref="IMemoryOwner{T}"/> may result in memory leaks.
/// For the best performance the input sequence should be comprised of segments some multiple of 64KB
/// in size or a single <see cref="ReadOnlySpan{T}"/> wrapped in a sequence.
/// </remarks>
public static void Compress(ReadOnlySequence<byte> input, IBufferWriter<byte> output)
{
using var compressor = new SnappyCompressor();

compressor.Compress(input, output);
}

/// <summary>
/// Compress a block of Snappy data.
/// </summary>
/// <param name="input">Data to compress.</param>
/// <returns>An <see cref="IMemoryOwner{T}"/> with the compressed data. The caller is responsible for disposing this object.</returns>
/// <remarks>
/// Failing to dispose of the returned <see cref="IMemoryOwner{T}"/> may result in performance loss.
/// </remarks>
public static IMemoryOwner<byte> CompressToMemory(ReadOnlySpan<byte> input)
{
Expand Down
Loading