Skip to content

Commit

Permalink
Avoid using the CancellationToken.WaitHandle property (#1209)
Browse files Browse the repository at this point in the history
* Avoid using the CancellationToken.WaitHandle property

Looks like there's a bug in .NET related to the given property that lead
to the high CPU spikes after hours of running. The problem has gone
away, when we stopped to use the CT.WaitHandle property.

* Tune expected delay for .NET Core 1.0 a bit

* Fix the license file name in the psake project

* Fix LICENSE.md file extention in nuspec files
  • Loading branch information
odinserj authored and pieceofsummer committed Jul 5, 2018
1 parent f970f56 commit 140b92f
Show file tree
Hide file tree
Showing 15 changed files with 203 additions and 61 deletions.
2 changes: 1 addition & 1 deletion nuspecs/Hangfire.AspNetCore.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@

<file src="..\src\Hangfire.AspNetCore\**\*.cs" target="src" exclude="**\obj*\**\*.cs" />

<file src="LICENSE" />
<file src="LICENSE.md" />
<file src="NOTICES" />
<file src="COPYING" />
<file src="COPYING.LESSER" />
Expand Down
2 changes: 1 addition & 1 deletion nuspecs/Hangfire.Core.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@

<file src="..\src\Hangfire.Core\**\*.cs;..\src\Hangfire.Core\**\*.cshtml" target="src" exclude="**\obj*\**\*.cs" />

<file src="LICENSE" />
<file src="LICENSE.md" />
<file src="NOTICES" />
<file src="COPYING" />
<file src="COPYING.LESSER" />
Expand Down
2 changes: 1 addition & 1 deletion nuspecs/Hangfire.SqlServer.MSMQ.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<file src="net45\Hangfire.SqlServer.Msmq.xml" target="lib\net45" />
<file src="net45\Hangfire.SqlServer.Msmq.pdb" target="lib\net45" />
<file src="..\src\Hangfire.SqlServer.Msmq\**\*.cs" target="src" exclude="**\obj*\**\*.cs" />
<file src="LICENSE" />
<file src="LICENSE.md" />
<file src="NOTICES" />
<file src="COPYING" />
<file src="COPYING.LESSER" />
Expand Down
2 changes: 1 addition & 1 deletion nuspecs/Hangfire.SqlServer.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
<file src="Tools\DefaultInstall.sql" target="tools\install.sql" />
<file src="..\src\Hangfire.SqlServer\**\*.cs" target="src" exclude="**\obj*\**\*.cs" />

<file src="LICENSE" />
<file src="LICENSE.md" />
<file src="NOTICES" />
<file src="COPYING" />
<file src="COPYING.LESSER" />
Expand Down
2 changes: 1 addition & 1 deletion psake-project.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Task Collect -Depends Merge -Description "Copy all artifacts to the build folder
Collect-Localizations "Hangfire.Core" "net45"
Collect-Localizations "Hangfire.Core" "netstandard1.3"

Collect-File "LICENSE"
Collect-File "LICENSE.md"
Collect-File "NOTICES"
Collect-File "COPYING.LESSER"
Collect-File "COPYING"
Expand Down
83 changes: 83 additions & 0 deletions src/Hangfire.Core/Common/CancellationTokenExtentions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// This file is part of Hangfire.
// Copyright © 2018 Sergey Odinokov.
//
// Hangfire is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3
// of the License, or any later version.
//
// Hangfire is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with Hangfire. If not, see <http://www.gnu.org/licenses/>.

using System;
using System.Threading;

namespace Hangfire.Common
{
public static class CancellationTokenExtentions
{
/// <summary>
/// Returns a class that contains a <see cref="EventWaitHandle"/> that is set, when
/// the given <paramref name="cancellationToken"/> is canceled. This method is based
/// on cancellation token registration and avoids using the <see cref="CancellationToken.WaitHandle"/>
/// property as it may lead to high CPU issues.
/// </summary>
public static CancellationEvent GetCancellationEvent(this CancellationToken cancellationToken)
{
return new CancellationEvent(cancellationToken);
}

/// <summary>
/// Performs a wait until the specified <paramref name="timeout"/> is elapsed or the
/// given cancellation token is canceled. The wait is performed on a dedicated event
/// wait handle to avoid using the <see cref="CancellationToken.WaitHandle"/> property
/// that may lead to high CPU issues.
/// </summary>
public static bool Wait(this CancellationToken cancellationToken, TimeSpan timeout)
{
using (var cancellationEvent = GetCancellationEvent(cancellationToken))
{
return cancellationEvent.WaitHandle.WaitOne(timeout);
}
}

public class CancellationEvent : IDisposable
{
private readonly ManualResetEvent _mre;
private CancellationTokenRegistration _registration;

public CancellationEvent(CancellationToken cancellationToken)
{
_mre = new ManualResetEvent(false);
_registration = cancellationToken.Register(SetEvent, _mre);
}

public EventWaitHandle WaitHandle => _mre;

public void Dispose()
{
_registration.Dispose();
_mre.Dispose();
}

private static void SetEvent(object state)
{
try
{
((ManualResetEvent)state).Set();
}
catch (ObjectDisposedException)
{
// When our event instance is already disposed, we already
// aren't interested in any notifications. This statement
// is just to ensure we don't throw any exceptions.
}
}
}
}
}
1 change: 1 addition & 0 deletions src/Hangfire.Core/Hangfire.Core.NetStandard.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
<Compile Include="Client\IClientExceptionFilter.cs" />
<Compile Include="Client\IClientFilter.cs" />
<Compile Include="Common\CachedExpressionCompiler.cs" />
<Compile Include="Common\CancellationTokenExtentions.cs" />
<Compile Include="Common\ExpressionUtil\BinaryExpressionFingerprint.cs" />
<Compile Include="Common\ExpressionUtil\CachedExpressionCompiler.cs" />
<Compile Include="Common\ExpressionUtil\ConditionalExpressionFingerprint.cs" />
Expand Down
1 change: 1 addition & 0 deletions src/Hangfire.Core/Hangfire.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
<Compile Include="BackgroundJobServer.cs" />
<Compile Include="BackgroundJobServerOptions.cs" />
<Compile Include="Client\CoreBackgroundJobFactory.cs" />
<Compile Include="Common\CancellationTokenExtentions.cs" />
<Compile Include="Common\MethodInfoExtensions.cs" />
<Compile Include="Dashboard\Content\resx\Strings.Designer.cs">
<AutoGen>True</AutoGen>
Expand Down
3 changes: 2 additions & 1 deletion src/Hangfire.Core/Server/BackgroundProcessContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System.Collections.Generic;
using System.Threading;
using Hangfire.Annotations;
using Hangfire.Common;

namespace Hangfire.Server
{
Expand Down Expand Up @@ -54,7 +55,7 @@ public BackgroundProcessContext(

public void Wait(TimeSpan timeout)
{
CancellationToken.WaitHandle.WaitOne(timeout);
CancellationToken.Wait(timeout);
}
}
}
3 changes: 2 additions & 1 deletion src/Hangfire.Core/Server/EveryMinuteThrottler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System;
using System.Threading;
using Hangfire.Common;

namespace Hangfire.Server
{
Expand All @@ -36,7 +37,7 @@ public void Delay(CancellationToken token)

private static void WaitASecondOrThrowIfCanceled(CancellationToken token)
{
token.WaitHandle.WaitOne(TimeSpan.FromSeconds(1));
token.Wait(TimeSpan.FromSeconds(1));
token.ThrowIfCancellationRequested();
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/Hangfire.SqlServer/CountersAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
using System;
using System.Threading;
using Dapper;
using Hangfire.Common;
using Hangfire.Logging;
using Hangfire.Server;

Expand Down Expand Up @@ -63,15 +64,15 @@ public void Execute(CancellationToken cancellationToken)

if (removedCount >= NumberOfRecordsInSinglePass)
{
cancellationToken.WaitHandle.WaitOne(DelayBetweenPasses);
cancellationToken.Wait(DelayBetweenPasses);
cancellationToken.ThrowIfCancellationRequested();
}
// ReSharper disable once LoopVariableIsNeverChangedInsideLoop
} while (removedCount >= NumberOfRecordsInSinglePass);

Logger.Trace("Records from the 'Counter' table aggregated.");

cancellationToken.WaitHandle.WaitOne(_interval);
cancellationToken.Wait(_interval);
}

public override string ToString()
Expand Down
3 changes: 2 additions & 1 deletion src/Hangfire.SqlServer/ExpirationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using System.Data.Common;
using System.Data.SqlClient;
using System.Threading;
using Hangfire.Common;
using Hangfire.Logging;
using Hangfire.Server;
using Hangfire.Storage;
Expand Down Expand Up @@ -85,7 +86,7 @@ public void Execute(CancellationToken cancellationToken)
Logger.Trace($"Outdated records removed from the '{table}' table.");
}

cancellationToken.WaitHandle.WaitOne(_checkInterval);
cancellationToken.Wait(_checkInterval);
}

public override string ToString()
Expand Down
109 changes: 58 additions & 51 deletions src/Hangfire.SqlServer/SqlServerJobQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
using System.Threading;
using Dapper;
using Hangfire.Annotations;
using Hangfire.Common;
using Hangfire.Storage;

// ReSharper disable RedundantAnonymousTypePropertyName
Expand Down Expand Up @@ -95,31 +96,34 @@ update top (1) JQ
where Queue in @queues and
(FetchedAt is null or FetchedAt < DATEADD(second, @timeout, GETUTCDATE()))";

do
using (var cancellationEvent = cancellationToken.GetCancellationEvent())
{
cancellationToken.ThrowIfCancellationRequested();

_storage.UseConnection(null, connection =>
do
{
fetchedJob = connection
.Query<FetchedJob>(
fetchJobSqlTemplate,
new { queues = queues, timeout = _options.SlidingInvisibilityTimeout.Value.Negate().TotalSeconds })
.SingleOrDefault();
});
cancellationToken.ThrowIfCancellationRequested();

if (fetchedJob != null)
{
return new SqlServerTimeoutJob(
_storage,
fetchedJob.Id,
fetchedJob.JobId.ToString(CultureInfo.InvariantCulture),
fetchedJob.Queue);
}

WaitHandle.WaitAny(new[] { cancellationToken.WaitHandle, NewItemInQueueEvent }, _options.QueuePollInterval);
cancellationToken.ThrowIfCancellationRequested();
} while (true);
_storage.UseConnection(null, connection =>
{
fetchedJob = connection
.Query<FetchedJob>(
fetchJobSqlTemplate,
new { queues = queues, timeout = _options.SlidingInvisibilityTimeout.Value.Negate().TotalSeconds })
.SingleOrDefault();
});

if (fetchedJob != null)
{
return new SqlServerTimeoutJob(
_storage,
fetchedJob.Id,
fetchedJob.JobId.ToString(CultureInfo.InvariantCulture),
fetchedJob.Queue);
}

WaitHandle.WaitAny(new WaitHandle[] { cancellationEvent.WaitHandle, NewItemInQueueEvent }, _options.QueuePollInterval);
cancellationToken.ThrowIfCancellationRequested();
} while (true);
}
}

private SqlServerTransactionJob DequeueUsingTransaction(string[] queues, CancellationToken cancellationToken)
Expand All @@ -133,47 +137,50 @@ private SqlServerTransactionJob DequeueUsingTransaction(string[] queues, Cancell
from [{_storage.SchemaName}].JobQueue JQ with (readpast, updlock, rowlock, forceseek)
where Queue in @queues and (FetchedAt is null or FetchedAt < DATEADD(second, @timeout, GETUTCDATE()))";

do
using (var cancellationEvent = cancellationToken.GetCancellationEvent())
{
cancellationToken.ThrowIfCancellationRequested();
var connection = _storage.CreateAndOpenConnection();

try
do
{
transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
cancellationToken.ThrowIfCancellationRequested();
var connection = _storage.CreateAndOpenConnection();

try
{
transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);

fetchedJob = connection.Query<FetchedJob>(
fetchJobSqlTemplate,
fetchedJob = connection.Query<FetchedJob>(
fetchJobSqlTemplate,
#pragma warning disable 618
new { queues = queues, timeout = _options.InvisibilityTimeout.Negate().TotalSeconds },
#pragma warning restore 618
transaction,
commandTimeout: _storage.CommandTimeout).SingleOrDefault();

if (fetchedJob != null)
{
return new SqlServerTransactionJob(
_storage,
connection,
transaction,
fetchedJob.JobId.ToString(CultureInfo.InvariantCulture),
fetchedJob.Queue);
commandTimeout: _storage.CommandTimeout).SingleOrDefault();

if (fetchedJob != null)
{
return new SqlServerTransactionJob(
_storage,
connection,
transaction,
fetchedJob.JobId.ToString(CultureInfo.InvariantCulture),
fetchedJob.Queue);
}
}
}
finally
{
if (fetchedJob == null)
finally
{
transaction?.Dispose();
transaction = null;
if (fetchedJob == null)
{
transaction?.Dispose();
transaction = null;

_storage.ReleaseConnection(connection);
_storage.ReleaseConnection(connection);
}
}
}

WaitHandle.WaitAny(new[] { cancellationToken.WaitHandle, NewItemInQueueEvent }, _options.QueuePollInterval);
cancellationToken.ThrowIfCancellationRequested();
} while (true);
WaitHandle.WaitAny(new WaitHandle[] { cancellationEvent.WaitHandle, NewItemInQueueEvent }, _options.QueuePollInterval);
cancellationToken.ThrowIfCancellationRequested();
} while (true);
}
}

[UsedImplicitly(ImplicitUseTargetFlags.WithMembers)]
Expand Down
Loading

0 comments on commit 140b92f

Please sign in to comment.