Skip to content

Commit

Permalink
Automatically flush trace data to Azure storage. (#679)
Browse files Browse the repository at this point in the history
* Automatically flush trace data to Azure storage.

* PR feedback
  • Loading branch information
wiktork authored Aug 4, 2021
1 parent bf4956e commit 5124ce5
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Diagnostics.Tools.Monitor.Egress.AzureBlob
{
partial class AzureBlobEgressProvider
{
/// <summary>
/// Automatically flushes the stream after a certain amount of bytes have been written.
/// </summary>
private sealed class AutoFlushStream : Stream
{
private readonly Stream _baseStream;
private readonly long _flushSize;
private long _written;

public AutoFlushStream(Stream stream, long flushSize)
{
_flushSize = flushSize >= 0 ? flushSize : throw new ArgumentOutOfRangeException(nameof(flushSize));
_baseStream = stream ?? throw new ArgumentNullException(nameof(stream));
}

public override bool CanRead => _baseStream.CanRead;
public override bool CanSeek => _baseStream.CanSeek;
public override bool CanWrite => _baseStream.CanWrite;
public override long Length => _baseStream.Length;
public override bool CanTimeout => _baseStream.CanTimeout;
public override int WriteTimeout { get => _baseStream.WriteTimeout; set => _baseStream.WriteTimeout = value; }
public override int ReadTimeout { get => _baseStream.ReadTimeout; set => _baseStream.ReadTimeout = value; }
public override long Position { get => _baseStream.Position; set => _baseStream.Position = value; }
public override int Read(byte[] buffer, int offset, int count) => _baseStream.Read(buffer, offset, count);
public override long Seek(long offset, SeekOrigin origin) => _baseStream.Seek(offset, origin);
public override void SetLength(long value) => _baseStream.SetLength(value);
public override void Close() => _baseStream.Close();
public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) =>
_baseStream.CopyToAsync(destination, bufferSize, cancellationToken);
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
_baseStream.ReadAsync(buffer, offset, count, cancellationToken);
public override int ReadByte() => _baseStream.ReadByte();

//CONSIDER These are not really used, but should still autoflush.
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) =>
_baseStream.BeginRead(buffer, offset, count, callback, state);
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) =>
_baseStream.BeginWrite(buffer, offset, count, callback, state);
public override int EndRead(IAsyncResult asyncResult) => _baseStream.EndRead(asyncResult);
public override void EndWrite(IAsyncResult asyncResult) => _baseStream.EndWrite(asyncResult);

public override void Write(byte[] buffer, int offset, int count)
{
_baseStream.Write(buffer, offset, count);
ProcessWrite(count);
}

public override void WriteByte(byte value)
{
_baseStream.WriteByte(value);
ProcessWrite(1);
}

public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
await _baseStream.WriteAsync(buffer, offset, count, cancellationToken);
await ProcessWriteAsync(count, cancellationToken);
}

public override void Flush()
{
_baseStream.Flush();
_written = 0;
}

public override async Task FlushAsync(CancellationToken cancellationToken)
{
await _baseStream.FlushAsync(cancellationToken);
_written = 0;
}

private void ProcessWrite(int count)
{
_written += count;
if (_written >= _flushSize)
{
Flush();
}
}

private Task ProcessWriteAsync(int count, CancellationToken cancellationToken)
{
_written += count;
if (_written >= _flushSize)
{
return FlushAsync(cancellationToken);
}
return Task.CompletedTask;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ namespace Microsoft.Diagnostics.Tools.Monitor.Egress.AzureBlob
/// <remarks>
/// Blobs created through this provider will overwrite existing blobs if they have the same blob name.
/// </remarks>
internal class AzureBlobEgressProvider :
internal partial class AzureBlobEgressProvider :
EgressProvider<AzureBlobEgressProviderOptions>
{
private int BlobStorageBufferSize = 4 * 1024 * 1024;

public AzureBlobEgressProvider(ILogger<AzureBlobEgressProvider> logger)
: base(logger)
{
Expand Down Expand Up @@ -75,12 +77,24 @@ public override async Task<string> EgressAsync(
BlockBlobClient blobClient = containerClient.GetBlockBlobClient(GetBlobName(options, artifactSettings));

// Write blob content
using (Stream blobStream = await blobClient.OpenWriteAsync(overwrite: true, cancellationToken: token))

var bloboptions = new BlockBlobOpenWriteOptions
{
BufferSize = BlobStorageBufferSize,
};
using (Stream blobStream = await blobClient.OpenWriteAsync(overwrite: true, options: bloboptions, cancellationToken: token))
using (AutoFlushStream flushStream = new AutoFlushStream(blobStream, BlobStorageBufferSize))
{
//Azure's stream from OpenWriteAsync will do the following
//1. Write the data to a local buffer
//2. Once that buffer is full, stage the data remotely (this data is not considered valid yet)
//3. After 4Gi of data has been staged, the data will be commited. This can be forced earlier by flushing
//the stream.
// Since we want the data to be readily available, we automatically flush (and therefore commit) every time we fill up the buffer.
Logger?.EgressProviderInvokeStreamAction(EgressProviderTypes.AzureBlobStorage);
await action(blobStream, token);
await action(flushStream, token);

await blobStream.FlushAsync(token);
await flushStream.FlushAsync(token);
}

// Write blob headers
Expand Down

0 comments on commit 5124ce5

Please sign in to comment.