Skip to content

Commit

Permalink
Better API for expanded DbParameter usage
Browse files Browse the repository at this point in the history
  • Loading branch information
odinserj committed Nov 25, 2024
1 parent 502a238 commit b36e67d
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 44 deletions.
35 changes: 35 additions & 0 deletions src/Hangfire.SqlServer/DbCommandExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,21 @@
// License along with Hangfire. If not, see <http://www.gnu.org/licenses/>.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Globalization;
using System.Linq;
using Hangfire.Annotations;

namespace Hangfire.SqlServer
{
internal static class DbCommandExtensions
{
private static readonly ConcurrentDictionary<KeyValuePair<string, KeyValuePair<string, int>>, string> ExpandedQueries =
new ConcurrentDictionary<KeyValuePair<string, KeyValuePair<string, int>>, string>();

public static DbCommand Create(
[NotNull] this DbConnection connection,
[NotNull] string text,
Expand Down Expand Up @@ -83,5 +90,33 @@ public static DbCommand AddReturnParameter(
command.Parameters.Add(parameter);
return command;
}

public static DbCommand AddExpandedParameter<T>(
[NotNull] this DbCommand command,
[NotNull] string parameterName,
[NotNull] T[] parameterValues,
DbType parameterType,
int? parameterSize = null)
{
if (command == null) throw new ArgumentNullException(nameof(command));
if (parameterName == null) throw new ArgumentNullException(nameof(parameterName));
if (parameterValues == null) throw new ArgumentNullException(nameof(parameterValues));

command.CommandText = ExpandedQueries.GetOrAdd(
new KeyValuePair<string, KeyValuePair<string, int>>(command.CommandText, new KeyValuePair<string, int>(parameterName, parameterValues.Length)),
static pair =>
{
return pair.Key.Replace(
pair.Value.Key,
"(" + String.Join(",", Enumerable.Range(0, pair.Value.Value).Select(i => pair.Value.Key + i.ToString(CultureInfo.InvariantCulture))) + ")");
});

for (var i = 0; i < parameterValues.Length; i++)
{
command.AddParameter(parameterName + i.ToString(CultureInfo.InvariantCulture), parameterValues[i], parameterType, parameterSize);
}

return command;
}
}
}
55 changes: 11 additions & 44 deletions src/Hangfire.SqlServer/SqlServerJobQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,7 @@ private static DbCommand CreateNonBlockingFetchCommand(
string[] queues,
int invisibilityTimeout)
{
const string queuesParameterName = "@queues";

var query = NonBlockingQueriesCache.GetOrAdd(
new KeyValuePair<SqlServerStorage, int>(storage, queues.Length),
static pair =>
{
var template = pair.Key.GetQueryFromTemplate(static schemaName => $@"
var template = storage.GetQueryFromTemplate(static schemaName => $@"
set nocount on;set xact_abort on;set tran isolation level read committed;
update top (1) JQ
Expand All @@ -218,20 +212,9 @@ update top (1) JQ
where Queue in @queues and
(FetchedAt is null or FetchedAt < DATEADD(second, @timeoutSs, GETUTCDATE()));");

return template.Replace(
queuesParameterName,
"(" + String.Join(",", Enumerable.Range(0, pair.Value).Select(static i => queuesParameterName + i.ToString(CultureInfo.InvariantCulture))) + ")");
});

var command = connection.Create(query, timeout: storage.CommandTimeout);
command.AddParameter("@timeoutSs", invisibilityTimeout, DbType.Int32);

for (var i = 0; i < queues.Length; i++)
{
command.AddParameter(queuesParameterName + i, queues[i], DbType.String);
}

return command;
return connection.Create(template, timeout: storage.CommandTimeout)
.AddParameter("@timeoutSs", invisibilityTimeout, DbType.Int32)
.AddExpandedParameter("@queues", queues, DbType.String);
}

private SqlServerTransactionJob DequeueUsingTransaction(string[] queues, CancellationToken cancellationToken)
Expand Down Expand Up @@ -314,32 +297,16 @@ private static DbCommand CreateTransactionalFetchCommand(
string[] queues,
int invisibilityTimeout)
{
const string queuesParameterName = "@queues";

var query = TransactionalQueriesCache.GetOrAdd(
new KeyValuePair<SqlServerStorage, int>(storage, queues.Length),
static pair =>
{
var template = pair.Key.GetQueryFromTemplate(static schemaName =>
$@"delete top (1) JQ
var template = storage.GetQueryFromTemplate(static schemaName =>
$@"delete top (1) JQ
output DELETED.Id, DELETED.JobId, DELETED.Queue
from [{schemaName}].JobQueue JQ with (readpast, updlock, rowlock, forceseek)
where Queue in @queues and (FetchedAt is null or FetchedAt < DATEADD(second, @timeout, GETUTCDATE()))");

return template.Replace(
queuesParameterName,
"(" + String.Join(",", Enumerable.Range(0, pair.Value).Select(static i => queuesParameterName + i.ToString(CultureInfo.InvariantCulture))) + ")");
});

var command = connection.Create(query, timeout: storage.CommandTimeout);
command.AddParameter("@timeout", invisibilityTimeout, DbType.Int32);

for (var i = 0; i < queues.Length; i++)
{
command.AddParameter(queuesParameterName + i, queues[i], DbType.String);
}

return command;

return connection
.Create(template, timeout: storage.CommandTimeout)
.AddParameter("@timeout", invisibilityTimeout, DbType.Int32)
.AddExpandedParameter("@queues", queues, DbType.String);
}

private static WaitHandle[] GetWaitArrayForQueueSignals(SqlServerStorage storage, string[] queues, CancellationToken cancellationToken)
Expand Down

0 comments on commit b36e67d

Please sign in to comment.