Skip to content

Commit

Permalink
Add SqlServerOptions.CommandTimeout option
Browse files Browse the repository at this point in the history
  • Loading branch information
odinserj committed Mar 2, 2017
1 parent 6d15563 commit 162ccea
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 65 deletions.
3 changes: 2 additions & 1 deletion src/Hangfire.SqlServer/CountersAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public void Execute(CancellationToken cancellationToken)
{
removedCount = connection.Execute(
GetAggregationQuery(_storage),
new { now = DateTime.UtcNow, count = NumberOfRecordsInSinglePass });
new { now = DateTime.UtcNow, count = NumberOfRecordsInSinglePass },
commandTimeout: 0);
});

if (removedCount >= NumberOfRecordsInSinglePass)
Expand Down
65 changes: 40 additions & 25 deletions src/Hangfire.SqlServer/SqlServerConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ output inserted.Id
arguments = invocationData.Arguments,
createdAt = createdAt,
expireAt = createdAt.Add(expireIn)
}).ToString();
},
commandTimeout: _storage.CommandTimeout).ToString();

if (parameters.Count > 0)
{
Expand All @@ -114,7 +115,7 @@ output inserted.Id
$@"insert into [{_storage.SchemaName}].JobParameter (JobId, Name, Value)
values (@jobId, @name, @value)";

connection.Execute(insertParameterSql, parameterArray);
connection.Execute(insertParameterSql, parameterArray, commandTimeout: _storage.CommandTimeout);
}

return jobId;
Expand All @@ -130,7 +131,7 @@ public override JobData GetJobData(string id)

return _storage.UseConnection(connection =>
{
var jobData = connection.Query<SqlJob>(sql, new { id = long.Parse(id) })
var jobData = connection.Query<SqlJob>(sql, new { id = long.Parse(id) }, commandTimeout: _storage.CommandTimeout)
.SingleOrDefault();

if (jobData == null) return null;
Expand Down Expand Up @@ -173,7 +174,7 @@ public override StateData GetStateData(string jobId)

return _storage.UseConnection(connection =>
{
var sqlState = connection.Query<SqlState>(sql, new { jobId = long.Parse(jobId) }).SingleOrDefault();
var sqlState = connection.Query<SqlState>(sql, new { jobId = long.Parse(jobId) }, commandTimeout: _storage.CommandTimeout).SingleOrDefault();
if (sqlState == null)
{
return null;
Expand Down Expand Up @@ -205,7 +206,8 @@ public override void SetJobParameter(string id, string name, string value)
on Target.JobId = Source.JobId AND Target.Name = Source.Name
when matched then update set Value = Source.Value
when not matched then insert (JobId, Name, Value) values (Source.JobId, Source.Name, Source.Value);",
new { jobId = long.Parse(id), name, value });
new { jobId = long.Parse(id), name, value },
commandTimeout: _storage.CommandTimeout);
});
}

Expand All @@ -216,7 +218,8 @@ public override string GetJobParameter(string id, string name)

return _storage.UseConnection(connection => connection.ExecuteScalar<string>(
$@"select top (1) Value from [{_storage.SchemaName}].JobParameter with (readcommittedlock) where JobId = @id and Name = @name",
new { id = long.Parse(id), name = name }));
new { id = long.Parse(id), name = name },
commandTimeout: _storage.CommandTimeout));
}

public override HashSet<string> GetAllItemsFromSet(string key)
Expand All @@ -227,7 +230,8 @@ public override HashSet<string> GetAllItemsFromSet(string key)
{
var result = connection.Query<string>(
$@"select Value from [{_storage.SchemaName}].[Set] with (readcommittedlock) where [Key] = @key",
new { key });
new { key },
commandTimeout: _storage.CommandTimeout);

return new HashSet<string>(result);
});
Expand All @@ -240,7 +244,8 @@ public override string GetFirstByLowestScoreFromSet(string key, double fromScore

return _storage.UseConnection(connection => connection.ExecuteScalar<string>(
$@"select top 1 Value from [{_storage.SchemaName}].[Set] with (readcommittedlock) where [Key] = @key and Score between @from and @to order by Score",
new { key, from = fromScore, to = toScore }));
new { key, from = fromScore, to = toScore },
commandTimeout: _storage.CommandTimeout));
}

public override void SetRangeInHash(string key, IEnumerable<KeyValuePair<string, string>> keyValuePairs)
Expand All @@ -259,7 +264,11 @@ public override void SetRangeInHash(string key, IEnumerable<KeyValuePair<string,
{
foreach (var keyValuePair in keyValuePairs)
{
connection.Execute(sql, new { key = key, field = keyValuePair.Key, value = keyValuePair.Value }, transaction);
connection.Execute(
sql,
new { key = key, field = keyValuePair.Key, value = keyValuePair.Value },
transaction,
commandTimeout: _storage.CommandTimeout);
}
});
}
Expand All @@ -272,7 +281,8 @@ public override Dictionary<string, string> GetAllEntriesFromHash(string key)
{
var result = connection.Query<SqlHash>(
$"select Field, Value from [{_storage.SchemaName}].Hash with (forceseek, readcommittedlock) where [Key] = @key",
new { key })
new { key },
commandTimeout: _storage.CommandTimeout)
.ToDictionary(x => x.Field, x => x.Value);

return result.Count != 0 ? result : null;
Expand All @@ -299,7 +309,8 @@ public override void AnnounceServer(string serverId, ServerContext context)
on Target.Id = Source.Id
when matched then update set Data = Source.Data, LastHeartbeat = Source.Heartbeat
when not matched then insert (Id, Data, LastHeartbeat) values (Source.Id, Source.Data, Source.Heartbeat);",
new { id = serverId, data = JobHelper.ToJson(data), heartbeat = DateTime.UtcNow });
new { id = serverId, data = JobHelper.ToJson(data), heartbeat = DateTime.UtcNow },
commandTimeout: _storage.CommandTimeout);
});
}

Expand All @@ -311,7 +322,8 @@ public override void RemoveServer(string serverId)
{
connection.Execute(
$@"delete from [{_storage.SchemaName}].Server where Id = @id",
new { id = serverId });
new { id = serverId },
commandTimeout: _storage.CommandTimeout);
});
}

Expand All @@ -323,7 +335,8 @@ public override void Heartbeat(string serverId)
{
connection.Execute(
$@"update [{_storage.SchemaName}].Server set LastHeartbeat = @now where Id = @id",
new { now = DateTime.UtcNow, id = serverId });
new { now = DateTime.UtcNow, id = serverId },
commandTimeout: _storage.CommandTimeout);
});
}

Expand All @@ -336,7 +349,8 @@ public override int RemoveTimedOutServers(TimeSpan timeOut)

return _storage.UseConnection(connection => connection.Execute(
$@"delete from [{_storage.SchemaName}].Server where LastHeartbeat < @timeOutAt",
new { timeOutAt = DateTime.UtcNow.Add(timeOut.Negate()) }));
new { timeOutAt = DateTime.UtcNow.Add(timeOut.Negate()) },
commandTimeout: _storage.CommandTimeout));
}

public override long GetSetCount(string key)
Expand All @@ -345,7 +359,8 @@ public override long GetSetCount(string key)

return _storage.UseConnection(connection => connection.Query<int>(
$"select count([Key]) from [{_storage.SchemaName}].[Set] with (readcommittedlock) where [Key] = @key",
new { key = key }).First());
new { key = key },
commandTimeout: _storage.CommandTimeout).First());
}

public override List<string> GetRangeFromSet(string key, int startingFrom, int endingAt)
Expand All @@ -360,7 +375,7 @@ public override List<string> GetRangeFromSet(string key, int startingFrom, int e
) as s where s.row_num between @startingFrom and @endingAt";

return _storage.UseConnection(connection => connection
.Query<string>(query, new { key = key, startingFrom = startingFrom + 1, endingAt = endingAt + 1 })
.Query<string>(query, new { key = key, startingFrom = startingFrom + 1, endingAt = endingAt + 1 }, commandTimeout: _storage.CommandTimeout)
.ToList());
}

Expand All @@ -372,7 +387,7 @@ public override TimeSpan GetSetTtl(string key)

return _storage.UseConnection(connection =>
{
var result = connection.ExecuteScalar<DateTime?>(query, new { key = key });
var result = connection.ExecuteScalar<DateTime?>(query, new { key = key }, commandTimeout: _storage.CommandTimeout);
if (!result.HasValue) return TimeSpan.FromSeconds(-1);

return result.Value - DateTime.UtcNow;
Expand All @@ -391,7 +406,7 @@ union all
where [Key] = @key) as s";

return _storage.UseConnection(connection =>
connection.ExecuteScalar<long?>(query, new { key = key }) ?? 0);
connection.ExecuteScalar<long?>(query, new { key = key }, commandTimeout: _storage.CommandTimeout) ?? 0);
}

public override long GetHashCount(string key)
Expand All @@ -400,7 +415,7 @@ public override long GetHashCount(string key)

string query = $@"select count([Id]) from [{_storage.SchemaName}].Hash with (readcommittedlock) where [Key] = @key";

return _storage.UseConnection(connection => connection.ExecuteScalar<long>(query, new { key = key }));
return _storage.UseConnection(connection => connection.ExecuteScalar<long>(query, new { key = key }, commandTimeout: _storage.CommandTimeout));
}

public override TimeSpan GetHashTtl(string key)
Expand All @@ -411,7 +426,7 @@ public override TimeSpan GetHashTtl(string key)

return _storage.UseConnection(connection =>
{
var result = connection.ExecuteScalar<DateTime?>(query, new { key = key });
var result = connection.ExecuteScalar<DateTime?>(query, new { key = key }, commandTimeout: _storage.CommandTimeout);
if (!result.HasValue) return TimeSpan.FromSeconds(-1);

return result.Value - DateTime.UtcNow;
Expand All @@ -428,7 +443,7 @@ public override string GetValueFromHash(string key, string name)
where [Key] = @key and [Field] = @field";

return _storage.UseConnection(connection => connection
.ExecuteScalar<string>(query, new { key = key, field = name }));
.ExecuteScalar<string>(query, new { key = key, field = name }, commandTimeout: _storage.CommandTimeout));
}

public override long GetListCount(string key)
Expand All @@ -439,7 +454,7 @@ public override long GetListCount(string key)
$@"select count([Id]) from [{_storage.SchemaName}].List with (readcommittedlock)
where [Key] = @key";

return _storage.UseConnection(connection => connection.ExecuteScalar<long>(query, new { key = key }));
return _storage.UseConnection(connection => connection.ExecuteScalar<long>(query, new { key = key }, commandTimeout: _storage.CommandTimeout));
}

public override TimeSpan GetListTtl(string key)
Expand All @@ -452,7 +467,7 @@ public override TimeSpan GetListTtl(string key)

return _storage.UseConnection(connection =>
{
var result = connection.ExecuteScalar<DateTime?>(query, new { key = key });
var result = connection.ExecuteScalar<DateTime?>(query, new { key = key }, commandTimeout: _storage.CommandTimeout);
if (!result.HasValue) return TimeSpan.FromSeconds(-1);

return result.Value - DateTime.UtcNow;
Expand All @@ -471,7 +486,7 @@ public override List<string> GetRangeFromList(string key, int startingFrom, int
) as s where s.row_num between @startingFrom and @endingAt";

return _storage.UseConnection(connection => connection
.Query<string>(query, new { key = key, startingFrom = startingFrom + 1, endingAt = endingAt + 1 })
.Query<string>(query, new { key = key, startingFrom = startingFrom + 1, endingAt = endingAt + 1 }, commandTimeout: _storage.CommandTimeout)
.ToList());
}

Expand All @@ -484,7 +499,7 @@ public override List<string> GetAllItemsFromList(string key)
where [Key] = @key
order by [Id] desc";

return _storage.UseConnection(connection => connection.Query<string>(query, new { key = key }).ToList());
return _storage.UseConnection(connection => connection.Query<string>(query, new { key = key }, commandTimeout: _storage.CommandTimeout).ToList());
}
}
}
5 changes: 3 additions & 2 deletions src/Hangfire.SqlServer/SqlServerJobQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken)
fetchedJob = connection.Query<FetchedJob>(
fetchJobSqlTemplate,
new { queues = queues },
transaction).SingleOrDefault();
transaction,
commandTimeout: _storage.CommandTimeout).SingleOrDefault();

if (fetchedJob != null)
{
Expand Down Expand Up @@ -117,7 +118,7 @@ public void Enqueue(DbConnection connection, DbTransaction transaction, string q
#if !NETFULL
, transaction
#endif
);
, commandTimeout: _storage.CommandTimeout);
}

[UsedImplicitly(ImplicitUseTargetFlags.WithMembers)]
Expand Down
7 changes: 4 additions & 3 deletions src/Hangfire.SqlServer/SqlServerJobQueueMonitoringApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public IEnumerable<string> GetQueues()
{
var result = UseTransaction((connection, transaction) =>
{
return connection.Query(sqlQuery, transaction: transaction).Select(x => (string) x.Queue).ToList();
return connection.Query(sqlQuery, transaction: transaction, commandTimeout: _storage.CommandTimeout).Select(x => (string) x.Queue).ToList();
});

_queuesCache = result;
Expand All @@ -83,7 +83,8 @@ public IEnumerable<int> GetEnqueuedJobIds(string queue, int @from, int perPage)
return connection.Query<JobIdDto>(
sqlQuery,
new { queue = queue, start = from + 1, end = @from + perPage },
transaction)
transaction,
commandTimeout: _storage.CommandTimeout)
.ToList()
.Select(x => (int)x.JobId)
.ToList();
Expand All @@ -102,7 +103,7 @@ public EnqueuedAndFetchedCountDto GetEnqueuedAndFetchedCount(string queue)

return UseTransaction((connection, transaction) =>
{
var result = connection.ExecuteScalar<int>(sqlQuery, new { queue = queue }, transaction);
var result = connection.ExecuteScalar<int>(sqlQuery, new { queue = queue }, transaction, commandTimeout: _storage.CommandTimeout);

return new EnqueuedAndFetchedCountDto
{
Expand Down
21 changes: 13 additions & 8 deletions src/Hangfire.SqlServer/SqlServerMonitoringApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public IList<ServerDto> Servers()
return UseConnection<IList<ServerDto>>(connection =>
{
var servers = connection.Query<Entities.Server>(
$@"select * from [{_storage.SchemaName}].Server with (nolock)")
$@"select * from [{_storage.SchemaName}].Server with (nolock)", commandTimeout: _storage.CommandTimeout)
.ToList();

var result = new List<ServerDto>();
Expand Down Expand Up @@ -266,7 +266,7 @@ public JobDetailsDto JobDetails(string jobId)
select * from [{_storage.SchemaName}].JobParameter with (nolock) where JobId = @id
select * from [{_storage.SchemaName}].State with (nolock) where JobId = @id order by Id desc";

using (var multi = connection.QueryMultiple(sql, new { id = jobId }))
using (var multi = connection.QueryMultiple(sql, new { id = jobId }, commandTimeout: _storage.CommandTimeout))
{
var job = multi.Read<SqlJob>().SingleOrDefault();
if (job == null) return null;
Expand Down Expand Up @@ -336,7 +336,7 @@ union all
var statistics = UseConnection(connection =>
{
var stats = new StatisticsDto();
using (var multi = connection.QueryMultiple(sql))
using (var multi = connection.QueryMultiple(sql, commandTimeout: _storage.CommandTimeout))
{
stats.Enqueued = multi.ReadSingle<int>();
stats.Failed = multi.ReadSingle<int>();
Expand Down Expand Up @@ -400,7 +400,8 @@ private Dictionary<DateTime, long> GetTimelineStats(

var valuesMap = connection.Query(
sqlQuery,
new { keys = keyMaps.Keys })
new { keys = keyMaps.Keys },
commandTimeout: _storage.CommandTimeout)
.ToDictionary(x => (string)x.Key, x => (long)x.Count);

foreach (var key in keyMaps.Keys)
Expand Down Expand Up @@ -441,7 +442,8 @@ private JobList<EnqueuedJobDto> EnqueuedJobs(DbConnection connection, long[] job

var jobs = connection.Query<SqlJob>(
enqueuedJobsSql,
new { jobIds = jobIds })
new { jobIds = jobIds },
commandTimeout: _storage.CommandTimeout)
.ToDictionary(x => x.Id, x => x);

var sortedSqlJobs = jobIds
Expand All @@ -468,7 +470,8 @@ private long GetNumberOfJobsByStateName(DbConnection connection, string stateNam

var count = connection.ExecuteScalar<int>(
sqlQuery,
new { state = stateName, limit = _jobListLimit });
new { state = stateName, limit = _jobListLimit },
commandTimeout: _storage.CommandTimeout);

return count;
}
Expand Down Expand Up @@ -511,7 +514,8 @@ where cte.row_num between @start and @end

var jobs = connection.Query<SqlJob>(
jobsSql,
new { stateName = stateName, start = @from + 1, end = @from + count })
new { stateName = stateName, start = @from + 1, end = @from + count },
commandTimeout: _storage.CommandTimeout)
.ToList();

return DeserializeJobs(jobs, selector);
Expand Down Expand Up @@ -555,7 +559,8 @@ private JobList<FetchedJobDto> FetchedJobs(DbConnection connection, IEnumerable<

var jobs = connection.Query<SqlJob>(
fetchedJobsSql,
new { jobIds = jobIds })
new { jobIds = jobIds },
commandTimeout: _storage.CommandTimeout)
.ToList();

var result = new List<KeyValuePair<string, FetchedJobDto>>(jobs.Count);
Expand Down
4 changes: 2 additions & 2 deletions src/Hangfire.SqlServer/SqlServerObjectsInstaller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static void Install(DbConnection connection, string schema)
{
try
{
connection.Execute(script);
connection.Execute(script, commandTimeout: 0);
break;
}
catch (DbException ex)
Expand All @@ -76,7 +76,7 @@ public static void Install(DbConnection connection, string schema)
}
}
#else
connection.Execute(script);
connection.Execute(script, commandTimeout: 0);
#endif

Log.Info("Hangfire SQL objects installed.");
Expand Down
Loading

0 comments on commit 162ccea

Please sign in to comment.