-
Notifications
You must be signed in to change notification settings - Fork 68
Simplify executor-consensus interface #691
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- a big red diff with -700 lines,
- removing a complex completion loop,
- and a store,
- and a blocking call (see comment) on the direct line to the output of consensus (hence backpressuring it),
- and we need none of that.
... what's not to like? 🤩 🤩 🤩
@@ -185,29 +84,26 @@ impl Subscriber { | |||
tokio::select! { | |||
// Receive the ordered sequence of consensus messages from a consensus node. | |||
Some(message) = self.rx_consensus.recv() => { | |||
// Process the consensus message (synchronize missing messages, download transaction data). | |||
let sequence = self.handle_consensus_message(&message).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am so happy to see this line go away.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very happy to see this simplification.
waiting.push(future); | ||
} | ||
// Send the certificate to the batch loader to download all transactions' data. | ||
self.tx_batch_loader |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a metered channel? Are we sure if this blocks it will not take down a tokio select that is required for liveness? Should we make it print a warn then error if the delay is too long? (I have become paranoid now).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is indeed not metered, but it seems that the last PR of @huitseeker fixes this. The batch loader can only block if the network blocks.
executor/src/subscriber.rs
Outdated
.expect("Failed to send message ot batch loader"); | ||
|
||
// Wait for the transaction data to be available in the store. We will then forward these | ||
// transactions to the Executor Core for execution. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe tell us a little more in this comment how this is guaranteed to happen, and therefore liveness if safe?
.await | ||
.map_err(|_| SubscriberError::ExecutorConnectionDropped)?, | ||
Some(message) = waiting.next() => { | ||
if self.tx_executor.send(message?).await.is_err() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same questions about sending as above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Red is good! Thanks @asonnino for simplifying this 💯
@@ -300,16 +284,14 @@ impl Node { | |||
store.batch_store.clone(), | |||
execution_state, | |||
tx_reconfigure, | |||
/* rx_consensus */ rx_consensus_to_client, | |||
/* tx_consensus */ tx_client_to_consensus, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah much nicer now!
Remove useless sync
Remove useless sync
Simplify the consensus-executor interface by removing a sync that is not used. This sync was originally added when we planned to run the executor (and Sui) on a different machine than the consensus