Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf/background task scheduler #6655

Merged
merged 14 commits into from
Feb 8, 2024
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Api/IApiWithBlockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Nethermind.Consensus.Processing;
using Nethermind.Consensus.Producers;
using Nethermind.Consensus.Rewards;
using Nethermind.Consensus.Scheduler;
using Nethermind.Consensus.Validators;
using Nethermind.Core;
using Nethermind.Evm.TransactionProcessing;
Expand Down Expand Up @@ -94,5 +95,6 @@ public interface IApiWithBlockchain : IApiWithStores, IBlockchainBridgeFactory
CompositePruningTrigger PruningTrigger { get; }

IBlockProductionPolicy? BlockProductionPolicy { get; set; }
BackgroundTaskScheduler BackgroundTaskScheduler { get; set; }
}
}
3 changes: 3 additions & 0 deletions src/Nethermind/Nethermind.Api/IInitConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public interface IInitConfig : IConfig

[ConfigItem(Description = "[TECHNICAL] Exit when block number is reached. Useful for scripting and testing.", DefaultValue = "null", HiddenFromDocs = true)]
long? ExitOnBlockNumber { get; set; }

[ConfigItem(Description = "[TECHNICAL] Specify concurrency limit for background task.", DefaultValue = "1", HiddenFromDocs = true)]
int BackgroundTaskConcurrency { get; set; }
}

public enum DiagnosticMode
Expand Down
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Api/InitConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class InitConfig : IInitConfig
public bool DisableGcOnNewPayload { get; set; } = true;
public bool DisableMallocOpts { get; set; } = false;
public long? ExitOnBlockNumber { get; set; } = null;
public int BackgroundTaskConcurrency { get; set; } = 1;

[Obsolete("Use DiagnosticMode with MemDb instead")]
public bool UseMemDb
Expand Down
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Api/NethermindApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using Nethermind.Consensus.Processing;
using Nethermind.Consensus.Producers;
using Nethermind.Consensus.Rewards;
using Nethermind.Consensus.Scheduler;
using Nethermind.Consensus.Validators;
using Nethermind.Core;
using Nethermind.Core.Authentication;
Expand Down Expand Up @@ -209,6 +210,7 @@ public ISealEngine SealEngine

public IEthSyncingInfo? EthSyncingInfo { get; set; }
public IBlockProductionPolicy? BlockProductionPolicy { get; set; }
public BackgroundTaskScheduler BackgroundTaskScheduler { get; set; } = null!;
public IWallet? Wallet { get; set; }
public IBlockStore? BadBlocksStore { get; set; }
public ITransactionComparerProvider? TransactionComparerProvider { get; set; }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Consensus.Processing;
using Nethermind.Consensus.Scheduler;
using Nethermind.Core.Extensions;
using Nethermind.Logging;
using NSubstitute;
using NUnit.Framework;
using TaskCompletionSource = DotNetty.Common.Concurrency.TaskCompletionSource;

namespace Nethermind.Consensus.Test.Scheduler;

public class BackgroundTaskSchedulerTests
{
private IBlockProcessor _blockProcessor;

[SetUp]
public void Setup()
{
_blockProcessor = Substitute.For<IBlockProcessor>();
}

[Test]
public async Task Test_task_will_execute()
{
TaskCompletionSource tcs = new TaskCompletionSource();
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 1, LimboLogs.Instance);

scheduler.ScheduleTask(1, (_, token) =>
{
tcs.SetResult(1);
return Task.CompletedTask;
});

await tcs.Task;
}

[Test]
public async Task Test_task_will_execute_concurrently_when_configured_so()
{
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, LimboLogs.Instance);

int counter = 0;

ManualResetEvent waitSignal = new ManualResetEvent(false);
scheduler.ScheduleTask(1, async (_, token) =>
{
counter++;
await waitSignal.WaitOneAsync(token);
counter--;
});
scheduler.ScheduleTask(1, async (_, token) =>
{
counter++;
await waitSignal.WaitOneAsync(token);
counter--;
});

Assert.That(() => counter, Is.EqualTo(2).After(10, 1));
waitSignal.Set();
}

[Test]
public async Task Test_task_will_cancel_on_block_processing()
{
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, LimboLogs.Instance);

bool wasCancelled = false;

ManualResetEvent waitSignal = new ManualResetEvent(false);
scheduler.ScheduleTask(1, async (_, token) =>
{
waitSignal.Set();
try
{
await Task.Delay(100000, token);
}
catch (OperationCanceledException)
{
wasCancelled = true;
}
});

await waitSignal.WaitOneAsync(CancellationToken.None);
_blockProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null));
Assert.That(() => wasCancelled, Is.EqualTo(true).After(10, 1));
}

[Test]
public async Task Test_task_that_is_scheduled_during_block_processing_will_continue_after()
{
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, LimboLogs.Instance);
_blockProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null));

int executionCount = 0;
for (int i = 0; i < 5; i++)
{
scheduler.ScheduleTask(1, (_, token) =>
{
executionCount++;
return Task.CompletedTask;
});
}

await Task.Delay(10);
executionCount.Should().Be(0);

_blockProcessor.BlockProcessed += Raise.EventWith(new BlockProcessedEventArgs(null, null));
Assert.That(() => executionCount, Is.EqualTo(5).After(10, 1));
}

[Test]
public async Task Test_task_that_is_scheduled_during_block_processing_but_deadlined_will_get_called_and_cancelled()
{
await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, LimboLogs.Instance);
_blockProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null));

bool wasCancelled = false;
ManualResetEvent waitSignal = new ManualResetEvent(false);
scheduler.ScheduleTask(1, (_, token) =>
{
wasCancelled = token.IsCancellationRequested;
waitSignal.Set();
return Task.CompletedTask;
}, TimeSpan.FromMilliseconds(1));

(await waitSignal.WaitOneAsync(CancellationToken.None)).Should().BeTrue();

wasCancelled.Should().BeTrue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Nethermind.Consensus.Processing;
using Nethermind.Core.Extensions;
using Nethermind.Logging;

namespace Nethermind.Consensus.Scheduler;

/// <summary>
/// Provide a way to orchestrate task to run in background.
/// - Task will be run in a separate thread.. well it depends on the threadpool, but there is a concurrency limit.
/// - Task closure will have CancellationToken which will be cancelled if block processing happens while the task is running.
/// - Task have a default timeout, which is counted from the time it is queued. If timedout because too many other background
/// task before it for example, the cancellation token passed to it will be cancelled.
/// - Task will still run when block processing is happening and its timedout this is so that it can handle its cancellation.
/// - Task will not run if block processing is happening and it still have some time left.
/// It is up to the task to determine what happen if cancelled, maybe it will reschedule for later, or resume later, but
/// preferably, stop execution immediately. Don't hang BTW. Other background task need to cancel too.
///
/// Note: Yes, I know there is a built in TaskScheduler that can do some magical stuff that stop execution on async
/// and stuff, but that is complicated and I don't wanna explain why you need `async Task.Yield()` in the middle of a loop,
/// or explicitly specify it to run on this task scheduler and such. Maybe some other time ok?
/// </summary>
public class BackgroundTaskScheduler : IBackgroundTaskScheduler, IAsyncDisposable
{
private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(2);

private readonly CancellationTokenSource _mainCancellationTokenSource;
private CancellationTokenSource _blockProcessorCancellationTokenSource;
private readonly Channel<IActivity> _taskQueue;
private readonly ILogger _logger;
private readonly IBlockProcessor _blockProcessor;
private readonly ManualResetEvent _restartQueueSignal;

public BackgroundTaskScheduler(IBlockProcessor blockProcessor, int concurrency, ILogManager logManager)
{
if (concurrency < 1) throw new ArgumentException("concurrency must be at least 1");

_mainCancellationTokenSource = new CancellationTokenSource();
_blockProcessorCancellationTokenSource = new CancellationTokenSource();
_taskQueue = Channel.CreateUnbounded<IActivity>();
_logger = logManager.GetClassLogger();
_blockProcessor = blockProcessor;
_restartQueueSignal = new ManualResetEvent(true);

_blockProcessor.BlocksProcessing += BlockProcessorOnBlocksProcessing;
_blockProcessor.BlockProcessed += BlockProcessorOnBlockProcessed;

for (int i = 0; i < concurrency; i++)
{
Task.Factory.StartNew(StartChannel, TaskCreationOptions.LongRunning);
benaadams marked this conversation as resolved.
Show resolved Hide resolved
}
}

private void BlockProcessorOnBlocksProcessing(object? sender, BlocksProcessingEventArgs e)
{
// On block processing, we cancel the block process cts, causing current task to get cancelled.
_blockProcessorCancellationTokenSource.Cancel();
// We also reset queue signal, causing it to wait
_restartQueueSignal.Reset();
}

private void BlockProcessorOnBlockProcessed(object? sender, BlockProcessedEventArgs e)
{
// Once block is processed, we replace it with the
CancellationTokenSource oldTokenSource = Interlocked.Exchange(ref _blockProcessorCancellationTokenSource, new CancellationTokenSource());
oldTokenSource.Dispose();
// We also set queue signal causing it to continue queue.
_restartQueueSignal.Set();
}


private async Task StartChannel()
{
await foreach (IActivity activity in _taskQueue.Reader.ReadAllAsync(_mainCancellationTokenSource.Token))
{
if (_blockProcessorCancellationTokenSource.IsCancellationRequested)
{
// In case of task that is suppose to run when a block is being processed, if there is some time left
// from its deadline, we re-queue it. We do this in case there are some task in the queue that already
// reached deadline during block processing in which case, it will need to execute in order to handle
// its cancellation.
if (DateTimeOffset.Now < activity.Deadline)
{
await _taskQueue.Writer.WriteAsync(activity, _mainCancellationTokenSource.Token);
// Throttle deque to prevent infinite loop.
await _restartQueueSignal.WaitOneAsync(TimeSpan.FromMilliseconds(1), _mainCancellationTokenSource.Token);
asdacap marked this conversation as resolved.
Show resolved Hide resolved
continue;
}
}

try
{
using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(
_blockProcessorCancellationTokenSource.Token,
_mainCancellationTokenSource.Token
);
await activity.Do(cts.Token);
}
catch (OperationCanceledException)
{
}
catch (Exception e)
{
if (_logger.IsDebug) _logger.Debug($"Error processing background task {e}.");
}
}
}

public void ScheduleTask<TReq>(TReq request, Func<TReq, CancellationToken, Task> fulfillFunc, TimeSpan? timeout = null)
{
timeout ??= DefaultTimeout;
DateTimeOffset deadline = DateTimeOffset.Now + timeout.Value;

IActivity activity = new Activity<TReq>()
{
Deadline = deadline,
Request = request,
FulfillFunc = fulfillFunc,
};

if (!_taskQueue.Writer.TryWrite(activity))
{
// This should never happen unless something goes very wrong.
throw new InvalidOperationException("Unable to write to background task queue.");
}
}

public async ValueTask DisposeAsync()
{
_blockProcessor.BlocksProcessing += BlockProcessorOnBlocksProcessing;
_blockProcessor.BlockProcessed += BlockProcessorOnBlockProcessed;

_taskQueue.Writer.Complete();
await _mainCancellationTokenSource.CancelAsync();
}

private readonly struct Activity<TReq> : IActivity
{
public DateTimeOffset Deadline { get; init; }
public TReq Request { get; init; }
public Func<TReq, CancellationToken, Task> FulfillFunc { get; init; }

public async Task Do(CancellationToken cancellationToken)
{
using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
DateTimeOffset now = DateTimeOffset.Now;
TimeSpan timeToComplete = Deadline - now;
if (timeToComplete <= TimeSpan.Zero)
{
// Cancel immediately. Got no time left.
await cts.CancelAsync();
}
else
{
cts.CancelAfter(timeToComplete);
}

await FulfillFunc.Invoke(Request, cts.Token);
}
}

private interface IActivity
{
DateTimeOffset Deadline { get; }
Task Do(CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Nethermind.Consensus.Scheduler;

public interface IBackgroundTaskScheduler
{
void ScheduleTask<TReq>(TReq request, Func<TReq, CancellationToken, Task> fulfillFunc, TimeSpan? timeout = null);
}
23 changes: 23 additions & 0 deletions src/Nethermind/Nethermind.Core.Test/RunImmediatelyScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Consensus.Scheduler;

namespace Nethermind.Core.Test;

public class RunImmediatelyScheduler : IBackgroundTaskScheduler
{
public static RunImmediatelyScheduler Instance = new RunImmediatelyScheduler();

private RunImmediatelyScheduler()
{
}

public void ScheduleTask<TReq>(TReq request, Func<TReq, CancellationToken, Task> fulfillFunc, TimeSpan? timeout = null)
{
fulfillFunc(request, CancellationToken.None);
}
}
Loading