Skip to content

Commit

Permalink
Implement Abort() on XZStreamBase (#14)
Browse files Browse the repository at this point in the history
Abort() is useful when an user wants to shudown XZStream immediately.
The original intended operation to abort is good old Close(), but there
is a report that Dispose()'s internal flushing takes a quite amount of
time.

To mitigate this, add Abort() method. Abort() simply frees LzmaStream
structs. This will prevent any further opertaions WITHOUT ANY FLUSHING.
Users must dispose XZStream right after calling Abort().
  • Loading branch information
ied206 committed Aug 31, 2023
1 parent 61f9750 commit 83dcb43
Showing 3 changed files with 205 additions and 19 deletions.
146 changes: 146 additions & 0 deletions Joveler.Compression.XZ.Tests/XZStreamsTests.cs
Original file line number Diff line number Diff line change
@@ -457,5 +457,151 @@ private static void AutoDecompressTemplate(string lzmaFileName, string originFil
Assert.IsTrue(decompDigest.SequenceEqual(originDigest));
}
#endregion

#region Abort Compress
[TestMethod]
public void AbortCompress()
{
AbortCompressTemplate("A.pdf", -1);
AbortCompressTemplate("B.txt", -1);
AbortCompressTemplate("C.bin", -1);

AbortCompressTemplate("A.pdf", 1);
AbortCompressTemplate("B.txt", 2);
AbortCompressTemplate("C.bin", 2);
}

private static void AbortCompressTemplate(string sampleFileName, int threads)
{
XZCompressOptions compOpts = new XZCompressOptions
{
Level = LzmaCompLevel.Default,
};

string sampleFile = Path.Combine(TestSetup.SampleDir, sampleFileName);

using (FileStream sampleFs = new FileStream(sampleFile, FileMode.Open, FileAccess.Read, FileShare.Read))
using (MemoryStream rms = new MemoryStream())
{
XZStream xzs = null;
try
{
if (threads == -1)
{ // Single-thread compression
xzs = new XZStream(rms, compOpts);
}
else if (0 < threads)
{ // Multi-thread compression
XZThreadedCompressOptions threadOpts = new XZThreadedCompressOptions
{
Threads = threads,
};
xzs = new XZStream(rms, compOpts, threadOpts);
}
else
{
Assert.Fail($"threads [{threads}] is not a valid test value.");
}

sampleFs.CopyTo(xzs);

xzs.Abort();

// Internal xz resources are now freed. Every compress operation will fail.
sampleFs.Position = 0;
bool hadThrown = false;
try
{
sampleFs.CopyTo(xzs);
}
catch (XZException e)
{
Assert.AreEqual(LzmaRet.ProgError, e.ReturnCode);
hadThrown = true;
}
Assert.IsTrue(hadThrown);
}
finally
{
xzs?.Dispose();
xzs = null;
}
}
}
#endregion

#region Abort Decompress
[TestMethod]
public void AbortDecompress()
{
AbortDecompressTemplate("A.xz", -1);
AbortDecompressTemplate("B9.xz", -1);
AbortDecompressTemplate("C.xz", -1);

AbortDecompressTemplate("A_mt16.xz", 1);
AbortDecompressTemplate("B1_mt16.xz", 2);
AbortDecompressTemplate("C.xz", 2);
}

private static void AbortDecompressTemplate(string sampleFileName, int threads)
{
string xzFile = Path.Combine(TestSetup.SampleDir, sampleFileName);

XZDecompressOptions decompOpts = new XZDecompressOptions();

XZStream xzs = null;
try
{
using (FileStream compFs = new FileStream(xzFile, FileMode.Open, FileAccess.Read, FileShare.Read))
{
if (threads == -1)
{ // Single-thread compression
xzs = new XZStream(compFs, decompOpts);
}
else if (0 < threads)
{ // Multi-thread compression
XZThreadedDecompressOptions threadOpts = new XZThreadedDecompressOptions
{
Threads = threads,
};
xzs = new XZStream(compFs, decompOpts, threadOpts);
}
else
{
Assert.Fail($"threads [{threads}] is not a valid test value.");
}

long firstReadLen = compFs.Length / 2;
byte[] firstBuffer = new byte[firstReadLen];
int bytesRead = xzs.Read(firstBuffer, 0, firstBuffer.Length);

xzs.Abort();

// Internal xz resources are now freed. Every decompress operation will fail.
bool hadThrown = false;
try
{
byte[] buffer = new byte[64 * 1024];
do
{
bytesRead = xzs.Read(buffer, 0, buffer.Length);
} while (0 < bytesRead);
}
catch (XZException e)
{
Assert.AreEqual(LzmaRet.ProgError, e.ReturnCode);
hadThrown = true;
}
Assert.IsTrue(hadThrown);
}
}
finally
{
xzs?.Dispose();
xzs = null;
}
Assert.IsNull(xzs);
}
#endregion
}
}
58 changes: 49 additions & 9 deletions Joveler.Compression.XZ/XZStreams.cs
Original file line number Diff line number Diff line change
@@ -295,7 +295,8 @@ public class XZThreadedDecompressOptions
#region XZStreamBase
/// <inheritdoc />
/// <summary>
/// The stream to handle .xz file format.
/// The stream to handle xz-related file/stream format.
/// <para>This symbol can be changed anytime, consider this as not a part of public ABI!</para>
/// </summary>
public abstract class XZStreamBase : Stream
{
@@ -333,6 +334,8 @@ protected enum CoderFormat
private int _workBufPos = 0;
private readonly byte[] _workBuf;

private bool _isAborted = false;

// Property
public Stream BaseStream { get; private set; }
public long TotalIn { get; private set; } = 0;
@@ -556,17 +559,21 @@ protected override void Dispose(bool disposing)
{
if (_lzmaStream != null)
{
if (_mode == Mode.Compress)
if (_isAborted == false)
{
Flush();
FinishWrite();
}
else
{
_workBufPos = ReadDone;
if (_mode == Mode.Compress)
{
Flush();
FinishWrite();
}
else
{
_workBufPos = ReadDone;
}

FreeLzmaStream();
}

XZInit.Lib.LzmaEnd(_lzmaStream);
_lzmaStreamPin.Free();
_lzmaStream = null;
}
@@ -583,6 +590,35 @@ protected override void Dispose(bool disposing)
}
#endregion

#region LzmaStream management and Abort
private void FreeLzmaStream()
{
// lzma_end frees memory allocated for coder data structures.
// It must be called to avoid memory leak.
if (_lzmaStream != null)
{
XZInit.Lib.LzmaEnd(_lzmaStream);
}
}

/// <summary>
/// Immediately aborts the current operation.
/// Internal XZ resources will be freed without flushing nor finalizing.
/// <para>The instance will not be able to perform any operations except disposing.</para>
/// <para>Data written to the BaseStream will become invalid, dispose it immediately.</para>
/// </summary>
public void Abort()
{
// Invalidate LzmaStream instance.
// After running this code, liblzma will refuse any operations via this LzmaStream object.
if (_isAborted)
return;

FreeLzmaStream();
_isAborted = true;
}
#endregion

#region Stream Methods and Properties
/// <inheritdoc />
public override int Read(byte[] buffer, int offset, int count)
@@ -859,6 +895,8 @@ public double CompressionRatio
}
#endregion



#region GetProgress
/// <summary>
/// Get progress information of XZ stream.
@@ -888,6 +926,8 @@ public void GetProgress(out ulong progressIn, out ulong progressOut)
}
#endregion



#region Memory Usage (Decompression Only) - DISABLED
// lzma_memusage() only works on per-thread basis.
// It would not help users to perceive how many memory cap would needed on multi-threaded decompression.
20 changes: 10 additions & 10 deletions Joveler.Compression.ZLib/ZLibStreams.cs
Original file line number Diff line number Diff line change
@@ -84,12 +84,12 @@ public sealed class ZLibDecompressOptions
}
#endregion

#region DeflateBaseStream
#region DeflateStreamBase
/// <summary>
/// The stream which compress or decompress deflate stream format.
/// <para>This class can be changed anytime, does not rely on this as public stable ABI!</para>
/// The stream which compress or decompress zlib-related stream format.
/// <para>This symbol can be changed anytime, consider this as not a part of public ABI!</para>
/// </summary>
public abstract class DeflateBaseStream : Stream
public abstract class DeflateStreamBase : Stream
{
#region enum Mode, Format
internal enum Mode
@@ -145,7 +145,7 @@ AMD Ryzen 5 3600 / .NET Core 3.1.13 / Windows 10.0.19042 x64 / zlib 1.2.11
/// <summary>
/// Create compressing DeflateStream.
/// </summary>
protected DeflateBaseStream(Stream baseStream, ZLibCompressOptions compOpts, Format format)
protected DeflateStreamBase(Stream baseStream, ZLibCompressOptions compOpts, Format format)
{
ZLibInit.Manager.EnsureLoaded();

@@ -167,7 +167,7 @@ protected DeflateBaseStream(Stream baseStream, ZLibCompressOptions compOpts, For
ZLibException.CheckReturnValue(ret, _zs);
}

protected DeflateBaseStream(Stream baseStream, ZLibDecompressOptions decompOpts, Format format)
protected DeflateStreamBase(Stream baseStream, ZLibDecompressOptions decompOpts, Format format)
{
ZLibInit.Manager.EnsureLoaded();

@@ -205,7 +205,7 @@ protected DeflateBaseStream(Stream baseStream, ZLibDecompressOptions decompOpts,
#endregion

#region Disposable Pattern
~DeflateBaseStream()
~DeflateStreamBase()
{
Dispose(false);
}
@@ -506,7 +506,7 @@ private static void CheckMemLevel(ZLibMemLevel memLevel)
/// <summary>
/// The stream which compress or decompress deflate stream format.
/// </summary>
public sealed class DeflateStream : DeflateBaseStream
public sealed class DeflateStream : DeflateStreamBase
{
/// <summary>
/// Create compressing DeflateStream.
@@ -527,7 +527,7 @@ public DeflateStream(Stream baseStream, ZLibDecompressOptions decompOpts)
/// <summary>
/// The stream which compress or decompress zlib stream format.
/// </summary>
public sealed class ZLibStream : DeflateBaseStream
public sealed class ZLibStream : DeflateStreamBase
{
/// <summary>
/// Create compressing ZLibStream.
@@ -548,7 +548,7 @@ public ZLibStream(Stream baseStream, ZLibDecompressOptions decompOpts)
/// /// <summary>
/// The stream which compress or decompress gzip stream format.
/// </summary>
public sealed class GZipStream : DeflateBaseStream
public sealed class GZipStream : DeflateStreamBase
{
/// <summary>
/// Create compressing GZipStream.

0 comments on commit 83dcb43

Please sign in to comment.