Skip to content

Commit

Permalink
Perf/background task scheduler (#6655)
Browse files Browse the repository at this point in the history
  • Loading branch information
asdacap authored Feb 8, 2024
1 parent 2c6a789 commit 956c49f
Show file tree
Hide file tree
Showing 39 changed files with 598 additions and 135 deletions.
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Api/IApiWithBlockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Nethermind.Consensus.Processing;
using Nethermind.Consensus.Producers;
using Nethermind.Consensus.Rewards;
using Nethermind.Consensus.Scheduler;
using Nethermind.Consensus.Validators;
using Nethermind.Core;
using Nethermind.Evm.TransactionProcessing;
Expand Down Expand Up @@ -94,5 +95,6 @@ public interface IApiWithBlockchain : IApiWithStores, IBlockchainBridgeFactory
CompositePruningTrigger PruningTrigger { get; }

IBlockProductionPolicy? BlockProductionPolicy { get; set; }
BackgroundTaskScheduler BackgroundTaskScheduler { get; set; }
}
}
3 changes: 3 additions & 0 deletions src/Nethermind/Nethermind.Api/IInitConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public interface IInitConfig : IConfig

[ConfigItem(Description = "[TECHNICAL] Exit when block number is reached. Useful for scripting and testing.", DefaultValue = "null", HiddenFromDocs = true)]
long? ExitOnBlockNumber { get; set; }

[ConfigItem(Description = "[TECHNICAL] Specify concurrency limit for background task.", DefaultValue = "1", HiddenFromDocs = true)]
int BackgroundTaskConcurrency { get; set; }
}

public enum DiagnosticMode
Expand Down
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Api/InitConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class InitConfig : IInitConfig
public bool DisableGcOnNewPayload { get; set; } = true;
public bool DisableMallocOpts { get; set; } = false;
public long? ExitOnBlockNumber { get; set; } = null;
public int BackgroundTaskConcurrency { get; set; } = 1;

[Obsolete("Use DiagnosticMode with MemDb instead")]
public bool UseMemDb
Expand Down
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Api/NethermindApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using Nethermind.Consensus.Processing;
using Nethermind.Consensus.Producers;
using Nethermind.Consensus.Rewards;
using Nethermind.Consensus.Scheduler;
using Nethermind.Consensus.Validators;
using Nethermind.Core;
using Nethermind.Core.Authentication;
Expand Down Expand Up @@ -209,6 +210,7 @@ public ISealEngine SealEngine

public IEthSyncingInfo? EthSyncingInfo { get; set; }
public IBlockProductionPolicy? BlockProductionPolicy { get; set; }
public BackgroundTaskScheduler BackgroundTaskScheduler { get; set; } = null!;
public IWallet? Wallet { get; set; }
public IBlockStore? BadBlocksStore { get; set; }
public ITransactionComparerProvider? TransactionComparerProvider { get; set; }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Consensus.Processing;
using Nethermind.Consensus.Scheduler;
using Nethermind.Core.Extensions;
using Nethermind.Logging;
using NSubstitute;
using NUnit.Framework;
using TaskCompletionSource = DotNetty.Common.Concurrency.TaskCompletionSource;

namespace Nethermind.Consensus.Test.Scheduler;

/// <summary>
/// Behavioural tests for <see cref="BackgroundTaskScheduler"/>: execution, concurrency, cancellation on block
/// processing, and deadline handling for tasks queued while a block is being processed.
/// </summary>
public class BackgroundTaskSchedulerTests
{
    // Positive assertions are polled: they return as soon as the condition holds, so a generous upper bound
    // only costs time when the test is actually failing, while avoiding spurious failures on loaded machines.
    private const int PollTimeoutMs = 1000;
    private const int PollIntervalMs = 10;

    private IBlockProcessor _blockProcessor;

    [SetUp]
    public void Setup()
    {
        _blockProcessor = Substitute.For<IBlockProcessor>();
    }

    /// <summary>A scheduled task is eventually invoked.</summary>
    [Test]
    public async Task Test_task_will_execute()
    {
        TaskCompletionSource tcs = new TaskCompletionSource();
        await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 1, LimboLogs.Instance);

        scheduler.ScheduleTask(1, (_, token) =>
        {
            tcs.SetResult(1);
            return Task.CompletedTask;
        });

        await tcs.Task;
    }

    /// <summary>With a concurrency of 2, two blocking tasks are in flight at the same time.</summary>
    [Test]
    public async Task Test_task_will_execute_concurrently_when_configured_so()
    {
        await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, LimboLogs.Instance);

        int counter = 0;

        ManualResetEvent waitSignal = new ManualResetEvent(false);
        scheduler.ScheduleTask(1, async (_, token) =>
        {
            // Both tasks run on different executors at once, so the counter must be updated atomically.
            Interlocked.Increment(ref counter);
            await waitSignal.WaitOneAsync(token);
            Interlocked.Decrement(ref counter);
        });
        scheduler.ScheduleTask(1, async (_, token) =>
        {
            Interlocked.Increment(ref counter);
            await waitSignal.WaitOneAsync(token);
            Interlocked.Decrement(ref counter);
        });

        Assert.That(() => Volatile.Read(ref counter), Is.EqualTo(2).After(PollTimeoutMs, PollIntervalMs));
        waitSignal.Set();
    }

    /// <summary>A running task observes cancellation when block processing starts.</summary>
    [Test]
    public async Task Test_task_will_cancel_on_block_processing()
    {
        await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, LimboLogs.Instance);

        bool wasCancelled = false;

        ManualResetEvent waitSignal = new ManualResetEvent(false);
        scheduler.ScheduleTask(1, async (_, token) =>
        {
            waitSignal.Set();
            try
            {
                await Task.Delay(100000, token);
            }
            catch (OperationCanceledException)
            {
                wasCancelled = true;
            }
        });

        // Make sure the task has started before signalling that block processing began.
        await waitSignal.WaitOneAsync(CancellationToken.None);
        _blockProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null));
        Assert.That(() => wasCancelled, Is.EqualTo(true).After(PollTimeoutMs, PollIntervalMs));
    }

    /// <summary>Tasks queued while a block is being processed are held back and run once processing ends.</summary>
    [Test]
    public async Task Test_task_that_is_scheduled_during_block_processing_will_continue_after()
    {
        await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, LimboLogs.Instance);
        _blockProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null));

        int executionCount = 0;
        for (int i = 0; i < 5; i++)
        {
            scheduler.ScheduleTask(1, (_, token) =>
            {
                // Two executors may run these callbacks in parallel; a plain ++ could lose an increment.
                Interlocked.Increment(ref executionCount);
                return Task.CompletedTask;
            });
        }

        await Task.Delay(10);
        Volatile.Read(ref executionCount).Should().Be(0);

        _blockProcessor.BlockProcessed += Raise.EventWith(new BlockProcessedEventArgs(null, null));
        Assert.That(() => Volatile.Read(ref executionCount), Is.EqualTo(5).After(PollTimeoutMs, PollIntervalMs));
    }

    /// <summary>
    /// A task whose deadline expires while a block is being processed is still invoked, with an already
    /// cancelled token, so that it can handle its own cancellation.
    /// </summary>
    [Test]
    public async Task Test_task_that_is_scheduled_during_block_processing_but_deadlined_will_get_called_and_cancelled()
    {
        await using BackgroundTaskScheduler scheduler = new BackgroundTaskScheduler(_blockProcessor, 2, LimboLogs.Instance);
        _blockProcessor.BlocksProcessing += Raise.EventWith(new BlocksProcessingEventArgs(null));

        bool wasCancelled = false;
        ManualResetEvent waitSignal = new ManualResetEvent(false);
        scheduler.ScheduleTask(1, (_, token) =>
        {
            wasCancelled = token.IsCancellationRequested;
            waitSignal.Set();
            return Task.CompletedTask;
        }, TimeSpan.FromMilliseconds(1));

        (await waitSignal.WaitOneAsync(CancellationToken.None)).Should().BeTrue();

        wasCancelled.Should().BeTrue();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Nethermind.Consensus.Processing;
using Nethermind.Core.Extensions;
using Nethermind.Logging;

namespace Nethermind.Consensus.Scheduler;

/// <summary>
/// Orchestrates tasks that should run in the background with a bounded level of concurrency.
/// <list type="bullet">
/// <item>Tasks are executed by a fixed number of executor loops started on the thread pool; that number is the
/// concurrency limit.</item>
/// <item>Every task receives a <see cref="CancellationToken"/> that is cancelled when block processing starts
/// while the task is running, when the scheduler is disposed, or when the task's deadline passes.</item>
/// <item>Every task has a timeout (2 seconds by default) counted from the moment it is queued. If the deadline
/// has already passed when the task is dequeued - for example because too many tasks were queued before it -
/// the token handed to it is already cancelled.</item>
/// <item>While a block is being processed, a task that has passed its deadline is still invoked so that it can
/// handle its own cancellation.</item>
/// <item>While a block is being processed, a task that still has time left is not invoked; it is put back on
/// the queue.</item>
/// </list>
/// It is up to the task to decide what to do when cancelled (reschedule, resume later, ...), but it should stop
/// promptly: a task that ignores its token blocks an executor and delays <see cref="DisposeAsync"/>.
/// <para>
/// A custom <see cref="TaskScheduler"/> was deliberately not used here in order to keep usage simple for callers.
/// </para>
/// </summary>
public class BackgroundTaskScheduler : IBackgroundTaskScheduler, IAsyncDisposable
{
    private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(2);

    private readonly CancellationTokenSource _mainCancellationTokenSource;
    private CancellationTokenSource _blockProcessorCancellationTokenSource;
    private readonly Channel<IActivity> _taskQueue;
    private readonly ILogger _logger;
    private readonly IBlockProcessor _blockProcessor;
    private readonly ManualResetEvent _restartQueueSignal;
    private readonly Task[] _tasksExecutors;

    /// <summary>
    /// Creates the scheduler, subscribes to the block processor events and starts the executor loops.
    /// </summary>
    /// <param name="blockProcessor">Source of the block processing start/finish notifications.</param>
    /// <param name="concurrency">Number of executor loops to start; must be at least 1.</param>
    /// <param name="logManager">Provides the logger used to report failing tasks.</param>
    /// <exception cref="ArgumentException"><paramref name="concurrency"/> is less than 1.</exception>
    public BackgroundTaskScheduler(IBlockProcessor blockProcessor, int concurrency, ILogManager logManager)
    {
        if (concurrency < 1) throw new ArgumentException("concurrency must be at least 1", nameof(concurrency));

        _mainCancellationTokenSource = new CancellationTokenSource();
        _blockProcessorCancellationTokenSource = new CancellationTokenSource();
        _taskQueue = Channel.CreateUnbounded<IActivity>();
        _logger = logManager.GetClassLogger();
        _blockProcessor = blockProcessor;
        // Initially signalled: no block is being processed, so the queue is allowed to run.
        _restartQueueSignal = new ManualResetEvent(true);

        _blockProcessor.BlocksProcessing += BlockProcessorOnBlocksProcessing;
        _blockProcessor.BlockProcessed += BlockProcessorOnBlockProcessed;

        // Task.Run unwraps the async delegate, so each stored task completes only when its executor loop has
        // really finished. Task.Factory.StartNew would return a Task<Task> whose outer task completes at the
        // first await, which would make DisposeAsync return while the executors are still running.
        _tasksExecutors = Enumerable.Range(0, concurrency).Select(_ => Task.Run(StartChannel)).ToArray();
    }

    private void BlockProcessorOnBlocksProcessing(object? sender, BlocksProcessingEventArgs e)
    {
        // On block processing, we cancel the block processor cts, causing the currently running tasks to get cancelled.
        _blockProcessorCancellationTokenSource.Cancel();
        // We also reset the queue signal, causing the executors to wait.
        _restartQueueSignal.Reset();
    }

    private void BlockProcessorOnBlockProcessed(object? sender, BlockProcessedEventArgs e)
    {
        // Once the block is processed, we replace the cancelled token source with a fresh one.
        CancellationTokenSource oldTokenSource = Interlocked.Exchange(ref _blockProcessorCancellationTokenSource, new CancellationTokenSource());
        oldTokenSource.Dispose();
        // We also set the queue signal, allowing the executors to continue with the queue.
        _restartQueueSignal.Set();
    }

    /// <summary>
    /// Executor loop: dequeues activities until the queue is completed and drained or the scheduler is disposed.
    /// </summary>
    private async Task StartChannel()
    {
        CancellationToken shutdownToken = _mainCancellationTokenSource.Token;
        try
        {
            await foreach (IActivity activity in _taskQueue.Reader.ReadAllAsync(shutdownToken))
            {
                try
                {
                    if (_blockProcessorCancellationTokenSource.IsCancellationRequested)
                    {
                        // A block is being processed. If the activity still has some time left before its
                        // deadline we re-queue it instead of running it. We keep dequeuing because there may be
                        // activities in the queue that already reached their deadline; those need to be
                        // executed so that they can handle their cancellation.
                        if (DateTimeOffset.UtcNow < activity.Deadline)
                        {
                            await _taskQueue.Writer.WriteAsync(activity, shutdownToken);
                            // Throttle dequeuing to prevent a busy loop while the block is being processed.
                            await _restartQueueSignal.WaitOneAsync(TimeSpan.FromMilliseconds(1), shutdownToken);
                            continue;
                        }
                    }

                    using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(
                        _blockProcessorCancellationTokenSource.Token,
                        shutdownToken
                    );
                    await activity.Do(cts.Token);
                }
                catch (OperationCanceledException)
                {
                    // Cancellation is an expected outcome for a background task; nothing to report.
                }
                catch (Exception e)
                {
                    if (_logger.IsDebug) _logger.Debug($"Error processing background task {e}.");
                }
            }
        }
        catch (OperationCanceledException) when (shutdownToken.IsCancellationRequested)
        {
            // The scheduler is being disposed: reading from the queue was cancelled. Exit the loop quietly so
            // that awaiting the executors in DisposeAsync does not throw.
        }
    }

    /// <summary>
    /// Queues <paramref name="fulfillFunc"/> to be invoked with <paramref name="request"/> by one of the executors.
    /// </summary>
    /// <typeparam name="TReq">Type of the request passed through to <paramref name="fulfillFunc"/>.</typeparam>
    /// <param name="request">Value handed to <paramref name="fulfillFunc"/> when it is invoked.</param>
    /// <param name="fulfillFunc">The work to run. The token it receives is cancelled on block processing,
    /// on scheduler disposal, or when the deadline passes.</param>
    /// <param name="timeout">Time allowed from now until the task must be finished; 2 seconds when null.</param>
    /// <exception cref="InvalidOperationException">The task could not be written to the queue.</exception>
    public void ScheduleTask<TReq>(TReq request, Func<TReq, CancellationToken, Task> fulfillFunc, TimeSpan? timeout = null)
    {
        timeout ??= DefaultTimeout;
        DateTimeOffset deadline = DateTimeOffset.UtcNow + timeout.Value;

        IActivity activity = new Activity<TReq>()
        {
            Deadline = deadline,
            Request = request,
            FulfillFunc = fulfillFunc,
        };

        if (!_taskQueue.Writer.TryWrite(activity))
        {
            // This should never happen unless something goes very wrong.
            throw new InvalidOperationException("Unable to write to background task queue.");
        }
    }

    /// <summary>
    /// Unsubscribes from the block processor, stops accepting new tasks, cancels running tasks and waits for
    /// all executor loops to finish. Safe to call more than once.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        _blockProcessor.BlocksProcessing -= BlockProcessorOnBlocksProcessing;
        _blockProcessor.BlockProcessed -= BlockProcessorOnBlockProcessed;

        // TryComplete instead of Complete: Complete throws if the channel was already completed.
        _taskQueue.Writer.TryComplete();
        await _mainCancellationTokenSource.CancelAsync();
        await Task.WhenAll(_tasksExecutors);
    }

    /// <summary>A queued unit of work together with its request and deadline.</summary>
    private readonly struct Activity<TReq> : IActivity
    {
        public DateTimeOffset Deadline { get; init; }
        public TReq Request { get; init; }
        public Func<TReq, CancellationToken, Task> FulfillFunc { get; init; }

        /// <summary>
        /// Invokes the work with a token that is cancelled either by <paramref name="cancellationToken"/> or
        /// when the deadline is reached, whichever comes first.
        /// </summary>
        public async Task Do(CancellationToken cancellationToken)
        {
            using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            DateTimeOffset now = DateTimeOffset.UtcNow;
            TimeSpan timeToComplete = Deadline - now;
            if (timeToComplete <= TimeSpan.Zero)
            {
                // Cancel immediately. Got no time left.
                await cts.CancelAsync();
            }
            else
            {
                cts.CancelAfter(timeToComplete);
            }

            await FulfillFunc.Invoke(Request, cts.Token);
        }
    }

    /// <summary>Type-erased view of <see cref="Activity{TReq}"/> so different request types share one queue.</summary>
    private interface IActivity
    {
        DateTimeOffset Deadline { get; }
        Task Do(CancellationToken cancellationToken);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Nethermind.Consensus.Scheduler;

/// <summary>
/// Abstraction for handing a piece of work to a scheduler. When and on which thread the work is invoked, and
/// when the token passed to it gets cancelled, is decided by the implementation.
/// </summary>
public interface IBackgroundTaskScheduler
{
/// <summary>
/// Schedules <paramref name="fulfillFunc"/> to be invoked with <paramref name="request"/> and a cancellation token.
/// </summary>
/// <typeparam name="TReq">Type of the request passed through to <paramref name="fulfillFunc"/>.</typeparam>
/// <param name="request">Value handed to <paramref name="fulfillFunc"/> when it is invoked.</param>
/// <param name="fulfillFunc">The work to perform.</param>
/// <param name="timeout">Optional time budget for the work. Handling of null is implementation specific:
/// BackgroundTaskScheduler substitutes a 2 second default, RunImmediatelyScheduler ignores the value.</param>
void ScheduleTask<TReq>(TReq request, Func<TReq, CancellationToken, Task> fulfillFunc, TimeSpan? timeout = null);
}
23 changes: 23 additions & 0 deletions src/Nethermind/Nethermind.Core.Test/RunImmediatelyScheduler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Consensus.Scheduler;

namespace Nethermind.Core.Test;

/// <summary>
/// <see cref="IBackgroundTaskScheduler"/> for tests: instead of queueing the work it invokes the delegate
/// directly on the calling thread, with a token that is never cancelled.
/// </summary>
public sealed class RunImmediatelyScheduler : IBackgroundTaskScheduler
{
    /// <summary>The shared instance. Read-only so that it cannot be swapped out by accident.</summary>
    public static readonly RunImmediatelyScheduler Instance = new RunImmediatelyScheduler();

    private RunImmediatelyScheduler()
    {
    }

    /// <summary>
    /// Invokes <paramref name="fulfillFunc"/> right away with <paramref name="request"/> and
    /// <see cref="CancellationToken.None"/>.
    /// </summary>
    /// <typeparam name="TReq">Type of the request passed through to <paramref name="fulfillFunc"/>.</typeparam>
    /// <param name="request">Value handed to <paramref name="fulfillFunc"/>.</param>
    /// <param name="fulfillFunc">The work to invoke.</param>
    /// <param name="timeout">Ignored by this implementation.</param>
    public void ScheduleTask<TReq>(TReq request, Func<TReq, CancellationToken, Task> fulfillFunc, TimeSpan? timeout = null)
    {
        // The returned task is deliberately not awaited: only the synchronous part of the delegate is
        // guaranteed to have run when this method returns, and a faulted task is not observed here.
        _ = fulfillFunc(request, CancellationToken.None);
    }
}
Loading

0 comments on commit 956c49f

Please sign in to comment.