-
Notifications
You must be signed in to change notification settings - Fork 370
Feature: Add solidification stage to pipeline and only broadcast solid transactions #1646
base: dev
Are you sure you want to change the base?
Changes from 22 commits
53ea166
be7c132
6593388
33f193f
ff58495
1a0ea43
ad3a203
2283fb0
24e31a5
03590dd
06242eb
1ec3794
bc5897f
62a2330
9fc4cba
ccca691
f0c1955
fa72acc
c3b1417
1538b36
a34c5e8
14e784d
89ec550
8801ef2
26b9521
16170cf
7e34c91
778c833
0b61c89
1a2d950
864c2db
4bfb138
d623d1d
48258b1
1cde4bf
6543981
7b4d3c5
5e0a974
f86fe55
de08907
4086fce
b16c752
f5de8fe
30f226e
303648b
7e36e1c
eb53b2f
0409d3e
1b5294a
ab6e628
839d9cb
031180b
f61e6ff
6e0a383
4126903
ff4a13c
5bd3066
10baada
4327b10
092cfa5
cc220b0
82359a7
7adfdc5
d3baeee
ffea048
0f29416
0084b48
4664000
cc5e8b8
d488a1d
9ad4886
9ab2a7d
5663d30
b2cb269
31223b3
ea3bbf8
e4feaf6
059e7e4
7528b22
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,10 @@ | ||
package com.iota.iri.network.pipeline; | ||
|
||
import com.iota.iri.TransactionValidator; | ||
import com.iota.iri.service.validation.TransactionSolidifier; | ||
import com.iota.iri.service.validation.TransactionValidator; | ||
import com.iota.iri.conf.NodeConfig; | ||
import com.iota.iri.controllers.TipsViewModel; | ||
import com.iota.iri.controllers.TransactionViewModel; | ||
import com.iota.iri.crypto.batched.BatchedHasher; | ||
import com.iota.iri.crypto.batched.BatchedHasherFactory; | ||
import com.iota.iri.crypto.batched.HashRequest; | ||
|
@@ -19,7 +21,10 @@ | |
import com.iota.iri.utils.Converter; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.util.Iterator; | ||
import java.util.LinkedHashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.concurrent.ArrayBlockingQueue; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.ExecutorService; | ||
|
@@ -62,12 +67,13 @@ public class TransactionProcessingPipelineImpl implements TransactionProcessingP | |
private BroadcastStage broadcastStage; | ||
private BatchedHasher batchedHasher; | ||
private HashingStage hashingStage; | ||
private TransactionSolidifier txSolidifier; | ||
|
||
private BlockingQueue<ProcessingContext> preProcessStageQueue = new ArrayBlockingQueue<>(100); | ||
private BlockingQueue<ProcessingContext> validationStageQueue = new ArrayBlockingQueue<>(100); | ||
private BlockingQueue<ProcessingContext> receivedStageQueue = new ArrayBlockingQueue<>(100); | ||
private BlockingQueue<ProcessingContext> broadcastStageQueue = new ArrayBlockingQueue<>(100); | ||
private BlockingQueue<ProcessingContext> replyStageQueue = new ArrayBlockingQueue<>(100); | ||
private BlockingQueue<ProcessingContext> broadcastStageQueue = new ArrayBlockingQueue<>(100); | ||
|
||
/** | ||
* Creates a {@link TransactionProcessingPipeline}. | ||
|
@@ -82,9 +88,9 @@ public class TransactionProcessingPipelineImpl implements TransactionProcessingP | |
* reply stage | ||
*/ | ||
public TransactionProcessingPipelineImpl(NeighborRouter neighborRouter, NodeConfig config, | ||
TransactionValidator txValidator, Tangle tangle, SnapshotProvider snapshotProvider, | ||
TipsViewModel tipsViewModel, LatestMilestoneTracker latestMilestoneTracker, | ||
TransactionRequester transactionRequester) { | ||
TransactionValidator txValidator, Tangle tangle, SnapshotProvider snapshotProvider, | ||
TipsViewModel tipsViewModel, LatestMilestoneTracker latestMilestoneTracker, | ||
TransactionRequester transactionRequester, TransactionSolidifier txSolidifier) { | ||
DyrellC marked this conversation as resolved.
Show resolved
Hide resolved
|
||
FIFOCache<Long, Hash> recentlySeenBytesCache = new FIFOCache<>(config.getCacheSizeBytes()); | ||
this.preProcessStage = new PreProcessStage(recentlySeenBytesCache); | ||
this.replyStage = new ReplyStage(neighborRouter, config, tangle, tipsViewModel, latestMilestoneTracker, | ||
|
@@ -94,6 +100,7 @@ public TransactionProcessingPipelineImpl(NeighborRouter neighborRouter, NodeConf | |
this.receivedStage = new ReceivedStage(tangle, txValidator, snapshotProvider, transactionRequester); | ||
this.batchedHasher = BatchedHasherFactory.create(BatchedHasherFactory.Type.BCTCURL81, 20); | ||
this.hashingStage = new HashingStage(batchedHasher); | ||
this.txSolidifier = txSolidifier; | ||
} | ||
|
||
@Override | ||
|
@@ -119,6 +126,7 @@ private void addStage(String name, BlockingQueue<ProcessingContext> queue, | |
try { | ||
while (!Thread.currentThread().isInterrupted()) { | ||
ProcessingContext ctx = stage.process(queue.take()); | ||
|
||
switch (ctx.getNextStage()) { | ||
case REPLY: | ||
replyStageQueue.put(ctx); | ||
|
@@ -177,6 +185,7 @@ public BlockingQueue<ProcessingContext> getValidationStageQueue() { | |
public void process(Neighbor neighbor, ByteBuffer data) { | ||
try { | ||
preProcessStageQueue.put(new ProcessingContext(new PreProcessPayload(neighbor, data))); | ||
refillBroadcastQueue(); | ||
} catch (InterruptedException e) { | ||
e.printStackTrace(); | ||
} | ||
|
@@ -191,6 +200,23 @@ public void process(byte[] txTrits) { | |
hashAndValidate(new ProcessingContext(payload)); | ||
} | ||
|
||
@Override | ||
public void refillBroadcastQueue(){ | ||
try{ | ||
Iterator<TransactionViewModel> hashIterator = txSolidifier.getBroadcastQueue().iterator(); | ||
Set<TransactionViewModel> toRemove = new LinkedHashSet<>(); | ||
while(!Thread.currentThread().isInterrupted() && hashIterator.hasNext()){ | ||
TransactionViewModel t = hashIterator.next(); | ||
broadcastStageQueue.put(new ProcessingContext(new BroadcastPayload(null, t))); | ||
toRemove.add(t); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: Small peev: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This refill is what adds solid transactions to the broadcast queue, so here, yes you would send it to all neighbours. I'll change the 1 letter variable though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just making sure, now with the new stage, this problem is resolved? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Due to the asynchronous nature of solidification, there is still no neighbour variable to pass through unless we create a mapping, which imo is not worth it to do. So anything that gets updated as solid will be broadcast to all neighbours. But now only transactions that are solid will end up in the broadcast queue. We no longer forward transactions that have just been received. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need to create this mapping There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @DyrellC I am marking this as a point to discuss before merging There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We will need to discuss this, because now that the solidify stage does not add the incoming transactions to the solidification queue anymore, we have no entry point to put the origin neighbour into the process. As is, the only place that places transactions into the solidification queue is from the milestone solidifier. This is something that could be introduced in the milestone stage, I mention that in the new issue #1701 |
||
hashIterator.remove(); | ||
} | ||
txSolidifier.clearBroadcastQueue(toRemove); | ||
} catch(InterruptedException e){ | ||
log.info(e.getMessage()); | ||
} | ||
} | ||
|
||
/** | ||
* Sets up the given hashing stage {@link ProcessingContext} so that up on success, it will submit further to the | ||
* validation stage. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently not sure why this has changed...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In line 138 you gossip the tx to neighbor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was to bolster broadcast rate to neighbours to increase the rate that
transactionsToRequest
are sent out. It was meant to increase solidification speed.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems a bit hacky...
I've been thinking about it and I think this should be the correct design:
You need to add a solidification stage and a solidification queue in
TransactionProcessingPipeline
.The flow should be like this:
From the
ReceivedStage
-> if solid (due to quickly solid) -> Broadcast-> else go to solidification stage -> Broadcast
Also remember to pass neighbor information along so we don't broadcast the solid tx to the neighbor that sent this to us..