Skip to content

Commit

Permalink
EdgeHub: Upstream perf improvements (#1006)
Browse files Browse the repository at this point in the history
* Fix StoringAsyncEndpointExecutor to read in parallel to processing

* Add parallel processing to CloudEndpoint

* Cleanup

* Fix metrics calculation

* Add StoreMessageProvider tests

* Fix names

* Cleanup

* Add more tests

* Fix build

* Fix casing

* Fix tests

* Add tests for batch

* Fix stylecop issues

* Fix routing tests

* Increase the endpoint timeout based on Fanout factor

* Fix build

* Fix getBatchSize calculation

* Add comment to StoreMessageProvider
  • Loading branch information
varunpuranik authored Apr 3, 2019
1 parent 232c67f commit 864b33d
Show file tree
Hide file tree
Showing 22 changed files with 682 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ public class Constants

public const string IotEdgeIdentityCapability = "iotEdge";
public const string ServiceIdentityRefreshMethodName = "RefreshDeviceScopeIdentityCache";

public const long MaxMessageSize = 256 * 1024; // matches IoTHub

public static readonly Version ConfigSchemaVersion = new Version("1.0");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
<PackageReference Include="App.Metrics" Version="3.0.0" />
<PackageReference Include="JetBrains.Annotations" Version="2018.3.0" />
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.18.1" />
<PackageReference Include="Nito.AsyncEx" Version="5.0.0-pre-05" />
<PackageReference Include="System.ValueTuple" Version="4.5.0" />
</ItemGroup>

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,29 @@ public class EndpointFactory : IEndpointFactory
readonly Core.IMessageConverter<IRoutingMessage> messageConverter;
readonly string edgeDeviceId;
readonly ConcurrentDictionary<string, Endpoint> cache;
readonly int maxBatchSize;
readonly int upstreamFanOutFactor;

public EndpointFactory(
IConnectionManager connectionManager,
Core.IMessageConverter<IRoutingMessage> messageConverter,
string edgeDeviceId)
string edgeDeviceId,
int maxBatchSize,
int upstreamFanOutFactor)
{
this.connectionManager = Preconditions.CheckNotNull(connectionManager, nameof(connectionManager));
this.messageConverter = Preconditions.CheckNotNull(messageConverter, nameof(messageConverter));
this.edgeDeviceId = Preconditions.CheckNonWhiteSpace(edgeDeviceId, nameof(edgeDeviceId));
this.cache = new ConcurrentDictionary<string, Endpoint>();
this.maxBatchSize = maxBatchSize;
this.upstreamFanOutFactor = upstreamFanOutFactor;
}

public Endpoint CreateSystemEndpoint(string endpoint)
{
if (CloudEndpointName.Equals(endpoint, StringComparison.OrdinalIgnoreCase))
{
return this.cache.GetOrAdd(CloudEndpointName, s => new CloudEndpoint("iothub", id => this.connectionManager.GetCloudConnection(id), this.messageConverter));
return this.cache.GetOrAdd(CloudEndpointName, s => new CloudEndpoint("iothub", id => this.connectionManager.GetCloudConnection(id), this.messageConverter, this.maxBatchSize, this.upstreamFanOutFactor));
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ namespace Microsoft.Azure.Devices.Edge.Hub.Core.Routing

public class RoutingEdgeHub : IEdgeHub
{
const long MaxMessageSize = 256 * 1024; // matches IoTHub
readonly Router router;
readonly Core.IMessageConverter<IRoutingMessage> messageConverter;
readonly IConnectionManager connectionManager;
Expand Down Expand Up @@ -161,9 +160,9 @@ internal void AddEdgeSystemProperties(IMessage message)
static void ValidateMessageSize(IRoutingMessage messageToBeValidated)
{
long messageSize = messageToBeValidated.Size();
if (messageSize > MaxMessageSize)
if (messageSize > Constants.MaxMessageSize)
{
throw new EdgeHubMessageTooLargeException($"Message size is {messageSize} bytes which is greater than the max size {MaxMessageSize} bytes allowed");
throw new EdgeHubMessageTooLargeException($"Message size is {messageSize} bytes which is greater than the max size {Constants.MaxMessageSize} bytes allowed");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ void RegisterRoutingModule(ContainerBuilder builder, (bool isEnabled, bool usePe
bool useV1TwinManager = this.GetConfigurationValueIfExists<string>("TwinManagerVersion")
.Map(v => v.Equals("v1", StringComparison.OrdinalIgnoreCase))
.GetOrElse(true);
int maxUpstreamBatchSize = this.configuration.GetValue("MaxUpstreamBatchSize", 10);
int upstreamFanOutFactor = this.configuration.GetValue("UpstreamFanOutFactor", 10);

builder.RegisterModule(
new RoutingModule(
Expand All @@ -151,7 +153,9 @@ void RegisterRoutingModule(ContainerBuilder builder, (bool isEnabled, bool usePe
cloudOperationTimeout,
minTwinSyncPeriod,
reportedPropertiesSyncFrequency,
useV1TwinManager));
useV1TwinManager,
maxUpstreamBatchSize,
upstreamFanOutFactor));
}

void RegisterCommonModule(ContainerBuilder builder, bool optimizeForPerformance, (bool isEnabled, bool usePersistentStorage, StoreAndForwardConfiguration config, string storagePath) storeAndForward)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class RoutingModule : Module
readonly Option<TimeSpan> minTwinSyncPeriod;
readonly Option<TimeSpan> reportedPropertiesSyncFrequency;
readonly bool useV1TwinManager;
readonly int maxUpstreamBatchSize;
readonly int upstreamFanOutFactor;

public RoutingModule(
string iotHubName,
Expand All @@ -66,7 +68,9 @@ public RoutingModule(
TimeSpan operationTimeout,
Option<TimeSpan> minTwinSyncPeriod,
Option<TimeSpan> reportedPropertiesSyncFrequency,
bool useV1TwinManager)
bool useV1TwinManager,
int maxUpstreamBatchSize,
int upstreamFanOutFactor)
{
this.iotHubName = Preconditions.CheckNonWhiteSpace(iotHubName, nameof(iotHubName));
this.edgeDeviceId = Preconditions.CheckNonWhiteSpace(edgeDeviceId, nameof(edgeDeviceId));
Expand All @@ -87,6 +91,8 @@ public RoutingModule(
this.minTwinSyncPeriod = minTwinSyncPeriod;
this.reportedPropertiesSyncFrequency = reportedPropertiesSyncFrequency;
this.useV1TwinManager = useV1TwinManager;
this.maxUpstreamBatchSize = maxUpstreamBatchSize;
this.upstreamFanOutFactor = upstreamFanOutFactor;
}

protected override void Load(ContainerBuilder builder)
Expand Down Expand Up @@ -239,7 +245,7 @@ protected override void Load(ContainerBuilder builder)
{
var messageConverter = c.Resolve<Core.IMessageConverter<IRoutingMessage>>();
IConnectionManager connectionManager = await c.Resolve<Task<IConnectionManager>>();
return new EndpointFactory(connectionManager, messageConverter, this.edgeDeviceId) as IEndpointFactory;
return new EndpointFactory(connectionManager, messageConverter, this.edgeDeviceId, this.maxUpstreamBatchSize, this.upstreamFanOutFactor) as IEndpointFactory;
})
.As<Task<IEndpointFactory>>()
.SingleInstance();
Expand Down
2 changes: 2 additions & 0 deletions edge-hub/src/Microsoft.Azure.Devices.Routing.Core/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ protected Endpoint(string id, string name, string iotHubName)

public abstract void LogUserMetrics(long messageCount, long latencyInMs);

public virtual int FanOutFactor => 1;

public bool Equals(Endpoint other)
{
if (ReferenceEquals(null, other))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@ namespace Microsoft.Azure.Devices.Routing.Core.Endpoints
using App.Metrics;
using App.Metrics.Counter;
using App.Metrics.Timer;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Routing.Core.Endpoints.StateMachine;
using Microsoft.Azure.Devices.Routing.Core.Util;
using Microsoft.Azure.Devices.Routing.Core.Util.Concurrency;
using Microsoft.Extensions.Logging;
using Nito.AsyncEx;
using static System.FormattableString;
using AsyncLock = Microsoft.Azure.Devices.Routing.Core.Util.Concurrency.AsyncLock;

public class StoringAsyncEndpointExecutor : IEndpointExecutor
{
readonly AtomicBoolean closed = new AtomicBoolean();
readonly IMessageStore messageStore;
readonly Task sendMessageTask;
readonly ManualResetEvent hasMessagesInQueue = new ManualResetEvent(true);
readonly AsyncManualResetEvent hasMessagesInQueue = new AsyncManualResetEvent(true);
readonly ICheckpointer checkpointer;
readonly AsyncEndpointExecutorOptions options;
readonly EndpointExecutorFsm machine;
Expand Down Expand Up @@ -126,19 +128,23 @@ async Task SendMessagesPump()
{
Events.StartSendMessagesPump(this);
IMessageIterator iterator = this.messageStore.GetMessageIterator(this.Endpoint.Id, this.checkpointer.Offset + 1);
int batchSize = this.options.BatchSize * this.Endpoint.FanOutFactor;
var storeMessagesProvider = new StoreMessagesProvider(iterator, this.options.BatchTimeout, batchSize);
while (!this.cts.IsCancellationRequested)
{
try
{
this.hasMessagesInQueue.WaitOne(this.options.BatchTimeout);
IMessage[] messages = (await iterator.GetNext(this.options.BatchSize)).ToArray();
await this.ProcessMessages(messages);
Events.SendMessagesSuccess(this, messages);
Metrics.DrainedCountIncrement(this.Endpoint.Id, messages.Length);

// If store has no messages, then reset the hasMessagesInQueue flag.
if (messages.Length == 0)
await this.hasMessagesInQueue.WaitAsync(this.options.BatchTimeout);
IMessage[] messages = await storeMessagesProvider.GetMessages();
if (messages.Length > 0)
{
await this.ProcessMessages(messages);
Events.SendMessagesSuccess(this, messages);
Metrics.DrainedCountIncrement(this.Endpoint.Id, messages.Length);
}
else
{
// If store has no messages, then reset the hasMessagesInQueue flag.
this.hasMessagesInQueue.Reset();
}
}
Expand Down Expand Up @@ -172,6 +178,73 @@ void Dispose(bool disposing)
}
}

// This class is used to prefetch messages from the store before they are needed.
// As soon as the previous batch is consumed, the next batch is fetched.
// A pump is started as soon as the object is created, and it keeps the messages list populated.
//
// Concurrency notes:
//  - GetMessages hands the current list to the caller and installs a fresh, empty list, under messagesLock.
//  - PopulatePump is the only code that appends to a list, and it appends under messagesLock.
//  - messagesResetEvent is set by GetMessages to tell the pump that there is room to refill.
internal class StoreMessagesProvider
{
    readonly IMessageIterator iterator;
    readonly int batchSize;
    readonly AsyncLock messagesLock = new AsyncLock();

    // Starts in the signalled state so the pump prefetches the first batch right away.
    readonly AsyncManualResetEvent messagesResetEvent = new AsyncManualResetEvent(true);
    readonly TimeSpan timeout;

    // Handle to the running pump. It is assigned in the constructor and not read anywhere in this class.
    // NOTE(review): no cancellation path for the pump is visible here - confirm it is acceptable for it
    // to run for the lifetime of the process.
    readonly Task populateTask;
    List<IMessage> messagesList;

    // iterator:  source the pump reads from via GetNext.
    // timeout:   passed to every wait on messagesResetEvent.
    // batchSize: upper bound on the number of messages buffered for a single GetMessages call.
    public StoreMessagesProvider(IMessageIterator iterator, TimeSpan timeout, int batchSize)
    {
        this.iterator = iterator;
        this.batchSize = batchSize;
        this.timeout = timeout;
        this.messagesList = new List<IMessage>(this.batchSize);
        this.populateTask = this.PopulatePump();
    }

    // Returns whatever has been prefetched so far (possibly an empty array) and signals
    // the pump to start filling a new list.
    public async Task<IMessage[]> GetMessages()
    {
        List<IMessage> currentMessagesList;
        using (await this.messagesLock.LockAsync())
        {
            currentMessagesList = this.messagesList;
            this.messagesList = new List<IMessage>(this.batchSize);
            this.messagesResetEvent.Set();
        }

        return currentMessagesList.ToArray();
    }

    // Background loop that tops the buffered list up to batchSize whenever it is signalled.
    async Task PopulatePump()
    {
        while (true)
        {
            try
            {
                // NOTE(review): the wait is given this.timeout, presumably so the pump also re-polls the
                // store periodically without a signal - confirm the semantics of this WaitAsync overload.
                await this.messagesResetEvent.WaitAsync(this.timeout);

                // Clear the signal BEFORE refilling, not after. If the event were reset after the loop,
                // a Set() issued by GetMessages between the last Count check and the Reset() would be
                // erased, and the pump would sit idle until the timeout even though the list was just
                // emptied. Resetting first guarantees any Set() raised during the refill is still
                // pending at the next wait. It also means a failing GetNext no longer leaves the event
                // signalled, so retries are paced by the wait instead of spinning.
                this.messagesResetEvent.Reset();

                while (this.messagesList.Count < this.batchSize)
                {
                    // Ask only for what is still missing. If GetMessages swaps the list while the read
                    // is in flight, the new list is empty, so adding at most curBatchSize items cannot
                    // push it past batchSize.
                    int curBatchSize = this.batchSize - this.messagesList.Count;
                    IList<IMessage> messages = (await this.iterator.GetNext(curBatchSize)).ToList();
                    if (!messages.Any())
                    {
                        // Store is drained for now; wait for the next signal or timeout.
                        break;
                    }

                    using (await this.messagesLock.LockAsync())
                    {
                        this.messagesList.AddRange(messages);
                    }
                }
            }
            catch (Exception e)
            {
                // Keep the pump alive on failures; the error is logged and the loop retries.
                Events.ErrorInPopulatePump(e);
            }
        }
    }
}

static class Events
{
const int IdStart = Routing.EventIds.StoringAsyncEndpointExecutor;
Expand All @@ -191,6 +264,7 @@ enum EventIds
Close,
CloseSuccess,
CloseFailure,
ErrorInPopulatePump
}

public static void AddMessageSuccess(StoringAsyncEndpointExecutor executor, long offset)
Expand Down Expand Up @@ -263,6 +337,11 @@ public static void CloseFailure(StoringAsyncEndpointExecutor executor, Exception
{
Log.LogError((int)EventIds.CloseFailure, ex, "[CloseFailure] Close failed. EndpointId: {0}, EndpointName: {1}", executor.Endpoint.Id, executor.Endpoint.Name);
}

// Logs, at warning level, an exception caught by the populate-messages pump.
public static void ErrorInPopulatePump(Exception ex) =>
    Log.LogWarning((int)EventIds.ErrorInPopulatePump, ex, "Error in populate messages pump");
}

static class Metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ static async Task EnterSendingAsync(EndpointExecutorFsm thisPtr)
TimeSpan retryAfter;
ICollection<IMessage> messages = EmptyMessages;
Stopwatch stopwatch = Stopwatch.StartNew();

TimeSpan endpointTimeout = TimeSpan.FromMilliseconds(thisPtr.config.Timeout.TotalMilliseconds * thisPtr.Endpoint.FanOutFactor);
try
{
Preconditions.CheckNotNull(thisPtr.currentSendCommand);
Expand All @@ -294,7 +294,8 @@ static async Task EnterSendingAsync(EndpointExecutorFsm thisPtr)
{
ISinkResult<IMessage> result;
Events.Send(thisPtr, thisPtr.currentSendCommand.Messages, messages);
using (var cts = new CancellationTokenSource(thisPtr.config.Timeout))

using (var cts = new CancellationTokenSource(endpointTimeout))
{
result = await thisPtr.processor.ProcessAsync(messages, cts.Token);
}
Expand Down Expand Up @@ -890,16 +891,16 @@ static void SetProcessingInternalCounters(EndpointExecutorFsm fsm, string status
Log.LogError((int)EventIds.CounterFailure, "[LogEventsProcessedCounterFailed] {0}", error);
}

TimeSpan totalTime = messages.Select(m => m.DequeuedTime).Aggregate(TimeSpan.Zero, (span, time) => span + (fsm.systemTime.UtcNow - time));
long averageLatencyInMs = totalTime < TimeSpan.Zero ? 0L : (long)(totalTime.TotalMilliseconds / messages.Count);
double totalTimeMSecs = messages.Select(m => m.DequeuedTime).Aggregate(0D, (span, time) => span + (fsm.systemTime.UtcNow - time).TotalMilliseconds);
long averageLatencyInMs = totalTimeMSecs < 0 ? 0L : (long)(totalTimeMSecs / messages.Count);

if (!Routing.PerfCounter.LogEventProcessingLatency(fsm.Endpoint.IotHubName, fsm.Endpoint.Name, fsm.Endpoint.Type, status, averageLatencyInMs, out error))
{
Log.LogError((int)EventIds.CounterFailure, "[LogEventProcessingLatencyCounterFailed] {0}", error);
}

TimeSpan messageE2EProcessingLatencyTotal = messages.Select(m => m.EnqueuedTime).Aggregate(TimeSpan.Zero, (span, time) => span + (fsm.systemTime.UtcNow - time));
long averageE2ELatencyInMs = messageE2EProcessingLatencyTotal < TimeSpan.Zero ? 0L : (long)(messageE2EProcessingLatencyTotal.TotalMilliseconds / messages.Count);
double messageE2EProcessingLatencyTotalMSecs = messages.Select(m => m.EnqueuedTime).Aggregate(0D, (span, time) => span + (fsm.systemTime.UtcNow - time).TotalMilliseconds);
long averageE2ELatencyInMs = messageE2EProcessingLatencyTotalMSecs < 0 ? 0L : (long)(messageE2EProcessingLatencyTotalMSecs / messages.Count);

if (!Routing.PerfCounter.LogE2EEventProcessingLatency(fsm.Endpoint.IotHubName, fsm.Endpoint.Name, fsm.Endpoint.Type, status, averageE2ELatencyInMs, out error))
{
Expand All @@ -921,8 +922,8 @@ static void SetSuccessfulEgressUserMetricCounter(EndpointExecutorFsm fsm, IColle
}

// calculate average latency
TimeSpan totalTime = messages.Select(m => m.EnqueuedTime).Aggregate(TimeSpan.Zero, (span, time) => span + (fsm.systemTime.UtcNow - time));
long averageLatencyInMs = totalTime < TimeSpan.Zero ? 0L : (long)(totalTime.TotalMilliseconds / messages.Count);
double totalTimeMSecs = messages.Select(m => m.EnqueuedTime).Aggregate(0D, (span, time) => span + (fsm.systemTime.UtcNow - time).TotalMilliseconds);
long averageLatencyInMs = totalTimeMSecs < 0 ? 0L : (long)(totalTimeMSecs / messages.Count);

fsm.Endpoint.LogUserMetrics(messages.Count, averageLatencyInMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void CloudMessageProcessor_CloseAsyncTest()

IProcessor moduleMessageProcessor = cloudEndpoint.CreateProcessor();
Task result = moduleMessageProcessor.CloseAsync(CancellationToken.None);
Assert.Equal(TaskEx.Done, result);
Assert.Equal(Task.CompletedTask, result);
}

[Fact]
Expand Down
Loading

0 comments on commit 864b33d

Please sign in to comment.