diff --git a/Snappier.Tests/SnappyTests.cs b/Snappier.Tests/SnappyTests.cs index dc14b8b..ab6526b 100644 --- a/Snappier.Tests/SnappyTests.cs +++ b/Snappier.Tests/SnappyTests.cs @@ -1,7 +1,7 @@ using System; using System.Buffers; -using System.Collections.Generic; using System.IO; +using System.Linq; using System.Text; using Xunit; @@ -42,6 +42,89 @@ public void CompressAndDecompressFile(string filename) Assert.Equal(input, output); } + [Fact] + public void CompressAndDecompressFile_LimitedOutputBuffer() + { + // Covers the branch where the output buffer is too small to hold the maximum compressed length + // but is larger than the actual compressed length + + using var resource = + typeof(SnappyTests).Assembly.GetManifestResourceStream($"Snappier.Tests.TestData.alice29.txt"); + Assert.NotNull(resource); + + var input = new byte[65536]; + var bytesRead = resource.Read(input, 0, input.Length); + + var compressed = new byte[Snappy.GetMaxCompressedLength(bytesRead) - 5]; + var compressedLength = Snappy.Compress(input.AsSpan(0, bytesRead), compressed); + + var compressedSpan = compressed.AsSpan(0, compressedLength); + + var output = new byte[Snappy.GetUncompressedLength(compressedSpan)]; + var outputLength = Snappy.Decompress(compressedSpan, output); + + Assert.Equal(input.Length, outputLength); + Assert.Equal(input, output); + } + +#if NET6_0_OR_GREATER + + [Theory] + [InlineData("alice29.txt")] + [InlineData("asyoulik.txt")] + [InlineData("fireworks.jpeg")] + [InlineData("geo.protodata")] + [InlineData("html")] + [InlineData("html_x_4")] + [InlineData("kppkn.gtb")] + [InlineData("lcet10.txt")] + [InlineData("paper-100k.pdf")] + [InlineData("plrabn12.txt")] + [InlineData("urls.10K")] + public void CompressAndDecompressFile_ViaBufferWriter(string filename) + { + using var resource = + typeof(SnappyTests).Assembly.GetManifestResourceStream($"Snappier.Tests.TestData.{filename}"); + Assert.NotNull(resource); + + var input = new byte[resource.Length]; + var bytesRead = resource.Read(input, 0, input.Length); + + var compressed = new ArrayBufferWriter(); + Snappy.Compress(new ReadOnlySequence(input).Slice(0, bytesRead), compressed); + + var output = new ArrayBufferWriter(); // new byte[Snappy.GetUncompressedLength(compressedSpan)]; + Snappy.Decompress(new ReadOnlySequence(compressed.WrittenMemory), output); + + Assert.Equal(input.Length, output.WrittenCount); + Assert.True(input.AsSpan().SequenceEqual(output.WrittenSpan)); + } + + [Theory] + [InlineData(16384)] + [InlineData(32768)] + [InlineData(65536)] + public void CompressAndDecompressFile_ViaBufferWriter_SplitInput(int maxSegmentSize) + { + using var resource = + typeof(SnappyTests).Assembly.GetManifestResourceStream($"Snappier.Tests.TestData.alice29.txt"); + Assert.NotNull(resource); + + var input = new byte[resource.Length]; + var bytesRead = resource.Read(input, 0, input.Length); + + var compressed = new ArrayBufferWriter(); + Snappy.Compress(SequenceHelpers.CreateSequence(input.AsMemory(0, bytesRead), maxSegmentSize), compressed); + + var output = new ArrayBufferWriter(); // new byte[Snappy.GetUncompressedLength(compressedSpan)]; + Snappy.Decompress(SequenceHelpers.CreateSequence(compressed.WrittenMemory, maxSegmentSize), output); + + Assert.Equal(input.Length, output.WrittenCount); + Assert.True(input.AsSpan().SequenceEqual(output.WrittenSpan)); + } + +#endif + public static TheoryData CompressAndDecompressStringCases() => [ "", diff --git a/Snappier/Internal/HashTable.cs b/Snappier/Internal/HashTable.cs index 9abef88..1fd13ae 100644 --- a/Snappier/Internal/HashTable.cs +++ b/Snappier/Internal/HashTable.cs @@ -21,9 +21,9 @@ internal class HashTable : IDisposable private ushort[]? _buffer; - public void EnsureCapacity(int inputSize) + public void EnsureCapacity(long inputSize) { - int maxFragmentSize = Math.Min(inputSize, (int) Constants.BlockSize); + int maxFragmentSize = (int) Math.Min(inputSize, Constants.BlockSize); int tableSize = CalculateTableSize(maxFragmentSize); if (_buffer is null || tableSize > _buffer.Length) diff --git a/Snappier/Internal/SnappyCompressor.cs b/Snappier/Internal/SnappyCompressor.cs index fdb4cd6..7a4c16f 100644 --- a/Snappier/Internal/SnappyCompressor.cs +++ b/Snappier/Internal/SnappyCompressor.cs @@ -31,6 +31,9 @@ public int Compress(ReadOnlySpan input, Span output) if (output.Length >= maxOutput) { + // The output span is large enough to hold the maximum possible compressed output, + // compress directly to that span. + int written = CompressFragment(fragment, output, hashTable); output = output.Slice(written); @@ -38,6 +41,9 @@ public int Compress(ReadOnlySpan input, Span output) } else { + // The output span is too small to hold the maximum possible compressed output, + // compress to a temporary buffer and copy the compressed data to the output span. + byte[] scratch = ArrayPool.Shared.Rent(maxOutput); try { @@ -63,6 +69,69 @@ public int Compress(ReadOnlySpan input, Span output) return bytesWritten; } + public void Compress(ReadOnlySequence input, IBufferWriter bufferWriter) + { + ThrowHelper.ThrowIfNull(bufferWriter); + if (input.Length > uint.MaxValue) + { + ThrowHelper.ThrowArgumentException($"{nameof(input)} is larger than the maximum size of {uint.MaxValue} bytes.", nameof(input)); + } + if (_workingMemory is null) + { + ThrowHelper.ThrowObjectDisposedException(nameof(SnappyCompressor)); + } + + _workingMemory.EnsureCapacity(input.Length); + + Span sizeBuffer = bufferWriter.GetSpan(VarIntEncoding.MaxLength); + int bytesWritten = VarIntEncoding.Write(sizeBuffer, (uint)input.Length); + bufferWriter.Advance(bytesWritten); + + while (input.Length > 0) + { + SequencePosition position = input.GetPosition(Math.Min(input.Length, Constants.BlockSize)); + ReadOnlySequence fragment = input.Slice(0, position); + + if (fragment.IsSingleSegment || fragment.First.Length >= (Constants.BlockSize / 2)) + { + // Either this fragment is contiguous, or the first segment in the fragment is at least 32KB. + // In either case, compress the first (and possibly only) segment. + +#if NET6_0_OR_GREATER + ReadOnlySpan fragmentSpan = fragment.FirstSpan; +#else + ReadOnlySpan fragmentSpan = fragment.First.Span; +#endif + + CompressFragment(fragmentSpan, bufferWriter); + + // Advance the length of the processed segment of the fragment + input = input.Slice(fragmentSpan.Length); + } + else + { + // This fragment is split and the first segment is <32KB, copy the entire fragment to a single + // buffer before compressing. + + int fragmentLength = (int)fragment.Length; + byte[] scratch = ArrayPool.Shared.Rent(fragmentLength); + try + { + fragment.CopyTo(scratch); + + CompressFragment(scratch.AsSpan(0, fragmentLength), bufferWriter); + + // Advance the length of the entire fragment + input = input.Slice(position); + } + finally + { + ArrayPool.Shared.Return(scratch); + } + } + } + } + public void Dispose() { _workingMemory?.Dispose(); @@ -71,6 +140,19 @@ public void Dispose() #region CompressFragment + private void CompressFragment(ReadOnlySpan fragment, IBufferWriter bufferWriter) + { + Debug.Assert(_workingMemory is not null); + + Span hashTable = _workingMemory.GetHashTable(fragment.Length); + + int maxOutput = Helpers.MaxCompressedLength(fragment.Length); + + Span fragmentBuffer = bufferWriter.GetSpan(maxOutput); + int bytesWritten = CompressFragment(fragment, fragmentBuffer, hashTable); + bufferWriter.Advance(bytesWritten); + } + private static int CompressFragment(ReadOnlySpan input, Span output, Span tableSpan) { unchecked diff --git a/Snappier/Snappy.cs b/Snappier/Snappy.cs index eb51964..4faa46d 100644 --- a/Snappier/Snappy.cs +++ b/Snappier/Snappy.cs @@ -17,10 +17,13 @@ public static class Snappy /// Length of the input data, in bytes. /// The maximum potential size of the compressed output. /// - /// This is useful for allocating a sufficient output buffer before calling . + /// This is useful for allocating a sufficient output buffer before calling . /// public static int GetMaxCompressedLength(int inputLength) => - Helpers.MaxCompressedLength(inputLength); + // When used to allocate a precise buffer for compression, we need to also pad for the length encoding. + // Failure to do so will cause the compression process to think the buffer may not be large enough after the + // length is encoded and use a temporary buffer for compression which must then be copied. + Helpers.MaxCompressedLength(inputLength) + VarIntEncoding.MaxLength; /// /// Compress a block of Snappy data. @@ -42,9 +45,25 @@ public static int Compress(ReadOnlySpan input, Span output) /// Compress a block of Snappy data. /// /// Data to compress. - /// An with the decompressed data. The caller is responsible for disposing this object. + /// Buffer writer to receive the compressed data. /// - /// Failing to dispose of the returned may result in memory leaks. + /// For the best performance the input sequence should be comprised of segments some multiple of 64KB + /// in size or a single wrapped in a sequence. + /// + public static void Compress(ReadOnlySequence input, IBufferWriter output) + { + using var compressor = new SnappyCompressor(); + + compressor.Compress(input, output); + } + + /// + /// Compress a block of Snappy data. + /// + /// Data to compress. + /// An with the compressed data. The caller is responsible for disposing this object. + /// + /// Failing to dispose of the returned may result in performance loss. /// public static IMemoryOwner CompressToMemory(ReadOnlySpan input) {