Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically flush trace data to Azure storage. #679

Merged
merged 2 commits into from
Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Diagnostics.Tools.Monitor.Egress.AzureBlob
{
partial class AzureBlobEgressProvider
{
/// <summary>
/// Automatically flushes the stream after a certain amount of bytes have been written.
/// </summary>
private sealed class AutoFlushStream : Stream
{
    // The wrapped stream; every operation is forwarded to it.
    private readonly Stream _baseStream;

    // Number of bytes written through this wrapper since the last flush.
    private long _written;

    /// <summary>
    /// Number of bytes written since the last flush at which the stream is flushed automatically.
    /// A value of 0 causes a flush after every write.
    /// </summary>
    public long FlushSize { get; }

    /// <summary>
    /// Wraps <paramref name="stream"/> so that it is flushed once at least
    /// <paramref name="flushSize"/> bytes have been written since the previous flush.
    /// </summary>
    /// <param name="stream">The stream to forward all operations to.</param>
    /// <param name="flushSize">The non-negative number of written bytes that triggers a flush.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="flushSize"/> is negative.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public AutoFlushStream(Stream stream, long flushSize)
    {
        FlushSize = flushSize >= 0 ? flushSize : throw new ArgumentOutOfRangeException(nameof(flushSize));
        _baseStream = stream ?? throw new ArgumentNullException(nameof(stream));
    }

    // Pass-through members: these forward directly to the wrapped stream.
    public override bool CanRead => _baseStream.CanRead;
    public override bool CanSeek => _baseStream.CanSeek;
    public override bool CanWrite => _baseStream.CanWrite;
    public override long Length => _baseStream.Length;
    public override bool CanTimeout => _baseStream.CanTimeout;
    public override int WriteTimeout { get => _baseStream.WriteTimeout; set => _baseStream.WriteTimeout = value; }
    public override int ReadTimeout { get => _baseStream.ReadTimeout; set => _baseStream.ReadTimeout = value; }
    public override long Position { get => _baseStream.Position; set => _baseStream.Position = value; }
    public override int Read(byte[] buffer, int offset, int count) => _baseStream.Read(buffer, offset, count);
    public override long Seek(long offset, SeekOrigin origin) => _baseStream.Seek(offset, origin);
    public override void SetLength(long value) => _baseStream.SetLength(value);
    public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) =>
        _baseStream.CopyToAsync(destination, bufferSize, cancellationToken);
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
        _baseStream.ReadAsync(buffer, offset, count, cancellationToken);
    public override int ReadByte() => _baseStream.ReadByte();

    //CONSIDER These are not really used, but should still autoflush.
    // NOTE: writes issued through BeginWrite/EndWrite are forwarded as-is and are NOT counted
    // towards FlushSize, so they never trigger an automatic flush.
    public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) =>
        _baseStream.BeginRead(buffer, offset, count, callback, state);
    public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state) =>
        _baseStream.BeginWrite(buffer, offset, count, callback, state);
    public override int EndRead(IAsyncResult asyncResult) => _baseStream.EndRead(asyncResult);
    public override void EndWrite(IAsyncResult asyncResult) => _baseStream.EndWrite(asyncResult);

    /// <summary>
    /// Writes to the wrapped stream, then flushes it if the flush threshold has been reached.
    /// </summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        _baseStream.Write(buffer, offset, count);
        ProcessWrite(count);
    }

    /// <summary>
    /// Writes a single byte to the wrapped stream, then flushes it if the flush threshold has been reached.
    /// </summary>
    public override void WriteByte(byte value)
    {
        _baseStream.WriteByte(value);
        ProcessWrite(1);
    }

    /// <summary>
    /// Asynchronously writes to the wrapped stream, then flushes it if the flush threshold has been reached.
    /// </summary>
    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        await _baseStream.WriteAsync(buffer, offset, count, cancellationToken);
        await ProcessWriteAsync(count, cancellationToken);
    }

    /// <summary>
    /// Flushes the wrapped stream and resets the written-bytes counter.
    /// </summary>
    public override void Flush()
    {
        _baseStream.Flush();

        // Only reset once the flush succeeded; a failed flush leaves the count intact.
        _written = 0;
    }

    /// <summary>
    /// Asynchronously flushes the wrapped stream and resets the written-bytes counter.
    /// </summary>
    public override async Task FlushAsync(CancellationToken cancellationToken)
    {
        await _baseStream.FlushAsync(cancellationToken);

        // Only reset once the flush succeeded; a failed flush leaves the count intact.
        _written = 0;
    }

    /// <summary>
    /// Disposes the wrapped stream. Cleanup lives here rather than in an override of
    /// <see cref="Stream.Close"/>: the base <see cref="Stream.Close"/> and
    /// <see cref="Stream.Dispose()"/> both route through this method.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            _baseStream.Dispose();
        }

        base.Dispose(disposing);
    }

    // Accounts for 'count' freshly written bytes and flushes synchronously when the threshold is hit.
    private void ProcessWrite(int count)
    {
        _written += count;
        if (_written >= FlushSize)
        {
            Flush();
        }
    }

    // Accounts for 'count' freshly written bytes and flushes asynchronously when the threshold is hit.
    private async Task ProcessWriteAsync(int count, CancellationToken token)
    {
        _written += count;
        if (_written >= FlushSize)
        {
            await FlushAsync(token);
        }
    }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ namespace Microsoft.Diagnostics.Tools.Monitor.Egress.AzureBlob
/// <remarks>
/// Blobs created through this provider will overwrite existing blobs if they have the same blob name.
/// </remarks>
internal class AzureBlobEgressProvider :
internal partial class AzureBlobEgressProvider :
EgressProvider<AzureBlobEgressProviderOptions>
{
private int BlobStorageBufferSize = 4 * 1024 * 1024;

public AzureBlobEgressProvider(ILogger<AzureBlobEgressProvider> logger)
: base(logger)
{
Expand Down Expand Up @@ -75,12 +77,24 @@ public override async Task<string> EgressAsync(
BlockBlobClient blobClient = containerClient.GetBlockBlobClient(GetBlobName(options, artifactSettings));

// Write blob content
using (Stream blobStream = await blobClient.OpenWriteAsync(overwrite: true, cancellationToken: token))

var bloboptions = new BlockBlobOpenWriteOptions
{
BufferSize = BlobStorageBufferSize,
};
using (Stream blobStream = await blobClient.OpenWriteAsync(overwrite: true, options: bloboptions, cancellationToken: token))
using (AutoFlushStream flushStream = new AutoFlushStream(blobStream, BlobStorageBufferSize))
{
//Azure's stream from OpenWriteAsync will do the following
//1. Write the data to a local buffer
//2. Once that buffer is full, stage the data remotely (this data is not considered valid yet)
//3. After 4Gi of data has been staged, the data will be committed. This can be forced earlier by flushing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//3. After 4Gi of data has been staged, the data will be commited. This can be forced earlier by flushing
//3. After 4GiB of data has been staged, the data will be commited. This can be forced earlier by flushing

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been using k8 nomenclature here: Mi, Gi, etc.

//the stream.
// Since we want the data to be readily available, we automatically flush (and therefore commit) every time we fill up the buffer.
Logger?.EgressProviderInvokeStreamAction(EgressProviderTypes.AzureBlobStorage);
await action(blobStream, token);
await action(flushStream, token);

await blobStream.FlushAsync(token);
await flushStream.FlushAsync(token);
}

// Write blob headers
Expand Down