Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine RemoteNode and ProtocolHandler #1520

Merged
merged 8 commits into from
Apr 9, 2020
1 change: 0 additions & 1 deletion src/neo/NeoSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ public class NeoSystem : IDisposable
$"blockchain-mailbox {{ mailbox-type: \"{typeof(BlockchainMailbox).AssemblyQualifiedName}\" }}" +
$"task-manager-mailbox {{ mailbox-type: \"{typeof(TaskManagerMailbox).AssemblyQualifiedName}\" }}" +
$"remote-node-mailbox {{ mailbox-type: \"{typeof(RemoteNodeMailbox).AssemblyQualifiedName}\" }}" +
$"protocol-handler-mailbox {{ mailbox-type: \"{typeof(ProtocolHandlerMailbox).AssemblyQualifiedName}\" }}" +
$"consensus-service-mailbox {{ mailbox-type: \"{typeof(ConsensusServiceMailbox).AssemblyQualifiedName}\" }}");
public IActorRef Blockchain { get; }
public IActorRef LocalNode { get; }
Expand Down
10 changes: 5 additions & 5 deletions src/neo/Network/P2P/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Neo.Network.P2P
{
public abstract class Connection : UntypedActor
{
internal class Timer { public static Timer Instance = new Timer(); }
internal class Close { public bool Abort; }
internal class Ack : Tcp.Event { public static Ack Instance = new Ack(); }

/// <summary>
Expand All @@ -32,7 +32,7 @@ protected Connection(object connection, IPEndPoint remote, IPEndPoint local)
{
this.Remote = remote;
this.Local = local;
this.timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimitStart), Self, Timer.Instance, ActorRefs.NoSender);
this.timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimitStart), Self, new Close { Abort = true }, ActorRefs.NoSender);
switch (connection)
{
case IActorRef tcp:
Expand Down Expand Up @@ -89,8 +89,8 @@ protected override void OnReceive(object message)
{
switch (message)
{
case Timer _:
Disconnect(true);
case Close close:
Disconnect(close.Abort);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When do we current use Disconnect(false) or just Disconnect();

break;
case Ack _:
OnAck();
Expand All @@ -107,7 +107,7 @@ protected override void OnReceive(object message)
private void OnReceived(ByteString data)
{
timer.CancelIfNotNull();
timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimit), Self, Timer.Instance, ActorRefs.NoSender);
timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimit), Self, new Close { Abort = true }, ActorRefs.NoSender);
try
{
OnData(data);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
using Akka.Actor;
using Akka.Configuration;
using Neo.Cryptography;
using Neo.IO;
using Neo.IO.Actors;
using Neo.IO.Caching;
using Neo.Ledger;
using Neo.Network.P2P.Capabilities;
using Neo.Network.P2P.Payloads;
using Neo.Persistence;
using Neo.Plugins;
Expand All @@ -17,11 +15,9 @@

namespace Neo.Network.P2P
{
internal class ProtocolHandler : UntypedActor
partial class RemoteNode
{
public class SetFilter { public BloomFilter Filter; }
internal class Timer { }

private class Timer { }
private class PendingKnownHashesCollection : KeyedCollection<UInt256, (UInt256, DateTime)>
{
protected override UInt256 GetKeyForItem((UInt256, DateTime) item)
Expand All @@ -30,11 +26,9 @@ protected override UInt256 GetKeyForItem((UInt256, DateTime) item)
}
}

private readonly NeoSystem system;
private readonly PendingKnownHashesCollection pendingKnownHashes;
private readonly HashSetCache<UInt256> knownHashes;
private readonly HashSetCache<UInt256> sentHashes;
private VersionPayload version;
private readonly PendingKnownHashesCollection pendingKnownHashes = new PendingKnownHashesCollection();
private readonly HashSetCache<UInt256> knownHashes = new HashSetCache<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2 / 5);
private readonly HashSetCache<UInt256> sentHashes = new HashSetCache<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2 / 5);
private bool verack = false;
private BloomFilter bloom_filter;

Expand All @@ -43,33 +37,12 @@ protected override UInt256 GetKeyForItem((UInt256, DateTime) item)

private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender);

public ProtocolHandler(NeoSystem system)
{
this.system = system;
this.pendingKnownHashes = new PendingKnownHashesCollection();
this.knownHashes = new HashSetCache<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2 / 5);
this.sentHashes = new HashSetCache<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2 / 5);
}

protected override void OnReceive(object message)
{
switch (message)
{
case Message msg:
OnMessage(msg);
break;
case Timer _:
OnTimer();
break;
}
}

private void OnMessage(Message msg)
{
foreach (IP2PPlugin plugin in Plugin.P2PPlugins)
if (!plugin.OnP2PMessage(msg))
return;
if (version == null)
if (Version == null)
{
if (msg.Command != MessageCommand.Version)
throw new ProtocolViolationException();
Expand Down Expand Up @@ -158,20 +131,17 @@ private void OnAddrMessageReceived(AddrPayload payload)

private void OnFilterAddMessageReceived(FilterAddPayload payload)
{
if (bloom_filter != null)
bloom_filter.Add(payload.Data);
bloom_filter?.Add(payload.Data);
}

private void OnFilterClearMessageReceived()
{
bloom_filter = null;
Context.Parent.Tell(new SetFilter { Filter = null });
}

private void OnFilterLoadMessageReceived(FilterLoadPayload payload)
{
bloom_filter = new BloomFilter(payload.Filter.Length * 8, payload.K, payload.Tweak, payload.Filter);
Context.Parent.Tell(new SetFilter { Filter = bloom_filter });
vncoelho marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
Expand All @@ -189,7 +159,7 @@ private void OnGetAddrMessageReceived()
.Take(AddrPayload.MaxCountToSend);
NetworkAddressWithTime[] networkAddresses = peers.Select(p => NetworkAddressWithTime.Create(p.Listener.Address, p.Version.Timestamp, p.Version.Capabilities)).ToArray();
if (networkAddresses.Length == 0) return;
Context.Parent.Tell(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses)));
EnqueueMessage(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses)));
}

/// <summary>
Expand All @@ -216,7 +186,7 @@ private void OnGetBlocksMessageReceived(GetBlocksPayload payload)
hashes.Add(hash);
}
if (hashes.Count == 0) return;
Context.Parent.Tell(Message.Create(MessageCommand.Inv, InvPayload.Create(InventoryType.Block, hashes.ToArray())));
EnqueueMessage(Message.Create(MessageCommand.Inv, InvPayload.Create(InventoryType.Block, hashes.ToArray())));
}

private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload)
Expand All @@ -229,12 +199,12 @@ private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload)

if (bloom_filter == null)
{
Context.Parent.Tell(Message.Create(MessageCommand.Block, block));
EnqueueMessage(Message.Create(MessageCommand.Block, block));
}
else
{
BitArray flags = new BitArray(block.Transactions.Select(p => bloom_filter.Test(p)).ToArray());
Context.Parent.Tell(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags)));
EnqueueMessage(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags)));
}
}
}
Expand All @@ -255,26 +225,26 @@ private void OnGetDataMessageReceived(InvPayload payload)
case InventoryType.TX:
Transaction tx = Blockchain.Singleton.GetTransaction(hash);
if (tx != null)
Context.Parent.Tell(Message.Create(MessageCommand.Transaction, tx));
EnqueueMessage(Message.Create(MessageCommand.Transaction, tx));
break;
case InventoryType.Block:
Block block = Blockchain.Singleton.GetBlock(hash);
if (block != null)
{
if (bloom_filter == null)
{
Context.Parent.Tell(Message.Create(MessageCommand.Block, block));
EnqueueMessage(Message.Create(MessageCommand.Block, block));
}
else
{
BitArray flags = new BitArray(block.Transactions.Select(p => bloom_filter.Test(p)).ToArray());
Context.Parent.Tell(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags)));
EnqueueMessage(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags)));
}
}
break;
case InventoryType.Consensus:
if (Blockchain.Singleton.ConsensusRelayCache.TryGet(hash, out IInventory inventoryConsensus))
Context.Parent.Tell(Message.Create(MessageCommand.Consensus, inventoryConsensus));
EnqueueMessage(Message.Create(MessageCommand.Consensus, inventoryConsensus));
break;
}
}
Expand Down Expand Up @@ -304,18 +274,18 @@ private void OnGetHeadersMessageReceived(GetBlocksPayload payload)
headers.Add(header);
}
if (headers.Count == 0) return;
Context.Parent.Tell(Message.Create(MessageCommand.Headers, HeadersPayload.Create(headers.ToArray())));
EnqueueMessage(Message.Create(MessageCommand.Headers, HeadersPayload.Create(headers.ToArray())));
}

private void OnHeadersMessageReceived(HeadersPayload payload)
{
if (payload.Headers.Length == 0) return;
system.Blockchain.Tell(payload.Headers, Context.Parent);
system.Blockchain.Tell(payload.Headers);
}

private void OnInventoryReceived(IInventory inventory)
{
system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash }, Context.Parent);
system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash });
system.LocalNode.Tell(new LocalNode.Relay { Inventory = inventory });
pendingKnownHashes.Remove(inventory.Hash);
knownHashes.Add(inventory.Hash);
Expand All @@ -339,47 +309,61 @@ private void OnInvMessageReceived(InvPayload payload)
if (hashes.Length == 0) return;
foreach (UInt256 hash in hashes)
pendingKnownHashes.Add((hash, DateTime.UtcNow));
system.TaskManager.Tell(new TaskManager.NewTasks { Payload = InvPayload.Create(payload.Type, hashes) }, Context.Parent);
system.TaskManager.Tell(new TaskManager.NewTasks { Payload = InvPayload.Create(payload.Type, hashes) });
}

private void OnMemPoolMessageReceived()
{
foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, Blockchain.Singleton.MemPool.GetVerifiedTransactions().Select(p => p.Hash).ToArray()))
Context.Parent.Tell(Message.Create(MessageCommand.Inv, payload));
EnqueueMessage(Message.Create(MessageCommand.Inv, payload));
}

private void OnPingMessageReceived(PingPayload payload)
{
Context.Parent.Tell(payload);
Context.Parent.Tell(Message.Create(MessageCommand.Pong, PingPayload.Create(Blockchain.Singleton.Height, payload.Nonce)));
UpdateLastBlockIndex(payload);
EnqueueMessage(Message.Create(MessageCommand.Pong, PingPayload.Create(Blockchain.Singleton.Height, payload.Nonce)));
}

private void OnPongMessageReceived(PingPayload payload)
{
Context.Parent.Tell(payload);
UpdateLastBlockIndex(payload);
}

private void OnVerackMessageReceived()
{
verack = true;
Context.Parent.Tell(MessageCommand.Verack);
system.TaskManager.Tell(new TaskManager.Register { Version = Version });
CheckMessageQueue();
}

private void OnVersionMessageReceived(VersionPayload payload)
{
version = payload;
Context.Parent.Tell(payload);
}

private void OnTimer()
{
RefreshPendingKnownHashes();
}

protected override void PostStop()
{
timer.CancelIfNotNull();
base.PostStop();
Version = payload;
foreach (NodeCapability capability in payload.Capabilities)
{
switch (capability)
{
case FullNodeCapability fullNodeCapability:
IsFullNode = true;
LastBlockIndex = fullNodeCapability.StartHeight;
break;
case ServerCapability serverCapability:
if (serverCapability.Type == NodeCapabilityType.TcpServer)
ListenerTcpPort = serverCapability.Port;
break;
}
}
if (payload.Nonce == LocalNode.Nonce || payload.Magic != ProtocolSettings.Default.Magic)
{
Disconnect(true);
return;
}
if (LocalNode.Singleton.RemoteNodes.Values.Where(p => p != this).Any(p => p.Remote.Address.Equals(Remote.Address) && p.Version?.Nonce == payload.Nonce))
{
Disconnect(true);
return;
}
SendMessage(Message.Create(MessageCommand.Verack));
}

private void RefreshPendingKnownHashes()
Expand All @@ -393,50 +377,12 @@ private void RefreshPendingKnownHashes()
}
}

public static Props Props(NeoSystem system)
{
return Akka.Actor.Props.Create(() => new ProtocolHandler(system)).WithMailbox("protocol-handler-mailbox");
}
}

internal class ProtocolHandlerMailbox : PriorityMailbox
{
public ProtocolHandlerMailbox(Settings settings, Config config)
: base(settings, config)
{
}

internal protected override bool IsHighPriority(object message)
private void UpdateLastBlockIndex(PingPayload payload)
{
if (!(message is Message msg)) return false;
switch (msg.Command)
if (payload.LastBlockIndex > LastBlockIndex)
{
case MessageCommand.Consensus:
case MessageCommand.FilterAdd:
case MessageCommand.FilterClear:
case MessageCommand.FilterLoad:
case MessageCommand.Verack:
case MessageCommand.Version:
case MessageCommand.Alert:
return true;
default:
return false;
}
}

internal protected override bool ShallDrop(object message, IEnumerable queue)
{
if (message is ProtocolHandler.Timer) return false;
if (!(message is Message msg)) return true;
switch (msg.Command)
{
case MessageCommand.GetAddr:
case MessageCommand.GetBlocks:
case MessageCommand.GetHeaders:
case MessageCommand.Mempool:
return queue.OfType<Message>().Any(p => p.Command == msg.Command);
default:
return false;
LastBlockIndex = payload.LastBlockIndex;
system.TaskManager.Tell(new TaskManager.Update { LastBlockIndex = LastBlockIndex });
}
}
}
Expand Down
Loading