diff --git a/src/neo/NeoSystem.cs b/src/neo/NeoSystem.cs index b837426d45..301df2c5d6 100644 --- a/src/neo/NeoSystem.cs +++ b/src/neo/NeoSystem.cs @@ -16,7 +16,6 @@ public class NeoSystem : IDisposable $"blockchain-mailbox {{ mailbox-type: \"{typeof(BlockchainMailbox).AssemblyQualifiedName}\" }}" + $"task-manager-mailbox {{ mailbox-type: \"{typeof(TaskManagerMailbox).AssemblyQualifiedName}\" }}" + $"remote-node-mailbox {{ mailbox-type: \"{typeof(RemoteNodeMailbox).AssemblyQualifiedName}\" }}" + - $"protocol-handler-mailbox {{ mailbox-type: \"{typeof(ProtocolHandlerMailbox).AssemblyQualifiedName}\" }}" + $"consensus-service-mailbox {{ mailbox-type: \"{typeof(ConsensusServiceMailbox).AssemblyQualifiedName}\" }}"); public IActorRef Blockchain { get; } public IActorRef LocalNode { get; } diff --git a/src/neo/Network/P2P/Connection.cs b/src/neo/Network/P2P/Connection.cs index 7dff7e3a02..b12b2723a5 100644 --- a/src/neo/Network/P2P/Connection.cs +++ b/src/neo/Network/P2P/Connection.cs @@ -9,7 +9,7 @@ namespace Neo.Network.P2P { public abstract class Connection : UntypedActor { - internal class Timer { public static Timer Instance = new Timer(); } + internal class Close { public bool Abort; } internal class Ack : Tcp.Event { public static Ack Instance = new Ack(); } /// @@ -32,7 +32,7 @@ protected Connection(object connection, IPEndPoint remote, IPEndPoint local) { this.Remote = remote; this.Local = local; - this.timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimitStart), Self, Timer.Instance, ActorRefs.NoSender); + this.timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimitStart), Self, new Close { Abort = true }, ActorRefs.NoSender); switch (connection) { case IActorRef tcp: @@ -89,8 +89,8 @@ protected override void OnReceive(object message) { switch (message) { - case Timer _: - Disconnect(true); + case Close close: + Disconnect(close.Abort); break; case Ack _: OnAck(); @@ -107,7 +107,7 @@ protected override void OnReceive(object message) private void OnReceived(ByteString data) { timer.CancelIfNotNull(); - timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimit), Self, Timer.Instance, ActorRefs.NoSender); + timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimit), Self, new Close { Abort = true }, ActorRefs.NoSender); try { OnData(data); diff --git a/src/neo/Network/P2P/ProtocolHandler.cs b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs similarity index 74% rename from src/neo/Network/P2P/ProtocolHandler.cs rename to src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs index a97cb46922..91b3921326 100644 --- a/src/neo/Network/P2P/ProtocolHandler.cs +++ b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs @@ -1,10 +1,8 @@ using Akka.Actor; -using Akka.Configuration; using Neo.Cryptography; -using Neo.IO; -using Neo.IO.Actors; using Neo.IO.Caching; using Neo.Ledger; +using Neo.Network.P2P.Capabilities; using Neo.Network.P2P.Payloads; using Neo.Persistence; using Neo.Plugins; @@ -17,11 +15,9 @@ namespace Neo.Network.P2P { - internal class ProtocolHandler : UntypedActor + partial class RemoteNode { - public class SetFilter { public BloomFilter Filter; } - internal class Timer { } - + private class Timer { } private class PendingKnownHashesCollection : KeyedCollection { protected override UInt256 GetKeyForItem((UInt256, DateTime) item) @@ -30,11 +26,9 @@ protected override UInt256 GetKeyForItem((UInt256, DateTime) item) } } - private readonly NeoSystem system; - private readonly PendingKnownHashesCollection pendingKnownHashes; - private readonly HashSetCache knownHashes; - private readonly HashSetCache sentHashes; - private VersionPayload version; + private readonly PendingKnownHashesCollection pendingKnownHashes = new PendingKnownHashesCollection(); + private readonly HashSetCache knownHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); + private readonly HashSetCache sentHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); private bool verack = false; private BloomFilter bloom_filter; @@ -43,33 +37,12 @@ protected override UInt256 GetKeyForItem((UInt256, DateTime) item) private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender); - public ProtocolHandler(NeoSystem system) - { - this.system = system; - this.pendingKnownHashes = new PendingKnownHashesCollection(); - this.knownHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); - this.sentHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); - } - - protected override void OnReceive(object message) - { - switch (message) - { - case Message msg: - OnMessage(msg); - break; - case Timer _: - OnTimer(); - break; - } - } - private void OnMessage(Message msg) { foreach (IP2PPlugin plugin in Plugin.P2PPlugins) if (!plugin.OnP2PMessage(msg)) return; - if (version == null) + if (Version == null) { if (msg.Command != MessageCommand.Version) throw new ProtocolViolationException(); @@ -158,20 +131,17 @@ private void OnAddrMessageReceived(AddrPayload payload) private void OnFilterAddMessageReceived(FilterAddPayload payload) { - if (bloom_filter != null) - bloom_filter.Add(payload.Data); + bloom_filter?.Add(payload.Data); } private void OnFilterClearMessageReceived() { bloom_filter = null; - Context.Parent.Tell(new SetFilter { Filter = null }); } private void OnFilterLoadMessageReceived(FilterLoadPayload payload) { bloom_filter = new BloomFilter(payload.Filter.Length * 8, payload.K, payload.Tweak, payload.Filter); - Context.Parent.Tell(new SetFilter { Filter = bloom_filter }); } /// @@ -189,7 +159,7 @@ private void OnGetAddrMessageReceived() .Take(AddrPayload.MaxCountToSend); NetworkAddressWithTime[] networkAddresses = peers.Select(p => NetworkAddressWithTime.Create(p.Listener.Address, p.Version.Timestamp, p.Version.Capabilities)).ToArray(); if (networkAddresses.Length == 0) return; - Context.Parent.Tell(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses))); + EnqueueMessage(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses))); } /// @@ -216,7 +186,7 @@ private void OnGetBlocksMessageReceived(GetBlocksPayload payload) hashes.Add(hash); } if (hashes.Count == 0) return; - Context.Parent.Tell(Message.Create(MessageCommand.Inv, InvPayload.Create(InventoryType.Block, hashes.ToArray()))); + EnqueueMessage(Message.Create(MessageCommand.Inv, InvPayload.Create(InventoryType.Block, hashes.ToArray()))); } private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload) @@ -229,12 +199,12 @@ private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload) if (bloom_filter == null) { - Context.Parent.Tell(Message.Create(MessageCommand.Block, block)); + EnqueueMessage(Message.Create(MessageCommand.Block, block)); } else { BitArray flags = new BitArray(block.Transactions.Select(p => bloom_filter.Test(p)).ToArray()); - Context.Parent.Tell(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags))); + EnqueueMessage(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags))); } } } @@ -255,7 +225,7 @@ private void OnGetDataMessageReceived(InvPayload payload) case InventoryType.TX: Transaction tx = Blockchain.Singleton.GetTransaction(hash); if (tx != null) - Context.Parent.Tell(Message.Create(MessageCommand.Transaction, tx)); + EnqueueMessage(Message.Create(MessageCommand.Transaction, tx)); break; case InventoryType.Block: Block block = Blockchain.Singleton.GetBlock(hash); @@ -263,18 +233,18 @@ private void OnGetDataMessageReceived(InvPayload payload) { if (bloom_filter == null) { - Context.Parent.Tell(Message.Create(MessageCommand.Block, block)); + EnqueueMessage(Message.Create(MessageCommand.Block, block)); } else { BitArray flags = new BitArray(block.Transactions.Select(p => bloom_filter.Test(p)).ToArray()); - Context.Parent.Tell(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags))); + EnqueueMessage(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags))); } } break; case InventoryType.Consensus: if (Blockchain.Singleton.ConsensusRelayCache.TryGet(hash, out IInventory inventoryConsensus)) - Context.Parent.Tell(Message.Create(MessageCommand.Consensus, inventoryConsensus)); + EnqueueMessage(Message.Create(MessageCommand.Consensus, inventoryConsensus)); break; } } @@ -304,18 +274,18 @@ private void OnGetHeadersMessageReceived(GetBlocksPayload payload) headers.Add(header); } if (headers.Count == 0) return; - Context.Parent.Tell(Message.Create(MessageCommand.Headers, HeadersPayload.Create(headers.ToArray()))); + EnqueueMessage(Message.Create(MessageCommand.Headers, HeadersPayload.Create(headers.ToArray()))); } private void OnHeadersMessageReceived(HeadersPayload payload) { if (payload.Headers.Length == 0) return; - system.Blockchain.Tell(payload.Headers, Context.Parent); + system.Blockchain.Tell(payload.Headers); } private void OnInventoryReceived(IInventory inventory) { - system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash }, Context.Parent); + system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash }); system.LocalNode.Tell(new LocalNode.Relay { Inventory = inventory }); pendingKnownHashes.Remove(inventory.Hash); knownHashes.Add(inventory.Hash); @@ -339,47 +309,61 @@ private void OnInvMessageReceived(InvPayload payload) if (hashes.Length == 0) return; foreach (UInt256 hash in hashes) pendingKnownHashes.Add((hash, DateTime.UtcNow)); - system.TaskManager.Tell(new TaskManager.NewTasks { Payload = InvPayload.Create(payload.Type, hashes) }, Context.Parent); + system.TaskManager.Tell(new TaskManager.NewTasks { Payload = InvPayload.Create(payload.Type, hashes) }); } private void OnMemPoolMessageReceived() { foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, Blockchain.Singleton.MemPool.GetVerifiedTransactions().Select(p => p.Hash).ToArray())) - Context.Parent.Tell(Message.Create(MessageCommand.Inv, payload)); + EnqueueMessage(Message.Create(MessageCommand.Inv, payload)); } private void OnPingMessageReceived(PingPayload payload) { - Context.Parent.Tell(payload); - Context.Parent.Tell(Message.Create(MessageCommand.Pong, PingPayload.Create(Blockchain.Singleton.Height, payload.Nonce))); + UpdateLastBlockIndex(payload); + EnqueueMessage(Message.Create(MessageCommand.Pong, PingPayload.Create(Blockchain.Singleton.Height, payload.Nonce))); } private void OnPongMessageReceived(PingPayload payload) { - Context.Parent.Tell(payload); + UpdateLastBlockIndex(payload); } private void OnVerackMessageReceived() { verack = true; - Context.Parent.Tell(MessageCommand.Verack); + system.TaskManager.Tell(new TaskManager.Register { Version = Version }); + CheckMessageQueue(); } private void OnVersionMessageReceived(VersionPayload payload) { - version = payload; - Context.Parent.Tell(payload); - } - - private void OnTimer() - { - RefreshPendingKnownHashes(); - } - - protected override void PostStop() - { - timer.CancelIfNotNull(); - base.PostStop(); + Version = payload; + foreach (NodeCapability capability in payload.Capabilities) + { + switch (capability) + { + case FullNodeCapability fullNodeCapability: + IsFullNode = true; + LastBlockIndex = fullNodeCapability.StartHeight; + break; + case ServerCapability serverCapability: + if (serverCapability.Type == NodeCapabilityType.TcpServer) + ListenerTcpPort = serverCapability.Port; + break; + } + } + if (payload.Nonce == LocalNode.Nonce || payload.Magic != ProtocolSettings.Default.Magic) + { + Disconnect(true); + return; + } + if (LocalNode.Singleton.RemoteNodes.Values.Where(p => p != this).Any(p => p.Remote.Address.Equals(Remote.Address) && p.Version?.Nonce == payload.Nonce)) + { + Disconnect(true); + return; + } + SendMessage(Message.Create(MessageCommand.Verack)); } private void RefreshPendingKnownHashes() @@ -393,50 +377,12 @@ private void RefreshPendingKnownHashes() } } - public static Props Props(NeoSystem system) - { - return Akka.Actor.Props.Create(() => new ProtocolHandler(system)).WithMailbox("protocol-handler-mailbox"); - } - } - - internal class ProtocolHandlerMailbox : PriorityMailbox - { - public ProtocolHandlerMailbox(Settings settings, Config config) - : base(settings, config) - { - } - - internal protected override bool IsHighPriority(object message) + private void UpdateLastBlockIndex(PingPayload payload) { - if (!(message is Message msg)) return false; - switch (msg.Command) + if (payload.LastBlockIndex > LastBlockIndex) { - case MessageCommand.Consensus: - case MessageCommand.FilterAdd: - case MessageCommand.FilterClear: - case MessageCommand.FilterLoad: - case MessageCommand.Verack: - case MessageCommand.Version: - case MessageCommand.Alert: - return true; - default: - return false; - } - } - - internal protected override bool ShallDrop(object message, IEnumerable queue) - { - if (message is ProtocolHandler.Timer) return false; - if (!(message is Message msg)) return true; - switch (msg.Command) - { - case MessageCommand.GetAddr: - case MessageCommand.GetBlocks: - case MessageCommand.GetHeaders: - case MessageCommand.Mempool: - return queue.OfType().Any(p => p.Command == msg.Command); - default: - return false; + LastBlockIndex = payload.LastBlockIndex; + system.TaskManager.Tell(new TaskManager.Update { LastBlockIndex = LastBlockIndex }); } } } diff --git a/src/neo/Network/P2P/RemoteNode.cs b/src/neo/Network/P2P/RemoteNode.cs index 44bb2ef107..88b58163bf 100644 --- a/src/neo/Network/P2P/RemoteNode.cs +++ b/src/neo/Network/P2P/RemoteNode.cs @@ -7,24 +7,22 @@ using Neo.Ledger; using Neo.Network.P2P.Capabilities; using Neo.Network.P2P.Payloads; +using System.Collections; using System.Collections.Generic; using System.Linq; using System.Net; namespace Neo.Network.P2P { - public class RemoteNode : Connection + public partial class RemoteNode : Connection { internal class Relay { public IInventory Inventory; } private readonly NeoSystem system; - private readonly IActorRef protocol; private readonly Queue message_queue_high = new Queue(); private readonly Queue message_queue_low = new Queue(); private ByteString msg_buffer = ByteString.Empty; - private BloomFilter bloom_filter; private bool ack = true; - private bool verack = false; public IPEndPoint Listener => new IPEndPoint(Remote.Address, ListenerTcpPort); public int ListenerTcpPort { get; private set; } = 0; @@ -36,7 +34,6 @@ public RemoteNode(NeoSystem system, object connection, IPEndPoint remote, IPEndP : base(connection, remote, local) { this.system = system; - this.protocol = Context.ActorOf(ProtocolHandler.Props(system)); LocalNode.Singleton.RemoteNodes.TryAdd(Self, this); var capabilities = new List @@ -124,7 +121,7 @@ protected override void OnData(ByteString data) msg_buffer = msg_buffer.Concat(data); for (Message message = TryParseMessage(); message != null; message = TryParseMessage()) - protocol.Tell(message); + OnMessage(message); } protected override void OnReceive(object message) @@ -132,6 +129,9 @@ protected override void OnReceive(object message) base.OnReceive(message); switch (message) { + case Timer _: + RefreshPendingKnownHashes(); + break; case Message msg: EnqueueMessage(msg); break; @@ -141,27 +141,6 @@ protected override void OnReceive(object message) case Relay relay: OnRelay(relay.Inventory); break; - case VersionPayload payload: - OnVersionPayload(payload); - break; - case MessageCommand.Verack: - OnVerack(); - break; - case ProtocolHandler.SetFilter setFilter: - OnSetFilter(setFilter.Filter); - break; - case PingPayload payload: - OnPingPayload(payload); - break; - } - } - - private void OnPingPayload(PingPayload payload) - { - if (payload.LastBlockIndex > LastBlockIndex) - { - LastBlockIndex = payload.LastBlockIndex; - system.TaskManager.Tell(new TaskManager.Update { LastBlockIndex = LastBlockIndex }); } } @@ -187,50 +166,9 @@ private void OnSend(IInventory inventory) EnqueueMessage((MessageCommand)inventory.InventoryType, inventory); } - private void OnSetFilter(BloomFilter filter) - { - bloom_filter = filter; - } - - private void OnVerack() - { - verack = true; - system.TaskManager.Tell(new TaskManager.Register { Version = Version }); - CheckMessageQueue(); - } - - private void OnVersionPayload(VersionPayload version) - { - Version = version; - foreach (NodeCapability capability in version.Capabilities) - { - switch (capability) - { - case FullNodeCapability fullNodeCapability: - IsFullNode = true; - LastBlockIndex = fullNodeCapability.StartHeight; - break; - case ServerCapability serverCapability: - if (serverCapability.Type == NodeCapabilityType.TcpServer) - ListenerTcpPort = serverCapability.Port; - break; - } - } - if (version.Nonce == LocalNode.Nonce || version.Magic != ProtocolSettings.Default.Magic) - { - Disconnect(true); - return; - } - if (LocalNode.Singleton.RemoteNodes.Values.Where(p => p != this).Any(p => p.Remote.Address.Equals(Remote.Address) && p.Version?.Nonce == version.Nonce)) - { - Disconnect(true); - return; - } - SendMessage(Message.Create(MessageCommand.Verack)); - } - protected override void PostStop() { + timer.CancelIfNotNull(); LocalNode.Singleton.RemoteNodes.TryRemove(Self, out _); base.PostStop(); } @@ -246,15 +184,6 @@ private void SendMessage(Message message) SendData(ByteString.FromBytes(message.ToArray())); } - protected override SupervisorStrategy SupervisorStrategy() - { - return new OneForOneStrategy(ex => - { - Disconnect(true); - return Directive.Stop; - }, loggingEnabled: false); - } - private Message TryParseMessage() { var length = Message.TryDeserialize(msg_buffer, out var msg); @@ -273,13 +202,42 @@ internal protected override bool IsHighPriority(object message) { switch (message) { + case Message msg: + switch (msg.Command) + { + case MessageCommand.Consensus: + case MessageCommand.FilterAdd: + case MessageCommand.FilterClear: + case MessageCommand.FilterLoad: + case MessageCommand.Verack: + case MessageCommand.Version: + case MessageCommand.Alert: + return true; + default: + return false; + } case Tcp.ConnectionClosed _: - case Connection.Timer _: + case Connection.Close _: case Connection.Ack _: return true; default: return false; } } + + internal protected override bool ShallDrop(object message, IEnumerable queue) + { + if (!(message is Message msg)) return false; + switch (msg.Command) + { + case MessageCommand.GetAddr: + case MessageCommand.GetBlocks: + case MessageCommand.GetHeaders: + case MessageCommand.Mempool: + return queue.OfType().Any(p => p.Command == msg.Command); + default: + return false; + } + } } } diff --git a/tests/neo.UnitTests/Network/P2P/UT_ProtocolHandler.cs b/tests/neo.UnitTests/Network/P2P/UT_ProtocolHandler.cs deleted file mode 100644 index 1f62a77580..0000000000 --- a/tests/neo.UnitTests/Network/P2P/UT_ProtocolHandler.cs +++ /dev/null @@ -1,42 +0,0 @@ -using Akka.TestKit.Xunit2; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using Neo.Network.P2P; -using Neo.Network.P2P.Capabilities; -using Neo.Network.P2P.Payloads; - -namespace Neo.UnitTests.Network.P2P -{ - [TestClass] - public class UT_ProtocolHandler : TestKit - { - [TestCleanup] - public void Cleanup() - { - Shutdown(); - } - - [TestMethod] - public void ProtocolHandler_Test_SendVersion_TellParent() - { - var senderProbe = CreateTestProbe(); - var parent = CreateTestProbe(); - var protocolActor = ActorOfAsTestActorRef(() => new ProtocolHandler(TestBlockchain.TheNeoSystem), parent); - - var payload = new VersionPayload() - { - UserAgent = "".PadLeft(1024, '0'), - Nonce = 1, - Magic = 2, - Timestamp = 5, - Version = 6, - Capabilities = new NodeCapability[] - { - new ServerCapability(NodeCapabilityType.TcpServer, 25) - } - }; - - senderProbe.Send(protocolActor, Message.Create(MessageCommand.Version, payload)); - parent.ExpectMsg(); - } - } -} diff --git a/tests/neo.UnitTests/Network/P2P/UT_ProtocolHandlerMailbox.cs b/tests/neo.UnitTests/Network/P2P/UT_ProtocolHandlerMailbox.cs deleted file mode 100644 index f41deead2a..0000000000 --- a/tests/neo.UnitTests/Network/P2P/UT_ProtocolHandlerMailbox.cs +++ /dev/null @@ -1,185 +0,0 @@ -using Akka.TestKit.Xunit2; -using FluentAssertions; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using Neo.IO; -using Neo.Network.P2P; -using System; -using System.Collections.Generic; -using System.Linq; - -namespace Neo.UnitTests.Network.P2P -{ - [TestClass] - public class UT_ProtocolHandlerMailbox : TestKit - { - private static readonly Random TestRandom = new Random(1337); // use fixed seed for guaranteed determinism - - ProtocolHandlerMailbox uut; - - [TestCleanup] - public void Cleanup() - { - Shutdown(); - } - - [TestInitialize] - public void TestSetup() - { - Akka.Actor.ActorSystem system = Sys; - var config = TestKit.DefaultConfig; - var akkaSettings = new Akka.Actor.Settings(system, config); - uut = new ProtocolHandlerMailbox(akkaSettings, config); - } - - [TestMethod] - public void ProtocolHandlerMailbox_Test_IsHighPriority() - { - ISerializable s = null; - - //handshaking - uut.IsHighPriority(Message.Create(MessageCommand.Version, s)).Should().Be(true); - uut.IsHighPriority(Message.Create(MessageCommand.Verack, s)).Should().Be(true); - - //connectivity - uut.IsHighPriority(Message.Create(MessageCommand.GetAddr, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Addr, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Ping, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Pong, s)).Should().Be(false); - - //synchronization - uut.IsHighPriority(Message.Create(MessageCommand.GetHeaders, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Headers, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.GetBlocks, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Mempool, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Inv, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.GetData, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.NotFound, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Transaction, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Block, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Consensus, s)).Should().Be(true); - uut.IsHighPriority(Message.Create(MessageCommand.Reject, s)).Should().Be(false); - - //SPV protocol - uut.IsHighPriority(Message.Create(MessageCommand.FilterLoad, s)).Should().Be(true); - uut.IsHighPriority(Message.Create(MessageCommand.FilterAdd, s)).Should().Be(true); - uut.IsHighPriority(Message.Create(MessageCommand.FilterClear, s)).Should().Be(true); - uut.IsHighPriority(Message.Create(MessageCommand.MerkleBlock, s)).Should().Be(false); - - //others - uut.IsHighPriority(Message.Create(MessageCommand.Alert, s)).Should().Be(true); - - // any random object (non Message) should not have priority - object obj = null; - uut.IsHighPriority(obj).Should().Be(false); - } - - - [TestMethod] - public void ProtocolHandlerMailbox_Test_ShallDrop() - { - // using this for messages - ISerializable s = null; - Message msg = null; // multiple uses - // empty queue - IEnumerable emptyQueue = Enumerable.Empty(); - - // any random object (non Message) should be dropped - object obj = null; - uut.ShallDrop(obj, emptyQueue).Should().Be(true); - - //handshaking - // Version (no drop) - msg = Message.Create(MessageCommand.Version, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // Verack (no drop) - msg = Message.Create(MessageCommand.Verack, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - - //connectivity - // GetAddr (drop) - msg = Message.Create(MessageCommand.GetAddr, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); - // Addr (no drop) - msg = Message.Create(MessageCommand.Addr, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // Ping (no drop) - msg = Message.Create(MessageCommand.Ping, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // Pong (no drop) - msg = Message.Create(MessageCommand.Pong, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - - //synchronization - // GetHeaders (drop) - msg = Message.Create(MessageCommand.GetHeaders, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); - // Headers (no drop) - msg = Message.Create(MessageCommand.Headers, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // GetBlocks (drop) - msg = Message.Create(MessageCommand.GetBlocks, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); - // Mempool (drop) - msg = Message.Create(MessageCommand.Mempool, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); - // Inv (no drop) - msg = Message.Create(MessageCommand.Inv, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // NotFound (no drop) - msg = Message.Create(MessageCommand.NotFound, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // Transaction (no drop) - msg = Message.Create(MessageCommand.Transaction, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // Block (no drop) - msg = Message.Create(MessageCommand.Block, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // Consensus (no drop) - msg = Message.Create(MessageCommand.Consensus, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // Reject (no drop) - msg = Message.Create(MessageCommand.Reject, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - - //SPV protocol - // FilterLoad (no drop) - msg = Message.Create(MessageCommand.FilterLoad, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // FilterAdd (no drop) - msg = Message.Create(MessageCommand.FilterAdd, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // FilterClear (no drop) - msg = Message.Create(MessageCommand.FilterClear, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // MerkleBlock (no drop) - msg = Message.Create(MessageCommand.MerkleBlock, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - - //others - // Alert (no drop) - msg = Message.Create(MessageCommand.Alert, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - } - } -} diff --git a/tests/neo.UnitTests/Network/P2P/UT_RemoteNode.cs b/tests/neo.UnitTests/Network/P2P/UT_RemoteNode.cs index 7b2e083cbb..297af38e0c 100644 --- a/tests/neo.UnitTests/Network/P2P/UT_RemoteNode.cs +++ b/tests/neo.UnitTests/Network/P2P/UT_RemoteNode.cs @@ -2,6 +2,7 @@ using Akka.TestKit.Xunit2; using FluentAssertions; using Microsoft.VisualStudio.TestTools.UnitTesting; +using Neo.IO; using Neo.Network.P2P; using Neo.Network.P2P.Capabilities; using Neo.Network.P2P.Payloads; @@ -14,8 +15,7 @@ public class UT_RemoteNode : TestKit private static NeoSystem testBlockchain; public UT_RemoteNode() - : base($"remote-node-mailbox {{ mailbox-type: \"{typeof(RemoteNodeMailbox).AssemblyQualifiedName}\" }}" + - $"protocol-handler-mailbox {{ mailbox-type: \"{typeof(ProtocolHandlerMailbox).AssemblyQualifiedName}\" }}") + : base($"remote-node-mailbox {{ mailbox-type: \"{typeof(RemoteNodeMailbox).AssemblyQualifiedName}\" }}") { } @@ -33,7 +33,7 @@ public void RemoteNode_Test_Abort_DifferentMagic() connectionTestProbe.ExpectMsg(); - var payload = new VersionPayload() + var msg = Message.Create(MessageCommand.Version, new VersionPayload { UserAgent = "".PadLeft(1024, '0'), Nonce = 1, @@ -44,10 +44,10 @@ public void RemoteNode_Test_Abort_DifferentMagic() { new ServerCapability(NodeCapabilityType.TcpServer, 25) } - }; + }); var testProbe = CreateTestProbe(); - testProbe.Send(remoteNodeActor, payload); + testProbe.Send(remoteNodeActor, new Tcp.Received((ByteString)msg.ToArray())); connectionTestProbe.ExpectMsg(); } @@ -60,7 +60,7 @@ public void RemoteNode_Test_Accept_IfSameMagic() connectionTestProbe.ExpectMsg(); - var payload = new VersionPayload() + var msg = Message.Create(MessageCommand.Version, new VersionPayload() { UserAgent = "Unit Test".PadLeft(1024, '0'), Nonce = 1, @@ -71,10 +71,10 @@ public void RemoteNode_Test_Accept_IfSameMagic() { new ServerCapability(NodeCapabilityType.TcpServer, 25) } - }; + }); var testProbe = CreateTestProbe(); - testProbe.Send(remoteNodeActor, payload); + testProbe.Send(remoteNodeActor, new Tcp.Received((ByteString)msg.ToArray())); var verackMessage = connectionTestProbe.ExpectMsg(); diff --git a/tests/neo.UnitTests/Network/P2P/UT_RemoteNodeMailbox.cs b/tests/neo.UnitTests/Network/P2P/UT_RemoteNodeMailbox.cs index cd566f44f2..5ae7aec0da 100644 --- a/tests/neo.UnitTests/Network/P2P/UT_RemoteNodeMailbox.cs +++ b/tests/neo.UnitTests/Network/P2P/UT_RemoteNodeMailbox.cs @@ -2,8 +2,11 @@ using Akka.TestKit.Xunit2; using FluentAssertions; using Microsoft.VisualStudio.TestTools.UnitTesting; +using Neo.IO; using Neo.Network.P2P; using System; +using System.Collections.Generic; +using System.Linq; namespace Neo.UnitTests.Network.P2P { @@ -32,14 +35,155 @@ public void TestSetup() [TestMethod] public void RemoteNode_Test_IsHighPriority() { + ISerializable s = null; + + //handshaking + uut.IsHighPriority(Message.Create(MessageCommand.Version, s)).Should().Be(true); + uut.IsHighPriority(Message.Create(MessageCommand.Verack, s)).Should().Be(true); + + //connectivity + uut.IsHighPriority(Message.Create(MessageCommand.GetAddr, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Addr, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Ping, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Pong, s)).Should().Be(false); + + //synchronization + uut.IsHighPriority(Message.Create(MessageCommand.GetHeaders, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Headers, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.GetBlocks, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Mempool, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Inv, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.GetData, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.NotFound, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Transaction, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Block, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Consensus, s)).Should().Be(true); + uut.IsHighPriority(Message.Create(MessageCommand.Reject, s)).Should().Be(false); + + //SPV protocol + uut.IsHighPriority(Message.Create(MessageCommand.FilterLoad, s)).Should().Be(true); + uut.IsHighPriority(Message.Create(MessageCommand.FilterAdd, s)).Should().Be(true); + uut.IsHighPriority(Message.Create(MessageCommand.FilterClear, s)).Should().Be(true); + uut.IsHighPriority(Message.Create(MessageCommand.MerkleBlock, s)).Should().Be(false); + + //others + uut.IsHighPriority(Message.Create(MessageCommand.Alert, s)).Should().Be(true); + // high priority commands uut.IsHighPriority(new Tcp.ConnectionClosed()).Should().Be(true); - uut.IsHighPriority(new Connection.Timer()).Should().Be(true); + uut.IsHighPriority(new Connection.Close()).Should().Be(true); uut.IsHighPriority(new Connection.Ack()).Should().Be(true); // any random object should not have priority object obj = null; uut.IsHighPriority(obj).Should().Be(false); } + + public void ProtocolHandlerMailbox_Test_ShallDrop() + { + // using this for messages + ISerializable s = null; + Message msg; // multiple uses + // empty queue + IEnumerable emptyQueue = Enumerable.Empty(); + + // any random object (non Message) should be dropped + object obj = null; + uut.ShallDrop(obj, emptyQueue).Should().Be(true); + + //handshaking + // Version (no drop) + msg = Message.Create(MessageCommand.Version, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // Verack (no drop) + msg = Message.Create(MessageCommand.Verack, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + + //connectivity + // GetAddr (drop) + msg = Message.Create(MessageCommand.GetAddr, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); + // Addr (no drop) + msg = Message.Create(MessageCommand.Addr, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // Ping (no drop) + msg = Message.Create(MessageCommand.Ping, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // Pong (no drop) + msg = Message.Create(MessageCommand.Pong, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + + //synchronization + // GetHeaders (drop) + msg = Message.Create(MessageCommand.GetHeaders, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); + // Headers (no drop) + msg = Message.Create(MessageCommand.Headers, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // GetBlocks (drop) + msg = Message.Create(MessageCommand.GetBlocks, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); + // Mempool (drop) + msg = Message.Create(MessageCommand.Mempool, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); + // Inv (no drop) + msg = Message.Create(MessageCommand.Inv, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // NotFound (no drop) + msg = Message.Create(MessageCommand.NotFound, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // Transaction (no drop) + msg = Message.Create(MessageCommand.Transaction, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // Block (no drop) + msg = Message.Create(MessageCommand.Block, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // Consensus (no drop) + msg = Message.Create(MessageCommand.Consensus, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // Reject (no drop) + msg = Message.Create(MessageCommand.Reject, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + + //SPV protocol + // FilterLoad (no drop) + msg = Message.Create(MessageCommand.FilterLoad, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // FilterAdd (no drop) + msg = Message.Create(MessageCommand.FilterAdd, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // FilterClear (no drop) + msg = Message.Create(MessageCommand.FilterClear, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // MerkleBlock (no drop) + msg = Message.Create(MessageCommand.MerkleBlock, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + + //others + // Alert (no drop) + msg = Message.Create(MessageCommand.Alert, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + } } }