Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel Trie Branch KeyGeneration #7048

Merged
merged 7 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Serialization.Rlp/Rlp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,8 @@ public static int Encode(Span<byte> buffer, int position, Hash256 hash)
ThrowArgumentOutOfRangeException();
}

Unsafe.Add(ref MemoryMarshal.GetReference(buffer), position) = 160;
Unsafe.As<byte, ValueHash256>(ref Unsafe.Add(ref MemoryMarshal.GetReference(buffer), position + 1)) = hash.ValueHash256;
Unsafe.Add(ref MemoryMarshal.GetReference(buffer), (nuint)position) = 160;
Unsafe.As<byte, ValueHash256>(ref Unsafe.Add(ref MemoryMarshal.GetReference(buffer), (nuint)position + 1)) = hash.ValueHash256;
benaadams marked this conversation as resolved.
Show resolved Hide resolved
return newPosition;

[DoesNotReturn]
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.State/PersistentStorageProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ void UpdateRootHashesSingleThread()
}

StorageTree storageTree = kvp.Value;
storageTree.UpdateRootHash();
storageTree.UpdateRootHash(canBeParallel: true);
_stateProvider.UpdateStorageRoot(address: kvp.Key, storageTree.RootHash);
}
}
Expand All @@ -253,7 +253,7 @@ void UpdateRootHashesMultiThread()
return;
}
StorageTree storageTree = kvp.Value;
storageTree.UpdateRootHash();
storageTree.UpdateRootHash(canBeParallel: false);
});

// Update the storage roots in the main thread non in parallel
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Trie/PatriciaTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,10 @@ void TraceSkipInlineNode(TrieNode node)
}
}

public void UpdateRootHash()
public void UpdateRootHash(bool canBeParallel = true)
{
TreePath path = TreePath.Empty;
RootRef?.ResolveKey(TrieStore, ref path, isRoot: true, bufferPool: _bufferPool);
RootRef?.ResolveKey(TrieStore, ref path, isRoot: true, bufferPool: _bufferPool, canBeParallel);
SetRootHash(RootRef?.Keccak ?? EmptyTreeHash, false);
}

Expand Down
109 changes: 97 additions & 12 deletions src/Nethermind/Nethermind.Trie/TrieNode.Decoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Core.Buffers;
using Nethermind.Core.Crypto;
using Nethermind.Serialization.Rlp;
Expand Down Expand Up @@ -125,12 +127,12 @@ private static void ThrowNullKey(TrieNode node)
throw new TrieException($"Hex prefix of a leaf node is null at node {node.Keccak}");
}

public static CappedArray<byte> RlpEncodeBranch(TrieNode item, ITrieNodeResolver tree, ref TreePath path, ICappedArrayPool? pool)
public static CappedArray<byte> RlpEncodeBranch(TrieNode item, ITrieNodeResolver tree, ref TreePath path, ICappedArrayPool? pool, bool canBeParallel)
{
Metrics.TreeNodeRlpEncodings++;

int valueRlpLength = AllowBranchValues ? Rlp.LengthOf(item.Value.AsSpan()) : 1;
int contentLength = valueRlpLength + GetChildrenRlpLengthForBranch(tree, ref path, item, pool);
int contentLength = valueRlpLength + (UseParallel(canBeParallel) ? GetChildrenRlpLengthForBranchParallel(tree, ref path, item, pool) : GetChildrenRlpLengthForBranch(tree, ref path, item, pool));
int sequenceLength = Rlp.LengthOfSequence(contentLength);
CappedArray<byte> result = pool.SafeRentBuffer(sequenceLength);
Span<byte> resultSpan = result.AsSpan();
Expand All @@ -147,28 +149,69 @@ public static CappedArray<byte> RlpEncodeBranch(TrieNode item, ITrieNodeResolver
}

return result;

static bool UseParallel(bool canBeParallel) => Environment.ProcessorCount > 1 && canBeParallel;
}

private static int GetChildrenRlpLengthForBranch(ITrieNodeResolver tree, ref TreePath path, TrieNode item, ICappedArrayPool? bufferPool)
{
item.EnsureInitialized();
// Tail call optimized.
if (item.HasRlp)
{
return GetChildrenRlpLengthForBranchRlp(tree, ref path, item, bufferPool);
}
else
{
return GetChildrenRlpLengthForBranchNonRlp(tree, ref path, item, bufferPool);
}
return item.HasRlp
? GetChildrenRlpLengthForBranchRlp(tree, ref path, item, bufferPool)
: GetChildrenRlpLengthForBranchNonRlp(tree, ref path, item, bufferPool);
}

private static int GetChildrenRlpLengthForBranchParallel(ITrieNodeResolver tree, ref TreePath path, TrieNode item, ICappedArrayPool? bufferPool)
{
item.EnsureInitialized();
// Tail call optimized.
return item.HasRlp
? GetChildrenRlpLengthForBranchRlpParallel(tree, path, item, bufferPool)
: GetChildrenRlpLengthForBranchNonRlpParallel(tree, path, item, bufferPool);
}

private static int GetChildrenRlpLengthForBranchNonRlpParallel(ITrieNodeResolver tree, TreePath rootPath, TrieNode item, ICappedArrayPool bufferPool)
{
int totalLength = 0;
Parallel.For(0, BranchesCount,
() => 0,
(i, _, local) =>
{
object? data = item._data[i];
if (ReferenceEquals(data, _nullNode) || data is null)
{
local++;
}
else if (data is Hash256)
{
local += Rlp.LengthOfKeccakRlp;
}
else
{
TreePath path = rootPath;
path.AppendMut(i);
TrieNode childNode = Unsafe.As<TrieNode>(data);
childNode.ResolveKey(tree, ref path, isRoot: false, bufferPool: bufferPool);
local += childNode.Keccak is null ? childNode.FullRlp.Length : Rlp.LengthOfKeccakRlp;
}

return local;
},
local =>
{
Interlocked.Add(ref totalLength, local);
});

return totalLength;
}

private static int GetChildrenRlpLengthForBranchNonRlp(ITrieNodeResolver tree, ref TreePath path, TrieNode item, ICappedArrayPool bufferPool)
{
int totalLength = 0;
for (int i = 0; i < BranchesCount; i++)
{
object data = item._data[i];
object? data = item._data[i];
if (ReferenceEquals(data, _nullNode) || data is null)
{
totalLength++;
Expand All @@ -180,7 +223,7 @@ private static int GetChildrenRlpLengthForBranchNonRlp(ITrieNodeResolver tree, r
else
{
path.AppendMut(i);
TrieNode childNode = (TrieNode)data;
TrieNode childNode = Unsafe.As<TrieNode>(data);
childNode.ResolveKey(tree, ref path, isRoot: false, bufferPool: bufferPool);
path.TruncateOne();
totalLength += childNode.Keccak is null ? childNode.FullRlp.Length : Rlp.LengthOfKeccakRlp;
Expand All @@ -189,6 +232,48 @@ private static int GetChildrenRlpLengthForBranchNonRlp(ITrieNodeResolver tree, r
return totalLength;
}

private static int GetChildrenRlpLengthForBranchRlpParallel(ITrieNodeResolver tree, TreePath rootPath, TrieNode item, ICappedArrayPool? bufferPool)
{
int totalLength = 0;
Parallel.For(0, BranchesCount,
() => 0,
(i, _, local) =>
{
ValueRlpStream rlpStream = item.RlpStream;
item.SeekChild(ref rlpStream, i);
object? data = item._data[i];
if (data is null)
{
local += rlpStream.PeekNextRlpLength();
}
else if (ReferenceEquals(data, _nullNode))
{
local++;
}
else if (data is Hash256)
{
local += Rlp.LengthOfKeccakRlp;
}
else
{
TreePath path = rootPath;
path.AppendMut(i);
Debug.Assert(data is TrieNode, "Data is not TrieNode");
TrieNode childNode = Unsafe.As<TrieNode>(data);
childNode.ResolveKey(tree, ref path, isRoot: false, bufferPool: bufferPool);
local += childNode.Keccak is null ? childNode.FullRlp.Length : Rlp.LengthOfKeccakRlp;
}

return local;
},
local =>
{
Interlocked.Add(ref totalLength, local);
});

return totalLength;
}

private static int GetChildrenRlpLengthForBranchRlp(ITrieNodeResolver tree, ref TreePath path, TrieNode item, ICappedArrayPool? bufferPool)
{
int totalLength = 0;
Expand Down
10 changes: 5 additions & 5 deletions src/Nethermind/Nethermind.Trie/TrieNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ private bool DecodeRlp(ValueRlpStream rlpStream, ICappedArrayPool bufferPool, ou
return true;
}

public void ResolveKey(ITrieNodeResolver tree, ref TreePath path, bool isRoot, ICappedArrayPool? bufferPool = null)
public void ResolveKey(ITrieNodeResolver tree, ref TreePath path, bool isRoot, ICappedArrayPool? bufferPool = null, bool canBeParallel = true)
{
if (Keccak is not null)
{
Expand All @@ -500,17 +500,17 @@ public void ResolveKey(ITrieNodeResolver tree, ref TreePath path, bool isRoot, I
return;
}

Keccak = GenerateKey(tree, ref path, isRoot, bufferPool);
Keccak = GenerateKey(tree, ref path, isRoot, bufferPool, canBeParallel);
}

public Hash256? GenerateKey(ITrieNodeResolver tree, ref TreePath path, bool isRoot, ICappedArrayPool? bufferPool = null)
public Hash256? GenerateKey(ITrieNodeResolver tree, ref TreePath path, bool isRoot, ICappedArrayPool? bufferPool = null, bool canBeParallel = true)
{
RlpFactory rlp = _rlp;
if (rlp is null || IsDirty)
{
ref readonly CappedArray<byte> oldRlp = ref rlp is not null ? ref rlp.Data : ref CappedArray<byte>.Empty;
CappedArray<byte> fullRlp = NodeType == NodeType.Branch ?
TrieNodeDecoder.RlpEncodeBranch(this, tree, ref path, bufferPool) :
TrieNodeDecoder.RlpEncodeBranch(this, tree, ref path, bufferPool, canBeParallel: isRoot && canBeParallel) :
RlpEncode(tree, ref path, bufferPool);

if (fullRlp.IsNotNullOrEmpty)
Expand All @@ -536,7 +536,7 @@ internal CappedArray<byte> RlpEncode(ITrieNodeResolver tree, ref TreePath path,
{
return NodeType switch
{
NodeType.Branch => TrieNodeDecoder.RlpEncodeBranch(this, tree, ref path, bufferPool),
NodeType.Branch => TrieNodeDecoder.RlpEncodeBranch(this, tree, ref path, bufferPool, canBeParallel: false),
NodeType.Extension => TrieNodeDecoder.EncodeExtension(this, tree, ref path, bufferPool),
NodeType.Leaf => TrieNodeDecoder.EncodeLeaf(this, bufferPool),
_ => ThrowUnhandledNodeType(this)
Expand Down