Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add header back #2259

Merged
merged 107 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
6f5fe69
add header back
Jan 22, 2021
ce6e479
remove ping pong
Jan 22, 2021
af9d2a8
restore ping pong
Jan 22, 2021
77a0d7d
fix
Jan 22, 2021
c70a985
Merge branch 'master' into add_header
Tommo-L Jan 23, 2021
f190df4
logic fix
Jan 23, 2021
3df7314
Merge branch 'add_header' of https://github.com/Qiao-Jin/neo into add…
Jan 23, 2021
87070ba
Update TaskManager/TaskSession
Jan 25, 2021
02be1c7
fix ut
Jan 25, 2021
84b562c
Array.Empty
shargon Jan 25, 2021
a7320ac
kick line
Jan 25, 2021
53f2f05
Merge branch 'add_header' of https://github.com/Qiao-Jin/neo into add…
Jan 25, 2021
97479d7
Merge branch 'master' into add_header
Qiao-Jin Jan 26, 2021
ffc236f
add header cache
Jan 26, 2021
7c663d6
Merge branch 'add_header' of https://github.com/Qiao-Jin/neo into add…
Jan 26, 2021
90c8d95
format
Jan 26, 2021
a1f56fc
fix
Jan 26, 2021
4206e00
rename parameter
Jan 26, 2021
0a85bdc
fix
Jan 26, 2021
82516be
header only in cache
Jan 26, 2021
8ac3ea2
fix
Jan 26, 2021
538eeab
fix
Jan 26, 2021
5eacd2f
Update HeaderCache
erikzhang Jan 26, 2021
fa112e9
Update HeaderCache.cs
erikzhang Jan 26, 2021
23269f2
Merge branch 'master' into add_header
Qiao-Jin Jan 26, 2021
9808fc3
update headercache logic
Jan 26, 2021
fdfbb67
fix
Jan 27, 2021
fabf59c
Add IndexedQueue
erikzhang Jan 27, 2021
4143d82
Remove HeaderCache
erikzhang Jan 27, 2021
e7529cf
Rename
erikzhang Jan 27, 2021
b7075b5
Limit the count of cached headers
erikzhang Jan 27, 2021
2ca2e59
Clean using
shargon Jan 27, 2021
e543a42
Merge remote-tracking branch 'Qiao-Jin/add_header' into add_header
shargon Jan 27, 2021
b2574f6
Add IndexedQueue UT
shargon Jan 27, 2021
7683b3f
Merge branch 'master' into add_header
shargon Jan 27, 2021
fe29464
restore GetBlockByIndexPayload and fix
Jan 27, 2021
1cec5c6
Merge branch 'add_header' of https://github.com/Qiao-Jin/neo into add…
Jan 27, 2021
d2fa995
Merge branch 'master' into add_header
Qiao-Jin Jan 28, 2021
6d8dcae
restore getheaders
Jan 28, 2021
ccb0b45
Optimize IndexedQueue
erikzhang Jan 28, 2021
b563d1d
HeaderCacheFull
erikzhang Jan 28, 2021
2de91fe
Update src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs
erikzhang Jan 28, 2021
573f630
Merge branch 'master' into add_header
Qiao-Jin Jan 28, 2021
478c12c
fix
Jan 28, 2021
e1b82f0
Merge branch 'master' into add_header
Qiao-Jin Jan 29, 2021
7710ab9
Update OnGetHeadersMessageReceived
Jan 29, 2021
e77b299
Merge branch 'add_header' of https://github.com/Qiao-Jin/neo into add…
Jan 29, 2021
052559a
Add lock for IndexedQueue
Jan 29, 2021
33fceba
Revert "Add lock for IndexedQueue"
erikzhang Jan 29, 2021
026c7fe
Add HeaderCache and locks
erikzhang Jan 29, 2021
0e06127
Optimize
erikzhang Jan 29, 2021
a4240cf
Optimize
erikzhang Jan 29, 2021
234ffcb
assign GetBlockByIndexPayload to GetBlocks
Jan 29, 2021
35fd13c
Optimize
shargon Jan 29, 2021
f0ed92a
fix sync logic
Jan 29, 2021
82627a2
Merge branch 'add_header' of https://github.com/Qiao-Jin/neo into add…
Jan 29, 2021
3c3b43b
Revert "Optimize"
erikzhang Jan 29, 2021
6a17179
Merge branch 'master' into add_header
Qiao-Jin Jan 29, 2021
2131bef
Revert "assign GetBlockByIndexPayload to GetBlocks"
Jan 29, 2021
9e1ff37
fix
Jan 29, 2021
cea69bb
Add check null
shargon Jan 29, 2021
8aba54f
Add nullable
shargon Jan 29, 2021
421317e
Add HeaderCache.GetSnapshot()
erikzhang Jan 29, 2021
1bd4bbb
fix null pointer
Jan 30, 2021
51fc9ec
Optimize
erikzhang Jan 30, 2021
7f1a362
Do not send ping when persisted block
erikzhang Jan 30, 2021
6e1f597
Remove TaskCompleted
erikzhang Jan 30, 2021
0810e80
Remove HeaderTaskCompleted
erikzhang Jan 30, 2021
131f2df
restore OnBlock
Jan 30, 2021
44781e1
Optimize ping
erikzhang Jan 30, 2021
4847752
Remove TaskSession.StartHeight
erikzhang Jan 30, 2021
d237a9a
Remove TaskSession.RemoteNode
erikzhang Jan 30, 2021
8b85668
Remove TaskSession.Version
erikzhang Jan 30, 2021
4099cf4
Update TaskSession.cs
erikzhang Jan 30, 2021
a66d1d7
Remove MemPoolTaskHash
erikzhang Jan 30, 2021
f72105a
Optimize
erikzhang Jan 30, 2021
9c90f4b
restore indextasks
Jan 30, 2021
fa5e04c
Remove hardcoded time
shargon Jan 31, 2021
66bb0a2
Optimize
shargon Jan 31, 2021
e720a72
Revert change
shargon Jan 31, 2021
4cffe13
Rename
erikzhang Feb 1, 2021
1834852
Optimize
erikzhang Feb 1, 2021
73edcfd
reset TimeProvider.Current.UtcNow
Feb 1, 2021
a8713a3
use count
Feb 1, 2021
2b366e4
fix
Feb 1, 2021
439699f
fix (2)
Feb 1, 2021
eda2db7
Add globalIndexTasks
erikzhang Feb 1, 2021
620a95b
fix
erikzhang Feb 1, 2021
1669be6
fix
erikzhang Feb 1, 2021
804d30b
Fix
erikzhang Feb 1, 2021
12b2f6d
Assert
erikzhang Feb 1, 2021
d14eb19
restore logic
Feb 2, 2021
3540a54
Optimize
erikzhang Feb 2, 2021
d695e1b
Fix
erikzhang Feb 2, 2021
51a879e
Update TaskManager.cs
erikzhang Feb 2, 2021
9692809
add a comparison
Feb 2, 2021
2354dfd
optimize
Feb 2, 2021
a860c45
optimize globalindextaks
Feb 2, 2021
a972e45
Revert "optimize globalindextaks"
Feb 2, 2021
565fa05
logic fix
Feb 2, 2021
7057ec7
fix
Feb 2, 2021
9d8057b
fix
Feb 2, 2021
a84ce4b
Revert "fix"
Feb 3, 2021
954a04c
logic fix
Feb 3, 2021
5ede7b9
Merge branch 'master' into add_header
Qiao-Jin Feb 3, 2021
6d0814c
code clean
Feb 3, 2021
4a5ebbb
Update src/neo/Network/P2P/TaskSession.cs
Qiao-Jin Feb 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 65 additions & 7 deletions src/neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class RelayResult { public IInventory Inventory; public VerifyResult Resu
public DataCache View => new SnapshotCache(Store);
public MemoryPool MemPool { get; }
public uint Height => NativeContract.Ledger.CurrentIndex(currentSnapshot);
public uint HeaderHeight => NativeContract.Ledger.CurrentHeaderIndex(currentSnapshot);
public UInt256 CurrentBlockHash => NativeContract.Ledger.CurrentHash(currentSnapshot);

private static Blockchain singleton;
Expand Down Expand Up @@ -216,30 +217,83 @@ private VerifyResult OnNewBlock(Block block)
{
if (block.Index <= Height)
return VerifyResult.AlreadyExists;
if (block.Index - 1 > Height)
if (block.Index - 1 > HeaderHeight)
{
AddUnverifiedBlockToCache(block);
return VerifyResult.UnableToVerify;
}
if (block.Index == Height + 1)
if (block.Index == HeaderHeight + 1)
{
if (!block.Verify(currentSnapshot))
return VerifyResult.Invalid;
block_cache.TryAdd(block.Hash, block);
block_cache_unverified.Remove(block.Index);
Persist(block);
}
else
{
if (!block.Hash.Equals(NativeContract.Ledger.GetBlockHash(currentSnapshot, block.Index)))
return VerifyResult.Invalid;
}
block_cache.TryAdd(block.Hash, block);
if (block.Index == Height + 1)
{
Block block_persist = block;
List<Block> blocksToPersistList = new List<Block>();
while (true)
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
{
blocksToPersistList.Add(block_persist);
if (block_persist.Index + 1 >= NativeContract.Ledger.CurrentHeaderIndex(currentSnapshot)) break;
UInt256 hash = NativeContract.Ledger.CurrentHeaderHash(currentSnapshot);
if (!block_cache.TryGetValue(hash, out block_persist)) break;
}

int blocksPersisted = 0;
foreach (Block blockToPersist in blocksToPersistList)
{
block_cache_unverified.Remove(blockToPersist.Index);
Persist(blockToPersist);

// 15000 is the default among of seconds per block, while MilliSecondsPerBlock is the current
uint extraBlocks = (15000 - MillisecondsPerBlock) / 1000;

if (blocksPersisted++ < blocksToPersistList.Count - (2 + Math.Max(0, extraBlocks))) continue;
// Empirically calibrated for relaying the most recent 2 blocks persisted with 15s network
// Increase in the rate of 1 block per second in configurations with faster blocks

if (blockToPersist.Index + 100 >= NativeContract.Ledger.CurrentHeaderIndex(currentSnapshot))
system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = blockToPersist });
}
if (block_cache_unverified.TryGetValue(Height + 1, out var unverifiedBlocks))
{
foreach (var unverifiedBlock in unverifiedBlocks.Blocks)
Self.Tell(unverifiedBlock, ActorRefs.NoSender);
block_cache_unverified.Remove(Height + 1);
}
// We can store the new block in block_cache and tell the new height to other nodes after Persist().
system.LocalNode.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(Singleton.Height)));
}
else
{
if (block.Index + 100 >= NativeContract.Ledger.CurrentHeaderIndex(currentSnapshot))
system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = block });
}
return VerifyResult.Succeed;
}

private void OnNewHeaders(Header[] headers)
{
using (SnapshotCache snapshot = GetSnapshot())
{
foreach (Header header in headers)
{
if (header.Index - 1 >= HeaderHeight) break;
if (header.Index < HeaderHeight) continue;
if (!header.Verify(snapshot)) break;
NativeContract.Ledger.SaveHeader(snapshot, header);
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
NativeContract.Ledger.SetCurrentHeader(snapshot, header.Hash, header.Index);
}
snapshot.Commit();
}
UpdateCurrentSnapshot();
system.TaskManager.Tell(new TaskManager.HeaderTaskCompleted(), Sender);
}

private VerifyResult OnNewInventory(IInventory inventory)
{
if (!inventory.Verify(currentSnapshot)) return VerifyResult.Invalid;
Expand Down Expand Up @@ -271,6 +325,9 @@ protected override void OnReceive(object message)
case FillMemoryPool fill:
OnFillMemoryPool(fill.Transactions);
break;
case Header[] headers:
OnNewHeaders(headers);
break;
case Block block:
OnInventory(block, false);
break;
Expand Down Expand Up @@ -430,6 +487,7 @@ internal protected override bool IsHighPriority(object message)
{
switch (message)
{
case Header[] _:
case Block _:
case ExtensiblePayload _:
case Terminated _:
Expand Down
4 changes: 0 additions & 4 deletions src/neo/Network/P2P/MessageCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ public enum MessageCommand : byte
GetAddr = 0x10,
[ReflectionCache(typeof(AddrPayload))]
Addr = 0x11,
[ReflectionCache(typeof(PingPayload))]
Ping = 0x18,
[ReflectionCache(typeof(PingPayload))]
Pong = 0x19,
erikzhang marked this conversation as resolved.
Show resolved Hide resolved

//synchronization
[ReflectionCache(typeof(GetBlockByIndexPayload))]
Expand Down
16 changes: 16 additions & 0 deletions src/neo/Network/P2P/Payloads/Header.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Neo.SmartContract.Native;
using System;
using System.IO;

Expand Down Expand Up @@ -35,5 +36,20 @@ public override void Serialize(BinaryWriter writer)
base.Serialize(writer);
writer.Write((byte)0);
}

public TrimmedBlock Trim()
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
{
return new TrimmedBlock
{
Version = Version,
PrevHash = PrevHash,
MerkleRoot = MerkleRoot,
Timestamp = Timestamp,
Index = Index,
NextConsensus = NextConsensus,
Witness = Witness,
Hashes = new UInt256[0]
};
}
}
}
27 changes: 9 additions & 18 deletions src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,15 @@ private void OnMessage(Message msg)
case MessageCommand.GetHeaders:
OnGetHeadersMessageReceived((GetBlockByIndexPayload)msg.Payload);
break;
case MessageCommand.Headers:
OnHeadersMessageReceived((HeadersPayload)msg.Payload);
break;
case MessageCommand.Inv:
OnInvMessageReceived((InvPayload)msg.Payload);
break;
case MessageCommand.Mempool:
OnMemPoolMessageReceived();
break;
case MessageCommand.Ping:
OnPingMessageReceived((PingPayload)msg.Payload);
break;
case MessageCommand.Pong:
OnPongMessageReceived((PingPayload)msg.Payload);
break;
case MessageCommand.Transaction:
if (msg.Payload.Size <= Transaction.MaxTransactionSize)
OnInventoryReceived((Transaction)msg.Payload);
Expand All @@ -110,7 +107,6 @@ private void OnMessage(Message msg)
case MessageCommand.Version:
throw new ProtocolViolationException();
case MessageCommand.Alert:
case MessageCommand.Headers:
case MessageCommand.MerkleBlock:
case MessageCommand.NotFound:
case MessageCommand.Reject:
Expand Down Expand Up @@ -290,6 +286,12 @@ private void OnGetHeadersMessageReceived(GetBlockByIndexPayload payload)
EnqueueMessage(Message.Create(MessageCommand.Headers, HeadersPayload.Create(headers.ToArray())));
}

private void OnHeadersMessageReceived(HeadersPayload payload)
{
if (payload.Headers.Length == 0) return;
erikzhang marked this conversation as resolved.
Show resolved Hide resolved
system.Blockchain.Tell(payload.Headers);
}

private void OnInventoryReceived(IInventory inventory)
{
pendingKnownHashes.Remove(inventory.Hash);
Expand Down Expand Up @@ -330,17 +332,6 @@ private void OnMemPoolMessageReceived()
EnqueueMessage(Message.Create(MessageCommand.Inv, payload));
}

private void OnPingMessageReceived(PingPayload payload)
{
UpdateLastBlockIndex(payload.LastBlockIndex, true);
EnqueueMessage(Message.Create(MessageCommand.Pong, PingPayload.Create(Blockchain.Singleton.Height, payload.Nonce)));
}

private void OnPongMessageReceived(PingPayload payload)
{
UpdateLastBlockIndex(payload.LastBlockIndex, true);
}

private void OnVerackMessageReceived()
{
verack = true;
Expand Down
4 changes: 0 additions & 4 deletions src/neo/Network/P2P/RemoteNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ private void EnqueueMessage(Message message)
case MessageCommand.GetBlocks:
case MessageCommand.GetHeaders:
case MessageCommand.Mempool:
case MessageCommand.Ping:
case MessageCommand.Pong:
is_single = true;
break;
}
Expand Down Expand Up @@ -130,8 +128,6 @@ protected override void OnReceive(object message)
{
if (payload.LastBlockIndex > LastHeightSent)
LastHeightSent = payload.LastBlockIndex;
else if (msg.Command == MessageCommand.Ping)
break;
}
EnqueueMessage(msg);
break;
Expand Down
56 changes: 21 additions & 35 deletions src/neo/Network/P2P/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ public class TaskManager : UntypedActor
internal class Register { public VersionPayload Version; }
internal class Update { public uint LastBlockIndex; public bool RequestTasks; }
internal class NewTasks { public InvPayload Payload; }
public class HeaderTaskCompleted { }
public class RestartTasks { public InvPayload Payload; }
private class Timer { }

private static readonly TimeSpan TimerInterval = TimeSpan.FromSeconds(30);
private static readonly TimeSpan TaskTimeout = TimeSpan.FromMinutes(1);
private static readonly UInt256 HeaderTaskHash = UInt256.Zero;
private static readonly UInt256 MemPoolTaskHash = UInt256.Parse("0x0000000000000000000000000000000000000000000000000000000000000001");

private const int MaxConncurrentTasks = 3;
Expand All @@ -40,6 +42,7 @@ private class Timer { }
private readonly HashSet<uint> failedSyncTasks = new HashSet<uint>();
private readonly Dictionary<IActorRef, TaskSession> sessions = new Dictionary<IActorRef, TaskSession>();
private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender);
private bool HasHeaderTask => globalTasks.ContainsKey(HeaderTaskHash);
private uint lastTaskIndex = 0;

public TaskManager(NeoSystem system)
Expand All @@ -51,6 +54,15 @@ public TaskManager(NeoSystem system)
Context.System.EventStream.Subscribe(Self, typeof(Blockchain.RelayResult));
}

private void OnHeaderTaskCompleted()
{
if (!sessions.TryGetValue(Sender, out TaskSession session))
return;
session.InvTasks.Remove(HeaderTaskHash);
DecrementGlobalTask(HeaderTaskHash);
RequestTasks();
}

private bool AssignSyncTask(uint index, TaskSession filterSession = null)
{
if (index <= Blockchain.Singleton.Height || sessions.Values.Any(p => p != filterSession && p.IndexTasks.ContainsKey(index)))
Expand Down Expand Up @@ -78,7 +90,7 @@ private void OnBlock(Block block)
if (session is null) return;
session.IndexTasks.Remove(block.Index);
receivedBlockIndex.TryAdd(block.Index, session);
RequestTasks(false);
RequestTasks();
}

private void OnInvalidBlock(Block invalidBlock)
Expand Down Expand Up @@ -121,7 +133,7 @@ private void OnNewTasks(InvPayload payload)
private void OnPersistCompleted(Block block)
{
receivedBlockIndex.Remove(block.Index);
RequestTasks(false);
RequestTasks();
}

protected override void OnReceive(object message)
Expand All @@ -137,6 +149,9 @@ protected override void OnReceive(object message)
case NewTasks tasks:
OnNewTasks(tasks.Payload);
break;
case HeaderTaskCompleted _:
OnHeaderTaskCompleted();
break;
case RestartTasks restart:
OnRestartTasks(restart.Payload);
break;
Expand Down Expand Up @@ -169,7 +184,7 @@ private void OnRegister(VersionPayload version)
if (session.IsFullNode)
session.InvTasks.TryAdd(MemPoolTaskHash, TimeProvider.Current.UtcNow);
sessions.TryAdd(Sender, session);
RequestTasks(true);
RequestTasks();
}

private void OnUpdate(Update update)
Expand All @@ -178,7 +193,7 @@ private void OnUpdate(Update update)
return;
session.LastBlockIndex = update.LastBlockIndex;
session.ExpireTime = TimeProvider.Current.UtcNow.AddMilliseconds(PingCoolingOffPeriod);
if (update.RequestTasks) RequestTasks(true);
if (update.RequestTasks) RequestTasks();
}

private void OnRestartTasks(InvPayload payload)
Expand Down Expand Up @@ -260,7 +275,7 @@ private void OnTimer()
}
}
}
RequestTasks(true);
RequestTasks();
}

protected override void PostStop()
Expand All @@ -274,12 +289,10 @@ public static Props Props(NeoSystem system)
return Akka.Actor.Props.Create(() => new TaskManager(system)).WithMailbox("task-manager-mailbox");
}

private void RequestTasks(bool sendPing)
private void RequestTasks()
{
if (sessions.Count() == 0) return;

if (sendPing) SendPingMessage();

while (failedSyncTasks.Count() > 0)
{
var failedTask = failedSyncTasks.First();
Expand All @@ -299,33 +312,6 @@ private void RequestTasks(bool sendPing)
if (!AssignSyncTask(++lastTaskIndex)) break;
}
}

private void SendPingMessage()
{
TrimmedBlock block;
using (SnapshotCache snapshot = Blockchain.Singleton.GetSnapshot())
{
block = NativeContract.Ledger.GetTrimmedBlock(snapshot, NativeContract.Ledger.CurrentHash(snapshot));
}

foreach (KeyValuePair<IActorRef, TaskSession> item in sessions)
{
var node = item.Key;
var session = item.Value;

if (session.ExpireTime < TimeProvider.Current.UtcNow ||
(block.Index >= session.LastBlockIndex &&
TimeProvider.Current.UtcNow.ToTimestampMS() - PingCoolingOffPeriod >= block.Timestamp))
{
if (session.InvTasks.Remove(MemPoolTaskHash))
{
node.Tell(Message.Create(MessageCommand.Mempool));
}
node.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(Blockchain.Singleton.Height)));
session.ExpireTime = TimeProvider.Current.UtcNow.AddMilliseconds(PingCoolingOffPeriod);
}
}
}
}

internal class TaskManagerMailbox : PriorityMailbox
Expand Down
Loading