Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple Way to Parallel Verification Transaction #1298

Merged
merged 38 commits into from
Dec 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
1bab96b
parallel verify
eryeer Nov 11, 2019
539875c
add create parallel verifier after terminated
eryeer Nov 12, 2019
4f5a229
update UT
eryeer Nov 12, 2019
b2e3600
format
eryeer Nov 12, 2019
6697e13
add UT
eryeer Nov 12, 2019
2df2035
optimize
eryeer Nov 12, 2019
1cbbbf2
Add readonly
shargon Nov 12, 2019
ef36f25
remove numerics
eryeer Nov 12, 2019
08cd901
Merge branch 'pr_multiTxVerifierActor' of github.com:eryeer/neoUT int…
eryeer Nov 12, 2019
5c4013a
Update TransactionParallelVerifier.cs
shargon Nov 12, 2019
9982459
Move feePeerByte again inside the lock
shargon Nov 12, 2019
c755139
Clean
shargon Nov 12, 2019
9756837
update tryGet
eryeer Nov 12, 2019
af606d3
format
eryeer Nov 12, 2019
4b5ff44
format
eryeer Nov 12, 2019
64619f6
Merge branch 'master' into pr_multiTxVerifierActor
erikzhang Nov 13, 2019
782d5a2
Merge branch 'master' into pr_multiTxVerifierActor
lock9 Nov 13, 2019
cfa6598
Merge branch 'master' into pr_multiTxVerifierActor
eryeer Nov 14, 2019
be24593
Merge branch 'master' into pr_multiTxVerifierActor
eryeer Nov 18, 2019
faf8e09
merge Conflict
eryeer Nov 27, 2019
932d529
use new parallel approach
eryeer Nov 27, 2019
9e41b2f
update
eryeer Nov 27, 2019
a000fd9
update
eryeer Nov 27, 2019
919ea0c
refactor
eryeer Nov 28, 2019
4cb2e12
Merge branch 'master' into pr_multiTxVerifierActor
eryeer Nov 28, 2019
09142db
Reorganise
erikzhang Nov 28, 2019
295450d
Remove the useless UT
erikzhang Nov 28, 2019
8fdfcc2
Merge branch 'master' into pr_multiTxVerifierActor
eryeer Nov 29, 2019
6d40876
Merge branch 'master' into pr_multiTxVerifierActor
erikzhang Nov 29, 2019
bff2210
Merge branch 'master' into pr_multiTxVerifierActor
vncoelho Nov 29, 2019
0fb410e
Rename Reverify to VerifyForEachBlock
vncoelho Nov 29, 2019
c84b217
Reorder methods
erikzhang Nov 29, 2019
62e5fe9
Update Transaction.cs
erikzhang Nov 29, 2019
e5124ed
Remove duplicate
shargon Nov 29, 2019
081dac5
Should check `CanTransactionFitInPool()` again
erikzhang Nov 29, 2019
e07331a
Remove duplicate check
erikzhang Nov 29, 2019
e333115
Merge branch 'master' into pr_multiTxVerifierActor
erikzhang Dec 1, 2019
d72241d
More RelayResultReason
erikzhang Dec 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ internal ConsensusService(IActorRef localNode, IActorRef taskManager, ConsensusC

private bool AddTransaction(Transaction tx, bool verify)
{
if (verify && !tx.Verify(context.Snapshot, context.SendersFeeMonitor.GetSenderFee(tx.Sender)))
if (verify && tx.Verify(context.Snapshot, context.SendersFeeMonitor.GetSenderFee(tx.Sender)) != RelayResultReason.Succeed)
{
Log($"Invalid transaction: {tx.Hash}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning);
RequestChangeView(ChangeViewReason.TxInvalid);
Expand Down
61 changes: 45 additions & 16 deletions src/neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Neo.Ledger
{
Expand All @@ -26,6 +27,7 @@ public class Import { public IEnumerable<Block> Blocks; }
public class ImportCompleted { }
public class FillMemoryPool { public IEnumerable<Transaction> Transactions; }
public class FillCompleted { }
private class ParallelVerified { public Transaction Transaction; public bool ShouldRelay; public RelayResultReason VerifyResult; }

public static readonly uint MillisecondsPerBlock = ProtocolSettings.Default.MillisecondsPerBlock;
public const uint DecrementInterval = 2000000;
Expand Down Expand Up @@ -276,7 +278,7 @@ private void OnFillMemoryPool(IEnumerable<Transaction> transactions)
// First remove the tx if it is unverified in the pool.
MemPool.TryRemoveUnVerified(tx.Hash, out _);
// Verify the the transaction
if (!tx.Verify(currentSnapshot, MemPool.SendersFeeMonitor.GetSenderFee(tx.Sender)))
if (tx.Verify(currentSnapshot, MemPool.SendersFeeMonitor.GetSenderFee(tx.Sender)) != RelayResultReason.Succeed)
continue;
// Add to the memory pool
MemPool.TryAdd(tx.Hash, tx);
Expand Down Expand Up @@ -395,22 +397,46 @@ private void OnNewHeaders(Header[] headers)
system.TaskManager.Tell(new TaskManager.HeaderTaskCompleted(), Sender);
}

private RelayResultReason OnNewTransaction(Transaction transaction, bool relay)
private void OnNewTransaction(Transaction transaction, bool relay)
{
RelayResultReason reason;
if (ContainsTransaction(transaction.Hash))
return RelayResultReason.AlreadyExists;
if (!MemPool.CanTransactionFitInPool(transaction))
return RelayResultReason.OutOfMemory;
if (!transaction.Verify(currentSnapshot, MemPool.SendersFeeMonitor.GetSenderFee(transaction.Sender)))
return RelayResultReason.Invalid;
if (!NativeContract.Policy.CheckPolicy(transaction, currentSnapshot))
return RelayResultReason.PolicyFail;

if (!MemPool.TryAdd(transaction.Hash, transaction))
return RelayResultReason.OutOfMemory;
if (relay)
system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = transaction });
return RelayResultReason.Succeed;
reason = RelayResultReason.AlreadyExists;
else if (!MemPool.CanTransactionFitInPool(transaction))
reason = RelayResultReason.OutOfMemory;
else
reason = transaction.VerifyForEachBlock(currentSnapshot, MemPool.SendersFeeMonitor.GetSenderFee(transaction.Sender));
if (reason == RelayResultReason.Succeed)
{
Task.Run(() =>
{
return new ParallelVerified
{
Transaction = transaction,
ShouldRelay = relay,
VerifyResult = transaction.VerifyParallelParts(currentSnapshot)
};
}).PipeTo(Self, Sender);
}
else
{
Sender.Tell(reason);
}
}

private void OnParallelVerified(ParallelVerified parallelVerified)
{
RelayResultReason reason = parallelVerified.VerifyResult;
if (reason == RelayResultReason.Succeed)
{
if (!MemPool.CanTransactionFitInPool(parallelVerified.Transaction))
reason = RelayResultReason.OutOfMemory;
else if (!MemPool.TryAdd(parallelVerified.Transaction.Hash, parallelVerified.Transaction))
reason = RelayResultReason.OutOfMemory;
else if (parallelVerified.ShouldRelay)
system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = parallelVerified.Transaction });
}
Sender.Tell(reason);
}

private void OnPersistCompleted(Block block)
Expand Down Expand Up @@ -443,7 +469,10 @@ protected override void OnReceive(object message)
break;
}
case Transaction transaction:
Sender.Tell(OnNewTransaction(transaction, true));
OnNewTransaction(transaction, true);
break;
case ParallelVerified parallelVerified:
OnParallelVerified(parallelVerified);
break;
case ConsensusPayload payload:
Sender.Tell(OnNewConsensus(payload));
Expand Down
2 changes: 1 addition & 1 deletion src/neo/Ledger/MemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ private int ReverifyTransactions(SortedSet<PoolItem> verifiedSortedTxPool,
// Since unverifiedSortedTxPool is ordered in an ascending manner, we take from the end.
foreach (PoolItem item in unverifiedSortedTxPool.Reverse().Take(count))
{
if (item.Tx.Reverify(snapshot, SendersFeeMonitor.GetSenderFee(item.Tx.Sender)))
if (item.Tx.VerifyForEachBlock(snapshot, SendersFeeMonitor.GetSenderFee(item.Tx.Sender)) == RelayResultReason.Succeed)
{
reverifiedItems.Add(item);
SendersFeeMonitor.AddSenderFee(item.Tx);
Expand Down
2 changes: 2 additions & 0 deletions src/neo/Ledger/RelayResultReason.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ public enum RelayResultReason : byte
OutOfMemory,
UnableToVerify,
Invalid,
Expired,
InsufficientFunds,
PolicyFail,
Unknown
}
Expand Down
58 changes: 33 additions & 25 deletions src/neo/Network/P2P/Payloads/Transaction.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Neo.Cryptography;
using Neo.IO;
using Neo.IO.Json;
using Neo.Ledger;
using Neo.Persistence;
using Neo.SmartContract;
using Neo.SmartContract.Native;
Expand Down Expand Up @@ -205,25 +206,6 @@ public UInt160[] GetScriptHashesForVerifying(StoreView snapshot)
return hashes.OrderBy(p => p).ToArray();
}

public virtual bool Reverify(StoreView snapshot, BigInteger totalSenderFeeFromPool)
{
if (ValidUntilBlock <= snapshot.Height || ValidUntilBlock > snapshot.Height + MaxValidUntilBlockIncrement)
return false;
if (NativeContract.Policy.GetBlockedAccounts(snapshot).Intersect(GetScriptHashesForVerifying(snapshot)).Count() > 0)
return false;
BigInteger balance = NativeContract.GAS.BalanceOf(snapshot, Sender);
BigInteger fee = SystemFee + NetworkFee + totalSenderFeeFromPool;
if (balance < fee) return false;
UInt160[] hashes = GetScriptHashesForVerifying(snapshot);
if (hashes.Length != Witnesses.Length) return false;
for (int i = 0; i < hashes.Length; i++)
{
if (Witnesses[i].VerificationScript.Length > 0) continue;
if (snapshot.Contracts.TryGet(hashes[i]) is null) return false;
}
return true;
}

void ISerializable.Serialize(BinaryWriter writer)
{
((IVerifiable)this).SerializeUnsigned(writer);
Expand Down Expand Up @@ -279,17 +261,43 @@ public static Transaction FromJson(JObject json)

bool IInventory.Verify(StoreView snapshot)
{
return Verify(snapshot, BigInteger.Zero);
return Verify(snapshot, BigInteger.Zero) == RelayResultReason.Succeed;
}

public virtual RelayResultReason Verify(StoreView snapshot, BigInteger totalSenderFeeFromPool)
{
RelayResultReason result = VerifyForEachBlock(snapshot, totalSenderFeeFromPool);
if (result != RelayResultReason.Succeed) return result;
return VerifyParallelParts(snapshot);
}

public virtual RelayResultReason VerifyForEachBlock(StoreView snapshot, BigInteger totalSenderFeeFromPool)
{
if (ValidUntilBlock <= snapshot.Height || ValidUntilBlock > snapshot.Height + MaxValidUntilBlockIncrement)
return RelayResultReason.Expired;
UInt160[] hashes = GetScriptHashesForVerifying(snapshot);
if (NativeContract.Policy.GetBlockedAccounts(snapshot).Intersect(hashes).Any())
return RelayResultReason.PolicyFail;
BigInteger balance = NativeContract.GAS.BalanceOf(snapshot, Sender);
BigInteger fee = SystemFee + NetworkFee + totalSenderFeeFromPool;
if (balance < fee) return RelayResultReason.InsufficientFunds;
if (hashes.Length != Witnesses.Length) return RelayResultReason.Invalid;
for (int i = 0; i < hashes.Length; i++)
{
if (Witnesses[i].VerificationScript.Length > 0) continue;
if (snapshot.Contracts.TryGet(hashes[i]) is null) return RelayResultReason.Invalid;
}
return RelayResultReason.Succeed;
}

public virtual bool Verify(StoreView snapshot, BigInteger totalSenderFeeFromPool)
public RelayResultReason VerifyParallelParts(StoreView snapshot)
{
if (!Reverify(snapshot, totalSenderFeeFromPool)) return false;
int size = Size;
if (size > MaxTransactionSize) return false;
if (size > MaxTransactionSize) return RelayResultReason.Invalid;
long net_fee = NetworkFee - size * NativeContract.Policy.GetFeePerByte(snapshot);
if (net_fee < 0) return false;
return this.VerifyWitnesses(snapshot, net_fee);
if (net_fee < 0) return RelayResultReason.InsufficientFunds;
if (!this.VerifyWitnesses(snapshot, net_fee)) return RelayResultReason.Invalid;
return RelayResultReason.Succeed;
}

public StackItem ToStackItem()
Expand Down
27 changes: 8 additions & 19 deletions src/neo/Network/RPC/RpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,26 +129,15 @@ private JObject GetInvokeResult(byte[] script, IVerifiable checkWitnessHashes =

private static JObject GetRelayResult(RelayResultReason reason, UInt256 hash)
{
switch (reason)
if (reason == RelayResultReason.Succeed)
{
case RelayResultReason.Succeed:
{
var ret = new JObject();
ret["hash"] = hash.ToString();
return ret;
}
case RelayResultReason.AlreadyExists:
throw new RpcException(-501, "Block or transaction already exists and cannot be sent repeatedly.");
case RelayResultReason.OutOfMemory:
throw new RpcException(-502, "The memory pool is full and no more transactions can be sent.");
case RelayResultReason.UnableToVerify:
throw new RpcException(-503, "The block cannot be validated.");
case RelayResultReason.Invalid:
throw new RpcException(-504, "Block or transaction validation failed.");
case RelayResultReason.PolicyFail:
throw new RpcException(-505, "One of the Policy filters failed.");
default:
throw new RpcException(-500, "Unknown error.");
var ret = new JObject();
ret["hash"] = hash.ToString();
return ret;
}
else
{
throw new RpcException(-500, reason.ToString());
}
}

Expand Down
8 changes: 4 additions & 4 deletions tests/neo.UnitTests/Ledger/UT_MemoryPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ private Transaction CreateTransactionWithFee(long fee)
var randomBytes = new byte[16];
random.NextBytes(randomBytes);
Mock<Transaction> mock = new Mock<Transaction>();
mock.Setup(p => p.Reverify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(true);
mock.Setup(p => p.Verify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(true);
mock.Setup(p => p.VerifyForEachBlock(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(RelayResultReason.Succeed);
mock.Setup(p => p.Verify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(RelayResultReason.Succeed);
mock.Object.Script = randomBytes;
mock.Object.Sender = UInt160.Zero;
mock.Object.NetworkFee = fee;
Expand All @@ -99,8 +99,8 @@ private Transaction CreateTransactionWithFeeAndBalanceVerify(long fee)
random.NextBytes(randomBytes);
Mock<Transaction> mock = new Mock<Transaction>();
UInt160 sender = UInt160.Zero;
mock.Setup(p => p.Reverify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(((StoreView snapshot, BigInteger amount) => NativeContract.GAS.BalanceOf(snapshot, sender) >= amount + fee));
mock.Setup(p => p.Verify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(true);
mock.Setup(p => p.VerifyForEachBlock(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns((StoreView snapshot, BigInteger amount) => NativeContract.GAS.BalanceOf(snapshot, sender) >= amount + fee ? RelayResultReason.Succeed : RelayResultReason.InsufficientFunds);
mock.Setup(p => p.Verify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(RelayResultReason.Succeed);
mock.Object.Script = randomBytes;
mock.Object.Sender = sender;
mock.Object.NetworkFee = fee;
Expand Down
4 changes: 2 additions & 2 deletions tests/neo.UnitTests/Ledger/UT_SendersFeeMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ private Transaction CreateTransactionWithFee(long networkFee, long systemFee)
var randomBytes = new byte[16];
random.NextBytes(randomBytes);
Mock<Transaction> mock = new Mock<Transaction>();
mock.Setup(p => p.Reverify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(true);
mock.Setup(p => p.Verify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(true);
mock.Setup(p => p.VerifyForEachBlock(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(RelayResultReason.Succeed);
mock.Setup(p => p.Verify(It.IsAny<StoreView>(), It.IsAny<BigInteger>())).Returns(RelayResultReason.Succeed);
mock.Object.Script = randomBytes;
mock.Object.Sender = UInt160.Zero;
mock.Object.NetworkFee = networkFee;
Expand Down
2 changes: 1 addition & 1 deletion tests/neo.UnitTests/Network/P2P/Payloads/UT_Transaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ public void Transaction_Reverify_Hashes_Length_Unequal_To_Witnesses_Length()
};
UInt160[] hashes = txSimple.GetScriptHashesForVerifying(snapshot);
Assert.AreEqual(2, hashes.Length);
Assert.IsFalse(txSimple.Reverify(snapshot, BigInteger.Zero));
Assert.AreNotEqual(RelayResultReason.Succeed, txSimple.VerifyForEachBlock(snapshot, BigInteger.Zero));
}

[TestMethod]
Expand Down