Skip to content

Commit

Permalink
Stream leave open option
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Oct 17, 2023
1 parent 29ac88a commit 2ed63d9
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Buffers;
using System.Buffers;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
Expand Down
16 changes: 9 additions & 7 deletions src/NATS.Client.ObjectStore/NatsOBStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ internal NatsOBStore(NatsOBConfig config, NatsJSContext context, NatsJSStream st
public async ValueTask<byte[]> GetBytesAsync(string key, CancellationToken cancellationToken = default)
{
var memoryStream = new MemoryStream();
await GetAsync(key, memoryStream, cancellationToken).ConfigureAwait(false);
await GetAsync(key, memoryStream, cancellationToken: cancellationToken).ConfigureAwait(false);
return memoryStream.ToArray();
}

Expand All @@ -52,10 +52,11 @@ public async ValueTask<byte[]> GetBytesAsync(string key, CancellationToken cance
/// </summary>
/// <param name="key">Object key.</param>
/// <param name="stream">Stream to write the object value to.</param>
/// <param name="leaveOpen"><c>true</c> to not close the underlying stream when async method returns; otherwise, <c>false</c></param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object metadata.</returns>
/// <exception cref="NatsOBException">Metadata didn't match the value retrieved e.g. the SHA digest.</exception>
public async ValueTask<ObjectMetadata> GetAsync(string key, Stream stream, CancellationToken cancellationToken = default)
public async ValueTask<ObjectMetadata> GetAsync(string key, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default)
{
ValidateObjectName(key);

Expand All @@ -76,7 +77,7 @@ public async ValueTask<ObjectMetadata> GetAsync(string key, Stream stream, Cance
var size = 0;
using (var sha256 = SHA256.Create())
{
await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write))
await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write, leaveOpen))
{
await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken))
{
Expand Down Expand Up @@ -138,7 +139,7 @@ public async ValueTask<ObjectMetadata> GetAsync(string key, Stream stream, Cance
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object metadata.</returns>
public ValueTask<ObjectMetadata> PutAsync(string key, byte[] value, CancellationToken cancellationToken = default) =>
PutAsync(new ObjectMetadata { Name = key }, new MemoryStream(value), cancellationToken);
PutAsync(new ObjectMetadata { Name = key }, new MemoryStream(value), cancellationToken: cancellationToken);

/// <summary>
/// Put an object by key.
Expand All @@ -150,18 +151,19 @@ public ValueTask<ObjectMetadata> PutAsync(string key, byte[] value, Cancellation
/// <exception cref="NatsOBException">There was an error calculating SHA digest.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
public ValueTask<ObjectMetadata> PutAsync(string key, Stream stream, CancellationToken cancellationToken = default) =>
PutAsync(new ObjectMetadata { Name = key }, stream, cancellationToken);
PutAsync(new ObjectMetadata { Name = key }, stream, cancellationToken: cancellationToken);

/// <summary>
/// Put an object by key.
/// </summary>
/// <param name="meta">Object metadata.</param>
/// <param name="stream">Stream to read the value from.</param>
/// <param name="leaveOpen"><c>true</c> to not close the underlying stream when async method returns; otherwise, <c>false</c></param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Object metadata.</returns>
/// <exception cref="NatsOBException">There was an error calculating SHA digest.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
public async ValueTask<ObjectMetadata> PutAsync(ObjectMetadata meta, Stream stream, CancellationToken cancellationToken = default)
public async ValueTask<ObjectMetadata> PutAsync(ObjectMetadata meta, Stream stream, bool leaveOpen = false, CancellationToken cancellationToken = default)
{
ValidateObjectName(meta.Name);

Expand Down Expand Up @@ -198,7 +200,7 @@ public async ValueTask<ObjectMetadata> PutAsync(ObjectMetadata meta, Stream stre
string digest;
using (var sha256 = SHA256.Create())
{
await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read))
await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read, leaveOpen))
{
while (true)
{
Expand Down
13 changes: 6 additions & 7 deletions tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ public class ObjectStoreTest

public ObjectStoreTest(ITestOutputHelper output) => _output = output;


[Fact]
public async Task Create_delete_object_store()
{
Expand Down Expand Up @@ -68,7 +67,7 @@ public async Task Put_chunks()
var buffer = Encoding.ASCII.GetBytes(buffer90);
var stream = new MemoryStream(buffer);

await store.PutAsync(meta, stream, cancellationToken);
await store.PutAsync(meta, stream, cancellationToken: cancellationToken);

var data = await store.GetInfoAsync("k1", cancellationToken: cancellationToken);

Expand All @@ -88,7 +87,7 @@ public async Task Put_chunks()
var buffer = Encoding.ASCII.GetBytes(buffer90 + "09-45");
var stream = new MemoryStream(buffer);

await store.PutAsync(meta, stream, cancellationToken);
await store.PutAsync(meta, stream, cancellationToken: cancellationToken);

var data = await store.GetInfoAsync("k2", cancellationToken: cancellationToken);

Expand Down Expand Up @@ -128,12 +127,12 @@ public async Task Get_chunks()
var meta = new ObjectMetadata { Name = "k1", Options = new Options { MaxChunkSize = 10 }, };
var buffer = Encoding.ASCII.GetBytes(buffer90);
var stream = new MemoryStream(buffer);
await store.PutAsync(meta, stream, cancellationToken);
await store.PutAsync(meta, stream, cancellationToken: cancellationToken);
}

{
var memoryStream = new MemoryStream();
await store.GetAsync("k1", memoryStream, cancellationToken);
await store.GetAsync("k1", memoryStream, cancellationToken: cancellationToken);
await memoryStream.FlushAsync(cancellationToken);
var buffer = memoryStream.ToArray();
Assert.Equal(buffer90, Encoding.ASCII.GetString(buffer));
Expand All @@ -144,12 +143,12 @@ public async Task Get_chunks()
var meta = new ObjectMetadata { Name = "k2", Options = new Options { MaxChunkSize = 10 }, };
var buffer = Encoding.ASCII.GetBytes(buffer90 + "09-45");
var stream = new MemoryStream(buffer);
await store.PutAsync(meta, stream, cancellationToken);
await store.PutAsync(meta, stream, cancellationToken: cancellationToken);
}

{
var memoryStream = new MemoryStream();
await store.GetAsync("k2", memoryStream, cancellationToken);
await store.GetAsync("k2", memoryStream, cancellationToken: cancellationToken);
await memoryStream.FlushAsync(cancellationToken);
var buffer = memoryStream.ToArray();
Assert.Equal(buffer90 + "09-45", Encoding.ASCII.GetString(buffer));
Expand Down

0 comments on commit 2ed63d9

Please sign in to comment.