Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Egress] Clean-Up For CopyBufferSize #4086

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion documentation/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2579,7 +2579,9 @@
"null"
],
"description": "Buffer size used when copying data from an egress callback returning a stream to the egress callback that is provided a stream to which data is written.",
"format": "int32"
"format": "int32",
"maximum": 2147483647.0,
"minimum": 1.0
},
"QueueName": {
"type": [
Expand Down
6 changes: 3 additions & 3 deletions src/Extensions/AzureBlobStorage/AzureBlobEgressProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace Microsoft.Diagnostics.Monitoring.AzureBlobStorage
/// </remarks>
internal partial class AzureBlobEgressProvider : EgressProvider<AzureBlobEgressProviderOptions>
{
private int BlobStorageBufferSize = 4 * 1024 * 1024;
private int DefaultBlobStorageBufferSize = 4 * 1024 * 1024;

public AzureBlobEgressProvider(ILogger logger) : base(logger)
{
Expand All @@ -49,10 +49,10 @@ public override async Task<string> EgressAsync(

var bloboptions = new BlockBlobOpenWriteOptions
{
BufferSize = BlobStorageBufferSize,
BufferSize = options.CopyBufferSize.GetValueOrDefault(DefaultBlobStorageBufferSize),
kkeirstead marked this conversation as resolved.
Show resolved Hide resolved
};
using (Stream blobStream = await blobClient.OpenWriteAsync(overwrite: true, options: bloboptions, cancellationToken: token))
using (AutoFlushStream flushStream = new AutoFlushStream(blobStream, BlobStorageBufferSize))
using (AutoFlushStream flushStream = new AutoFlushStream(blobStream, bloboptions.BufferSize.Value))
{
//Azure's stream from OpenWriteAsync will do the following
//1. Write the data to a local buffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ internal sealed partial class AzureBlobEgressProviderOptions
[Display(
ResourceType = typeof(OptionsDisplayStrings),
Description = nameof(OptionsDisplayStrings.DisplayAttributeDescription_CommonEgressProviderOptions_CopyBufferSize))]
[Range(1, int.MaxValue)]
kkeirstead marked this conversation as resolved.
Show resolved Hide resolved
public int? CopyBufferSize { get; set; }

[Display(
Expand Down
4 changes: 2 additions & 2 deletions src/Extensions/S3Storage/MultiPartUploadStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ internal sealed class MultiPartUploadStream : Stream
public const int MinimumSize = 5 * 1024 * 1024; // the minimum size of an upload part (except for the last part)
private readonly int _bufferSize;

public MultiPartUploadStream(IS3Storage client, string bucketName, string objectKey, string uploadId, int bufferSize)
public MultiPartUploadStream(IS3Storage client, string bucketName, string objectKey, string uploadId, int? bufferSize)
{
_bufferSize = Math.Max(bufferSize, MinimumSize); // has to be at least the minimum
_bufferSize = Math.Max(bufferSize ?? 0, MinimumSize);
_buffer = ArrayPool<byte>.Shared.Rent(_bufferSize);
_offset = 0;
_client = client;
Expand Down
3 changes: 1 addition & 2 deletions src/Extensions/S3Storage/S3StorageEgressProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ public override async Task<string> EgressAsync(
{
client = await ClientFactory.CreateAsync(options, artifactSettings, token);
uploadId = await client.InitMultiPartUploadAsync(artifactSettings.Metadata, token);
int copyBufferSize = options.CopyBufferSize.GetValueOrDefault(0x100000);
kkeirstead marked this conversation as resolved.
Show resolved Hide resolved
await using var stream = new MultiPartUploadStream(client, options.BucketName, artifactSettings.Name, uploadId, copyBufferSize);
await using var stream = new MultiPartUploadStream(client, options.BucketName, artifactSettings.Name, uploadId, options.CopyBufferSize);
_logger.EgressProviderInvokeStreamAction(Constants.S3StorageProviderName);
await action(stream, token);
await stream.FinalizeAsync(token); // force to push the last part
Expand Down
9 changes: 0 additions & 9 deletions src/Microsoft.Diagnostics.Monitoring.WebApi/IEgressService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,6 @@ internal interface IEgressService
{
void ValidateProvider(string providerName);

Task<EgressResult> EgressAsync(
string providerName,
Func<CancellationToken, Task<Stream>> action,
string fileName,
string contentType,
IEndpointInfo source,
CollectionRuleMetadata collectionRuleMetadata,
CancellationToken token);

Task<EgressResult> EgressAsync(
string providerName,
Func<Stream, CancellationToken, Task> action,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,6 @@ internal class EgressOperation : IEgressOperation

private readonly IArtifactOperation _operation;


public EgressOperation(Func<CancellationToken, Task<Stream>> action, string endpointName, string artifactName, IProcessInfo processInfo, string contentType, KeyValueLogScope scope, string tags, CollectionRuleMetadata collectionRuleMetadata = null)
{
_egress = (service, token) => service.EgressAsync(endpointName, action, artifactName, contentType, processInfo.EndpointInfo, collectionRuleMetadata, token);
_scope = scope;

EgressProviderName = endpointName;
ProcessInfo = new EgressProcessInfo(processInfo.ProcessName, processInfo.EndpointInfo.ProcessId, processInfo.EndpointInfo.RuntimeInstanceCookie);
Tags = Utilities.SplitTags(tags);
}

public EgressOperation(Func<Stream, CancellationToken, Task> action, string endpointName, string artifactName, IProcessInfo processInfo, string contentType, KeyValueLogScope scope, string tags, CollectionRuleMetadata collectionRuleMetadata = null)
{
_egress = (service, token) => service.EgressAsync(endpointName, action, artifactName, contentType, processInfo.EndpointInfo, collectionRuleMetadata, token);
Expand All @@ -57,13 +46,6 @@ public EgressOperation(Func<Stream, CancellationToken, Task> action, string endp
_scope = scope;
}

public EgressOperation(Func<CancellationToken, Task<Stream>> action, string endpointName, string artifactName, IEndpointInfo source, string contentType, KeyValueLogScope scope, CollectionRuleMetadata collectionRuleMetadata)
{
_egress = (service, token) => service.EgressAsync(endpointName, action, artifactName, contentType, source, collectionRuleMetadata, token);
EgressProviderName = endpointName;
_scope = scope;
}

public async Task<ExecutionResult<EgressResult>> ExecuteAsync(IServiceProvider serviceProvider, CancellationToken token)
{
ILogger<EgressOperation> logger = serviceProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using Microsoft.Diagnostics.Tools.Monitor.Egress;
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -14,15 +13,6 @@ internal sealed class PipeEgressProvider : IEgressProvider<PipeEgressOptions>
{
public const string Name = "Pipe";

public async Task<string> EgressAsync(string providerType, string providerName, PipeEgressOptions options, Func<CancellationToken, Task<Stream>> action, EgressArtifactSettings artifactSettings, CancellationToken token)
{
using Stream stream = await action(token);

await stream.CopyToAsync(options.Writer, token);

return null;
}

public async Task<string> EgressAsync(string providerType, string providerName, PipeEgressOptions options, Func<Stream, CancellationToken, Task> action, EgressArtifactSettings artifactSettings, CancellationToken token)
{
using Stream stream = options.Writer.AsStream();
Expand Down
41 changes: 0 additions & 41 deletions src/Tools/dotnet-monitor/Egress/EgressProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,47 +44,6 @@ protected EgressProvider(ILogger logger)
Logger = logger;
}

/// <summary>
/// Egress a stream via a callback by returning the stream from the callback.
/// </summary>
/// <param name="options">Described to where stream data should be egressed.</param>
/// <param name="action">Callback that is invoked in order to get the stream to be egressed.</param>
/// <param name="artifactSettings">Describes data about the artifact, such as file name and content type.</param>
/// <param name="token">The token to monitor for cancellation requests.</param>
/// <returns>A task that completes with a value of the identifier of the egress result. Typically,
/// this is a path to access the stream without any information indicating whether any particular
/// user has access to it (e.g. no file system permissions or SAS tokens).</returns>
public virtual Task<string> EgressAsync(
string providerType,
string providerName,
TOptions options,
Func<CancellationToken, Task<Stream>> action,
EgressArtifactSettings artifactSettings,
CancellationToken token)
{
Func<Stream, CancellationToken, Task> wrappingAction = async (targetStream, token) =>
{
using var sourceStream = await action(token);

int copyBufferSize = options.CopyBufferSize.GetValueOrDefault(0x100000);
kkeirstead marked this conversation as resolved.
Show resolved Hide resolved

Logger?.EgressCopyActionStreamToEgressStream(copyBufferSize);

await sourceStream.CopyToAsync(
targetStream,
copyBufferSize,
token);
};

return EgressAsync(
providerType,
providerName,
options,
wrappingAction,
artifactSettings,
token);
}

/// <summary>
/// Egress a stream via a callback by writing to the provided stream.
/// </summary>
Expand Down
17 changes: 0 additions & 17 deletions src/Tools/dotnet-monitor/Egress/EgressProviderInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,6 @@ public EgressProviderInternal(
_monitor = monitor;
}

/// <inheritdoc/>
public Task<string> EgressAsync(
string providerType,
string providerName,
Func<CancellationToken, Task<Stream>> action,
EgressArtifactSettings artifactSettings,
CancellationToken token)
{
return _provider.EgressAsync(
providerType,
providerName,
GetOptions(providerName),
action,
artifactSettings,
token);
}

/// <inheritdoc/>
public Task<string> EgressAsync(
string providerType,
Expand Down
15 changes: 0 additions & 15 deletions src/Tools/dotnet-monitor/Egress/EgressService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,6 @@ public void ValidateProvider(string providerName)
string providerType = GetProviderType(providerName);
}

public async Task<EgressResult> EgressAsync(string providerName, Func<CancellationToken, Task<Stream>> action, string fileName, string contentType, IEndpointInfo source, CollectionRuleMetadata collectionRuleMetadata, CancellationToken token)
{
string providerType = GetProviderType(providerName);
IEgressProviderInternal provider = GetProvider(providerType);

string value = await provider.EgressAsync(
providerType,
providerName,
action,
await CreateSettings(source, fileName, contentType, collectionRuleMetadata, token),
token);

return new EgressResult(value);
}

public async Task<EgressResult> EgressAsync(string providerName, Func<Stream, CancellationToken, Task> action, string fileName, string contentType, IEndpointInfo source, CollectionRuleMetadata collectionRuleMetadata, CancellationToken token)
{
string providerType = GetProviderType(providerName);
Expand Down
8 changes: 0 additions & 8 deletions src/Tools/dotnet-monitor/Egress/IEgressProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,6 @@ namespace Microsoft.Diagnostics.Tools.Monitor.Egress
{
internal interface IEgressProvider<TOptions>
{
Task<string> EgressAsync(
string providerType,
string providerName,
TOptions options,
Func<CancellationToken, Task<Stream>> action,
EgressArtifactSettings artifactSettings,
CancellationToken token);

Task<string> EgressAsync(
string providerType,
string providerName,
Expand Down
7 changes: 0 additions & 7 deletions src/Tools/dotnet-monitor/Egress/IEgressProviderInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,6 @@ namespace Microsoft.Diagnostics.Tools.Monitor.Egress
{
internal interface IEgressProviderInternal
{
Task<string> EgressAsync(
string providerType,
string providerName,
Func<CancellationToken, Task<Stream>> action,
EgressArtifactSettings artifactSettings,
CancellationToken token);

Task<string> EgressAsync(
string providerType,
string providerName,
Expand Down