From 0e2e1174f17f780dbc3031d1dc0d8dab438faa53 Mon Sep 17 00:00:00 2001 From: erikzhang Date: Tue, 31 Mar 2020 16:08:15 +0800 Subject: [PATCH 1/3] Combine RemoteNode and ProtocolHandler --- src/neo/NeoSystem.cs | 1 - src/neo/Network/P2P/Connection.cs | 10 +- src/neo/Network/P2P/ProtocolHandler.cs | 443 ------------------ src/neo/Network/P2P/RemoteNode.cs | 412 ++++++++++++++-- .../Network/P2P/UT_ProtocolHandler.cs | 42 -- .../Network/P2P/UT_ProtocolHandlerMailbox.cs | 185 -------- .../Network/P2P/UT_RemoteNode.cs | 3 +- .../Network/P2P/UT_RemoteNodeMailbox.cs | 146 +++++- 8 files changed, 526 insertions(+), 716 deletions(-) delete mode 100644 src/neo/Network/P2P/ProtocolHandler.cs delete mode 100644 tests/neo.UnitTests/Network/P2P/UT_ProtocolHandler.cs delete mode 100644 tests/neo.UnitTests/Network/P2P/UT_ProtocolHandlerMailbox.cs diff --git a/src/neo/NeoSystem.cs b/src/neo/NeoSystem.cs index b837426d45..301df2c5d6 100644 --- a/src/neo/NeoSystem.cs +++ b/src/neo/NeoSystem.cs @@ -16,7 +16,6 @@ public class NeoSystem : IDisposable $"blockchain-mailbox {{ mailbox-type: \"{typeof(BlockchainMailbox).AssemblyQualifiedName}\" }}" + $"task-manager-mailbox {{ mailbox-type: \"{typeof(TaskManagerMailbox).AssemblyQualifiedName}\" }}" + $"remote-node-mailbox {{ mailbox-type: \"{typeof(RemoteNodeMailbox).AssemblyQualifiedName}\" }}" + - $"protocol-handler-mailbox {{ mailbox-type: \"{typeof(ProtocolHandlerMailbox).AssemblyQualifiedName}\" }}" + $"consensus-service-mailbox {{ mailbox-type: \"{typeof(ConsensusServiceMailbox).AssemblyQualifiedName}\" }}"); public IActorRef Blockchain { get; } public IActorRef LocalNode { get; } diff --git a/src/neo/Network/P2P/Connection.cs b/src/neo/Network/P2P/Connection.cs index 7dff7e3a02..b12b2723a5 100644 --- a/src/neo/Network/P2P/Connection.cs +++ b/src/neo/Network/P2P/Connection.cs @@ -9,7 +9,7 @@ namespace Neo.Network.P2P { public abstract class Connection : UntypedActor { - internal class Timer { public static Timer Instance = new Timer(); } + internal class Close { public bool Abort; } internal class Ack : Tcp.Event { public static Ack Instance = new Ack(); } /// @@ -32,7 +32,7 @@ protected Connection(object connection, IPEndPoint remote, IPEndPoint local) { this.Remote = remote; this.Local = local; - this.timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimitStart), Self, Timer.Instance, ActorRefs.NoSender); + this.timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimitStart), Self, new Close { Abort = true }, ActorRefs.NoSender); switch (connection) { case IActorRef tcp: @@ -89,8 +89,8 @@ protected override void OnReceive(object message) { switch (message) { - case Timer _: - Disconnect(true); + case Close close: + Disconnect(close.Abort); break; case Ack _: OnAck(); @@ -107,7 +107,7 @@ protected override void OnReceive(object message) private void OnReceived(ByteString data) { timer.CancelIfNotNull(); - timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimit), Self, Timer.Instance, ActorRefs.NoSender); + timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimit), Self, new Close { Abort = true }, ActorRefs.NoSender); try { OnData(data); diff --git a/src/neo/Network/P2P/ProtocolHandler.cs b/src/neo/Network/P2P/ProtocolHandler.cs deleted file mode 100644 index a97cb46922..0000000000 --- a/src/neo/Network/P2P/ProtocolHandler.cs +++ /dev/null @@ -1,443 +0,0 @@ -using Akka.Actor; -using Akka.Configuration; -using Neo.Cryptography; -using Neo.IO; -using Neo.IO.Actors; -using Neo.IO.Caching; -using Neo.Ledger; -using Neo.Network.P2P.Payloads; -using Neo.Persistence; -using Neo.Plugins; -using System; -using System.Collections; -using System.Collections.Generic; -using System.Collections.ObjectModel; -using System.Linq; -using System.Net; - -namespace Neo.Network.P2P -{ - internal class ProtocolHandler : UntypedActor - { - public class SetFilter { public BloomFilter Filter; } - internal class Timer { } - - private class PendingKnownHashesCollection : KeyedCollection - { - protected override UInt256 GetKeyForItem((UInt256, DateTime) item) - { - return item.Item1; - } - } - - private readonly NeoSystem system; - private readonly PendingKnownHashesCollection pendingKnownHashes; - private readonly HashSetCache knownHashes; - private readonly HashSetCache sentHashes; - private VersionPayload version; - private bool verack = false; - private BloomFilter bloom_filter; - - private static readonly TimeSpan TimerInterval = TimeSpan.FromSeconds(30); - private static readonly TimeSpan PendingTimeout = TimeSpan.FromMinutes(1); - - private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender); - - public ProtocolHandler(NeoSystem system) - { - this.system = system; - this.pendingKnownHashes = new PendingKnownHashesCollection(); - this.knownHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); - this.sentHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); - } - - protected override void OnReceive(object message) - { - switch (message) - { - case Message msg: - OnMessage(msg); - break; - case Timer _: - OnTimer(); - break; - } - } - - private void OnMessage(Message msg) - { - foreach (IP2PPlugin plugin in Plugin.P2PPlugins) - if (!plugin.OnP2PMessage(msg)) - return; - if (version == null) - { - if (msg.Command != MessageCommand.Version) - throw new ProtocolViolationException(); - OnVersionMessageReceived((VersionPayload)msg.Payload); - return; - } - if (!verack) - { - if (msg.Command != MessageCommand.Verack) - throw new ProtocolViolationException(); - OnVerackMessageReceived(); - return; - } - switch (msg.Command) - { - case MessageCommand.Addr: - OnAddrMessageReceived((AddrPayload)msg.Payload); - break; - case MessageCommand.Block: - OnInventoryReceived((Block)msg.Payload); - break; - case MessageCommand.Consensus: - OnInventoryReceived((ConsensusPayload)msg.Payload); - break; - case MessageCommand.FilterAdd: - OnFilterAddMessageReceived((FilterAddPayload)msg.Payload); - break; - case MessageCommand.FilterClear: - OnFilterClearMessageReceived(); - break; - case MessageCommand.FilterLoad: - OnFilterLoadMessageReceived((FilterLoadPayload)msg.Payload); - break; - case MessageCommand.GetAddr: - OnGetAddrMessageReceived(); - break; - case MessageCommand.GetBlocks: - OnGetBlocksMessageReceived((GetBlocksPayload)msg.Payload); - break; - case MessageCommand.GetBlockData: - OnGetBlockDataMessageReceived((GetBlockDataPayload)msg.Payload); - break; - case MessageCommand.GetData: - OnGetDataMessageReceived((InvPayload)msg.Payload); - break; - case MessageCommand.GetHeaders: - OnGetHeadersMessageReceived((GetBlocksPayload)msg.Payload); - break; - case MessageCommand.Headers: - OnHeadersMessageReceived((HeadersPayload)msg.Payload); - break; - case MessageCommand.Inv: - OnInvMessageReceived((InvPayload)msg.Payload); - break; - case MessageCommand.Mempool: - OnMemPoolMessageReceived(); - break; - case MessageCommand.Ping: - OnPingMessageReceived((PingPayload)msg.Payload); - break; - case MessageCommand.Pong: - OnPongMessageReceived((PingPayload)msg.Payload); - break; - case MessageCommand.Transaction: - if (msg.Payload.Size <= Transaction.MaxTransactionSize) - OnInventoryReceived((Transaction)msg.Payload); - break; - case MessageCommand.Verack: - case MessageCommand.Version: - throw new ProtocolViolationException(); - case MessageCommand.Alert: - case MessageCommand.MerkleBlock: - case MessageCommand.NotFound: - case MessageCommand.Reject: - default: break; - } - } - - private void OnAddrMessageReceived(AddrPayload payload) - { - system.LocalNode.Tell(new Peer.Peers - { - EndPoints = payload.AddressList.Select(p => p.EndPoint).Where(p => p.Port > 0) - }); - } - - private void OnFilterAddMessageReceived(FilterAddPayload payload) - { - if (bloom_filter != null) - bloom_filter.Add(payload.Data); - } - - private void OnFilterClearMessageReceived() - { - bloom_filter = null; - Context.Parent.Tell(new SetFilter { Filter = null }); - } - - private void OnFilterLoadMessageReceived(FilterLoadPayload payload) - { - bloom_filter = new BloomFilter(payload.Filter.Length * 8, payload.K, payload.Tweak, payload.Filter); - Context.Parent.Tell(new SetFilter { Filter = bloom_filter }); - } - - /// - /// Will be triggered when a MessageCommand.GetAddr message is received. - /// Randomly select nodes from the local RemoteNodes and tells to RemoteNode actors a MessageCommand.Addr message. - /// The message contains a list of networkAddresses from those selected random peers. - /// - private void OnGetAddrMessageReceived() - { - Random rand = new Random(); - IEnumerable peers = LocalNode.Singleton.RemoteNodes.Values - .Where(p => p.ListenerTcpPort > 0) - .GroupBy(p => p.Remote.Address, (k, g) => g.First()) - .OrderBy(p => rand.Next()) - .Take(AddrPayload.MaxCountToSend); - NetworkAddressWithTime[] networkAddresses = peers.Select(p => NetworkAddressWithTime.Create(p.Listener.Address, p.Version.Timestamp, p.Version.Capabilities)).ToArray(); - if (networkAddresses.Length == 0) return; - Context.Parent.Tell(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses))); - } - - /// - /// Will be triggered when a MessageCommand.GetBlocks message is received. - /// Tell the specified number of blocks' hashes starting with the requested HashStart until payload.Count or MaxHashesCount - /// Responses are sent to RemoteNode actor as MessageCommand.Inv Message. - /// - /// A GetBlocksPayload including start block Hash and number of blocks requested. - private void OnGetBlocksMessageReceived(GetBlocksPayload payload) - { - UInt256 hash = payload.HashStart; - // The default value of payload.Count is -1 - int count = payload.Count < 0 || payload.Count > InvPayload.MaxHashesCount ? InvPayload.MaxHashesCount : payload.Count; - TrimmedBlock state = Blockchain.Singleton.View.Blocks.TryGet(hash); - if (state == null) return; - List hashes = new List(); - for (uint i = 1; i <= count; i++) - { - uint index = state.Index + i; - if (index > Blockchain.Singleton.Height) - break; - hash = Blockchain.Singleton.GetBlockHash(index); - if (hash == null) break; - hashes.Add(hash); - } - if (hashes.Count == 0) return; - Context.Parent.Tell(Message.Create(MessageCommand.Inv, InvPayload.Create(InventoryType.Block, hashes.ToArray()))); - } - - private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload) - { - for (uint i = payload.IndexStart, max = payload.IndexStart + payload.Count; i < max; i++) - { - Block block = Blockchain.Singleton.GetBlock(i); - if (block == null) - break; - - if (bloom_filter == null) - { - Context.Parent.Tell(Message.Create(MessageCommand.Block, block)); - } - else - { - BitArray flags = new BitArray(block.Transactions.Select(p => bloom_filter.Test(p)).ToArray()); - Context.Parent.Tell(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags))); - } - } - } - - /// - /// Will be triggered when a MessageCommand.GetData message is received. - /// The payload includes an array of hash values. - /// For different payload.Type (Tx, Block, Consensus), get the corresponding (Txs, Blocks, Consensus) and tell them to RemoteNode actor. - /// - /// The payload containing the requested information. - private void OnGetDataMessageReceived(InvPayload payload) - { - UInt256[] hashes = payload.Hashes.Where(p => sentHashes.Add(p)).ToArray(); - foreach (UInt256 hash in hashes) - { - switch (payload.Type) - { - case InventoryType.TX: - Transaction tx = Blockchain.Singleton.GetTransaction(hash); - if (tx != null) - Context.Parent.Tell(Message.Create(MessageCommand.Transaction, tx)); - break; - case InventoryType.Block: - Block block = Blockchain.Singleton.GetBlock(hash); - if (block != null) - { - if (bloom_filter == null) - { - Context.Parent.Tell(Message.Create(MessageCommand.Block, block)); - } - else - { - BitArray flags = new BitArray(block.Transactions.Select(p => bloom_filter.Test(p)).ToArray()); - Context.Parent.Tell(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags))); - } - } - break; - case InventoryType.Consensus: - if (Blockchain.Singleton.ConsensusRelayCache.TryGet(hash, out IInventory inventoryConsensus)) - Context.Parent.Tell(Message.Create(MessageCommand.Consensus, inventoryConsensus)); - break; - } - } - } - - /// - /// Will be triggered when a MessageCommand.GetHeaders message is received. - /// Tell the specified number of blocks' headers starting with the requested HashStart to RemoteNode actor. - /// A limit set by HeadersPayload.MaxHeadersCount is also applied to the number of requested Headers, namely payload.Count. - /// - /// A GetBlocksPayload including start block Hash and number of blocks' headers requested. - private void OnGetHeadersMessageReceived(GetBlocksPayload payload) - { - UInt256 hash = payload.HashStart; - int count = payload.Count < 0 || payload.Count > HeadersPayload.MaxHeadersCount ? HeadersPayload.MaxHeadersCount : payload.Count; - DataCache cache = Blockchain.Singleton.View.Blocks; - TrimmedBlock state = cache.TryGet(hash); - if (state == null) return; - List
headers = new List
(); - for (uint i = 1; i <= count; i++) - { - uint index = state.Index + i; - hash = Blockchain.Singleton.GetBlockHash(index); - if (hash == null) break; - Header header = cache.TryGet(hash)?.Header; - if (header == null) break; - headers.Add(header); - } - if (headers.Count == 0) return; - Context.Parent.Tell(Message.Create(MessageCommand.Headers, HeadersPayload.Create(headers.ToArray()))); - } - - private void OnHeadersMessageReceived(HeadersPayload payload) - { - if (payload.Headers.Length == 0) return; - system.Blockchain.Tell(payload.Headers, Context.Parent); - } - - private void OnInventoryReceived(IInventory inventory) - { - system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash }, Context.Parent); - system.LocalNode.Tell(new LocalNode.Relay { Inventory = inventory }); - pendingKnownHashes.Remove(inventory.Hash); - knownHashes.Add(inventory.Hash); - } - - private void OnInvMessageReceived(InvPayload payload) - { - UInt256[] hashes = payload.Hashes.Where(p => !pendingKnownHashes.Contains(p) && !knownHashes.Contains(p) && !sentHashes.Contains(p)).ToArray(); - if (hashes.Length == 0) return; - switch (payload.Type) - { - case InventoryType.Block: - using (SnapshotView snapshot = Blockchain.Singleton.GetSnapshot()) - hashes = hashes.Where(p => !snapshot.ContainsBlock(p)).ToArray(); - break; - case InventoryType.TX: - using (SnapshotView snapshot = Blockchain.Singleton.GetSnapshot()) - hashes = hashes.Where(p => !snapshot.ContainsTransaction(p)).ToArray(); - break; - } - if (hashes.Length == 0) return; - foreach (UInt256 hash in hashes) - pendingKnownHashes.Add((hash, DateTime.UtcNow)); - system.TaskManager.Tell(new TaskManager.NewTasks { Payload = InvPayload.Create(payload.Type, hashes) }, Context.Parent); - } - - private void OnMemPoolMessageReceived() - { - foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, Blockchain.Singleton.MemPool.GetVerifiedTransactions().Select(p => p.Hash).ToArray())) - Context.Parent.Tell(Message.Create(MessageCommand.Inv, payload)); - } - - private void OnPingMessageReceived(PingPayload payload) - { - Context.Parent.Tell(payload); - Context.Parent.Tell(Message.Create(MessageCommand.Pong, PingPayload.Create(Blockchain.Singleton.Height, payload.Nonce))); - } - - private void OnPongMessageReceived(PingPayload payload) - { - Context.Parent.Tell(payload); - } - - private void OnVerackMessageReceived() - { - verack = true; - Context.Parent.Tell(MessageCommand.Verack); - } - - private void OnVersionMessageReceived(VersionPayload payload) - { - version = payload; - Context.Parent.Tell(payload); - } - - private void OnTimer() - { - RefreshPendingKnownHashes(); - } - - protected override void PostStop() - { - timer.CancelIfNotNull(); - base.PostStop(); - } - - private void RefreshPendingKnownHashes() - { - while (pendingKnownHashes.Count > 0) - { - var (_, time) = pendingKnownHashes[0]; - if (DateTime.UtcNow - time <= PendingTimeout) - break; - pendingKnownHashes.RemoveAt(0); - } - } - - public static Props Props(NeoSystem system) - { - return Akka.Actor.Props.Create(() => new ProtocolHandler(system)).WithMailbox("protocol-handler-mailbox"); - } - } - - internal class ProtocolHandlerMailbox : PriorityMailbox - { - public ProtocolHandlerMailbox(Settings settings, Config config) - : base(settings, config) - { - } - - internal protected override bool IsHighPriority(object message) - { - if (!(message is Message msg)) return false; - switch (msg.Command) - { - case MessageCommand.Consensus: - case MessageCommand.FilterAdd: - case MessageCommand.FilterClear: - case MessageCommand.FilterLoad: - case MessageCommand.Verack: - case MessageCommand.Version: - case MessageCommand.Alert: - return true; - default: - return false; - } - } - - internal protected override bool ShallDrop(object message, IEnumerable queue) - { - if (message is ProtocolHandler.Timer) return false; - if (!(message is Message msg)) return true; - switch (msg.Command) - { - case MessageCommand.GetAddr: - case MessageCommand.GetBlocks: - case MessageCommand.GetHeaders: - case MessageCommand.Mempool: - return queue.OfType().Any(p => p.Command == msg.Command); - default: - return false; - } - } - } -} diff --git a/src/neo/Network/P2P/RemoteNode.cs b/src/neo/Network/P2P/RemoteNode.cs index 44bb2ef107..696a54ab36 100644 --- a/src/neo/Network/P2P/RemoteNode.cs +++ b/src/neo/Network/P2P/RemoteNode.cs @@ -4,10 +4,16 @@ using Neo.Cryptography; using Neo.IO; using Neo.IO.Actors; +using Neo.IO.Caching; using Neo.Ledger; using Neo.Network.P2P.Capabilities; using Neo.Network.P2P.Payloads; +using Neo.Persistence; +using Neo.Plugins; +using System; +using System.Collections; using System.Collections.Generic; +using System.Collections.ObjectModel; using System.Linq; using System.Net; @@ -16,15 +22,29 @@ namespace Neo.Network.P2P public class RemoteNode : Connection { internal class Relay { public IInventory Inventory; } + private class Timer { } + private class PendingKnownHashesCollection : KeyedCollection + { + protected override UInt256 GetKeyForItem((UInt256, DateTime) item) + { + return item.Item1; + } + } + + private static readonly TimeSpan TimerInterval = TimeSpan.FromSeconds(30); + private static readonly TimeSpan PendingTimeout = TimeSpan.FromMinutes(1); private readonly NeoSystem system; - private readonly IActorRef protocol; + private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender); private readonly Queue message_queue_high = new Queue(); private readonly Queue message_queue_low = new Queue(); private ByteString msg_buffer = ByteString.Empty; - private BloomFilter bloom_filter; + private readonly PendingKnownHashesCollection pendingKnownHashes = new PendingKnownHashesCollection(); + private readonly HashSetCache knownHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); + private readonly HashSetCache sentHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); private bool ack = true; private bool verack = false; + private BloomFilter bloom_filter; public IPEndPoint Listener => new IPEndPoint(Remote.Address, ListenerTcpPort); public int ListenerTcpPort { get; private set; } = 0; @@ -36,7 +56,6 @@ public RemoteNode(NeoSystem system, object connection, IPEndPoint remote, IPEndP : base(connection, remote, local) { this.system = system; - this.protocol = Context.ActorOf(ProtocolHandler.Props(system)); LocalNode.Singleton.RemoteNodes.TryAdd(Self, this); var capabilities = new List @@ -119,49 +138,323 @@ protected override void OnAck() CheckMessageQueue(); } + private void OnAddrMessageReceived(AddrPayload payload) + { + system.LocalNode.Tell(new Peer.Peers + { + EndPoints = payload.AddressList.Select(p => p.EndPoint).Where(p => p.Port > 0) + }); + } + protected override void OnData(ByteString data) { msg_buffer = msg_buffer.Concat(data); for (Message message = TryParseMessage(); message != null; message = TryParseMessage()) - protocol.Tell(message); + OnMessage(message); } - protected override void OnReceive(object message) + private void OnFilterAddMessageReceived(FilterAddPayload payload) { - base.OnReceive(message); - switch (message) + bloom_filter?.Add(payload.Data); + } + + private void OnFilterClearMessageReceived() + { + bloom_filter = null; + } + + private void OnFilterLoadMessageReceived(FilterLoadPayload payload) + { + bloom_filter = new BloomFilter(payload.Filter.Length * 8, payload.K, payload.Tweak, payload.Filter); + } + + /// + /// Will be triggered when a MessageCommand.GetAddr message is received. + /// Randomly select nodes from the local RemoteNodes and tells to RemoteNode actors a MessageCommand.Addr message. + /// The message contains a list of networkAddresses from those selected random peers. + /// + private void OnGetAddrMessageReceived() + { + Random rand = new Random(); + IEnumerable peers = LocalNode.Singleton.RemoteNodes.Values + .Where(p => p.ListenerTcpPort > 0) + .GroupBy(p => p.Remote.Address, (k, g) => g.First()) + .OrderBy(p => rand.Next()) + .Take(AddrPayload.MaxCountToSend); + NetworkAddressWithTime[] networkAddresses = peers.Select(p => NetworkAddressWithTime.Create(p.Listener.Address, p.Version.Timestamp, p.Version.Capabilities)).ToArray(); + if (networkAddresses.Length == 0) return; + EnqueueMessage(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses))); + } + + /// + /// Will be triggered when a MessageCommand.GetBlocks message is received. + /// Tell the specified number of blocks' hashes starting with the requested HashStart until payload.Count or MaxHashesCount + /// Responses are sent to RemoteNode actor as MessageCommand.Inv Message. + /// + /// A GetBlocksPayload including start block Hash and number of blocks requested. + private void OnGetBlocksMessageReceived(GetBlocksPayload payload) + { + UInt256 hash = payload.HashStart; + // The default value of payload.Count is -1 + int count = payload.Count < 0 || payload.Count > InvPayload.MaxHashesCount ? InvPayload.MaxHashesCount : payload.Count; + TrimmedBlock state = Blockchain.Singleton.View.Blocks.TryGet(hash); + if (state == null) return; + List hashes = new List(); + for (uint i = 1; i <= count; i++) { - case Message msg: - EnqueueMessage(msg); + uint index = state.Index + i; + if (index > Blockchain.Singleton.Height) break; - case IInventory inventory: - OnSend(inventory); + hash = Blockchain.Singleton.GetBlockHash(index); + if (hash == null) break; + hashes.Add(hash); + } + if (hashes.Count == 0) return; + EnqueueMessage(Message.Create(MessageCommand.Inv, InvPayload.Create(InventoryType.Block, hashes.ToArray()))); + } + + private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload) + { + for (uint i = payload.IndexStart, max = payload.IndexStart + payload.Count; i < max; i++) + { + Block block = Blockchain.Singleton.GetBlock(i); + if (block == null) break; - case Relay relay: - OnRelay(relay.Inventory); + + if (bloom_filter == null) + { + EnqueueMessage(Message.Create(MessageCommand.Block, block)); + } + else + { + BitArray flags = new BitArray(block.Transactions.Select(p => bloom_filter.Test(p)).ToArray()); + EnqueueMessage(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags))); + } + } + } + + /// + /// Will be triggered when a MessageCommand.GetData message is received. + /// The payload includes an array of hash values. + /// For different payload.Type (Tx, Block, Consensus), get the corresponding (Txs, Blocks, Consensus) and tell them to RemoteNode actor. + /// + /// The payload containing the requested information. + private void OnGetDataMessageReceived(InvPayload payload) + { + UInt256[] hashes = payload.Hashes.Where(p => sentHashes.Add(p)).ToArray(); + foreach (UInt256 hash in hashes) + { + switch (payload.Type) + { + case InventoryType.TX: + Transaction tx = Blockchain.Singleton.GetTransaction(hash); + if (tx != null) + EnqueueMessage(Message.Create(MessageCommand.Transaction, tx)); + break; + case InventoryType.Block: + Block block = Blockchain.Singleton.GetBlock(hash); + if (block != null) + { + if (bloom_filter == null) + { + EnqueueMessage(Message.Create(MessageCommand.Block, block)); + } + else + { + BitArray flags = new BitArray(block.Transactions.Select(p => bloom_filter.Test(p)).ToArray()); + EnqueueMessage(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags))); + } + } + break; + case InventoryType.Consensus: + if (Blockchain.Singleton.ConsensusRelayCache.TryGet(hash, out IInventory inventoryConsensus)) + EnqueueMessage(Message.Create(MessageCommand.Consensus, inventoryConsensus)); + break; + } + } + } + + /// + /// Will be triggered when a MessageCommand.GetHeaders message is received. + /// Tell the specified number of blocks' headers starting with the requested HashStart to RemoteNode actor. + /// A limit set by HeadersPayload.MaxHeadersCount is also applied to the number of requested Headers, namely payload.Count. + /// + /// A GetBlocksPayload including start block Hash and number of blocks' headers requested. + private void OnGetHeadersMessageReceived(GetBlocksPayload payload) + { + UInt256 hash = payload.HashStart; + int count = payload.Count < 0 || payload.Count > HeadersPayload.MaxHeadersCount ? HeadersPayload.MaxHeadersCount : payload.Count; + DataCache cache = Blockchain.Singleton.View.Blocks; + TrimmedBlock state = cache.TryGet(hash); + if (state == null) return; + List
headers = new List
(); + for (uint i = 1; i <= count; i++) + { + uint index = state.Index + i; + hash = Blockchain.Singleton.GetBlockHash(index); + if (hash == null) break; + Header header = cache.TryGet(hash)?.Header; + if (header == null) break; + headers.Add(header); + } + if (headers.Count == 0) return; + EnqueueMessage(Message.Create(MessageCommand.Headers, HeadersPayload.Create(headers.ToArray()))); + } + + private void OnHeadersMessageReceived(HeadersPayload payload) + { + if (payload.Headers.Length == 0) return; + system.Blockchain.Tell(payload.Headers); + } + + private void OnInventoryReceived(IInventory inventory) + { + system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash }); + system.LocalNode.Tell(new LocalNode.Relay { Inventory = inventory }); + pendingKnownHashes.Remove(inventory.Hash); + knownHashes.Add(inventory.Hash); + } + + private void OnInvMessageReceived(InvPayload payload) + { + UInt256[] hashes = payload.Hashes.Where(p => !pendingKnownHashes.Contains(p) && !knownHashes.Contains(p) && !sentHashes.Contains(p)).ToArray(); + if (hashes.Length == 0) return; + switch (payload.Type) + { + case InventoryType.Block: + using (SnapshotView snapshot = Blockchain.Singleton.GetSnapshot()) + hashes = hashes.Where(p => !snapshot.ContainsBlock(p)).ToArray(); break; - case VersionPayload payload: - OnVersionPayload(payload); + case InventoryType.TX: + using (SnapshotView snapshot = Blockchain.Singleton.GetSnapshot()) + hashes = hashes.Where(p => !snapshot.ContainsTransaction(p)).ToArray(); break; - case MessageCommand.Verack: - OnVerack(); + } + if (hashes.Length == 0) return; + foreach (UInt256 hash in hashes) + pendingKnownHashes.Add((hash, DateTime.UtcNow)); + system.TaskManager.Tell(new TaskManager.NewTasks { Payload = InvPayload.Create(payload.Type, hashes) }); + } + + private void OnMemPoolMessageReceived() + { + foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, Blockchain.Singleton.MemPool.GetVerifiedTransactions().Select(p => p.Hash).ToArray())) + EnqueueMessage(Message.Create(MessageCommand.Inv, payload)); + } + + private void OnMessage(Message msg) + { + foreach (IP2PPlugin plugin in Plugin.P2PPlugins) + if (!plugin.OnP2PMessage(msg)) + return; + if (Version == null) + { + if (msg.Command != MessageCommand.Version) + throw new ProtocolViolationException(); + OnVersionMessageReceived((VersionPayload)msg.Payload); + return; + } + if (!verack) + { + if (msg.Command != MessageCommand.Verack) + throw new ProtocolViolationException(); + OnVerackMessageReceived(); + return; + } + switch (msg.Command) + { + case MessageCommand.Addr: + OnAddrMessageReceived((AddrPayload)msg.Payload); + break; + case MessageCommand.Block: + OnInventoryReceived((Block)msg.Payload); + break; + case MessageCommand.Consensus: + OnInventoryReceived((ConsensusPayload)msg.Payload); break; - case ProtocolHandler.SetFilter setFilter: - OnSetFilter(setFilter.Filter); + case MessageCommand.FilterAdd: + OnFilterAddMessageReceived((FilterAddPayload)msg.Payload); + break; + case MessageCommand.FilterClear: + OnFilterClearMessageReceived(); break; - case PingPayload payload: - OnPingPayload(payload); + case MessageCommand.FilterLoad: + OnFilterLoadMessageReceived((FilterLoadPayload)msg.Payload); + break; + case MessageCommand.GetAddr: + OnGetAddrMessageReceived(); break; + case MessageCommand.GetBlocks: + OnGetBlocksMessageReceived((GetBlocksPayload)msg.Payload); + break; + case MessageCommand.GetBlockData: + OnGetBlockDataMessageReceived((GetBlockDataPayload)msg.Payload); + break; + case MessageCommand.GetData: + OnGetDataMessageReceived((InvPayload)msg.Payload); + break; + case MessageCommand.GetHeaders: + OnGetHeadersMessageReceived((GetBlocksPayload)msg.Payload); + break; + case MessageCommand.Headers: + OnHeadersMessageReceived((HeadersPayload)msg.Payload); + break; + case MessageCommand.Inv: + OnInvMessageReceived((InvPayload)msg.Payload); + break; + case MessageCommand.Mempool: + OnMemPoolMessageReceived(); + break; + case MessageCommand.Ping: + OnPingMessageReceived((PingPayload)msg.Payload); + break; + case MessageCommand.Pong: + OnPongMessageReceived((PingPayload)msg.Payload); + break; + case MessageCommand.Transaction: + if (msg.Payload.Size <= Transaction.MaxTransactionSize) + OnInventoryReceived((Transaction)msg.Payload); + break; + case MessageCommand.Verack: + case MessageCommand.Version: + throw new ProtocolViolationException(); + case MessageCommand.Alert: + case MessageCommand.MerkleBlock: + case MessageCommand.NotFound: + case MessageCommand.Reject: + default: break; } } - private void OnPingPayload(PingPayload payload) + private void OnPingMessageReceived(PingPayload payload) { - if (payload.LastBlockIndex > LastBlockIndex) + UpdateLastBlockIndex(payload); + EnqueueMessage(Message.Create(MessageCommand.Pong, PingPayload.Create(Blockchain.Singleton.Height, payload.Nonce))); + } + + private void OnPongMessageReceived(PingPayload payload) + { + UpdateLastBlockIndex(payload); + } + + protected override void OnReceive(object message) + { + base.OnReceive(message); + switch (message) { - LastBlockIndex = payload.LastBlockIndex; - system.TaskManager.Tell(new TaskManager.Update { LastBlockIndex = LastBlockIndex }); + case Timer _: + RefreshPendingKnownHashes(); + break; + case Message msg: + EnqueueMessage(msg); + break; + case IInventory inventory: + OnSend(inventory); + break; + case Relay relay: + OnRelay(relay.Inventory); + break; } } @@ -187,22 +480,17 @@ private void OnSend(IInventory inventory) EnqueueMessage((MessageCommand)inventory.InventoryType, inventory); } - private void OnSetFilter(BloomFilter filter) - { - bloom_filter = filter; - } - - private void OnVerack() + private void OnVerackMessageReceived() { verack = true; system.TaskManager.Tell(new TaskManager.Register { Version = Version }); CheckMessageQueue(); } - private void OnVersionPayload(VersionPayload version) + private void OnVersionMessageReceived(VersionPayload payload) { - Version = version; - foreach (NodeCapability capability in version.Capabilities) + Version = payload; + foreach (NodeCapability capability in payload.Capabilities) { switch (capability) { @@ -216,12 +504,12 @@ private void OnVersionPayload(VersionPayload version) break; } } - if (version.Nonce == LocalNode.Nonce || version.Magic != ProtocolSettings.Default.Magic) + if (payload.Nonce == LocalNode.Nonce || payload.Magic != ProtocolSettings.Default.Magic) { Disconnect(true); return; } - if (LocalNode.Singleton.RemoteNodes.Values.Where(p => p != this).Any(p => p.Remote.Address.Equals(Remote.Address) && p.Version?.Nonce == version.Nonce)) + if (LocalNode.Singleton.RemoteNodes.Values.Where(p => p != this).Any(p => p.Remote.Address.Equals(Remote.Address) && p.Version?.Nonce == payload.Nonce)) { Disconnect(true); return; @@ -231,6 +519,7 @@ private void OnVersionPayload(VersionPayload version) protected override void PostStop() { + timer.CancelIfNotNull(); LocalNode.Singleton.RemoteNodes.TryRemove(Self, out _); base.PostStop(); } @@ -240,6 +529,17 @@ internal static Props Props(NeoSystem system, object connection, IPEndPoint remo return Akka.Actor.Props.Create(() => new RemoteNode(system, connection, remote, local)).WithMailbox("remote-node-mailbox"); } + private void RefreshPendingKnownHashes() + { + while (pendingKnownHashes.Count > 0) + { + var (_, time) = pendingKnownHashes[0]; + if (DateTime.UtcNow - time <= PendingTimeout) + break; + pendingKnownHashes.RemoveAt(0); + } + } + private void SendMessage(Message message) { ack = false; @@ -263,6 +563,15 @@ private Message TryParseMessage() msg_buffer = msg_buffer.Slice(length).Compact(); return msg; } + + private void UpdateLastBlockIndex(PingPayload payload) + { + if (payload.LastBlockIndex > LastBlockIndex) + { + LastBlockIndex = payload.LastBlockIndex; + system.TaskManager.Tell(new TaskManager.Update { LastBlockIndex = LastBlockIndex }); + } + } } internal class RemoteNodeMailbox : PriorityMailbox @@ -273,13 +582,42 @@ internal protected override bool IsHighPriority(object message) { switch (message) { + case Message msg: + switch (msg.Command) + { + case MessageCommand.Consensus: + case MessageCommand.FilterAdd: + case MessageCommand.FilterClear: + case MessageCommand.FilterLoad: + case MessageCommand.Verack: + case MessageCommand.Version: + case MessageCommand.Alert: + return true; + default: + return false; + } case Tcp.ConnectionClosed _: - case Connection.Timer _: + case Connection.Close _: case Connection.Ack _: return true; default: return false; } } + + internal protected override bool ShallDrop(object message, IEnumerable queue) + { + if (!(message is Message msg)) return false; + switch (msg.Command) + { + case MessageCommand.GetAddr: + case MessageCommand.GetBlocks: + case MessageCommand.GetHeaders: + case MessageCommand.Mempool: + return queue.OfType().Any(p => p.Command == msg.Command); + default: + return false; + } + } } } diff --git a/tests/neo.UnitTests/Network/P2P/UT_ProtocolHandler.cs b/tests/neo.UnitTests/Network/P2P/UT_ProtocolHandler.cs deleted file mode 100644 index 1f62a77580..0000000000 --- a/tests/neo.UnitTests/Network/P2P/UT_ProtocolHandler.cs +++ /dev/null @@ -1,42 +0,0 @@ -using Akka.TestKit.Xunit2; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using Neo.Network.P2P; -using Neo.Network.P2P.Capabilities; -using Neo.Network.P2P.Payloads; - -namespace Neo.UnitTests.Network.P2P -{ - [TestClass] - public class UT_ProtocolHandler : TestKit - { - [TestCleanup] - public void Cleanup() - { - Shutdown(); - } - - [TestMethod] - public void ProtocolHandler_Test_SendVersion_TellParent() - { - var senderProbe = CreateTestProbe(); - var parent = CreateTestProbe(); - var protocolActor = ActorOfAsTestActorRef(() => new ProtocolHandler(TestBlockchain.TheNeoSystem), parent); - - var payload = new VersionPayload() - { - UserAgent = "".PadLeft(1024, '0'), - Nonce = 1, - Magic = 2, - Timestamp = 5, - Version = 6, - Capabilities = new NodeCapability[] - { - new ServerCapability(NodeCapabilityType.TcpServer, 25) - } - }; - - senderProbe.Send(protocolActor, Message.Create(MessageCommand.Version, payload)); - parent.ExpectMsg(); - } - } -} diff --git a/tests/neo.UnitTests/Network/P2P/UT_ProtocolHandlerMailbox.cs b/tests/neo.UnitTests/Network/P2P/UT_ProtocolHandlerMailbox.cs deleted file mode 100644 index f41deead2a..0000000000 --- a/tests/neo.UnitTests/Network/P2P/UT_ProtocolHandlerMailbox.cs +++ /dev/null @@ -1,185 +0,0 @@ -using Akka.TestKit.Xunit2; -using FluentAssertions; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using Neo.IO; -using Neo.Network.P2P; -using System; -using System.Collections.Generic; -using System.Linq; - -namespace Neo.UnitTests.Network.P2P -{ - [TestClass] - public class UT_ProtocolHandlerMailbox : TestKit - { - private static readonly Random TestRandom = new Random(1337); // use fixed seed for guaranteed determinism - - ProtocolHandlerMailbox uut; - - [TestCleanup] - public void Cleanup() - { - Shutdown(); - } - - [TestInitialize] - public void TestSetup() - { - Akka.Actor.ActorSystem system = Sys; - var config = TestKit.DefaultConfig; - var akkaSettings = new Akka.Actor.Settings(system, config); - uut = new ProtocolHandlerMailbox(akkaSettings, config); - } - - [TestMethod] - public void ProtocolHandlerMailbox_Test_IsHighPriority() - { - ISerializable s = null; - - //handshaking - uut.IsHighPriority(Message.Create(MessageCommand.Version, s)).Should().Be(true); - uut.IsHighPriority(Message.Create(MessageCommand.Verack, s)).Should().Be(true); - - //connectivity - uut.IsHighPriority(Message.Create(MessageCommand.GetAddr, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Addr, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Ping, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Pong, s)).Should().Be(false); - - //synchronization - uut.IsHighPriority(Message.Create(MessageCommand.GetHeaders, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Headers, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.GetBlocks, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Mempool, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Inv, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.GetData, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.NotFound, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Transaction, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Block, s)).Should().Be(false); - uut.IsHighPriority(Message.Create(MessageCommand.Consensus, s)).Should().Be(true); - uut.IsHighPriority(Message.Create(MessageCommand.Reject, s)).Should().Be(false); - - //SPV protocol - uut.IsHighPriority(Message.Create(MessageCommand.FilterLoad, s)).Should().Be(true); - uut.IsHighPriority(Message.Create(MessageCommand.FilterAdd, s)).Should().Be(true); - uut.IsHighPriority(Message.Create(MessageCommand.FilterClear, s)).Should().Be(true); - uut.IsHighPriority(Message.Create(MessageCommand.MerkleBlock, s)).Should().Be(false); - - //others - uut.IsHighPriority(Message.Create(MessageCommand.Alert, s)).Should().Be(true); - - // any random object (non Message) should not have priority - object obj = null; - uut.IsHighPriority(obj).Should().Be(false); - } - - - [TestMethod] - public void ProtocolHandlerMailbox_Test_ShallDrop() - { - // using this for messages - ISerializable s = null; - Message msg = null; // multiple uses - // empty queue - IEnumerable emptyQueue = Enumerable.Empty(); - - // any random object (non Message) should be dropped - object obj = null; - uut.ShallDrop(obj, emptyQueue).Should().Be(true); - - //handshaking - // Version (no drop) - msg = Message.Create(MessageCommand.Version, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // Verack (no drop) - msg = Message.Create(MessageCommand.Verack, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - - //connectivity - // GetAddr (drop) - msg = Message.Create(MessageCommand.GetAddr, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); - // Addr (no drop) - msg = Message.Create(MessageCommand.Addr, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // Ping (no drop) - msg = Message.Create(MessageCommand.Ping, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // Pong (no drop) - msg = Message.Create(MessageCommand.Pong, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - - //synchronization - // GetHeaders (drop) - msg = Message.Create(MessageCommand.GetHeaders, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); - // Headers (no drop) - msg = Message.Create(MessageCommand.Headers, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // GetBlocks (drop) - msg = Message.Create(MessageCommand.GetBlocks, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); - // Mempool (drop) - msg = Message.Create(MessageCommand.Mempool, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); - // Inv (no drop) - msg = Message.Create(MessageCommand.Inv, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // NotFound (no drop) - msg = Message.Create(MessageCommand.NotFound, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // Transaction (no drop) - msg = Message.Create(MessageCommand.Transaction, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // Block (no drop) - msg = Message.Create(MessageCommand.Block, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // Consensus (no drop) - msg = Message.Create(MessageCommand.Consensus, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // Reject (no drop) - msg = Message.Create(MessageCommand.Reject, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - - //SPV protocol - // FilterLoad (no drop) - msg = Message.Create(MessageCommand.FilterLoad, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // FilterAdd (no drop) - msg = Message.Create(MessageCommand.FilterAdd, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // FilterClear (no drop) - msg = Message.Create(MessageCommand.FilterClear, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - // MerkleBlock (no drop) - msg = Message.Create(MessageCommand.MerkleBlock, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - - //others - // Alert (no drop) - msg = Message.Create(MessageCommand.Alert, s); - uut.ShallDrop(msg, emptyQueue).Should().Be(false); - uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); - } - } -} diff --git a/tests/neo.UnitTests/Network/P2P/UT_RemoteNode.cs b/tests/neo.UnitTests/Network/P2P/UT_RemoteNode.cs index a700902f6c..c1cbbf6cac 100644 --- a/tests/neo.UnitTests/Network/P2P/UT_RemoteNode.cs +++ b/tests/neo.UnitTests/Network/P2P/UT_RemoteNode.cs @@ -15,8 +15,7 @@ public class UT_RemoteNode : TestKit private static NeoSystem testBlockchain; public UT_RemoteNode() - : base($"remote-node-mailbox {{ mailbox-type: \"{typeof(RemoteNodeMailbox).AssemblyQualifiedName}\" }}" + - $"protocol-handler-mailbox {{ mailbox-type: \"{typeof(ProtocolHandlerMailbox).AssemblyQualifiedName}\" }}") + : base($"remote-node-mailbox {{ mailbox-type: \"{typeof(RemoteNodeMailbox).AssemblyQualifiedName}\" }}") { } diff --git a/tests/neo.UnitTests/Network/P2P/UT_RemoteNodeMailbox.cs b/tests/neo.UnitTests/Network/P2P/UT_RemoteNodeMailbox.cs index b57e9e93a7..c5dccec89f 100644 --- a/tests/neo.UnitTests/Network/P2P/UT_RemoteNodeMailbox.cs +++ b/tests/neo.UnitTests/Network/P2P/UT_RemoteNodeMailbox.cs @@ -2,8 +2,11 @@ using Akka.TestKit.Xunit2; using FluentAssertions; using Microsoft.VisualStudio.TestTools.UnitTesting; +using Neo.IO; using Neo.Network.P2P; using System; +using System.Collections.Generic; +using System.Linq; namespace Neo.UnitTests.Network.P2P { @@ -33,14 +36,155 @@ public void TestSetup() [TestMethod] public void RemoteNode_Test_IsHighPriority() { + ISerializable s = null; + + //handshaking + uut.IsHighPriority(Message.Create(MessageCommand.Version, s)).Should().Be(true); + uut.IsHighPriority(Message.Create(MessageCommand.Verack, s)).Should().Be(true); + + //connectivity + uut.IsHighPriority(Message.Create(MessageCommand.GetAddr, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Addr, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Ping, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Pong, s)).Should().Be(false); + + //synchronization + uut.IsHighPriority(Message.Create(MessageCommand.GetHeaders, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Headers, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.GetBlocks, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Mempool, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Inv, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.GetData, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.NotFound, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Transaction, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Block, s)).Should().Be(false); + uut.IsHighPriority(Message.Create(MessageCommand.Consensus, s)).Should().Be(true); + uut.IsHighPriority(Message.Create(MessageCommand.Reject, s)).Should().Be(false); + + //SPV protocol + uut.IsHighPriority(Message.Create(MessageCommand.FilterLoad, s)).Should().Be(true); + uut.IsHighPriority(Message.Create(MessageCommand.FilterAdd, s)).Should().Be(true); + uut.IsHighPriority(Message.Create(MessageCommand.FilterClear, s)).Should().Be(true); + uut.IsHighPriority(Message.Create(MessageCommand.MerkleBlock, s)).Should().Be(false); + + //others + uut.IsHighPriority(Message.Create(MessageCommand.Alert, s)).Should().Be(true); + // high priority commands uut.IsHighPriority(new Tcp.ConnectionClosed()).Should().Be(true); - uut.IsHighPriority(new Connection.Timer()).Should().Be(true); + uut.IsHighPriority(new Connection.Close()).Should().Be(true); uut.IsHighPriority(new Connection.Ack()).Should().Be(true); // any random object should not have priority object obj = null; uut.IsHighPriority(obj).Should().Be(false); } + + public void ProtocolHandlerMailbox_Test_ShallDrop() + { + // using this for messages + ISerializable s = null; + Message msg; // multiple uses + // empty queue + IEnumerable emptyQueue = Enumerable.Empty(); + + // any random object (non Message) should be dropped + object obj = null; + uut.ShallDrop(obj, emptyQueue).Should().Be(true); + + //handshaking + // Version (no drop) + msg = Message.Create(MessageCommand.Version, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // Verack (no drop) + msg = Message.Create(MessageCommand.Verack, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + + //connectivity + // GetAddr (drop) + msg = Message.Create(MessageCommand.GetAddr, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); + // Addr (no drop) + msg = Message.Create(MessageCommand.Addr, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // Ping (no drop) + msg = Message.Create(MessageCommand.Ping, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // Pong (no drop) + msg = Message.Create(MessageCommand.Pong, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + + //synchronization + // GetHeaders (drop) + msg = Message.Create(MessageCommand.GetHeaders, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); + // Headers (no drop) + msg = Message.Create(MessageCommand.Headers, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // GetBlocks (drop) + msg = Message.Create(MessageCommand.GetBlocks, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); + // Mempool (drop) + msg = Message.Create(MessageCommand.Mempool, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(true); + // Inv (no drop) + msg = Message.Create(MessageCommand.Inv, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // NotFound (no drop) + msg = Message.Create(MessageCommand.NotFound, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // Transaction (no drop) + msg = Message.Create(MessageCommand.Transaction, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // Block (no drop) + msg = Message.Create(MessageCommand.Block, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // Consensus (no drop) + msg = Message.Create(MessageCommand.Consensus, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // Reject (no drop) + msg = Message.Create(MessageCommand.Reject, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + + //SPV protocol + // FilterLoad (no drop) + msg = Message.Create(MessageCommand.FilterLoad, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // FilterAdd (no drop) + msg = Message.Create(MessageCommand.FilterAdd, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // FilterClear (no drop) + msg = Message.Create(MessageCommand.FilterClear, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + // MerkleBlock (no drop) + msg = Message.Create(MessageCommand.MerkleBlock, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + + //others + // Alert (no drop) + msg = Message.Create(MessageCommand.Alert, s); + uut.ShallDrop(msg, emptyQueue).Should().Be(false); + uut.ShallDrop(msg, new object[] { msg }).Should().Be(false); + } } } From f4c4fb1b965f37ce6a3e05e4c57a68e2d92a2469 Mon Sep 17 00:00:00 2001 From: erikzhang Date: Tue, 31 Mar 2020 16:32:38 +0800 Subject: [PATCH 2/3] Fix UT --- tests/neo.UnitTests/Network/P2P/UT_RemoteNode.cs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/neo.UnitTests/Network/P2P/UT_RemoteNode.cs b/tests/neo.UnitTests/Network/P2P/UT_RemoteNode.cs index c1cbbf6cac..943d45bf58 100644 --- a/tests/neo.UnitTests/Network/P2P/UT_RemoteNode.cs +++ b/tests/neo.UnitTests/Network/P2P/UT_RemoteNode.cs @@ -2,6 +2,7 @@ using Akka.TestKit.Xunit2; using FluentAssertions; using Microsoft.VisualStudio.TestTools.UnitTesting; +using Neo.IO; using Neo.Network.P2P; using Neo.Network.P2P.Capabilities; using Neo.Network.P2P.Payloads; @@ -33,7 +34,7 @@ public void RemoteNode_Test_Abort_DifferentMagic() connectionTestProbe.ExpectMsg(); - var payload = new VersionPayload() + var msg = Message.Create(MessageCommand.Version, new VersionPayload { UserAgent = "".PadLeft(1024, '0'), Nonce = 1, @@ -44,10 +45,10 @@ public void RemoteNode_Test_Abort_DifferentMagic() { new ServerCapability(NodeCapabilityType.TcpServer, 25) } - }; + }); var testProbe = CreateTestProbe(); - testProbe.Send(remoteNodeActor, payload); + testProbe.Send(remoteNodeActor, new Tcp.Received((ByteString)msg.ToArray())); connectionTestProbe.ExpectMsg(); } @@ -60,7 +61,7 @@ public void RemoteNode_Test_Accept_IfSameMagic() connectionTestProbe.ExpectMsg(); - var payload = new VersionPayload() + var msg = Message.Create(MessageCommand.Version, new VersionPayload() { UserAgent = "Unit Test".PadLeft(1024, '0'), Nonce = 1, @@ -71,10 +72,10 @@ public void RemoteNode_Test_Accept_IfSameMagic() { new ServerCapability(NodeCapabilityType.TcpServer, 25) } - }; + }); var testProbe = CreateTestProbe(); - testProbe.Send(remoteNodeActor, payload); + testProbe.Send(remoteNodeActor, new Tcp.Received((ByteString)msg.ToArray())); var verackMessage = connectionTestProbe.ExpectMsg(); From 6bf9b70ca733d083d9af0fefaa7e6c60a7a78c63 Mon Sep 17 00:00:00 2001 From: erikzhang Date: Wed, 1 Apr 2020 15:37:18 +0800 Subject: [PATCH 3/3] Split into two files --- .../Network/P2P/RemoteNode.ProtocolHandler.cs | 389 ++++++++++++++++++ src/neo/Network/P2P/RemoteNode.cs | 382 +---------------- 2 files changed, 390 insertions(+), 381 deletions(-) create mode 100644 src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs diff --git a/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs new file mode 100644 index 0000000000..91b3921326 --- /dev/null +++ b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs @@ -0,0 +1,389 @@ +using Akka.Actor; +using Neo.Cryptography; +using Neo.IO.Caching; +using Neo.Ledger; +using Neo.Network.P2P.Capabilities; +using Neo.Network.P2P.Payloads; +using Neo.Persistence; +using Neo.Plugins; +using System; +using System.Collections; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Linq; +using System.Net; + +namespace Neo.Network.P2P +{ + partial class RemoteNode + { + private class Timer { } + private class PendingKnownHashesCollection : KeyedCollection + { + protected override UInt256 GetKeyForItem((UInt256, DateTime) item) + { + return item.Item1; + } + } + + private readonly PendingKnownHashesCollection pendingKnownHashes = new PendingKnownHashesCollection(); + private readonly HashSetCache knownHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); + private readonly HashSetCache sentHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); + private bool verack = false; + private BloomFilter bloom_filter; + + private static readonly TimeSpan TimerInterval = TimeSpan.FromSeconds(30); + private static readonly TimeSpan PendingTimeout = TimeSpan.FromMinutes(1); + + private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender); + + private void OnMessage(Message msg) + { + foreach (IP2PPlugin plugin in Plugin.P2PPlugins) + if (!plugin.OnP2PMessage(msg)) + return; + if (Version == null) + { + if (msg.Command != MessageCommand.Version) + throw new ProtocolViolationException(); + OnVersionMessageReceived((VersionPayload)msg.Payload); + return; + } + if (!verack) + { + if (msg.Command != MessageCommand.Verack) + throw new ProtocolViolationException(); + OnVerackMessageReceived(); + return; + } + switch (msg.Command) + { + case MessageCommand.Addr: + OnAddrMessageReceived((AddrPayload)msg.Payload); + break; + case MessageCommand.Block: + OnInventoryReceived((Block)msg.Payload); + break; + case MessageCommand.Consensus: + OnInventoryReceived((ConsensusPayload)msg.Payload); + break; + case MessageCommand.FilterAdd: + OnFilterAddMessageReceived((FilterAddPayload)msg.Payload); + break; + case MessageCommand.FilterClear: + OnFilterClearMessageReceived(); + break; + case MessageCommand.FilterLoad: + OnFilterLoadMessageReceived((FilterLoadPayload)msg.Payload); + break; + case MessageCommand.GetAddr: + OnGetAddrMessageReceived(); + break; + case MessageCommand.GetBlocks: + OnGetBlocksMessageReceived((GetBlocksPayload)msg.Payload); + break; + case MessageCommand.GetBlockData: + OnGetBlockDataMessageReceived((GetBlockDataPayload)msg.Payload); + break; + case MessageCommand.GetData: + OnGetDataMessageReceived((InvPayload)msg.Payload); + break; + case MessageCommand.GetHeaders: + OnGetHeadersMessageReceived((GetBlocksPayload)msg.Payload); + break; + case MessageCommand.Headers: + OnHeadersMessageReceived((HeadersPayload)msg.Payload); + break; + case MessageCommand.Inv: + OnInvMessageReceived((InvPayload)msg.Payload); + break; + case MessageCommand.Mempool: + OnMemPoolMessageReceived(); + break; + case MessageCommand.Ping: + OnPingMessageReceived((PingPayload)msg.Payload); + break; + case MessageCommand.Pong: + OnPongMessageReceived((PingPayload)msg.Payload); + break; + case MessageCommand.Transaction: + if (msg.Payload.Size <= Transaction.MaxTransactionSize) + OnInventoryReceived((Transaction)msg.Payload); + break; + case MessageCommand.Verack: + case MessageCommand.Version: + throw new ProtocolViolationException(); + case MessageCommand.Alert: + case MessageCommand.MerkleBlock: + case MessageCommand.NotFound: + case MessageCommand.Reject: + default: break; + } + } + + private void OnAddrMessageReceived(AddrPayload payload) + { + system.LocalNode.Tell(new Peer.Peers + { + EndPoints = payload.AddressList.Select(p => p.EndPoint).Where(p => p.Port > 0) + }); + } + + private void OnFilterAddMessageReceived(FilterAddPayload payload) + { + bloom_filter?.Add(payload.Data); + } + + private void OnFilterClearMessageReceived() + { + bloom_filter = null; + } + + private void OnFilterLoadMessageReceived(FilterLoadPayload payload) + { + bloom_filter = new BloomFilter(payload.Filter.Length * 8, payload.K, payload.Tweak, payload.Filter); + } + + /// + /// Will be triggered when a MessageCommand.GetAddr message is received. + /// Randomly select nodes from the local RemoteNodes and tells to RemoteNode actors a MessageCommand.Addr message. + /// The message contains a list of networkAddresses from those selected random peers. + /// + private void OnGetAddrMessageReceived() + { + Random rand = new Random(); + IEnumerable peers = LocalNode.Singleton.RemoteNodes.Values + .Where(p => p.ListenerTcpPort > 0) + .GroupBy(p => p.Remote.Address, (k, g) => g.First()) + .OrderBy(p => rand.Next()) + .Take(AddrPayload.MaxCountToSend); + NetworkAddressWithTime[] networkAddresses = peers.Select(p => NetworkAddressWithTime.Create(p.Listener.Address, p.Version.Timestamp, p.Version.Capabilities)).ToArray(); + if (networkAddresses.Length == 0) return; + EnqueueMessage(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses))); + } + + /// + /// Will be triggered when a MessageCommand.GetBlocks message is received. + /// Tell the specified number of blocks' hashes starting with the requested HashStart until payload.Count or MaxHashesCount + /// Responses are sent to RemoteNode actor as MessageCommand.Inv Message. + /// + /// A GetBlocksPayload including start block Hash and number of blocks requested. + private void OnGetBlocksMessageReceived(GetBlocksPayload payload) + { + UInt256 hash = payload.HashStart; + // The default value of payload.Count is -1 + int count = payload.Count < 0 || payload.Count > InvPayload.MaxHashesCount ? InvPayload.MaxHashesCount : payload.Count; + TrimmedBlock state = Blockchain.Singleton.View.Blocks.TryGet(hash); + if (state == null) return; + List hashes = new List(); + for (uint i = 1; i <= count; i++) + { + uint index = state.Index + i; + if (index > Blockchain.Singleton.Height) + break; + hash = Blockchain.Singleton.GetBlockHash(index); + if (hash == null) break; + hashes.Add(hash); + } + if (hashes.Count == 0) return; + EnqueueMessage(Message.Create(MessageCommand.Inv, InvPayload.Create(InventoryType.Block, hashes.ToArray()))); + } + + private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload) + { + for (uint i = payload.IndexStart, max = payload.IndexStart + payload.Count; i < max; i++) + { + Block block = Blockchain.Singleton.GetBlock(i); + if (block == null) + break; + + if (bloom_filter == null) + { + EnqueueMessage(Message.Create(MessageCommand.Block, block)); + } + else + { + BitArray flags = new BitArray(block.Transactions.Select(p => bloom_filter.Test(p)).ToArray()); + EnqueueMessage(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags))); + } + } + } + + /// + /// Will be triggered when a MessageCommand.GetData message is received. + /// The payload includes an array of hash values. + /// For different payload.Type (Tx, Block, Consensus), get the corresponding (Txs, Blocks, Consensus) and tell them to RemoteNode actor. + /// + /// The payload containing the requested information. + private void OnGetDataMessageReceived(InvPayload payload) + { + UInt256[] hashes = payload.Hashes.Where(p => sentHashes.Add(p)).ToArray(); + foreach (UInt256 hash in hashes) + { + switch (payload.Type) + { + case InventoryType.TX: + Transaction tx = Blockchain.Singleton.GetTransaction(hash); + if (tx != null) + EnqueueMessage(Message.Create(MessageCommand.Transaction, tx)); + break; + case InventoryType.Block: + Block block = Blockchain.Singleton.GetBlock(hash); + if (block != null) + { + if (bloom_filter == null) + { + EnqueueMessage(Message.Create(MessageCommand.Block, block)); + } + else + { + BitArray flags = new BitArray(block.Transactions.Select(p => bloom_filter.Test(p)).ToArray()); + EnqueueMessage(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags))); + } + } + break; + case InventoryType.Consensus: + if (Blockchain.Singleton.ConsensusRelayCache.TryGet(hash, out IInventory inventoryConsensus)) + EnqueueMessage(Message.Create(MessageCommand.Consensus, inventoryConsensus)); + break; + } + } + } + + /// + /// Will be triggered when a MessageCommand.GetHeaders message is received. + /// Tell the specified number of blocks' headers starting with the requested HashStart to RemoteNode actor. + /// A limit set by HeadersPayload.MaxHeadersCount is also applied to the number of requested Headers, namely payload.Count. + /// + /// A GetBlocksPayload including start block Hash and number of blocks' headers requested. + private void OnGetHeadersMessageReceived(GetBlocksPayload payload) + { + UInt256 hash = payload.HashStart; + int count = payload.Count < 0 || payload.Count > HeadersPayload.MaxHeadersCount ? HeadersPayload.MaxHeadersCount : payload.Count; + DataCache cache = Blockchain.Singleton.View.Blocks; + TrimmedBlock state = cache.TryGet(hash); + if (state == null) return; + List
headers = new List
(); + for (uint i = 1; i <= count; i++) + { + uint index = state.Index + i; + hash = Blockchain.Singleton.GetBlockHash(index); + if (hash == null) break; + Header header = cache.TryGet(hash)?.Header; + if (header == null) break; + headers.Add(header); + } + if (headers.Count == 0) return; + EnqueueMessage(Message.Create(MessageCommand.Headers, HeadersPayload.Create(headers.ToArray()))); + } + + private void OnHeadersMessageReceived(HeadersPayload payload) + { + if (payload.Headers.Length == 0) return; + system.Blockchain.Tell(payload.Headers); + } + + private void OnInventoryReceived(IInventory inventory) + { + system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash }); + system.LocalNode.Tell(new LocalNode.Relay { Inventory = inventory }); + pendingKnownHashes.Remove(inventory.Hash); + knownHashes.Add(inventory.Hash); + } + + private void OnInvMessageReceived(InvPayload payload) + { + UInt256[] hashes = payload.Hashes.Where(p => !pendingKnownHashes.Contains(p) && !knownHashes.Contains(p) && !sentHashes.Contains(p)).ToArray(); + if (hashes.Length == 0) return; + switch (payload.Type) + { + case InventoryType.Block: + using (SnapshotView snapshot = Blockchain.Singleton.GetSnapshot()) + hashes = hashes.Where(p => !snapshot.ContainsBlock(p)).ToArray(); + break; + case InventoryType.TX: + using (SnapshotView snapshot = Blockchain.Singleton.GetSnapshot()) + hashes = hashes.Where(p => !snapshot.ContainsTransaction(p)).ToArray(); + break; + } + if (hashes.Length == 0) return; + foreach (UInt256 hash in hashes) + pendingKnownHashes.Add((hash, DateTime.UtcNow)); + system.TaskManager.Tell(new TaskManager.NewTasks { Payload = InvPayload.Create(payload.Type, hashes) }); + } + + private void OnMemPoolMessageReceived() + { + foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, Blockchain.Singleton.MemPool.GetVerifiedTransactions().Select(p => p.Hash).ToArray())) + EnqueueMessage(Message.Create(MessageCommand.Inv, payload)); + } + + private void OnPingMessageReceived(PingPayload payload) + { + UpdateLastBlockIndex(payload); + EnqueueMessage(Message.Create(MessageCommand.Pong, PingPayload.Create(Blockchain.Singleton.Height, payload.Nonce))); + } + + private void OnPongMessageReceived(PingPayload payload) + { + UpdateLastBlockIndex(payload); + } + + private void OnVerackMessageReceived() + { + verack = true; + system.TaskManager.Tell(new TaskManager.Register { Version = Version }); + CheckMessageQueue(); + } + + private void OnVersionMessageReceived(VersionPayload payload) + { + Version = payload; + foreach (NodeCapability capability in payload.Capabilities) + { + switch (capability) + { + case FullNodeCapability fullNodeCapability: + IsFullNode = true; + LastBlockIndex = fullNodeCapability.StartHeight; + break; + case ServerCapability serverCapability: + if (serverCapability.Type == NodeCapabilityType.TcpServer) + ListenerTcpPort = serverCapability.Port; + break; + } + } + if (payload.Nonce == LocalNode.Nonce || payload.Magic != ProtocolSettings.Default.Magic) + { + Disconnect(true); + return; + } + if (LocalNode.Singleton.RemoteNodes.Values.Where(p => p != this).Any(p => p.Remote.Address.Equals(Remote.Address) && p.Version?.Nonce == payload.Nonce)) + { + Disconnect(true); + return; + } + SendMessage(Message.Create(MessageCommand.Verack)); + } + + private void RefreshPendingKnownHashes() + { + while (pendingKnownHashes.Count > 0) + { + var (_, time) = pendingKnownHashes[0]; + if (DateTime.UtcNow - time <= PendingTimeout) + break; + pendingKnownHashes.RemoveAt(0); + } + } + + private void UpdateLastBlockIndex(PingPayload payload) + { + if (payload.LastBlockIndex > LastBlockIndex) + { + LastBlockIndex = payload.LastBlockIndex; + system.TaskManager.Tell(new TaskManager.Update { LastBlockIndex = LastBlockIndex }); + } + } + } +} diff --git a/src/neo/Network/P2P/RemoteNode.cs b/src/neo/Network/P2P/RemoteNode.cs index 696a54ab36..88b58163bf 100644 --- a/src/neo/Network/P2P/RemoteNode.cs +++ b/src/neo/Network/P2P/RemoteNode.cs @@ -4,47 +4,25 @@ using Neo.Cryptography; using Neo.IO; using Neo.IO.Actors; -using Neo.IO.Caching; using Neo.Ledger; using Neo.Network.P2P.Capabilities; using Neo.Network.P2P.Payloads; -using Neo.Persistence; -using Neo.Plugins; -using System; using System.Collections; using System.Collections.Generic; -using System.Collections.ObjectModel; using System.Linq; using System.Net; namespace Neo.Network.P2P { - public class RemoteNode : Connection + public partial class RemoteNode : Connection { internal class Relay { public IInventory Inventory; } - private class Timer { } - private class PendingKnownHashesCollection : KeyedCollection - { - protected override UInt256 GetKeyForItem((UInt256, DateTime) item) - { - return item.Item1; - } - } - - private static readonly TimeSpan TimerInterval = TimeSpan.FromSeconds(30); - private static readonly TimeSpan PendingTimeout = TimeSpan.FromMinutes(1); private readonly NeoSystem system; - private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender); private readonly Queue message_queue_high = new Queue(); private readonly Queue message_queue_low = new Queue(); private ByteString msg_buffer = ByteString.Empty; - private readonly PendingKnownHashesCollection pendingKnownHashes = new PendingKnownHashesCollection(); - private readonly HashSetCache knownHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); - private readonly HashSetCache sentHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); private bool ack = true; - private bool verack = false; - private BloomFilter bloom_filter; public IPEndPoint Listener => new IPEndPoint(Remote.Address, ListenerTcpPort); public int ListenerTcpPort { get; private set; } = 0; @@ -138,14 +116,6 @@ protected override void OnAck() CheckMessageQueue(); } - private void OnAddrMessageReceived(AddrPayload payload) - { - system.LocalNode.Tell(new Peer.Peers - { - EndPoints = payload.AddressList.Select(p => p.EndPoint).Where(p => p.Port > 0) - }); - } - protected override void OnData(ByteString data) { msg_buffer = msg_buffer.Concat(data); @@ -154,290 +124,6 @@ protected override void OnData(ByteString data) OnMessage(message); } - private void OnFilterAddMessageReceived(FilterAddPayload payload) - { - bloom_filter?.Add(payload.Data); - } - - private void OnFilterClearMessageReceived() - { - bloom_filter = null; - } - - private void OnFilterLoadMessageReceived(FilterLoadPayload payload) - { - bloom_filter = new BloomFilter(payload.Filter.Length * 8, payload.K, payload.Tweak, payload.Filter); - } - - /// - /// Will be triggered when a MessageCommand.GetAddr message is received. - /// Randomly select nodes from the local RemoteNodes and tells to RemoteNode actors a MessageCommand.Addr message. - /// The message contains a list of networkAddresses from those selected random peers. - /// - private void OnGetAddrMessageReceived() - { - Random rand = new Random(); - IEnumerable peers = LocalNode.Singleton.RemoteNodes.Values - .Where(p => p.ListenerTcpPort > 0) - .GroupBy(p => p.Remote.Address, (k, g) => g.First()) - .OrderBy(p => rand.Next()) - .Take(AddrPayload.MaxCountToSend); - NetworkAddressWithTime[] networkAddresses = peers.Select(p => NetworkAddressWithTime.Create(p.Listener.Address, p.Version.Timestamp, p.Version.Capabilities)).ToArray(); - if (networkAddresses.Length == 0) return; - EnqueueMessage(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses))); - } - - /// - /// Will be triggered when a MessageCommand.GetBlocks message is received. - /// Tell the specified number of blocks' hashes starting with the requested HashStart until payload.Count or MaxHashesCount - /// Responses are sent to RemoteNode actor as MessageCommand.Inv Message. - /// - /// A GetBlocksPayload including start block Hash and number of blocks requested. - private void OnGetBlocksMessageReceived(GetBlocksPayload payload) - { - UInt256 hash = payload.HashStart; - // The default value of payload.Count is -1 - int count = payload.Count < 0 || payload.Count > InvPayload.MaxHashesCount ? InvPayload.MaxHashesCount : payload.Count; - TrimmedBlock state = Blockchain.Singleton.View.Blocks.TryGet(hash); - if (state == null) return; - List hashes = new List(); - for (uint i = 1; i <= count; i++) - { - uint index = state.Index + i; - if (index > Blockchain.Singleton.Height) - break; - hash = Blockchain.Singleton.GetBlockHash(index); - if (hash == null) break; - hashes.Add(hash); - } - if (hashes.Count == 0) return; - EnqueueMessage(Message.Create(MessageCommand.Inv, InvPayload.Create(InventoryType.Block, hashes.ToArray()))); - } - - private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload) - { - for (uint i = payload.IndexStart, max = payload.IndexStart + payload.Count; i < max; i++) - { - Block block = Blockchain.Singleton.GetBlock(i); - if (block == null) - break; - - if (bloom_filter == null) - { - EnqueueMessage(Message.Create(MessageCommand.Block, block)); - } - else - { - BitArray flags = new BitArray(block.Transactions.Select(p => bloom_filter.Test(p)).ToArray()); - EnqueueMessage(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags))); - } - } - } - - /// - /// Will be triggered when a MessageCommand.GetData message is received. - /// The payload includes an array of hash values. - /// For different payload.Type (Tx, Block, Consensus), get the corresponding (Txs, Blocks, Consensus) and tell them to RemoteNode actor. - /// - /// The payload containing the requested information. - private void OnGetDataMessageReceived(InvPayload payload) - { - UInt256[] hashes = payload.Hashes.Where(p => sentHashes.Add(p)).ToArray(); - foreach (UInt256 hash in hashes) - { - switch (payload.Type) - { - case InventoryType.TX: - Transaction tx = Blockchain.Singleton.GetTransaction(hash); - if (tx != null) - EnqueueMessage(Message.Create(MessageCommand.Transaction, tx)); - break; - case InventoryType.Block: - Block block = Blockchain.Singleton.GetBlock(hash); - if (block != null) - { - if (bloom_filter == null) - { - EnqueueMessage(Message.Create(MessageCommand.Block, block)); - } - else - { - BitArray flags = new BitArray(block.Transactions.Select(p => bloom_filter.Test(p)).ToArray()); - EnqueueMessage(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags))); - } - } - break; - case InventoryType.Consensus: - if (Blockchain.Singleton.ConsensusRelayCache.TryGet(hash, out IInventory inventoryConsensus)) - EnqueueMessage(Message.Create(MessageCommand.Consensus, inventoryConsensus)); - break; - } - } - } - - /// - /// Will be triggered when a MessageCommand.GetHeaders message is received. - /// Tell the specified number of blocks' headers starting with the requested HashStart to RemoteNode actor. - /// A limit set by HeadersPayload.MaxHeadersCount is also applied to the number of requested Headers, namely payload.Count. - /// - /// A GetBlocksPayload including start block Hash and number of blocks' headers requested. - private void OnGetHeadersMessageReceived(GetBlocksPayload payload) - { - UInt256 hash = payload.HashStart; - int count = payload.Count < 0 || payload.Count > HeadersPayload.MaxHeadersCount ? HeadersPayload.MaxHeadersCount : payload.Count; - DataCache cache = Blockchain.Singleton.View.Blocks; - TrimmedBlock state = cache.TryGet(hash); - if (state == null) return; - List
headers = new List
(); - for (uint i = 1; i <= count; i++) - { - uint index = state.Index + i; - hash = Blockchain.Singleton.GetBlockHash(index); - if (hash == null) break; - Header header = cache.TryGet(hash)?.Header; - if (header == null) break; - headers.Add(header); - } - if (headers.Count == 0) return; - EnqueueMessage(Message.Create(MessageCommand.Headers, HeadersPayload.Create(headers.ToArray()))); - } - - private void OnHeadersMessageReceived(HeadersPayload payload) - { - if (payload.Headers.Length == 0) return; - system.Blockchain.Tell(payload.Headers); - } - - private void OnInventoryReceived(IInventory inventory) - { - system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash }); - system.LocalNode.Tell(new LocalNode.Relay { Inventory = inventory }); - pendingKnownHashes.Remove(inventory.Hash); - knownHashes.Add(inventory.Hash); - } - - private void OnInvMessageReceived(InvPayload payload) - { - UInt256[] hashes = payload.Hashes.Where(p => !pendingKnownHashes.Contains(p) && !knownHashes.Contains(p) && !sentHashes.Contains(p)).ToArray(); - if (hashes.Length == 0) return; - switch (payload.Type) - { - case InventoryType.Block: - using (SnapshotView snapshot = Blockchain.Singleton.GetSnapshot()) - hashes = hashes.Where(p => !snapshot.ContainsBlock(p)).ToArray(); - break; - case InventoryType.TX: - using (SnapshotView snapshot = Blockchain.Singleton.GetSnapshot()) - hashes = hashes.Where(p => !snapshot.ContainsTransaction(p)).ToArray(); - break; - } - if (hashes.Length == 0) return; - foreach (UInt256 hash in hashes) - pendingKnownHashes.Add((hash, DateTime.UtcNow)); - system.TaskManager.Tell(new TaskManager.NewTasks { Payload = InvPayload.Create(payload.Type, hashes) }); - } - - private void OnMemPoolMessageReceived() - { - foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, Blockchain.Singleton.MemPool.GetVerifiedTransactions().Select(p => p.Hash).ToArray())) - EnqueueMessage(Message.Create(MessageCommand.Inv, payload)); - } - - private void OnMessage(Message msg) - { - foreach (IP2PPlugin plugin in Plugin.P2PPlugins) - if (!plugin.OnP2PMessage(msg)) - return; - if (Version == null) - { - if (msg.Command != MessageCommand.Version) - throw new ProtocolViolationException(); - OnVersionMessageReceived((VersionPayload)msg.Payload); - return; - } - if (!verack) - { - if (msg.Command != MessageCommand.Verack) - throw new ProtocolViolationException(); - OnVerackMessageReceived(); - return; - } - switch (msg.Command) - { - case MessageCommand.Addr: - OnAddrMessageReceived((AddrPayload)msg.Payload); - break; - case MessageCommand.Block: - OnInventoryReceived((Block)msg.Payload); - break; - case MessageCommand.Consensus: - OnInventoryReceived((ConsensusPayload)msg.Payload); - break; - case MessageCommand.FilterAdd: - OnFilterAddMessageReceived((FilterAddPayload)msg.Payload); - break; - case MessageCommand.FilterClear: - OnFilterClearMessageReceived(); - break; - case MessageCommand.FilterLoad: - OnFilterLoadMessageReceived((FilterLoadPayload)msg.Payload); - break; - case MessageCommand.GetAddr: - OnGetAddrMessageReceived(); - break; - case MessageCommand.GetBlocks: - OnGetBlocksMessageReceived((GetBlocksPayload)msg.Payload); - break; - case MessageCommand.GetBlockData: - OnGetBlockDataMessageReceived((GetBlockDataPayload)msg.Payload); - break; - case MessageCommand.GetData: - OnGetDataMessageReceived((InvPayload)msg.Payload); - break; - case MessageCommand.GetHeaders: - OnGetHeadersMessageReceived((GetBlocksPayload)msg.Payload); - break; - case MessageCommand.Headers: - OnHeadersMessageReceived((HeadersPayload)msg.Payload); - break; - case MessageCommand.Inv: - OnInvMessageReceived((InvPayload)msg.Payload); - break; - case MessageCommand.Mempool: - OnMemPoolMessageReceived(); - break; - case MessageCommand.Ping: - OnPingMessageReceived((PingPayload)msg.Payload); - break; - case MessageCommand.Pong: - OnPongMessageReceived((PingPayload)msg.Payload); - break; - case MessageCommand.Transaction: - if (msg.Payload.Size <= Transaction.MaxTransactionSize) - OnInventoryReceived((Transaction)msg.Payload); - break; - case MessageCommand.Verack: - case MessageCommand.Version: - throw new ProtocolViolationException(); - case MessageCommand.Alert: - case MessageCommand.MerkleBlock: - case MessageCommand.NotFound: - case MessageCommand.Reject: - default: break; - } - } - - private void OnPingMessageReceived(PingPayload payload) - { - UpdateLastBlockIndex(payload); - EnqueueMessage(Message.Create(MessageCommand.Pong, PingPayload.Create(Blockchain.Singleton.Height, payload.Nonce))); - } - - private void OnPongMessageReceived(PingPayload payload) - { - UpdateLastBlockIndex(payload); - } - protected override void OnReceive(object message) { base.OnReceive(message); @@ -480,43 +166,6 @@ private void OnSend(IInventory inventory) EnqueueMessage((MessageCommand)inventory.InventoryType, inventory); } - private void OnVerackMessageReceived() - { - verack = true; - system.TaskManager.Tell(new TaskManager.Register { Version = Version }); - CheckMessageQueue(); - } - - private void OnVersionMessageReceived(VersionPayload payload) - { - Version = payload; - foreach (NodeCapability capability in payload.Capabilities) - { - switch (capability) - { - case FullNodeCapability fullNodeCapability: - IsFullNode = true; - LastBlockIndex = fullNodeCapability.StartHeight; - break; - case ServerCapability serverCapability: - if (serverCapability.Type == NodeCapabilityType.TcpServer) - ListenerTcpPort = serverCapability.Port; - break; - } - } - if (payload.Nonce == LocalNode.Nonce || payload.Magic != ProtocolSettings.Default.Magic) - { - Disconnect(true); - return; - } - if (LocalNode.Singleton.RemoteNodes.Values.Where(p => p != this).Any(p => p.Remote.Address.Equals(Remote.Address) && p.Version?.Nonce == payload.Nonce)) - { - Disconnect(true); - return; - } - SendMessage(Message.Create(MessageCommand.Verack)); - } - protected override void PostStop() { timer.CancelIfNotNull(); @@ -529,32 +178,12 @@ internal static Props Props(NeoSystem system, object connection, IPEndPoint remo return Akka.Actor.Props.Create(() => new RemoteNode(system, connection, remote, local)).WithMailbox("remote-node-mailbox"); } - private void RefreshPendingKnownHashes() - { - while (pendingKnownHashes.Count > 0) - { - var (_, time) = pendingKnownHashes[0]; - if (DateTime.UtcNow - time <= PendingTimeout) - break; - pendingKnownHashes.RemoveAt(0); - } - } - private void SendMessage(Message message) { ack = false; SendData(ByteString.FromBytes(message.ToArray())); } - protected override SupervisorStrategy SupervisorStrategy() - { - return new OneForOneStrategy(ex => - { - Disconnect(true); - return Directive.Stop; - }, loggingEnabled: false); - } - private Message TryParseMessage() { var length = Message.TryDeserialize(msg_buffer, out var msg); @@ -563,15 +192,6 @@ private Message TryParseMessage() msg_buffer = msg_buffer.Slice(length).Compact(); return msg; } - - private void UpdateLastBlockIndex(PingPayload payload) - { - if (payload.LastBlockIndex > LastBlockIndex) - { - LastBlockIndex = payload.LastBlockIndex; - system.TaskManager.Tell(new TaskManager.Update { LastBlockIndex = LastBlockIndex }); - } - } } internal class RemoteNodeMailbox : PriorityMailbox