[refactor] use block_waiter instead of batch_loader #738
Conversation
executor/src/subscriber.rs
Outdated
Ok(block) => {
    // we successfully received the payload. Now let's add to store
    store
        .write_all(block.batches.iter().map(|b| (b.id, b.transactions.clone())))
Suggested change:
-        .write_all(block.batches.iter().map(|b| (b.id, b.transactions.clone())))
+        .write_all(block.batches.into_iter().map(|b| (b.id, b.transactions)))
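The suggestion above avoids a deep copy: consuming the block with `into_iter` lets each batch's transactions be moved into the store rather than cloned. A minimal standalone sketch of the difference, using hypothetical `Block`/`Batch` struct shapes (not the actual narwhal types):

```rust
// Hypothetical stand-ins for the real types, just to show the move vs. clone.
struct Batch {
    id: u64,
    transactions: Vec<Vec<u8>>,
}

struct Block {
    batches: Vec<Batch>,
}

fn main() {
    let block = Block {
        batches: vec![
            Batch { id: 1, transactions: vec![vec![0xAA], vec![0xBB]] },
            Batch { id: 2, transactions: vec![vec![0xCC]] },
        ],
    };

    // `into_iter` consumes `block.batches`, so `b.transactions` is moved
    // into the tuple instead of being cloned as with `iter()` + `clone()`.
    let rows: Vec<(u64, Vec<Vec<u8>>)> = block
        .batches
        .into_iter()
        .map(|b| (b.id, b.transactions))
        .collect();

    assert_eq!(rows.len(), 2);
    assert_eq!(rows[0].0, 1);
    println!("{}", rows.len());
}
```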
Force-pushed from e182598 to 2cfb29b
@@ -1,95 +1,103 @@
// Copyright (c) 2022, Mysten Labs, Inc.
Theoretically I could eliminate the Subscriber as well and embed the functionality in the executor::Core. However, the separation with the Subscriber offers us buffering capabilities, i.e. if the Core gets stuck for some reason we can still continue downloading the certificates' batches and outputting them on the tx_executor channel, effectively saving time. Of course, if tx_executor fills up we'll stall as well, but that's desired.
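The buffering-then-backpressure behaviour described here can be sketched with a bounded channel. The real code presumably uses a tokio mpsc channel; this standalone illustration uses std's `sync_channel` instead, with made-up names, to show the same property: the producer runs ahead of a slow consumer up to the channel capacity, then stalls.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

fn main() {
    // Bounded channel playing the role of tx_executor: the "subscriber" can
    // run ahead of the "core" by up to 4 items, then blocks on send.
    let (tx_executor, rx_executor) = sync_channel::<u32>(4);

    let subscriber = thread::spawn(move || {
        for cert in 0..8 {
            // Blocks only once 4 undelivered certificates are queued.
            tx_executor.send(cert).unwrap();
        }
        // tx_executor dropped here, which ends the consumer loop below.
    });

    // A deliberately slow "core": while it is busy, the subscriber keeps
    // downloading and queuing certificates instead of sitting idle.
    let mut delivered = Vec::new();
    for cert in rx_executor {
        thread::sleep(Duration::from_millis(5));
        delivered.push(cert);
    }

    subscriber.join().unwrap();
    assert_eq!(delivered, (0..8).collect::<Vec<_>>());
    println!("{}", delivered.len());
}
```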
We may not need a separate thread for the subscriber, but I would keep the logic in the core as simple as possible (so Subscriber may remain, but perhaps not spawn its own tokio task?)
Hmm, what do you mean by not spawning its own tokio task? If we don't want the subscriber to block, we need to run our loop in a task. Unless you mean something different?
We can either keep the subscriber as it is (in this PR) or make it a collection of functions that are called by the Executor Core
// It's important to have the futures in ordered fashion as we want
// to guarantee that we will deliver to the executor the certificates
// in the same order we received them from rx_consensus. So it doesn't
// matter if we somehow managed to fetch the batches from a later
// certificate. Unless the earlier certificate's payload has been
// fetched, no later certificate will be delivered.
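The in-order delivery guarantee this comment describes (out-of-order completion, in-order delivery, as `futures::stream::FuturesOrdered` provides) can be modelled without async machinery as a reorder buffer. This is a sketch of the invariant only, not the actual implementation:

```rust
use std::collections::BTreeMap;

fn main() {
    // Fetches complete out of order: the payload for certificate index 2
    // arrives before those for indices 0 and 1.
    let completions = [(2usize, "payload-2"), (0, "payload-0"), (1, "payload-1")];

    let mut pending: BTreeMap<usize, &str> = BTreeMap::new();
    let mut next = 0usize; // next certificate index we are allowed to deliver
    let mut delivered = Vec::new();

    for (idx, payload) in completions {
        pending.insert(idx, payload);
        // Deliver only while the next expected index has completed; a later
        // certificate sits in the buffer until all earlier ones are out.
        while let Some(p) = pending.remove(&next) {
            delivered.push(p);
            next += 1;
        }
    }

    assert_eq!(delivered, ["payload-0", "payload-1", "payload-2"]);
    println!("{}", delivered.join(","));
}
```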
good comment
🙏
    tx_executor,
    tx_get_block_commands,
    get_block_retry_policy,
}
.run()
.await
.expect("Failed to run subscriber")
ha cool, if an unrecoverable error happens we shut down here.
Exactly, please also see the test here where I am explicitly checking that the subscriber will crash in this case.
You can also see the recoverable / irrecoverable cases here. Errors that are mapped to Transient will make the retrier retry again. Errors that are marked as Permanent will make the retrier stop and eventually make the subscriber crash.
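The Transient/Permanent split described here is the usual retry-classification pattern (the PR likely uses a backoff/retry library for this; the names below are illustrative, not the actual API). A minimal sketch of the behaviour:

```rust
// Illustrative error classification: Transient errors are retried,
// Permanent errors abort immediately (and the subscriber would crash).
#[derive(Debug)]
enum RetryError {
    Transient(String),
    Permanent(String),
}

fn retry<T>(
    max_attempts: u32,
    mut op: impl FnMut() -> Result<T, RetryError>,
) -> Result<T, RetryError> {
    let mut attempts = 0;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(RetryError::Transient(_)) if attempts + 1 < max_attempts => attempts += 1,
            Err(e) => return Err(e), // Permanent, or retries exhausted
        }
    }
}

fn main() {
    // A Transient failure is retried; this op succeeds on the third attempt.
    let mut calls = 0;
    let result = retry(5, || {
        calls += 1;
        if calls < 3 {
            Err(RetryError::Transient("timeout".into()))
        } else {
            Ok(calls)
        }
    });
    assert_eq!(result.unwrap(), 3);

    // A Permanent failure stops the retrier on the first attempt.
    let mut calls2 = 0;
    let res2 = retry(5, || -> Result<u32, RetryError> {
        calls2 += 1;
        Err(RetryError::Permanent("bad payload".into()))
    });
    assert!(matches!(res2, Err(RetryError::Permanent(_))));
    assert_eq!(calls2, 1);
    println!("ok");
}
```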
#707 should also help with the crashing :)
Excellent work, thanks!
primary/src/metrics.rs
Outdated
@@ -69,7 +69,7 @@ pub struct PrimaryChannelMetrics {
     pub tx_primary_messages: IntGauge,
     /// occupancy of the channel from the `primary::PrimaryReceiverHandler` to the `primary::Helper`
     pub tx_helper_requests: IntGauge,
-    /// occupancy of the channel from the `primary::ConsensusAPIGrpc` to the `primary::BlockWaiter`
+    /// occupancy of the channel from the `primary::ConsensusAPIGrpc` & `executor::Subscriber` to the `primary::BlockWaiter`
I'd add one line saying that in contexts with an external consensus it will be the former, and when the consensus is internal it will be the latter. This should make it clear we don't actually expect 2 simultaneous senders yet.
Force-pushed from 2cfb29b to da5b9f4
This commit swaps the batch_loader with the block_waiter in the executor. This allows us to use a common solution for fetching batches and take advantage of the existing payload sync capabilities. Also, the earlier fix for swapping external worker addresses with internal ones has been removed.
This reverts commit babd3d8.
Resolves #706
This PR is:
- Swapping the batch_loader in the executor with the block_waiter to fetch the blocks/payloads
- Using a temp_batch_store instead of reusing the batch_store, where the latter is meant to be used by the worker nodes. Also, on the temp_batch_store we have the ability to store the unserialised Batch directly, leading to simplification.
- Adding an end to end test to confirm the consensus output is successfully received

NOTE: Please review this first / as well #746