Reduce lock contention #6417

Merged: 24 commits, Dec 28, 2023

Commits
583f966  Reduce TxPool lock contention (benaadams, Dec 22, 2023)
659cc29  Merge branch 'master' into reduce-txpool-lock-contention (benaadams, Dec 22, 2023)
638769f  Reduce lock contention with added spice (benaadams, Dec 24, 2023)
e766902  Use McsLock for LruCache (benaadams, Dec 24, 2023)
5554588  Whitespace (benaadams, Dec 24, 2023)
3d7d1a7  Calculate hashes in parallel (benaadams, Dec 24, 2023)
4451666  Name clash with extension and invalid round trip check (benaadams, Dec 24, 2023)
8291ab5  Merge branch 'master' into reduce-txpool-lock-spice (benaadams, Dec 24, 2023)
03dcb36  Merge branch 'master' into reduce-txpool-lock-spice (benaadams, Dec 24, 2023)
aab847c  Merge branch 'master' into reduce-txpool-lock-spice (benaadams, Dec 26, 2023)
83f8825  Merge branch 'master' into reduce-txpool-lock-spice (benaadams, Dec 26, 2023)
d4b15fb  Add tests (benaadams, Dec 27, 2023)
7bf7c09  Merge branch 'master' into reduce-txpool-lock-spice (benaadams, Dec 27, 2023)
1704393  Fix (benaadams, Dec 27, 2023)
f43ac5e  Use monitor signalling to wake up sleeping threads (benaadams, Dec 27, 2023)
8f4a7de  Additional comments (benaadams, Dec 27, 2023)
35ab582  Merge branch 'master' into reduce-txpool-lock-spice (benaadams, Dec 27, 2023)
8bed013  throttle db read/writes (benaadams, Dec 27, 2023)
dcaaab2  Less contention on the priority lock (benaadams, Dec 28, 2023)
bfb52fe  Whitespace (benaadams, Dec 28, 2023)
81a42da  Merge branch 'master' into throttle-db (benaadams, Dec 28, 2023)
af39811  Merge branch 'reduce-txpool-lock-spice' into throttle-db (benaadams, Dec 28, 2023)
407e68b  Boost forkchoice (benaadams, Dec 28, 2023)
f2782fd  Merge branch 'master' into reduce-txpool-lock-spice (benaadams, Dec 28, 2023)
54 changes: 44 additions & 10 deletions src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs
@@ -45,21 +45,55 @@ public void RecoverData(Block block)
                 // so we assume the rest of txs in the block are already recovered
                 return;

-            var releaseSpec = _specProvider.GetSpec(block.Header);
-
             Parallel.ForEach(
-                block.Transactions.Where(tx => tx.IsSigned && tx.SenderAddress is null),
+                block.Transactions.Where(tx => !tx.IsHashCalculated),
                 blockTransaction =>
                 {
-                    _txPool.TryGetPendingTransaction(blockTransaction.Hash, out Transaction? transaction);
+                    blockTransaction.CalculateHashInternal();
+                });

-                    Address sender = transaction?.SenderAddress;
-                    Address blockTransactionAddress = blockTransaction.SenderAddress;
+            var releaseSpec = _specProvider.GetSpec(block.Header);

-                    blockTransaction.SenderAddress =
-                        sender ?? _ecdsa.RecoverAddress(blockTransaction, !releaseSpec.ValidateChainId);
-                    if (_logger.IsTrace) _logger.Trace($"Recovered {blockTransaction.SenderAddress} sender for {blockTransaction.Hash} (tx pool cached value: {sender}, block transaction address: {blockTransactionAddress})");
-                });
+            int recoverFromEcdsa = 0;
+            // Don't access txPool in Parallel loop as increases contention
+            foreach (Transaction blockTransaction in block.Transactions.Where(tx => tx.IsSigned && tx.SenderAddress is null))
+            {
+                _txPool.TryGetPendingTransaction(blockTransaction.Hash, out Transaction? transaction);
+
+                Address sender = transaction?.SenderAddress;
+                if (sender != null)
+                {
+                    blockTransaction.SenderAddress = sender;
+
+                    if (_logger.IsTrace) _logger.Trace($"Recovered {blockTransaction.SenderAddress} sender for {blockTransaction.Hash} (tx pool cached value: {sender})");
+                }
+                else
+                {
+                    recoverFromEcdsa++;
+                }
+            }
+
+            if (recoverFromEcdsa >= 4)
+            {
+                // Recover ecdsa in Parallel
+                Parallel.ForEach(
+                    block.Transactions.Where(tx => tx.IsSigned && tx.SenderAddress is null),
+                    blockTransaction =>
+                    {
+                        blockTransaction.SenderAddress = _ecdsa.RecoverAddress(blockTransaction, !releaseSpec.ValidateChainId);
+
+                        if (_logger.IsTrace) _logger.Trace($"Recovered {blockTransaction.SenderAddress} sender for {blockTransaction.Hash}");
+                    });
+            }
+            else if (recoverFromEcdsa > 0)
+            {
+                foreach (Transaction blockTransaction in block.Transactions.Where(tx => tx.IsSigned && tx.SenderAddress is null))
+                {
+                    blockTransaction.SenderAddress = _ecdsa.RecoverAddress(blockTransaction, !releaseSpec.ValidateChainId);
+
+                    if (_logger.IsTrace) _logger.Trace($"Recovered {blockTransaction.SenderAddress} sender for {blockTransaction.Hash}");
+                }
+            }
         }
     }
 }
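
The comment in the new code ("Don't access txPool in Parallel loop as increases contention") is the core idea of this change: do the lock-protected pool lookups serially, and parallelise only the lock-free ECDSA recovery, and only when there are enough transactions (here, 4 or more) to pay for the Parallel.ForEach overhead. The stand-alone micro-benchmark below is not part of the PR; it is a generic sketch, using only BCL types, of why taking a shared lock inside a parallel loop scales poorly compared with a lock-free parallel phase followed by a serial combine step.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Generic illustration (not Nethermind code): contrast a shared lock taken inside a
// parallel loop with a lock-free parallel phase followed by a serial combine step.
class ContentionSketch
{
    private static readonly object Gate = new();

    static void Main()
    {
        const int n = 1_000_000;

        double lockedSum = 0;
        var sw = Stopwatch.StartNew();
        Parallel.For(0, n, i =>
        {
            lock (Gate) // every iteration contends on the same lock
            {
                lockedSum += Math.Sqrt(i);
            }
        });
        Console.WriteLine($"lock inside parallel loop: {sw.ElapsedMilliseconds} ms (sum={lockedSum:F0})");

        sw.Restart();
        double[] partial = new double[n];
        Parallel.For(0, n, i => partial[i] = Math.Sqrt(i)); // lock-free parallel phase
        double serialSum = 0;
        for (int i = 0; i < n; i++) serialSum += partial[i]; // serial combine, no contention
        Console.WriteLine($"parallel work + serial combine: {sw.ElapsedMilliseconds} ms (sum={serialSum:F0})");
    }
}
```

On a multicore machine the first variant is typically several times slower, which mirrors why the sender lookup was hoisted out of the Parallel.ForEach above.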
114 changes: 114 additions & 0 deletions src/Nethermind/Nethermind.Core.Test/MCSLockTests.cs
@@ -0,0 +1,114 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using Nethermind.Core.Threading;
using NUnit.Framework;

using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace Nethermind.Core.Test;

[TestFixture]
public class MCSLockTests
{
    private McsLock mcsLock;

    [SetUp]
    public void Setup()
    {
        mcsLock = new McsLock();
    }

    [Test]
    public void SingleThreadAcquireRelease()
    {
        using (var handle = mcsLock.Acquire())
        {
            Thread.Sleep(10);
        }

        Assert.Pass(); // Test passes if no deadlock or exception occurs.
    }

    [Test]
    public void MultipleThreads()
    {
        int counter = 0;
        int numberOfThreads = 10;
        var threads = new List<Thread>();

        for (int i = 0; i < numberOfThreads; i++)
        {
            var thread = new Thread(() =>
            {
                using var handle = mcsLock.Acquire();

                counter++;
            });
            threads.Add(thread);
            thread.Start();
        }

        foreach (Thread thread in threads)
        {
            thread.Join(); // Wait for all threads to complete.
        }

        Assert.That(counter, Is.EqualTo(numberOfThreads)); // Counter should equal the number of threads.
    }

    [Test]
    public void LockFairnessTest()
    {
        int numberOfThreads = 10;
        var executionOrder = new List<int>();
        var threads = new List<Thread>();

        for (int i = 0; i < numberOfThreads; i++)
        {
            int threadId = i;
            var thread = new Thread(() =>
            {
                using var handle = mcsLock.Acquire();
                executionOrder.Add(threadId);
                Thread.Sleep(15); // Ensure the order is maintained
            });
            threads.Add(thread);
            thread.Start();
            Thread.Sleep(1); // Ensure the order is maintained
        }

        foreach (Thread thread in threads)
        {
            thread.Join();
        }

        var expectedOrder = Enumerable.Range(0, numberOfThreads).ToList();
        CollectionAssert.AreEqual(expectedOrder, executionOrder, "Threads did not acquire lock in the order they were started.");
    }

    [Test]
    public void NonReentrantTest()
    {
        bool reentrancyDetected = false;
        var thread = new Thread(() =>
        {
            using var handle = mcsLock.Acquire();
            try
            {
                using var innerHandle = mcsLock.Acquire(); // Attempt to re-lock
            }
            catch
            {
                reentrancyDetected = true;
            }
        });

        thread.Start();
        thread.Join();

        Assert.IsTrue(reentrancyDetected, "Reentrancy was not properly detected.");
    }
}
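
The McsLock type under test lives in Nethermind.Core.Threading and its implementation is not part of this diff view. For orientation, below is a minimal, spin-only sketch of the classic MCS queue lock (Mellor-Crummey and Scott) whose properties these tests probe: FIFO handoff (LockFairnessTest) and non-reentrancy (NonReentrantTest). The sketch is an illustration under assumptions, not Nethermind's implementation; names such as McsLockSketch and Node are hypothetical, and per the tests and commit f43ac5e the real McsLock returns a disposable handle from Acquire() and uses Monitor signalling to park waiters rather than pure spinning.

```csharp
using System.Threading;

// Minimal spin-only sketch of the MCS queue-lock idea; illustrative names, not Nethermind's API.
public sealed class McsLockSketch
{
    private sealed class Node
    {
        public volatile Node? Next;
        public volatile bool Locked;
    }

    // One queue node per thread. A nested Acquire on the same thread would reuse this node
    // and corrupt the queue, hence the non-reentrancy contract checked by the tests above.
    private readonly ThreadLocal<Node> _node = new(() => new Node());
    private Node? _tail;

    public void Acquire()
    {
        Node node = _node.Value!;
        node.Next = null;
        node.Locked = true;

        Node? predecessor = Interlocked.Exchange(ref _tail, node); // join the tail of the queue
        if (predecessor is not null)
        {
            predecessor.Next = node;  // link behind the current tail
            SpinWait spin = default;
            while (node.Locked)       // each waiter spins only on its own node
            {
                spin.SpinOnce();
            }
        }
    }

    public void Release()
    {
        Node node = _node.Value!;
        if (node.Next is null)
        {
            // No visible successor: try to mark the lock free.
            if (Interlocked.CompareExchange(ref _tail, null, node) == node)
            {
                return;
            }

            // A successor is mid-enqueue; wait for it to link itself in.
            SpinWait spin = default;
            while (node.Next is null)
            {
                spin.SpinOnce();
            }
        }

        node.Next!.Locked = false;    // hand the lock to the next waiter (FIFO)
    }
}
```

Because each waiter waits on its own node, handoff is strictly first-come-first-served, which is exactly the ordering LockFairnessTest asserts; the `using var handle = mcsLock.Acquire();` pattern in the tests maps onto Acquire/Release here via a disposable wrapper.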
171 changes: 171 additions & 0 deletions src/Nethermind/Nethermind.Core.Test/McsPriorityLock.cs
@@ -0,0 +1,171 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using Nethermind.Core.Threading;
using NUnit.Framework;
using NUnit.Framework.Internal;

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace Nethermind.Core.Test;

[TestFixture]
public class McsPriorityLockTests
{
    private McsPriorityLock mcsLock;

    [SetUp]
    public void Setup()
    {
        mcsLock = new McsPriorityLock();
    }

    [Test]
    public void SingleThreadAcquireRelease()
    {
        using (var handle = mcsLock.Acquire())
        {
            Thread.Sleep(10);
        }

        Assert.Pass(); // Test passes if no deadlock or exception occurs.
    }

    [Test]
    public void MultipleThreads()
    {
        int counter = 0;
        int numberOfThreads = 10;
        var threads = new List<Thread>();

        for (int i = 0; i < numberOfThreads; i++)
        {
            var thread = new Thread(() =>
            {
                using var handle = mcsLock.Acquire();

                counter++;
            });
            threads.Add(thread);
            thread.Start();
        }

        foreach (Thread thread in threads)
        {
            thread.Join(); // Wait for all threads to complete.
        }

        Assert.That(counter, Is.EqualTo(numberOfThreads)); // Counter should equal the number of threads.
    }

    [Test]
    public void LockFairnessTest()
    {
        int numberOfThreads = 10;
        var executionOrder = new List<int>();
        var threads = new List<Thread>();

        for (int i = 0; i < numberOfThreads; i++)
        {
            int threadId = i;
            var thread = new Thread(() =>
            {
                using var handle = mcsLock.Acquire();
                executionOrder.Add(threadId);
                Thread.Sleep(15); // Ensure the order is maintained
            });
            threads.Add(thread);
            thread.Start();
            Thread.Sleep(1); // Ensure the order is maintained
        }

        foreach (Thread thread in threads)
        {
            thread.Join();
        }

        var expectedOrder = Enumerable.Range(0, numberOfThreads).ToList();
        CollectionAssert.AreEqual(expectedOrder, executionOrder, "Threads did not acquire lock in the order they were started.");
    }

    [Test]
    public void PriorityQueueJumpingTest()
    {
        int numberOfThreads = 100;
        var threads = new List<Thread>();
        List<int> executionOrder = new();
        Dictionary<Thread, ThreadPriority> threadPriorities = new();

        // Create threads with varying priorities.
        for (int i = 0; i < numberOfThreads; i++)
        {
            ThreadPriority priority = i % 2 == 0 ? ThreadPriority.Highest : ThreadPriority.Normal; // Alternate priorities
            var thread = new Thread(() =>
            {
                using var handle = mcsLock.Acquire();
                executionOrder.Add(Thread.CurrentThread.ManagedThreadId);
                Thread.Sleep(25); // Simulate work
            });
            thread.Priority = priority; // Set thread priority
            threads.Add(thread);
            threadPriorities[thread] = priority;
        }

        // Start threads.
        foreach (var thread in threads)
        {
            thread.Start();
        }

        // Wait for all threads to complete.
        foreach (var thread in threads)
        {
            thread.Join();
        }

        // Analyze execution order based on priority.
        int lowPriorityFirst = 0;
        for (int i = 0; i < executionOrder.Count - 1; i++)
        {
            int currentThreadId = executionOrder[i];
            int nextThreadId = executionOrder[i + 1];
            Thread currentThread = threads.First(t => t.ManagedThreadId == currentThreadId);
            Thread nextThread = threads.First(t => t.ManagedThreadId == nextThreadId);

            if (threadPriorities[currentThread] < threadPriorities[nextThread])
            {
                lowPriorityFirst++;
            }
        }

        // Some lower priority threads will acquire first; we are asserting that they mostly queue jump
        Assert.That(lowPriorityFirst < (numberOfThreads / 8), Is.True, "High priority threads did not acquire the lock before lower priority ones.");
    }

    [Test]
    public void NonReentrantTest()
    {
        bool reentrancyDetected = false;
        var thread = new Thread(() =>
        {
            using var handle = mcsLock.Acquire();
            try
            {
                using var innerHandle = mcsLock.Acquire(); // Attempt to re-lock
            }
            catch
            {
                reentrancyDetected = true;
            }
        });

        thread.Start();
        thread.Join();

        Assert.IsTrue(reentrancyDetected, "Reentrancy was not properly detected.");
    }
}
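
For context on how these locks are consumed elsewhere in the PR (for example commit e766902, "Use McsLock for LruCache"), here is a hedged usage sketch. CappedCache is a hypothetical stand-in, not Nethermind's LruCache; only the pattern of Acquire() returning a disposable handle is taken from the tests above.

```csharp
using System.Collections.Generic;
using Nethermind.Core.Threading;

// Illustration only: a simplistic capacity-bounded cache guarded by McsLock.
// Eviction here is FIFO for brevity, not true LRU.
public sealed class CappedCache<TKey, TValue> where TKey : notnull
{
    private readonly McsLock _lock = new();
    private readonly Dictionary<TKey, TValue> _map = new();
    private readonly Queue<TKey> _order = new();
    private readonly int _capacity;

    public CappedCache(int capacity) => _capacity = capacity;

    public void Set(TKey key, TValue value)
    {
        // Acquire() returns a handle; disposing it releases the lock (see the tests above).
        using var handle = _lock.Acquire();
        if (!_map.ContainsKey(key))
        {
            _order.Enqueue(key);
            if (_order.Count > _capacity)
            {
                _map.Remove(_order.Dequeue()); // evict the oldest entry
            }
        }
        _map[key] = value;
    }

    public bool TryGet(TKey key, out TValue? value)
    {
        using var handle = _lock.Acquire();
        return _map.TryGetValue(key, out value);
    }
}
```

The point of the pattern is that the critical section is scoped by `using`, so the lock is released on every exit path, including exceptions.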