diff --git a/ImportBlocks/BlockImporter.cs b/ImportBlocks/BlockImporter.cs new file mode 100644 index 000000000..8d5249c20 --- /dev/null +++ b/ImportBlocks/BlockImporter.cs @@ -0,0 +1,129 @@ +using Akka.Actor; +using Neo.IO; +using Neo.Ledger; +using Neo.Network.P2P.Payloads; +using System; +using System.Collections.Generic; +using System.IO; +using System.IO.Compression; +using System.Linq; +using System.Text.RegularExpressions; + +namespace Neo.Plugins +{ + public class BlockImporter : UntypedActor + { + public class StartImport { public IActorRef BlockchainActorRef; public Action OnComplete; } + + private const int BlocksPerBatch = 10; + private IActorRef _blockchainActorRef; + private bool isImporting; + private IEnumerator blocksBeingImported; + private Action _doneAction; + + private static bool CheckMaxOnImportHeight(uint currentImportBlockHeight) + { + if (Settings.Default.MaxOnImportHeight == 0 || Settings.Default.MaxOnImportHeight >= currentImportBlockHeight) + return true; + return false; + } + + private static IEnumerable GetBlocks(Stream stream, bool read_start = false) + { + using (BinaryReader r = new BinaryReader(stream)) + { + uint start = read_start ? r.ReadUInt32() : 0; + uint count = r.ReadUInt32(); + uint end = start + count - 1; + if (end <= Blockchain.Singleton.Height) yield break; + for (uint height = start; height <= end; height++) + { + byte[] array = r.ReadBytes(r.ReadInt32()); + if (!CheckMaxOnImportHeight(height)) yield break; + if (height > Blockchain.Singleton.Height) + { + Block block = array.AsSerializable(); + yield return block; + } + } + } + } + + private IEnumerable GetBlocksFromFile() + { + const string pathAcc = "chain.acc"; + if (File.Exists(pathAcc)) + using (FileStream fs = new FileStream(pathAcc, FileMode.Open, FileAccess.Read, FileShare.Read)) + foreach (var block in GetBlocks(fs)) + yield return block; + + const string pathAccZip = pathAcc + ".zip"; + if (File.Exists(pathAccZip)) + using (FileStream fs = new FileStream(pathAccZip, FileMode.Open, FileAccess.Read, FileShare.Read)) + using (ZipArchive zip = new ZipArchive(fs, ZipArchiveMode.Read)) + using (Stream zs = zip.GetEntry(pathAcc).Open()) + foreach (var block in GetBlocks(zs)) + yield return block; + + var paths = Directory.EnumerateFiles(".", "chain.*.acc", SearchOption.TopDirectoryOnly).Concat(Directory.EnumerateFiles(".", "chain.*.acc.zip", SearchOption.TopDirectoryOnly)).Select(p => new + { + FileName = Path.GetFileName(p), + Start = uint.Parse(Regex.Match(p, @"\d+").Value), + IsCompressed = p.EndsWith(".zip") + }).OrderBy(p => p.Start); + + foreach (var path in paths) + { + if (path.Start > Blockchain.Singleton.Height + 1) break; + if (path.IsCompressed) + using (FileStream fs = new FileStream(path.FileName, FileMode.Open, FileAccess.Read, FileShare.Read)) + using (ZipArchive zip = new ZipArchive(fs, ZipArchiveMode.Read)) + using (Stream zs = zip.GetEntry(Path.GetFileNameWithoutExtension(path.FileName)).Open()) + foreach (var block in GetBlocks(zs, true)) + yield return block; + else + using (FileStream fs = new FileStream(path.FileName, FileMode.Open, FileAccess.Read, FileShare.Read)) + foreach (var block in GetBlocks(fs, true)) + yield return block; + } + } + + protected override void OnReceive(object message) + { + switch (message) + { + case StartImport startImport: + if (isImporting) return; + isImporting = true; + _blockchainActorRef = startImport.BlockchainActorRef; + _doneAction = startImport.OnComplete; + blocksBeingImported = GetBlocksFromFile().GetEnumerator(); + // Start the first import + Self.Tell(new Blockchain.ImportCompleted()); + break; + case Blockchain.ImportCompleted _: + // Import the next batch + List blocksToImport = new List(); + for (int i = 0; i < BlocksPerBatch; i++) + { + if (!blocksBeingImported.MoveNext()) + break; + blocksToImport.Add(blocksBeingImported.Current); + } + if (blocksToImport.Count > 0) + _blockchainActorRef.Tell(new Blockchain.Import { Blocks = blocksToImport }); + else + { + blocksBeingImported.Dispose(); + _doneAction(); + } + break; + } + } + + public static Props Props() + { + return Akka.Actor.Props.Create(() => new BlockImporter()); + } + } +} diff --git a/ImportBlocks/ImportBlocks.cs b/ImportBlocks/ImportBlocks.cs index 8b5f00aa8..8c7d37cd6 100644 --- a/ImportBlocks/ImportBlocks.cs +++ b/ImportBlocks/ImportBlocks.cs @@ -4,49 +4,19 @@ using Neo.Network.P2P.Payloads; using Neo.Persistence; using System; -using System.Collections.Generic; using System.IO; -using System.IO.Compression; -using System.Linq; -using System.Text.RegularExpressions; namespace Neo.Plugins { public class ImportBlocks : Plugin { - private static bool CheckMaxOnImportHeight(uint currentImportBlockHeight) - { - if (Settings.Default.MaxOnImportHeight == 0 || Settings.Default.MaxOnImportHeight >= currentImportBlockHeight) - return true; - return false; - } + private IActorRef _blockImporter; public override void Configure() { Settings.Load(GetConfiguration()); } - private static IEnumerable GetBlocks(Stream stream, bool read_start = false) - { - using (BinaryReader r = new BinaryReader(stream)) - { - uint start = read_start ? r.ReadUInt32() : 0; - uint count = r.ReadUInt32(); - uint end = start + count - 1; - if (end <= Blockchain.Singleton.Height) yield break; - for (uint height = start; height <= end; height++) - { - byte[] array = r.ReadBytes(r.ReadInt32()); - if (!CheckMaxOnImportHeight(height)) yield break; - if (height > Blockchain.Singleton.Height) - { - Block block = array.AsSerializable(); - yield return block; - } - } - } - } - private bool OnExport(string[] args) { if (args.Length < 2) return false; @@ -133,50 +103,17 @@ private bool OnHelp(string[] args) return true; } - protected override async void OnPluginsLoaded() + private void OnImportComplete() { - SuspendNodeStartup(); - const string path_acc = "chain.acc"; - if (File.Exists(path_acc)) - using (FileStream fs = new FileStream(path_acc, FileMode.Open, FileAccess.Read, FileShare.Read)) - await System.Blockchain.Ask(new Blockchain.Import - { - Blocks = GetBlocks(fs) - }); - const string path_acc_zip = path_acc + ".zip"; - if (File.Exists(path_acc_zip)) - using (FileStream fs = new FileStream(path_acc_zip, FileMode.Open, FileAccess.Read, FileShare.Read)) - using (ZipArchive zip = new ZipArchive(fs, ZipArchiveMode.Read)) - using (Stream zs = zip.GetEntry(path_acc).Open()) - await System.Blockchain.Ask(new Blockchain.Import - { - Blocks = GetBlocks(zs) - }); - var paths = Directory.EnumerateFiles(".", "chain.*.acc", SearchOption.TopDirectoryOnly).Concat(Directory.EnumerateFiles(".", "chain.*.acc.zip", SearchOption.TopDirectoryOnly)).Select(p => new - { - FileName = Path.GetFileName(p), - Start = uint.Parse(Regex.Match(p, @"\d+").Value), - IsCompressed = p.EndsWith(".zip") - }).OrderBy(p => p.Start); - foreach (var path in paths) - { - if (path.Start > Blockchain.Singleton.Height + 1) break; - if (path.IsCompressed) - using (FileStream fs = new FileStream(path.FileName, FileMode.Open, FileAccess.Read, FileShare.Read)) - using (ZipArchive zip = new ZipArchive(fs, ZipArchiveMode.Read)) - using (Stream zs = zip.GetEntry(Path.GetFileNameWithoutExtension(path.FileName)).Open()) - await System.Blockchain.Ask(new Blockchain.Import - { - Blocks = GetBlocks(zs, true) - }); - else - using (FileStream fs = new FileStream(path.FileName, FileMode.Open, FileAccess.Read, FileShare.Read)) - await System.Blockchain.Ask(new Blockchain.Import - { - Blocks = GetBlocks(fs, true) - }); - } ResumeNodeStartup(); + System.ActorSystem.Stop(_blockImporter); + } + + protected override void OnPluginsLoaded() + { + SuspendNodeStartup(); + _blockImporter = System.ActorSystem.ActorOf(BlockImporter.Props()); + _blockImporter.Tell(new BlockImporter.StartImport { BlockchainActorRef = System.Blockchain, OnComplete = OnImportComplete }); } protected override bool OnMessage(object message)