Skip to content

Commit

Permalink
Parallel Trie Branch KeyGeneration (#7048)
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams authored May 20, 2024
1 parent 1df6ed6 commit 0a0a1c9
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 23 deletions.
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Serialization.Rlp/Rlp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,8 @@ public static int Encode(Span<byte> buffer, int position, Hash256 hash)
ThrowArgumentOutOfRangeException();
}

Unsafe.Add(ref MemoryMarshal.GetReference(buffer), position) = 160;
Unsafe.As<byte, ValueHash256>(ref Unsafe.Add(ref MemoryMarshal.GetReference(buffer), position + 1)) = hash.ValueHash256;
Unsafe.Add(ref MemoryMarshal.GetReference(buffer), (nuint)position) = 160;
Unsafe.As<byte, ValueHash256>(ref Unsafe.Add(ref MemoryMarshal.GetReference(buffer), (nuint)position + 1)) = hash.ValueHash256;
return newPosition;

[DoesNotReturn]
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.State/PersistentStorageProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ void UpdateRootHashesSingleThread()
}

StorageTree storageTree = kvp.Value;
storageTree.UpdateRootHash();
storageTree.UpdateRootHash(canBeParallel: true);
_stateProvider.UpdateStorageRoot(address: kvp.Key, storageTree.RootHash);
}
}
Expand All @@ -253,7 +253,7 @@ void UpdateRootHashesMultiThread()
return;
}
StorageTree storageTree = kvp.Value;
storageTree.UpdateRootHash();
storageTree.UpdateRootHash(canBeParallel: false);
});

// Update the storage roots in the main thread non in parallel
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Trie/PatriciaTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,10 @@ void TraceSkipInlineNode(TrieNode node)
}
}

public void UpdateRootHash()
public void UpdateRootHash(bool canBeParallel = true)
{
TreePath path = TreePath.Empty;
RootRef?.ResolveKey(TrieStore, ref path, isRoot: true, bufferPool: _bufferPool);
RootRef?.ResolveKey(TrieStore, ref path, isRoot: true, bufferPool: _bufferPool, canBeParallel);
SetRootHash(RootRef?.Keccak ?? EmptyTreeHash, false);
}

Expand Down
109 changes: 97 additions & 12 deletions src/Nethermind/Nethermind.Trie/TrieNode.Decoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Core.Buffers;
using Nethermind.Core.Crypto;
using Nethermind.Serialization.Rlp;
Expand Down Expand Up @@ -125,12 +127,12 @@ private static void ThrowNullKey(TrieNode node)
throw new TrieException($"Hex prefix of a leaf node is null at node {node.Keccak}");
}

public static CappedArray<byte> RlpEncodeBranch(TrieNode item, ITrieNodeResolver tree, ref TreePath path, ICappedArrayPool? pool)
public static CappedArray<byte> RlpEncodeBranch(TrieNode item, ITrieNodeResolver tree, ref TreePath path, ICappedArrayPool? pool, bool canBeParallel)
{
Metrics.TreeNodeRlpEncodings++;

int valueRlpLength = AllowBranchValues ? Rlp.LengthOf(item.Value.AsSpan()) : 1;
int contentLength = valueRlpLength + GetChildrenRlpLengthForBranch(tree, ref path, item, pool);
int contentLength = valueRlpLength + (UseParallel(canBeParallel) ? GetChildrenRlpLengthForBranchParallel(tree, ref path, item, pool) : GetChildrenRlpLengthForBranch(tree, ref path, item, pool));
int sequenceLength = Rlp.LengthOfSequence(contentLength);
CappedArray<byte> result = pool.SafeRentBuffer(sequenceLength);
Span<byte> resultSpan = result.AsSpan();
Expand All @@ -147,28 +149,69 @@ public static CappedArray<byte> RlpEncodeBranch(TrieNode item, ITrieNodeResolver
}

return result;

static bool UseParallel(bool canBeParallel) => Environment.ProcessorCount > 1 && canBeParallel;
}

private static int GetChildrenRlpLengthForBranch(ITrieNodeResolver tree, ref TreePath path, TrieNode item, ICappedArrayPool? bufferPool)
{
item.EnsureInitialized();
// Tail call optimized.
if (item.HasRlp)
{
return GetChildrenRlpLengthForBranchRlp(tree, ref path, item, bufferPool);
}
else
{
return GetChildrenRlpLengthForBranchNonRlp(tree, ref path, item, bufferPool);
}
return item.HasRlp
? GetChildrenRlpLengthForBranchRlp(tree, ref path, item, bufferPool)
: GetChildrenRlpLengthForBranchNonRlp(tree, ref path, item, bufferPool);
}

private static int GetChildrenRlpLengthForBranchParallel(ITrieNodeResolver tree, ref TreePath path, TrieNode item, ICappedArrayPool? bufferPool)
{
item.EnsureInitialized();
// Tail call optimized.
return item.HasRlp
? GetChildrenRlpLengthForBranchRlpParallel(tree, path, item, bufferPool)
: GetChildrenRlpLengthForBranchNonRlpParallel(tree, path, item, bufferPool);
}

private static int GetChildrenRlpLengthForBranchNonRlpParallel(ITrieNodeResolver tree, TreePath rootPath, TrieNode item, ICappedArrayPool bufferPool)
{
int totalLength = 0;
Parallel.For(0, BranchesCount,
() => 0,
(i, _, local) =>
{
object? data = item._data[i];
if (ReferenceEquals(data, _nullNode) || data is null)
{
local++;
}
else if (data is Hash256)
{
local += Rlp.LengthOfKeccakRlp;
}
else
{
TreePath path = rootPath;
path.AppendMut(i);
TrieNode childNode = Unsafe.As<TrieNode>(data);
childNode.ResolveKey(tree, ref path, isRoot: false, bufferPool: bufferPool);
local += childNode.Keccak is null ? childNode.FullRlp.Length : Rlp.LengthOfKeccakRlp;
}

return local;
},
local =>
{
Interlocked.Add(ref totalLength, local);
});

return totalLength;
}

private static int GetChildrenRlpLengthForBranchNonRlp(ITrieNodeResolver tree, ref TreePath path, TrieNode item, ICappedArrayPool bufferPool)
{
int totalLength = 0;
for (int i = 0; i < BranchesCount; i++)
{
object data = item._data[i];
object? data = item._data[i];
if (ReferenceEquals(data, _nullNode) || data is null)
{
totalLength++;
Expand All @@ -180,7 +223,7 @@ private static int GetChildrenRlpLengthForBranchNonRlp(ITrieNodeResolver tree, r
else
{
path.AppendMut(i);
TrieNode childNode = (TrieNode)data;
TrieNode childNode = Unsafe.As<TrieNode>(data);
childNode.ResolveKey(tree, ref path, isRoot: false, bufferPool: bufferPool);
path.TruncateOne();
totalLength += childNode.Keccak is null ? childNode.FullRlp.Length : Rlp.LengthOfKeccakRlp;
Expand All @@ -189,6 +232,48 @@ private static int GetChildrenRlpLengthForBranchNonRlp(ITrieNodeResolver tree, r
return totalLength;
}

private static int GetChildrenRlpLengthForBranchRlpParallel(ITrieNodeResolver tree, TreePath rootPath, TrieNode item, ICappedArrayPool? bufferPool)
{
int totalLength = 0;
Parallel.For(0, BranchesCount,
() => 0,
(i, _, local) =>
{
ValueRlpStream rlpStream = item.RlpStream;
item.SeekChild(ref rlpStream, i);
object? data = item._data[i];
if (data is null)
{
local += rlpStream.PeekNextRlpLength();
}
else if (ReferenceEquals(data, _nullNode))
{
local++;
}
else if (data is Hash256)
{
local += Rlp.LengthOfKeccakRlp;
}
else
{
TreePath path = rootPath;
path.AppendMut(i);
Debug.Assert(data is TrieNode, "Data is not TrieNode");
TrieNode childNode = Unsafe.As<TrieNode>(data);
childNode.ResolveKey(tree, ref path, isRoot: false, bufferPool: bufferPool);
local += childNode.Keccak is null ? childNode.FullRlp.Length : Rlp.LengthOfKeccakRlp;
}

return local;
},
local =>
{
Interlocked.Add(ref totalLength, local);
});

return totalLength;
}

private static int GetChildrenRlpLengthForBranchRlp(ITrieNodeResolver tree, ref TreePath path, TrieNode item, ICappedArrayPool? bufferPool)
{
int totalLength = 0;
Expand Down
10 changes: 5 additions & 5 deletions src/Nethermind/Nethermind.Trie/TrieNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ private bool DecodeRlp(ValueRlpStream rlpStream, ICappedArrayPool bufferPool, ou
return true;
}

public void ResolveKey(ITrieNodeResolver tree, ref TreePath path, bool isRoot, ICappedArrayPool? bufferPool = null)
public void ResolveKey(ITrieNodeResolver tree, ref TreePath path, bool isRoot, ICappedArrayPool? bufferPool = null, bool canBeParallel = true)
{
if (Keccak is not null)
{
Expand All @@ -500,17 +500,17 @@ public void ResolveKey(ITrieNodeResolver tree, ref TreePath path, bool isRoot, I
return;
}

Keccak = GenerateKey(tree, ref path, isRoot, bufferPool);
Keccak = GenerateKey(tree, ref path, isRoot, bufferPool, canBeParallel);
}

public Hash256? GenerateKey(ITrieNodeResolver tree, ref TreePath path, bool isRoot, ICappedArrayPool? bufferPool = null)
public Hash256? GenerateKey(ITrieNodeResolver tree, ref TreePath path, bool isRoot, ICappedArrayPool? bufferPool = null, bool canBeParallel = true)
{
RlpFactory rlp = _rlp;
if (rlp is null || IsDirty)
{
ref readonly CappedArray<byte> oldRlp = ref rlp is not null ? ref rlp.Data : ref CappedArray<byte>.Empty;
CappedArray<byte> fullRlp = NodeType == NodeType.Branch ?
TrieNodeDecoder.RlpEncodeBranch(this, tree, ref path, bufferPool) :
TrieNodeDecoder.RlpEncodeBranch(this, tree, ref path, bufferPool, canBeParallel: isRoot && canBeParallel) :
RlpEncode(tree, ref path, bufferPool);

if (fullRlp.IsNotNullOrEmpty)
Expand All @@ -536,7 +536,7 @@ internal CappedArray<byte> RlpEncode(ITrieNodeResolver tree, ref TreePath path,
{
return NodeType switch
{
NodeType.Branch => TrieNodeDecoder.RlpEncodeBranch(this, tree, ref path, bufferPool),
NodeType.Branch => TrieNodeDecoder.RlpEncodeBranch(this, tree, ref path, bufferPool, canBeParallel: false),
NodeType.Extension => TrieNodeDecoder.EncodeExtension(this, tree, ref path, bufferPool),
NodeType.Leaf => TrieNodeDecoder.EncodeLeaf(this, bufferPool),
_ => ThrowUnhandledNodeType(this)
Expand Down

0 comments on commit 0a0a1c9

Please sign in to comment.