diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ILogsProcessor.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ILogsProcessor.cs index efe253d17b6..d0337a6511b 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ILogsProcessor.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ILogsProcessor.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft. All rights reserved. namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs { + using System; using System.Collections.Generic; using System.IO; using System.Threading.Tasks; @@ -10,5 +11,7 @@ public interface ILogsProcessor Task> GetMessages(Stream stream, string moduleId, ModuleLogFilter filter); Task> GetText(Stream stream, string moduleId, ModuleLogFilter filter); + + Task ProcessLogsStream(Stream stream, ModuleLogOptions logOptions, Func, Task> callback); } } diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProcessor.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProcessor.cs index 7e7242d1664..2c799c8a7be 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProcessor.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProcessor.cs @@ -13,7 +13,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs using akka::Akka.IO; using Akka.Streams; using Akka.Streams.Dsl; - using Akka.Streams.IO; + using Microsoft.Azure.Devices.Edge.Storage; using Microsoft.Azure.Devices.Edge.Util; // Processes incoming logs stream and converts to the required format @@ -83,6 +83,38 @@ IRunnableGraph>> GetGraph() return result; } + public async Task ProcessLogsStream(Stream stream, ModuleLogOptions logOptions, Func, Task> callback) + { + string id = logOptions.Id; + GraphBuilder graphBuilder = GraphBuilder.CreateParsingGraphBuilder(stream, b => this.logMessageParser.Parse(b, id)); + logOptions.Filter.LogLevel.ForEach(l => graphBuilder.AddFilter(m => m.LogLevel == l)); + logOptions.Filter.Regex.ForEach(r => graphBuilder.AddFilter(m => r.IsMatch(m.Text))); + + async Task ConsumerCallback(ArraySegment a) + { + await callback(a); + return true; + } + + ArraySegment BasicMapper(ModuleLogMessageData l) + => logOptions.ContentType == LogsContentType.Text + ? new ArraySegment(l.FullFrame.ToArray()) + : new ArraySegment(l.ToBytes()); + + var mappers = new List, ArraySegment>>(); + if (logOptions.ContentEncoding == LogsContentEncoding.Gzip) + { + mappers.Add(m => new ArraySegment(Compression.CompressToGzip(m.Array))); + } + + IRunnableGraph graph = graphBuilder.GetStreamingGraph( + ConsumerCallback, + BasicMapper, + mappers); + + await graph.Run(this.materializer); + } + public void Dispose() { this.system?.Dispose(); @@ -91,9 +123,9 @@ public void Dispose() class GraphBuilder { - Source> parsingGraphSource; + Source parsingGraphSource; - GraphBuilder(Source> parsingGraphSource) + GraphBuilder(Source parsingGraphSource) { this.parsingGraphSource = parsingGraphSource; } @@ -103,7 +135,8 @@ public static GraphBuilder CreateParsingGraphBuilder(Stream stream, Func stream); var graph = source .Via(FramingFlow) - .Select(parserFunc); + .Select(parserFunc) + .MapMaterializedValue(_ => NotUsed.Instance); return new GraphBuilder(graph); } @@ -131,6 +164,23 @@ public IRunnableGraph>> GetMaterializingGraph(Func GetStreamingGraph(Func> callback, Func basicMapper, IList> mappers) + { + Source streamingGraphSource = this.parsingGraphSource + .Select(basicMapper); + + if (mappers?.Count > 0) + { + foreach (Func mapper in mappers) + { + streamingGraphSource = streamingGraphSource.Select(mapper); + } + } + + return streamingGraphSource.SelectAsync(1, callback) + .ToMaterialized(Sink.Ignore(), Keep.Right); + } } } } diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProvider.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProvider.cs index d3e587997ac..ccab92e1047 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProvider.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProvider.cs @@ -36,17 +36,20 @@ public async Task GetLogsStream(ModuleLogOptions logOptions, Func(), Option.None(), cancellationToken); + Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(logOptions.Id, true, logOptions.Filter.Tail, logOptions.Filter.Since, cancellationToken); Events.ReceivedStream(logOptions.Id); - await this.WriteLogsStreamToOutput(logOptions.Id, callback, logsStream, cancellationToken); + await (NeedToProcessStream(logOptions) + ? this.logsProcessor.ProcessLogsStream(logsStream, logOptions, callback) + : this.WriteLogsStreamToOutput(logOptions.Id, callback, logsStream, cancellationToken)); } + internal static bool NeedToProcessStream(ModuleLogOptions logOptions) => + logOptions.Filter.LogLevel.HasValue + || logOptions.Filter.Regex.HasValue + || logOptions.ContentEncoding != LogsContentEncoding.None + || logOptions.ContentType != LogsContentType.Text; + static byte[] ProcessByContentEncoding(byte[] bytes, LogsContentEncoding contentEncoding) => contentEncoding == LogsContentEncoding.Gzip ? Compression.CompressToGzip(bytes) diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ModuleLogFilter.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ModuleLogFilter.cs index 36fcbdce977..1e9372f76f3 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ModuleLogFilter.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ModuleLogFilter.cs @@ -1,8 +1,10 @@ // Copyright (c) Microsoft. All rights reserved. namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs { + using System; using System.Text.RegularExpressions; using Microsoft.Azure.Devices.Edge.Util; + using Microsoft.Azure.Devices.Edge.Util.Json; using Newtonsoft.Json; public class ModuleLogFilter @@ -23,12 +25,20 @@ public ModuleLogFilter(Option tail, Option since, Option logLevel public static ModuleLogFilter Empty = new ModuleLogFilter(Option.None(), Option.None(), Option.None(), Option.None()); + [JsonProperty("tail")] + [JsonConverter(typeof(OptionConverter), true)] public Option Tail { get; } + [JsonProperty("since")] + [JsonConverter(typeof(OptionConverter), true)] public Option Since { get; } + [JsonProperty("loglevel")] + [JsonConverter(typeof(OptionConverter), true)] public Option LogLevel { get; } + [JsonProperty("regex")] + [JsonConverter(typeof(OptionConverter))] public Option Regex { get; } } } diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ModuleLogMessage.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ModuleLogMessage.cs index 694e6fa6ab3..e1e4fd06f4d 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ModuleLogMessage.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ModuleLogMessage.cs @@ -1,7 +1,6 @@ // Copyright (c) Microsoft. All rights reserved. namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs { - extern alias akka; using System; using Microsoft.Azure.Devices.Edge.Util; using Microsoft.Azure.Devices.Edge.Util.Json; diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequest.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequest.cs index e120dadb1d6..76393117b77 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequest.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequest.cs @@ -1,16 +1,37 @@ // Copyright (c) Microsoft. All rights reserved. namespace Microsoft.Azure.Devices.Edge.Agent.IoTHub.Stream { + using System.ComponentModel; + using Microsoft.Azure.Devices.Edge.Agent.Core.Logs; + using Microsoft.Azure.Devices.Edge.Util; + using Newtonsoft.Json; + public class LogsStreamRequest { - public LogsStreamRequest(string schemaVersion, string id) + public LogsStreamRequest(string schemaVersion, string id, LogsContentEncoding encoding, LogsContentType contentType, ModuleLogFilter filter) { this.SchemaVersion = schemaVersion; - this.Id = id; + this.Id = Preconditions.CheckNonWhiteSpace(id, nameof(id)); + this.Filter = filter ?? ModuleLogFilter.Empty; + this.Encoding = encoding; + this.ContentType = contentType; } + [JsonProperty("schemaVersion")] public string SchemaVersion { get; } + [JsonProperty("id")] public string Id { get; } + + [JsonProperty("encoding", DefaultValueHandling = DefaultValueHandling.Populate)] + [DefaultValue(LogsContentEncoding.None)] + public LogsContentEncoding Encoding { get; } + + [JsonProperty("contentType", DefaultValueHandling = DefaultValueHandling.Populate)] + [DefaultValue(LogsContentType.Text)] + public LogsContentType ContentType { get; } + + [JsonProperty("filter")] + public ModuleLogFilter Filter { get; } } } diff --git a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequestHandler.cs b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequestHandler.cs index 3880abbd94c..ce759ac3e3e 100644 --- a/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequestHandler.cs +++ b/edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.IoTHub/stream/LogsStreamRequestHandler.cs @@ -28,29 +28,28 @@ public async Task Handle(IClientWebSocket clientWebSocket, CancellationToken can LogsStreamRequest streamRequest = await this.ReadLogsStreamingRequest(clientWebSocket, cancellationToken); Events.RequestData(streamRequest); - var logOptions = new ModuleLogOptions(streamRequest.Id, LogsContentEncoding.None, LogsContentType.Text, ModuleLogFilter.Empty); - using (var socketCancellationTokenSource = new CancellationTokenSource()) + var logOptions = new ModuleLogOptions(streamRequest.Id, streamRequest.Encoding, streamRequest.ContentType, streamRequest.Filter); + var socketCancellationTokenSource = new CancellationTokenSource(); + + Task ProcessLogsFrame(ArraySegment bytes) { - Task ProcessLogsFrame(ArraySegment bytes) + if (clientWebSocket.State != WebSocketState.Open) { - if (clientWebSocket.State != WebSocketState.Open) - { - Events.WebSocketNotOpen(streamRequest.Id, clientWebSocket.State); - socketCancellationTokenSource.Cancel(); - return Task.CompletedTask; - } - else - { - return clientWebSocket.SendAsync(bytes, WebSocketMessageType.Binary, true, cancellationToken); - } + Events.WebSocketNotOpen(streamRequest.Id, clientWebSocket.State); + socketCancellationTokenSource.Cancel(); + return Task.CompletedTask; } - - using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, socketCancellationTokenSource.Token)) + else { - await this.logsProvider.GetLogsStream(logOptions, ProcessLogsFrame, linkedCts.Token); + return clientWebSocket.SendAsync(bytes, WebSocketMessageType.Binary, true, cancellationToken); } } + using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, socketCancellationTokenSource.Token)) + { + await this.logsProvider.GetLogsStream(logOptions, ProcessLogsFrame, linkedCts.Token); + } + Events.StreamingCompleted(streamRequest.Id); } catch (Exception e) diff --git a/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProcessorTest.cs b/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProcessorTest.cs index 914e13014f7..4c3c7f43829 100644 --- a/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProcessorTest.cs +++ b/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProcessorTest.cs @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Test.Logs using System.Linq; using System.Threading.Tasks; using Microsoft.Azure.Devices.Edge.Agent.Core.Logs; + using Microsoft.Azure.Devices.Edge.Storage; using Microsoft.Azure.Devices.Edge.Util; using Microsoft.Azure.Devices.Edge.Util.Test.Common; using Xunit; @@ -278,5 +279,114 @@ public async Task GetMessagesWithMultipleFiltersTest(int logLevel, string regex, Assert.Contains(logMessage.Text, expectedText, StringComparison.OrdinalIgnoreCase); } } + + [Fact] + public async Task ProcessStreamTest() + { + // Arrange + string iotHub = "foo.azure-devices.net"; + string deviceId = "dev1"; + string moduleId = "mod1"; + int logLevel = 6; + string regex = "Starting"; + var logMessageParser = new LogMessageParser(iotHub, deviceId); + var logsProcessor = new LogsProcessor(logMessageParser); + var stream = new MemoryStream(DockerFraming.Frame(TestLogTexts)); + var filter = new ModuleLogFilter(Option.None(), Option.None(), Option.Some(logLevel), Option.Some(regex)); + var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.None, LogsContentType.Text, filter); + + var receivedBytes = new List(); + + Task Callback(ArraySegment bytes) + { + receivedBytes.AddRange(bytes.ToArray()); + return Task.CompletedTask; + } + + // Act + await logsProcessor.ProcessLogsStream(stream, logOptions, Callback); + await Task.Delay(TimeSpan.FromSeconds(5)); + + // Assert + Assert.NotEmpty(receivedBytes); + string receivedText = receivedBytes + .Skip(8) + .ToArray() + .FromBytes(); + Assert.Equal(TestLogTexts[0], receivedText); + } + + [Fact] + public async Task ProcessStreamToMessageTest() + { + // Arrange + string iotHub = "foo.azure-devices.net"; + string deviceId = "dev1"; + string moduleId = "mod1"; + int logLevel = 6; + string regex = "Starting"; + var logMessageParser = new LogMessageParser(iotHub, deviceId); + var logsProcessor = new LogsProcessor(logMessageParser); + var stream = new MemoryStream(DockerFraming.Frame(TestLogTexts)); + var filter = new ModuleLogFilter(Option.None(), Option.None(), Option.Some(logLevel), Option.Some(regex)); + var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.None, LogsContentType.Json, filter); + + var receivedBytes = new List(); + + Task Callback(ArraySegment bytes) + { + receivedBytes.AddRange(bytes.ToArray()); + return Task.CompletedTask; + } + + // Act + await logsProcessor.ProcessLogsStream(stream, logOptions, Callback); + await Task.Delay(TimeSpan.FromSeconds(5)); + + // Assert + Assert.NotEmpty(receivedBytes); + var logMessage = receivedBytes.ToArray().FromBytes(); + Assert.Equal(iotHub, logMessage.IoTHub); + Assert.Equal(deviceId, logMessage.DeviceId); + Assert.Equal(moduleId, logMessage.ModuleId); + Assert.Equal(6, logMessage.LogLevel); + Assert.Contains(logMessage.Text, TestLogTexts[0]); + } + + [Fact] + public async Task ProcessStreamWithGzipTest() + { + // Arrange + string iotHub = "foo.azure-devices.net"; + string deviceId = "dev1"; + string moduleId = "mod1"; + int logLevel = 6; + string regex = "Starting"; + var logMessageParser = new LogMessageParser(iotHub, deviceId); + var logsProcessor = new LogsProcessor(logMessageParser); + var stream = new MemoryStream(DockerFraming.Frame(TestLogTexts)); + var filter = new ModuleLogFilter(Option.None(), Option.None(), Option.Some(logLevel), Option.Some(regex)); + var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.Gzip, LogsContentType.Text, filter); + + var receivedBytes = new List(); + + Task Callback(ArraySegment bytes) + { + receivedBytes.AddRange(bytes.ToArray()); + return Task.CompletedTask; + } + + // Act + await logsProcessor.ProcessLogsStream(stream, logOptions, Callback); + await Task.Delay(TimeSpan.FromSeconds(5)); + + // Assert + Assert.NotEmpty(receivedBytes); + string receivedText = Compression.DecompressFromGzip(receivedBytes.ToArray()) + .Skip(8) + .ToArray() + .FromBytes(); + Assert.Equal(TestLogTexts[0], receivedText); + } } } diff --git a/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProviderTest.cs b/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProviderTest.cs index 6869d72fdd9..05470885d8b 100644 --- a/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProviderTest.cs +++ b/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.Core.Test/logs/LogsProviderTest.cs @@ -4,6 +4,7 @@ namespace Microsoft.Azure.Devices.Edge.Agent.Core.Test.Logs using System; using System.Collections.Generic; using System.IO; + using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -40,7 +41,7 @@ public async Task GetLogsAsTextTest() var runtimeInfoProvider = new Mock(); runtimeInfoProvider.Setup(r => r.GetModuleLogs(moduleId, false, tail, since, cancellationToken)) - .ReturnsAsync(new MemoryStream(GetDockerLogsStream(TestLogTexts))); + .ReturnsAsync(new MemoryStream(DockerFraming.Frame(TestLogTexts))); var logsProcessor = new LogsProcessor(new LogMessageParser(iotHub, deviceId)); var logsProvider = new LogsProvider(runtimeInfoProvider.Object, logsProcessor); @@ -69,7 +70,7 @@ public async Task GetLogsAsTextWithCompressionTest() var runtimeInfoProvider = new Mock(); runtimeInfoProvider.Setup(r => r.GetModuleLogs(moduleId, false, tail, since, cancellationToken)) - .ReturnsAsync(new MemoryStream(GetDockerLogsStream(TestLogTexts))); + .ReturnsAsync(new MemoryStream(DockerFraming.Frame(TestLogTexts))); var logsProcessor = new LogsProcessor(new LogMessageParser(iotHub, deviceId)); var logsProvider = new LogsProvider(runtimeInfoProvider.Object, logsProcessor); @@ -98,7 +99,7 @@ public async Task GetLogsAsJsonTest() var runtimeInfoProvider = new Mock(); runtimeInfoProvider.Setup(r => r.GetModuleLogs(moduleId, false, tail, since, cancellationToken)) - .ReturnsAsync(new MemoryStream(GetDockerLogsStream(TestLogTexts))); + .ReturnsAsync(new MemoryStream(DockerFraming.Frame(TestLogTexts))); var logsProcessor = new LogsProcessor(new LogMessageParser(iotHub, deviceId)); var logsProvider = new LogsProvider(runtimeInfoProvider.Object, logsProcessor); @@ -139,7 +140,7 @@ public async Task GetLogsAsJsonWithCompressionTest() var runtimeInfoProvider = new Mock(); runtimeInfoProvider.Setup(r => r.GetModuleLogs(moduleId, false, tail, since, cancellationToken)) - .ReturnsAsync(new MemoryStream(GetDockerLogsStream(TestLogTexts))); + .ReturnsAsync(new MemoryStream(DockerFraming.Frame(TestLogTexts))); var logsProcessor = new LogsProcessor(new LogMessageParser(iotHub, deviceId)); var logsProvider = new LogsProvider(runtimeInfoProvider.Object, logsProcessor); @@ -179,7 +180,7 @@ public async Task GetLogsStreamTest() Option since = Option.None(); CancellationToken cancellationToken = CancellationToken.None; - byte[] dockerLogsStreamBytes = GetDockerLogsStream(TestLogTexts); + byte[] dockerLogsStreamBytes = DockerFraming.Frame(TestLogTexts); var runtimeInfoProvider = new Mock(); runtimeInfoProvider.Setup(r => r.GetModuleLogs(moduleId, true, tail, since, cancellationToken)) .ReturnsAsync(new MemoryStream(dockerLogsStreamBytes)); @@ -205,34 +206,65 @@ Task Callback(ArraySegment bytes) Assert.Equal(dockerLogsStreamBytes, receivedBytes); } - static byte[] GetDockerLogsStream(IEnumerable logTexts) + [Fact] + public async Task GetLogsStreamWithFiltersTest() { - byte streamByte = 01; - var padding = new byte[3]; - var outputBytes = new List(); - foreach (string text in logTexts) + // Arrange + string iotHub = "foo.azure-devices.net"; + string deviceId = "dev1"; + string moduleId = "mod1"; + Option tail = Option.Some(10); + Option since = Option.Some(1552887267); + CancellationToken cancellationToken = CancellationToken.None; + + byte[] dockerLogsStreamBytes = DockerFraming.Frame(TestLogTexts); + var runtimeInfoProvider = new Mock(); + runtimeInfoProvider.Setup(r => r.GetModuleLogs(moduleId, true, tail, since, cancellationToken)) + .ReturnsAsync(new MemoryStream(dockerLogsStreamBytes)); + + var logsProcessor = new LogsProcessor(new LogMessageParser(iotHub, deviceId)); + var logsProvider = new LogsProvider(runtimeInfoProvider.Object, logsProcessor); + + var filter = new ModuleLogFilter(tail, since, Option.Some(6), Option.Some("Starting")); + var logOptions = new ModuleLogOptions(moduleId, LogsContentEncoding.Gzip, LogsContentType.Text, filter); + + var receivedBytes = new List(); + + Task Callback(ArraySegment bytes) { - byte[] textBytes = Encoding.UTF8.GetBytes(text); - byte[] lenBytes = GetLengthBytes(textBytes.Length); - outputBytes.Add(streamByte); - outputBytes.AddRange(padding); - outputBytes.AddRange(lenBytes); - outputBytes.AddRange(textBytes); + receivedBytes.AddRange(bytes.ToArray()); + return Task.CompletedTask; } - return outputBytes.ToArray(); + // Act + await logsProvider.GetLogsStream(logOptions, Callback, cancellationToken); + await Task.Delay(TimeSpan.FromSeconds(3)); + + // Assert + Assert.NotEmpty(receivedBytes); + string receivedText = Compression.DecompressFromGzip(receivedBytes.ToArray()) + .Skip(8) + .ToArray() + .FromBytes(); + Assert.Equal(TestLogTexts[0], receivedText); } - static byte[] GetLengthBytes(int len) + [Theory] + [MemberData(nameof(GetNeedToProcessStreamTestData))] + public void NeedToProcessStreamTest(ModuleLogOptions logOptions, bool expectedResult) { - byte[] intBytes = BitConverter.GetBytes(len); - if (BitConverter.IsLittleEndian) - { - Array.Reverse(intBytes); - } + Assert.Equal(expectedResult, LogsProvider.NeedToProcessStream(logOptions)); + } - byte[] result = intBytes; - return result; + public static IEnumerable GetNeedToProcessStreamTestData() + { + yield return new object[] { new ModuleLogOptions("id", LogsContentEncoding.None, LogsContentType.Text, ModuleLogFilter.Empty), false }; + yield return new object[] { new ModuleLogOptions("id", LogsContentEncoding.None, LogsContentType.Text, new ModuleLogFilter(Option.Some(10), Option.Some(100), Option.None(), Option.None())), false }; + yield return new object[] { new ModuleLogOptions("id", LogsContentEncoding.Gzip, LogsContentType.Text, ModuleLogFilter.Empty), true }; + yield return new object[] { new ModuleLogOptions("id", LogsContentEncoding.None, LogsContentType.Text, new ModuleLogFilter(Option.Some(10), Option.Some(100), Option.Some(3), Option.Some("foo"))), true }; + yield return new object[] { new ModuleLogOptions("id", LogsContentEncoding.None, LogsContentType.Text, new ModuleLogFilter(Option.Some(10), Option.Some(100), Option.Some(3), Option.None())), true }; + yield return new object[] { new ModuleLogOptions("id", LogsContentEncoding.None, LogsContentType.Text, new ModuleLogFilter(Option.Some(10), Option.Some(100), Option.None(), Option.Some("foo"))), true }; + yield return new object[] { new ModuleLogOptions("id", LogsContentEncoding.None, LogsContentType.Json, new ModuleLogFilter(Option.Some(10), Option.Some(100), Option.None(), Option.None())), true }; } } } diff --git a/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.IoTHub.Test/stream/LogsStreamRequestHandlerTest.cs b/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.IoTHub.Test/stream/LogsStreamRequestHandlerTest.cs index 158ffc71c07..07570ba88bd 100644 --- a/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.IoTHub.Test/stream/LogsStreamRequestHandlerTest.cs +++ b/edge-agent/test/Microsoft.Azure.Devices.Edge.Agent.IoTHub.Test/stream/LogsStreamRequestHandlerTest.cs @@ -34,7 +34,7 @@ public async Task HandleTest() var logsProvider = new LogsProvider(runtimeInfoProvider.Object, Mock.Of()); - var logsStreamRequest = new LogsStreamRequest("1.0", id); + var logsStreamRequest = new LogsStreamRequest("1.0", id, LogsContentEncoding.None, LogsContentType.Text, ModuleLogFilter.Empty); byte[] logsStreamRequestBytes = logsStreamRequest.ToBytes(); var logsStreamRequestArraySeg = new ArraySegment(logsStreamRequestBytes); var clientWebSocket = new Mock(); @@ -77,7 +77,7 @@ public async Task HandleWithCancellationTest() var logsProvider = new LogsProvider(runtimeInfoProvider.Object, Mock.Of()); - var logsStreamRequest = new LogsStreamRequest("1.0", id); + var logsStreamRequest = new LogsStreamRequest("1.0", id, LogsContentEncoding.None, LogsContentType.Text, ModuleLogFilter.Empty); byte[] logsStreamRequestBytes = logsStreamRequest.ToBytes(); var logsStreamRequestArraySeg = new ArraySegment(logsStreamRequestBytes); var clientWebSocket = new Mock(); diff --git a/edge-util/src/Microsoft.Azure.Devices.Edge.Util/json/OptionConverter.cs b/edge-util/src/Microsoft.Azure.Devices.Edge.Util/json/OptionConverter.cs index 5a667fb365c..d0c71039e0c 100644 --- a/edge-util/src/Microsoft.Azure.Devices.Edge.Util/json/OptionConverter.cs +++ b/edge-util/src/Microsoft.Azure.Devices.Edge.Util/json/OptionConverter.cs @@ -6,13 +6,32 @@ namespace Microsoft.Azure.Devices.Edge.Util.Json public class OptionConverter : JsonConverter { + readonly bool nullOnNone; + + public OptionConverter() + : this(false) + { + } + + public OptionConverter(bool nullOnNone) + { + this.nullOnNone = nullOnNone; + } + public override bool CanRead => false; public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) { if (value is Option option) { - serializer.Serialize(writer, option.OrDefault()); + if (option.HasValue || !this.nullOnNone) + { + serializer.Serialize(writer, option.OrDefault()); + } + else + { + serializer.Serialize(writer, null); + } } else {