diff --git a/src/neo/Helper.cs b/src/neo/Helper.cs index ebafdc16cc..b8a0a7f79e 100644 --- a/src/neo/Helper.cs +++ b/src/neo/Helper.cs @@ -237,6 +237,10 @@ public static ulong ToTimestampMS(this DateTime time) return (ulong)(time.ToUniversalTime() - unixEpoch).TotalMilliseconds; } + /// + /// Checks if address is IPv4 Maped to IPv6 format, if so, Map to IPv4. + /// Otherwise, return current address. + /// internal static IPAddress Unmap(this IPAddress address) { if (address.IsIPv4MappedToIPv6) @@ -244,6 +248,10 @@ internal static IPAddress Unmap(this IPAddress address) return address; } + /// + /// Checks if IPEndPoint is IPv4 Maped to IPv6 format, if so, unmap to IPv4. + /// Otherwise, return current endpoint. + /// internal static IPEndPoint Unmap(this IPEndPoint endPoint) { if (!endPoint.Address.IsIPv4MappedToIPv6) diff --git a/src/neo/Network/P2P/LocalNode.cs b/src/neo/Network/P2P/LocalNode.cs index 0efd8a3287..a651d46a16 100644 --- a/src/neo/Network/P2P/LocalNode.cs +++ b/src/neo/Network/P2P/LocalNode.cs @@ -59,11 +59,21 @@ public LocalNode(NeoSystem system) } } + /// + /// Packs a MessageCommand to a full Message with an optional ISerializable payload. + /// Forwards it to . + /// + /// The message command to be packed. + /// Optional payload to be Serialized along the message. private void BroadcastMessage(MessageCommand command, ISerializable payload = null) { BroadcastMessage(Message.Create(command, payload)); } + /// + /// Broadcast a message to all connected nodes, namely . + /// + /// The message to be broadcasted. private void BroadcastMessage(Message message) { Connections.Tell(message); @@ -87,6 +97,10 @@ private static IPEndPoint GetIPEndpointFromHostPort(string hostNameOrAddress, in return new IPEndPoint(ipAddress, port); } + /// + /// Return an amount of random seeds nodes from the default SeedList file defined on . + /// + /// Limit of random seed nodes to be obtained, also limited by the available seeds from file. private static IEnumerable GetIPEndPointsFromSeedList(int seedsToTake) { if (seedsToTake > 0) @@ -122,6 +136,12 @@ public IEnumerable GetUnconnectedPeers() return UnconnectedPeers; } + /// + /// Override of abstract class that is triggered when is empty. + /// Performs a BroadcastMessage with the command `MessageCommand.GetAddr`, which, eventually, tells all known connections. + /// If there are no connected peers it will try with the default, respecting MaxCountFromSeedList limit. + /// + /// The count of peers required protected override void NeedMorePeers(int count) { count = Math.Max(count, MaxCountFromSeedList); @@ -131,6 +151,8 @@ protected override void NeedMorePeers(int count) } else { + // Will call AddPeers with default SeedList set cached on . + // It will try to add those, sequentially, to the list of currently uncconected ones. AddPeers(GetIPEndPointsFromSeedList(count)); } } @@ -157,6 +179,12 @@ protected override void OnReceive(object message) } } + /// + /// For Transaction type of IInventory, it will tell Transaction to the actor of Consensus. + /// Otherwise, tell the inventory to the actor of Blockchain. + /// There are, currently, three implementations of IInventory: TX, Block and ConsensusPayload. + /// + /// The inventory to be relayed. private void OnRelay(IInventory inventory) { if (inventory is Transaction transaction) diff --git a/src/neo/Network/P2P/Message.cs b/src/neo/Network/P2P/Message.cs index 2e5c1f8258..2f395ce01f 100644 --- a/src/neo/Network/P2P/Message.cs +++ b/src/neo/Network/P2P/Message.cs @@ -14,6 +14,10 @@ public class Message : ISerializable private const int CompressionMinSize = 128; private const int CompressionThreshold = 64; + /// + /// Flags that represents whether a message is compressed. + /// 0 for None, 1 for Compressed. + /// public MessageFlags Flags; public MessageCommand Command; public ISerializable Payload; diff --git a/src/neo/Network/P2P/Peer.cs b/src/neo/Network/P2P/Peer.cs index bc852fb8fa..8e7eb21bd2 100644 --- a/src/neo/Network/P2P/Peer.cs +++ b/src/neo/Network/P2P/Peer.cs @@ -36,8 +36,19 @@ private class WsConnected { public WebSocket Socket; public IPEndPoint Remote; p private static readonly HashSet localAddresses = new HashSet(); private readonly Dictionary ConnectedAddresses = new Dictionary(); + /// + /// A dictionary that stores the connected nodes. + /// protected readonly ConcurrentDictionary ConnectedPeers = new ConcurrentDictionary(); + /// + /// An ImmutableHashSet that stores the Peers received: 1) from other nodes or 2) from default file. + /// If the number of desired connections is not enough, first try to connect with the peers from this set. + /// protected ImmutableHashSet UnconnectedPeers = ImmutableHashSet.Empty; + /// + /// When a TCP connection request is sent to a peer, the peer will be added to the ImmutableHashSet. + /// If a Tcp.Connected or a Tcp.CommandFailed (with TCP.Command of type Tcp.Connect) is received, the related peer will be removed. + /// protected ImmutableHashSet ConnectingPeers = ImmutableHashSet.Empty; protected HashSet TrustedIpAddresses { get; } = new HashSet(); @@ -63,10 +74,16 @@ static Peer() localAddresses.UnionWith(NetworkInterface.GetAllNetworkInterfaces().SelectMany(p => p.GetIPProperties().UnicastAddresses).Select(p => p.Address.Unmap())); } + /// + /// Tries to add a set of peers to the immutable ImmutableHashSet of UnconnectedPeers. + /// + /// Peers that the method will try to add (union) to (with) UnconnectedPeers. protected void AddPeers(IEnumerable peers) { if (UnconnectedPeers.Count < UnconnectedMax) { + // Do not select peers to be added that are already on the ConnectedPeers + // If the address is the same, the ListenerTcpPort should be different peers = peers.Where(p => (p.Port != ListenerTcpPort || !localAddresses.Contains(p.Address)) && !ConnectedPeers.Values.Contains(p)); ImmutableInterlocked.Update(ref UnconnectedPeers, p => p.Union(peers)); } @@ -75,9 +92,11 @@ protected void AddPeers(IEnumerable peers) protected void ConnectToPeer(IPEndPoint endPoint, bool isTrusted = false) { endPoint = endPoint.Unmap(); + // If the address is the same, the ListenerTcpPort should be different, otherwise, return if (endPoint.Port == ListenerTcpPort && localAddresses.Contains(endPoint.Address)) return; if (isTrusted) TrustedIpAddresses.Add(endPoint.Address); + // If connections with the peer greater than or equal to MaxConnectionsPerAddress, return. if (ConnectedAddresses.TryGetValue(endPoint.Address, out int count) && count >= MaxConnectionsPerAddress) return; if (ConnectedPeers.Values.Contains(endPoint)) return; @@ -96,6 +115,10 @@ private static bool IsIntranetAddress(IPAddress address) return (value & 0xff000000) == 0x0a000000 || (value & 0xff000000) == 0x7f000000 || (value & 0xfff00000) == 0xac100000 || (value & 0xffff0000) == 0xc0a80000 || (value & 0xffff0000) == 0xa9fe0000; } + /// + /// Abstract method for asking for more peers. Currently triggered when UnconnectedPeers is empty. + /// + /// Number of peers that are being requested. protected abstract void NeedMorePeers(int count); protected override void OnReceive(object message) @@ -141,6 +164,7 @@ private void OnStart(ChannelsConfig config) MaxConnections = config.MaxConnections; MaxConnectionsPerAddress = config.MaxConnectionsPerAddress; + // schedule time to trigger `OnTimer` event every TimerMillisecondsInterval ms timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(0, 5000, Context.Self, new Timer(), ActorRefs.NoSender); if ((ListenerTcpPort > 0 || ListenerWsPort > 0) && localAddresses.All(p => !p.IsIPv4MappedToIPv6 || IsIntranetAddress(p)) @@ -174,6 +198,13 @@ private void OnStart(ChannelsConfig config) } } + /// + /// Will be triggered when a Tcp.Connected message is received. + /// If the conditions are met, the remote endpoint will be added to ConnectedPeers. + /// Increase the connection number with the remote endpoint by one. + /// + /// The remote endpoint of TCP connection. + /// The local endpoint of TCP connection. private void OnTcpConnected(IPEndPoint remote, IPEndPoint local) { ImmutableInterlocked.Update(ref ConnectingPeers, p => p.Remove(remote)); @@ -198,6 +229,11 @@ private void OnTcpConnected(IPEndPoint remote, IPEndPoint local) } } + /// + /// Will be triggered when a Tcp.CommandFailed message is received. + /// If it's a Tcp.Connect command, remove the related endpoint from ConnectingPeers. + /// + /// Tcp.Command message/event. private void OnTcpCommandFailed(Tcp.Command cmd) { switch (cmd) @@ -223,7 +259,10 @@ private void OnTerminated(IActorRef actorRef) private void OnTimer() { + // Check if the number of desired connections is already enough if (ConnectedPeers.Count >= MinDesiredConnections) return; + + // If there aren't available UnconnectedPeers, it triggers an abstract implementation of NeedMorePeers if (UnconnectedPeers.Count == 0) NeedMorePeers(MinDesiredConnections - ConnectedPeers.Count); IPEndPoint[] endpoints = UnconnectedPeers.Take(MinDesiredConnections - ConnectedPeers.Count).ToArray(); diff --git a/src/neo/Network/P2P/ProtocolHandler.cs b/src/neo/Network/P2P/ProtocolHandler.cs index 12aaae48d9..cecbb02796 100644 --- a/src/neo/Network/P2P/ProtocolHandler.cs +++ b/src/neo/Network/P2P/ProtocolHandler.cs @@ -174,6 +174,11 @@ private void OnFilterLoadMessageReceived(FilterLoadPayload payload) Context.Parent.Tell(new SetFilter { Filter = bloom_filter }); } + /// + /// Will be triggered when a MessageCommand.GetAddr message is received. + /// Randomly select nodes from the local RemoteNodes and tells to RemoteNode actors a MessageCommand.Addr message. + /// The message contains a list of networkAddresses from those selected random peers. + /// private void OnGetAddrMessageReceived() { Random rand = new Random(); @@ -187,9 +192,16 @@ private void OnGetAddrMessageReceived() Context.Parent.Tell(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses))); } + /// + /// Will be triggered when a MessageCommand.GetBlocks message is received. + /// Tell the specified number of blocks' hashes starting with the requested HashStart until payload.Count or MaxHashesCount + /// Responses are sent to RemoteNode actor as MessageCommand.Inv Message. + /// + /// A GetBlocksPayload including start block Hash and number of blocks requested. private void OnGetBlocksMessageReceived(GetBlocksPayload payload) { UInt256 hash = payload.HashStart; + // The default value of payload.Count is -1 int count = payload.Count < 0 || payload.Count > InvPayload.MaxHashesCount ? InvPayload.MaxHashesCount : payload.Count; TrimmedBlock state = Blockchain.Singleton.View.Blocks.TryGet(hash); if (state == null) return; @@ -227,6 +239,12 @@ private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload) } } + /// + /// Will be triggered when a MessageCommand.GetData message is received. + /// The payload includes an array of hash values. + /// For different payload.Type (Tx, Block, Consensus), get the corresponding (Txs, Blocks, Consensus) and tell them to RemoteNode actor. + /// + /// The payload containing the requested information. private void OnGetDataMessageReceived(InvPayload payload) { UInt256[] hashes = payload.Hashes.Where(p => sentHashes.Add(p)).ToArray(); @@ -262,6 +280,12 @@ private void OnGetDataMessageReceived(InvPayload payload) } } + /// + /// Will be triggered when a MessageCommand.GetHeaders message is received. + /// Tell the specified number of blocks' headers starting with the requested HashStart to RemoteNode actor. + /// A limit set by HeadersPayload.MaxHeadersCount is also applied to the number of requested Headers, namely payload.Count. + /// + /// A GetBlocksPayload including start block Hash and number of blocks' headers requested. private void OnGetHeadersMessageReceived(GetBlocksPayload payload) { UInt256 hash = payload.HashStart; diff --git a/src/neo/Network/P2P/RemoteNode.cs b/src/neo/Network/P2P/RemoteNode.cs index 5d4e712729..44bb2ef107 100644 --- a/src/neo/Network/P2P/RemoteNode.cs +++ b/src/neo/Network/P2P/RemoteNode.cs @@ -50,6 +50,12 @@ public RemoteNode(NeoSystem system, object connection, IPEndPoint remote, IPEndP SendMessage(Message.Create(MessageCommand.Version, VersionPayload.Create(LocalNode.Nonce, LocalNode.UserAgent, capabilities.ToArray()))); } + /// + /// It defines the message queue to be used for dequeuing. + /// If the high-priority message queue is not empty, choose the high-priority message queue. + /// Otherwise, choose the low-priority message queue. + /// Finally, it sends the first message of the queue. + /// private void CheckMessageQueue() { if (!verack || !ack) return; @@ -67,6 +73,10 @@ private void EnqueueMessage(MessageCommand command, ISerializable payload = null EnqueueMessage(Message.Create(command, payload)); } + /// + /// Add message to high priority queue or low priority queue depending on the message type. + /// + /// The message to be added. private void EnqueueMessage(Message message) { bool is_single = false; diff --git a/src/neo/Network/P2P/TaskManager.cs b/src/neo/Network/P2P/TaskManager.cs index 4fb1a2fd09..818973ea4b 100644 --- a/src/neo/Network/P2P/TaskManager.cs +++ b/src/neo/Network/P2P/TaskManager.cs @@ -27,7 +27,11 @@ private class Timer { } private readonly NeoSystem system; private const int MaxConncurrentTasks = 3; + private const int PingCoolingOffPeriod = 60; // in secconds. + /// + /// A set of known hashes, of inventories or payloads, already received. + /// private readonly FIFOSet knownHashes; private readonly Dictionary globalTasks = new Dictionary(); private readonly Dictionary sessions = new Dictionary(); @@ -55,16 +59,20 @@ private void OnNewTasks(InvPayload payload) { if (!sessions.TryGetValue(Sender, out TaskSession session)) return; + // Do not accept payload of type InventoryType.TX if not synced on best known HeaderHeight if (payload.Type == InventoryType.TX && Blockchain.Singleton.Height < Blockchain.Singleton.HeaderHeight) { RequestTasks(session); return; } HashSet hashes = new HashSet(payload.Hashes); + // Remove all previously processed knownHashes from the list that is being requested hashes.Remove(knownHashes); + // Add to AvailableTasks the ones, of type InventoryType.Block, that are global (already under process by other sessions) if (payload.Type == InventoryType.Block) session.AvailableTasks.UnionWith(hashes.Where(p => globalTasks.ContainsKey(p))); + // Remove those that are already in process by other sessions hashes.Remove(globalTasks); if (hashes.Count == 0) { @@ -72,6 +80,7 @@ private void OnNewTasks(InvPayload payload) return; } + // Update globalTasks with the ones that will be requested within this current session foreach (UInt256 hash in hashes) { IncrementGlobalTask(hash); @@ -214,9 +223,11 @@ public static Props Props(NeoSystem system) private void RequestTasks(TaskSession session) { if (session.HasTask) return; + // If there are pending tasks of InventoryType.Block we should process them if (session.AvailableTasks.Count > 0) { session.AvailableTasks.Remove(knownHashes); + // Search any similar hash that is on Singleton's knowledge, which means, on the way or already processed session.AvailableTasks.RemoveWhere(p => Blockchain.Singleton.ContainsBlock(p)); HashSet hashes = new HashSet(session.AvailableTasks); if (hashes.Count > 0) @@ -234,6 +245,9 @@ private void RequestTasks(TaskSession session) return; } } + + // When the number of AvailableTasks is no more than 0, no pending tasks of InventoryType.Block, it should process pending the tasks of headers + // If not HeaderTask pending to be processed it should ask for more Blocks if ((!HasHeaderTask || globalTasks[HeaderTaskHash] < MaxConncurrentTasks) && Blockchain.Singleton.HeaderHeight < session.LastBlockIndex) { session.Tasks[HeaderTaskHash] = DateTime.UtcNow;