Skip to content

Commit

Permalink
Feature/verify trie on state finish (#7691)
Browse files Browse the repository at this point in the history
Co-authored-by: Lukasz Rozmej <[email protected]>
Co-authored-by: Szymon Kulec <[email protected]>
Co-authored-by: Kamil Chodoła <[email protected]>
  • Loading branch information
4 people authored Oct 31, 2024
1 parent 159c2a4 commit 4eb7f09
Show file tree
Hide file tree
Showing 14 changed files with 284 additions and 63 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/sync-supported-chains.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ jobs:
--network $stripped_network \
--consensus-url $CONSENSUS_URL \
--execution-api-url $EXECUTION_URL \
--el-op-extra-flag Sync.VerifyTrieOnStateSyncFinished=true \
$extra_param
else
./build/sedge generate \
Expand All @@ -176,6 +177,7 @@ jobs:
--el-extra-flag Sync.DownloadBodiesInFastSync=false \
--el-extra-flag Sync.DownloadReceiptsInFastSync=false \
--el-extra-flag JsonRpc.EnabledModules=[Eth,Subscribe,Trace,TxPool,Web3,Personal,Proof,Net,Parity,Health,Rpc,Debug] \
--el-extra-flag Sync.VerifyTrieOnStateSyncFinished=true \
--el-extra-flag Sync.SnapSync=true \
--checkpoint-sync-url=${{ matrix.config.checkpoint-sync-url }}
fi
Expand All @@ -190,12 +192,14 @@ jobs:
declare -A good_logs
declare -A required_count
bad_logs["Corrupt"]=1
bad_logs["Exception"]=1
#bad_logs["Corrupt"]=1
#bad_logs["Exception"]=1
bad_logs["Error in verify trie"]=1
good_logs["Processed"]=0
required_count["Processed"]=20
required_count["Stats after finishing state"]=1
network="${{ matrix.config.network }}"
if [[ "$network" != "joc-mainnet" && "$network" != "joc-testnet" && "$network" != "linea-mainnet" && "$network" != "linea-sepolia" ]]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,7 @@ public interface ISyncConfig : IConfig

[ConfigItem(Description = "_Technical._ MultiSyncModeSelector will wait for header to completely sync first.", DefaultValue = "false", HiddenFromDocs = true)]
bool NeedToWaitForHeader { get; set; }

[ConfigItem(Description = "_Technical._ Run verify trie on state sync is finished.", DefaultValue = "false", HiddenFromDocs = true)]
bool VerifyTrieOnStateSyncFinished { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public string? PivotHash
public bool? SnapServingEnabled { get; set; } = null;
public int MultiSyncModeSelectorLoopTimerMs { get; set; } = 1000;
public bool NeedToWaitForHeader { get; set; }
public bool VerifyTrieOnStateSyncFinished { get; set; }
public bool TrieHealing { get; set; } = true;

public override string ToString()
Expand Down
8 changes: 8 additions & 0 deletions src/Nethermind/Nethermind.Core/ContainerBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Linq;
using System.Reflection;
using Autofac;
using Autofac.Core;
using Autofac.Features.AttributeFilters;

namespace Nethermind.Core;
Expand Down Expand Up @@ -115,6 +116,13 @@ public static ContainerBuilder RegisterNamedComponentInItsOwnLifetime<T>(this Co

return builder;
}

/// <summary>
/// Registers <paramref name="module"/> with <paramref name="builder"/> and hands the
/// same builder back, so the call can be used inside a fluent registration chain.
/// </summary>
/// <param name="builder">The container builder to register the module with.</param>
/// <param name="module">The module to register.</param>
/// <returns>The <paramref name="builder"/> instance that was passed in.</returns>
public static ContainerBuilder AddModule(this ContainerBuilder builder, IModule module)
{
    // The registrar returned by RegisterModule is intentionally discarded; callers chain on the builder.
    _ = builder.RegisterModule(module);
    return builder;
}
}

/// <summary>
Expand Down
18 changes: 4 additions & 14 deletions src/Nethermind/Nethermind.Init/InitializeStateDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,20 +198,10 @@ public Task Execute(CancellationToken cancellationToken)

if (_api.Config<IInitConfig>().DiagnosticMode == DiagnosticMode.VerifyTrie)
{
Task.Run(() =>
{
try
{
_logger!.Info("Collecting trie stats and verifying that no nodes are missing...");
Hash256 stateRoot = getApi.BlockTree!.Head?.StateRoot ?? Keccak.EmptyTreeHash;
TrieStats stats = stateManager.GlobalStateReader.CollectStats(stateRoot, getApi.DbProvider.CodeDb, _api.LogManager);
_logger.Info($"Starting from {getApi.BlockTree.Head?.Number} {getApi.BlockTree.Head?.StateRoot}{Environment.NewLine}" + stats);
}
catch (Exception ex)
{
_logger!.Error(ex.ToString());
}
});
_logger!.Info("Collecting trie stats and verifying that no nodes are missing...");
Hash256 stateRoot = getApi.BlockTree!.Head?.StateRoot ?? Keccak.EmptyTreeHash;
TrieStats stats = stateManager.GlobalStateReader.CollectStats(stateRoot, getApi.DbProvider.CodeDb, _api.LogManager, _api.ProcessExit!.Token);
_logger.Info($"Starting from {getApi.BlockTree.Head?.Number} {getApi.BlockTree.Head?.StateRoot}{Environment.NewLine}" + stats);
}

// Init state if we need system calls before actual processing starts
Expand Down
5 changes: 3 additions & 2 deletions src/Nethermind/Nethermind.State/StateReaderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
Expand Down Expand Up @@ -47,9 +48,9 @@ public static bool HasStateForBlock(this IStateReader stateReader, BlockHeader h
return stateReader.HasStateForRoot(header.StateRoot!);
}

public static TrieStats CollectStats(this IStateReader stateProvider, Hash256 root, IKeyValueStore codeStorage, ILogManager logManager)
public static TrieStats CollectStats(this IStateReader stateProvider, Hash256 root, IKeyValueStore codeStorage, ILogManager logManager, CancellationToken cancellationToken = default)
{
TrieStatsCollector collector = new(codeStorage, logManager);
TrieStatsCollector collector = new(codeStorage, logManager, cancellationToken);
stateProvider.RunTreeVisitor(collector, root, new VisitingOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using FluentAssertions;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Config;
using Nethermind.Consensus.Processing;
using Nethermind.Core;
using Nethermind.Core.Test.Builders;
using Nethermind.Db;
using Nethermind.Logging;
using Nethermind.State;
using Nethermind.Synchronization.FastSync;
using Nethermind.Trie;
using NSubstitute;
using NUnit.Framework;

namespace Nethermind.Synchronization.Test;

/// <summary>
/// Tests for the trie verification that <see cref="SynchronizerModule"/> registers when
/// <c>FastSync</c> and <c>VerifyTrieOnStateSyncFinished</c> are both enabled.
/// </summary>
public class SynchronizerModuleTests
{
    /// <summary>
    /// Builds a container holding the <see cref="SynchronizerModule"/> (fast sync and
    /// verify-trie-on-finish enabled) with substitutes for every collaborator.
    /// </summary>
    /// <returns>A built container. It is disposable; callers are expected to dispose it.</returns>
    public IContainer CreateTestContainer()
    {
        ITreeSync treeSync = Substitute.For<ITreeSync>();
        IStateReader stateReader = Substitute.For<IStateReader>();
        IBlockProcessingQueue blockQueue = Substitute.For<IBlockProcessingQueue>();

        return new ContainerBuilder()
            .AddModule(new SynchronizerModule(new SyncConfig()
            {
                FastSync = true,
                VerifyTrieOnStateSyncFinished = true
            }))
            .AddKeyedSingleton(DbNames.Code, Substitute.For<IDb>())
            .AddSingleton(stateReader)
            .AddSingleton(treeSync)
            .AddSingleton(blockQueue)
            .AddSingleton(Substitute.For<IProcessExitSource>())
            .AddSingleton<ILogManager>(LimboLogs.Instance)
            .Build();
    }

    /// <summary>
    /// Raising <c>SyncCompleted</c> must run a tree visitor over the root carried by the event.
    /// </summary>
    [Test]
    public void TestOnTreeSyncFinish_CallVisit()
    {
        // Dispose the container so components it created do not outlive the test.
        using IContainer ctx = CreateTestContainer();
        ITreeSync treeSync = ctx.Resolve<ITreeSync>();
        IStateReader stateReader = ctx.Resolve<IStateReader>();

        treeSync.SyncCompleted += Raise.EventWith(null, new ITreeSync.SyncCompletedEventArgs(TestItem.KeccakA));

        stateReader
            .Received()
            .RunTreeVisitor(Arg.Any<ITreeVisitor>(), Arg.Is(TestItem.KeccakA), Arg.Any<VisitingOptions>());
    }

    /// <summary>
    /// While the tree visitor is still running, a <c>BlockRemoved</c> notification must not
    /// complete; once the visitor finishes, it must complete and later notifications must not block.
    /// </summary>
    [Test]
    public async Task TestOnTreeSyncFinish_BlockProcessingQueue_UntilFinished()
    {
        using IContainer ctx = CreateTestContainer();
        ITreeSync treeSync = ctx.Resolve<ITreeSync>();
        IStateReader stateReader = ctx.Resolve<IStateReader>();
        IBlockProcessingQueue blockQueue = ctx.Resolve<IBlockProcessingQueue>();

        // Holds the (substituted) tree visitor open until the test releases it.
        using ManualResetEvent treeVisitorBlocker = new(false);

        stateReader
            .When(sr => sr.RunTreeVisitor(Arg.Any<ITreeVisitor>(), Arg.Is(TestItem.KeccakA), Arg.Any<VisitingOptions>()))
            .Do(_ => treeVisitorBlocker.WaitOne());

        Task triggerTask = Task.Run(() =>
        {
            treeSync.SyncCompleted += Raise.EventWith(null, new ITreeSync.SyncCompletedEventArgs(TestItem.KeccakA));
        });

        Task blockQueueTask;
        try
        {
            // Give the sync-completed handler time to reach the blocked visitor.
            await Task.Delay(100);

            blockQueueTask = Task.Run(() =>
            {
                blockQueue.BlockRemoved +=
                    Raise.EventWith(null, new BlockRemovedEventArgs(null!, ProcessingResult.Success));
            });

            await Task.Delay(100);

            blockQueueTask.IsCompleted.Should().BeFalse();
        }
        finally
        {
            // Always release the visitor, even when the assertion above fails; otherwise the
            // two background tasks would stay parked on thread-pool threads indefinitely.
            treeVisitorBlocker.Set();
        }

        await triggerTask;
        await blockQueueTask;

        // After verification has finished, raising the event again must return immediately.
        blockQueue.BlockRemoved += Raise.EventWith(null, new BlockRemovedEventArgs(null!, ProcessingResult.Success));
    }
}
17 changes: 17 additions & 0 deletions src/Nethermind/Nethermind.Synchronization/FastSync/ITreeSync.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using Nethermind.Core.Crypto;

namespace Nethermind.Synchronization.FastSync;

/// <summary>
/// Exposes a completion notification for a tree sync, so that other components can
/// react to it without depending on a concrete sync implementation.
/// </summary>
public interface ITreeSync
{
    /// <summary>
    /// Event that subscribers attach to in order to be told a sync has completed.
    /// The event arguments carry the root hash supplied by the publisher.
    /// </summary>
    event EventHandler<SyncCompletedEventArgs> SyncCompleted;

    /// <summary>
    /// Arguments for <see cref="SyncCompleted"/>.
    /// </summary>
    /// <param name="root">The root hash to expose through <see cref="Root"/>.</param>
    public class SyncCompletedEventArgs(Hash256 root) : EventArgs
    {
        /// <summary>The root hash given at construction.</summary>
        public Hash256 Root { get; } = root;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace Nethermind.Synchronization.FastSync
{
public partial class StateSyncFeed : SyncFeed<StateSyncBatch?>, IDisposable
public class StateSyncFeed : SyncFeed<StateSyncBatch?>, IDisposable
{
private const StateSyncBatch EmptyBatch = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

namespace Nethermind.Synchronization.FastSync
{
public class TreeSync
public class TreeSync : ITreeSync
{
public const int AlreadySavedCapacity = 1024 * 1024;
public const int MaxRequestSize = 384;
Expand Down Expand Up @@ -76,6 +76,8 @@ public class TreeSync
private long _blockNumber;
private readonly SyncMode _syncMode;

public event EventHandler<ITreeSync.SyncCompletedEventArgs>? SyncCompleted;

public TreeSync([KeyFilter(DbNames.Code)] IDb codeDb, INodeStorage nodeStorage, IBlockTree blockTree, ILogManager logManager)
: this(SyncMode.StateNodes, codeDb, nodeStorage, blockTree, logManager)
{
Expand Down Expand Up @@ -707,7 +709,6 @@ private void VerifyPostSyncCleanUp()
}

_dependencies = new Dictionary<StateSyncItem.NodeKey, HashSet<DependentItem>>();
// _alreadySaved = new LruKeyCache<Keccak>(AlreadySavedCapacity, "saved nodes");
}

if (_pendingItems.Count != 0)
Expand All @@ -716,6 +717,8 @@ private void VerifyPostSyncCleanUp()
}

CleanupMemory();

SyncCompleted?.Invoke(this, new ITreeSync.SyncCompletedEventArgs(_rootNode));
}

private void CleanupMemory()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using Autofac;
using Autofac.Features.AttributeFilters;
using Nethermind.Config;
using Nethermind.Consensus.Processing;
using Nethermind.Core.Crypto;
using Nethermind.Db;
using Nethermind.Logging;
using Nethermind.State;
using Nethermind.Trie;

namespace Nethermind.Synchronization.FastSync;

/// <summary>
/// Startable component that, once <see cref="ITreeSync.SyncCompleted"/> fires, collects trie
/// statistics for the reported root and logs the outcome. While the collection is running,
/// <c>BlockRemoved</c> notifications from the processing queue are held inside their handler.
/// </summary>
public class VerifyStateOnStateSyncFinished(
    IBlockProcessingQueue processingQueue,
    ITreeSync treeSync,
    IStateReader stateReader,
    [KeyFilter(DbNames.Code)] IDb codeDb,
    IProcessExitSource exitSource,
    ILogManager logManager) : IStartable
{
    private readonly ILogger _logger = logManager.GetClassLogger<VerifyStateOnStateSyncFinished>();

    /// <summary>Subscribes to the tree sync completion event.</summary>
    public void Start() => treeSync.SyncCompleted += OnTreeSyncCompleted;

    /// <summary>
    /// Runs the trie stats collection synchronously on the thread that raised the event.
    /// </summary>
    /// <param name="sender">Event source; not used.</param>
    /// <param name="evt">Carries the root hash whose trie is verified.</param>
    private void OnTreeSyncCompleted(object? sender, ITreeSync.SyncCompletedEventArgs evt)
    {
        // Signalled once verification has ended, whether it succeeded or failed.
        ManualResetEvent verificationDone = new(false);

        // Any BlockRemoved notification raised during verification waits here until the signal.
        void HoldUntilVerified(object? source, BlockRemovedEventArgs args) => verificationDone.WaitOne();

        processingQueue.BlockRemoved += HoldUntilVerified;

        try
        {
            Hash256 verifiedRoot = evt.Root;
            _logger.Info("Collecting trie stats and verifying that no nodes are missing...");
            TrieStats collected = stateReader.CollectStats(verifiedRoot, codeDb, logManager, exitSource.Token);
            if (collected.MissingNodes > 0)
            {
                _logger.Error("Missing node found!");
            }

            _logger.Info("Stats after finishing state \n" + collected);
        }
        catch (Exception failure)
        {
            // Failures are logged rather than rethrown to the event publisher.
            _logger.Error("Error in verify trie", failure);
        }
        finally
        {
            // Release waiting handlers first, then stop intercepting further notifications.
            verificationDone.Set();
            processingQueue.BlockRemoved -= HoldUntilVerified;
        }
    }
}
10 changes: 9 additions & 1 deletion src/Nethermind/Nethermind.Synchronization/Synchronizer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,20 @@ private void ConfigureBodiesSyncComponent(ContainerBuilder serviceCollection)
private void ConfigureStateSyncComponent(ContainerBuilder serviceCollection)
{
serviceCollection
.AddSingleton<TreeSync>();
.AddSingleton<ITreeSync, TreeSync>();

ConfigureSingletonSyncFeed<StateSyncBatch, StateSyncFeed, StateSyncDownloader, StateSyncAllocationStrategyFactory>(serviceCollection);

// Disable it by setting noop
if (!syncConfig.FastSync) serviceCollection.AddSingleton<ISyncFeed<StateSyncBatch>, NoopSyncFeed<StateSyncBatch>>();

if (syncConfig.FastSync && syncConfig.VerifyTrieOnStateSyncFinished)
{
serviceCollection
.RegisterType<VerifyStateOnStateSyncFinished>()
.WithAttributeFiltering()
.As<IStartable>();
}
}

private static void ConfigureSingletonSyncFeed<TBatch, TFeed, TDownloader, TAllocationStrategy>(ContainerBuilder serviceCollection) where TFeed : class, ISyncFeed<TBatch> where TDownloader : class, ISyncDownloader<TBatch> where TAllocationStrategy : class, IPeerAllocationStrategyFactory<TBatch>
Expand Down
Loading

0 comments on commit 4eb7f09

Please sign in to comment.