Skip to content

Commit

Permalink
Periodic dequeue of jobs with timed out heartbeats. (#3470)
Browse files Browse the repository at this point in the history
* Periodic dequeue of jobs with timed out heartbeats.

* Check per thread

* Start stopwatch

* var
  • Loading branch information
SergeyGaluzo authored Aug 7, 2023
1 parent 6691fa6 commit c2ae2bb
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ private async Task<IReadOnlyCollection<JobInfo>> DequeueItemsInternalAsync(JobGr
}

/// <inheritdoc />
public async Task<JobInfo> DequeueAsync(byte queueType, string worker, int heartbeatTimeoutSec, CancellationToken cancellationToken, long? jobId = null)
public async Task<JobInfo> DequeueAsync(byte queueType, string worker, int heartbeatTimeoutSec, CancellationToken cancellationToken, long? jobId = null, bool checkTimeoutJobsOnly = false)
{
return await _retryPolicy.ExecuteAsync(async () =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,13 @@ public async Task<IReadOnlyCollection<JobInfo>> DequeueJobsAsync(byte queueType,
return dequeuedJobs;
}

public async Task<JobInfo> DequeueAsync(byte queueType, string worker, int heartbeatTimeoutSec, CancellationToken cancellationToken, long? jobId = null)
public async Task<JobInfo> DequeueAsync(byte queueType, string worker, int heartbeatTimeoutSec, CancellationToken cancellationToken, long? jobId = null, bool checkTimeoutJobsOnly = false)
{
using var sqlCommand = new SqlCommand() { CommandText = "dbo.DequeueJob", CommandType = CommandType.StoredProcedure };
sqlCommand.Parameters.AddWithValue("@QueueType", queueType);
sqlCommand.Parameters.AddWithValue("@Worker", worker);
sqlCommand.Parameters.AddWithValue("@HeartbeatTimeoutSec", heartbeatTimeoutSec);
sqlCommand.Parameters.AddWithValue("@CheckTimeoutJobs", checkTimeoutJobsOnly);
if (jobId.HasValue)
{
sqlCommand.Parameters.AddWithValue("@InputJobId", jobId.Value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public async Task<IReadOnlyCollection<JobInfo>> DequeueJobsAsync(byte queueType,
return dequeuedJobs;
}

public Task<JobInfo> DequeueAsync(byte queueType, string worker, int heartbeatTimeoutSec, CancellationToken cancellationToken, long? jobId = null)
public Task<JobInfo> DequeueAsync(byte queueType, string worker, int heartbeatTimeoutSec, CancellationToken cancellationToken, long? jobId = null, bool checkTimeoutJobsOnly = false)
{
DequeueFaultAction?.Invoke();
JobInfo job = null;
Expand Down
3 changes: 2 additions & 1 deletion src/Microsoft.Health.TaskManagement/IQueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public interface IQueueClient
/// <param name="heartbeatTimeoutSec">Heartbeat timeout for retry</param>
/// <param name="cancellationToken">Cancellation Token</param>
/// <param name="jobId">Requested job id for dequeue</param>
public Task<JobInfo> DequeueAsync(byte queueType, string worker, int heartbeatTimeoutSec, CancellationToken cancellationToken, long? jobId = null);
/// <param name="checkTimeoutJobsOnly">If true, checks only jobs whose heartbeats have timed out</param>
public Task<JobInfo> DequeueAsync(byte queueType, string worker, int heartbeatTimeoutSec, CancellationToken cancellationToken, long? jobId = null, bool checkTimeoutJobsOnly = false);

/// <summary>
/// Get job by id
Expand Down
11 changes: 10 additions & 1 deletion src/Microsoft.Health.TaskManagement/JobHosting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -57,14 +58,22 @@ public async Task ExecuteAsync(byte queueType, string workerName, CancellationTo
// random delay to avoid convoys
await Task.Delay(TimeSpan.FromSeconds(RandomNumberGenerator.GetInt32(100) / 100.0 * PollingFrequencyInSeconds));
var checkTimeoutJobStopwatch = Stopwatch.StartNew();
while (!cancellationTokenSource.Token.IsCancellationRequested)
{
JobInfo nextJob = null;
if (_queueClient.IsInitialized())
{
try
{
nextJob = await _queueClient.DequeueAsync(queueType, workerName, JobHeartbeatTimeoutThresholdInSeconds, cancellationTokenSource.Token);
if (checkTimeoutJobStopwatch.Elapsed.TotalSeconds > 600)
{
checkTimeoutJobStopwatch.Restart();
nextJob = await _queueClient.DequeueAsync(queueType, workerName, JobHeartbeatTimeoutThresholdInSeconds, cancellationTokenSource.Token, null, true);
}
nextJob ??= await _queueClient.DequeueAsync(queueType, workerName, JobHeartbeatTimeoutThresholdInSeconds, cancellationTokenSource.Token);
}
catch (Exception ex)
{
Expand Down

0 comments on commit c2ae2bb

Please sign in to comment.