Skip to content

Commit

Permalink
Updating end of interval Flex metrics publishing (#10210)
Browse files Browse the repository at this point in the history
  • Loading branch information
mathewc authored Jun 6, 2024
1 parent ac455ef commit d1067c5
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,19 @@ internal void Initialize()
IsAlwaysReady = _environment.GetEnvironmentVariable(EnvironmentSettingNames.FunctionsAlwaysReadyInstance) == "1";
}

internal async Task OnPublishMetrics()
internal async Task OnPublishMetrics(DateTime now)
{
try
{
lock (_lock)
{
if (ActiveFunctionCount > 0)
{
// at the end of an interval, we'll meter any outstanding activity up to the end of the interval
MeterCurrentActiveInterval(now);
}
}

if (FunctionExecutionCount == 0 && FunctionExecutionTimeMS == 0 && !IsAlwaysReady && !_metricsProvider.HasMetrics())
{
// no activity to report
Expand Down Expand Up @@ -137,7 +146,7 @@ internal async Task OnPublishMetrics()

private async void OnFunctionMetricsPublishTimer(object state)
{
await OnPublishMetrics();
await OnPublishMetrics(DateTime.UtcNow);
}

private async Task PublishMetricsAsync(Metrics metrics)
Expand Down Expand Up @@ -280,26 +289,7 @@ internal void OnFunctionCompleted(string functionName, string invocationId, Date
{
// We're transitioning from active to inactive, so we need to accumulate the elapsed time
// for this interval.
DateTime adjustedActivityIntervalStart = _currentActivityIntervalStart;
if (_activityIntervalHighWatermark > _currentActivityIntervalStart)
{
// If we've already metered a previous interval past the current time,
// we move forward (since we never want to meter the same interval twice).
adjustedActivityIntervalStart = _activityIntervalHighWatermark;
}

// If the elapsed duration is negative, it means invocations are still before
// the high watermark, so have already been metered.
double elapsedMS = (now - adjustedActivityIntervalStart).TotalMilliseconds;
if (elapsedMS > 0)
{
// Accumulate the duration for this interval, applying the minimum
var duration = Math.Max(elapsedMS, _options.MinimumActivityIntervalMS);
FunctionExecutionTimeMS += RoundUp(duration, _options.MetricsGranularityMS);

// Move the high watermark timestamp forward
_activityIntervalHighWatermark = adjustedActivityIntervalStart.AddMilliseconds(duration);
}
MeterCurrentActiveInterval(now);
}

// for every completed invocation, increment our invocation count
Expand All @@ -312,10 +302,36 @@ public void AddFunctionExecutionActivity(string functionName, string invocationI
// nothing to do here - we only care about Started/Completed events.
}

private void MeterCurrentActiveInterval(DateTime now)
{
DateTime adjustedActivityIntervalStart = _currentActivityIntervalStart;
if (_activityIntervalHighWatermark > _currentActivityIntervalStart)
{
// If we've already metered a previous interval past the current time,
// we move forward (since we never want to meter the same interval twice).
adjustedActivityIntervalStart = _activityIntervalHighWatermark;
}

// If the elapsed duration is negative, it means invocations are still before
// the high watermark, so have already been metered.
double elapsedMS = (now - adjustedActivityIntervalStart).TotalMilliseconds;
if (elapsedMS > 0)
{
// Accumulate the duration for this interval, applying minimums and rounding
var duration = Math.Max(elapsedMS, _options.MinimumActivityIntervalMS);
duration = RoundUp(duration, _options.MetricsGranularityMS);
FunctionExecutionTimeMS += (long)duration;

// Move the high watermark timestamp forward to the point
// up to which we've metered
_activityIntervalHighWatermark = adjustedActivityIntervalStart.AddMilliseconds(duration);
}
}

// Rounds up the given metric to a specified granularity. For example, RoundUp(1320.00, 100) = 1400, but RoundUp(1300.00, 100) = 1300.
private static long RoundUp(double metric, int granularity)
private static double RoundUp(double metric, int granularity)
{
return (long)Math.Ceiling(metric / granularity) * granularity;
return Math.Ceiling(metric / granularity) * granularity;
}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public async Task OnPublishMetrics_WritesFileAndResetsCounts(bool isAlwaysReadyI
int delay = 100;
await Task.Delay(delay);

await publisher.OnPublishMetrics();
await publisher.OnPublishMetrics(DateTime.UtcNow);

FileInfo[] files = GetMetricsFilesSafe(_metricsFilePath);

Expand Down Expand Up @@ -137,7 +137,7 @@ public async Task OnPublishMetrics_WritesFileAndResetsCounts(bool isAlwaysReadyI
delay = (int)executionDurationMS + 100;
await Task.Delay(delay);

await publisher.OnPublishMetrics();
await publisher.OnPublishMetrics(DateTime.UtcNow);

files = GetMetricsFilesSafe(_metricsFilePath);
Assert.Equal(1, files.Length);
Expand Down Expand Up @@ -180,7 +180,7 @@ public async Task OnPublishMetrics_PurgesOldFiles()
int delay = (int)executionDurationMS + 100;
await Task.Delay(delay);

await publisher.OnPublishMetrics();
await publisher.OnPublishMetrics(DateTime.UtcNow);

FileInfo[] files = GetMetricsFilesSafe(_metricsFilePath);
Assert.Equal(5, files.Length);
Expand Down Expand Up @@ -437,6 +437,100 @@ public void FunctionsStartStop_MinimumActivityIntervals_Scenario3()
Assert.Equal(4800, publisher.FunctionExecutionTimeMS);
}

[Fact]
public async Task OnPublishMetrics_OutstandingActivityIsPublished()
{
CleanupMetricsFiles();

var publisher = CreatePublisher(metricsPublishInterval: TimeSpan.FromHours(1), inStandbyMode: false);

Assert.Equal(1000, _options.MinimumActivityIntervalMS);

Assert.Equal(0, publisher.ActiveFunctionCount);
Assert.Equal(0, publisher.FunctionExecutionCount);
Assert.Equal(0, publisher.FunctionExecutionTimeMS);

DateTime now = DateTime.UtcNow;

// *** Interval 0 ***
// one function starts but doesn't complete before the interval ends
now += TimeSpan.FromMilliseconds(200);
publisher.OnFunctionStarted("foo", "0", now);

now += TimeSpan.FromMilliseconds(1800);
await publisher.OnPublishMetrics(now);

FileInfo[] files = GetMetricsFilesSafe(_metricsFilePath);
var metrics = await ReadMetricsAsync(files[0].FullName, deleteFile: true);

// we expect to emit metrics for the duration of the active window
// while the function continues running
Assert.Equal(1, publisher.ActiveFunctionCount);
Assert.Equal(0, metrics.ExecutionCount);
Assert.Equal(1800, metrics.ExecutionTimeMS);

// *** Interval 1 ****
// another function starts and a short time later the first completes
now += TimeSpan.FromMilliseconds(100);
publisher.OnFunctionStarted("foo", "1", now);
now += TimeSpan.FromMilliseconds(100);
publisher.OnFunctionCompleted("foo", "0", now);

// a short time later another function starts
// then completes a short time later
now += TimeSpan.FromMilliseconds(700);
publisher.OnFunctionStarted("bar", "2", now);
now += TimeSpan.FromMilliseconds(800);
publisher.OnFunctionCompleted("bar", "2", now);

Assert.Equal(1, publisher.ActiveFunctionCount);
Assert.Equal(2, publisher.FunctionExecutionCount);
Assert.Equal(0, publisher.FunctionExecutionTimeMS);

// wait another 300ms then simulate the end of the interval
now += TimeSpan.FromMilliseconds(300);
await publisher.OnPublishMetrics(now);

files = GetMetricsFilesSafe(_metricsFilePath);
metrics = await ReadMetricsAsync(files[0].FullName, deleteFile: true);

// we expect metrics reflecting the activity during the entire window
Assert.Equal(1, publisher.ActiveFunctionCount);
Assert.Equal(2, metrics.ExecutionCount);
Assert.Equal(2000, metrics.ExecutionTimeMS);

// *** Interval 2 ***
// now, a little while later the invocation completes
// this gets metered for 1000ms (the minimum)
now += TimeSpan.FromMilliseconds(400);
publisher.OnFunctionCompleted("foo", "1", now);

// after a short time another function starts then completes
// because we've metered the previous function for 1000ms,
// this invocation isn't metered (it falls within the previous period)
now += TimeSpan.FromMilliseconds(200);
publisher.OnFunctionStarted("foo", "3", now);
now += TimeSpan.FromMilliseconds(300);
publisher.OnFunctionCompleted("foo", "3", now);

// another function starts and continues running past the
// end of the second interval
// at the end of thie interval, while the function has only
// ran for 700ms this gets rounded up to 1000ms
now += TimeSpan.FromMilliseconds(400);
publisher.OnFunctionStarted("foo", "4", now);
now += TimeSpan.FromMilliseconds(700);

await publisher.OnPublishMetrics(now);

files = GetMetricsFilesSafe(_metricsFilePath);
metrics = await ReadMetricsAsync(files[0].FullName, deleteFile: true);

Assert.Equal(1, publisher.ActiveFunctionCount);
Assert.Equal(2, metrics.ExecutionCount);
Assert.Equal(2000, metrics.ExecutionTimeMS);
}

[Fact]
public void OnFunctionCompleted_NoOutstandingInvocations_IgnoresEvent()
{
Expand Down Expand Up @@ -473,9 +567,15 @@ public void CleanupMetricsFiles()
}
}

private static async Task<FlexConsumptionMetricsPublisher.Metrics> ReadMetricsAsync(string metricsFilePath)
private static async Task<FlexConsumptionMetricsPublisher.Metrics> ReadMetricsAsync(string metricsFilePath, bool deleteFile = false)
{
string content = await File.ReadAllTextAsync(metricsFilePath);

if (deleteFile)
{
File.Delete(metricsFilePath);
}

return JsonConvert.DeserializeObject<FlexConsumptionMetricsPublisher.Metrics>(content);
}

Expand Down

0 comments on commit d1067c5

Please sign in to comment.