Skip to content

Commit

Permalink
Add support for filtering/processing log streams (#996)
Browse files Browse the repository at this point in the history
* Add filtering support for streaming logs

* Add LogsProvider tests

* Add LogsProcessor tests

* Fix tests
  • Loading branch information
varunpuranik authored Mar 28, 2019
1 parent 09ccdb9 commit 75d7460
Show file tree
Hide file tree
Showing 11 changed files with 304 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
Expand All @@ -10,5 +11,7 @@ public interface ILogsProcessor
Task<IReadOnlyList<ModuleLogMessage>> GetMessages(Stream stream, string moduleId, ModuleLogFilter filter);

Task<IReadOnlyList<string>> GetText(Stream stream, string moduleId, ModuleLogFilter filter);

Task ProcessLogsStream(Stream stream, ModuleLogOptions logOptions, Func<ArraySegment<byte>, Task> callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
using akka::Akka.IO;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.IO;
using Microsoft.Azure.Devices.Edge.Storage;
using Microsoft.Azure.Devices.Edge.Util;

// Processes incoming logs stream and converts to the required format
Expand Down Expand Up @@ -83,6 +83,38 @@ IRunnableGraph<Task<IImmutableList<string>>> GetGraph()
return result;
}

public async Task ProcessLogsStream(Stream stream, ModuleLogOptions logOptions, Func<ArraySegment<byte>, Task> callback)
{
string id = logOptions.Id;
GraphBuilder graphBuilder = GraphBuilder.CreateParsingGraphBuilder(stream, b => this.logMessageParser.Parse(b, id));
logOptions.Filter.LogLevel.ForEach(l => graphBuilder.AddFilter(m => m.LogLevel == l));
logOptions.Filter.Regex.ForEach(r => graphBuilder.AddFilter(m => r.IsMatch(m.Text)));

async Task<bool> ConsumerCallback(ArraySegment<byte> a)
{
await callback(a);
return true;
}

ArraySegment<byte> BasicMapper(ModuleLogMessageData l)
=> logOptions.ContentType == LogsContentType.Text
? new ArraySegment<byte>(l.FullFrame.ToArray())
: new ArraySegment<byte>(l.ToBytes());

var mappers = new List<Func<ArraySegment<byte>, ArraySegment<byte>>>();
if (logOptions.ContentEncoding == LogsContentEncoding.Gzip)
{
mappers.Add(m => new ArraySegment<byte>(Compression.CompressToGzip(m.Array)));
}

IRunnableGraph<Task> graph = graphBuilder.GetStreamingGraph(
ConsumerCallback,
BasicMapper,
mappers);

await graph.Run(this.materializer);
}

public void Dispose()
{
this.system?.Dispose();
Expand All @@ -91,9 +123,9 @@ public void Dispose()

class GraphBuilder
{
Source<ModuleLogMessageData, Task<IOResult>> parsingGraphSource;
Source<ModuleLogMessageData, NotUsed> parsingGraphSource;

GraphBuilder(Source<ModuleLogMessageData, Task<IOResult>> parsingGraphSource)
GraphBuilder(Source<ModuleLogMessageData, NotUsed> parsingGraphSource)
{
this.parsingGraphSource = parsingGraphSource;
}
Expand All @@ -103,7 +135,8 @@ public static GraphBuilder CreateParsingGraphBuilder(Stream stream, Func<ByteStr
var source = StreamConverters.FromInputStream(() => stream);
var graph = source
.Via(FramingFlow)
.Select(parserFunc);
.Select(parserFunc)
.MapMaterializedValue(_ => NotUsed.Instance);
return new GraphBuilder(graph);
}

Expand Down Expand Up @@ -131,6 +164,23 @@ public IRunnableGraph<Task<IImmutableList<T>>> GetMaterializingGraph<T>(Func<Mod
.Select(mapper)
.ToMaterialized(seqSink, Keep.Right);
}

public IRunnableGraph<Task> GetStreamingGraph<TU, TV>(Func<TV, Task<TU>> callback, Func<ModuleLogMessageData, TV> basicMapper, IList<Func<TV, TV>> mappers)
{
Source<TV, NotUsed> streamingGraphSource = this.parsingGraphSource
.Select(basicMapper);

if (mappers?.Count > 0)
{
foreach (Func<TV, TV> mapper in mappers)
{
streamingGraphSource = streamingGraphSource.Select(mapper);
}
}

return streamingGraphSource.SelectAsync(1, callback)
.ToMaterialized(Sink.Ignore<TU>(), Keep.Right);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,20 @@ public async Task GetLogsStream(ModuleLogOptions logOptions, Func<ArraySegment<b
Preconditions.CheckNotNull(logOptions, nameof(logOptions));
Preconditions.CheckNotNull(callback, nameof(callback));

if (logOptions.ContentEncoding != LogsContentEncoding.None || logOptions.ContentType != LogsContentType.Text)
{
throw new NotImplementedException();
}

Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(logOptions.Id, true, Option.None<int>(), Option.None<int>(), cancellationToken);
Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(logOptions.Id, true, logOptions.Filter.Tail, logOptions.Filter.Since, cancellationToken);
Events.ReceivedStream(logOptions.Id);

await this.WriteLogsStreamToOutput(logOptions.Id, callback, logsStream, cancellationToken);
await (NeedToProcessStream(logOptions)
? this.logsProcessor.ProcessLogsStream(logsStream, logOptions, callback)
: this.WriteLogsStreamToOutput(logOptions.Id, callback, logsStream, cancellationToken));
}

internal static bool NeedToProcessStream(ModuleLogOptions logOptions) =>
logOptions.Filter.LogLevel.HasValue
|| logOptions.Filter.Regex.HasValue
|| logOptions.ContentEncoding != LogsContentEncoding.None
|| logOptions.ContentType != LogsContentType.Text;

static byte[] ProcessByContentEncoding(byte[] bytes, LogsContentEncoding contentEncoding) =>
contentEncoding == LogsContentEncoding.Gzip
? Compression.CompressToGzip(bytes)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
{
using System;
using System.Text.RegularExpressions;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Json;
using Newtonsoft.Json;

public class ModuleLogFilter
Expand All @@ -23,12 +25,20 @@ public ModuleLogFilter(Option<int> tail, Option<int> since, Option<int> logLevel

public static ModuleLogFilter Empty = new ModuleLogFilter(Option.None<int>(), Option.None<int>(), Option.None<int>(), Option.None<string>());

[JsonProperty("tail")]
[JsonConverter(typeof(OptionConverter<int>), true)]
public Option<int> Tail { get; }

[JsonProperty("since")]
[JsonConverter(typeof(OptionConverter<int>), true)]
public Option<int> Since { get; }

[JsonProperty("loglevel")]
[JsonConverter(typeof(OptionConverter<int>), true)]
public Option<int> LogLevel { get; }

[JsonProperty("regex")]
[JsonConverter(typeof(OptionConverter<Regex>))]
public Option<Regex> Regex { get; }
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs
{
extern alias akka;
using System;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Json;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,37 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub.Stream
{
using System.ComponentModel;
using Microsoft.Azure.Devices.Edge.Agent.Core.Logs;
using Microsoft.Azure.Devices.Edge.Util;
using Newtonsoft.Json;

public class LogsStreamRequest
{
public LogsStreamRequest(string schemaVersion, string id)
public LogsStreamRequest(string schemaVersion, string id, LogsContentEncoding encoding, LogsContentType contentType, ModuleLogFilter filter)
{
this.SchemaVersion = schemaVersion;
this.Id = id;
this.Id = Preconditions.CheckNonWhiteSpace(id, nameof(id));
this.Filter = filter ?? ModuleLogFilter.Empty;
this.Encoding = encoding;
this.ContentType = contentType;
}

[JsonProperty("schemaVersion")]
public string SchemaVersion { get; }

[JsonProperty("id")]
public string Id { get; }

[JsonProperty("encoding", DefaultValueHandling = DefaultValueHandling.Populate)]
[DefaultValue(LogsContentEncoding.None)]
public LogsContentEncoding Encoding { get; }

[JsonProperty("contentType", DefaultValueHandling = DefaultValueHandling.Populate)]
[DefaultValue(LogsContentType.Text)]
public LogsContentType ContentType { get; }

[JsonProperty("filter")]
public ModuleLogFilter Filter { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,28 @@ public async Task Handle(IClientWebSocket clientWebSocket, CancellationToken can
LogsStreamRequest streamRequest = await this.ReadLogsStreamingRequest(clientWebSocket, cancellationToken);
Events.RequestData(streamRequest);

var logOptions = new ModuleLogOptions(streamRequest.Id, LogsContentEncoding.None, LogsContentType.Text, ModuleLogFilter.Empty);
using (var socketCancellationTokenSource = new CancellationTokenSource())
var logOptions = new ModuleLogOptions(streamRequest.Id, streamRequest.Encoding, streamRequest.ContentType, streamRequest.Filter);
var socketCancellationTokenSource = new CancellationTokenSource();

Task ProcessLogsFrame(ArraySegment<byte> bytes)
{
Task ProcessLogsFrame(ArraySegment<byte> bytes)
if (clientWebSocket.State != WebSocketState.Open)
{
if (clientWebSocket.State != WebSocketState.Open)
{
Events.WebSocketNotOpen(streamRequest.Id, clientWebSocket.State);
socketCancellationTokenSource.Cancel();
return Task.CompletedTask;
}
else
{
return clientWebSocket.SendAsync(bytes, WebSocketMessageType.Binary, true, cancellationToken);
}
Events.WebSocketNotOpen(streamRequest.Id, clientWebSocket.State);
socketCancellationTokenSource.Cancel();
return Task.CompletedTask;
}

using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, socketCancellationTokenSource.Token))
else
{
await this.logsProvider.GetLogsStream(logOptions, ProcessLogsFrame, linkedCts.Token);
return clientWebSocket.SendAsync(bytes, WebSocketMessageType.Binary, true, cancellationToken);
}
}

using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, socketCancellationTokenSource.Token))
{
await this.logsProvider.GetLogsStream(logOptions, ProcessLogsFrame, linkedCts.Token);
}

Events.StreamingCompleted(streamRequest.Id);
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Test.Logs
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Edge.Agent.Core.Logs;
using Microsoft.Azure.Devices.Edge.Storage;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Test.Common;
using Xunit;
Expand Down Expand Up @@ -278,5 +279,114 @@ public async Task GetMessagesWithMultipleFiltersTest(int logLevel, string regex,
Assert.Contains(logMessage.Text, expectedText, StringComparison.OrdinalIgnoreCase);
}
}

[Fact]
public async Task ProcessStreamTest()
{
// Arrange
string iotHub = "foo.azure-devices.net";
string deviceId = "dev1";
string moduleId = "mod1";
int logLevel = 6;
string regex = "Starting";
var logMessageParser = new LogMessageParser(iotHub, deviceId);
var logsProcessor = new LogsProcessor(logMessageParser);
var stream = new MemoryStream(DockerFraming.Frame(TestLogTexts));
var filter = new ModuleLogFilter(Option.None<int>(), Option.None<int>(), Option.Some(logLevel), Option.Some(regex));
var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.None, LogsContentType.Text, filter);

var receivedBytes = new List<byte>();

Task Callback(ArraySegment<byte> bytes)
{
receivedBytes.AddRange(bytes.ToArray());
return Task.CompletedTask;
}

// Act
await logsProcessor.ProcessLogsStream(stream, logOptions, Callback);
await Task.Delay(TimeSpan.FromSeconds(5));

// Assert
Assert.NotEmpty(receivedBytes);
string receivedText = receivedBytes
.Skip(8)
.ToArray()
.FromBytes();
Assert.Equal(TestLogTexts[0], receivedText);
}

[Fact]
public async Task ProcessStreamToMessageTest()
{
// Arrange
string iotHub = "foo.azure-devices.net";
string deviceId = "dev1";
string moduleId = "mod1";
int logLevel = 6;
string regex = "Starting";
var logMessageParser = new LogMessageParser(iotHub, deviceId);
var logsProcessor = new LogsProcessor(logMessageParser);
var stream = new MemoryStream(DockerFraming.Frame(TestLogTexts));
var filter = new ModuleLogFilter(Option.None<int>(), Option.None<int>(), Option.Some(logLevel), Option.Some(regex));
var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.None, LogsContentType.Json, filter);

var receivedBytes = new List<byte>();

Task Callback(ArraySegment<byte> bytes)
{
receivedBytes.AddRange(bytes.ToArray());
return Task.CompletedTask;
}

// Act
await logsProcessor.ProcessLogsStream(stream, logOptions, Callback);
await Task.Delay(TimeSpan.FromSeconds(5));

// Assert
Assert.NotEmpty(receivedBytes);
var logMessage = receivedBytes.ToArray().FromBytes<ModuleLogMessage>();
Assert.Equal(iotHub, logMessage.IoTHub);
Assert.Equal(deviceId, logMessage.DeviceId);
Assert.Equal(moduleId, logMessage.ModuleId);
Assert.Equal(6, logMessage.LogLevel);
Assert.Contains(logMessage.Text, TestLogTexts[0]);
}

[Fact]
public async Task ProcessStreamWithGzipTest()
{
// Arrange
string iotHub = "foo.azure-devices.net";
string deviceId = "dev1";
string moduleId = "mod1";
int logLevel = 6;
string regex = "Starting";
var logMessageParser = new LogMessageParser(iotHub, deviceId);
var logsProcessor = new LogsProcessor(logMessageParser);
var stream = new MemoryStream(DockerFraming.Frame(TestLogTexts));
var filter = new ModuleLogFilter(Option.None<int>(), Option.None<int>(), Option.Some(logLevel), Option.Some(regex));
var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.Gzip, LogsContentType.Text, filter);

var receivedBytes = new List<byte>();

Task Callback(ArraySegment<byte> bytes)
{
receivedBytes.AddRange(bytes.ToArray());
return Task.CompletedTask;
}

// Act
await logsProcessor.ProcessLogsStream(stream, logOptions, Callback);
await Task.Delay(TimeSpan.FromSeconds(5));

// Assert
Assert.NotEmpty(receivedBytes);
string receivedText = Compression.DecompressFromGzip(receivedBytes.ToArray())
.Skip(8)
.ToArray()
.FromBytes();
Assert.Equal(TestLogTexts[0], receivedText);
}
}
}
Loading

0 comments on commit 75d7460

Please sign in to comment.