Skip to content

Commit

Permalink
Perf/faster full tree visit (#7692)
Browse files Browse the repository at this point in the history
Co-authored-by: Lukasz Rozmej <[email protected]>
Co-authored-by: Szymon Kulec <[email protected]>
  • Loading branch information
3 people authored Oct 31, 2024
1 parent 65055f1 commit 159c2a4
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using FluentAssertions;
using Nethermind.Core.Threading;
using NUnit.Framework;

namespace Nethermind.Core.Test.Threading;

/// <summary>
/// Unit tests for <see cref="ConcurrencyController"/>.
/// </summary>
public class ConcurrencyControllerTests
{
    /// <summary>
    /// Verifies that a controller created with a concurrency of 3 hands out two slots
    /// (the calling thread counts towards the limit), refuses a third one, and hands out
    /// a slot again once an acquired slot has been released.
    /// </summary>
    [Test]
    public void ThreadLimiterWillLimit()
    {
        ConcurrencyController limiter = new(3);

        // Keep a handle to a slot that was really acquired, so that the release below
        // gives back capacity that was actually taken.
        limiter.TryTakeSlot(out ConcurrencyController.Slot acquired).Should().BeTrue();
        limiter.TryTakeSlot(out _).Should().BeTrue();
        limiter.TryTakeSlot(out _).Should().BeFalse();

        acquired.Dispose();

        // Exactly one slot became available again.
        limiter.TryTakeSlot(out _).Should().BeTrue();
        limiter.TryTakeSlot(out _).Should().BeFalse();
    }
}
52 changes: 52 additions & 0 deletions src/Nethermind/Nethermind.Core/Threading/ConcurrencyController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;

namespace Nethermind.Core.Threading;

/// <summary>
/// Interlocked-based counter that answers the question "may one more task be spawned without exceeding
/// a predefined limit?". Intended for the multithreaded tree visit, where it is not known up front whether
/// a task can be spawned and where spawning a task is itself not a cheap operation.
/// </summary>
/// <param name="concurrency">
/// Desired concurrency, counting the calling thread. Because the caller occupies one unit itself,
/// at most <c>concurrency - 1</c> slots can be taken before <see cref="TryTakeSlot"/> starts returning false.
/// </param>
public class ConcurrencyController(int concurrency)
{
    // Remaining capacity, including the unit that belongs to the calling thread.
    private int _slots = concurrency;

    /// <summary>
    /// Tries to reserve one slot.
    /// </summary>
    /// <param name="returner">
    /// Handle that gives the slot back when disposed. It is assigned on every call, including
    /// calls that return false.
    /// </param>
    /// <returns>True when a slot was reserved, false when the limit has been reached.</returns>
    public bool TryTakeSlot(out Slot returner)
    {
        returner = new Slot(this);

        // Cheap read first: when fewer than two units remain there is nothing to hand out,
        // so the interlocked decrement is skipped entirely.
        if (Volatile.Read(ref _slots) >= 2)
        {
            if (Interlocked.Decrement(ref _slots) >= 1)
            {
                return true;
            }

            // Another caller took the last slot between the read and the decrement; undo our decrement.
            Interlocked.Increment(ref _slots);
        }

        return false;
    }

    // Gives one unit of capacity back to the controller.
    private void ReturnSlot() => Interlocked.Increment(ref _slots);

    /// <summary>
    /// Disposable handle for a slot of a <see cref="ConcurrencyController"/>. Disposing it increments the
    /// controller's counter unconditionally, so each handle should be disposed at most once.
    /// </summary>
    public readonly struct Slot(ConcurrencyController limiter) : IDisposable
    {
        /// <summary>Returns the slot to the owning controller.</summary>
        public void Dispose() => limiter.ReturnSlot();
    }
}
67 changes: 37 additions & 30 deletions src/Nethermind/Nethermind.Trie/TrieNode.Visitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@

using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Nethermind.Core;
using Nethermind.Core.Collections;
using Nethermind.Core.Cpu;
using Nethermind.Core.Crypto;
using Nethermind.Core.Threading;
using Nethermind.Serialization.Rlp;
using Nethermind.Trie.Pruning;

Expand Down Expand Up @@ -187,31 +190,47 @@ void VisitSingleThread(ref TreePath parentPath, ITreeVisitor<TNodeContext> treeV
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
void VisitMultiThread(TreePath parentPath, ITreeVisitor<TNodeContext> treeVisitor, in TNodeContext nodeContext, ITrieNodeResolver trieNodeResolver, TrieVisitContext visitContext, TrieNode?[] children)
void VisitMultiThread(TreePath parentPath, ITreeVisitor<TNodeContext> treeVisitor, in TNodeContext nodeContext, ITrieNodeResolver trieNodeResolver, TrieVisitContext visitContext)
{
var copy = nodeContext;
// we need to preallocate children
TNodeContext contextCopy = nodeContext;

// multithreaded route
Parallel.For(0, BranchesCount, RuntimeInformation.ParallelOptionsPhysicalCores, i =>
ArrayPoolList<Task>? tasks = null;
for (int i = 0; i < BranchesCount; i++)
{
visitContext.Semaphore.Wait();
try
if (i < BranchesCount - 1 && visitContext.ConcurrencyController.TryTakeSlot(out ConcurrencyController.Slot returner))
{
TreePath closureParentPath = parentPath;
// we need to have separate context for each thread as context tracks level and branch child index
TrieVisitContext childContext = visitContext.Clone();
VisitChild(ref closureParentPath, i, children[i], trieNodeResolver, treeVisitor, copy, childContext);
tasks ??= new ArrayPoolList<Task>(BranchesCount);
tasks.Add(SpawnChildVisit(parentPath, i, GetChild(nodeResolver, ref parentPath, i), returner));
}
finally
else
{
visitContext.Semaphore.Release();
VisitChild(ref parentPath, i, GetChild(nodeResolver, ref parentPath, i), trieNodeResolver, treeVisitor, contextCopy, visitContext);
}
});
}

if (tasks is { Count: > 0 })
{
Task.WaitAll(tasks.ToArray());
tasks.Dispose();
}
return;

Task SpawnChildVisit(TreePath closureParentPath, int i, TrieNode? childNode, ConcurrencyController.Slot slotReturner) =>
Task.Run(() =>
{
using ConcurrencyController.Slot _ = slotReturner;

// we need to have separate context for each thread as context tracks level and branch child index
TrieVisitContext childContext = visitContext.Clone();
VisitChild(ref closureParentPath, i, childNode, trieNodeResolver, treeVisitor,
contextCopy, childContext);
});
}

static void VisitAllSingleThread(TrieNode currentNode, ref TreePath path, ITreeVisitor<TNodeContext> visitor, TNodeContext nodeContext, ITrieNodeResolver nodeResolver, TrieVisitContext visitContext)
{
TrieNode?[] output = new TrieNode?[16];
TrieNode?[] output = new TrieNode?[BranchesCount];
currentNode.ResolveAllChildBranch(nodeResolver, ref path, output);
path.AppendMut(0);
for (int i = 0; i < 16; i++)
Expand All @@ -234,23 +253,11 @@ static void VisitAllSingleThread(TrieNode currentNode, ref TreePath path, ITreeV
trieVisitContext.AddVisited();
trieVisitContext.Level++;

if (trieVisitContext.MaxDegreeOfParallelism != 1 && trieVisitContext.Semaphore.CurrentCount > 1)
// Limiting the multithreaded path to the top of the state tree and the first level of storage doubles the throughput on mainnet.
// The top levels of the state tree split 16^3 ways while storage splits 16 ways, which should be OK for large contracts in most cases.
if (trieVisitContext.MaxDegreeOfParallelism != 1 && (trieVisitContext.IsStorage ? path.Length == 0 : path.Length <= 2))
{
// we need to preallocate children
TrieNode?[] children = new TrieNode?[BranchesCount];
for (int i = 0; i < BranchesCount; i++)
{
children[i] = GetChild(nodeResolver, ref path, i);
}

if (trieVisitContext.Semaphore.CurrentCount > 1)
{
VisitMultiThread(path, visitor, nodeContext, nodeResolver, trieVisitContext, children);
}
else
{
VisitSingleThread(ref path, visitor, nodeContext, nodeResolver, trieVisitContext);
}
VisitMultiThread(path, visitor, nodeContext, nodeResolver, trieVisitContext);
}
else
{
Expand Down
22 changes: 7 additions & 15 deletions src/Nethermind/Nethermind.Trie/VisitContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@
using System;
using System.Runtime.InteropServices;
using System.Threading;
using Nethermind.Core.Threading;

namespace Nethermind.Trie
{
public class TrieVisitContext : IDisposable
{
private SemaphoreSlim? _semaphore;
private readonly int _maxDegreeOfParallelism = 1;
private int _visitedNodes;

private ConcurrencyController? _threadLimiter = null;
public ConcurrencyController ConcurrencyController => _threadLimiter ??= new ConcurrencyController(MaxDegreeOfParallelism);

public int Level { get; internal set; }
public bool IsStorage { get; set; }
public int? BranchChildIndex { get; internal set; }
Expand All @@ -22,28 +25,17 @@ public class TrieVisitContext : IDisposable
public int MaxDegreeOfParallelism
{
get => _maxDegreeOfParallelism;
internal init => _maxDegreeOfParallelism = VisitingOptions.AdjustMaxDegreeOfParallelism(value);
}

public SemaphoreSlim Semaphore
{
get
internal init
{
if (_semaphore is null)
{
if (MaxDegreeOfParallelism == 1) throw new InvalidOperationException("Can not create semaphore for single threaded trie visitor.");
_semaphore = new SemaphoreSlim(MaxDegreeOfParallelism, MaxDegreeOfParallelism);
}

return _semaphore;
_maxDegreeOfParallelism = VisitingOptions.AdjustMaxDegreeOfParallelism(value);
_threadLimiter = null;
}
}

public TrieVisitContext Clone() => (TrieVisitContext)MemberwiseClone();

public void Dispose()
{
_semaphore?.Dispose();
}

public void AddVisited()
Expand Down

0 comments on commit 159c2a4

Please sign in to comment.