Skip to content

Commit

Permalink
Cherry-pick relevant parts of 655fbdd (#1354)
Browse files Browse the repository at this point in the history
  • Loading branch information
varunpuranik authored Jun 18, 2019
1 parent 681a353 commit 95a657a
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,6 @@ public Task GetLogsStream(IList<(string id, ModuleLogOptions logOptions)> ids, F
return Task.WhenAll(streamingTasks);
}

internal static bool NeedToProcessStream(ModuleLogOptions logOptions) =>
logOptions.Filter.LogLevel.HasValue
|| logOptions.Filter.Regex.HasValue
|| logOptions.ContentEncoding != LogsContentEncoding.None
|| logOptions.ContentType != LogsContentType.Text
|| logOptions.OutputFraming != LogOutputFraming.None;

internal async Task GetLogsStreamInternal(string id, ModuleLogOptions logOptions, Func<ArraySegment<byte>, Task> callback, CancellationToken cancellationToken)
{
Preconditions.CheckNotNull(logOptions, nameof(logOptions));
Expand All @@ -69,46 +62,14 @@ internal async Task GetLogsStreamInternal(string id, ModuleLogOptions logOptions
Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(id, logOptions.Follow, logOptions.Filter.Tail, logOptions.Filter.Since, cancellationToken);
Events.ReceivedStream(id);

await (NeedToProcessStream(logOptions)
? this.logsProcessor.ProcessLogsStream(id, logsStream, logOptions, callback)
: this.WriteLogsStreamToOutput(id, callback, logsStream, cancellationToken));
await this.logsProcessor.ProcessLogsStream(id, logsStream, logOptions, callback);
}

static byte[] ProcessByContentEncoding(byte[] bytes, LogsContentEncoding contentEncoding) =>
contentEncoding == LogsContentEncoding.Gzip
? Compression.CompressToGzip(bytes)
: bytes;

async Task WriteLogsStreamToOutput(string id, Func<ArraySegment<byte>, Task> callback, Stream stream, CancellationToken cancellationToken)
{
var buf = new byte[1024];
try
{
while (true)
{
if (cancellationToken.IsCancellationRequested)
{
Events.StreamingCancelled(id);
break;
}

int count = await stream.ReadAsync(buf, 0, buf.Length, cancellationToken);
if (count == 0)
{
Events.StreamingCompleted(id);
break;
}

var arrSeg = new ArraySegment<byte>(buf, 0, count);
await callback(arrSeg);
}
}
catch (Exception ex)
{
Events.ErrorWhileProcessingStream(id, ex);
}
}

async Task<byte[]> GetProcessedLogs(string id, Stream logsStream, ModuleLogOptions logOptions)
{
byte[] logBytes = await this.ProcessByContentType(id, logsStream, logOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public LogsUploadRequest(
[DefaultValue(LogsContentType.Text)]
public LogsContentType ContentType { get; }

[JsonProperty("sasUrl")]
[JsonIgnore]
public string SasUrl { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Requests
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Agent.Core.Logs;
using Microsoft.Azure.Devices.Edge.Storage;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -36,6 +37,7 @@ protected override async Task<Option<object>> HandleRequestInternal(Option<LogsU
Events.MismatchedMinorVersions(payload.SchemaVersion, ExpectedSchemaVersion);
}

Events.ProcessingRequest(payload);
ILogsRequestToOptionsMapper requestToOptionsMapper = new LogsRequestToOptionsMapper(
this.runtimeInfoProvider,
payload.Encoding,
Expand Down Expand Up @@ -70,13 +72,19 @@ static class Events

enum EventIds
{
MismatchedMinorVersions = IdStart
MismatchedMinorVersions = IdStart,
ProcessingRequest
}

public static void MismatchedMinorVersions(string payloadSchemaVersion, Version expectedSchemaVersion)
{
Log.LogWarning((int)EventIds.MismatchedMinorVersions, $"Logs upload request schema version {payloadSchemaVersion} does not match expected schema version {expectedSchemaVersion}. Some settings may not be supported.");
}

public static void ProcessingRequest(LogsUploadRequest payload)
{
Log.LogInformation((int)EventIds.ProcessingRequest, $"Processing request to upload logs for {payload.ToJson()}");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,10 @@ static class Events

enum EventIds
{
ScheduledModule = IdStart + 1,
HandlingRequest,
HandlingRequest = IdStart + 1,
ErrorHandlingRequest
}

public static void ScheduledModule(IRuntimeModule module, TimeSpan elapsedTime, TimeSpan coolOffPeriod)
{
TimeSpan timeLeft = coolOffPeriod - elapsedTime;
Log.LogInformation(
(int)EventIds.ScheduledModule,
$"Module '{module.Name}' scheduled to restart after {coolOffPeriod.Humanize()} ({timeLeft.Humanize()} left).");
}

public static void ErrorHandlingRequest(string request, Exception exception)
{
Log.LogWarning((int)EventIds.ErrorHandlingRequest, exception, $"Error handling request {request}");
Expand All @@ -104,7 +95,7 @@ public static void HandlingRequest(string request, string payloadJson)
(int)EventIds.HandlingRequest,
string.IsNullOrWhiteSpace(payloadJson)
? $"Received request {request}"
: $"Received request {request} with payload {payloadJson}");
: $"Received request {request} with payload");
}

public static void HandledRequest(string request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ Task Callback(ArraySegment<byte> bytes)

// Assert
Assert.NotEmpty(receivedBytes);
Assert.Equal(dockerLogsStreamBytes, receivedBytes);
Assert.Equal(string.Join(string.Empty, TestLogTexts).ToBytes(), receivedBytes);
}

[Fact]
Expand Down Expand Up @@ -404,12 +404,5 @@ Task Callback(ArraySegment<byte> bytes)

Assert.Equal(expectedTextLines, receivedText);
}

[Theory]
[MemberData(nameof(GetNeedToProcessStreamTestData))]
public void NeedToProcessStreamTest(ModuleLogOptions logOptions, bool expectedResult)
{
Assert.Equal(expectedResult, LogsProvider.NeedToProcessStream(logOptions));
}
}
}

0 comments on commit 95a657a

Please sign in to comment.