From da8a9c667468f21420010e187b593feea5c9d9bc Mon Sep 17 00:00:00 2001 From: jsolman Date: Thu, 14 Mar 2019 15:01:54 -0700 Subject: [PATCH 1/9] Properly stop importing blocks on system shutdown. --- ImportBlocks/BlockImporter.cs | 127 +++++++++++++++++++++++++++++++ ImportBlocks/ImportBlocks.cs | 81 +++----------------- ImportBlocks/ImportBlocks.csproj | 2 +- 3 files changed, 137 insertions(+), 73 deletions(-) create mode 100644 ImportBlocks/BlockImporter.cs diff --git a/ImportBlocks/BlockImporter.cs b/ImportBlocks/BlockImporter.cs new file mode 100644 index 000000000..3eaf7b399 --- /dev/null +++ b/ImportBlocks/BlockImporter.cs @@ -0,0 +1,127 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.IO.Compression; +using System.Linq; +using System.Text.RegularExpressions; +using Akka; +using Akka.Actor; +using Neo.IO; +using Neo.Ledger; +using Neo.Network.P2P; +using Neo.Network.P2P.Payloads; + +namespace Neo.Plugins +{ + public class BlockImporter : UntypedActor + { + public class StartImport { public IActorRef BlockchainActorRef; public Action OnComplete; } + private IActorRef _blockchainActorRef; + private const int BlocksPerBatch = 10; + private bool isImporting; + private IEnumerator blocksBeingImported; + private Action _doneAction; + + protected override void OnReceive(object message) + { + switch (message) + { + case StartImport startImport: + if (isImporting) return; + isImporting = true; + _blockchainActorRef = startImport.BlockchainActorRef; + _doneAction = startImport.OnComplete; + blocksBeingImported = GetBlocksFromFile().GetEnumerator(); + // Start the first import + Self.Tell(new Blockchain.ImportCompleted()); + break; + case Blockchain.ImportCompleted _: + // Import the next batch + List blocksToImport = new List(); + for (int i = 0; i < BlocksPerBatch; i++) + { + if (!blocksBeingImported.MoveNext()) + break; + blocksToImport.Add(blocksBeingImported.Current); + } + if (blocksToImport.Count > 0) + _blockchainActorRef.Tell(new Blockchain.Import { Blocks = blocksToImport }); + else + _doneAction(); + break; + } + } + + private static bool CheckMaxOnImportHeight(uint currentImportBlockHeight) + { + if (Settings.Default.MaxOnImportHeight == 0 || Settings.Default.MaxOnImportHeight >= currentImportBlockHeight) + return true; + return false; + } + + private static IEnumerable GetBlocks(Stream stream, bool read_start = false) + { + using (BinaryReader r = new BinaryReader(stream)) + { + uint start = read_start ? r.ReadUInt32() : 0; + uint count = r.ReadUInt32(); + uint end = start + count - 1; + if (end <= Blockchain.Singleton.Height) yield break; + for (uint height = start; height <= end; height++) + { + byte[] array = r.ReadBytes(r.ReadInt32()); + if (!CheckMaxOnImportHeight(height)) yield break; + if (height > Blockchain.Singleton.Height) + { + Block block = array.AsSerializable(); + yield return block; + } + } + } + } + + private IEnumerable GetBlocksFromFile() + { + const string pathAcc = "chain.acc"; + if (File.Exists(pathAcc)) + using (FileStream fs = new FileStream(pathAcc, FileMode.Open, FileAccess.Read, FileShare.Read)) + foreach (var block in GetBlocks(fs)) + yield return block; + + const string pathAccZip = pathAcc + ".zip"; + if (File.Exists(pathAccZip)) + using (FileStream fs = new FileStream(pathAccZip, FileMode.Open, FileAccess.Read, FileShare.Read)) + using (ZipArchive zip = new ZipArchive(fs, ZipArchiveMode.Read)) + using (Stream zs = zip.GetEntry(pathAcc).Open()) + foreach (var block in GetBlocks(zs)) + yield return block; + + var paths = Directory.EnumerateFiles(".", "chain.*.acc", SearchOption.TopDirectoryOnly).Concat(Directory.EnumerateFiles(".", "chain.*.acc.zip", SearchOption.TopDirectoryOnly)).Select(p => new + { + FileName = Path.GetFileName(p), + Start = uint.Parse(Regex.Match(p, @"\d+").Value), + IsCompressed = p.EndsWith(".zip") + }).OrderBy(p => p.Start); + + foreach (var path in paths) + { + if (path.Start > Blockchain.Singleton.Height + 1) break; + if (path.IsCompressed) + using (FileStream fs = new FileStream(path.FileName, FileMode.Open, FileAccess.Read, FileShare.Read)) + using (ZipArchive zip = new ZipArchive(fs, ZipArchiveMode.Read)) + using (Stream zs = zip.GetEntry(Path.GetFileNameWithoutExtension(path.FileName)).Open()) + foreach (var block in GetBlocks(zs, true)) + yield return block; + else + using (FileStream fs = new FileStream(path.FileName, FileMode.Open, FileAccess.Read, FileShare.Read)) + foreach (var block in GetBlocks(fs, true)) + yield return block; + } + } + + public static Props Props() + { + return Akka.Actor.Props.Create(() => new BlockImporter()); + } + } +} \ No newline at end of file diff --git a/ImportBlocks/ImportBlocks.cs b/ImportBlocks/ImportBlocks.cs index 31c38b606..3ac1d75ab 100644 --- a/ImportBlocks/ImportBlocks.cs +++ b/ImportBlocks/ImportBlocks.cs @@ -14,42 +14,25 @@ namespace Neo.Plugins { public class ImportBlocks : Plugin { + private IActorRef _blockImporter; public ImportBlocks() { - OnImport(); + _blockImporter = System.ActorSystem.ActorOf(BlockImporter.Props()); } - private static bool CheckMaxOnImportHeight(uint currentImportBlockHeight) + private void OnImportComplete() { - if (Settings.Default.MaxOnImportHeight == 0 || Settings.Default.MaxOnImportHeight >= currentImportBlockHeight) - return true; - return false; + ResumeNodeStartup(); } - - public override void Configure() + protected override void OnPluginsLoaded() { - Settings.Load(GetConfiguration()); + SuspendNodeStartup(); + _blockImporter.Tell(new BlockImporter.StartImport { BlockchainActorRef = System.Blockchain, OnComplete = OnImportComplete}); } - private static IEnumerable GetBlocks(Stream stream, bool read_start = false) + public override void Configure() { - using (BinaryReader r = new BinaryReader(stream)) - { - uint start = read_start ? r.ReadUInt32() : 0; - uint count = r.ReadUInt32(); - uint end = start + count - 1; - if (end <= Blockchain.Singleton.Height) yield break; - for (uint height = start; height <= end; height++) - { - byte[] array = r.ReadBytes(r.ReadInt32()); - if (!CheckMaxOnImportHeight(height)) yield break; - if (height > Blockchain.Singleton.Height) - { - Block block = array.AsSerializable(); - yield return block; - } - } - } + Settings.Load(GetConfiguration()); } private bool OnExport(string[] args) @@ -138,52 +121,6 @@ private bool OnHelp(string[] args) return true; } - private async void OnImport() - { - SuspendNodeStartup(); - const string path_acc = "chain.acc"; - if (File.Exists(path_acc)) - using (FileStream fs = new FileStream(path_acc, FileMode.Open, FileAccess.Read, FileShare.Read)) - await System.Blockchain.Ask(new Blockchain.Import - { - Blocks = GetBlocks(fs) - }); - const string path_acc_zip = path_acc + ".zip"; - if (File.Exists(path_acc_zip)) - using (FileStream fs = new FileStream(path_acc_zip, FileMode.Open, FileAccess.Read, FileShare.Read)) - using (ZipArchive zip = new ZipArchive(fs, ZipArchiveMode.Read)) - using (Stream zs = zip.GetEntry(path_acc).Open()) - await System.Blockchain.Ask(new Blockchain.Import - { - Blocks = GetBlocks(zs) - }); - var paths = Directory.EnumerateFiles(".", "chain.*.acc", SearchOption.TopDirectoryOnly).Concat(Directory.EnumerateFiles(".", "chain.*.acc.zip", SearchOption.TopDirectoryOnly)).Select(p => new - { - FileName = Path.GetFileName(p), - Start = uint.Parse(Regex.Match(p, @"\d+").Value), - IsCompressed = p.EndsWith(".zip") - }).OrderBy(p => p.Start); - foreach (var path in paths) - { - if (path.Start > Blockchain.Singleton.Height + 1) break; - if (path.IsCompressed) - using (FileStream fs = new FileStream(path.FileName, FileMode.Open, FileAccess.Read, FileShare.Read)) - using (ZipArchive zip = new ZipArchive(fs, ZipArchiveMode.Read)) - using (Stream zs = zip.GetEntry(Path.GetFileNameWithoutExtension(path.FileName)).Open()) - await System.Blockchain.Ask(new Blockchain.Import - { - Blocks = GetBlocks(zs, true) - }); - else - using (FileStream fs = new FileStream(path.FileName, FileMode.Open, FileAccess.Read, FileShare.Read)) - await System.Blockchain.Ask(new Blockchain.Import - { - Blocks = GetBlocks(fs, true) - }); - } - ResumeNodeStartup(); - } - protected override bool OnMessage(object message) { if (!(message is string[] args)) return false; diff --git a/ImportBlocks/ImportBlocks.csproj b/ImportBlocks/ImportBlocks.csproj index 174fe8e08..fb56398a1 100644 --- a/ImportBlocks/ImportBlocks.csproj +++ b/ImportBlocks/ImportBlocks.csproj @@ -14,7 +14,7 @@ - + From 130713ca3358ef4046ee050687beb3525d1c887d Mon Sep 17 00:00:00 2001 From: jsolman Date: Fri, 15 Mar 2019 19:35:58 -0700 Subject: [PATCH 2/9] Minor clean-up. --- ImportBlocks/ImportBlocks.cs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ImportBlocks/ImportBlocks.cs b/ImportBlocks/ImportBlocks.cs index 3ac1d75ab..dc8f90575 100644 --- a/ImportBlocks/ImportBlocks.cs +++ b/ImportBlocks/ImportBlocks.cs @@ -20,19 +20,20 @@ public ImportBlocks() _blockImporter = System.ActorSystem.ActorOf(BlockImporter.Props()); } - private void OnImportComplete() + public override void Configure() { - ResumeNodeStartup(); + Settings.Load(GetConfiguration()); } + protected override void OnPluginsLoaded() { SuspendNodeStartup(); _blockImporter.Tell(new BlockImporter.StartImport { BlockchainActorRef = System.Blockchain, OnComplete = OnImportComplete}); } - public override void Configure() + private void OnImportComplete() { - Settings.Load(GetConfiguration()); + ResumeNodeStartup(); } private bool OnExport(string[] args) From 00e1bc9261fb48ed1cc13be8e6139ad447e5cfc3 Mon Sep 17 00:00:00 2001 From: jsolman Date: Fri, 15 Mar 2019 19:52:09 -0700 Subject: [PATCH 3/9] Remove unused using statements. --- ImportBlocks/ImportBlocks.cs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ImportBlocks/ImportBlocks.cs b/ImportBlocks/ImportBlocks.cs index dc8f90575..7b463f0a6 100644 --- a/ImportBlocks/ImportBlocks.cs +++ b/ImportBlocks/ImportBlocks.cs @@ -4,11 +4,7 @@ using Neo.Network.P2P.Payloads; using Neo.Persistence; using System; -using System.Collections.Generic; using System.IO; -using System.IO.Compression; -using System.Linq; -using System.Text.RegularExpressions; namespace Neo.Plugins { From db39fb6d3f92de01fe69a10040bb6c9d03b1cd64 Mon Sep 17 00:00:00 2001 From: jsolman Date: Fri, 15 Mar 2019 19:53:14 -0700 Subject: [PATCH 4/9] Remove unused using statements. --- ImportBlocks/BlockImporter.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/ImportBlocks/BlockImporter.cs b/ImportBlocks/BlockImporter.cs index 3eaf7b399..f3d70133e 100644 --- a/ImportBlocks/BlockImporter.cs +++ b/ImportBlocks/BlockImporter.cs @@ -4,11 +4,9 @@ using System.IO.Compression; using System.Linq; using System.Text.RegularExpressions; -using Akka; using Akka.Actor; using Neo.IO; using Neo.Ledger; -using Neo.Network.P2P; using Neo.Network.P2P.Payloads; namespace Neo.Plugins From 9d6cf062d10aef3a4ced92701d5220ff64c26de1 Mon Sep 17 00:00:00 2001 From: jsolman Date: Thu, 21 Mar 2019 15:57:47 -0700 Subject: [PATCH 5/9] Fix whitespace in ImportBlocks.csproj --- ImportBlocks/ImportBlocks.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ImportBlocks/ImportBlocks.csproj b/ImportBlocks/ImportBlocks.csproj index fb56398a1..c1aaff668 100644 --- a/ImportBlocks/ImportBlocks.csproj +++ b/ImportBlocks/ImportBlocks.csproj @@ -5,7 +5,7 @@ netstandard2.0 Neo.Plugins - + PreserveNewest From 403cf50d39332368b9cc7d0109b52eb492e76e04 Mon Sep 17 00:00:00 2001 From: jsolman Date: Tue, 2 Apr 2019 21:56:41 -0700 Subject: [PATCH 6/9] No need to create actor until plugins are loaded. --- ImportBlocks/ImportBlocks.cs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/ImportBlocks/ImportBlocks.cs b/ImportBlocks/ImportBlocks.cs index 7b463f0a6..ff5c489ca 100644 --- a/ImportBlocks/ImportBlocks.cs +++ b/ImportBlocks/ImportBlocks.cs @@ -11,11 +11,6 @@ namespace Neo.Plugins public class ImportBlocks : Plugin { private IActorRef _blockImporter; - public ImportBlocks() - { - _blockImporter = System.ActorSystem.ActorOf(BlockImporter.Props()); - } - public override void Configure() { Settings.Load(GetConfiguration()); @@ -23,6 +18,7 @@ public override void Configure() protected override void OnPluginsLoaded() { + _blockImporter = System.ActorSystem.ActorOf(BlockImporter.Props()); SuspendNodeStartup(); _blockImporter.Tell(new BlockImporter.StartImport { BlockchainActorRef = System.Blockchain, OnComplete = OnImportComplete}); } From 95c6b2c8c64b96850fca250bd230399726328879 Mon Sep 17 00:00:00 2001 From: jsolman Date: Sun, 7 Apr 2019 02:05:05 -0700 Subject: [PATCH 7/9] Dispose enumerator after import finishes. --- ImportBlocks/BlockImporter.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ImportBlocks/BlockImporter.cs b/ImportBlocks/BlockImporter.cs index f3d70133e..faa22e6d8 100644 --- a/ImportBlocks/BlockImporter.cs +++ b/ImportBlocks/BlockImporter.cs @@ -45,7 +45,10 @@ protected override void OnReceive(object message) if (blocksToImport.Count > 0) _blockchainActorRef.Tell(new Blockchain.Import { Blocks = blocksToImport }); else + { + blocksBeingImported.Dispose(); _doneAction(); + } break; } } @@ -122,4 +125,4 @@ public static Props Props() return Akka.Actor.Props.Create(() => new BlockImporter()); } } -} \ No newline at end of file +} From 1e7b947639273275b54868154ac4ef8bf7b64030 Mon Sep 17 00:00:00 2001 From: erikzhang Date: Sun, 7 Apr 2019 21:19:57 +0800 Subject: [PATCH 8/9] format --- ImportBlocks/BlockImporter.cs | 77 ++++++++++++++++---------------- ImportBlocks/ImportBlocks.cs | 25 ++++++----- ImportBlocks/ImportBlocks.csproj | 2 +- 3 files changed, 53 insertions(+), 51 deletions(-) diff --git a/ImportBlocks/BlockImporter.cs b/ImportBlocks/BlockImporter.cs index faa22e6d8..8d5249c20 100644 --- a/ImportBlocks/BlockImporter.cs +++ b/ImportBlocks/BlockImporter.cs @@ -1,58 +1,26 @@ +using Akka.Actor; +using Neo.IO; +using Neo.Ledger; +using Neo.Network.P2P.Payloads; using System; using System.Collections.Generic; using System.IO; using System.IO.Compression; using System.Linq; using System.Text.RegularExpressions; -using Akka.Actor; -using Neo.IO; -using Neo.Ledger; -using Neo.Network.P2P.Payloads; namespace Neo.Plugins { public class BlockImporter : UntypedActor { public class StartImport { public IActorRef BlockchainActorRef; public Action OnComplete; } - private IActorRef _blockchainActorRef; + private const int BlocksPerBatch = 10; + private IActorRef _blockchainActorRef; private bool isImporting; private IEnumerator blocksBeingImported; private Action _doneAction; - protected override void OnReceive(object message) - { - switch (message) - { - case StartImport startImport: - if (isImporting) return; - isImporting = true; - _blockchainActorRef = startImport.BlockchainActorRef; - _doneAction = startImport.OnComplete; - blocksBeingImported = GetBlocksFromFile().GetEnumerator(); - // Start the first import - Self.Tell(new Blockchain.ImportCompleted()); - break; - case Blockchain.ImportCompleted _: - // Import the next batch - List blocksToImport = new List(); - for (int i = 0; i < BlocksPerBatch; i++) - { - if (!blocksBeingImported.MoveNext()) - break; - blocksToImport.Add(blocksBeingImported.Current); - } - if (blocksToImport.Count > 0) - _blockchainActorRef.Tell(new Blockchain.Import { Blocks = blocksToImport }); - else - { - blocksBeingImported.Dispose(); - _doneAction(); - } - break; - } - } - private static bool CheckMaxOnImportHeight(uint currentImportBlockHeight) { if (Settings.Default.MaxOnImportHeight == 0 || Settings.Default.MaxOnImportHeight >= currentImportBlockHeight) @@ -120,6 +88,39 @@ private IEnumerable GetBlocksFromFile() } } + protected override void OnReceive(object message) + { + switch (message) + { + case StartImport startImport: + if (isImporting) return; + isImporting = true; + _blockchainActorRef = startImport.BlockchainActorRef; + _doneAction = startImport.OnComplete; + blocksBeingImported = GetBlocksFromFile().GetEnumerator(); + // Start the first import + Self.Tell(new Blockchain.ImportCompleted()); + break; + case Blockchain.ImportCompleted _: + // Import the next batch + List blocksToImport = new List(); + for (int i = 0; i < BlocksPerBatch; i++) + { + if (!blocksBeingImported.MoveNext()) + break; + blocksToImport.Add(blocksBeingImported.Current); + } + if (blocksToImport.Count > 0) + _blockchainActorRef.Tell(new Blockchain.Import { Blocks = blocksToImport }); + else + { + blocksBeingImported.Dispose(); + _doneAction(); + } + break; + } + } + public static Props Props() { return Akka.Actor.Props.Create(() => new BlockImporter()); diff --git a/ImportBlocks/ImportBlocks.cs b/ImportBlocks/ImportBlocks.cs index ff5c489ca..a44443594 100644 --- a/ImportBlocks/ImportBlocks.cs +++ b/ImportBlocks/ImportBlocks.cs @@ -11,23 +11,12 @@ namespace Neo.Plugins public class ImportBlocks : Plugin { private IActorRef _blockImporter; + public override void Configure() { Settings.Load(GetConfiguration()); } - protected override void OnPluginsLoaded() - { - _blockImporter = System.ActorSystem.ActorOf(BlockImporter.Props()); - SuspendNodeStartup(); - _blockImporter.Tell(new BlockImporter.StartImport { BlockchainActorRef = System.Blockchain, OnComplete = OnImportComplete}); - } - - private void OnImportComplete() - { - ResumeNodeStartup(); - } - private bool OnExport(string[] args) { if (args.Length < 2) return false; @@ -114,6 +103,18 @@ private bool OnHelp(string[] args) return true; } + private void OnImportComplete() + { + ResumeNodeStartup(); + } + + protected override void OnPluginsLoaded() + { + SuspendNodeStartup(); + _blockImporter = System.ActorSystem.ActorOf(BlockImporter.Props()); + _blockImporter.Tell(new BlockImporter.StartImport { BlockchainActorRef = System.Blockchain, OnComplete = OnImportComplete }); + } + protected override bool OnMessage(object message) { if (!(message is string[] args)) return false; diff --git a/ImportBlocks/ImportBlocks.csproj b/ImportBlocks/ImportBlocks.csproj index e8b1d5f08..d064bfbb7 100644 --- a/ImportBlocks/ImportBlocks.csproj +++ b/ImportBlocks/ImportBlocks.csproj @@ -5,7 +5,7 @@ netstandard2.0 Neo.Plugins - + PreserveNewest From 65cc9a80ddcd5b4592ba058188fb7c3e0904e1d0 Mon Sep 17 00:00:00 2001 From: erikzhang Date: Sun, 7 Apr 2019 21:23:28 +0800 Subject: [PATCH 9/9] Stop `_blockImporter` when import complete. --- ImportBlocks/ImportBlocks.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/ImportBlocks/ImportBlocks.cs b/ImportBlocks/ImportBlocks.cs index a44443594..8c7d37cd6 100644 --- a/ImportBlocks/ImportBlocks.cs +++ b/ImportBlocks/ImportBlocks.cs @@ -106,6 +106,7 @@ private bool OnHelp(string[] args) private void OnImportComplete() { ResumeNodeStartup(); + System.ActorSystem.Stop(_blockImporter); } protected override void OnPluginsLoaded()