-
Notifications
You must be signed in to change notification settings - Fork 462
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Add LogsProvider * Fix build * Fix build issue * Change return type to IReadOnlyList instead of IEnumerable * Fix bad merge
- Loading branch information
1 parent
f7e820f
commit 6bc92d2
Showing
16 changed files
with
956 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
25 changes: 25 additions & 0 deletions
25
edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/SystemInfo.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
// Copyright (c) Microsoft. All rights reserved. | ||
|
||
namespace Microsoft.Azure.Devices.Edge.Agent.Core | ||
{ | ||
using Newtonsoft.Json; | ||
|
||
public class SystemInfo | ||
{ | ||
[JsonConstructor] | ||
public SystemInfo(string operatingSystemType, string architecture, string version) | ||
{ | ||
this.OperatingSystemType = operatingSystemType; | ||
this.Architecture = architecture; | ||
this.Version = version; | ||
} | ||
|
||
public string OperatingSystemType { get; } | ||
|
||
public string Architecture { get; } | ||
|
||
public string Version { get; } | ||
|
||
static SystemInfo Empty { get; } = new SystemInfo(string.Empty, string.Empty, string.Empty); | ||
} | ||
} |
11 changes: 11 additions & 0 deletions
11
edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ILogMessageParser.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
// Copyright (c) Microsoft. All rights reserved. | ||
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs | ||
{ | ||
extern alias akka; | ||
using akka::Akka.IO; | ||
|
||
public interface ILogMessageParser | ||
{ | ||
ModuleLogMessage Parse(ByteString byteString, string moduleId); | ||
} | ||
} |
14 changes: 14 additions & 0 deletions
14
edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ILogsProcessor.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
// Copyright (c) Microsoft. All rights reserved. | ||
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs | ||
{ | ||
using System.Collections.Generic; | ||
using System.IO; | ||
using System.Threading.Tasks; | ||
|
||
public interface ILogsProcessor | ||
{ | ||
Task<IReadOnlyList<ModuleLogMessage>> GetMessages(Stream stream, string moduleId); | ||
|
||
Task<IReadOnlyList<string>> GetText(Stream stream); | ||
} | ||
} |
11 changes: 11 additions & 0 deletions
11
edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/ILogsProvider.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
// Copyright (c) Microsoft. All rights reserved. | ||
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs | ||
{ | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
public interface ILogsProvider | ||
{ | ||
Task<byte[]> GetLogs(ModuleLogOptions logOptions, CancellationToken cancellationToken); | ||
} | ||
} |
93 changes: 93 additions & 0 deletions
93
edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogMessageParser.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
// Copyright (c) Microsoft. All rights reserved. | ||
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs | ||
{ | ||
extern alias akka; | ||
using System; | ||
using System.Text; | ||
using System.Text.RegularExpressions; | ||
using akka::Akka.IO; | ||
using Microsoft.Azure.Devices.Edge.Util; | ||
|
||
// Parses logs into message objects. | ||
// | ||
// Expected format - | ||
// Each input payload should contain one frame in Docker format - | ||
// 01 00 00 00 00 00 00 1f 52 6f 73 65 73 20 61 72 65 ... | ||
// │ ─────┬── ─────┬───── R o s e s a r e... | ||
// │ │ │ | ||
// └stdout │ │ | ||
// │ └─ 0x0000001f = 31 bytes (including the \n at the end) | ||
// unused | ||
// | ||
// The payload itself is expected to be in this format - | ||
// <logLevel> TimeStamp log text | ||
// For example, this log line will be parsed as follows - | ||
// <6> 2019-02-14 16:15:35.243 -08:00 [INF] [EdgeHub] - Version - 1.0.7-dev.BUILDNUMBER (COMMITID) | ||
// LogLevel = 6 | ||
// TimeStamp = 2019-02-14 16:15:35.243 -08:00 | ||
// Text = [INF] [EdgeHub] - Version - 1.0.7-dev.BUILDNUMBER (COMMITID) | ||
public class LogMessageParser : ILogMessageParser | ||
{ | ||
const int DefaultLogLevel = 6; | ||
const string LogRegexPattern = @"^(<(?<logLevel>\d)>)?\s*((?<timestamp>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}\s[+-]\d{2}:\d{2})\s)?\s*(?<logtext>.*)"; | ||
|
||
readonly string iotHubName; | ||
readonly string deviceId; | ||
|
||
public LogMessageParser(string iotHubName, string deviceId) | ||
{ | ||
this.iotHubName = Preconditions.CheckNonWhiteSpace(iotHubName, nameof(iotHubName)); | ||
this.deviceId = Preconditions.CheckNonWhiteSpace(deviceId, nameof(deviceId)); | ||
} | ||
|
||
public ModuleLogMessage Parse(ByteString byteString, string moduleId) => | ||
GetLogMessage(byteString, this.iotHubName, this.deviceId, moduleId); | ||
|
||
internal static ModuleLogMessage GetLogMessage(ByteString arg, string iotHubName, string deviceId, string moduleId) | ||
{ | ||
string stream = GetStream(arg[0]); | ||
ByteString payload = arg.Slice(8); | ||
string payloadString = payload.ToString(Encoding.UTF8); | ||
(int logLevel, Option<DateTime> timeStamp, string logText) = ParseLogText(payloadString); | ||
var moduleLogMessage = new ModuleLogMessage(iotHubName, deviceId, moduleId, stream, logLevel, timeStamp, logText); | ||
return moduleLogMessage; | ||
} | ||
|
||
internal static string GetStream(byte streamByte) => streamByte == 2 ? "stderr" : "stdout"; | ||
|
||
internal static (int logLevel, Option<DateTime> timeStamp, string text) ParseLogText(string value) | ||
{ | ||
var regex = new Regex(LogRegexPattern); | ||
var match = regex.Match(value); | ||
int logLevel = DefaultLogLevel; | ||
string text = value; | ||
Option<DateTime> timeStamp = Option.None<DateTime>(); | ||
if (match.Success) | ||
{ | ||
var tsg = match.Groups["timestamp"]; | ||
if (tsg?.Length > 0) | ||
{ | ||
if (DateTime.TryParse(tsg.Value, out DateTime dt)) | ||
{ | ||
timeStamp = Option.Some(dt); | ||
} | ||
} | ||
|
||
var llg = match.Groups["logLevel"]; | ||
if (llg?.Length > 0) | ||
{ | ||
string ll = llg.Value; | ||
int.TryParse(ll, out logLevel); | ||
} | ||
|
||
var textGroup = match.Groups["logtext"]; | ||
if (textGroup?.Length > 0) | ||
{ | ||
text = textGroup.Value; | ||
} | ||
} | ||
|
||
return (logLevel, timeStamp, text); | ||
} | ||
} | ||
} |
81 changes: 81 additions & 0 deletions
81
edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProcessor.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
// Copyright (c) Microsoft. All rights reserved. | ||
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs | ||
{ | ||
extern alias akka; | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Collections.Immutable; | ||
using System.IO; | ||
using System.Text; | ||
using System.Threading.Tasks; | ||
using akka::Akka; | ||
using akka::Akka.Actor; | ||
using akka::Akka.IO; | ||
using Akka.Streams; | ||
using Akka.Streams.Dsl; | ||
using Microsoft.Azure.Devices.Edge.Util; | ||
|
||
// Processes incoming logs stream and converts to the required format | ||
// | ||
// Docker format - | ||
// Each input payload should contain one frame in Docker format - | ||
// 01 00 00 00 00 00 00 1f 52 6f 73 65 73 20 61 72 65 ... | ||
// │ ─────┬── ─────┬───── R o s e s a r e... | ||
// │ │ │ | ||
// └stdout │ │ | ||
// │ └─ 0x0000001f = 31 bytes (including the \n at the end) | ||
// unused | ||
public class LogsProcessor : ILogsProcessor, IDisposable | ||
{ | ||
static readonly Flow<ByteString, ByteString, NotUsed> FramingFlow | ||
= Framing.LengthField(4, int.MaxValue, 4, ByteOrder.BigEndian); | ||
|
||
readonly ActorSystem system; | ||
readonly ActorMaterializer materializer; | ||
readonly ILogMessageParser logMessageParser; | ||
|
||
public LogsProcessor(ILogMessageParser logMessageParser) | ||
{ | ||
this.logMessageParser = Preconditions.CheckNotNull(logMessageParser, nameof(logMessageParser)); | ||
this.system = ActorSystem.Create("LogsProcessor"); | ||
this.materializer = this.system.Materializer(); | ||
} | ||
|
||
public async Task<IReadOnlyList<ModuleLogMessage>> GetMessages(Stream stream, string moduleId) | ||
{ | ||
Preconditions.CheckNotNull(stream, nameof(stream)); | ||
Preconditions.CheckNonWhiteSpace(moduleId, nameof(moduleId)); | ||
|
||
var source = StreamConverters.FromInputStream(() => stream); | ||
var seqSink = Sink.Seq<ModuleLogMessage>(); | ||
IRunnableGraph<Task<IImmutableList<ModuleLogMessage>>> graph = source | ||
.Via(FramingFlow) | ||
.Select(b => this.logMessageParser.Parse(b, moduleId)) | ||
.ToMaterialized(seqSink, Keep.Right); | ||
|
||
IImmutableList<ModuleLogMessage> result = await graph.Run(this.materializer); | ||
return result; | ||
} | ||
|
||
public async Task<IReadOnlyList<string>> GetText(Stream stream) | ||
{ | ||
Preconditions.CheckNotNull(stream, nameof(stream)); | ||
var source = StreamConverters.FromInputStream(() => stream); | ||
var seqSink = Sink.Seq<string>(); | ||
IRunnableGraph<Task<IImmutableList<string>>> graph = source | ||
.Via(FramingFlow) | ||
.Select(b => b.Slice(8)) | ||
.Select(b => b.ToString(Encoding.UTF8)) | ||
.ToMaterialized(seqSink, Keep.Right); | ||
|
||
IImmutableList<string> result = await graph.Run(this.materializer); | ||
return result; | ||
} | ||
|
||
public void Dispose() | ||
{ | ||
this.system?.Dispose(); | ||
this.materializer?.Dispose(); | ||
} | ||
} | ||
} |
56 changes: 56 additions & 0 deletions
56
edge-agent/src/Microsoft.Azure.Devices.Edge.Agent.Core/logs/LogsProvider.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
// Copyright (c) Microsoft. All rights reserved. | ||
namespace Microsoft.Azure.Devices.Edge.Agent.Core.Logs | ||
{ | ||
using System.Collections.Generic; | ||
using System.IO; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Microsoft.Azure.Devices.Edge.Storage; | ||
using Microsoft.Azure.Devices.Edge.Util; | ||
|
||
public class LogsProvider : ILogsProvider | ||
{ | ||
readonly IRuntimeInfoProvider runtimeInfoProvider; | ||
readonly ILogsProcessor logsProcessor; | ||
|
||
public LogsProvider(IRuntimeInfoProvider runtimeInfoProvider, ILogsProcessor logsProcessor) | ||
{ | ||
this.runtimeInfoProvider = Preconditions.CheckNotNull(runtimeInfoProvider, nameof(runtimeInfoProvider)); | ||
this.logsProcessor = Preconditions.CheckNotNull(logsProcessor, nameof(logsProcessor)); | ||
} | ||
|
||
public async Task<byte[]> GetLogs(ModuleLogOptions logOptions, CancellationToken cancellationToken) | ||
{ | ||
Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(logOptions.Id, false, Option.None<int>(), cancellationToken); | ||
byte[] logBytes = await this.GetProcessedLogs(logsStream, logOptions); | ||
return logBytes; | ||
} | ||
|
||
static byte[] ProcessByContentEncoding(byte[] bytes, LogsContentEncoding contentEncoding) => | ||
contentEncoding == LogsContentEncoding.Gzip | ||
? Compression.CompressToGzip(bytes) | ||
: bytes; | ||
|
||
async Task<byte[]> GetProcessedLogs(Stream logsStream, ModuleLogOptions logOptions) | ||
{ | ||
byte[] logBytes = await this.ProcessByContentType(logsStream, logOptions); | ||
logBytes = ProcessByContentEncoding(logBytes, logOptions.ContentEncoding); | ||
return logBytes; | ||
} | ||
|
||
async Task<byte[]> ProcessByContentType(Stream logsStream, ModuleLogOptions logOptions) | ||
{ | ||
switch (logOptions.ContentType) | ||
{ | ||
case LogsContentType.Json: | ||
IEnumerable<ModuleLogMessage> logMessages = await this.logsProcessor.GetMessages(logsStream, logOptions.Id); | ||
return logMessages.ToBytes(); | ||
|
||
default: | ||
IEnumerable<string> logTexts = await this.logsProcessor.GetText(logsStream); | ||
string logTextString = logTexts.Join(string.Empty); | ||
return logTextString.ToBytes(); | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.