Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf/faster full tree visit #7692

Merged
merged 11 commits into from
Oct 31, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using FluentAssertions;
using Nethermind.Core.Threading;
using NUnit.Framework;

namespace Nethermind.Core.Test.Threading;

/// <summary>
/// Unit tests for <see cref="ConcurrencyController"/>.
/// </summary>
public class ConcurrencyControllerTests
{
    /// <summary>
    /// Verifies that a controller created with concurrency 3 hands out two slots, refuses a third,
    /// and hands out exactly one more after a taken slot has been returned.
    /// </summary>
    [Test]
    public void ThreadLimiterWillLimit()
    {
        ConcurrencyController limiter = new ConcurrencyController(3);

        // Keep the handle of a take that actually succeeded so that it can be returned later.
        limiter.TryTakeSlot(out ConcurrencyController.Slot takenSlot).Should().BeTrue();
        limiter.TryTakeSlot(out _).Should().BeTrue();
        limiter.TryTakeSlot(out _).Should().BeFalse();

        // Return a slot that was really taken. Disposing the out value of the failed call above
        // would hand back a slot that was never reserved.
        takenSlot.Dispose();

        limiter.TryTakeSlot(out _).Should().BeTrue();
        limiter.TryTakeSlot(out _).Should().BeFalse();
    }
}
52 changes: 52 additions & 0 deletions src/Nethermind/Nethermind.Core/Threading/ConcurrencyController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;

namespace Nethermind.Core.Threading;

/// <summary>
/// Keeps count of how many more workers may run concurrently, so a caller can cheaply ask
/// whether it may spawn another task before paying the cost of spawning it.
/// Used in multithreaded tree visits, where it is not known up front whether a task can be spawned.
/// </summary>
/// <param name="concurrency">
/// Desired total concurrency, counting the calling thread. One unit always stays with the caller,
/// so at most <c>concurrency - 1</c> slots can be taken at the same time.
/// </param>
public class ConcurrencyController(int concurrency)
{
    // Remaining capacity; a take only succeeds while this stays at 1 or above after the decrement.
    private int _slots = concurrency;

    /// <summary>
    /// Tries to reserve one slot without blocking.
    /// </summary>
    /// <param name="returner">
    /// Handle bound to this controller. It is assigned on every call, including unsuccessful ones;
    /// disposing it increments the available count.
    /// </param>
    /// <returns><c>true</c> if a slot was reserved, <c>false</c> otherwise.</returns>
    public bool TryTakeSlot(out Slot returner)
    {
        returner = new Slot(this);

        // Cheap read first: skip the interlocked decrement entirely when nothing can be available.
        if (Volatile.Read(ref _slots) >= 2)
        {
            if (Interlocked.Decrement(ref _slots) >= 1)
            {
                return true;
            }

            // Another thread got there between the read and the decrement; undo our decrement.
            Interlocked.Increment(ref _slots);
        }

        return false;
    }

    // Gives one slot back to the pool.
    private void ReturnSlot() => Interlocked.Increment(ref _slots);

    /// <summary>
    /// Handle produced by <see cref="TryTakeSlot"/>; disposing it returns a slot to the controller.
    /// </summary>
    public readonly struct Slot(ConcurrencyController limiter) : IDisposable
    {
        /// <summary>Returns one slot to the owning controller.</summary>
        public void Dispose() => limiter.ReturnSlot();
    }
}
67 changes: 37 additions & 30 deletions src/Nethermind/Nethermind.Trie/TrieNode.Visitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@

using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Core.Cpu;
using Nethermind.Core.Crypto;
using Nethermind.Core.Threading;
using Nethermind.Serialization.Rlp;
using Nethermind.Trie.Pruning;

Expand Down Expand Up @@ -187,31 +190,47 @@ void VisitSingleThread(ref TreePath parentPath, ITreeVisitor<TNodeContext> treeV
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
void VisitMultiThread(TreePath parentPath, ITreeVisitor<TNodeContext> treeVisitor, in TNodeContext nodeContext, ITrieNodeResolver trieNodeResolver, TrieVisitContext visitContext, TrieNode?[] children)
void VisitMultiThread(TreePath parentPath, ITreeVisitor<TNodeContext> treeVisitor, in TNodeContext nodeContext, ITrieNodeResolver trieNodeResolver, TrieVisitContext visitContext)
{
var copy = nodeContext;
// we need to preallocate children
TNodeContext contextCopy = nodeContext;

// multithreaded route
Parallel.For(0, BranchesCount, RuntimeInformation.ParallelOptionsPhysicalCores, i =>
ArrayPoolList<Task>? tasks = null;
for (int i = 0; i < BranchesCount; i++)
{
visitContext.Semaphore.Wait();
try
if (i < BranchesCount - 1 && visitContext.ConcurrencyController.TryTakeSlot(out ConcurrencyController.Slot returner))
{
TreePath closureParentPath = parentPath;
// we need to have separate context for each thread as context tracks level and branch child index
TrieVisitContext childContext = visitContext.Clone();
VisitChild(ref closureParentPath, i, children[i], trieNodeResolver, treeVisitor, copy, childContext);
tasks ??= new ArrayPoolList<Task>(BranchesCount);
tasks.Add(SpawnChildVisit(parentPath, i, GetChild(nodeResolver, ref parentPath, i), returner));
}
finally
else
{
visitContext.Semaphore.Release();
VisitChild(ref parentPath, i, GetChild(nodeResolver, ref parentPath, i), trieNodeResolver, treeVisitor, contextCopy, visitContext);
}
});
}

if (tasks is { Count: > 0 })
{
Task.WaitAll(tasks.ToArray());
tasks.Dispose();
}
return;

Task SpawnChildVisit(TreePath closureParentPath, int i, TrieNode? childNode, ConcurrencyController.Slot slotReturner) =>
Task.Run(() =>
{
using ConcurrencyController.Slot _ = slotReturner;

// we need to have separate context for each thread as context tracks level and branch child index
TrieVisitContext childContext = visitContext.Clone();
VisitChild(ref closureParentPath, i, childNode, trieNodeResolver, treeVisitor,
contextCopy, childContext);
});
}

static void VisitAllSingleThread(TrieNode currentNode, ref TreePath path, ITreeVisitor<TNodeContext> visitor, TNodeContext nodeContext, ITrieNodeResolver nodeResolver, TrieVisitContext visitContext)
{
TrieNode?[] output = new TrieNode?[16];
TrieNode?[] output = new TrieNode?[BranchesCount];
currentNode.ResolveAllChildBranch(nodeResolver, ref path, output);
path.AppendMut(0);
for (int i = 0; i < 16; i++)
Expand All @@ -234,23 +253,11 @@ static void VisitAllSingleThread(TrieNode currentNode, ref TreePath path, ITreeV
trieVisitContext.AddVisited();
trieVisitContext.Level++;

if (trieVisitContext.MaxDegreeOfParallelism != 1 && trieVisitContext.Semaphore.CurrentCount > 1)
// Limiting the multithreaded path to the top of the state tree and the first level of storage trees doubles the throughput on mainnet.
// The top-level state is split into 16^3 parts while storage is split into 16, which should be fine for large contracts in most cases.
if (trieVisitContext.MaxDegreeOfParallelism != 1 && (trieVisitContext.IsStorage ? path.Length == 0 : path.Length <= 2))
{
// we need to preallocate children
TrieNode?[] children = new TrieNode?[BranchesCount];
for (int i = 0; i < BranchesCount; i++)
{
children[i] = GetChild(nodeResolver, ref path, i);
}

if (trieVisitContext.Semaphore.CurrentCount > 1)
{
VisitMultiThread(path, visitor, nodeContext, nodeResolver, trieVisitContext, children);
}
else
{
VisitSingleThread(ref path, visitor, nodeContext, nodeResolver, trieVisitContext);
}
VisitMultiThread(path, visitor, nodeContext, nodeResolver, trieVisitContext);
}
else
{
Expand Down
22 changes: 7 additions & 15 deletions src/Nethermind/Nethermind.Trie/VisitContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@
using System;
using System.Runtime.InteropServices;
using System.Threading;
using Nethermind.Core.Threading;

namespace Nethermind.Trie
{
public class TrieVisitContext : IDisposable
{
private SemaphoreSlim? _semaphore;
private readonly int _maxDegreeOfParallelism = 1;
private int _visitedNodes;

private ConcurrencyController? _threadLimiter = null;
public ConcurrencyController ConcurrencyController => _threadLimiter ??= new ConcurrencyController(MaxDegreeOfParallelism);

public int Level { get; internal set; }
public bool IsStorage { get; set; }
public int? BranchChildIndex { get; internal set; }
Expand All @@ -22,28 +25,17 @@ public class TrieVisitContext : IDisposable
public int MaxDegreeOfParallelism
{
get => _maxDegreeOfParallelism;
internal init => _maxDegreeOfParallelism = VisitingOptions.AdjustMaxDegreeOfParallelism(value);
}

public SemaphoreSlim Semaphore
{
get
internal init
{
if (_semaphore is null)
{
if (MaxDegreeOfParallelism == 1) throw new InvalidOperationException("Can not create semaphore for single threaded trie visitor.");
_semaphore = new SemaphoreSlim(MaxDegreeOfParallelism, MaxDegreeOfParallelism);
}

return _semaphore;
_maxDegreeOfParallelism = VisitingOptions.AdjustMaxDegreeOfParallelism(value);
asdacap marked this conversation as resolved.
Show resolved Hide resolved
_threadLimiter = null;
}
}

public TrieVisitContext Clone() => (TrieVisitContext)MemberwiseClone();

public void Dispose()
{
_semaphore?.Dispose();
}

public void AddVisited()
Expand Down