Skip to content

Commit

Permalink
zip: finish stream async when async (#808)
Browse files Browse the repository at this point in the history
  • Loading branch information
piksel authored Dec 15, 2022
1 parent 77c5a97 commit cbb9e83
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ protected override void Dispose(bool disposing)
}
}

#if NETSTANDARD2_1_OR_GREATER
#if NETSTANDARD2_1 || NETCOREAPP3_0_OR_GREATER
/// <summary>
/// Calls <see cref="FinishAsync"/> and closes the underlying
/// stream when <see cref="IsStreamOwner"></see> is true.
Expand Down
53 changes: 36 additions & 17 deletions src/ICSharpCode.SharpZipLib/Zip/ZipOutputStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,8 @@ await baseOutputStream_.WriteProcToStreamAsync(s =>
/// </exception>
public void CloseEntry()
{
// Note: This method will run synchronously
FinishCompression(null).Wait();
WriteEntryFooter(baseOutputStream_);

// Patch the header if possible
Expand All @@ -564,9 +566,41 @@ public void CloseEntry()
curEntry = null;
}

private async Task FinishCompression(CancellationToken? ct)
{
// Compression handled externally
if (entryIsPassthrough) return;

// First finish the deflater, if appropriate
if (curMethod == CompressionMethod.Deflated)
{
if (size >= 0)
{
if (ct.HasValue) {
await base.FinishAsync(ct.Value).ConfigureAwait(false);
} else {
base.Finish();
}
}
else
{
deflater_.Reset();
}
}
if (curMethod == CompressionMethod.Stored)
{
// This is done by Finish() for Deflated entries, but we need to do it
// ourselves for Stored ones
base.GetAuthCodeIfAES();
}

return;
}

/// <inheritdoc cref="CloseEntry"/>
public async Task CloseEntryAsync(CancellationToken ct)
{
await FinishCompression(ct).ConfigureAwait(false);
await baseOutputStream_.WriteProcToStreamAsync(WriteEntryFooter, ct).ConfigureAwait(false);

// Patch the header if possible
Expand Down Expand Up @@ -600,24 +634,9 @@ internal void WriteEntryFooter(Stream stream)

long csize = size;

// First finish the deflater, if appropriate
if (curMethod == CompressionMethod.Deflated)
{
if (size >= 0)
{
base.Finish();
csize = deflater_.TotalOut;
}
else
{
deflater_.Reset();
}
}
else if (curMethod == CompressionMethod.Stored)
if (curMethod == CompressionMethod.Deflated && size >= 0)
{
// This is done by Finish() for Deflated entries, but we need to do it
// ourselves for Stored ones
base.GetAuthCodeIfAES();
csize = deflater_.TotalOut;
}

// Write the AES Authentication Code (a hash of the compressed and encrypted data)
Expand Down
72 changes: 72 additions & 0 deletions test/ICSharpCode.SharpZipLib.Tests/TestSupport/Streams.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace ICSharpCode.SharpZipLib.Tests.TestSupport
{
Expand Down Expand Up @@ -188,6 +189,77 @@ public override long Position

}

#if NETSTANDARD2_1 || NETCOREAPP3_0_OR_GREATER
/// <summary>
/// A <see cref="Stream"/> that does not support non-async operations.
/// </summary>
/// <remarks>
/// This could not be done by extending MemoryStream itself, since other instances of MemoryStream tries to us a faster (non-async) method of copying
/// if it detects that it's a (subclass of) MemoryStream.
/// </remarks>
public class MemoryStreamWithoutSync : Stream
{
MemoryStream _inner = new MemoryStream();

public override bool CanRead => _inner.CanRead;
public override bool CanSeek => _inner.CanSeek;
public override bool CanWrite => _inner.CanWrite;
public override long Length => _inner.Length;
public override long Position { get => _inner.Position; set => _inner.Position = value; }

public byte[] ToArray() => _inner.ToArray();

public override void Flush() => throw new NotSupportedException($"Non-async call to {nameof(Flush)}");


public override void CopyTo(Stream destination, int bufferSize) => throw new NotSupportedException($"Non-async call to {nameof(CopyTo)}");
public override void Write(ReadOnlySpan<byte> buffer) => throw new NotSupportedException($"Non-async call to {nameof(Write)}");
public override int Read(Span<byte> buffer) => throw new NotSupportedException($"Non-async call to {nameof(Read)}");

public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException($"Non-async call to {nameof(Write)}");
public override void WriteByte(byte value) => throw new NotSupportedException($"Non-async call to {nameof(Write)}");

public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException($"Non-async call to {nameof(Read)}");
public override int ReadByte() => throw new NotSupportedException($"Non-async call to {nameof(ReadByte)}");

// Even though our mock stream is writing synchronously, this should not fail the tests
public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
var buf = new byte[bufferSize];
while(_inner.Read(buf, 0, bufferSize) > 0) {
await destination.WriteAsync(buf, cancellationToken);
}
}
public override Task FlushAsync(CancellationToken cancellationToken) => TaskFromBlocking(() => _inner.Flush());
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => TaskFromBlocking(() => _inner.Write(buffer, offset, count));
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => Task.FromResult(_inner.Read(buffer, offset, count));
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) => ValueTaskFromBlocking(() => _inner.Write(buffer.Span));
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) => ValueTask.FromResult(_inner.Read(buffer.Span));

static Task TaskFromBlocking(Action action)
{
action();
return Task.CompletedTask;
}

static ValueTask ValueTaskFromBlocking(Action action)
{
action();
return ValueTask.CompletedTask;
}

public override long Seek(long offset, SeekOrigin origin)
{
return _inner.Seek(offset, origin);
}

public override void SetLength(long value)
{
_inner.SetLength(value);
}
}
#endif

/// <summary>
/// A <see cref="Stream"/> that cannot be read but supports infinite writes.
/// </summary>
Expand Down
29 changes: 28 additions & 1 deletion test/ICSharpCode.SharpZipLib.Tests/Zip/ZipStreamAsyncTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class ZipStreamAsyncTests
[Category("Async")]
public async Task WriteZipStreamUsingAsync()
{
#if NETCOREAPP3_1_OR_GREATER
#if NETSTANDARD2_1 || NETCOREAPP3_0_OR_GREATER
await using var ms = new MemoryStream();

await using (var outStream = new ZipOutputStream(ms){IsStreamOwner = false})
Expand Down Expand Up @@ -121,5 +121,32 @@ public async Task WriteReadOnlyZipStreamAsync ()
ZipTesting.AssertValidZip(new MemoryStream(ms.ToArray()));
}

[Test]
[Category("Zip")]
[Category("Async")]
public async Task WriteZipStreamToAsyncOnlyStream ()
{
#if NETSTANDARD2_1 || NETCOREAPP3_0_OR_GREATER
await using(var ms = new MemoryStreamWithoutSync()){
await using(var outStream = new ZipOutputStream(ms) { IsStreamOwner = false })
{
await outStream.PutNextEntryAsync(new ZipEntry("FirstFile"));
await Utils.WriteDummyDataAsync(outStream, 12);

await outStream.PutNextEntryAsync(new ZipEntry("SecondFile"));
await Utils.WriteDummyDataAsync(outStream, 12);

await outStream.FinishAsync(CancellationToken.None);
await outStream.DisposeAsync();
}

ZipTesting.AssertValidZip(new MemoryStream(ms.ToArray()));
}
#else
await Task.CompletedTask;
Assert.Ignore("AsyncDispose is not supported");
#endif
}

}
}

0 comments on commit cbb9e83

Please sign in to comment.